Make it possible to extract what was just processed from the worker. Mostly needed for testing.
This commit is contained in:
parent
486dd43ded
commit
1e5dca2eb3
1 changed files with 14 additions and 5 deletions
|
|
@ -15,6 +15,8 @@ abstract class AbstractQueueWorker extends AbstractWorker
|
|||
/** @var string[] Name of the output redis queues */
|
||||
protected ?array $outputQueues;
|
||||
|
||||
protected ?array $resultItems;
|
||||
|
||||
public function __construct(
|
||||
QueueService $queueService,
|
||||
Logger $logger,
|
||||
|
|
@ -116,7 +118,6 @@ abstract class AbstractQueueWorker extends AbstractWorker
|
|||
if (isset($this->cliArguments['stop-on-zero']) && true === $this->cliArguments['stop-on-zero'] && 0 == $queueLength) {
|
||||
$this->logger->warning('--stop-on-zero is set, and the queue length is zero! Stopping!');
|
||||
exit;
|
||||
exit;
|
||||
}
|
||||
|
||||
if ($queueLength <= 0) {
|
||||
|
|
@ -124,24 +125,32 @@ abstract class AbstractQueueWorker extends AbstractWorker
|
|||
}
|
||||
|
||||
$items = $this->queueService->pop($this->inputQueue);
|
||||
$resultItems = [];
|
||||
$this->resultItems = [];
|
||||
foreach ($items as $item) {
|
||||
$processResults = $this->process($item);
|
||||
if (is_array($processResults)) {
|
||||
foreach ($processResults as $processResult) {
|
||||
$resultItems[] = $processResult;
|
||||
$this->resultItems[] = $processResult;
|
||||
}
|
||||
} else {
|
||||
$resultItems[] = $processResults;
|
||||
$this->resultItems[] = $processResults;
|
||||
}
|
||||
}
|
||||
foreach ($this->outputQueues as $outputQueue) {
|
||||
$this->queueService->push($outputQueue, $resultItems);
|
||||
$this->queueService->push($outputQueue, $this->resultItems);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return null|array
|
||||
*/
|
||||
public function getResultItems(): ?array
|
||||
{
|
||||
return $this->resultItems;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send work item back to the queue it came from.
|
||||
*
|
||||
|
|
|
|||
Loading…
Reference in a new issue