Moving in Controller Abstracts and Worker related stuff

This commit is contained in:
Greyscale 2020-06-16 10:22:47 +02:00
parent c4181551ca
commit ad8dc7fa6c
12 changed files with 839 additions and 2 deletions

View file

@ -0,0 +1,111 @@
<?php
namespace Benzine\Controllers;
use Slim\Http\Request;
use Slim\Http\Response;
use ⌬\Filters\Exceptions\FilterDecodeException;
use ⌬\Filters\Filter;
abstract class Controller
{
/** @var Service */
protected $service;
/** @var bool */
protected $apiExplorerEnabled = true;
public function __construct()
{
}
/**
* @return Service
*/
public function getService()
{
return $this->service;
}
/**
* @param Service $service
*/
public function setService($service): self
{
$this->service = $service;
return $this;
}
/**
* @return bool
*/
public function isApiExplorerEnabled(): self
{
return $this->apiExplorerEnabled;
}
public function setApiExplorerEnabled(bool $apiExplorerEnabled): self
{
$this->apiExplorerEnabled = $apiExplorerEnabled;
return $this;
}
public function xmlResponse(\SimpleXMLElement $root, Request $request, Response $response): Response
{
$response = $response->withBody($root->asXML());
return $response->withHeader('Content-type', 'text/xml');
}
public function jsonResponse($json, Request $request, Response $response): Response
{
return $response->withJson($json);
}
public function jsonResponseException(\Exception $e, Request $request, Response $response): Response
{
return $this->jsonResponse(
[
'Status' => 'Fail',
'Reason' => $e->getMessage(),
],
$request,
$response
);
}
/**
* Decide if a request has a filter attached to it.
*
* @throws FilterDecodeException
*/
protected function requestHasFilters(Request $request, Response $response): bool
{
if ($request->hasHeader('Filter')) {
$filterText = trim($request->getHeader('Filter')[0]);
if (!empty($filterText)) {
$decode = json_decode($filterText);
if (null !== $decode) {
return true;
}
throw new FilterDecodeException('Could not decode given Filter. Reason: Not JSON. Given: "'.$filterText.'"');
}
}
return false;
}
/**
* Parse filters header into filter objects.
*/
protected function parseFilters(Request $request, Response $response): Filter
{
$filter = new Filter();
$filter->parseFromHeader(json_decode($request->getHeader('Filter')[0], true));
return $filter;
}
}

View file

@ -0,0 +1,126 @@
<?php
namespace Benzine\Controllers;
use Laminas\Db\Adapter\Exception\InvalidQueryException;
use Slim\Http\Request;
use Slim\Http\Response;
use ⌬\Database\Interfaces\ModelInterface;
abstract class CrudController extends Controller
{
public function listRequest(Request $request, Response $response): Response
{
$objects = [];
$service = $this->getService();
if ($this->requestHasFilters($request, $response)) {
$filterBehaviours = $this->parseFilters($request, $response);
$foundObjects = $service->getAll(
$filterBehaviours->getLimit(),
$filterBehaviours->getOffset(),
$filterBehaviours->getWheres(),
$filterBehaviours->getOrder(),
$filterBehaviours->getOrderDirection()
);
} else {
$foundObjects = $service->getAll();
}
foreach ($foundObjects as $object) {
$objects[] = $object->__toPublicArray();
}
return $this->jsonResponse(
[
'Status' => 'Okay',
'Action' => 'LIST',
$this->service->getTermPlural() => $objects,
],
$request,
$response
);
}
public function getRequest(Request $request, Response $response, $args): Response
{
$object = $this->getService()->getById($args['id']);
if ($object) {
return $this->jsonResponse(
[
'Status' => 'Okay',
'Action' => 'GET',
$this->service->getTermSingular() => $object->__toArray(),
],
$request,
$response
);
}
return $this->jsonResponse(
[
'Status' => 'Fail',
'Reason' => sprintf(
'No such %s found with id %s',
strtolower($this->service->getTermSingular()),
$args['id']
),
],
$request,
$response
);
}
public function createRequest(Request $request, Response $response, $args): Response
{
$newObjectArray = $request->getParsedBody();
try {
$object = $this->getService()->createFromArray($newObjectArray);
return $this->jsonResponse(
[
'Status' => 'Okay',
'Action' => 'CREATE',
$this->service->getTermSingular() => $object->__toArray(),
],
$request,
$response
);
} catch (InvalidQueryException $iqe) {
return $this->jsonResponseException($iqe, $request, $response);
}
}
public function deleteRequest(Request $request, Response $response, $args): Response
{
/** @var ModelInterface $object */
$object = $this->getService()->getById($args['id']);
if ($object) {
$array = $object->__toArray();
$object->destroy();
return $this->jsonResponse(
[
'Status' => 'Okay',
'Action' => 'DELETE',
$this->service->getTermSingular() => $array,
],
$request,
$response
);
}
return $this->jsonResponse(
[
'Status' => 'Fail',
'Reason' => sprintf(
'No such %s found with id %s',
strtolower($this->service->getTermSingular()),
$args['id']
),
],
$request,
$response
);
}
}

View file

@ -0,0 +1,45 @@
<?php
namespace Benzine\Controllers;
use Slim\Http\Request;
use Slim\Http\Response;
use Slim\Views\Twig;
use ⌬\Traits\RenderHtmlTrait;
abstract class HtmlController extends Controller
{
/** @var Twig */
protected $twig;
public function __construct(
Twig $twig
) {
$this->twig = $twig;
}
protected function renderInlineCss(array $files)
{
$css = '';
foreach ($files as $file) {
$css .= file_get_contents($file);
}
return "<style>{$css}</style>";
}
protected function renderHtml(Request $request, Response $response, string $template, array $parameters = []): Response
{
// If the path ends in .json, return the parameters
if ('.json' == substr($request->getUri()->getPath(), -5, 5)) {
return $this->jsonResponse($parameters, $request, $response);
}
return $this->twig->render(
$response,
$template,
$parameters
);
}
}

View file

@ -6,12 +6,10 @@ use Slim\Http\Request;
use Slim\Http\Response;
use Benzine\Configuration;
use Benzine\ORM\Profiler;
use Benzine\Traits\InlineCssTrait;
use Benzine\⌬;
class EnvironmentHeadersOnResponse
{
use InlineCssTrait;
protected $apiExplorerEnabled = true;

View file

@ -0,0 +1,182 @@
<?php
namespace Benzine\Services;
use ⌬\UUID\UUID;
class QueueService
{
public const MAX_QUEUE_AGE = 60 * 60 * 24;
protected \Redis $redis;
public function __construct(
\Redis $redis
) {
$this->redis = $redis;
}
/**
* @param string $queueName
* @param \Serializable[] $queueItems
*
* @return int the number of items successfully added
*/
public function push(string $queueName, array $queueItems): int
{
$this->redis->multi();
foreach ($queueItems as $item) {
$itemId = UUID::v4();
$serialised = serialize($item);
// Set the data element itself
$this->redis->set("queue:data:{$queueName}:{$itemId}", $serialised);
// Push the element into the index list
$this->redis->rpush("queue:index:{$queueName}", [$itemId]);
// Increment the length count
$this->redis->incr("queue:length:{$queueName}");
// Set the queue identifier to the current time, if it doesn't already exist
$this->redis->setnx("queue:queues:{$queueName}", date('Y-m-d H:i:s'));
// And set that identifier to expire in a day.
$this->redis->expire("queue:queues:{$queueName}", self::MAX_QUEUE_AGE);
}
$this->redis->exec();
$this->redis->setifhigher("queue:length-peak:{$queueName}", $this->redis->get("queue:length:{$queueName}"));
return count($queueItems);
}
/**
* Get the length of the queue.
*
* @param string $queueName
*
* @return int
*/
public function getQueueLength(string $queueName): int
{
return $this->redis->get("queue:length:{$queueName}") ?? 0;
}
/**
* Get the peak/maximum length of the queue.
*
* @param string $queueName
*
* @return int
*/
public function getQueueLengthPeak(string $queueName): int
{
return $this->redis->get("queue:length-peak:{$queueName}") ?? 0;
}
/**
* Number of seconds that the queue was created ago.
*
* @param string $queueName
*
* @return \DateTime
*/
public function getQueueCreatedAgo(string $queueName): \DateTime
{
return (new \DateTime())
->setTimestamp(strtotime($this->redis->get("queue:queues:{$queueName}")))
;
}
/**
* Number of seconds ago that the queue was updated.
*
* @param string $queueName
*
* @return \DateTime
*/
public function getQueueUpdatedAgo(string $queueName): \DateTime
{
return (new \DateTime())
->setTimestamp(time() - abs($this->getQueueExpiresIn($queueName) - self::MAX_QUEUE_AGE))
;
}
/**
* Number of seconds until a given queue will expire.
*
* @param string $queueName
*
* @return \DateTime
*/
public function getQueueExpiresIn(string $queueName): int
{
return $this->redis->ttl("queue:queues:{$queueName}");
}
/**
* @param string $queueName
* @param int $count
*
* @return WorkerWorkItem[]
*/
public function pop(string $queueName, int $count = 1): array
{
$workerWorkItems = [];
for ($i = 0; $i < $count; ++$i) {
$itemId = $this->redis->rpop("queue:index:{$queueName}");
if (!$itemId) {
continue;
}
$this->redis->multi();
$this->redis->get("queue:data:{$queueName}:{$itemId}");
$this->redis->del(["queue:data:{$queueName}:{$itemId}"]);
$this->redis->decr("queue:length:{$queueName}");
$response = $this->redis->exec();
$workerWorkItems[] = unserialize($response[0]);
}
if ($this->redis->get("queue:length:{$queueName}") <= 0) {
$this->redis->set("queue:length:{$queueName}", 0);
}
return array_filter($workerWorkItems);
}
/**
* Destroy a queue and all data inside it.
* Returns number of redis keys deleted.
*
* @param string $queueName
*
* @return int
*/
public function destroyQueue(string $queueName): int
{
$queueDataKeys = $this->redis->keys("queue:data:{$queueName}:*");
return $this->redis->del([...$queueDataKeys, "queue:length:{$queueName}", "queue:length-peak:{$queueName}", "queue:index:{$queueName}", "queue:queues:{$queueName}"]);
}
/**
* @return string[]
*/
public function listLists(): array
{
$lists = [];
foreach ($this->redis->keys(('queue:queues:*')) as $queue) {
$lists[$queue] = substr($queue, strlen('queue:queues:'));
}
ksort($lists);
return $lists;
}
/**
* Return an key->value array of queue lengths.
*
* @return array
*/
public function allQueueLengths(): array
{
$lengths = [];
foreach ($this->listLists() as $key => $name) {
$lengths[$name] = $this->getQueueLength($name);
}
return $lengths;
}
}

View file

@ -0,0 +1,172 @@
<?php
namespace Benzine\Workers;
use Monolog\Logger;
use Benzine\Services\EnvironmentService;
abstract class AbstractQueueWorker extends AbstractWorker
{
protected QueueService $queueService;
/** @var string Name of the input redis queue */
protected ?string $inputQueue;
/** @var string[] Name of the output redis queues */
protected ?array $outputQueues;
public function __construct(
QueueService $queueService,
Logger $logger,
EnvironmentService $environmentService
) {
$this->queueService = $queueService;
parent::__construct($logger, $environmentService);
}
protected function setUp(): void
{
parent::setUp();
// Set default queues
if (!isset($this->inputQueue)) {
$this->inputQueue = sprintf('%s:input', $this->getClassWithoutNamespace());
}
if (!isset($this->outputQueues)) {
$this->outputQueues[] = sprintf('%s:output', $this->getClassWithoutNamespace());
}
$this->logger->debug(
sprintf(
'Listening to "%s" and outputting on %d channel(s)',
$this->getClassWithoutNamespace(),
$this->inputQueue,
count($this->outputQueues)
)
);
}
/**
* @return QueueService
*/
public function getQueueService(): QueueService
{
return $this->queueService;
}
/**
* @param QueueService $queueService
*
* @return AbstractQueueWorker
*/
public function setQueueService(QueueService $queueService): AbstractQueueWorker
{
$this->queueService = $queueService;
return $this;
}
/**
* @return string
*/
public function getInputQueue(): string
{
return $this->inputQueue;
}
/**
* @param string $inputQueue
*
* @return AbstractQueueWorker
*/
public function setInputQueue(string $inputQueue): AbstractQueueWorker
{
$this->inputQueue = $inputQueue;
return $this;
}
/**
* @return string[]
*/
public function getOutputQueues(): array
{
return $this->outputQueues;
}
/**
* @param string[] $outputQueues
*
* @return AbstractQueueWorker
*/
public function setOutputQueues(array $outputQueues): AbstractQueueWorker
{
$this->outputQueues = $outputQueues;
return $this;
}
public function iterate(): bool
{
$queueLength = $this->queueService->getQueueLength($this->inputQueue);
$this->logger->debug(sprintf(
'Queue %s Length: %d',
$this->inputQueue,
$queueLength
));
if (isset($this->cliArguments['stop-on-zero']) && true === $this->cliArguments['stop-on-zero'] && 0 == $queueLength) {
$this->logger->warning('--stop-on-zero is set, and the queue length is zero! Stopping!');
exit;
exit;
}
if ($queueLength <= 0) {
return false;
}
$items = $this->queueService->pop($this->inputQueue);
$resultItems = [];
foreach ($items as $item) {
$processResults = $this->process($item);
if (is_array($processResults)) {
$resultItems[] += $processResults;
} else {
$resultItems[] = $processResults;
}
}
foreach ($this->outputQueues as $outputQueue) {
$this->queueService->push($outputQueue, $resultItems);
}
return true;
}
/**
* Send work item back to the queue it came from.
*
* @param WorkerWorkItem $item
*/
protected function returnToInputQueue(WorkerWorkItem $item): void
{
$this->queueService->push($this->inputQueue, [$item]);
}
protected function sendToSuccessQueues(WorkerWorkItem $item): int
{
$queuedItems = 0;
foreach ($this->outputQueues as $outputQueue) {
$queuedItems += $this->queueService->push($outputQueue, [$item]);
}
return $queuedItems;
}
protected function sendToFailureQueue(WorkerWorkItem $item): void
{
$this->queueService->push($this->getFailureQueue(), [$item]);
}
protected function getFailureQueue(): string
{
return sprintf('%s:failures', $this->inputQueue);
}
}

View file

@ -0,0 +1,71 @@
<?php
namespace Benzine\Workers;
use Monolog\Logger;
use Benzine\Services\EnvironmentService;
abstract class AbstractWorker
{
protected Logger $logger;
protected EnvironmentService $environmentService;
protected array $cliArguments;
protected int $timeBetweenRuns = 1;
public function __construct(
Logger $logger,
EnvironmentService $environmentService
) {
$this->logger = $logger;
$this->environmentService = $environmentService;
$this->setUp();
$this->logger->info(
sprintf(
'Started Worker "%s".',
$this->getClassWithoutNamespace()
)
);
}
protected function setUp(): void
{
}
/**
* @return array
*/
public function getCliArguments(): array
{
return $this->cliArguments;
}
/**
* @param array $cliArguments
*
* @return AbstractWorker
*/
public function setCliArguments(array $cliArguments): AbstractWorker
{
$this->cliArguments = $cliArguments;
return $this;
}
public function run(): void
{
$this->logger->debug("Running with an interval of {$this->timeBetweenRuns}");
while (true) {
$didWork = $this->iterate();
if (!$didWork) {
sleep($this->timeBetweenRuns);
}
}
}
protected function getClassWithoutNamespace(): string
{
$classNameElems = explode('\\', get_called_class());
return end($classNameElems);
}
}

View file

@ -0,0 +1,11 @@
<?php
namespace Benzine\Workers;
class ExampleQueueWorker extends AbstractQueueWorker implements QueueWorkerInterface
{
public function process(WorkerWorkItem $item): ?WorkerWorkItem
{
return $item->setOutput(sqrt($item->getInput()));
}
}

View file

@ -0,0 +1,15 @@
<?php
namespace Benzine\Workers;
abstract class ForeverLoopWorker extends AbstractWorker implements WorkerInterface
{
public function run(): void
{
$this->logger->debug("Running with an interval of {$this->timeBetweenRuns}");
while (true) {
$didWork = $this->iterate();
sleep($this->timeBetweenRuns);
}
}
}

View file

@ -0,0 +1,13 @@
<?php
namespace Benzine\Workers;
interface QueueWorkerInterface extends WorkerInterface
{
/**
* @param $item WorkerWorkItem
*
* @return null|WorkerWorkItem mutated result work item, or null
*/
public function process(WorkerWorkItem $item): ?WorkerWorkItem;
}

View file

@ -0,0 +1,16 @@
<?php
namespace Benzine\Workers;
interface WorkerInterface
{
/**
* @return bool true if work done successfully, false if not
*/
public function iterate(): bool;
/**
* Indefinitely run an instance of this worker.
*/
public function run(): void;
}

View file

@ -0,0 +1,77 @@
<?php
namespace Benzine\Workers;
use ⌬\Controllers\Abstracts\Model;
class WorkerWorkItem
{
protected array $data;
public function __call($name, $arguments)
{
$method = substr(strtolower($name), 0, 3);
$field = substr(strtolower($name), 3);
switch ($method) {
case 'set':
$this->data[$field] = $arguments[0];
return $this;
case 'get':
return $this->data[$field];
default:
throw new \Exception("Method {$name} doesn't exist");
}
}
public function __serialize(): array
{
return $this->data;
}
public function __unserialize(array $data): void
{
$this->data = $data;
}
/**
* @return array
*/
public function getData(): array
{
return $this->data;
}
/**
* @param array $data
*
* @return WorkerWorkItem
*/
public function setData(array $data): self
{
$this->data = $data;
return $this;
}
public function setKey(string $key, $value): self
{
$this->data[$key] = $value;
return $this;
}
public function getKey(string $key)
{
if ($this->data[$key] instanceof Model) {
$this->data[$key]->__setUp();
}
return $this->data[$key];
}
public function getKeys(): array
{
return array_keys($this->data);
}
}