# Revision 3 (by ahitrov@rambler.ru, 2010/03/24 15:19:32) The CORE
package Contenido::Cluster::Storage;
use strict;

use Contenido::Globals;

use HTTP::Async;
use HTTP::Request;
use HTTP::Headers;
use LWP::UserAgent;
use String::CRC32;
use Data::Dumper;


# Constructor.
#
# Named options:
#   sql      - object used by cluster() to prepare SQL statements (required)
#   users    - base directory holding user files (required)
#   backend  - host name of this backend (required)
#   is_async - true to send requests asynchronously (stored as 1 or 0)
#   temp     - optional alternative base directory (see _store_file)
#
# Dies (after logging through $log) when a required option is missing or false.
sub new {
    my ($invocant, %args) = @_;
    my $class = ref($invocant) || $invocant;

    # The three mandatory options are validated in this fixed order.
    my $sql     = $args{sql}     || die $log->error("You MUST pass sql object");
    my $users   = $args{users}   || die $log->error("You MUST pass users dir");
    my $backend = $args{backend} || die $log->error("You MUST pass backend host");

    return bless {
        sql      => $sql,
        users    => $users,
        backend  => $backend,
        is_async => ($args{is_async} ? 1 : 0),
        login    => undef,          # set later by start()
        temp     => $args{temp},
    }, $class;
}

# Opens a cluster session for one user.
#
# Named options:
#   login    - user login the following operations act on (required)
#   is_async - true to switch this object to asynchronous mode; a mode that is
#              already on cannot be switched off here (the flags are OR-ed)
#
# Resets the session result flag to 1 and, in asynchronous mode, lazily
# creates the HTTP::Async dispatcher (kept across sessions once created).
# Dies (after logging through $log) when no login is given.
# Returns 1.
sub start {
    my ($self, %opts) = @_;

    $self->{is_async} = ($opts{is_async} || $self->{is_async}) ? 1 : 0;
    $self->{login}    = $opts{login} || die $log->error("You MUST pass user login");
    $self->{result}   = 1;

    if ($self->{is_async} && !$self->{async}) {
        # Direct method call instead of the ambiguous indirect-object form.
        my $async = HTTP::Async->new();

        # Tuning values; presumably parallel slot count and timeouts in
        # seconds - see the HTTP::Async documentation to confirm.
        $async->slots(1000);
        $async->timeout(900);
        $async->max_request_time(1800);

        $self->{async} = $async;
    }

    return 1;
}

# Closes the current session: forgets the login, then hands over to
# _stop_async() and returns whatever that returns (the session result flag).
sub stop {
    my ($self) = @_;

    undef $self->{login};

    return $self->_stop_async();
}

# Returns the login of the current session.
# Dies (after logging through $log) when no login is set.
sub login {
    my ($self) = @_;

    my $current = $self->{login};
    return $current if $current;

    die $log->error("You MUST start cluster session before use");
}

# Returns the asynchronous dispatcher stored in the object.
# Dies (after logging through $log) when none has been created yet.
sub async {
    my ($self) = @_;

    my $dispatcher = $self->{async};
    return $dispatcher if $dispatcher;

    die $log->error("You MUST start cluster session before use");
}

# Returns the cluster map cached in $request->{cluster}, loading it from the
# database on first use.
#
# Structure built here:
#   by_group->[GROUP]   list of [ip, host, status] for backends with status 0
#   by_key->[SLOT]      group serving that hash slot (table backend_virtual)
#   by_cluster->[GROUP] list of hash slots served by that group
#   by_relocate->[SLOT] group listed for that slot in backend_relocate
#
# The "hash" columns hold either one hexadecimal value or a "FROM-TO"
# hexadecimal range. Nothing is stored when all three tables are empty, so
# the result may be undefined.
sub cluster {
    my ($self) = @_;

    # Already loaded.
    return $request->{cluster} if $request->{cluster};

    my $sth = $self->{sql}->prepare("select bgroup, ip, host, status from backend where status = 0 order by id");
    $sth->execute();
    while (my ($bgroup, $ip, $host, $status) = $sth->fetchrow_array()) {
        push @{ $request->{cluster}->{by_group}->[$bgroup] }, [$ip, $host, $status];
    }
    $sth->finish();

    $sth = $self->{sql}->prepare("select bgroup, hash from backend_virtual order by id");
    $sth->execute();
    while (my ($bgroup, $range) = $sth->fetchrow_array()) {
        my ($first, $last) = map { hex($_) } split(/-/, $range);
        $last ||= $first;    # single value: range of one slot
        for my $slot ($first .. $last) {
            $request->{cluster}->{by_key}->[$slot] = $bgroup;
            push @{ $request->{cluster}->{by_cluster}->[$bgroup] }, $slot;
        }
    }
    $sth->finish();

    $sth = $self->{sql}->prepare("select bgroup, hash from backend_relocate order by id");
    $sth->execute();
    while (my ($bgroup, $range) = $sth->fetchrow_array()) {
        my ($first, $last) = map { hex($_) } split(/-/, $range);
        $last ||= $first;
        for my $slot ($first .. $last) {
            $request->{cluster}->{by_relocate}->[$slot] = $bgroup;
        }
    }
    $sth->finish();

    return $request->{cluster};
}

# Returns the host names of the backends that hold the current user's data.
#
# Hosts come from the group mapped to the user's hash slot and, when the slot
# is locked (see is_locked), additionally from the group named in the
# relocate table. This backend itself is left out unless $force is true.
# Returns an empty list when no cluster map or no home group is available.
sub get_friend {
    my ($self, $force) = @_;
    $force ||= 0;

    my @friends = ();

    my $map = $self->cluster or return @friends;
    my $slot = hex($self->get_full_hash);

    # Host names (second field of each backend record) of the given group,
    # minus this backend unless forced.
    my $collect = sub {
        my ($members) = @_;
        return map  { $_->[1] }
               grep { $force || ($self->{backend} ne $_->[1]) }
               @{ $members || [] };
    };

    my $home = $map->{by_group}->[ $map->{by_key}->[$slot] ] || return ();
    push @friends, $collect->($home);

    if ($self->is_locked) {
        my $target = $map->{by_group}->[ $map->{by_relocate}->[$slot] ];
        push @friends, $collect->($target);
    }

    return @friends;
}

# Tells whether this backend belongs to the group serving a login.
#
# The optional argument is the login to check; get_full_hash falls back to
# the session login when it is omitted. Returns 1 on a match, 0 otherwise,
# and an empty list when the hash slot has no group.
sub is_friend {
    my ($self, $login) = @_;

    my $map = $self->cluster or return 0;

    my $slot    = hex($self->get_full_hash($login));
    my $members = $map->{by_group}->[ $map->{by_key}->[$slot] ] || return ();

    for my $member (@{$members}) {
        return 1 if $self->{backend} eq $member->[1];
    }

    return 0;
}

# Tells whether the session user and $to_login are mapped to the same group.
# Returns 0 when $to_login is missing, otherwise the result of comparing the
# two group numbers numerically.
sub is_same_cluster {
    my ($self, $to_login) = @_;

    return 0 unless $to_login;

    my $own_group   = $self->cluster->{by_key}->[ hex($self->get_full_hash) ];
    my $other_group = $self->cluster->{by_key}->[ hex($self->get_full_hash($to_login)) ];

    return $own_group == $other_group;
}

# Tells whether the session user's hash slot has an entry in the relocate
# table (1) or not (0). Returns 1 when no cluster map is available.
sub is_locked {
    my ($self) = @_;

    my $map = $self->cluster;
    return 1 unless $map;

    my $slot = hex($self->get_full_hash());
    return defined($map->{by_relocate}->[$slot]) ? 1 : 0;
}

# Uploads the session user's file $url to the friend backends via
# _store_file and returns its result.
# Dies (after logging through $log) when $url is missing.
sub put {
    my ($self, $url) = @_;

    $url or die $log->error("wrong put usage");

    return $self->_store_file($url);
}

# Uploads the session user's file $url to the friend backends, optionally
# under another login/path ($to_login, $to_url), via _store_file.
#
# Only allowed on a backend of the user's home group: otherwise an error is
# logged and 0 is returned.
# Dies (after logging through $log) when $url is missing. The source path is
# always required; the previous check only died when all three arguments
# were missing, which let a call without $url through.
sub relocate {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong relocate usage") unless ($url);

    unless ($self->is_friend) {
        $log->error("not home cluster");
        return 0;
    }

    # _store_file falls back to the user's own hash and $url unless both
    # $to_login and $to_url are given.
    return $self->_store_file($url, $to_login, $to_url);
}

# Relocates every entry of the session user's directory $url to
# $to_login/$to_url by calling relocate() once per entry.
#
# Only allowed on a backend of the user's home group: otherwise an error is
# logged and 0 is returned. Also returns 0 (after logging) when the
# directory cannot be opened; returns 1 otherwise.
# Dies (after logging through $log) unless all three arguments are given.
sub relocate_dir {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong relocate_dir usage") unless ($url && $to_login && $to_url);

    unless ($self->is_friend) {
        $log->error("not home cluster");
        return 0;
    }

    # $self is a hash reference: "$$self->{users}" tried to dereference it
    # as a scalar reference and died at runtime.
    my $dir = join('/', $self->{users}, $self->get_hash, $url);

    my $dh;
    unless (opendir($dh, $dir)) {
        $log->error("can't open directory $dir: $!");
        return 0;
    }
    # Skip names made only of dots (".", "..").
    my @files = grep { !/^[\.]+$/ } readdir($dh);
    closedir($dh);

    foreach my $file (@files) {
        $self->relocate(join("/", $url, $file), $to_login, join("/", $to_url, $file));
    }

    return 1;
}

# Moves the session user's resource $url to $to_url of user $to_login on
# every friend backend (see _transfer_file) and returns its result.
#
# Dies (after logging through $log) unless all three arguments are given.
# The previous check used "||" and accepted partial arguments, so a missing
# $url or $to_url produced a request addressing a whole hash directory.
sub move {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong move usage") unless ($url && $to_login && $to_url);

    return $self->_transfer_file(1, $url, $to_login, $to_url);
}

# Copies the session user's resource $url to $to_url of user $to_login on
# every friend backend (see _transfer_file) and returns its result.
#
# Dies (after logging through $log) unless all three arguments are given.
# The previous check used "||" and accepted partial arguments, so a missing
# $url or $to_url produced a request addressing a whole hash directory.
sub copy {
    my ($self, $url, $to_login, $to_url) = @_;

    die $log->error("wrong copy usage") unless ($url && $to_login && $to_url);

    return $self->_transfer_file(0, $url, $to_login, $to_url);
}


# Sends a DELETE request for the session user's resource $url to every
# friend backend. A true $recursive adds the header "Depth: infinity".
# Dies (after logging through $log) when $url is missing.
sub delete {
    my ($self, $url, $recursive) = @_;

    $url or die $log->error("wrong delete usage");

    my $path = join('/', "protected", $self->get_hash, $url);

    for my $host ($self->get_friend) {
        my $target   = "http://$host/$path";
        my $http_req = HTTP::Request->new(DELETE => $target);

        $http_req->header('Depth' => 'infinity') if $recursive;

        $self->_send_request($http_req, "DELETE $target");
    }
}


# Sends a LOCK request for the session user to the friend backends and
# returns the result of _lock_request.
sub lock {
    my ($self) = @_;

    return $self->_lock_request(1);
}


# Sends an UNLOCK request for the session user to the friend backends and
# returns the result of _lock_request.
sub unlock {
    my ($self) = @_;

    return $self->_lock_request(0);
}


# Sends an ACL request for the session user's resource $url to every friend
# backend. The X-Access header carries 775 when $is_public is true and 770
# otherwise.
# Dies (after logging through $log) when $url is missing.
sub acl {
    my ($self, $url, $is_public) = @_;

    $url or die $log->error("wrong acl usage");

    my $path   = join('/', "protected", $self->get_hash, $url);
    my $access = $is_public ? 775 : 770;

    for my $host ($self->get_friend) {
        my $target   = "http://$host/$path";
        my $http_req = HTTP::Request->new(ACL => $target);
        $http_req->header('X-Access' => $access);

        $self->_send_request($http_req, "ACL $target");
    }
}

# Uploads one local file of the session user to every friend backend with
# HTTP PUT.
#
# Arguments:
#   $url      - path of the file relative to the user's hash directory
#   $to_login - optional destination login
#   $to_url   - optional destination path
# The remote path is "public/<hash>/<path>"; the destination login and path
# are used only when BOTH are given, otherwise the user's own hash and $url.
#
# The local file is read from the "temp" directory when one is configured
# and this backend is not in the user's home group, else from "users".
#
# Returns 1 when the file could be opened (even if no friend backend exists),
# 0 (after logging) when it could not.
sub _store_file {
    my ($self, $url, $to_login, $to_url) = @_;

    my $base = $self->{temp} && !$self->is_friend ? $self->{temp} : $self->{users};
    my $file = join('/', $base, $self->get_hash, $url);

    # Three-argument open: the path contains the caller-supplied $url, and
    # the two-argument form would interpret characters such as a trailing
    # "|" or a leading ">" as an open mode.
    my $fh;
    unless (open($fh, '<', $file)) {
        $log->error("can't open $file: $!");
        return 0;
    }
    binmode($fh);

    if (my @host = $self->get_friend) {

        my $req_file = join('/', "public", $to_login && $to_url ? ($self->get_hash($to_login), $to_url) : ($self->get_hash, $url));
        my $size     = (stat($fh))[7];

        if ($self->{is_async} && $self->async) {

            # Asynchronous mode: read the file once and share the buffer
            # between all queued requests.
            my $content = '';
            while (read($fh, my $buf, 1024*1024)) {
                $content .= $buf;
            }

            foreach my $host (@host) {

                my $req_url = 'http://'.join('/', $host, $req_file);
                my $req = HTTP::Request->new(
                    PUT => $req_url,
                );
                $req->content_length($size);
                $req->content_ref(\$content);

                $self->_send_request($req, "PUT $file to $req_url");

            }

        } else {

            # Synchronous mode: stream the file in 1 MB chunks through a
            # content callback, rewinding before each host.
            foreach my $host (@host) {
                seek($fh, 0, 0);

                my $req_url = 'http://'.join('/', $host, $req_file);
                my $req = HTTP::Request->new(
                    PUT => $req_url,
                    undef,
                    sub {
                        read($fh, my $buf, 1024*1024);
                        return $buf;
                    }
                );
                $req->content_length($size);

                $self->_send_request($req, "PUT $file to $req_url");
            }
        }

    }

    close($fh);

    return 1;
}

# Dispatches one HTTP request.
#
# Arguments:
#   $req - HTTP::Request object (required; dies after logging when missing)
#   $msg - text used in log lines, "LWP" by default
#
# In asynchronous mode the request is queued on the dispatcher returned by
# async() and the dispatcher is poked; responses are collected later by
# _stop_async. Otherwise the request is performed immediately with a fresh
# LWP::UserAgent, and a failed response is logged and clears the session
# result flag.
sub _send_request {
    my ($self, $req, $msg) = @_;

    $req or die $log->error("no request passed");
    $msg ||= "LWP";

    $log->debug( $msg." ..." );

    if ($self->{is_async}) {

        $self->async->add($req);
        $self->async->poke();

    } else {

        # Explicit method chain instead of the indirect-object form
        # "new LWP::UserAgent->request(...)", whose parse is easy to misread.
        my $res = LWP::UserAgent->new->request($req);
        unless ($res->is_success) {
            $log->error( $msg." failed - ".$res->code );
            $self->{result} = 0;
        }

    }

    return;
}

# Sends a LOCK ($mode true) or UNLOCK ($mode false) request for the session
# user to all backends returned by get_friend(1), i.e. including this one.
# Each request carries the user's full hash in the X-Hash header.
# Finishes by calling _stop_async() and returns its result.
sub _lock_request {
    my ($self, $mode) = @_;

    my $method = $mode ? "LOCK" : "UNLOCK";
    my $path   = join('/', "protected", $self->login);

    for my $host ($self->get_friend(1)) {
        my $target   = "http://$host/$path";
        my $http_req = HTTP::Request->new($method => $target);
        $http_req->header('X-Hash' => $self->get_full_hash);

        $self->_send_request($http_req, "$method $target");
    }

    return $self->_stop_async();
}

# Sends a MOVE ($mode true) or COPY ($mode false) request to every friend
# backend, from the session user's resource $url to $to_url of $to_login.
# The target is passed in the Destination header and addresses the same host.
#
# Logs an error and returns 0 when both users are not served by the same
# group (see is_same_cluster).
sub _transfer_file {
    my ($self, $mode, $url, $to_login, $to_url) = @_;

    my $method = $mode ? "MOVE" : "COPY";

    if (!$self->is_same_cluster($to_login)) {
        $log->error("users not on same cluster");
        return 0;
    }

    my $src_path = join('/', "protected", $self->get_hash, $url);
    my $dst_path = join('/', "protected", $self->get_hash($to_login), $to_url);

    for my $host ($self->get_friend) {
        my $src = "http://$host/$src_path";
        my $dst = "http://$host/$dst_path";

        my $http_req = HTTP::Request->new($method => $src);
        $http_req->header('Destination' => $dst);

        $self->_send_request($http_req, "$method $src to $dst");
    }
}

# Waits for all pending asynchronous responses and returns the session
# result flag.
#
# Every unsuccessful response except status 405 is logged and clears the
# flag. Nothing is waited for in synchronous mode, or when asynchronous mode
# is on but no dispatcher exists yet (start() was never called) - previously
# that case died on an undefined value.
sub _stop_async {
    my ($self) = @_;

    if ($self->{is_async} && $self->{async}) {
        while ( my $res = $self->{async}->wait_for_next_response ) {
            if (!$res->is_success() && ($res->code ne '405')) {
                $self->{result} = 0;
                $log->error("async failed - ".$res->code);
            }
        }
    }

    return $self->{result};
}

# Returns the two-level directory prefix "XX/YY" for a login: the last four
# upper-case hexadecimal digits of its CRC32, split into two pairs.
# Uses the session login when $login is omitted.
#
# The CRC is formatted zero-padded to eight digits so the result always has
# two digits on each side; unpadded, CRC values below 0x1000 produced
# prefixes such as "/5". (The old "|| '00/00'" fallback could never trigger,
# because "." binds tighter than "||".)
sub get_hash {
    my ($self, $login) = @_;
    $login ||= $self->login;

    my $hash = substr(sprintf("%08X", crc32($login)), -4, 4);

    return substr($hash, 0, 2).'/'.substr($hash, 2, 2);
}

# Returns the last four upper-case hexadecimal digits of the CRC32 of a
# login. Uses the session login when $login is omitted.
#
# The CRC is formatted zero-padded to eight digits so the result always has
# exactly four digits; unpadded, CRC values below 0x1000 gave shorter
# strings. A CRC of 0 still yields '0000'.
sub get_full_hash {
    my ($self, $login) = @_;
    $login ||= $self->login;

    return substr(sprintf("%08X", crc32($login)), -4, 4);
}






1;