Fixed redis ineractions.
This commit is contained in:
parent
b2968edb77
commit
5a40567697
4 changed files with 9 additions and 19 deletions
|
|
@ -3,16 +3,20 @@
|
|||
namespace Benzine\Services;
|
||||
|
||||
use Gone\UUID\UUID;
|
||||
use Monolog\Logger;
|
||||
|
||||
class QueueService
|
||||
{
|
||||
public const MAX_QUEUE_AGE = 60 * 60 * 24;
|
||||
protected \Redis $redis;
|
||||
protected Logger $logger;
|
||||
|
||||
public function __construct(
|
||||
\Redis $redis
|
||||
\Redis $redis,
|
||||
Logger $logger
|
||||
) {
|
||||
$this->redis = $redis;
|
||||
$this->logger = $logger;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -30,7 +34,7 @@ class QueueService
|
|||
// Set the data element itself
|
||||
$this->redis->set("queue:data:{$queueName}:{$itemId}", $serialised);
|
||||
// Push the element into the index list
|
||||
$this->redis->rpush("queue:index:{$queueName}", [$itemId]);
|
||||
$this->redis->rpush("queue:index:{$queueName}", $itemId);
|
||||
// Increment the length count
|
||||
$this->redis->incr("queue:length:{$queueName}");
|
||||
// Set the queue identifier to the current time, if it doesn't already exist
|
||||
|
|
|
|||
|
|
@ -107,7 +107,6 @@ abstract class AbstractQueueWorker extends AbstractWorker
|
|||
public function iterate(): bool
|
||||
{
|
||||
$queueLength = $this->queueService->getQueueLength($this->inputQueue);
|
||||
|
||||
$this->logger->debug(sprintf(
|
||||
'Queue %s Length: %d',
|
||||
$this->inputQueue,
|
||||
|
|
@ -173,5 +172,5 @@ abstract class AbstractQueueWorker extends AbstractWorker
|
|||
return sprintf('%s:failures', $this->inputQueue);
|
||||
}
|
||||
|
||||
abstract public function process(WorkerWorkItem $item): ?WorkerWorkItem;
|
||||
abstract protected function process(WorkerWorkItem $item): ?WorkerWorkItem;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,9 +2,9 @@
|
|||
|
||||
namespace Benzine\Workers;
|
||||
|
||||
class ExampleQueueWorker extends AbstractQueueWorker implements QueueWorkerInterface
|
||||
class ExampleQueueWorker extends AbstractQueueWorker
|
||||
{
|
||||
public function process(WorkerWorkItem $item): ?WorkerWorkItem
|
||||
protected function process(WorkerWorkItem $item): ?WorkerWorkItem
|
||||
{
|
||||
return $item->setOutput(sqrt($item->getInput()));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,13 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Benzine\Workers;
|
||||
|
||||
interface QueueWorkerInterface extends WorkerInterface
|
||||
{
|
||||
/**
|
||||
* @param $item WorkerWorkItem
|
||||
*
|
||||
* @return null|WorkerWorkItem mutated result work item, or null
|
||||
*/
|
||||
public function process(WorkerWorkItem $item): ?WorkerWorkItem;
|
||||
}
|
||||
Loading…
Reference in a new issue