Q4M + Parallel::Prefork + Signal handling + Log::Minimal
Prerequisites:
- use daemontools & multilog
Points:
- make DB connection after fork
- do not trap SIGTERM while calling queue_wait(), so that the process can be shut down while waiting for the response from q4m
- Remove $time from the log line and add the PID ($$) instead, because I'm using multilog: it prepends a TAI64N timestamp automatically (and TAI64N is better).
package OreOre::Worker::Base;

# Abstract base class for a preforking Q4M queue worker.
#
# A subclass supplies table() (the queue table name) and process() (the job
# handler).  run() then forks up to max_workers children via
# Parallel::Prefork; each child opens its own DB connection *after* the fork
# and polls the queue with queue_wait() until it has done max_job_count
# iterations or has been asked to terminate.
#
# Constructor arguments (hash or hashref):
#   connect_info  - arrayref passed to DBI->connect (mandatory)
#   max_workers   - number of child processes            (default 2)
#   max_job_count - queue_wait() iterations per child    (default 20)
#   timeout       - timeout argument given to queue_wait (default 60)
#   job_count     - iteration counter, starts at 0

use strict;
use warnings;
use parent qw/Class::Accessor::Fast/;
use Try::Tiny;
use Parallel::Prefork;
use Log::Minimal;
use Carp ();
use DBI;    # run() calls DBI->connect; do not rely on the caller loading DBI

__PACKAGE__->mk_accessors(qw/max_workers job_count max_job_count connect_info timeout/);

# Build a worker.  Accepts either a flat key/value list or a single hashref.
# Croaks when connect_info is missing.
sub new {
    my $class = shift;
    my %args  = @_ == 1 ? %{ $_[0] } : @_;

    my $self = bless {
        max_workers   => 2,
        job_count     => 0,
        max_job_count => 20,
        timeout       => 60,
        %args,    # caller-supplied values override the defaults above
    }, $class;

    Carp::croak("missing mandatory args: connect_info") unless $self->{connect_info};
    return $self;
}

# Abstract: must return the name of the queue table given to queue_wait().
sub table { die "abstract base method. please implement this method in child class" }

# Abstract: handle one job.  Called as $self->process($dbh).
# This method must return a boolean value:
# true if succeeded, false otherwise.
sub process { die "abstract base method. please implement this method in child class" }

# Main loop.  Forks workers and blocks until the manager receives TERM or INT
# and every child has exited.
sub run {
    my $self = shift;

    # The time is not required because multilog timestamps each line;
    # print the pid instead to make debugging easier.
    local $Log::Minimal::PRINT = sub {
        my ( $time, $type, $message, $trace ) = @_;
        print STDERR "[$$] [$type] $message at $trace\n";
    };

    infof("start: $$");

    my $pm = Parallel::Prefork->new(
        {
            max_workers  => $self->max_workers,
            trap_signals => {
                TERM => 'TERM',
                HUP  => 'TERM',
                INT  => 'TERM',
                USR1 => undef,
            }
        }
    );

    # Only TERM and INT end the manager loop; any other trapped signal
    # leaves the loop running, so workers keep being started.
    while ( $pm->signal_received !~ /^(?:TERM|INT)$/ ) {

        # Per the Parallel::Prefork docs, start() is false only inside a
        # freshly forked child; the manager goes back to the loop condition.
        $pm->start and next;

        debugf( $self->job_count . '/' . $self->max_job_count );

        # Connect after the fork so every child owns its own connection.
        my $dbh = DBI->connect( @{ $self->{connect_info} } )
            or die "cannot connect to q4m: "
            . ( defined $DBI::errstr ? $DBI::errstr : 'unknown error' );

        my $term = 0;

        # Stop after exactly max_job_count iterations.  The previous
        # "max >= count++" test allowed one iteration too many.
      LOOP:
        while ( !$term && $self->{job_count} < $self->max_job_count ) {
            $self->{job_count}++;

            # SIGTERM is deliberately NOT trapped here, so the process can be
            # shut down while it is waiting for a response from q4m.
            infof("waiting...");
            my $got_job = $dbh->selectrow_array(
                'SELECT queue_wait(?, ?);', {},
                $self->table, $self->timeout,
            );
            unless ($got_job) {
                debugf("no job...");
                next LOOP;
            }

            infof("got job");
            try {
                # While a job is in flight, only record the TERM request so
                # the current job can finish; the loop condition ends the
                # child afterwards.
                local $SIG{TERM} = sub { infof("TERM RECEIVED : "); $term++ };

                my $retval = $self->process($dbh);
                if ($retval) {
                    infof("finished");
                    $dbh->do(q{SELECT queue_end();});
                }
                else {
                    infof("failed");
                    $dbh->do(q{SELECT queue_abort();});
                }
            }
            catch {
                critf("error occured: $_");
                $dbh->do(q{SELECT queue_abort();});
            };
        }

        infof("ready to die: $self->{job_count}, $self->{max_job_count}, $term");
        $pm->finish;
    }

    infof("parent is ready for exit");
    $pm->wait_all_children();
    infof("ok, I'll die!");
}

1;
__END__

Usage example (not compiled; perl ignores everything after __END__):

package MyWorker;
use base qw/OreOre::Worker::Base/;

sub table { 'foo' }

sub process {
    my ($self, $dbh) = @_;
    my ($col1, $col2) = $dbh->selectrow_array('SELECT * FROM foo');
    ...
}

package main;
use DBI;

my $worker = MyWorker->new(
    max_workers  => 3,
    connect_info => ['dbi:mysql:database=queue', 'root', ''],
    timeout      => 30,
);
$worker->run();