From ad8dc7fa6c514d62b345fe3676cc78201cd4c70e Mon Sep 17 00:00:00 2001 From: Matthew Baggett Date: Tue, 16 Jun 2020 10:22:47 +0200 Subject: [PATCH] Moving in Controller Abstracts and Worker related stuff --- src/Controllers/Controller.php | 111 +++++++++++ src/Controllers/CrudController.php | 126 ++++++++++++ src/Controllers/HtmlController.php | 45 +++++ .../EnvironmentHeadersOnResponse.php | 2 - src/Services/QueueService.php | 182 ++++++++++++++++++ src/Workers/AbstractQueueWorker.php | 172 +++++++++++++++++ src/Workers/AbstractWorker.php | 71 +++++++ src/Workers/ExampleQueueWorker.php | 11 ++ src/Workers/ForeverLoopWorker.php | 15 ++ src/Workers/QueueWorkerInterface.php | 13 ++ src/Workers/WorkerInterface.php | 16 ++ src/Workers/WorkerWorkItem.php | 77 ++++++++ 12 files changed, 839 insertions(+), 2 deletions(-) create mode 100644 src/Controllers/Controller.php create mode 100644 src/Controllers/CrudController.php create mode 100644 src/Controllers/HtmlController.php create mode 100644 src/Services/QueueService.php create mode 100644 src/Workers/AbstractQueueWorker.php create mode 100644 src/Workers/AbstractWorker.php create mode 100644 src/Workers/ExampleQueueWorker.php create mode 100644 src/Workers/ForeverLoopWorker.php create mode 100644 src/Workers/QueueWorkerInterface.php create mode 100644 src/Workers/WorkerInterface.php create mode 100644 src/Workers/WorkerWorkItem.php diff --git a/src/Controllers/Controller.php b/src/Controllers/Controller.php new file mode 100644 index 0000000..41570c1 --- /dev/null +++ b/src/Controllers/Controller.php @@ -0,0 +1,111 @@ +service; + } + + /** + * @param Service $service + */ + public function setService($service): self + { + $this->service = $service; + + return $this; + } + + /** + * @return bool + */ + public function isApiExplorerEnabled(): self + { + return $this->apiExplorerEnabled; + } + + public function setApiExplorerEnabled(bool $apiExplorerEnabled): self + { + $this->apiExplorerEnabled = $apiExplorerEnabled; + + return $this; + } + + public function xmlResponse(\SimpleXMLElement $root, Request $request, Response $response): Response + { + $response = $response->withBody($root->asXML()); + + return $response->withHeader('Content-type', 'text/xml'); + } + + public function jsonResponse($json, Request $request, Response $response): Response + { + return $response->withJson($json); + } + + public function jsonResponseException(\Exception $e, Request $request, Response $response): Response + { + return $this->jsonResponse( + [ + 'Status' => 'Fail', + 'Reason' => $e->getMessage(), + ], + $request, + $response + ); + } + + /** + * Decide if a request has a filter attached to it. + * + * @throws FilterDecodeException + */ + protected function requestHasFilters(Request $request, Response $response): bool + { + if ($request->hasHeader('Filter')) { + $filterText = trim($request->getHeader('Filter')[0]); + if (!empty($filterText)) { + $decode = json_decode($filterText); + if (null !== $decode) { + return true; + } + + throw new FilterDecodeException('Could not decode given Filter. Reason: Not JSON. Given: "'.$filterText.'"'); + } + } + + return false; + } + + /** + * Parse filters header into filter objects. + */ + protected function parseFilters(Request $request, Response $response): Filter + { + $filter = new Filter(); + $filter->parseFromHeader(json_decode($request->getHeader('Filter')[0], true)); + + return $filter; + } +} diff --git a/src/Controllers/CrudController.php b/src/Controllers/CrudController.php new file mode 100644 index 0000000..6faad0b --- /dev/null +++ b/src/Controllers/CrudController.php @@ -0,0 +1,126 @@ +getService(); + if ($this->requestHasFilters($request, $response)) { + $filterBehaviours = $this->parseFilters($request, $response); + $foundObjects = $service->getAll( + $filterBehaviours->getLimit(), + $filterBehaviours->getOffset(), + $filterBehaviours->getWheres(), + $filterBehaviours->getOrder(), + $filterBehaviours->getOrderDirection() + ); + } else { + $foundObjects = $service->getAll(); + } + + foreach ($foundObjects as $object) { + $objects[] = $object->__toPublicArray(); + } + + return $this->jsonResponse( + [ + 'Status' => 'Okay', + 'Action' => 'LIST', + $this->service->getTermPlural() => $objects, + ], + $request, + $response + ); + } + + public function getRequest(Request $request, Response $response, $args): Response + { + $object = $this->getService()->getById($args['id']); + if ($object) { + return $this->jsonResponse( + [ + 'Status' => 'Okay', + 'Action' => 'GET', + $this->service->getTermSingular() => $object->__toArray(), + ], + $request, + $response + ); + } + + return $this->jsonResponse( + [ + 'Status' => 'Fail', + 'Reason' => sprintf( + 'No such %s found with id %s', + strtolower($this->service->getTermSingular()), + $args['id'] + ), + ], + $request, + $response + ); + } + + public function createRequest(Request $request, Response $response, $args): Response + { + $newObjectArray = $request->getParsedBody(); + + try { + $object = $this->getService()->createFromArray($newObjectArray); + + return $this->jsonResponse( + [ + 'Status' => 'Okay', + 'Action' => 'CREATE', + $this->service->getTermSingular() => $object->__toArray(), + ], + $request, + $response + ); + } catch (InvalidQueryException $iqe) { + return $this->jsonResponseException($iqe, $request, $response); + } + } + + public function deleteRequest(Request $request, Response $response, $args): Response + { + /** @var ModelInterface $object */ + $object = $this->getService()->getById($args['id']); + if ($object) { + $array = $object->__toArray(); + $object->destroy(); + + return $this->jsonResponse( + [ + 'Status' => 'Okay', + 'Action' => 'DELETE', + $this->service->getTermSingular() => $array, + ], + $request, + $response + ); + } + + return $this->jsonResponse( + [ + 'Status' => 'Fail', + 'Reason' => sprintf( + 'No such %s found with id %s', + strtolower($this->service->getTermSingular()), + $args['id'] + ), + ], + $request, + $response + ); + } +} diff --git a/src/Controllers/HtmlController.php b/src/Controllers/HtmlController.php new file mode 100644 index 0000000..0da3e60 --- /dev/null +++ b/src/Controllers/HtmlController.php @@ -0,0 +1,45 @@ +twig = $twig; + } + + protected function renderInlineCss(array $files) + { + $css = ''; + foreach ($files as $file) { + $css .= file_get_contents($file); + } + + return ""; + } + + protected function renderHtml(Request $request, Response $response, string $template, array $parameters = []): Response + { + // If the path ends in .json, return the parameters + if ('.json' == substr($request->getUri()->getPath(), -5, 5)) { + return $this->jsonResponse($parameters, $request, $response); + } + + return $this->twig->render( + $response, + $template, + $parameters + ); + } +} diff --git a/src/Middlewares/EnvironmentHeadersOnResponse.php b/src/Middlewares/EnvironmentHeadersOnResponse.php index d22dc77..3fe321e 100644 --- a/src/Middlewares/EnvironmentHeadersOnResponse.php +++ b/src/Middlewares/EnvironmentHeadersOnResponse.php @@ -6,12 +6,10 @@ use Slim\Http\Request; use Slim\Http\Response; use Benzine\Configuration; use Benzine\ORM\Profiler; -use Benzine\Traits\InlineCssTrait; use Benzine\⌬; class EnvironmentHeadersOnResponse { - use InlineCssTrait; protected $apiExplorerEnabled = true; diff --git a/src/Services/QueueService.php b/src/Services/QueueService.php new file mode 100644 index 0000000..4a9b2ba --- /dev/null +++ b/src/Services/QueueService.php @@ -0,0 +1,182 @@ +redis = $redis; + } + + /** + * @param string $queueName + * @param \Serializable[] $queueItems + * + * @return int the number of items successfully added + */ + public function push(string $queueName, array $queueItems): int + { + $this->redis->multi(); + foreach ($queueItems as $item) { + $itemId = UUID::v4(); + $serialised = serialize($item); + // Set the data element itself + $this->redis->set("queue:data:{$queueName}:{$itemId}", $serialised); + // Push the element into the index list + $this->redis->rpush("queue:index:{$queueName}", [$itemId]); + // Increment the length count + $this->redis->incr("queue:length:{$queueName}"); + // Set the queue identifier to the current time, if it doesn't already exist + $this->redis->setnx("queue:queues:{$queueName}", date('Y-m-d H:i:s')); + // And set that identifier to expire in a day. + $this->redis->expire("queue:queues:{$queueName}", self::MAX_QUEUE_AGE); + } + $this->redis->exec(); + $this->redis->setifhigher("queue:length-peak:{$queueName}", $this->redis->get("queue:length:{$queueName}")); + + return count($queueItems); + } + + /** + * Get the length of the queue. + * + * @param string $queueName + * + * @return int + */ + public function getQueueLength(string $queueName): int + { + return $this->redis->get("queue:length:{$queueName}") ?? 0; + } + + /** + * Get the peak/maximum length of the queue. + * + * @param string $queueName + * + * @return int + */ + public function getQueueLengthPeak(string $queueName): int + { + return $this->redis->get("queue:length-peak:{$queueName}") ?? 0; + } + + /** + * Number of seconds that the queue was created ago. + * + * @param string $queueName + * + * @return \DateTime + */ + public function getQueueCreatedAgo(string $queueName): \DateTime + { + return (new \DateTime()) + ->setTimestamp(strtotime($this->redis->get("queue:queues:{$queueName}"))) + ; + } + + /** + * Number of seconds ago that the queue was updated. + * + * @param string $queueName + * + * @return \DateTime + */ + public function getQueueUpdatedAgo(string $queueName): \DateTime + { + return (new \DateTime()) + ->setTimestamp(time() - abs($this->getQueueExpiresIn($queueName) - self::MAX_QUEUE_AGE)) + ; + } + + /** + * Number of seconds until a given queue will expire. + * + * @param string $queueName + * + * @return \DateTime + */ + public function getQueueExpiresIn(string $queueName): int + { + return $this->redis->ttl("queue:queues:{$queueName}"); + } + + /** + * @param string $queueName + * @param int $count + * + * @return WorkerWorkItem[] + */ + public function pop(string $queueName, int $count = 1): array + { + $workerWorkItems = []; + for ($i = 0; $i < $count; ++$i) { + $itemId = $this->redis->rpop("queue:index:{$queueName}"); + if (!$itemId) { + continue; + } + $this->redis->multi(); + $this->redis->get("queue:data:{$queueName}:{$itemId}"); + $this->redis->del(["queue:data:{$queueName}:{$itemId}"]); + $this->redis->decr("queue:length:{$queueName}"); + $response = $this->redis->exec(); + $workerWorkItems[] = unserialize($response[0]); + } + if ($this->redis->get("queue:length:{$queueName}") <= 0) { + $this->redis->set("queue:length:{$queueName}", 0); + } + + return array_filter($workerWorkItems); + } + + /** + * Destroy a queue and all data inside it. + * Returns number of redis keys deleted. + * + * @param string $queueName + * + * @return int + */ + public function destroyQueue(string $queueName): int + { + $queueDataKeys = $this->redis->keys("queue:data:{$queueName}:*"); + + return $this->redis->del([...$queueDataKeys, "queue:length:{$queueName}", "queue:length-peak:{$queueName}", "queue:index:{$queueName}", "queue:queues:{$queueName}"]); + } + + /** + * @return string[] + */ + public function listLists(): array + { + $lists = []; + foreach ($this->redis->keys(('queue:queues:*')) as $queue) { + $lists[$queue] = substr($queue, strlen('queue:queues:')); + } + ksort($lists); + + return $lists; + } + + /** + * Return an key->value array of queue lengths. + * + * @return array + */ + public function allQueueLengths(): array + { + $lengths = []; + foreach ($this->listLists() as $key => $name) { + $lengths[$name] = $this->getQueueLength($name); + } + + return $lengths; + } +} diff --git a/src/Workers/AbstractQueueWorker.php b/src/Workers/AbstractQueueWorker.php new file mode 100644 index 0000000..80b7238 --- /dev/null +++ b/src/Workers/AbstractQueueWorker.php @@ -0,0 +1,172 @@ +queueService = $queueService; + parent::__construct($logger, $environmentService); + } + + protected function setUp(): void + { + parent::setUp(); + // Set default queues + if (!isset($this->inputQueue)) { + $this->inputQueue = sprintf('%s:input', $this->getClassWithoutNamespace()); + } + if (!isset($this->outputQueues)) { + $this->outputQueues[] = sprintf('%s:output', $this->getClassWithoutNamespace()); + } + $this->logger->debug( + sprintf( + 'Listening to "%s" and outputting on %d channel(s)', + $this->getClassWithoutNamespace(), + $this->inputQueue, + count($this->outputQueues) + ) + ); + } + + /** + * @return QueueService + */ + public function getQueueService(): QueueService + { + return $this->queueService; + } + + /** + * @param QueueService $queueService + * + * @return AbstractQueueWorker + */ + public function setQueueService(QueueService $queueService): AbstractQueueWorker + { + $this->queueService = $queueService; + + return $this; + } + + /** + * @return string + */ + public function getInputQueue(): string + { + return $this->inputQueue; + } + + /** + * @param string $inputQueue + * + * @return AbstractQueueWorker + */ + public function setInputQueue(string $inputQueue): AbstractQueueWorker + { + $this->inputQueue = $inputQueue; + + return $this; + } + + /** + * @return string[] + */ + public function getOutputQueues(): array + { + return $this->outputQueues; + } + + /** + * @param string[] $outputQueues + * + * @return AbstractQueueWorker + */ + public function setOutputQueues(array $outputQueues): AbstractQueueWorker + { + $this->outputQueues = $outputQueues; + + return $this; + } + + public function iterate(): bool + { + $queueLength = $this->queueService->getQueueLength($this->inputQueue); + + $this->logger->debug(sprintf( + 'Queue %s Length: %d', + $this->inputQueue, + $queueLength + )); + + if (isset($this->cliArguments['stop-on-zero']) && true === $this->cliArguments['stop-on-zero'] && 0 == $queueLength) { + $this->logger->warning('--stop-on-zero is set, and the queue length is zero! Stopping!'); + exit; + exit; + } + + if ($queueLength <= 0) { + return false; + } + + $items = $this->queueService->pop($this->inputQueue); + $resultItems = []; + foreach ($items as $item) { + $processResults = $this->process($item); + if (is_array($processResults)) { + $resultItems[] += $processResults; + } else { + $resultItems[] = $processResults; + } + } + foreach ($this->outputQueues as $outputQueue) { + $this->queueService->push($outputQueue, $resultItems); + } + + return true; + } + + /** + * Send work item back to the queue it came from. + * + * @param WorkerWorkItem $item + */ + protected function returnToInputQueue(WorkerWorkItem $item): void + { + $this->queueService->push($this->inputQueue, [$item]); + } + + protected function sendToSuccessQueues(WorkerWorkItem $item): int + { + $queuedItems = 0; + foreach ($this->outputQueues as $outputQueue) { + $queuedItems += $this->queueService->push($outputQueue, [$item]); + } + + return $queuedItems; + } + + protected function sendToFailureQueue(WorkerWorkItem $item): void + { + $this->queueService->push($this->getFailureQueue(), [$item]); + } + + protected function getFailureQueue(): string + { + return sprintf('%s:failures', $this->inputQueue); + } +} diff --git a/src/Workers/AbstractWorker.php b/src/Workers/AbstractWorker.php new file mode 100644 index 0000000..fcde716 --- /dev/null +++ b/src/Workers/AbstractWorker.php @@ -0,0 +1,71 @@ +logger = $logger; + $this->environmentService = $environmentService; + $this->setUp(); + $this->logger->info( + sprintf( + 'Started Worker "%s".', + $this->getClassWithoutNamespace() + ) + ); + } + + protected function setUp(): void + { + } + + /** + * @return array + */ + public function getCliArguments(): array + { + return $this->cliArguments; + } + + /** + * @param array $cliArguments + * + * @return AbstractWorker + */ + public function setCliArguments(array $cliArguments): AbstractWorker + { + $this->cliArguments = $cliArguments; + + return $this; + } + + public function run(): void + { + $this->logger->debug("Running with an interval of {$this->timeBetweenRuns}"); + while (true) { + $didWork = $this->iterate(); + if (!$didWork) { + sleep($this->timeBetweenRuns); + } + } + } + + protected function getClassWithoutNamespace(): string + { + $classNameElems = explode('\\', get_called_class()); + + return end($classNameElems); + } +} diff --git a/src/Workers/ExampleQueueWorker.php b/src/Workers/ExampleQueueWorker.php new file mode 100644 index 0000000..2b7d6ab --- /dev/null +++ b/src/Workers/ExampleQueueWorker.php @@ -0,0 +1,11 @@ +setOutput(sqrt($item->getInput())); + } +} diff --git a/src/Workers/ForeverLoopWorker.php b/src/Workers/ForeverLoopWorker.php new file mode 100644 index 0000000..8034dde --- /dev/null +++ b/src/Workers/ForeverLoopWorker.php @@ -0,0 +1,15 @@ +logger->debug("Running with an interval of {$this->timeBetweenRuns}"); + while (true) { + $didWork = $this->iterate(); + sleep($this->timeBetweenRuns); + } + } +} diff --git a/src/Workers/QueueWorkerInterface.php b/src/Workers/QueueWorkerInterface.php new file mode 100644 index 0000000..059434e --- /dev/null +++ b/src/Workers/QueueWorkerInterface.php @@ -0,0 +1,13 @@ +data[$field] = $arguments[0]; + + return $this; + case 'get': + return $this->data[$field]; + default: + throw new \Exception("Method {$name} doesn't exist"); + } + } + + public function __serialize(): array + { + return $this->data; + } + + public function __unserialize(array $data): void + { + $this->data = $data; + } + + /** + * @return array + */ + public function getData(): array + { + return $this->data; + } + + /** + * @param array $data + * + * @return WorkerWorkItem + */ + public function setData(array $data): self + { + $this->data = $data; + + return $this; + } + + public function setKey(string $key, $value): self + { + $this->data[$key] = $value; + + return $this; + } + + public function getKey(string $key) + { + if ($this->data[$key] instanceof Model) { + $this->data[$key]->__setUp(); + } + + return $this->data[$key]; + } + + public function getKeys(): array + { + return array_keys($this->data); + } +}