Remove deprecated EventRelayerKafka and KafkaHandler
Also remove the unmaintained kafka-php package from the from "suggested" and "dev" composer dependencies, as this is now no longer used. Change-Id: If5668974f417b627df95bce47db18d46fa03327c
This commit is contained in:
parent
1fbbcb6371
commit
2de79774e1
6 changed files with 3 additions and 562 deletions
|
|
@ -80,6 +80,8 @@ because of Phabricator reports.
|
|||
- ::fetchRow()
|
||||
- ::numRows()
|
||||
- ::freeResult()
|
||||
* EventRelayerKafka, deprecated in 1.38, was removed.
|
||||
* MediaWiki\Logger\Monolog\KafkaHandler, deprecated in 1.38, was removed.
|
||||
* Database::wasKnownStatementRollbackError() was removed. Subclasses should
|
||||
override isKnownStatementRollbackError() instead.
|
||||
* Changes to skins:
|
||||
|
|
|
|||
|
|
@ -442,7 +442,6 @@ $wgAutoloadLocalClasses = [
|
|||
'EtcdConfigParseError' => __DIR__ . '/includes/config/EtcdConfigParseError.php',
|
||||
'EventRelayer' => __DIR__ . '/includes/libs/eventrelayer/EventRelayer.php',
|
||||
'EventRelayerGroup' => __DIR__ . '/includes/EventRelayerGroup.php',
|
||||
'EventRelayerKafka' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerKafka.php',
|
||||
'EventRelayerNull' => __DIR__ . '/includes/libs/eventrelayer/EventRelayerNull.php',
|
||||
'ExecutableFinder' => __DIR__ . '/includes/utils/ExecutableFinder.php',
|
||||
'Exif' => __DIR__ . '/includes/media/Exif.php',
|
||||
|
|
|
|||
|
|
@ -90,7 +90,6 @@
|
|||
"mediawiki/mediawiki-codesniffer": "38.0.0",
|
||||
"mediawiki/mediawiki-phan-config": "0.11.0",
|
||||
"nikic/php-parser": "^4.10.2",
|
||||
"nmred/kafka-php": "0.1.5",
|
||||
"php-parallel-lint/php-console-highlighter": "0.5",
|
||||
"php-parallel-lint/php-parallel-lint": "1.3.1",
|
||||
"phpunit/phpunit": "^8.5",
|
||||
|
|
@ -117,8 +116,7 @@
|
|||
"ext-sockets": "Enable CLI concurrent processing, e.g. for rebuildLocalisationCache.php.",
|
||||
"ext-wikidiff2": "Faster text difference engine.",
|
||||
"ext-zlib": "Enable use of GZIP compression, e.g. for SqlBagOStuff (ParserCache), $wgCompressRevisions, or $wgUseFileCache.",
|
||||
"monolog/monolog": "Enable use of MonologSpi ($wgMWLoggerDefaultSpi).",
|
||||
"nmred/kafka-php": "Enable use of KafkaHandler (MonologSpi), or EventRelayerKafka ($wgEventRelayerConfig)."
|
||||
"monolog/monolog": "Enable use of MonologSpi ($wgMWLoggerDefaultSpi)."
|
||||
},
|
||||
"autoload": {
|
||||
"psr-0": {
|
||||
|
|
|
|||
|
|
@ -1,278 +0,0 @@
|
|||
<?php
|
||||
/**
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along
|
||||
* with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
* http://www.gnu.org/copyleft/gpl.html
|
||||
*
|
||||
* @file
|
||||
*/
|
||||
|
||||
namespace MediaWiki\Logger\Monolog;
|
||||
|
||||
use Kafka\MetaDataFromKafka;
|
||||
use Kafka\Produce;
|
||||
use Kafka\Protocol\Decoder;
|
||||
use MediaWiki\Logger\LoggerFactory;
|
||||
use Monolog\Handler\AbstractProcessingHandler;
|
||||
use Monolog\Logger;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
/**
|
||||
* Log handler sends log events to a kafka server.
|
||||
*
|
||||
* Constructor options array arguments:
|
||||
* * alias: map from monolog channel to kafka topic name. When no
|
||||
* alias exists the topic "monolog_$channel" will be used.
|
||||
* * swallowExceptions: Swallow exceptions that occur while talking to
|
||||
* kafka. Defaults to false.
|
||||
* * logExceptions: Log exceptions talking to kafka here. Either null,
|
||||
* the name of a channel to log to, or an object implementing
|
||||
* FormatterInterface. Defaults to null.
|
||||
*
|
||||
* Requires the nmred/kafka-php library, version >= 1.3.0
|
||||
*
|
||||
* @deprecated since 1.38
|
||||
* @since 1.26
|
||||
* @author Erik Bernhardson <ebernhardson@wikimedia.org>
|
||||
* @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
|
||||
*/
|
||||
class KafkaHandler extends AbstractProcessingHandler {
|
||||
/**
|
||||
* @var Produce Sends requests to kafka
|
||||
*/
|
||||
protected $produce;
|
||||
|
||||
/**
|
||||
* @var array Optional handler configuration
|
||||
*/
|
||||
protected $options;
|
||||
|
||||
/**
|
||||
* @var array Map from topic name to partition this request produces to
|
||||
*/
|
||||
protected $partitions = [];
|
||||
|
||||
/**
|
||||
* @var array defaults for constructor options
|
||||
*/
|
||||
private const DEFAULT_OPTIONS = [
|
||||
'alias' => [], // map from monolog channel to kafka topic
|
||||
'swallowExceptions' => false, // swallow exceptions sending records
|
||||
'logExceptions' => null, // A PSR3 logger to inform about errors
|
||||
'requireAck' => 0,
|
||||
];
|
||||
|
||||
/**
|
||||
* @param Produce $produce Kafka instance to produce through
|
||||
* @param array $options optional handler configuration
|
||||
* @param int $level The minimum logging level at which this handler will be triggered
|
||||
* @param bool $bubble Whether the messages that are handled can bubble up the stack or not
|
||||
*/
|
||||
public function __construct(
|
||||
Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
|
||||
) {
|
||||
wfDeprecated( __CLASS__, '1.38' );
|
||||
parent::__construct( $level, $bubble );
|
||||
$this->produce = $produce;
|
||||
$this->options = array_merge( self::DEFAULT_OPTIONS, $options );
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs the necessary support objects and returns a KafkaHandler
|
||||
* instance.
|
||||
*
|
||||
* @param string[] $kafkaServers
|
||||
* @param array $options
|
||||
* @param int $level The minimum logging level at which this handle will be triggered
|
||||
* @param bool $bubble Whether the messages that are handled can bubble the stack or not
|
||||
* @return KafkaHandler
|
||||
*/
|
||||
public static function factory(
|
||||
$kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
|
||||
) {
|
||||
$metadata = new MetaDataFromKafka( $kafkaServers );
|
||||
$produce = new Produce( $metadata );
|
||||
|
||||
if ( isset( $options['sendTimeout'] ) ) {
|
||||
$timeOut = $options['sendTimeout'];
|
||||
$produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
|
||||
$produce->getClient()->setStreamOption( 'SendTimeoutUSec',
|
||||
intval( $timeOut * 1000000 )
|
||||
);
|
||||
}
|
||||
if ( isset( $options['recvTimeout'] ) ) {
|
||||
$timeOut = $options['recvTimeout'];
|
||||
$produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
|
||||
$produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
|
||||
intval( $timeOut * 1000000 )
|
||||
);
|
||||
}
|
||||
if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
|
||||
$options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
|
||||
}
|
||||
|
||||
if ( isset( $options['requireAck'] ) ) {
|
||||
$produce->setRequireAck( $options['requireAck'] );
|
||||
}
|
||||
|
||||
return new self( $produce, $options, $level, $bubble );
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
protected function write( array $record ): void {
|
||||
if ( $record['formatted'] !== null ) {
|
||||
$this->addMessages( $record['channel'], [ $record['formatted'] ] );
|
||||
$this->send();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
* @phan-param array[] $batch
|
||||
*/
|
||||
public function handleBatch( array $batch ): void {
|
||||
$channels = [];
|
||||
foreach ( $batch as $record ) {
|
||||
if ( $record['level'] < $this->level ) {
|
||||
continue;
|
||||
}
|
||||
$channels[$record['channel']][] = $this->processRecord( $record );
|
||||
}
|
||||
|
||||
$formatter = $this->getFormatter();
|
||||
foreach ( $channels as $channel => $records ) {
|
||||
$messages = [];
|
||||
foreach ( $records as $idx => $record ) {
|
||||
$message = $formatter->format( $record );
|
||||
if ( $message !== null ) {
|
||||
$messages[] = $message;
|
||||
}
|
||||
}
|
||||
if ( $messages ) {
|
||||
$this->addMessages( $channel, $messages );
|
||||
}
|
||||
}
|
||||
|
||||
$this->send();
|
||||
}
|
||||
|
||||
/**
|
||||
* Send any records in the kafka client internal queue.
|
||||
*/
|
||||
protected function send() {
|
||||
try {
|
||||
$response = $this->produce->send();
|
||||
} catch ( \Kafka\Exception $e ) {
|
||||
$ignore = $this->warning(
|
||||
'Error sending records to kafka: {exception}',
|
||||
[ 'exception' => $e ] );
|
||||
if ( !$ignore ) {
|
||||
throw $e;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if ( is_bool( $response ) ) {
|
||||
return;
|
||||
}
|
||||
|
||||
$errors = [];
|
||||
foreach ( $response as $topicName => $partitionResponse ) {
|
||||
foreach ( $partitionResponse as $partition => $info ) {
|
||||
if ( $info['errCode'] === 0 ) {
|
||||
// no error
|
||||
continue;
|
||||
}
|
||||
$errors[] = sprintf(
|
||||
'Error producing to %s (errno %d): %s',
|
||||
$topicName,
|
||||
$info['errCode'],
|
||||
Decoder::getError( $info['errCode'] )
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if ( $errors ) {
|
||||
$error = implode( "\n", $errors );
|
||||
if ( !$this->warning( $error ) ) {
|
||||
throw new \RuntimeException( $error );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $topic Name of topic to get partition for
|
||||
* @return int|null The random partition to produce to for this request,
|
||||
* or null if a partition could not be determined.
|
||||
*/
|
||||
protected function getRandomPartition( $topic ) {
|
||||
if ( !array_key_exists( $topic, $this->partitions ) ) {
|
||||
try {
|
||||
$partitions = $this->produce->getAvailablePartitions( $topic );
|
||||
} catch ( \Kafka\Exception $e ) {
|
||||
$ignore = $this->warning(
|
||||
'Error getting metadata for kafka topic {topic}: {exception}',
|
||||
[ 'topic' => $topic, 'exception' => $e ] );
|
||||
if ( $ignore ) {
|
||||
return null;
|
||||
}
|
||||
throw $e;
|
||||
}
|
||||
if ( $partitions ) {
|
||||
$key = array_rand( $partitions );
|
||||
$this->partitions[$topic] = $partitions[$key];
|
||||
} else {
|
||||
$details = $this->produce->getClient()->getTopicDetail( $topic );
|
||||
$ignore = $this->warning(
|
||||
'No partitions available for kafka topic {topic}',
|
||||
[ 'topic' => $topic, 'kafka' => $details ]
|
||||
);
|
||||
if ( !$ignore ) {
|
||||
throw new \RuntimeException( "No partitions available for kafka topic $topic" );
|
||||
}
|
||||
$this->partitions[$topic] = null;
|
||||
}
|
||||
}
|
||||
return $this->partitions[$topic];
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds records for a channel to the Kafka client internal queue.
|
||||
*
|
||||
* @param string $channel Name of Monolog channel records belong to
|
||||
* @param array $records List of records to append
|
||||
*/
|
||||
protected function addMessages( $channel, array $records ) {
|
||||
$topic = $this->options['alias'][$channel] ?? "monolog_$channel";
|
||||
$partition = $this->getRandomPartition( $topic );
|
||||
if ( $partition !== null ) {
|
||||
$this->produce->setMessages( $topic, $partition, $records );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $message PSR3 compatible message string
|
||||
* @param array $context PSR3 compatible log context
|
||||
* @return bool true if caller should ignore warning
|
||||
*/
|
||||
protected function warning( $message, array $context = [] ) {
|
||||
if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
|
||||
$this->options['logExceptions']->warning( $message, $context );
|
||||
}
|
||||
return $this->options['swallowExceptions'];
|
||||
}
|
||||
}
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
<?php
|
||||
|
||||
use Kafka\Produce;
|
||||
|
||||
/**
|
||||
* Event relayer for Apache Kafka.
|
||||
* Configuring for WANCache:
|
||||
* 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
|
||||
*
|
||||
* @see $wgEventRelayerConfig
|
||||
* @since 1.27
|
||||
* @deprecated since 1.38
|
||||
*/
|
||||
class EventRelayerKafka extends EventRelayer {
|
||||
/**
|
||||
* Configuration.
|
||||
*
|
||||
* @var Config
|
||||
*/
|
||||
protected $config;
|
||||
|
||||
/**
|
||||
* Kafka producer.
|
||||
*
|
||||
* @var Produce
|
||||
*/
|
||||
protected $producer;
|
||||
|
||||
/**
|
||||
* Create Kafka producer.
|
||||
*
|
||||
* @param array $params
|
||||
*/
|
||||
public function __construct( array $params ) {
|
||||
wfDeprecated( __CLASS__, '1.38' );
|
||||
|
||||
parent::__construct( $params );
|
||||
|
||||
$this->config = new HashConfig( $params );
|
||||
if ( !$this->config->has( 'KafkaEventHost' ) ) {
|
||||
throw new InvalidArgumentException( "KafkaEventHost must be configured" );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the producer object from kafka-php.
|
||||
* @return Produce
|
||||
*/
|
||||
protected function getKafkaProducer() {
|
||||
if ( !$this->producer ) {
|
||||
$this->producer = Produce::getInstance(
|
||||
null, null, $this->config->get( 'KafkaEventHost' ) );
|
||||
}
|
||||
return $this->producer;
|
||||
}
|
||||
|
||||
protected function doNotify( $channel, array $events ) {
|
||||
$jsonEvents = array_map( 'json_encode', $events );
|
||||
try {
|
||||
$producer = $this->getKafkaProducer();
|
||||
$producer->setMessages( $channel, 0, $jsonEvents );
|
||||
$producer->send();
|
||||
} catch ( \Kafka\Exception $e ) {
|
||||
$this->logger->warning( "Sending events failed: $e" );
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,211 +0,0 @@
|
|||
<?php
|
||||
/**
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along
|
||||
* with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
* http://www.gnu.org/copyleft/gpl.html
|
||||
*
|
||||
* @file
|
||||
*/
|
||||
|
||||
namespace MediaWiki\Logger\Monolog;
|
||||
|
||||
use Kafka\Exception;
|
||||
use Monolog\Logger;
|
||||
|
||||
/**
|
||||
* @covers \MediaWiki\Logger\Monolog\KafkaHandler
|
||||
*/
|
||||
class KafkaHandlerTest extends \MediaWikiUnitTestCase {
|
||||
|
||||
protected function setUp(): void {
|
||||
parent::setUp();
|
||||
if ( !class_exists( \Monolog\Handler\AbstractProcessingHandler::class )
|
||||
|| !class_exists( \Kafka\Produce::class )
|
||||
) {
|
||||
$this->markTestSkipped( 'Monolog and Kafka are required for the KafkaHandlerTest' );
|
||||
}
|
||||
$this->hideDeprecated( KafkaHandler::class );
|
||||
}
|
||||
|
||||
public function topicNamingProvider() {
|
||||
return [
|
||||
[ [], 'monolog_foo' ],
|
||||
[ [ 'alias' => [ 'foo' => 'bar' ] ], 'bar' ]
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider topicNamingProvider
|
||||
*/
|
||||
public function testTopicNaming( $options, $expect ) {
|
||||
$produce = $this->getMockBuilder( \Kafka\Produce::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$produce->method( 'getAvailablePartitions' )
|
||||
->willReturn( [ 'A' ] );
|
||||
$produce->expects( $this->once() )
|
||||
->method( 'setMessages' )
|
||||
->with( $expect, $this->anything(), $this->anything() );
|
||||
$produce->method( 'send' )
|
||||
->willReturn( true );
|
||||
|
||||
$handler = new KafkaHandler( $produce, $options );
|
||||
$handler->handle( [
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
] );
|
||||
}
|
||||
|
||||
public function swallowsExceptionsWhenRequested() {
|
||||
return [
|
||||
// defaults to false
|
||||
[ [], true ],
|
||||
// also try false explicitly
|
||||
[ [ 'swallowExceptions' => false ], true ],
|
||||
// turn it on
|
||||
[ [ 'swallowExceptions' => true ], false ],
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider swallowsExceptionsWhenRequested
|
||||
*/
|
||||
public function testGetAvailablePartitionsException( $options, $expectException ) {
|
||||
$produce = $this->getMockBuilder( \Kafka\Produce::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$produce->method( 'getAvailablePartitions' )
|
||||
->will( $this->throwException( new Exception ) );
|
||||
$produce->method( 'send' )
|
||||
->willReturn( true );
|
||||
|
||||
if ( $expectException ) {
|
||||
$this->expectException( Exception::class );
|
||||
}
|
||||
|
||||
$handler = new KafkaHandler( $produce, $options );
|
||||
$handler->handle( [
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
] );
|
||||
|
||||
if ( !$expectException ) {
|
||||
$this->assertTrue( true, 'no exception was thrown' );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider swallowsExceptionsWhenRequested
|
||||
*/
|
||||
public function testSendException( $options, $expectException ) {
|
||||
$produce = $this->getMockBuilder( \Kafka\Produce::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$produce->method( 'getAvailablePartitions' )
|
||||
->willReturn( [ 'A' ] );
|
||||
$produce->method( 'send' )
|
||||
->will( $this->throwException( new Exception ) );
|
||||
|
||||
if ( $expectException ) {
|
||||
$this->expectException( Exception::class );
|
||||
}
|
||||
|
||||
$handler = new KafkaHandler( $produce, $options );
|
||||
$handler->handle( [
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
] );
|
||||
|
||||
if ( !$expectException ) {
|
||||
$this->assertTrue( true, 'no exception was thrown' );
|
||||
}
|
||||
}
|
||||
|
||||
public function testHandlesNullFormatterResult() {
|
||||
$produce = $this->getMockBuilder( \Kafka\Produce::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$produce->method( 'getAvailablePartitions' )
|
||||
->willReturn( [ 'A' ] );
|
||||
$produce->expects( $this->exactly( 2 ) )
|
||||
->method( 'setMessages' )
|
||||
->will( $this->onConsecutiveCalls(
|
||||
[ $this->anything(), $this->anything(), [ 'words' ] ],
|
||||
[ $this->anything(), $this->anything(), [ 'lines' ] ]
|
||||
) );
|
||||
$produce->method( 'send' )
|
||||
->willReturn( true );
|
||||
|
||||
$formatter = $this->createMock( \Monolog\Formatter\FormatterInterface::class );
|
||||
$formatter->method( 'format' )
|
||||
->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
|
||||
|
||||
$handler = new KafkaHandler( $produce, [] );
|
||||
$handler->setFormatter( $formatter );
|
||||
for ( $i = 0; $i < 3; ++$i ) {
|
||||
$handler->handle( [
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
] );
|
||||
}
|
||||
}
|
||||
|
||||
public function testBatchHandlesNullFormatterResult() {
|
||||
$produce = $this->getMockBuilder( \Kafka\Produce::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$produce->method( 'getAvailablePartitions' )
|
||||
->willReturn( [ 'A' ] );
|
||||
$produce->expects( $this->once() )
|
||||
->method( 'setMessages' )
|
||||
->with( $this->anything(), $this->anything(), [ 'words', 'lines' ] );
|
||||
$produce->method( 'send' )
|
||||
->willReturn( true );
|
||||
|
||||
$formatter = $this->createMock( \Monolog\Formatter\FormatterInterface::class );
|
||||
$formatter->method( 'format' )
|
||||
->will( $this->onConsecutiveCalls( 'words', null, 'lines' ) );
|
||||
|
||||
$handler = new KafkaHandler( $produce, [] );
|
||||
$handler->setFormatter( $formatter );
|
||||
$handler->handleBatch( [
|
||||
[
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
],
|
||||
[
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
],
|
||||
[
|
||||
'channel' => 'foo',
|
||||
'level' => Logger::EMERGENCY,
|
||||
'extra' => [],
|
||||
'context' => [],
|
||||
],
|
||||
] );
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue