Listen function

This commit is contained in:
Greyscale 2022-06-19 00:59:05 +02:00
parent 9cd475f456
commit 15892541b2
No known key found for this signature in database
GPG key ID: 74BAFF55434DA4B2
2 changed files with 66 additions and 1 deletions

View file

@ -307,6 +307,15 @@ class Redis
}
public function emit(array $message){
return $this->redis->publish(APP_NAME, json_encode($message));
return $this->redis->publish(strtolower(APP_NAME), json_encode($message));
}
public function listen($callback){
ini_set("default_socket_timeout", -1);
try {
$this->redis->psubscribe([strtolower(APP_NAME)], $callback);
}catch(\RedisException $exception){
}
}
}

View file

@ -0,0 +1,56 @@
<?php
namespace Benzine\Workers;
use Benzine\Redis\Redis;
use Benzine\Services\EnvironmentService;
use Benzine\Workers\AbstractWorker;
use Monolog\Logger;
abstract class WaitForEmitWorker extends AbstractWorker
{
protected array $messageTypes = [];
public function addMessageTypeListener(string $messageType)
{
$this->messageTypes[] = $messageType;
$this->logger->debug("Added {$messageType} to message type handlers.");
return $this;
}
public $callback;
public function setCallback($callback)
{
$this->callback = $callback;
return $this;
}
public function __construct(
protected Redis $redis,
Logger $logger,
EnvironmentService $environmentService
)
{
parent::__construct($logger, $environmentService);
$this->setCallback([$this, 'message']);
}
public function run(): void
{
$this->logger->debug("Running Emit Worker");
$this->redis->listen(array($this, "recv"));
}
public function recv($redis, $pattern, $chan, $msg)
{
$json = json_decode($msg, true);
if (in_array($json['MESSAGE_TYPE'], $this->messageTypes)) {
call_user_func($this->callback, $json);
}
}
public function iterate(): bool
{
}
}