1 |
3 |
ahitrov@rambler.ru |
package Contenido::Cluster::Storage; |
2 |
|
|
use strict; |
3 |
|
|
|
4 |
|
|
use Contenido::Globals; |
5 |
|
|
|
6 |
|
|
use HTTP::Async; |
7 |
|
|
use HTTP::Request; |
8 |
|
|
use HTTP::Headers; |
9 |
|
|
use LWP::UserAgent; |
10 |
|
|
use String::CRC32; |
11 |
|
|
use Data::Dumper; |
12 |
|
|
|
13 |
|
|
|
14 |
|
|
sub new { |
15 |
|
|
my $proto = shift; |
16 |
|
|
my $class = ref($proto) || $proto; |
17 |
|
|
my %opts = @_; |
18 |
|
|
|
19 |
|
|
my $self = {}; |
20 |
|
|
$self->{sql} = $opts{sql} || die $log->error("You MUST pass sql object"); |
21 |
|
|
$self->{users} = $opts{users} || die $log->error("You MUST pass users dir"); |
22 |
|
|
$self->{backend} = $opts{backend} || die $log->error("You MUST pass backend host"); |
23 |
|
|
$self->{is_async} = $opts{is_async} ? 1 : 0; |
24 |
|
|
$self->{login} = undef; |
25 |
|
|
$self->{temp} = $opts{temp}; |
26 |
|
|
|
27 |
|
|
bless ($self, $class); |
28 |
|
|
|
29 |
|
|
return $self; |
30 |
|
|
} |
31 |
|
|
|
32 |
|
|
sub start { |
33 |
|
|
my $self = shift; |
34 |
|
|
my %opts = @_; |
35 |
|
|
|
36 |
|
|
$self->{is_async} = $opts{is_async} || $self->{is_async} ? 1 : 0; |
37 |
|
|
$self->{login} = $opts{login} || die $log->error("You MUST pass user login"); |
38 |
|
|
$self->{result} = 1; |
39 |
|
|
|
40 |
|
|
if ($self->{is_async} && !$self->{async}) { |
41 |
|
|
|
42 |
|
|
$self->{async} = new HTTP::Async(); |
43 |
|
|
$self->{async}->slots(1000); |
44 |
|
|
$self->{async}->timeout(900); |
45 |
|
|
$self->{async}->max_request_time(1800); |
46 |
|
|
|
47 |
|
|
} |
48 |
|
|
|
49 |
|
|
} |
50 |
|
|
|
51 |
|
|
sub stop { |
52 |
|
|
my $self = shift; |
53 |
|
|
|
54 |
|
|
$self->{login} = undef; |
55 |
|
|
|
56 |
|
|
return $self->_stop_async(); |
57 |
|
|
|
58 |
|
|
} |
59 |
|
|
|
60 |
|
|
sub login { |
61 |
|
|
my $self = shift; |
62 |
|
|
|
63 |
|
|
return $self->{login} || die $log->error("You MUST start cluster session before use"); |
64 |
|
|
} |
65 |
|
|
|
66 |
|
|
sub async { |
67 |
|
|
my $self = shift; |
68 |
|
|
|
69 |
|
|
return $self->{async} || die $log->error("You MUST start cluster session before use"); |
70 |
|
|
} |
71 |
|
|
|
72 |
|
|
sub cluster { |
73 |
|
|
my $self = shift; |
74 |
|
|
|
75 |
|
|
unless ($request->{cluster}) { |
76 |
|
|
my $cursor = $self->{sql}->prepare("select bgroup, ip, host, status from backend where status = 0 order by id"); |
77 |
|
|
$cursor->execute(); |
78 |
|
|
while (my ($group, $ip, $host, $status) = $cursor->fetchrow_array()) { |
79 |
|
|
push(@{$request->{cluster}->{by_group}->[$group]}, [$ip, $host, $status]); |
80 |
|
|
} |
81 |
|
|
$cursor->finish(); |
82 |
|
|
|
83 |
|
|
$cursor = $self->{sql}->prepare("select bgroup, hash from backend_virtual order by id"); |
84 |
|
|
$cursor->execute(); |
85 |
|
|
while (my ($group, $key) = $cursor->fetchrow_array()) { |
86 |
|
|
my ($from, $to) = map { hex } split("-", $key); |
87 |
|
|
$to ||= $from; |
88 |
|
|
foreach my $hash ($from..$to) { |
89 |
|
|
$request->{cluster}->{by_key}->[$hash] = $group; |
90 |
|
|
push(@{$request->{cluster}->{by_cluster}->[$group]}, $hash); |
91 |
|
|
} |
92 |
|
|
} |
93 |
|
|
$cursor->finish(); |
94 |
|
|
|
95 |
|
|
$cursor = $self->{sql}->prepare("select bgroup, hash from backend_relocate order by id"); |
96 |
|
|
$cursor->execute(); |
97 |
|
|
while (my ($group, $key) = $cursor->fetchrow_array()) { |
98 |
|
|
my ($from, $to) = map { hex } split("-", $key); |
99 |
|
|
$to ||= $from; |
100 |
|
|
foreach my $hash ($from..$to) { |
101 |
|
|
$request->{cluster}->{by_relocate}->[$hash] = $group; |
102 |
|
|
} |
103 |
|
|
} |
104 |
|
|
$cursor->finish(); |
105 |
|
|
} |
106 |
|
|
|
107 |
|
|
return $request->{cluster}; |
108 |
|
|
|
109 |
|
|
} |
110 |
|
|
|
111 |
|
|
sub get_friend { |
112 |
|
|
my $self = shift; |
113 |
|
|
my $force = shift || 0; |
114 |
|
|
|
115 |
|
|
my @host = (); |
116 |
|
|
|
117 |
|
|
if ($self->cluster) { |
118 |
|
|
my $hash = hex($self->get_full_hash); |
119 |
|
|
|
120 |
|
|
my $group = $self->cluster->{by_group}->[$self->cluster->{by_key}->[$hash]] || return (); |
121 |
|
|
foreach my $host (@{$group}) { |
122 |
|
|
if (($self->{backend} ne $host->[1]) || $force) { |
123 |
|
|
push(@host, $host->[1]); |
124 |
|
|
} |
125 |
|
|
} |
126 |
|
|
|
127 |
|
|
if ($self->is_locked) { |
128 |
|
|
my $group = $self->cluster->{by_group}->[$self->cluster->{by_relocate}->[$hash]]; |
129 |
|
|
foreach my $host (@{$group}) { |
130 |
|
|
if (($self->{backend} ne $host->[1]) || $force) { |
131 |
|
|
push(@host, $host->[1]); |
132 |
|
|
} |
133 |
|
|
} |
134 |
|
|
} |
135 |
|
|
} |
136 |
|
|
|
137 |
|
|
return @host; |
138 |
|
|
} |
139 |
|
|
|
140 |
|
|
sub is_friend { |
141 |
|
|
my $self = shift; |
142 |
|
|
|
143 |
|
|
if ($self->cluster) { |
144 |
|
|
my $group = $self->cluster->{by_group}->[$self->cluster->{by_key}->[hex($self->get_full_hash(shift))]] || return (); |
145 |
|
|
|
146 |
|
|
foreach my $host (@{$group}) { |
147 |
|
|
if ($self->{backend} eq $host->[1]) { |
148 |
|
|
return 1; |
149 |
|
|
} |
150 |
|
|
} |
151 |
|
|
} |
152 |
|
|
|
153 |
|
|
return 0; |
154 |
|
|
} |
155 |
|
|
|
156 |
|
|
sub is_same_cluster { |
157 |
|
|
my $self = shift; |
158 |
|
|
my $to_login = shift || return 0; |
159 |
|
|
|
160 |
|
|
return $self->cluster->{by_key}->[hex($self->get_full_hash)] == $self->cluster->{by_key}->[hex($self->get_full_hash($to_login))]; |
161 |
|
|
|
162 |
|
|
} |
163 |
|
|
|
164 |
|
|
sub is_locked { |
165 |
|
|
my $self = shift; |
166 |
|
|
|
167 |
|
|
if ($self->cluster) { |
168 |
|
|
return defined($self->cluster->{by_relocate}->[hex($self->get_full_hash())]) ? 1 : 0; |
169 |
|
|
} |
170 |
|
|
|
171 |
|
|
return 1; |
172 |
|
|
} |
173 |
|
|
|
174 |
|
|
sub put { |
175 |
|
|
my ($self, $url) = @_; |
176 |
|
|
|
177 |
|
|
die $log->error("wrong put usage") unless ($url); |
178 |
|
|
|
179 |
|
|
$self->_store_file($url); |
180 |
|
|
} |
181 |
|
|
|
182 |
|
|
sub relocate { |
183 |
|
|
my ($self, $url, $to_login, $to_url) = @_; |
184 |
|
|
|
185 |
|
|
die $log->error("wrong relocate usage") unless ($url || $to_login || $to_url); |
186 |
|
|
|
187 |
|
|
unless ($self->is_friend) { |
188 |
|
|
$log->error("not home cluster"); |
189 |
|
|
return 0; |
190 |
|
|
} |
191 |
|
|
|
192 |
|
|
$self->_store_file($url, $to_login, $to_url); |
193 |
|
|
} |
194 |
|
|
|
195 |
|
|
sub relocate_dir { |
196 |
|
|
my ($self, $url, $to_login, $to_url) = @_; |
197 |
|
|
|
198 |
|
|
die $log->error("wrong relocate_dir usage") unless ($url && $to_login && $to_url); |
199 |
|
|
|
200 |
|
|
unless ($self->is_friend) { |
201 |
|
|
$log->error("not home cluster"); |
202 |
|
|
return 0; |
203 |
|
|
} |
204 |
|
|
|
205 |
|
|
my $dir = join('/', $$self->{users}, $self->get_hash, $url); |
206 |
|
|
opendir(D, $dir); |
207 |
|
|
foreach my $file (grep(!/^[\.]+$/, readdir(D))) { |
208 |
|
|
$self->relocate(join("/", $url, $file), $to_login, join("/", $to_url, $file)); |
209 |
|
|
} |
210 |
|
|
closedir(D); |
211 |
|
|
} |
212 |
|
|
|
213 |
|
|
sub move { |
214 |
|
|
my ($self, $url, $to_login, $to_url) = @_; |
215 |
|
|
|
216 |
|
|
die $log->error("wrong move usage") unless ($url || $to_login || $to_url); |
217 |
|
|
|
218 |
|
|
return $self->_transfer_file(1, $url, $to_login, $to_url); |
219 |
|
|
|
220 |
|
|
} |
221 |
|
|
|
222 |
|
|
sub copy { |
223 |
|
|
my ($self, $url, $to_login, $to_url) = @_; |
224 |
|
|
|
225 |
|
|
die $log->error("wrong copy usage") unless ($url || $to_login || $to_url); |
226 |
|
|
|
227 |
|
|
return $self->_transfer_file(0, $url, $to_login, $to_url); |
228 |
|
|
} |
229 |
|
|
|
230 |
|
|
|
231 |
|
|
sub delete { |
232 |
|
|
my ($self, $url, $recursive) = @_; |
233 |
|
|
|
234 |
|
|
die $log->error("wrong delete usage") unless ($url); |
235 |
|
|
|
236 |
|
|
my $req_file = join('/', "protected", $self->get_hash, $url); |
237 |
|
|
|
238 |
|
|
foreach my $host ($self->get_friend) { |
239 |
|
|
|
240 |
|
|
my $req_url = 'http://'.join('/', $host, $req_file); |
241 |
|
|
my $req = HTTP::Request->new( |
242 |
|
|
DELETE => $req_url, |
243 |
|
|
); |
244 |
|
|
if ($recursive) { |
245 |
|
|
$req->header('Depth' => 'infinity'); |
246 |
|
|
} |
247 |
|
|
$self->_send_request($req, "DELETE $req_url"); |
248 |
|
|
|
249 |
|
|
} |
250 |
|
|
} |
251 |
|
|
|
252 |
|
|
|
253 |
|
|
sub lock { |
254 |
|
|
my $self = shift; |
255 |
|
|
|
256 |
|
|
return $self->_lock_request(1); |
257 |
|
|
} |
258 |
|
|
|
259 |
|
|
|
260 |
|
|
sub unlock { |
261 |
|
|
my $self = shift; |
262 |
|
|
|
263 |
|
|
return $self->_lock_request(0); |
264 |
|
|
} |
265 |
|
|
|
266 |
|
|
|
267 |
|
|
sub acl { |
268 |
|
|
my ($self, $url, $is_public) = @_; |
269 |
|
|
|
270 |
|
|
die $log->error("wrong acl usage") unless ($url); |
271 |
|
|
|
272 |
|
|
my $req_file = join('/', "protected", $self->get_hash, $url); |
273 |
|
|
|
274 |
|
|
foreach my $host ($self->get_friend) { |
275 |
|
|
|
276 |
|
|
my $req_url = 'http://'.join('/', $host, $req_file); |
277 |
|
|
my $req = HTTP::Request->new( |
278 |
|
|
ACL => $req_url, |
279 |
|
|
); |
280 |
|
|
$req->header('X-Access' => $is_public ? 775 : 770 ); |
281 |
|
|
|
282 |
|
|
$self->_send_request($req, "ACL $req_url"); |
283 |
|
|
|
284 |
|
|
} |
285 |
|
|
} |
286 |
|
|
|
287 |
|
|
sub _store_file { |
288 |
|
|
my ($self, $url, $to_login, $to_url) = @_; |
289 |
|
|
|
290 |
|
|
my $file = join('/', $self->{temp} && !$self->is_friend ? $self->{temp} : $self->{users}, $self->get_hash, $url); |
291 |
|
|
|
292 |
|
|
if (open($Contenido::Cluster::Storage::fh, $file)) { |
293 |
|
|
if (my @host = $self->get_friend) { |
294 |
|
|
|
295 |
|
|
my $req_file = join('/', "public", $to_login && $to_url ? ($self->get_hash($to_login), $to_url) : ($self->get_hash, $url)); |
296 |
|
|
|
297 |
|
|
if ($self->{is_async} && $self->async) { |
298 |
|
|
|
299 |
|
|
my $content; |
300 |
|
|
while (read($Contenido::Cluster::Storage::fh, my $buf, 1024*1024)) { |
301 |
|
|
$content .= $buf; |
302 |
|
|
} |
303 |
|
|
|
304 |
|
|
foreach my $host (@host) { |
305 |
|
|
|
306 |
|
|
my $req_url = 'http://'.join('/', $host, $req_file); |
307 |
|
|
my $req = HTTP::Request->new( |
308 |
|
|
PUT => $req_url, |
309 |
|
|
); |
310 |
|
|
$req->content_length((stat($file))[7]); |
311 |
|
|
$req->content_ref(\$content); |
312 |
|
|
|
313 |
|
|
$self->_send_request($req, "PUT $file to $req_url"); |
314 |
|
|
|
315 |
|
|
} |
316 |
|
|
|
317 |
|
|
} else { |
318 |
|
|
|
319 |
|
|
foreach my $host (@host) { |
320 |
|
|
seek($Contenido::Cluster::Storage::fh, 0, 0); |
321 |
|
|
|
322 |
|
|
my $req_url = 'http://'.join('/', $host, $req_file); |
323 |
|
|
my $req = HTTP::Request->new( |
324 |
|
|
PUT => $req_url, |
325 |
|
|
undef, |
326 |
|
|
sub { |
327 |
|
|
read($Contenido::Cluster::Storage::fh, my $buf, 1024*1024); |
328 |
|
|
return $buf; |
329 |
|
|
} |
330 |
|
|
); |
331 |
|
|
$req->content_length((stat($file))[7]); |
332 |
|
|
|
333 |
|
|
$self->_send_request($req, "PUT $file to $req_url"); |
334 |
|
|
} |
335 |
|
|
} |
336 |
|
|
|
337 |
|
|
} |
338 |
|
|
close($Contenido::Cluster::Storage::fh); |
339 |
|
|
} |
340 |
|
|
} |
341 |
|
|
|
342 |
|
|
sub _send_request { |
343 |
|
|
my $self = shift; |
344 |
|
|
my $req = shift || die $log->error("no request passed"); |
345 |
|
|
my $msg = shift || "LWP"; |
346 |
|
|
|
347 |
|
|
$log->debug( $msg." ..." ); |
348 |
|
|
|
349 |
|
|
if ($self->{is_async}) { |
350 |
|
|
|
351 |
|
|
$self->async->add($req); |
352 |
|
|
$self->async->poke(); |
353 |
|
|
|
354 |
|
|
} else { |
355 |
|
|
|
356 |
|
|
my $res = new LWP::UserAgent->request($req); |
357 |
|
|
unless ($res->is_success) { |
358 |
|
|
$log->error( $msg." failed - ".$res->code ); |
359 |
|
|
$self->{result} = 0; |
360 |
|
|
} |
361 |
|
|
|
362 |
|
|
} |
363 |
|
|
} |
364 |
|
|
|
365 |
|
|
sub _lock_request { |
366 |
|
|
my $self = shift; |
367 |
|
|
my $mode = shift || 0; |
368 |
|
|
$mode = $mode ? "LOCK" : "UNLOCK"; |
369 |
|
|
|
370 |
|
|
my $req_file = join('/', "protected", $self->login); |
371 |
|
|
|
372 |
|
|
foreach my $host ($self->get_friend(1)) { |
373 |
|
|
|
374 |
|
|
my $req_url = 'http://'.join('/', $host, $req_file); |
375 |
|
|
my $req = HTTP::Request->new( |
376 |
|
|
$mode => $req_url, |
377 |
|
|
); |
378 |
|
|
$req->header('X-Hash' => $self->get_full_hash); |
379 |
|
|
|
380 |
|
|
$self->_send_request($req, "$mode $req_url"); |
381 |
|
|
|
382 |
|
|
} |
383 |
|
|
|
384 |
|
|
return $self->_stop_async(); |
385 |
|
|
} |
386 |
|
|
|
387 |
|
|
sub _transfer_file { |
388 |
|
|
my $self = shift; |
389 |
|
|
my $mode = shift || 0; |
390 |
|
|
$mode = $mode ? "MOVE" : "COPY"; |
391 |
|
|
|
392 |
|
|
my ($url, $to_login, $to_url) = @_; |
393 |
|
|
|
394 |
|
|
unless ($self->is_same_cluster($to_login)) { |
395 |
|
|
$log->error("users not on same cluster"); |
396 |
|
|
return 0; |
397 |
|
|
} |
398 |
|
|
|
399 |
|
|
my $req_file = join('/', "protected", $self->get_hash, $url); |
400 |
|
|
my $to_req_file = join('/', "protected", $self->get_hash($to_login), $to_url); |
401 |
|
|
|
402 |
|
|
foreach my $host ($self->get_friend) { |
403 |
|
|
|
404 |
|
|
my $req_url = 'http://'.join('/', $host, $req_file); |
405 |
|
|
my $to_req_url = 'http://'.join('/', $host, $to_req_file); |
406 |
|
|
my $req = HTTP::Request->new( |
407 |
|
|
$mode => $req_url, |
408 |
|
|
); |
409 |
|
|
$req->header('Destination' => $to_req_url); |
410 |
|
|
|
411 |
|
|
$self->_send_request($req, "$mode $req_url to $to_req_url"); |
412 |
|
|
|
413 |
|
|
} |
414 |
|
|
} |
415 |
|
|
|
416 |
|
|
sub _stop_async { |
417 |
|
|
my $self = shift; |
418 |
|
|
|
419 |
|
|
if ($self->{is_async}) { |
420 |
|
|
while ( my $res = $self->{async}->wait_for_next_response ) { |
421 |
|
|
if (!$res->is_success() && ($res->code ne '405')) { |
422 |
|
|
$self->{result} = 0; |
423 |
|
|
$log->error("async failed - ".$res->code); |
424 |
|
|
} |
425 |
|
|
} |
426 |
|
|
} |
427 |
|
|
|
428 |
|
|
return $self->{result}; |
429 |
|
|
} |
430 |
|
|
|
431 |
|
|
sub get_hash { |
432 |
|
|
my $self = shift; |
433 |
|
|
my $login = shift || $self->login; |
434 |
|
|
|
435 |
|
|
my $hash = substr(sprintf("%X", crc32($login)), -4, 4); |
436 |
|
|
return substr($hash, -4, 2).'/'. substr($hash, -2, 2) || '00/00'; |
437 |
|
|
|
438 |
|
|
} |
439 |
|
|
|
440 |
|
|
sub get_full_hash { |
441 |
|
|
my $self = shift; |
442 |
|
|
my $login = shift || $self->login; |
443 |
|
|
|
444 |
|
|
return substr(sprintf("%X", crc32($login)), -4, 4) || '0000' |
445 |
|
|
} |
446 |
|
|
|
447 |
|
|
|
448 |
|
|
|
449 |
|
|
|
450 |
|
|
|
451 |
|
|
|
452 |
|
|
1; |