Core/bin/worker

75 lines
2.3 KiB
PHP

#!/usr/bin/php
<?php
require_once(__DIR__ . "/../../../../vendor/autoload.php");
use Benzine\Workers\AbstractQueueWorker;
use duncan3dc\Forker\Fork;
use Benzine\App;
// Parse the raw argv into an option array; --worker is mandatory.
$args = CommandLine::parseArgs($_SERVER['argv']);
if (!isset($args['worker'])) {
    // Report on STDERR and exit non-zero so supervisors can detect the misuse
    // (die() with a string message exits with status 0).
    fwrite(STDERR, "You must pass a --worker= argument with this script\n");
    exit(1);
}

// Merged, key-sorted view of the server and environment variables.
$environment = array_merge($_SERVER, $_ENV);
ksort($environment);

// Thread count precedence: --threads argument, then the THREADS environment
// variable, then the number of "processor" entries in /proc/cpuinfo.
$threads = $args['threads'] ?? $environment['THREADS'] ?? null;
if ($threads === null) {
    // shell_exec() yields null/false when the command fails or prints nothing;
    // the cast turns that into 0, which is corrected by the clamp below.
    $threads = (int) shell_exec("grep -c processor /proc/cpuinfo");
}
// Always work with an integer of at least 1: a count of 0 would skip both the
// single-process path and the fork loop and silently start no worker at all.
$threads = max(1, (int) $threads);
// Resolve --worker to a concrete class name. The argument is either a class
// name that already exists (possibly via autoloading) or the base name of a
// file in src/Workers/ that declares the worker.
$requestedWorker = $args['worker'];
if (class_exists($requestedWorker)) {
    $workerClass = $requestedWorker;
} else {
    $workerFile = __DIR__ . "/../../../../src/Workers/{$requestedWorker}.php";
    if (!file_exists($workerFile)) {
        // realpath() returns false for a path that does not exist, which would
        // render as an empty string; report the path that was actually probed.
        // NOTE(review): the "\⌬\" namespace differs from the "Benzine\" imports
        // at the top of this file — confirm which one is current.
        throw new \⌬\Exceptions\BenzineException(sprintf("No such worker as \"%s\"", $workerFile));
    }
    require_once($workerFile);

    $acceptedInterfaces = [
        \⌬\Workers\WorkerInterface::class,
        \⌬\Workers\QueueWorkerInterface::class,
    ];

    // Scan every declared class for a concrete implementation of one of the
    // accepted interfaces. When several classes qualify, the last one in
    // get_declared_classes() wins — presumably the class declared by the file
    // that was just required; verify if several workers can be loaded at once.
    $workerClass = null;
    foreach (get_declared_classes() as $declaredClass) {
        // Abstract classes cannot be instantiated, so they are never candidates.
        if ((new \ReflectionClass($declaredClass))->isAbstract()) {
            continue;
        }
        $implements = class_implements($declaredClass);
        foreach ($acceptedInterfaces as $acceptedInterface) {
            if (isset($implements[$acceptedInterface])) {
                $workerClass = "\\{$declaredClass}";
                break;
            }
        }
    }
}

if (!$workerClass || !class_exists($workerClass)) {
    // $workerClass is null when nothing matched, so name the worker that was
    // requested instead, and exit non-zero to signal the failure.
    fwrite(STDERR, "No such worker \"{$requestedWorker}\".\n");
    exit(1);
}
// Announce which worker is about to start and with how many threads.
echo "Starting {$workerClass} with {$threads} threads.\n";

// Shared bootstrap: fetch the worker via App::Instance()->get(), pass it the
// parsed CLI arguments and run it.
$launchWorker = static function () use ($workerClass, $args) {
    /** @var AbstractQueueWorker $instance */
    $instance = App::Instance()->get($workerClass);
    $instance->setCliArguments($args);
    $instance->run();
};

if ($threads != 1) {
    // Queue one forked call per requested thread, then block until all return.
    $forker = new Fork;
    for ($spawned = 0; $spawned < $threads; ++$spawned) {
        $forker->call($launchWorker);
    }
    $forker->wait();
} else {
    // A single thread runs in the current process, without forking.
    $launchWorker();
}