77 lines
2.3 KiB
PHP
Executable file
77 lines
2.3 KiB
PHP
Executable file
#!/usr/bin/php
|
|
<?php
|
|
require_once(__DIR__ . "/find-autoloader.php");
|
|
|
|
use Benzine\Workers\AbstractQueueWorker;
|
|
use duncan3dc\Forker\Fork;
|
|
use Benzine\App;
|
|
use Benzine\Exceptions\BenzineException;
|
|
|
|
$flags = new \donatj\Flags();
|
|
|
|
$worker = &$flags->string('worker',null,'Worker class to run in this job');
|
|
$threads = &$flags->int('threads',1,'Number of threads to use.');
|
|
$stopOnZero = &$flags->bool('stop-on-zero', false, 'Should this process stop on zero?');
|
|
|
|
$flags->parse();
|
|
|
|
$databases = App::Instance()->get(\Benzine\ORM\Connection\Databases::class)
|
|
->waitForConnectivity();
|
|
|
|
if(class_exists($worker)){
|
|
$workerClass = $worker;
|
|
} else {
|
|
$workerFile = __DIR__ . "/../../../../src/Workers/{$worker}.php";
|
|
if (!file_exists($workerFile)) {
|
|
throw new BenzineException(sprintf("No such worker as \"%s\", no such class as \"%s\"", realpath($workerFile), $worker));
|
|
}
|
|
require_once($workerFile);
|
|
|
|
$acceptedInterfaces = [
|
|
\Benzine\Workers\WorkerInterface::class,
|
|
\Benzine\Workers\QueueWorkerInterface::class,
|
|
];
|
|
$workerClass = null;
|
|
foreach (get_declared_classes() as $declaredClass) {
|
|
$implements = class_implements($declaredClass);
|
|
if (count($implements) > 0) {
|
|
#\Kint::dump($implements, $declaredClass);
|
|
foreach ($acceptedInterfaces as $acceptedInterface) {
|
|
$testClass = new \ReflectionClass($declaredClass);
|
|
if ($testClass->isAbstract()) {
|
|
continue;
|
|
}
|
|
if (isset($implements[$acceptedInterface])) {
|
|
$workerClass = "\\{$declaredClass}";
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if(!$workerClass || !class_exists($workerClass)){
|
|
die("No such worker \"{$workerClass}\".\n");
|
|
}
|
|
|
|
echo "Starting {$workerClass} with {$threads} threads.\n";
|
|
|
|
if($threads == 1){
|
|
/** @var AbstractQueueWorker $worker */
|
|
$worker = App::DI($workerClass);
|
|
$worker->setStopOnZero($stopOnZero);
|
|
$worker->run();
|
|
}else {
|
|
$fork = new Fork;
|
|
|
|
for ($i = 0; $i < $threads; $i++) {
|
|
$fork->call(function () use ($workerClass, $args) {
|
|
/** @var AbstractQueueWorker $worker */
|
|
$worker = App::DI($workerClass);
|
|
$worker->setStopOnZero($stopOnZero);
|
|
$worker->run();
|
|
});
|
|
}
|
|
|
|
$fork->wait();
|
|
}
|