Line # Revision Author
1 3 ahitrov@rambler.ru package Contenido::Cluster::Storage;
2 use strict;
3
4 use Contenido::Globals;
5
6 use HTTP::Async;
7 use HTTP::Request;
8 use HTTP::Headers;
9 use LWP::UserAgent;
10 use String::CRC32;
11 use Data::Dumper;
12
13
# Constructor.
#
# Options (passed as a flat key/value list):
#   sql      - database handle used to load the cluster map (required)
#   users    - base directory holding user files (required)
#   backend  - host name of this backend (required)
#   is_async - true to send requests through HTTP::Async (optional)
#   temp     - alternative base directory (optional, see _store_file)
#
# Dies (after logging through $log) when a required option is missing.
# May be called on a class name or on an existing instance.
sub new {
    my ($proto, %opts) = @_;

    # Required options are checked in the same order as they are documented.
    my $sql     = $opts{sql}     || die $log->error("You MUST pass sql object");
    my $users   = $opts{users}   || die $log->error("You MUST pass users dir");
    my $backend = $opts{backend} || die $log->error("You MUST pass backend host");

    my $self = {
        sql      => $sql,
        users    => $users,
        backend  => $backend,
        is_async => ($opts{is_async} ? 1 : 0),
        login    => undef,              # set later by start()
        temp     => $opts{temp},
    };

    return bless($self, ref($proto) || $proto);
}
31
# Begin a cluster session for one user.
#
# Options:
#   login    - login of the user the session operates on (required)
#   is_async - true to switch this object to asynchronous mode; once the
#              object is asynchronous it stays asynchronous
#
# Resets the session result flag to 1 and, in asynchronous mode, creates the
# HTTP::Async dispatcher on first use (it is reused by later sessions).
# Dies (after logging) when no login is given.
sub start {
    my $self = shift;
    my %opts = @_;

    $self->{is_async} = ($opts{is_async} || $self->{is_async}) ? 1 : 0;
    $self->{login} = $opts{login} || die $log->error("You MUST pass user login");
    $self->{result} = 1;

    if ($self->{is_async} && !$self->{async}) {

        # Direct method call instead of the indirect-object form
        # "new HTTP::Async()": this package defines its own new(), so the
        # indirect form only parses as intended while the parser's heuristics
        # recognise HTTP::Async as a loaded package.
        $self->{async} = HTTP::Async->new();
        $self->{async}->slots(1000);
        $self->{async}->timeout(900);
        $self->{async}->max_request_time(1800);

    }

}
50
# End the current session: forget the login, then drain any pending
# asynchronous responses. Returns whatever _stop_async() returns (the
# session result flag).
sub stop {
    my ($self) = @_;

    undef $self->{login};

    return $self->_stop_async();
}
59
# Return the login of the current session.
# Dies (after logging) when no session has been started.
sub login {
    my ($self) = @_;

    my $current = $self->{login};
    return $current || die $log->error("You MUST start cluster session before use");
}
65
# Return the asynchronous dispatcher stored in $self->{async}.
# Dies (after logging) when it has not been created yet.
sub async {
    my ($self) = @_;

    my $dispatcher = $self->{async};
    return $dispatcher || die $log->error("You MUST start cluster session before use");
}
71
# Return the cluster map, loading it from the database on first use.
#
# The map is cached in $request->{cluster} ($request is a global that is
# presumably exported by Contenido::Globals - confirm) and has these keys:
#   by_group    - [group]  => list of [ip, host, status] for active backends
#   by_key      - [slot]   => group serving that hash slot
#   by_cluster  - [group]  => list of hash slots served by the group
#   by_relocate - [slot]   => group the slot is being relocated to
#
# Slot ranges are stored in the "hash" column either as a single hex value
# or as "FROM-TO" (both hex, inclusive).
sub cluster {
    my $self = shift;

    # Already loaded: hand back the cached map.
    return $request->{cluster} if $request->{cluster};

    my $dbh = $self->{sql};

    # Active backends, grouped by backend group.
    my $sth = $dbh->prepare("select bgroup, ip, host, status from backend where status = 0 order by id");
    $sth->execute();
    while (my @row = $sth->fetchrow_array()) {
        my ($group, $ip, $host, $status) = @row;
        push(@{$request->{cluster}->{by_group}->[$group]}, [$ip, $host, $status]);
    }
    $sth->finish();

    # Hash slot => group assignment.
    $sth = $dbh->prepare("select bgroup, hash from backend_virtual order by id");
    $sth->execute();
    while (my @row = $sth->fetchrow_array()) {
        my ($group, $key) = @row;
        my ($from, $to) = map { hex($_) } split(/-/, $key);
        $to = $from unless $to;         # single value: one-slot range
        for my $slot ($from .. $to) {
            $request->{cluster}->{by_key}->[$slot] = $group;
            push(@{$request->{cluster}->{by_cluster}->[$group]}, $slot);
        }
    }
    $sth->finish();

    # Hash slots currently being relocated to another group.
    $sth = $dbh->prepare("select bgroup, hash from backend_relocate order by id");
    $sth->execute();
    while (my @row = $sth->fetchrow_array()) {
        my ($group, $key) = @row;
        my ($from, $to) = map { hex($_) } split(/-/, $key);
        $to = $from unless $to;
        for my $slot ($from .. $to) {
            $request->{cluster}->{by_relocate}->[$slot] = $group;
        }
    }
    $sth->finish();

    return $request->{cluster};

}
110
# Return the host names that hold the current user's data.
#
# Parameters:
#   $force - when true, this backend ($self->{backend}) is included too;
#            by default it is left out.
#
# The list is built from the group that serves the user's hash slot and,
# while the slot is being relocated (is_locked), from the relocation target
# group as well. Returns an empty list when there is no cluster map or the
# slot is not assigned to any known group.
sub get_friend {
    my $self  = shift;
    my $force = shift || 0;

    my @hosts = ();

    my $cluster = $self->cluster or return @hosts;
    my $slot    = hex($self->get_full_hash);

    # Append every host of a group, skipping ourselves unless forced.
    my $collect = sub {
        my ($group) = @_;
        foreach my $member (@{ $group || [] }) {
            if ($force || ($self->{backend} ne $member->[1])) {
                push(@hosts, $member->[1]);
            }
        }
    };

    # An unassigned slot must not be used as an array index: undef would
    # silently become index 0 and select group 0.
    my $home_id = $cluster->{by_key}->[$slot];
    return () unless defined $home_id;

    my $home = $cluster->{by_group}->[$home_id] || return ();
    $collect->($home);

    if ($self->is_locked) {
        my $target_id = $cluster->{by_relocate}->[$slot];
        $collect->($cluster->{by_group}->[$target_id]) if defined $target_id;
    }

    return @hosts;
}
139
# Tell whether this backend belongs to the group serving a user.
#
# Parameters:
#   $login - optional login; defaults to the current session's login.
#
# Returns 1 when $self->{backend} is one of the hosts of the user's group,
# 0 otherwise (including when there is no cluster map or the user's hash
# slot is not assigned to a known group).
sub is_friend {
    my $self  = shift;
    my $login = shift;

    my $cluster = $self->cluster or return 0;

    # An unassigned slot must not be used as an array index: undef would
    # silently become index 0 and select group 0.
    my $group_id = $cluster->{by_key}->[hex($self->get_full_hash($login))];
    return 0 unless defined $group_id;

    # Always answer with 0/1 so the result is a single value in list
    # context as well.
    my $group = $cluster->{by_group}->[$group_id] || return 0;

    foreach my $member (@{$group}) {
        return 1 if $self->{backend} eq $member->[1];
    }

    return 0;
}
155
# Tell whether the current user and $to_login are served by the same group.
#
# Returns 0 when $to_login is empty; otherwise the result of numerically
# comparing the two group ids found in the cluster map.
# NOTE(review): when neither hash slot is assigned both ids are undef and
# compare as equal - confirm this is acceptable.
sub is_same_cluster {
    my ($self, $to_login) = @_;

    return 0 unless $to_login;

    my $own_group   = $self->cluster->{by_key}->[hex($self->get_full_hash)];
    my $their_group = $self->cluster->{by_key}->[hex($self->get_full_hash($to_login))];

    return $own_group == $their_group;

}
163
# Tell whether the current user's hash slot has a relocation entry.
#
# Returns 1 when the slot is listed in by_relocate, 0 when it is not, and
# 1 when no cluster map is available at all.
sub is_locked {
    my ($self) = @_;

    # No cluster map: report locked.
    return 1 unless $self->cluster;

    my $target = $self->cluster->{by_relocate}->[hex($self->get_full_hash())];
    return defined($target) ? 1 : 0;
}
173
# Upload one of the current user's files to the cluster.
#
# Parameters:
#   $url - path of the file relative to the user's hash directory (required)
#
# Dies (after logging) when $url is missing. Returns the value of
# _store_file().
sub put {
    my $self = shift;
    my $url  = shift;

    $url or die $log->error("wrong put usage");

    return $self->_store_file($url);
}
181
# Upload one of the current user's files, optionally under another user's
# directory, but only when this backend belongs to the user's home group.
#
# Parameters:
#   $url      - source path relative to the user's hash directory
#   $to_login - destination login
#   $to_url   - destination path
#
# Dies (after logging) only when all three arguments are missing.
# NOTE(review): relocate_dir requires all three; here a missing destination
# makes _store_file fall back to the source location - confirm intended.
# Returns 0 when this backend is not in the home group, otherwise the value
# of _store_file().
sub relocate {
    my ($self, $url, $to_login, $to_url) = @_;

    if (!$url && !$to_login && !$to_url) {
        die $log->error("wrong relocate usage");
    }

    if ($self->is_friend) {
        return $self->_store_file($url, $to_login, $to_url);
    }

    $log->error("not home cluster");
    return 0;
}
194
# Relocate every entry of one of the current user's directories.
#
# Parameters:
#   $url      - source directory relative to the user's hash directory
#   $to_login - destination login
#   $to_url   - destination directory
#
# Each directory entry (except names made only of dots) is passed to
# relocate() with the entry name appended to both paths.
# Dies (after logging) when an argument is missing. Returns 0 when this
# backend is not in the home group or the directory cannot be read,
# 1 otherwise.
sub relocate_dir {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong relocate_dir usage") unless ($url && $to_login && $to_url);

    unless ($self->is_friend) {
        $log->error("not home cluster");
        return 0;
    }

    # $self is a hash reference: "$$self->{users}" would dereference it as a
    # scalar reference and die with "Not a SCALAR reference".
    my $dir = join('/', $self->{users}, $self->get_hash, $url);

    # Lexical handle; the listing is read in full and the handle closed
    # before any file is transferred.
    my $dh;
    unless (opendir($dh, $dir)) {
        $log->error("can't open directory $dir: $!");
        return 0;
    }
    my @entries = grep { !/^[\.]+$/ } readdir($dh);
    closedir($dh);

    foreach my $file (@entries) {
        $self->relocate(join("/", $url, $file), $to_login, join("/", $to_url, $file));
    }

    return 1;
}
212
# Move one of the current user's files to another user on the same group.
#
# Parameters (all required):
#   $url      - source path relative to the user's hash directory
#   $to_login - destination login
#   $to_url   - destination path
#
# Dies (after logging) when any argument is missing: every one of them is
# used to build the MOVE request, so a partial call can only produce a
# malformed source or Destination URL. Returns the value of
# _transfer_file().
sub move {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong move usage") unless ($url && $to_login && $to_url);

    return $self->_transfer_file(1, $url, $to_login, $to_url);

}
221
# Copy one of the current user's files to another user on the same group.
#
# Parameters (all required):
#   $url      - source path relative to the user's hash directory
#   $to_login - destination login
#   $to_url   - destination path
#
# Dies (after logging) when any argument is missing: every one of them is
# used to build the COPY request, so a partial call can only produce a
# malformed source or Destination URL. Returns the value of
# _transfer_file().
sub copy {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong copy usage") unless ($url && $to_login && $to_url);

    return $self->_transfer_file(0, $url, $to_login, $to_url);
}
229
230
# Send a DELETE request for one of the current user's paths to every
# friend host.
#
# Parameters:
#   $url       - path relative to the user's hash directory (required)
#   $recursive - when true a "Depth: infinity" header is added
#
# Dies (after logging) when $url is missing.
sub delete {
    my ($self, $url, $recursive) = @_;

    $url or die $log->error("wrong delete usage");

    my $path = join('/', "protected", $self->get_hash, $url);

    for my $friend ($self->get_friend) {

        my $req_url  = "http://$friend/$path";
        my $http_req = HTTP::Request->new(DELETE => $req_url);
        $http_req->header('Depth' => 'infinity') if $recursive;

        $self->_send_request($http_req, "DELETE $req_url");

    }
}
251
252
# Send a LOCK request for the current user to the cluster.
# Returns the value of _lock_request().
sub lock {
    my ($self) = @_;

    return $self->_lock_request(1);
}
258
259
# Send an UNLOCK request for the current user to the cluster.
# Returns the value of _lock_request().
sub unlock {
    my ($self) = @_;

    return $self->_lock_request(0);
}
265
266
# Send an ACL request for one of the current user's paths to every friend
# host.
#
# Parameters:
#   $url       - path relative to the user's hash directory (required)
#   $is_public - true sends "X-Access: 775", false sends "X-Access: 770"
#
# Dies (after logging) when $url is missing.
sub acl {
    my ($self, $url, $is_public) = @_;

    $url or die $log->error("wrong acl usage");

    my $path = join('/', "protected", $self->get_hash, $url);

    for my $friend ($self->get_friend) {

        my $req_url  = "http://$friend/$path";
        my $http_req = HTTP::Request->new(ACL => $req_url);
        $http_req->header('X-Access' => $is_public ? 775 : 770 );

        $self->_send_request($http_req, "ACL $req_url");

    }
}
286
# PUT a local file to every friend host.
#
# Parameters:
#   $url      - source path relative to the user's hash directory
#   $to_login - optional destination login
#   $to_url   - optional destination path
#
# The file is read from $self->{temp} when that is set and this backend is
# not in the user's home group, otherwise from $self->{users}. It is stored
# under "public/<hash>/<path>", using the destination login and path when
# both are given and the source ones otherwise.
#
# In asynchronous mode the file is read into memory once and the same
# buffer is shared by all requests; in synchronous mode it is streamed from
# disk in 1 MB chunks for each host.
#
# Returns 0 (after logging) when the file cannot be opened, otherwise the
# result of closing it.
sub _store_file {
    my ($self, $url, $to_login, $to_url) = @_;

    my $base = ($self->{temp} && !$self->is_friend) ? $self->{temp} : $self->{users};
    my $file = join('/', $base, $self->get_hash, $url);

    # Three-argument open: the path contains caller-supplied text, and the
    # two-argument form would interpret mode characters in it (a trailing
    # "|" would run a command).
    my $fh;
    unless (open($fh, '<', $file)) {
        $log->error("can't open $file: $!");
        return 0;
    }
    binmode($fh);       # content is sent byte for byte

    if (my @host = $self->get_friend) {

        my $req_file = join('/', "public", ($to_login && $to_url) ? ($self->get_hash($to_login), $to_url) : ($self->get_hash, $url));
        my $size = (stat($fh))[7];

        if ($self->{is_async} && $self->async) {

            # Start from an empty string so an empty file yields an empty
            # body rather than an undefined one.
            my $content = '';
            while (read($fh, my $buf, 1024*1024)) {
                $content .= $buf;
            }

            foreach my $host (@host) {

                my $req_url = 'http://'.join('/', $host, $req_file);
                my $req = HTTP::Request->new(
                    PUT => $req_url,
                );
                $req->content_length($size);
                $req->content_ref(\$content);

                $self->_send_request($req, "PUT $file to $req_url");

            }

        } else {

            foreach my $host (@host) {
                # Rewind: the previous host's request consumed the file.
                seek($fh, 0, 0);

                my $req_url = 'http://'.join('/', $host, $req_file);
                my $req = HTTP::Request->new(
                    PUT => $req_url,
                    undef,
                    # Content callback: hands out the file chunk by chunk;
                    # an empty chunk ends the body.
                    sub {
                        read($fh, my $buf, 1024*1024);
                        return $buf;
                    }
                );
                $req->content_length($size);

                $self->_send_request($req, "PUT $file to $req_url");
            }
        }

    }

    return close($fh);
}
341
# Dispatch one HTTP request.
#
# Parameters:
#   $req - HTTP::Request object (required)
#   $msg - text used in log messages; defaults to "LWP"
#
# In asynchronous mode the request is queued on the HTTP::Async dispatcher
# and the dispatcher is poked; responses are collected later by
# _stop_async(). In synchronous mode the request is sent immediately and a
# failure is logged and clears the session result flag.
# Dies (after logging) when no request is passed.
sub _send_request {
    my $self = shift;
    my $req = shift || die $log->error("no request passed");
    my $msg = shift || "LWP";

    $log->debug( $msg." ..." );

    if ($self->{is_async}) {

        $self->async->add($req);
        $self->async->poke();

    } else {

        # Direct method call instead of the indirect-object form
        # "new LWP::UserAgent->request(...)": this package defines its own
        # new(), so the indirect form depends on parser heuristics.
        my $res = LWP::UserAgent->new->request($req);
        unless ($res->is_success) {
            $log->error( $msg." failed - ".$res->code );
            $self->{result} = 0;
        }

    }
}
364
# Send a LOCK or UNLOCK request for the current user to every host of the
# user's group(s), this backend included, then wait for pending
# asynchronous responses.
#
# Parameters:
#   $mode - true for LOCK, false for UNLOCK
#
# The user's four-digit hash is sent in the X-Hash header.
# Returns the value of _stop_async().
sub _lock_request {
    my ($self, $mode) = @_;

    my $verb = $mode ? "LOCK" : "UNLOCK";
    my $path = join('/', "protected", $self->login);

    for my $friend ($self->get_friend(1)) {

        my $req_url  = "http://$friend/$path";
        my $http_req = HTTP::Request->new($verb, $req_url);
        $http_req->header('X-Hash' => $self->get_full_hash);

        $self->_send_request($http_req, "$verb $req_url");

    }

    return $self->_stop_async();
}
386
# Send a MOVE or COPY request to every friend host.
#
# Parameters:
#   $mode     - true for MOVE, false for COPY
#   $url      - source path relative to the current user's hash directory
#   $to_login - destination login
#   $to_url   - destination path relative to the destination hash directory
#
# The destination is passed in the Destination header and always points to
# the same host as the source. Logs an error and returns 0 when the two
# users are not served by the same group.
sub _transfer_file {
    my ($self, $mode, $url, $to_login, $to_url) = @_;

    my $verb = $mode ? "MOVE" : "COPY";

    if (!$self->is_same_cluster($to_login)) {
        $log->error("users not on same cluster");
        return 0;
    }

    my $src_path = join('/', "protected", $self->get_hash, $url);
    my $dst_path = join('/', "protected", $self->get_hash($to_login), $to_url);

    for my $friend ($self->get_friend) {

        my $req_url    = "http://$friend/$src_path";
        my $to_req_url = "http://$friend/$dst_path";

        my $http_req = HTTP::Request->new($verb, $req_url);
        $http_req->header('Destination' => $to_req_url);

        $self->_send_request($http_req, "$verb $req_url to $to_req_url");

    }
}
415
# Wait for every pending asynchronous response and return the session
# result flag.
#
# A response that is neither successful nor a 405 is logged and clears the
# flag. Nothing is waited for in synchronous mode, or when asynchronous
# mode was requested but no dispatcher exists yet (is_async can be set by
# the constructor while the dispatcher is only created in start()).
sub _stop_async {
    my $self = shift;

    if ($self->{is_async} && $self->{async}) {
        while ( my $res = $self->{async}->wait_for_next_response ) {
            next if $res->is_success();
            next if $res->code eq '405';

            $self->{result} = 0;
            $log->error("async failed - ".$res->code);
        }
    }

    return $self->{result};
}
430
# Return the two-level hash directory ("XX/YY") for a login.
#
# Parameters:
#   $login - optional login; defaults to the current session's login.
#
# The directory is made of the last four upper-case hex digits of the
# login's CRC32, split into two pairs.
sub get_hash {
    my $self = shift;
    my $login = shift || $self->login;

    # Zero-pad the CRC: with a plain "%X" a checksum below 0x1000 has fewer
    # than four digits and the first pair would come out short or empty.
    my $hash = substr(sprintf("%08X", crc32($login)), -4);

    return substr($hash, 0, 2).'/'.substr($hash, 2, 2);

}
439
# Return the four-digit upper-case hex hash slot for a login.
#
# Parameters:
#   $login - optional login; defaults to the current session's login.
#
# The slot is the last four hex digits of the login's CRC32. The CRC is
# zero-padded so the result always has exactly four digits, even for a
# checksum below 0x1000.
sub get_full_hash {
    my $self = shift;
    my $login = shift || $self->login;

    return substr(sprintf("%08X", crc32($login)), -4);
}
446
447
448
449
450
451
452 1;