Refactor workers to use donatj/flags instead of pwfisher/command-line-php
This commit is contained in:
parent
d1a19cc160
commit
1991528ed6
4 changed files with 23 additions and 28 deletions
24
bin/worker
24
bin/worker
|
|
@ -7,22 +7,20 @@ use duncan3dc\Forker\Fork;
|
||||||
use Benzine\App;
|
use Benzine\App;
|
||||||
use Benzine\Exceptions\BenzineException;
|
use Benzine\Exceptions\BenzineException;
|
||||||
|
|
||||||
$args = CommandLine::parseArgs($_SERVER['argv']);
|
$flags = new \donatj\Flags();
|
||||||
if(!isset($args['worker'])){
|
|
||||||
die("You must pass a --worker= argument with this script\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
$environment = array_merge($_SERVER, $_ENV);
|
$worker = &$flags->string('worker',null,'Worker class to run in this job');
|
||||||
ksort($environment);
|
$threads = &$flags->int('threads',1,'Number of threads to use.');
|
||||||
|
$stopOnZero = &$flags->bool('stop-on-zero', false, 'Should this process stop on zero?');
|
||||||
|
|
||||||
$threads = $args['threads'] ?? $environment['THREADS'] ?? (int) shell_exec("grep -c processor /proc/cpuinfo");
|
$flags->parse();
|
||||||
|
|
||||||
if(class_exists($args['worker'])){
|
if(class_exists($worker)){
|
||||||
$workerClass = $args['worker'];
|
$workerClass = $worker;
|
||||||
} else {
|
} else {
|
||||||
$workerFile = __DIR__ . "/../../../../src/Workers/{$args['worker']}.php";
|
$workerFile = __DIR__ . "/../../../../src/Workers/{$worker}.php";
|
||||||
if (!file_exists($workerFile)) {
|
if (!file_exists($workerFile)) {
|
||||||
throw new BenzineException(sprintf("No such worker as \"%s\", no such class as \"%s\"", realpath($workerFile), $args['worker']));
|
throw new BenzineException(sprintf("No such worker as \"%s\", no such class as \"%s\"", realpath($workerFile), $worker));
|
||||||
}
|
}
|
||||||
require_once($workerFile);
|
require_once($workerFile);
|
||||||
|
|
||||||
|
|
@ -58,7 +56,7 @@ echo "Starting {$workerClass} with {$threads} threads.\n";
|
||||||
if($threads == 1){
|
if($threads == 1){
|
||||||
/** @var AbstractQueueWorker $worker */
|
/** @var AbstractQueueWorker $worker */
|
||||||
$worker = App::DI($workerClass);
|
$worker = App::DI($workerClass);
|
||||||
$worker->setCliArguments($args);
|
$worker->setStopOnZero($stopOnZero);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}else {
|
}else {
|
||||||
$fork = new Fork;
|
$fork = new Fork;
|
||||||
|
|
@ -67,7 +65,7 @@ if($threads == 1){
|
||||||
$fork->call(function () use ($workerClass, $args) {
|
$fork->call(function () use ($workerClass, $args) {
|
||||||
/** @var AbstractQueueWorker $worker */
|
/** @var AbstractQueueWorker $worker */
|
||||||
$worker = App::DI($workerClass);
|
$worker = App::DI($workerClass);
|
||||||
$worker->setCliArguments($args);
|
$worker->setStopOnZero($stopOnZero);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@
|
||||||
"cache/chain-adapter": "^1.0",
|
"cache/chain-adapter": "^1.0",
|
||||||
"cache/redis-adapter": "^1.0",
|
"cache/redis-adapter": "^1.0",
|
||||||
"doctrine/annotations": "^1.10",
|
"doctrine/annotations": "^1.10",
|
||||||
|
"donatj/flags": "^1.4",
|
||||||
"duncan3dc/fork-helper": "^2.0",
|
"duncan3dc/fork-helper": "^2.0",
|
||||||
"friendsofphp/php-cs-fixer": "^2.0",
|
"friendsofphp/php-cs-fixer": "^2.0",
|
||||||
"fzaninotto/faker": "^1.9",
|
"fzaninotto/faker": "^1.9",
|
||||||
|
|
@ -53,7 +54,6 @@
|
||||||
"psr/cache": "^1.0",
|
"psr/cache": "^1.0",
|
||||||
"psr/container": "^1.0",
|
"psr/container": "^1.0",
|
||||||
"psr/simple-cache": "^1.0",
|
"psr/simple-cache": "^1.0",
|
||||||
"pwfisher/command-line-php": "dev-master",
|
|
||||||
"slim/http-cache": "^1.0",
|
"slim/http-cache": "^1.0",
|
||||||
"slim/psr7": "^1.1",
|
"slim/psr7": "^1.1",
|
||||||
"slim/slim": "^4.5",
|
"slim/slim": "^4.5",
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,7 @@ abstract class AbstractQueueWorker extends AbstractWorker
|
||||||
$this->lastLength = $queueLength;
|
$this->lastLength = $queueLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isset($this->cliArguments['stop-on-zero']) && true === $this->cliArguments['stop-on-zero'] && 0 == $queueLength) {
|
if ($this->stopOnZero && 0 == $queueLength) {
|
||||||
$this->logger->warning('--stop-on-zero is set, and the queue length is zero! Stopping!');
|
$this->logger->warning('--stop-on-zero is set, and the queue length is zero! Stopping!');
|
||||||
exit;
|
exit;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,8 @@ abstract class AbstractWorker implements WorkerInterface
|
||||||
{
|
{
|
||||||
protected Logger $logger;
|
protected Logger $logger;
|
||||||
protected EnvironmentService $environmentService;
|
protected EnvironmentService $environmentService;
|
||||||
protected array $cliArguments;
|
|
||||||
protected int $timeBetweenRuns = 5;
|
protected int $timeBetweenRuns = 5;
|
||||||
|
protected bool $stopOnZero = false;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
Logger $logger,
|
Logger $logger,
|
||||||
|
|
@ -27,22 +27,19 @@ abstract class AbstractWorker implements WorkerInterface
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bool $stopOnZero
|
||||||
|
*/
|
||||||
|
public function setStopOnZero(bool $stopOnZero): self
|
||||||
|
{
|
||||||
|
$this->stopOnZero = $stopOnZero;
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
protected function setUp(): void
|
protected function setUp(): void
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getCliArguments(): array
|
|
||||||
{
|
|
||||||
return $this->cliArguments;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function setCliArguments(array $cliArguments): AbstractWorker
|
|
||||||
{
|
|
||||||
$this->cliArguments = $cliArguments;
|
|
||||||
|
|
||||||
return $this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public function run(): void
|
public function run(): void
|
||||||
{
|
{
|
||||||
$this->logger->debug("Running with an interval of {$this->timeBetweenRuns} seconds.");
|
$this->logger->debug("Running with an interval of {$this->timeBetweenRuns} seconds.");
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue