wiki.techinc.nl/maintenance/includes/OrderedStreamingForkController.php

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

236 lines
6.7 KiB
PHP
Raw Normal View History

<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace MediaWiki\Maintenance;
/**
* Apply a transformation to values via a pool of sub processes.
*
* The controller reads lines from a given input stream, where each line
* describes work to be done. This work is then farmed out to multiple
* child streams that correspond to child processes. Each child has exactly
* one piece of work in-flight at a given moment. The result of each work
* is written to an output stream.
*
* If numProcs is zero, the fallback is to perform work in-process instead.
*
* This class guarantees that the output is produced in the same exact order
* as input values were.
*
* Currently used by CirrusSearch extension to implement CLI search script.
*
* @ingroup Maintenance
* @since 1.30
*/
class OrderedStreamingForkController extends ForkController {
	/** @var callable Called once per line of work; receives the line and returns its result */
	protected $workCallback;
	/** @var resource Stream that lines of work are read from */
	protected $input;
	/** @var resource Stream that results are written to */
	protected $output;
	/** @var int Id of the next result that may be written to the output */
	protected $nextOutputId;
	/** @var string[] Int key indicates order, value is data */
	protected $delayedOutputData = [];

	/**
	 * @param int $numProcs The number of worker processes to fork
	 * @param callable $workCallback A callback to call in the child process
	 *  once for each line of work to process.
	 * @param resource $input A socket to read work lines from
	 * @param resource $output A socket to write the result of work to.
	 */
	public function __construct( $numProcs, $workCallback, $input, $output ) {
		parent::__construct( $numProcs );
		$this->workCallback = $workCallback;
		$this->input = $input;
		$this->output = $output;
	}

	/**
	 * Run the controller. With at least one process to start, the work is
	 * forked out and a child processes its share before this returns;
	 * otherwise all work is performed in-process.
	 *
	 * @inheritDoc
	 */
	public function start() {
		if ( $this->procsToStart > 0 ) {
			$status = parent::start();
			if ( $status === 'child' ) {
				$this->consume();
			}
		} else {
			// No workers requested: process the input in this process.
			$status = 'parent';
			$this->consumeNoFork();
		}
		return $status;
	}

	/**
	 * Fork the worker processes, each connected to the parent through its own
	 * socket pair. In the parent this also feeds all input to the children
	 * and collects their results before returning.
	 *
	 * @param int $numProcs
	 * @return string 'child' in a forked worker, 'parent' in the parent
	 */
	protected function forkWorkers( $numProcs ) {
		$this->prepareEnvironment();

		$childSockets = [];
		// Create the child processes
		for ( $i = 0; $i < $numProcs; $i++ ) {
			$sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
			if ( $sockets === false ) {
				// Without a socket pair there is no way to talk to the child.
				echo "Error creating child processes\n";
				exit( 1 );
			}
			// Do the fork
			$pid = pcntl_fork();
			if ( $pid === -1 ) {
				echo "Error creating child processes\n";
				exit( 1 );
			}

			if ( !$pid ) {
				$this->initChild();
				$this->childNumber = $i;
				// The child both reads work from and writes results to its end of the pair.
				$this->input = $sockets[0];
				$this->output = $sockets[0];
				fclose( $sockets[1] );
				// Drop the inherited copies of the parent-side sockets of
				// previously forked children; this child never uses them.
				foreach ( $childSockets as $inherited ) {
					fclose( $inherited );
				}
				return 'child';
			} else {
				// This is the parent process
				$this->children[$pid] = true;
				fclose( $sockets[0] );
				$childSockets[] = $sockets[1];
			}
		}

		$this->feedChildren( $childSockets );
		foreach ( $childSockets as $socket ) {
			fclose( $socket );
		}
		return 'parent';
	}

	/**
	 * Child worker process. Reads work from $this->input and writes the
	 * result of that work to $this->output when completed.
	 *
	 * Each line of work is a JSON encoded [ id, data ] pair; the reply is a
	 * JSON encoded [ id, result ] pair on a line of its own.
	 */
	protected function consume() {
		while ( !feof( $this->input ) ) {
			$line = fgets( $this->input );
			if ( $line === false ) {
				// Nothing could be read (end of stream or a read timeout).
				// Re-check feof() rather than giving up, so an idle worker
				// keeps waiting for more work.
				continue;
			}
			$line = trim( $line );
			if ( $line === '' ) {
				continue;
			}
			[ $id, $data ] = json_decode( $line );
			$result = call_user_func( $this->workCallback, $data );
			fwrite( $this->output, json_encode( [ $id, $result ] ) . "\n" );
		}
	}

	/**
	 * Special cased version of self::consume() when no forking occurs.
	 * Lines are passed to the callback verbatim (without JSON framing) and
	 * each result is written directly to the output followed by a new line.
	 */
	protected function consumeNoFork() {
		while ( !feof( $this->input ) ) {
			$data = fgets( $this->input );
			if ( $data === false ) {
				// fgets() returns false when nothing could be read, which can
				// happen before feof() reports the end of the stream. Do not
				// hand that to the callback as if it were a line of work.
				continue;
			}
			if ( substr( $data, -1 ) === "\n" ) {
				// Strip any final new line used to delimit lines of input.
				// The last line of input might not have it, though.
				$data = substr( $data, 0, -1 );
			}
			if ( $data === '' ) {
				continue;
			}
			$result = call_user_func( $this->workCallback, $data );
			fwrite( $this->output, "$result\n" );
		}
	}

	/**
	 * Reads lines of work from $this->input and farms them out to
	 * the provided sockets, then waits for all outstanding results.
	 *
	 * @param resource[] $sockets Parent-side sockets, one per child
	 */
	protected function feedChildren( array $sockets ) {
		$used = [];
		$id = 0;
		$this->nextOutputId = 0;

		while ( !feof( $this->input ) ) {
			$data = fgets( $this->input );
			if ( $used ) {
				// Collect finished results. Only block (5 seconds at a time)
				// when every child is busy, and keep going until one is free.
				do {
					$this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
				} while ( !$sockets );
			}
			if ( $data === false ) {
				// Nothing was read (end of stream or a read timeout); sending
				// it on would create a bogus work item. Let feof() decide
				// whether to stop.
				continue;
			}
			if ( substr( $data, -1 ) === "\n" ) {
				// Strip any final new line used to delimit lines of input.
				// The last line of input might not have it, though.
				$data = substr( $data, 0, -1 );
			}
			if ( $data === '' ) {
				continue;
			}
			$socket = array_pop( $sockets );
			// The id records the input order so results can be re-ordered on receipt.
			fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
			$used[] = $socket;
		}

		// All input has been dispatched; drain the remaining results.
		while ( $used ) {
			$this->updateAvailableSockets( $sockets, $used, 5 );
		}
	}

	/**
	 * Moves sockets from $used to $sockets when they are available
	 * for more work
	 *
	 * @param resource[] &$sockets List of sockets that are waiting for work
	 * @param resource[] &$used List of sockets currently performing work
	 * @param int $timeout The number of seconds to block waiting. 0 for
	 *  non-blocking operation.
	 */
	protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
		$read = $used;
		$write = $except = [];
		$numReady = stream_select( $read, $write, $except, $timeout );
		if ( $numReady === false ) {
			// The select call failed (for example it was interrupted by a
			// signal). $read has not been narrowed down to the ready sockets,
			// so reading from it now could block on a busy child. The caller
			// will simply try again.
			return;
		}
		foreach ( $read as $socket ) {
			$line = fgets( $socket );
			// NOTE(review): if the child closed its socket, fgets() returns
			// false and both $id and $data decode to null here; that case is
			// not specially handled.
			[ $id, $data ] = json_decode( trim( $line ) );
			$this->receive( (int)$id, $data );
			// The child answered, so it is free to take more work.
			$sockets[] = $socket;
			$idx = array_search( $socket, $used, true );
			unset( $used[$idx] );
		}
	}

	/**
	 * Handles one result from a child. Results are written to the output
	 * strictly in id order; a result that arrives early is held back until
	 * all results with lower ids have been written.
	 *
	 * @param int $id
	 * @param string $data
	 */
	protected function receive( $id, $data ) {
		if ( $id !== $this->nextOutputId ) {
			// Out of order: hold on to it until its turn comes.
			$this->delayedOutputData[$id] = $data;
			return;
		}
		fwrite( $this->output, $data . "\n" );
		$this->nextOutputId = $id + 1;
		// Flush any held back results that directly follow this one.
		while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
			fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
			unset( $this->delayedOutputData[$this->nextOutputId] );
			$this->nextOutputId++;
		}
	}
}
// Registers the un-namespaced name 'OrderedStreamingForkController' as an
// alias of the namespaced class declared in this file.
/** @deprecated class alias since 1.40 */
class_alias( OrderedStreamingForkController::class, 'OrderedStreamingForkController' );