package Contenido::Cluster::Storage;
use strict;
use Contenido::Globals;
use HTTP::Async;
use HTTP::Request;
use HTTP::Headers;
use LWP::UserAgent;
use String::CRC32;
use Data::Dumper;
sub new {
my $proto = shift;
my $class = ref($proto) || $proto;
my %opts = @_;
my $self = {};
$self->{sql} = $opts{sql} || die $log->error("You MUST pass sql object");
$self->{users} = $opts{users} || die $log->error("You MUST pass users dir");
$self->{backend} = $opts{backend} || die $log->error("You MUST pass backend host");
$self->{is_async} = $opts{is_async} ? 1 : 0;
$self->{login} = undef;
$self->{temp} = $opts{temp};
bless ($self, $class);
return $self;
}
sub start {
my $self = shift;
my %opts = @_;
$self->{is_async} = $opts{is_async} || $self->{is_async} ? 1 : 0;
$self->{login} = $opts{login} || die $log->error("You MUST pass user login");
$self->{result} = 1;
if ($self->{is_async} && !$self->{async}) {
$self->{async} = new HTTP::Async();
$self->{async}->slots(1000);
$self->{async}->timeout(900);
$self->{async}->max_request_time(1800);
}
}
sub stop {
my $self = shift;
$self->{login} = undef;
return $self->_stop_async();
}
sub login {
my $self = shift;
return $self->{login} || die $log->error("You MUST start cluster session before use");
}
sub async {
my $self = shift;
return $self->{async} || die $log->error("You MUST start cluster session before use");
}
sub cluster {
my $self = shift;
unless ($request->{cluster}) {
my $cursor = $self->{sql}->prepare("select bgroup, ip, host, status from backend where status = 0 order by id");
$cursor->execute();
while (my ($group, $ip, $host, $status) = $cursor->fetchrow_array()) {
push(@{$request->{cluster}->{by_group}->[$group]}, [$ip, $host, $status]);
}
$cursor->finish();
$cursor = $self->{sql}->prepare("select bgroup, hash from backend_virtual order by id");
$cursor->execute();
while (my ($group, $key) = $cursor->fetchrow_array()) {
my ($from, $to) = map { hex } split("-", $key);
$to ||= $from;
foreach my $hash ($from..$to) {
$request->{cluster}->{by_key}->[$hash] = $group;
push(@{$request->{cluster}->{by_cluster}->[$group]}, $hash);
}
}
$cursor->finish();
$cursor = $self->{sql}->prepare("select bgroup, hash from backend_relocate order by id");
$cursor->execute();
while (my ($group, $key) = $cursor->fetchrow_array()) {
my ($from, $to) = map { hex } split("-", $key);
$to ||= $from;
foreach my $hash ($from..$to) {
$request->{cluster}->{by_relocate}->[$hash] = $group;
}
}
$cursor->finish();
}
return $request->{cluster};
}
sub get_friend {
my $self = shift;
my $force = shift || 0;
my @host = ();
if ($self->cluster) {
my $hash = hex($self->get_full_hash);
my $group = $self->cluster->{by_group}->[$self->cluster->{by_key}->[$hash]] || return ();
foreach my $host (@{$group}) {
if (($self->{backend} ne $host->[1]) || $force) {
push(@host, $host->[1]);
}
}
if ($self->is_locked) {
my $group = $self->cluster->{by_group}->[$self->cluster->{by_relocate}->[$hash]];
foreach my $host (@{$group}) {
if (($self->{backend} ne $host->[1]) || $force) {
push(@host, $host->[1]);
}
}
}
}
return @host;
}
sub is_friend {
my $self = shift;
if ($self->cluster) {
my $group = $self->cluster->{by_group}->[$self->cluster->{by_key}->[hex($self->get_full_hash(shift))]] || return ();
foreach my $host (@{$group}) {
if ($self->{backend} eq $host->[1]) {
return 1;
}
}
}
return 0;
}
sub is_same_cluster {
my $self = shift;
my $to_login = shift || return 0;
return $self->cluster->{by_key}->[hex($self->get_full_hash)] == $self->cluster->{by_key}->[hex($self->get_full_hash($to_login))];
}
sub is_locked {
my $self = shift;
if ($self->cluster) {
return defined($self->cluster->{by_relocate}->[hex($self->get_full_hash())]) ? 1 : 0;
}
return 1;
}
sub put {
my ($self, $url) = @_;
die $log->error("wrong put usage") unless ($url);
$self->_store_file($url);
}
sub relocate {
my ($self, $url, $to_login, $to_url) = @_;
die $log->error("wrong relocate usage") unless ($url || $to_login || $to_url);
unless ($self->is_friend) {
$log->error("not home cluster");
return 0;
}
$self->_store_file($url, $to_login, $to_url);
}
sub relocate_dir {
my ($self, $url, $to_login, $to_url) = @_;
die $log->error("wrong relocate_dir usage") unless ($url && $to_login && $to_url);
unless ($self->is_friend) {
$log->error("not home cluster");
return 0;
}
my $dir = join('/', $$self->{users}, $self->get_hash, $url);
opendir(D, $dir);
foreach my $file (grep(!/^[\.]+$/, readdir(D))) {
$self->relocate(join("/", $url, $file), $to_login, join("/", $to_url, $file));
}
closedir(D);
}
sub move {
my ($self, $url, $to_login, $to_url) = @_;
die $log->error("wrong move usage") unless ($url || $to_login || $to_url);
return $self->_transfer_file(1, $url, $to_login, $to_url);
}
sub copy {
my ($self, $url, $to_login, $to_url) = @_;
die $log->error("wrong copy usage") unless ($url || $to_login || $to_url);
return $self->_transfer_file(0, $url, $to_login, $to_url);
}
sub delete {
my ($self, $url, $recursive) = @_;
die $log->error("wrong delete usage") unless ($url);
my $req_file = join('/', "protected", $self->get_hash, $url);
foreach my $host ($self->get_friend) {
my $req_url = 'http://'.join('/', $host, $req_file);
my $req = HTTP::Request->new(
DELETE => $req_url,
);
if ($recursive) {
$req->header('Depth' => 'infinity');
}
$self->_send_request($req, "DELETE $req_url");
}
}
sub lock {
my $self = shift;
return $self->_lock_request(1);
}
sub unlock {
my $self = shift;
return $self->_lock_request(0);
}
sub acl {
my ($self, $url, $is_public) = @_;
die $log->error("wrong acl usage") unless ($url);
my $req_file = join('/', "protected", $self->get_hash, $url);
foreach my $host ($self->get_friend) {
my $req_url = 'http://'.join('/', $host, $req_file);
my $req = HTTP::Request->new(
ACL => $req_url,
);
$req->header('X-Access' => $is_public ? 775 : 770 );
$self->_send_request($req, "ACL $req_url");
}
}
sub _store_file {
my ($self, $url, $to_login, $to_url) = @_;
my $file = join('/', $self->{temp} && !$self->is_friend ? $self->{temp} : $self->{users}, $self->get_hash, $url);
if (open($Contenido::Cluster::Storage::fh, $file)) {
if (my @host = $self->get_friend) {
my $req_file = join('/', "public", $to_login && $to_url ? ($self->get_hash($to_login), $to_url) : ($self->get_hash, $url));
if ($self->{is_async} && $self->async) {
my $content;
while (read($Contenido::Cluster::Storage::fh, my $buf, 1024*1024)) {
$content .= $buf;
}
foreach my $host (@host) {
my $req_url = 'http://'.join('/', $host, $req_file);
my $req = HTTP::Request->new(
PUT => $req_url,
);
$req->content_length((stat($file))[7]);
$req->content_ref(\$content);
$self->_send_request($req, "PUT $file to $req_url");
}
} else {
foreach my $host (@host) {
seek($Contenido::Cluster::Storage::fh, 0, 0);
my $req_url = 'http://'.join('/', $host, $req_file);
my $req = HTTP::Request->new(
PUT => $req_url,
undef,
sub {
read($Contenido::Cluster::Storage::fh, my $buf, 1024*1024);
return $buf;
}
);
$req->content_length((stat($file))[7]);
$self->_send_request($req, "PUT $file to $req_url");
}
}
}
close($Contenido::Cluster::Storage::fh);
}
}
sub _send_request {
my $self = shift;
my $req = shift || die $log->error("no request passed");
my $msg = shift || "LWP";
$log->debug( $msg." ..." );
if ($self->{is_async}) {
$self->async->add($req);
$self->async->poke();
} else {
my $res = new LWP::UserAgent->request($req);
unless ($res->is_success) {
$log->error( $msg." failed - ".$res->code );
$self->{result} = 0;
}
}
}
sub _lock_request {
my $self = shift;
my $mode = shift || 0;
$mode = $mode ? "LOCK" : "UNLOCK";
my $req_file = join('/', "protected", $self->login);
foreach my $host ($self->get_friend(1)) {
my $req_url = 'http://'.join('/', $host, $req_file);
my $req = HTTP::Request->new(
$mode => $req_url,
);
$req->header('X-Hash' => $self->get_full_hash);
$self->_send_request($req, "$mode $req_url");
}
return $self->_stop_async();
}
sub _transfer_file {
my $self = shift;
my $mode = shift || 0;
$mode = $mode ? "MOVE" : "COPY";
my ($url, $to_login, $to_url) = @_;
unless ($self->is_same_cluster($to_login)) {
$log->error("users not on same cluster");
return 0;
}
my $req_file = join('/', "protected", $self->get_hash, $url);
my $to_req_file = join('/', "protected", $self->get_hash($to_login), $to_url);
foreach my $host ($self->get_friend) {
my $req_url = 'http://'.join('/', $host, $req_file);
my $to_req_url = 'http://'.join('/', $host, $to_req_file);
my $req = HTTP::Request->new(
$mode => $req_url,
);
$req->header('Destination' => $to_req_url);
$self->_send_request($req, "$mode $req_url to $to_req_url");
}
}
sub _stop_async {
my $self = shift;
if ($self->{is_async}) {
while ( my $res = $self->{async}->wait_for_next_response ) {
if (!$res->is_success() && ($res->code ne '405')) {
$self->{result} = 0;
$log->error("async failed - ".$res->code);
}
}
}
return $self->{result};
}
sub get_hash {
my $self = shift;
my $login = shift || $self->login;
my $hash = substr(sprintf("%X", crc32($login)), -4, 4);
return substr($hash, -4, 2).'/'. substr($hash, -2, 2) || '00/00';
}
sub get_full_hash {
my $self = shift;
my $login = shift || $self->login;
return substr(sprintf("%X", crc32($login)), -4, 4) || '0000'
}
1;