tokuhirom's Blog

Q4M + Parallel::Prefork + Signal handling + Log::Minimal

conditions precedent:

Points:

package OreOre::Worker::Base;
use strict;
use warnings;
use parent qw/Class::Accessor::Fast/;
use Try::Tiny;
use Parallel::Prefork;
use Log::Minimal;
use Carp ();

__PACKAGE__->mk_accessors(qw/max_workers job_count max_job_count connect_info timeout/);

sub new {
    my $class = shift;
    my %args = @_ == 1 ? %{$_[0]} : @_;

    my $self = bless {
        max_workers   => 2,
        job_count     => 0,
        max_job_count => 20,
        timeout       => 60,
        %args
    }, $class;
    Carp::croak("missing mandatory args: connect_info") unless $self->{connect_info};
    return $self;
}

sub table   { die "abstract base method. please implement this method in child class" }

# this method must return boolean value.
# true if suceeded, false otherwise.
sub process { die "abstract base method. please implement this method in child class" }

sub run {
    my $self = shift;

    # time is not required, because I use multilog
    # print pid for debugging
    local $Log::Minimal::PRINT = sub {
        my ( $time, $type, $message, $trace ) = @_;
        print STDERR "[$$] [$type] $message at $trace\n";
    };

    infof("start: $$");

    my $pm = Parallel::Prefork->new(
        {   
            max_workers  => $self->max_workers,
            trap_signals => {
                TERM => 'TERM',
                HUP  => 'TERM',
                INT  => 'TERM',
                USR1 => undef,
            }
        }
    );
    while ($pm->signal_received !~ /^(?:TERM|INT)$/) {
        $pm->start and next;

        debugf($self->job_count.'/'.$self->max_job_count);

        my $dbh = DBI->connect(@{$self->{connect_info}}) or die "cannot connect to q4m";

        my $term = 0;
        LOOP: while (!$term && $self->max_job_count >= $self->{job_count}++) {
            infof("waiting...");
            my $got_job = $dbh->selectrow_array('SELECT queue_wait(?, ?);', {}, $self->table, $self->timeout);
            unless ($got_job) {
                debugf("no job...");
                next LOOP;
            };
            infof("got job");

            try {
                local $SIG{TERM} = sub { infof("TERM RECEIVED : ");  $term++ };
                my $retval = $self->process($dbh);
                if ($retval) {
                    infof("finished");
                    $dbh->do(q{SELECT queue_end();});
                } else {
                    infof("failed");
                    $dbh->do(q{SELECT queue_abort();});
                }
            } catch {
                critf("error occured: $_");
                $dbh->do(q{SELECT queue_abort();});
            };
        }
        infof("ready to die:  $self->{job_count}, $self->{max_job_count}, $term");

        $pm->finish;
    }
    infof("parent is ready for exit");
    $pm->wait_all_children();
    infof("ok, I'll die!");
}

1;
__END__

    package MyWorker;
    use base qw/OreOre::Worker::Base/;

    sub table { 'foo' }

    sub process {
        my ($self, $dbh) = @_;

        my ($col1, $col2) = $dbh->selectrow_array('SELECT * FROM foo');
        ...
    }

    package main;
    use DBI;

    my $worker = MyWorker->new(
        max_workers   => 3,
        connect_info  => ['dbi:mysql:database=queue', 'root', ''],
        timeout       => 30,
    );
    $worker->run();