Revision 296 (by ahitrov, 2013/03/26 17:59:01) |
Promosite (anthill) project source
|
#!/usr/bin/perl -w
use strict;
# Files created by this script are group-writable (mode mask 002).
umask 002;

# Code and data roots of the loader service, both under the user's home.
my $ROOT     = "$ENV{HOME}/Contenido/usr/projects/promosuite/services/loader";
my $DATAROOT = "$ENV{HOME}/Contenido/var/projects/promosuite/loader";

# Incoming (work), error and done directories; each value ends with "/"
# because file names are appended to it directly.
my ($WDIR, $EDIR, $PDIR) = map { "$DATAROOT/$_/" } qw(incom error done);

# Lock (pid) file and log file.
my ($PIDF, $LOGF) = map { "$DATAROOT/logs/smaild.$_" } qw(lock log);

# External command that receives each incoming file on its STDIN.
#my $EXTCOM="$ROOT/bin/msgbody \| $ROOT/bin/loader";
my $EXTCOM = "$ROOT/bin/load.sh";

# Age limit in seconds handed to proc_lock() for a still-running previous run.
my $MAXDTM = 900;

# Upper bound on the number of files handled in a single run.
my $N2proc = 150;
## Locking
# Take the run lock. proc_lock() returns 0 when the lock was obtained,
# otherwise a non-zero code plus a message.
my ($errcode, $errmsg) = proc_lock($PIDF, $MAXDTM);

# Code 1: the lock file could not be used - abort with the reported reason.
die $errmsg if $errcode == 1;

# Code 2: a previous run is still alive and has exceeded $MAXDTM seconds -
# report it and leave without touching anything.
if ($errcode == 2) {
    warn $errmsg;
    exit;
}

# Code 3: a previous run is still alive and within its time limit -
# leave silently.
# warn "previous incarnation is running";
exit if $errcode == 3;
#open log file
# Start time and "<start time>.<pid>" key; the key prefixes every log line
# written by this run.
my $Stm        = time();
my $Stmstr     = scalar(localtime($Stm));
my $PK         = "$Stm.$$";
my $log2stderr = 0;

# Three-argument open keeps the mode separate from the path, so nothing in
# $LOGF can be interpreted as part of the open mode.
unless (open(LOG, '>>', $LOGF)) {
    warn "Can\'t open log file $LOGF: $!\nlogging to STDERR";
    $log2stderr = 1;
    # Fall back to an alias of STDERR so later "print LOG" calls still work.
    open(LOG, ">&=STDERR");
}

# Turn on autoflush for LOG on both paths (log file and STDERR alias) so
# entries are written out immediately rather than held in a buffer.
{
    my $previous = select(LOG);
    $| = 1;
    select($previous);
}

#starting
print LOG "$PK\t$Stm\t$Stmstr\tStarted\n";
# Scan the work directory, order the files found there, and feed up to
# $N2proc of them to $EXTCOM on STDIN. A file whose command exits with status
# zero is moved to $PDIR; any other file is moved to $EDIR.
my $tm    = time();
my $tmstr = scalar(localtime($tm));
print LOG "$PK\t$tm\t$tmstr\tScanning workdir $WDIR for new files\n";
my %tmpf;
my @files;
if (opendir(my $dh, $WDIR)) {
    while (defined(my $f = readdir($dh))) {
        next if $f eq '.' || $f eq '..';

        # File names are split on "."; field 0 is used as a time stamp and
        # field 3 as a priority.
        my ($part_tm, $part_prior) = (split(/\./, $f))[0, 3];

        # Store numeric sort keys. Missing fields count as 0 and non-numeric
        # text is numified the same way the comparison operators would do it,
        # only without emitting warnings for every comparison.
        {
            no warnings 'numeric';
            $tmpf{$f}{p}  = defined($part_prior) ? $part_prior + 0 : 0;
            $tmpf{$f}{tm} = defined($part_tm)    ? $part_tm + 0    : 0;
        }
    }
    closedir($dh);

    my $totN = scalar(keys %tmpf);
    if ($totN) {
        # Highest priority first; within one priority, oldest time stamp first.
        @files = sort {
            $tmpf{$b}{p} <=> $tmpf{$a}{p}
                or $tmpf{$a}{tm} <=> $tmpf{$b}{tm}
        } keys %tmpf;

        $tm    = time();
        $tmstr = scalar(localtime($tm));
        my $procN = $totN;
        if ($N2proc < $totN) {
            # Keep only the first $N2proc entries for this run.
            splice(@files, $N2proc);
            $procN = $N2proc;
        }
        print LOG "$PK\t$tm\t$tmstr\t$totN files in workdir, $procN of them will be processed\n";

        foreach my $f (@files) {
            $tm    = time();
            $tmstr = scalar(localtime($tm));
            my $proc_started_tm = $tm;
            print LOG "$PK\t$tm\t$tmstr\tProcessing $f by $EXTCOM\n";

            # The file name comes straight from readdir(), so single-quote it
            # for the shell (embedded ' becomes '\'') - spaces or shell
            # metacharacters in a name can then neither break the redirection
            # nor run extra commands. $EXTCOM itself is still a shell command.
            (my $quoted = "$WDIR$f") =~ s/'/'\\''/g;
            my $err = system("$EXTCOM < '$quoted'");

            # Capture the status right away; the calls below may change $!.
            my ($sys_errstr, $sys_status) = ("$!", $?);

            $tm    = time();
            $tmstr = scalar(localtime($tm));
            unless ($err) {
                my $proc_l = $tm - $proc_started_tm;
                print LOG "$PK\t$tm\t$tmstr\tFile $f processed [$proc_l s]\n";
                unless (rename("$WDIR$f", "$PDIR$f")) {
                    my $mv_err = "$!";
                    print LOG "$PK\t$tm\t$tmstr\tError: Can\'t move $WDIR$f to $PDIR$f: $mv_err\n";
                    warn "Can\'t move $WDIR$f to $PDIR$f: $mv_err" unless ($log2stderr);
                }
            }
            else {
                print LOG "$PK\t$tm\t$tmstr\tError: processing $f by $EXTCOM: $sys_errstr: $sys_status\n";
                warn "$PK\t$tm\t$tmstr\tError: processing $f by $EXTCOM: $sys_errstr: $sys_status" unless ($log2stderr);
                if (rename("$WDIR$f", "$EDIR$f")) {
                    print LOG "$PK\t$tm\t$tmstr\t$f moved to $EDIR\n";
                }
                else {
                    # Record the failed move in the log as well, the same way
                    # a failed move to the done directory is recorded.
                    my $mv_err = "$!";
                    print LOG "$PK\t$tm\t$tmstr\tError: Can\'t move $WDIR$f to $EDIR$f: $mv_err\n";
                    warn "Can\'t move $WDIR$f to $EDIR$f: $mv_err" unless ($log2stderr);
                }
            }
        }
    }
    else {
        $tm    = time();
        $tmstr = scalar(localtime($tm));
        print LOG "$PK\t$tm\t$tmstr\tWorkdir $WDIR is empty\n";
    }
}
else {
    # Save the opendir error before any other call can overwrite $!.
    my $dir_err = "$!";
    $tm    = time();
    $tmstr = scalar(localtime($tm));
    print LOG "$PK\t$tm\t$tmstr\tError: Can\'t open dir $WDIR for scanning: $dir_err\n";
    warn "Can\'t open dir $WDIR for scanning: $dir_err" unless ($log2stderr);
}
# Write the closing log entry for this run.
$tm    = time();
$tmstr = scalar(localtime());
print LOG "$PK\t$tm\t$tmstr\tFinished\n";

# LOG is only closed when it is the real log file, not the STDERR alias.
close LOG unless $log2stderr;

## Unlocking;
# Remove the lock file so that the next run can start.
unlink($PIDF) or die "Can\'t unlink $PIDF: $!";
# proc_lock($pidfn, $maxDTM)
#
# Single-instance lock based on a pid file holding "<start time>\t<pid>".
#
# Parameters:
#   $pidfn  - path of the lock (pid) file
#   $maxDTM - number of seconds a previous, still running process may have
#             been running before it is reported as overdue
#
# Returns:
#   0            - lock file created for the current process
#   (1, $msg)    - lock file unusable: cannot be created, read or written,
#                  is empty or malformed, or names a process that is gone
#   (2, $msg)    - owning process is alive but older than $maxDTM seconds
#   (3, '')      - owning process is alive and within $maxDTM seconds
sub proc_lock{
    use Fcntl qw(O_WRONLY O_CREAT O_EXCL);

    my ($pidfn, $maxDTM) = @_;
    my $stm = time();

    # Create the lock file atomically: O_EXCL makes the call fail when the
    # file already exists, so two processes starting at the same moment
    # cannot both believe they own the lock.
    if (sysopen(my $lock_fh, $pidfn, O_WRONLY | O_CREAT | O_EXCL)) {
        my $ok = print {$lock_fh} "$stm\t$$";
        $ok = close($lock_fh) && $ok;
        unless ($ok) {
            # Do not leave an empty or partial lock file behind.
            my $err = "$!";
            unlink($pidfn);
            return (1, "Can\'t write $pidfn: $err");
        }
        return 0;
    }

    # Any failure other than "file exists" means the lock cannot be created.
    return (1, "Can\'t open $pidfn: $!") unless $!{EEXIST};

    # The lock file already exists: find out who owns it.
    my $in;
    unless (open($in, '<', $pidfn)) {
        my $err   = "$!";
        my $pidfm = (stat($pidfn))[2];
        $pidfm = '?' unless defined($pidfm);
        return (1, "Can\'t open $pidfn [mode=$pidfm]: $err");
    }
    my $str = <$in>;
    close($in);
    return (1, "empty pid file $pidfn") unless defined($str);

    chomp($str);
    my ($ppstm, $pppid) = split(/\t/, $str);

    # Both fields must be plain numbers and the pid positive; otherwise
    # kill(0, ...) below would probe the caller's own process group.
    unless (defined($ppstm) && $ppstm =~ /^\d+$/
        && defined($pppid) && $pppid =~ /^\d+$/ && $pppid > 0) {
        return (1, "malformed pid file $pidfn");
    }

    # Signal 0 only checks for existence. EPERM means the process exists but
    # belongs to someone else, so it still counts as alive.
    unless (kill(0, $pppid) || $!{EPERM}) {
        return (1, "No proc with pid $pppid");
    }

    my $ctm = time();
    if (($ctm - $ppstm) > $maxDTM) {
        return (2, "Previous proc [pid=$pppid;started: ".scalar(localtime($ppstm))."] working more then $maxDTM secs");
    }
    return (3, '');
}