Core/bin/worker

77 lines
2.3 KiB
PHP
Executable file

#!/usr/bin/php
<?php
require_once(__DIR__ . "/find-autoloader.php");
use Benzine\Workers\AbstractQueueWorker;
use duncan3dc\Forker\Fork;
use Benzine\App;
use Benzine\Exceptions\BenzineException;
$flags = new \donatj\Flags();
$worker = &$flags->string('worker',null,'Worker class to run in this job');
$threads = &$flags->int('threads',1,'Number of threads to use.');
$stopOnZero = &$flags->bool('stop-on-zero', false, 'Should this process stop on zero?');
$flags->parse();
$databases = App::Instance()->get(\Benzine\ORM\Connection\Databases::class)
->waitForConnectivity();
if(class_exists($worker)){
$workerClass = $worker;
} else {
$workerFile = __DIR__ . "/../../../../src/Workers/{$worker}.php";
if (!file_exists($workerFile)) {
throw new BenzineException(sprintf("No such worker as \"%s\", no such class as \"%s\"", realpath($workerFile), $worker));
}
require_once($workerFile);
$acceptedInterfaces = [
\Benzine\Workers\WorkerInterface::class,
\Benzine\Workers\QueueWorkerInterface::class,
];
$workerClass = null;
foreach (get_declared_classes() as $declaredClass) {
$implements = class_implements($declaredClass);
if (count($implements) > 0) {
#\Kint::dump($implements, $declaredClass);
foreach ($acceptedInterfaces as $acceptedInterface) {
$testClass = new \ReflectionClass($declaredClass);
if ($testClass->isAbstract()) {
continue;
}
if (isset($implements[$acceptedInterface])) {
$workerClass = "\\{$declaredClass}";
continue;
}
}
}
}
}
if(!$workerClass || !class_exists($workerClass)){
die("No such worker \"{$workerClass}\".\n");
}
echo "Starting {$workerClass} with {$threads} threads.\n";
if($threads == 1){
/** @var AbstractQueueWorker $worker */
$worker = App::DI($workerClass);
$worker->setStopOnZero($stopOnZero);
$worker->run();
}else {
$fork = new Fork;
for ($i = 0; $i < $threads; $i++) {
$fork->call(function () use ($workerClass, $args) {
/** @var AbstractQueueWorker $worker */
$worker = App::DI($workerClass);
$worker->setStopOnZero($stopOnZero);
$worker->run();
});
}
$fork->wait();
}