Porting across worker stuff
This commit is contained in:
parent
3b78eadd3b
commit
49fddc70db
3 changed files with 124 additions and 1 deletions
44
bin/queue-status
Normal file
44
bin/queue-status
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
#!/usr/bin/php
|
||||
<?php
|
||||
require_once(__DIR__ . "/../../../../vendor/autoload.php");
|
||||
|
||||
use Benzine\App;
|
||||
use Benzine\Services\QueueService;
|
||||
use jc21\CliTable;
|
||||
|
||||
$timeAgo = new Westsworld\TimeAgo();
|
||||
|
||||
/** @var QueueService $queueService */
|
||||
$queueService = ⌬::Container()->get(QueueService::class);
|
||||
|
||||
while(true) {
|
||||
$data = [];
|
||||
foreach($queueService->listLists() as $list){
|
||||
$data[] = [
|
||||
"name" => $list,
|
||||
"length" => $queueService->getQueueLength($list),
|
||||
"peak" => $queueService->getQueueLengthPeak($list),
|
||||
"firstSeen" => $timeAgo->inWords($queueService->getQueueCreatedAgo($list)),
|
||||
"lastSeen" => $timeAgo->inWords($queueService->getQueueUpdatedAgo($list)),
|
||||
];
|
||||
}
|
||||
|
||||
$rowsCount = count($data) + 4;
|
||||
|
||||
$table = new CliTable;
|
||||
$table->setTableColor('blue');
|
||||
$table->setHeaderColor('cyan');
|
||||
$table->addField("Queue Name", "name", false, "white");
|
||||
$table->addField("Length", "length", false, "blue");
|
||||
$table->addField("Peak", "peak", false, "blue");
|
||||
$table->addField("Queue Existed Since", "firstSeen", false, "red");
|
||||
$table->addField("Queue Updated Last", "lastSeen", false, "green");
|
||||
$table->injectData($data);
|
||||
$table->display();
|
||||
sleep(1);
|
||||
for($i = 0; $i <= $rowsCount; $i++){
|
||||
echo "\033[2K";
|
||||
echo "\033[1A";
|
||||
}
|
||||
|
||||
}
|
||||
75
bin/worker
Normal file
75
bin/worker
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
#!/usr/bin/php
|
||||
<?php
|
||||
require_once(__DIR__ . "/../../../../vendor/autoload.php");
|
||||
|
||||
use Benzine\Workers\AbstractQueueWorker;
|
||||
use duncan3dc\Forker\Fork;
|
||||
use Benzine\App;
|
||||
|
||||
|
||||
$args = CommandLine::parseArgs($_SERVER['argv']);
|
||||
if(!isset($args['worker'])){
|
||||
die("You must pass a --worker= argument with this script\n");
|
||||
}
|
||||
|
||||
$environment = array_merge($_SERVER, $_ENV);
|
||||
ksort($environment);
|
||||
|
||||
$threads = $args['threads'] ?? $environment['THREADS'] ?? (int) shell_exec("grep -c processor /proc/cpuinfo");
|
||||
if(class_exists($args['worker'])){
|
||||
$workerClass = $args['worker'];
|
||||
} else {
|
||||
$workerFile = __DIR__ . "/../../../../src/Workers/{$args['worker']}.php";
|
||||
if (!file_exists($workerFile)) {
|
||||
throw new \⌬\Exceptions\BenzineException(sprintf("No such worker as \"%s\"", realpath($workerFile)));
|
||||
}
|
||||
require_once($workerFile);
|
||||
|
||||
$acceptedInterfaces = [
|
||||
\⌬\Workers\WorkerInterface::class,
|
||||
\⌬\Workers\QueueWorkerInterface::class,
|
||||
];
|
||||
$workerClass = null;
|
||||
foreach (get_declared_classes() as $declaredClass) {
|
||||
$implements = class_implements($declaredClass);
|
||||
if (count($implements) > 0) {
|
||||
#\Kint::dump($implements, $declaredClass);
|
||||
foreach ($acceptedInterfaces as $acceptedInterface) {
|
||||
$testClass = new \ReflectionClass($declaredClass);
|
||||
if ($testClass->isAbstract()) {
|
||||
continue;
|
||||
}
|
||||
if (isset($implements[$acceptedInterface])) {
|
||||
$workerClass = "\\{$declaredClass}";
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(!$workerClass || !class_exists($workerClass)){
|
||||
die("No such worker \"{$workerClass}\".\n");
|
||||
}
|
||||
|
||||
echo "Starting {$workerClass} with {$threads} threads.\n";
|
||||
|
||||
if($threads == 1){
|
||||
/** @var AbstractQueueWorker $worker */
|
||||
$worker = ⌬::Container()->get($workerClass);
|
||||
$worker->setCliArguments($args);
|
||||
$worker->run();
|
||||
}else {
|
||||
$fork = new Fork;
|
||||
|
||||
for ($i = 0; $i < $threads; $i++) {
|
||||
$fork->call(function () use ($workerClass, $args) {
|
||||
/** @var AbstractQueueWorker $worker */
|
||||
$worker = ⌬::Container()->get($workerClass);
|
||||
$worker->setCliArguments($args);
|
||||
$worker->run();
|
||||
});
|
||||
}
|
||||
|
||||
$fork->wait();
|
||||
}
|
||||
|
|
@ -65,5 +65,9 @@
|
|||
"Benzine\\": "src",
|
||||
"Benzine\\Tests\\": "tests/"
|
||||
}
|
||||
}
|
||||
},
|
||||
"bin": [
|
||||
"bin/queue-status",
|
||||
"bin/worker"
|
||||
]
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue