Revision 296 (by ahitrov, 2013/03/26 17:59:01) Promosite (anthill) project source
#!/usr/bin/perl -w
use strict;

# Files created by this script are group-writable (only "other" write is masked).
umask(0002);

# Installation tree (code) and data tree (queue, logs), both under $HOME.
my $ROOT="$ENV{HOME}/Contenido/usr/projects/promosuite/services/loader";
my $DATAROOT="$ENV{HOME}/Contenido/var/projects/promosuite/loader";

# Queue directories; the trailing slash matters because file names are
# appended to these strings directly.
my $WDIR="$DATAROOT/incom/";   # incoming files waiting to be processed
my $EDIR="$DATAROOT/error/";   # files whose processing failed
my $PDIR="$DATAROOT/done/";    # files processed successfully

my $PIDF="$DATAROOT/logs/smaild.lock";   # lock file: "<start time>\t<pid>"
my $LOGF="$DATAROOT/logs/smaild.log";    # append-only log

# Command that receives each queued file on its standard input.
#my $EXTCOM="$ROOT/bin/msgbody \| $ROOT/bin/loader";
my $EXTCOM="$ROOT/bin/load.sh";

my $MAXDTM=900;   # seconds a previous run may hold the lock before we complain
my $N2proc=150;   # maximum number of files handled in one run


## Locking
#
# proc_lock() status codes handled here:
#   1 - the lock file is unusable (unreadable, empty, or its owner is gone):
#       abort loudly;
#   2 - a previous run has been holding the lock longer than $MAXDTM seconds:
#       report it and leave;
#   3 - a previous run is still working within the time limit: leave quietly.
# Any other status means the lock is ours and the run continues.
my ($errcode,$errmsg)=proc_lock($PIDF,$MAXDTM);

die $errmsg if ($errcode==1);

if ($errcode==2){
   warn $errmsg;
   exit;
}

#   warn "previous incarnation is running";
exit if ($errcode==3);


## Open the log file
#
# Every log record is one tab-separated line:
#   <run key> <epoch seconds> <local time string> <message>
# The run key $PK (start time + pid) is the same on every record of one run.
# If the log file cannot be opened, records go to STDERR instead and
# $log2stderr is set so that later code neither duplicates messages with
# warn() nor closes the handle.

my $Stm=time();
my $Stmstr=scalar(localtime($Stm));

my $PK="$Stm.$$";
my $log2stderr=0;

# Three-argument open: the path is never parsed for mode characters.
if (open(LOG,'>>',$LOGF)){
   # Unbuffer LOG, then restore whichever handle was selected before.
   select((select(LOG),$|=1)[0]);
}
else{
   warn "Can\'t open log file $LOGF: $!\nlogging to STDERR";
   $log2stderr=1;
   # LOG shares STDERR's descriptor (fdopen), so it must be unbuffered too
   # or its records would interleave out of order with warn() output.
   if (open(LOG,'>&=',fileno(STDERR))){
      select((select(LOG),$|=1)[0]);
   }
   else{
      warn "Can\'t attach log to STDERR: $!";
   }
}

#starting
print LOG "$PK\t$Stm\t$Stmstr\tStarted\n";
my $tm=time();
my $tmstr=scalar(localtime($tm));
print LOG "$PK\t$tm\t$tmstr\tScanning workdir $WDIR for new files\n";

## Scan the work directory and run $EXTCOM on the queued files.
#
# File names are split on dots into four fields; the first is used as a
# time stamp and the fourth as a priority. Files are handled in order of
# descending priority, then ascending time stamp, at most $N2proc per run.
# Each file is fed to $EXTCOM on standard input; on success it is moved to
# $PDIR, on failure to $EDIR.
my %tmpf;
my @files;
if (opendir(my $dh,$WDIR)){
   my $totN=0;
   while (defined(my $f=readdir($dh))){
      next if ($f eq '.' || $f eq '..');
      my ($part_tm,undef,undef,$part_prior)=split(/\./,$f);
      # A name with fewer than four fields must not put undefined keys
      # into the sort below; a missing field counts as 0.
      $tmpf{$f}{p}=defined($part_prior) ? $part_prior : 0;
      $tmpf{$f}{tm}=defined($part_tm) ? $part_tm : 0;
      $totN++;
   }
   closedir $dh;

   if ($totN){
      {
         # Stray files may carry non-numeric fields; they still sort by
         # their numeric value, just without a warning per comparison.
         no warnings 'numeric';
         @files=sort {
            $tmpf{$b}{p} <=> $tmpf{$a}{p}
               || $tmpf{$a}{tm} <=> $tmpf{$b}{tm}
         } keys(%tmpf);
      }

      my $procN=$totN;
      if ($N2proc<$totN){
         # Keep only the first $N2proc entries.
         splice(@files,$N2proc);
         $procN=$N2proc;
      }
      $tm=time();
      $tmstr=scalar(localtime($tm));
      print LOG "$PK\t$tm\t$tmstr\t$totN files in workdir, $procN of them will be processed\n";
#      map {print "$_\n"} @files;

      foreach my $f (@files){
         $tm=time();
         $tmstr=scalar(localtime($tm));
         my $proc_started_tm=$tm;
         print LOG "$PK\t$tm\t$tmstr\tProcessing $f by $EXTCOM\n";

         # The command line goes through the shell (for the redirection),
         # and $f comes straight from the directory listing, so the path is
         # single-quoted with embedded quotes escaped.
         (my $src_quoted="$WDIR$f")=~s/'/'\\''/g;
         my $err=system("$EXTCOM < '$src_quoted'");
         # Capture both right away: later calls may overwrite them.
         my $sys_errno="$!";
         my $sys_status=$?;

         $tm=time();
         $tmstr=scalar(localtime($tm));
         if ($err){
            print LOG "$PK\t$tm\t$tmstr\tError: processing $f by $EXTCOM: $sys_errno: $sys_status\n";
            warn "$PK\t$tm\t$tmstr\tError: processing $f by $EXTCOM: $sys_errno: $sys_status" unless ($log2stderr);
            if (rename("$WDIR$f","$EDIR$f")){
               print LOG "$PK\t$tm\t$tmstr\t$f moved to $EDIR\n";
            }
            else{
               # The file stays in $WDIR, so record that in the log as well.
               my $move_err="$!";
               print LOG "$PK\t$tm\t$tmstr\tError: Can\'t move $WDIR$f to $EDIR$f: $move_err\n";
               warn "Can\'t move $WDIR$f to $EDIR$f: $move_err" unless ($log2stderr);
            }
         }
         else{
            my $proc_l=$tm - $proc_started_tm;
            print LOG "$PK\t$tm\t$tmstr\tFile $f processed [$proc_l s]\n";
            unless (rename("$WDIR$f","$PDIR$f")){
               my $move_err="$!";
               print LOG "$PK\t$tm\t$tmstr\tError: Can\'t move $WDIR$f to $PDIR$f: $move_err\n";
               warn "Can\'t move $WDIR$f to $PDIR$f: $move_err" unless($log2stderr);
            }
         }
      }
   }
   else{
      $tm=time();
      $tmstr=scalar(localtime($tm));
      print LOG "$PK\t$tm\t$tmstr\tWorkdir $WDIR is empty\n";
   }
}
else{
   my $dir_err="$!";
   $tm=time();
   $tmstr=scalar(localtime($tm));
   print LOG "$PK\t$tm\t$tmstr\tError: Can\'t open dir $WDIR for scanning: $dir_err\n";
   warn "Can\'t open dir $WDIR for scanning: $dir_err" unless ($log2stderr);
}

## Write the closing log record.
$tm=time();
$tmstr=scalar(localtime());
print LOG "$PK\t$tm\t$tmstr\tFinished\n";

# When LOG is attached to STDERR it is left open.
close LOG unless ($log2stderr);

## Unlocking;
# Remove the lock file so that the next run can start.
unlink($PIDF) or die "Can\'t unlink $PIDF: $!";

# proc_lock($pidfn, $maxDTM)
#
# Acquire the single-instance lock stored in the file $pidfn, whose content
# is "<start time in epoch seconds>\t<pid>".
#
# Returns:
#   0                 - the lock file was created for this process;
#   (1, $message)     - the lock file is unusable: it cannot be created,
#                       opened or written, is empty or malformed, or names a
#                       pid that kill(0, ...) does not confirm;
#   (2, $message)     - the recorded process answers kill(0, ...) and has
#                       held the lock for more than $maxDTM seconds;
#   (3, '')           - the recorded process answers kill(0, ...) and is
#                       within the time limit.
sub proc_lock{
   my ($pidfn,$maxDTM)=@_;
   # Imported here so the sub brings its own dependency into the file.
   use Fcntl qw(O_WRONLY O_CREAT O_EXCL);

   my $stm=time();

   # O_EXCL makes "check that no lock exists" and "create the lock" one
   # atomic step, so two instances started together cannot both succeed.
   if (sysopen(my $lock_fh,$pidfn,O_WRONLY|O_CREAT|O_EXCL)){
      my $written=print {$lock_fh} "$stm\t$$";
      my $closed=close($lock_fh);
      unless ($written && $closed){
         my $write_err="$!";
         # Do not leave a half-written lock behind.
         unlink($pidfn);
         return (1,"Can\'t write $pidfn: $write_err");
      }
      return 0;
   }
   # Any failure other than "already exists" is a plain creation error.
   return (1,"Can\'t open $pidfn: $!") unless ($!{EEXIST});

   # The lock file exists: find out who holds it.
   my $pid_fh;
   unless (open($pid_fh,'<',$pidfn)){
      # Save the reason before stat() gets a chance to overwrite $!.
      my $open_err="$!";
      my $pidfm=(stat($pidfn))[2];
      $pidfm='?' unless (defined($pidfm));
      return (1,"Can\'t open $pidfn [mode=$pidfm]: $open_err");
   }
   my $str=<$pid_fh>;
   close $pid_fh;
   return (1,"empty pid file $pidfn") unless(defined($str));

   chomp $str;
   my ($ppstm,$pppid)=split(/\t/,$str);
   # Both fields must be plain numbers. An undefined or zero pid would make
   # kill(0, ...) probe our own process group and always report "alive".
   unless (defined($ppstm) && defined($pppid)
           && $ppstm=~/^\d+$/ && $pppid=~/^\d+$/ && $pppid>0){
      return (1,"malformed pid file $pidfn");
   }

   # Signal 0 delivers nothing; it only tests whether the pid can be signalled.
   return (1,"No proc with pid $pppid") unless (kill(0,$pppid));

   my $ctm=time();
   if (($ctm-$ppstm)>$maxDTM){
      return (2,"Previous proc [pid=$pppid;started: ".scalar(localtime($ppstm))."] working more then $maxDTM secs");
   }
   return (3,'');
}