diff --git a/src/Services/QueueService.php b/src/Services/QueueService.php index d23e9d3..da04a93 100644 --- a/src/Services/QueueService.php +++ b/src/Services/QueueService.php @@ -3,16 +3,20 @@ namespace Benzine\Services; use Gone\UUID\UUID; +use Monolog\Logger; class QueueService { public const MAX_QUEUE_AGE = 60 * 60 * 24; protected \Redis $redis; + protected Logger $logger; public function __construct( - \Redis $redis + \Redis $redis, + Logger $logger ) { $this->redis = $redis; + $this->logger = $logger; } /** @@ -30,7 +34,7 @@ class QueueService // Set the data element itself $this->redis->set("queue:data:{$queueName}:{$itemId}", $serialised); // Push the element into the index list - $this->redis->rpush("queue:index:{$queueName}", [$itemId]); + $this->redis->rpush("queue:index:{$queueName}", $itemId); // Increment the length count $this->redis->incr("queue:length:{$queueName}"); // Set the queue identifier to the current time, if it doesn't already exist diff --git a/src/Workers/AbstractQueueWorker.php b/src/Workers/AbstractQueueWorker.php index 63c8df2..6bfe996 100644 --- a/src/Workers/AbstractQueueWorker.php +++ b/src/Workers/AbstractQueueWorker.php @@ -107,7 +107,6 @@ abstract class AbstractQueueWorker extends AbstractWorker public function iterate(): bool { $queueLength = $this->queueService->getQueueLength($this->inputQueue); - $this->logger->debug(sprintf( 'Queue %s Length: %d', $this->inputQueue, @@ -173,5 +172,5 @@ abstract class AbstractQueueWorker extends AbstractWorker return sprintf('%s:failures', $this->inputQueue); } - abstract public function process(WorkerWorkItem $item): ?WorkerWorkItem; + abstract protected function process(WorkerWorkItem $item): ?WorkerWorkItem; } diff --git a/src/Workers/ExampleQueueWorker.php b/src/Workers/ExampleQueueWorker.php index 2b7d6ab..fe32e8a 100644 --- a/src/Workers/ExampleQueueWorker.php +++ b/src/Workers/ExampleQueueWorker.php @@ -2,9 +2,9 @@ namespace Benzine\Workers; -class ExampleQueueWorker extends AbstractQueueWorker implements QueueWorkerInterface +class ExampleQueueWorker extends AbstractQueueWorker { - public function process(WorkerWorkItem $item): ?WorkerWorkItem + protected function process(WorkerWorkItem $item): ?WorkerWorkItem { return $item->setOutput(sqrt($item->getInput())); } diff --git a/src/Workers/QueueWorkerInterface.php b/src/Workers/QueueWorkerInterface.php deleted file mode 100644 index 059434e..0000000 --- a/src/Workers/QueueWorkerInterface.php +++ /dev/null @@ -1,13 +0,0 @@ -