Produce monolog messages through kafka+avro

This allows a logging channel to be configured to write directly to
Kafka. Logs can be serialized either to JSON blobs or to the more
compact Apache Avro format.

The Kafka handler for monolog needs a list of one or more Kafka servers
to query cluster metadata from. It should work with any monolog
formatter, although some, such as JsonFormatter, need formatBatch
disabled because the Kafka protocol encodes each record independently.
This requires the nmred/kafka-php library, version >= 1.3.0.

Adds a new formatter which serializes to the Apache Avro format. This
is a compact binary format which uses pre-defined schemas. This initial
implementation is very simple and takes the plain schemas as a
constructor argument.

Adds a new option to MonologSpi to wrap handlers in a BufferHandler.
This doesn't flush until the request shuts down, and prevents network
requests made by the logger from adding latency to web requests.

Related mediawiki/vendor update: Ibfe4bd2036ae8e998e2973f07bd9a6f057691578

The necessary config is something like:
array(
    'loggers' => array(
        'CirrusSearchRequests' => array(
            'handlers' => array( 'kafka' ),
        ),
    ),
    'handlers' => array(
        'kafka' => array(
            'factory' => '\\MediaWiki\\Logger\\Monolog\\KafkaHandler::factory',
            'args' => array( 'localhost:9092' ),
            'formatter' => 'avro',
            'buffer' => true,
        ),
    ),
    'formatters' => array(
        'avro' => array(
            'class' => '\\MediaWiki\\Logger\\Monolog\\AvroFormatter',
            'args' => array(
                array(
                    'CirrusSearchRequests' => array(
                        'type' => 'record',
                        'name' => 'CirrusSearchRequests',
                        'fields' => array( ... ),
                    ),
                ),
            ),
        ),
    ),
)
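
With a config like the above, anything logged to the 'CirrusSearchRequests'
channel is run through the avro formatter and produced to the listed Kafka
server. A rough sketch of the producing side (the context field names here
are only illustrative and have to match the fields declared in the schema
passed to AvroFormatter):

    use MediaWiki\Logger\LoggerFactory;

    LoggerFactory::getInstance( 'CirrusSearchRequests' )->info(
        'search request completed',
        array(
            'query' => 'some search terms',
            'tookMs' => 27,
        )
    );
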
Bug: T106256
Change-Id: I6ee744b3e5306af0bed70811b558a543eed22840
<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

/**
 * Generate error strings for data that doesn't match the specified
 * Avro schema. This is very similar to AvroSchema::is_valid_datum(),
 * but returns error messages instead of a boolean.
 *
 * @since 1.26
 * @author Erik Bernhardson <ebernhardson@wikimedia.org>
 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
 */
class AvroValidator {
	/**
	 * @param AvroSchema $schema The rules to conform to.
	 * @param mixed $datum The value to validate against $schema.
	 * @return string|string[] An error or list of errors in the
	 *  provided $datum. When no errors exist the empty array is
	 *  returned.
	 */
	public static function getErrors( AvroSchema $schema, $datum ) {
		switch ( $schema->type ) {
			case AvroSchema::NULL_TYPE:
				if ( !is_null( $datum ) ) {
					return self::wrongType( 'null', $datum );
				}
				return [];
			case AvroSchema::BOOLEAN_TYPE:
				if ( !is_bool( $datum ) ) {
					return self::wrongType( 'boolean', $datum );
				}
				return [];
			case AvroSchema::STRING_TYPE:
			case AvroSchema::BYTES_TYPE:
				if ( !is_string( $datum ) ) {
					return self::wrongType( 'string', $datum );
				}
				return [];
			case AvroSchema::INT_TYPE:
				if ( !is_int( $datum ) ) {
					return self::wrongType( 'integer', $datum );
				}
				if ( AvroSchema::INT_MIN_VALUE > $datum
					|| $datum > AvroSchema::INT_MAX_VALUE
				) {
					return self::outOfRange(
						AvroSchema::INT_MIN_VALUE,
						AvroSchema::INT_MAX_VALUE,
						$datum
					);
				}
				return [];
			case AvroSchema::LONG_TYPE:
				if ( !is_int( $datum ) ) {
					return self::wrongType( 'integer', $datum );
				}
				if ( AvroSchema::LONG_MIN_VALUE > $datum
					|| $datum > AvroSchema::LONG_MAX_VALUE
				) {
					return self::outOfRange(
						AvroSchema::LONG_MIN_VALUE,
						AvroSchema::LONG_MAX_VALUE,
						$datum
					);
				}
				return [];
			case AvroSchema::FLOAT_TYPE:
			case AvroSchema::DOUBLE_TYPE:
				if ( !is_float( $datum ) && !is_int( $datum ) ) {
					return self::wrongType( 'float or integer', $datum );
				}
				return [];
			case AvroSchema::ARRAY_SCHEMA:
				if ( !is_array( $datum ) ) {
					return self::wrongType( 'array', $datum );
				}
				$errors = [];
				foreach ( $datum as $d ) {
					$result = self::getErrors( $schema->items(), $d );
					if ( $result ) {
						$errors[] = $result;
					}
				}
				return $errors;
			case AvroSchema::MAP_SCHEMA:
				if ( !is_array( $datum ) ) {
					return self::wrongType( 'array', $datum );
				}
				$errors = [];
				foreach ( $datum as $k => $v ) {
					if ( !is_string( $k ) ) {
						$errors[] = self::wrongType( 'string key', $k );
					}
					$result = self::getErrors( $schema->values(), $v );
					if ( $result ) {
						$errors[$k] = $result;
					}
				}
				return $errors;
			case AvroSchema::UNION_SCHEMA:
				$errors = [];
				foreach ( $schema->schemas() as $schema ) {
					$result = self::getErrors( $schema, $datum );
					if ( !$result ) {
						return [];
					}
					$errors[] = $result;
				}
				if ( $errors ) {
					return [ "Expected any one of these to be true", $errors ];
				}
				return "No schemas provided to union";
			case AvroSchema::ENUM_SCHEMA:
				if ( !in_array( $datum, $schema->symbols() ) ) {
					$symbols = implode( ', ', $schema->symbols() );
					return "Expected one of $symbols but received $datum";
				}
				return [];
			case AvroSchema::FIXED_SCHEMA:
				if ( !is_string( $datum ) ) {
					return self::wrongType( 'string', $datum );
				}
				$len = strlen( $datum );
				if ( $len !== $schema->size() ) {
					return "Expected string of length {$schema->size()}, "
						. "but received one of length $len";
				}
				return [];
			case AvroSchema::RECORD_SCHEMA:
			case AvroSchema::ERROR_SCHEMA:
			case AvroSchema::REQUEST_SCHEMA:
				if ( !is_array( $datum ) ) {
					return self::wrongType( 'array', $datum );
				}
				$errors = [];
				foreach ( $schema->fields() as $field ) {
					$name = $field->name();
					if ( !array_key_exists( $name, $datum ) ) {
						$errors[$name] = 'Missing expected field';
						continue;
					}
					$result = self::getErrors( $field->type(), $datum[$name] );
					if ( $result ) {
						$errors[$name] = $result;
					}
				}
				return $errors;
			default:
				return "Unknown avro schema type: {$schema->type}";
		}
	}

	/**
	 * @param mixed $datum
	 * @return string
	 */
	public static function typeOf( $datum ) {
		return is_object( $datum ) ? get_class( $datum ) : gettype( $datum );
	}

	/**
	 * @param string $expected Name of the expected type
	 * @param mixed $datum The value with the unexpected type
	 * @return string
	 */
	public static function wrongType( $expected, $datum ) {
		return "Expected $expected, but received " . self::typeOf( $datum );
	}

	/**
	 * @param int $min Minimum allowed value
	 * @param int $max Maximum allowed value
	 * @param int $datum The out-of-range value
	 * @return string
	 */
	public static function outOfRange( $min, $max, $datum ) {
		return "Expected value between $min and $max, but received $datum";
	}
}
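
/*
 * Illustrative usage (not part of the class itself; the record schema and
 * datum below are made up for this example). AvroSchema::parse() comes from
 * the Apache Avro PHP library that defines the AvroSchema class used above:
 *
 *   $schema = AvroSchema::parse( json_encode( [
 *       'type' => 'record',
 *       'name' => 'CirrusSearchRequests',
 *       'fields' => [
 *           [ 'name' => 'query', 'type' => 'string' ],
 *           [ 'name' => 'tookMs', 'type' => 'int' ],
 *       ],
 *   ] ) );
 *
 *   $errors = AvroValidator::getErrors( $schema, [ 'query' => 'foo' ] );
 *   // $errors === [ 'tookMs' => 'Missing expected field' ]
 */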