Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Child batch #35

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 59 additions & 17 deletions lib/Resque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ class Resque_Worker
*/
private $child = null;

/**
* @var int How many to process per child.
*/
private $perChild = 1;

/**
* @var int How many jobs processed since last fork.
*/
private $processedInChild = 0;

/**
* Return all workers known to Resque as instantiated instances.
*/
Expand Down Expand Up @@ -126,8 +136,9 @@ public function setId($workerId)
* this method.
*
* @param string|array $queues String with a single queue name, array with multiple.
* @param int $perChild How many jobs to perform per child
*/
public function __construct($queues)
public function __construct($queues, $perChild = 1)
{
if(!is_array($queues)) {
$queues = array($queues);
Expand All @@ -142,6 +153,8 @@ public function __construct($queues)
}
$this->hostname = $hostname;
$this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);

$this->perChild = $perChild;
}

/**
Expand Down Expand Up @@ -191,19 +204,18 @@ public function work($interval = 5)

$this->child = $this->fork();

// Forked and we're the child. Run the job.
// We're the child. Run the job.
if($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if($this->child === 0) {
exit(0);
}
$this->doneWorking();
}

if($this->child > 0) {
// Parent process, sit and wait
$job = null; // we forget the job, because it might change
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
Expand All @@ -212,19 +224,33 @@ public function work($interval = 5)
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
$this->fail($exitStatus);
$this->doneWorking();
}
$this->child = null;
}

$this->child = null;
$this->doneWorking();
}

$this->unregisterWorker();
}

/**
* Fail the current job
*
* @param int $exitStatus the exit code
*/
private function fail($exitStatus)
{
// error, grab the job from redis and fail it
$jobData = Resque::redis()->get('worker:' . (string)$this);
$job = new Resque_Job($jobData['queue'], $jobData['payload']);
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));

}

/**
* Process a single job.
*
Expand Down Expand Up @@ -304,12 +330,28 @@ private function fork()
return false;
}

$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}

return $pid;
// if we are the child
if ($this->child === 0 || $this->child === false) {

// if we're meant to carry on in the same fork
if ($this->processedInChild > 0 && $this->processedInChild < $this->perChild) {
$this->processedInChild++;
return false; // tells work to move on as if we forked
}

// otherwise we die and let the parent deal with the situation below
exit(0);
} else {
// if we pass test above, fork and reset
$this->processedInChild = 1; // theoretically this is unnecessary, since it's only incremented in the child which doesn't affect the parent

$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}

return $pid;
}
}

/**
Expand Down Expand Up @@ -580,4 +622,4 @@ public function getStat($stat)
return Resque_Stat::get($stat . ':' . $this);
}
}
?>
?>
10 changes: 8 additions & 2 deletions resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
$count = $COUNT;
}

$perChild = 1;
$PERCHILD = getenv('PERCHILD');
if(!empty($PERCHILD) && $PERCHILD > 1) {
$perChild = $PERCHILD;
}

if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = pcntl_fork();
Expand All @@ -53,7 +59,7 @@
// Child, start the worker
else if(!$pid) {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker = new Resque_Worker($queues, $perChild);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
Expand All @@ -64,7 +70,7 @@
// Start a single worker
else {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker = new Resque_Worker($queues, $perChild);
$worker->logLevel = $logLevel;

$PIDFILE = getenv('PIDFILE');
Expand Down