objectcache: allow for callbacks to mask SYNC_WRITE latency
Change-Id: I908222ad3788ebe330aa58831cda139da32becd8
This commit is contained in:
parent
52511952de
commit
e99bb1b7dc
5 changed files with 383 additions and 28 deletions
|
|
@ -1498,6 +1498,7 @@ $wgAutoloadLocalClasses = [
|
|||
'VirtualRESTService' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTService.php',
|
||||
'VirtualRESTServiceClient' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTServiceClient.php',
|
||||
'WANObjectCache' => __DIR__ . '/includes/libs/objectcache/WANObjectCache.php',
|
||||
'WaitConditionLoop' => __DIR__ . '/includes/libs/WaitConditionLoop.php',
|
||||
'WantedCategoriesPage' => __DIR__ . '/includes/specials/SpecialWantedcategories.php',
|
||||
'WantedFilesPage' => __DIR__ . '/includes/specials/SpecialWantedfiles.php',
|
||||
'WantedPagesPage' => __DIR__ . '/includes/specials/SpecialWantedpages.php',
|
||||
|
|
|
|||
171
includes/libs/WaitConditionLoop.php
Normal file
171
includes/libs/WaitConditionLoop.php
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
<?php
|
||||
/**
|
||||
* Wait loop that reaches a condition or times out.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along
|
||||
* with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
* http://www.gnu.org/copyleft/gpl.html
|
||||
*
|
||||
* @file
|
||||
* @author Aaron Schulz
|
||||
*/
|
||||
|
||||
/**
|
||||
* Wait loop that reaches a condition or times out
|
||||
* @since 1.28
|
||||
*/
|
||||
class WaitConditionLoop {
|
||||
/** @var callable */
|
||||
private $condition;
|
||||
/** @var callable[] */
|
||||
private $busyCallbacks = [];
|
||||
/** @var float Seconds */
|
||||
private $timeout;
|
||||
/** @var float Seconds */
|
||||
private $lastWaitTime;
|
||||
|
||||
const CONDITION_REACHED = 1;
|
||||
const CONDITION_CONTINUE = 0; // evaluates as falsey
|
||||
const CONDITION_TIMED_OUT = -1;
|
||||
const CONDITION_ABORTED = -2;
|
||||
|
||||
/**
|
||||
* @param callable $condition Callback that returns a WaitConditionLoop::CONDITION_ constant
|
||||
* @param float $timeout Timeout in seconds
|
||||
* @param array &$busyCallbacks List of callbacks to do useful work (by reference)
|
||||
*/
|
||||
public function __construct( callable $condition, $timeout = 5.0, &$busyCallbacks = [] ) {
|
||||
$this->condition = $condition;
|
||||
$this->timeout = $timeout;
|
||||
$this->busyCallbacks =& $busyCallbacks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the loop and continue until either:
|
||||
* - a) The condition callback does not return either CONDITION_CONTINUE or true
|
||||
* - b) The timeout is reached
|
||||
* This a condition callback can return true (stop) or false (continue) for convenience.
|
||||
* In such cases, the halting result of "true" will be converted to CONDITION_REACHED.
|
||||
*
|
||||
* Exceptions in callbacks will be caught and the callback will be swapped with
|
||||
* one that simply rethrows that exception back to the caller when invoked.
|
||||
*
|
||||
* @return integer WaitConditionLoop::CONDITION_* constant
|
||||
* @throws Exception Any error from the condition callback
|
||||
*/
|
||||
public function invoke() {
|
||||
$elapsed = 0.0; // seconds
|
||||
$sleepUs = 0; // microseconds to sleep each time
|
||||
$lastCheck = false;
|
||||
$finalResult = self::CONDITION_TIMED_OUT;
|
||||
do {
|
||||
$checkStartTime = $this->getWallTime();
|
||||
// Check if the condition is met yet
|
||||
$realStart = $this->getWallTime();
|
||||
$cpuStart = $this->getCpuTime();
|
||||
$checkResult = call_user_func( $this->condition );
|
||||
$cpu = $this->getCpuTime() - $cpuStart;
|
||||
$real = $this->getWallTime() - $realStart;
|
||||
// Exit if the condition is reached
|
||||
if ( (int)$checkResult !== self::CONDITION_CONTINUE ) {
|
||||
$finalResult = is_int( $checkResult ) ? $checkResult : self::CONDITION_REACHED;
|
||||
break;
|
||||
} elseif ( $lastCheck ) {
|
||||
break; // timeout
|
||||
}
|
||||
// Detect if condition callback seems to block or if justs burns CPU
|
||||
$conditionUsesInterrupts = ( $real > 0.100 && $cpu <= $real * .03 );
|
||||
if ( !$this->popAndRunBusyCallback() && !$conditionUsesInterrupts ) {
|
||||
// 10 queries = 10(10+100)/2 ms = 550ms, 14 queries = 1050ms
|
||||
$sleepUs = min( $sleepUs + 10 * 1e3, 1e6 ); // stop incrementing at ~1s
|
||||
$this->usleep( $sleepUs );
|
||||
}
|
||||
$checkEndTime = $this->getWallTime();
|
||||
// The max() protects against the clock getting set back
|
||||
$elapsed += max( $checkEndTime - $checkStartTime, 0.010 );
|
||||
// Do not let slow callbacks timeout without checking the condition one more time
|
||||
$lastCheck = ( $elapsed >= $this->timeout );
|
||||
} while ( true );
|
||||
|
||||
$this->lastWaitTime = $elapsed;
|
||||
|
||||
return $finalResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return float Seconds
|
||||
*/
|
||||
public function getLastWaitTime() {
|
||||
return $this->lastWaitTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param integer $microseconds
|
||||
*/
|
||||
protected function usleep( $microseconds ) {
|
||||
usleep( $microseconds );
|
||||
}
|
||||
|
||||
/**
|
||||
* @return float
|
||||
*/
|
||||
protected function getWallTime() {
|
||||
return microtime( true );
|
||||
}
|
||||
|
||||
/**
|
||||
* @return float Returns 0.0 if not supported (Windows on PHP < 7)
|
||||
*/
|
||||
protected function getCpuTime() {
|
||||
$time = 0.0;
|
||||
|
||||
if ( defined( 'HHVM_VERSION' ) && PHP_OS === 'Linux' ) {
|
||||
$ru = getrusage( 2 /* RUSAGE_THREAD */ );
|
||||
} else {
|
||||
$ru = getrusage( 0 /* RUSAGE_SELF */ );
|
||||
}
|
||||
if ( $ru ) {
|
||||
$time += $ru['ru_utime.tv_sec'] + $ru['ru_utime.tv_usec'] / 1e6;
|
||||
$time += $ru['ru_stime.tv_sec'] + $ru['ru_stime.tv_usec'] / 1e6;
|
||||
}
|
||||
|
||||
return $time;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run one of the callbacks that does work ahead of time for another caller
|
||||
*
|
||||
* @return bool Whether a callback was executed
|
||||
*/
|
||||
private function popAndRunBusyCallback() {
|
||||
if ( $this->busyCallbacks ) {
|
||||
reset( $this->busyCallbacks );
|
||||
$key = key( $this->busyCallbacks );
|
||||
/** @var callable $workCallback */
|
||||
$workCallback =& $this->busyCallbacks[$key];
|
||||
try {
|
||||
$workCallback();
|
||||
} catch ( Exception $e ) {
|
||||
$workCallback = function () use ( $e ) {
|
||||
throw $e;
|
||||
};
|
||||
}
|
||||
unset( $this->busyCallbacks[$key] ); // consume
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
@ -45,31 +45,29 @@ use Psr\Log\NullLogger;
|
|||
abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
|
||||
/** @var array[] Lock tracking */
|
||||
protected $locks = [];
|
||||
|
||||
/** @var integer ERR_* class constant */
|
||||
protected $lastError = self::ERR_NONE;
|
||||
|
||||
/** @var string */
|
||||
protected $keyspace = 'local';
|
||||
|
||||
/** @var LoggerInterface */
|
||||
protected $logger;
|
||||
|
||||
/** @var callback|null */
|
||||
protected $asyncHandler;
|
||||
/** @var integer Seconds */
|
||||
protected $syncTimeout;
|
||||
|
||||
/** @var bool */
|
||||
private $debugMode = false;
|
||||
|
||||
/** @var array */
|
||||
private $duplicateKeyLookups = [];
|
||||
|
||||
/** @var bool */
|
||||
private $reportDupes = false;
|
||||
|
||||
/** @var bool */
|
||||
private $dupeTrackScheduled = false;
|
||||
|
||||
/** @var callable[] */
|
||||
protected $busyCallbacks = [];
|
||||
|
||||
/** @var integer[] Map of (ATTR_* class constant => QOS_* class constant) */
|
||||
protected $attrMap = [];
|
||||
|
||||
|
|
@ -94,6 +92,7 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
|
|||
* In CLI mode, it should run the task immediately.
|
||||
* - reportDupes: Whether to emit warning log messages for all keys that were
|
||||
* requested more than once (requires an asyncHandler).
|
||||
* - syncTimeout: How long to wait with WRITE_SYNC in seconds.
|
||||
* @param array $params
|
||||
*/
|
||||
public function __construct( array $params = [] ) {
|
||||
|
|
@ -114,6 +113,8 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
|
|||
if ( !empty( $params['reportDupes'] ) && is_callable( $this->asyncHandler ) ) {
|
||||
$this->reportDupes = true;
|
||||
}
|
||||
|
||||
$this->syncTimeout = isset( $params['syncTimeout'] ) ? $params['syncTimeout'] : 3;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -642,6 +643,30 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
|
|||
$this->lastError = $err;
|
||||
}
|
||||
|
||||
/**
|
||||
* Let a callback be run to avoid wasting time on special blocking calls
|
||||
*
|
||||
* The callbacks may or may not be called ever, in any particular order.
|
||||
* They are likely to be invoked when something WRITE_SYNC is used used.
|
||||
* They should follow a caching pattern as shown below, so that any code
|
||||
* using the word will get it's result no matter what happens.
|
||||
* @code
|
||||
* $result = null;
|
||||
* $workCallback = function () use ( &$result ) {
|
||||
* if ( !$result ) {
|
||||
* $result = ....
|
||||
* }
|
||||
* return $result;
|
||||
* }
|
||||
* @endcode
|
||||
*
|
||||
* @param callable $workCallback
|
||||
* @since 1.28
|
||||
*/
|
||||
public function addBusyCallback( callable $workCallback ) {
|
||||
$this->busyCallbacks[] = $workCallback;
|
||||
}
|
||||
|
||||
/**
|
||||
* Modify a cache update operation array for EventRelayer::notify()
|
||||
*
|
||||
|
|
|
|||
|
|
@ -377,7 +377,7 @@ class SqlBagOStuff extends BagOStuff {
|
|||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
$ok = $this->setMulti( [ $key => $value ], $exptime );
|
||||
if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
|
||||
$ok = $ok && $this->waitForSlaves();
|
||||
$ok = $this->waitForReplication() && $ok;
|
||||
}
|
||||
|
||||
return $ok;
|
||||
|
|
@ -489,7 +489,7 @@ class SqlBagOStuff extends BagOStuff {
|
|||
public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
$ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
|
||||
if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
|
||||
$ok = $ok && $this->waitForSlaves();
|
||||
$ok = $this->waitForReplication() && $ok;
|
||||
}
|
||||
|
||||
return $ok;
|
||||
|
|
@ -797,26 +797,30 @@ class SqlBagOStuff extends BagOStuff {
|
|||
return !$this->serverInfos;
|
||||
}
|
||||
|
||||
protected function waitForSlaves() {
|
||||
if ( $this->usesMainDB() ) {
|
||||
$lb = $this->getSeparateMainLB()
|
||||
?: MediaWikiServices::getInstance()->getDBLoadBalancer();
|
||||
// Return if there are no slaves
|
||||
if ( $lb->getServerCount() <= 1 ) {
|
||||
return true;
|
||||
}
|
||||
// Main LB is used; wait for any slaves to catch up
|
||||
try {
|
||||
$pos = $lb->getMasterPos();
|
||||
if ( $pos ) {
|
||||
return $lb->waitForAll( $pos, 3 );
|
||||
}
|
||||
} catch ( DBReplicationWaitError $e ) {
|
||||
return false;
|
||||
}
|
||||
protected function waitForReplication() {
|
||||
if ( !$this->usesMainDB() ) {
|
||||
// Custom DB server list; probably doesn't use replication
|
||||
return true;
|
||||
}
|
||||
|
||||
// Custom DB server list; probably doesn't use replication
|
||||
return true;
|
||||
$lb = $this->getSeparateMainLB()
|
||||
?: MediaWikiServices::getInstance()->getDBLoadBalancer();
|
||||
|
||||
if ( $lb->getServerCount() <= 1 ) {
|
||||
return true; // no slaves
|
||||
}
|
||||
|
||||
// Main LB is used; wait for any slaves to catch up
|
||||
$masterPos = $lb->getMasterPos();
|
||||
|
||||
$loop = new WaitConditionLoop(
|
||||
function () use ( $lb, $masterPos ) {
|
||||
return $lb->waitForAll( $masterPos, 1 );
|
||||
},
|
||||
$this->syncTimeout,
|
||||
$this->busyCallbacks
|
||||
);
|
||||
|
||||
return ( $loop->invoke() === $loop::CONDITION_REACHED );
|
||||
}
|
||||
}
|
||||
|
|
|
|||
154
tests/phpunit/includes/libs/WaitConditionLoopTest.php
Normal file
154
tests/phpunit/includes/libs/WaitConditionLoopTest.php
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
<?php
|
||||
|
||||
class WaitConditionLoopFakeTime extends WaitConditionLoop {
|
||||
protected $wallClock = 1;
|
||||
|
||||
function __construct( callable $condition, $timeout, array $busyCallbacks ) {
|
||||
parent::__construct( $condition, $timeout, $busyCallbacks );
|
||||
}
|
||||
|
||||
function usleep( $microseconds ) {
|
||||
$this->wallClock += $microseconds / 1e6;
|
||||
}
|
||||
|
||||
function getCpuTime() {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
function getWallTime() {
|
||||
return $this->wallClock;
|
||||
}
|
||||
|
||||
public function setWallClock( &$timestamp ) {
|
||||
$this->wallClock =& $timestamp;
|
||||
}
|
||||
}
|
||||
|
||||
class WaitConditionLoopTest extends PHPUnit_Framework_TestCase {
|
||||
public function testCallbackReached() {
|
||||
$wallClock = microtime( true );
|
||||
|
||||
$count = 0;
|
||||
$status = new StatusValue();
|
||||
$loop = new WaitConditionLoopFakeTime(
|
||||
function () use ( &$count, $status ) {
|
||||
++$count;
|
||||
$status->value = 'cookie';
|
||||
|
||||
return WaitConditionLoop::CONDITION_REACHED;
|
||||
},
|
||||
10.0,
|
||||
$this->newBusyWork( $x, $y, $z )
|
||||
);
|
||||
$this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
|
||||
$this->assertEquals( 1, $count );
|
||||
$this->assertEquals( 'cookie', $status->value );
|
||||
$this->assertEquals( [ 0, 0, 0 ], [ $x, $y, $z ], "No busy work done" );
|
||||
|
||||
$count = 0;
|
||||
$loop = new WaitConditionLoopFakeTime(
|
||||
function () use ( &$count, &$wallClock ) {
|
||||
$wallClock += 1;
|
||||
++$count;
|
||||
|
||||
return $count >= 2 ? WaitConditionLoop::CONDITION_REACHED : false;
|
||||
},
|
||||
7.0,
|
||||
$this->newBusyWork( $x, $y, $z, $wallClock )
|
||||
);
|
||||
$this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke(),
|
||||
"Busy work did not cause timeout" );
|
||||
$this->assertEquals( [ 1, 0, 0 ], [ $x, $y, $z ] );
|
||||
|
||||
$count = 0;
|
||||
$loop = new WaitConditionLoopFakeTime(
|
||||
function () use ( &$count, &$wallClock ) {
|
||||
$wallClock += .1;
|
||||
++$count;
|
||||
|
||||
return $count > 80 ? true : false;
|
||||
},
|
||||
50.0,
|
||||
$this->newBusyWork( $x, $y, $z, $wallClock, $dontCallMe, $badCalls )
|
||||
);
|
||||
$this->assertEquals( 0, $badCalls, "Callback exception not yet called" );
|
||||
$this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
|
||||
$this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
|
||||
$this->assertEquals( 1, $badCalls, "Bad callback ran and was exception caught" );
|
||||
|
||||
try {
|
||||
$e = null;
|
||||
$dontCallMe();
|
||||
} catch ( Exception $e ) {
|
||||
}
|
||||
|
||||
$this->assertInstanceOf( 'RunTimeException', $e );
|
||||
$this->assertEquals( 1, $badCalls, "Callback exception cached" );
|
||||
}
|
||||
|
||||
public function testCallbackTimeout() {
|
||||
$count = 0;
|
||||
$wallClock = microtime( true );
|
||||
$loop = new WaitConditionLoopFakeTime(
|
||||
function () use ( &$count, &$wallClock ) {
|
||||
$wallClock += 3;
|
||||
++$count;
|
||||
|
||||
return $count > 300 ? true : false;
|
||||
},
|
||||
50.0,
|
||||
$this->newBusyWork( $x, $y, $z, $wallClock )
|
||||
);
|
||||
$loop->setWallClock( $wallClock );
|
||||
$this->assertEquals( $loop::CONDITION_TIMED_OUT, $loop->invoke() );
|
||||
$this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
|
||||
}
|
||||
|
||||
public function testCallbackAborted() {
|
||||
$x = 0;
|
||||
$wallClock = microtime( true );
|
||||
$loop = new WaitConditionLoopFakeTime(
|
||||
function () use ( &$x, &$wallClock ) {
|
||||
$wallClock += 2;
|
||||
++$x;
|
||||
|
||||
return $x > 2 ? WaitConditionLoop::CONDITION_ABORTED : false;
|
||||
},
|
||||
10.0,
|
||||
$this->newBusyWork( $x, $y, $z, $wallClock )
|
||||
);
|
||||
$loop->setWallClock( $wallClock );
|
||||
$this->assertEquals( $loop::CONDITION_ABORTED, $loop->invoke() );
|
||||
}
|
||||
|
||||
private function newBusyWork(
|
||||
&$x, &$y, &$z, &$wallClock = 1, &$dontCallMe = null, &$badCalls = 0
|
||||
) {
|
||||
$x = $y = $z = 0;
|
||||
$badCalls = 0;
|
||||
|
||||
$list = [];
|
||||
$list[] = function () use ( &$x, &$wallClock ) {
|
||||
$wallClock += 1;
|
||||
|
||||
return ++$x;
|
||||
};
|
||||
$dontCallMe = function () use ( &$badCalls ) {
|
||||
++$badCalls;
|
||||
throw new RuntimeException( "TrollyMcTrollFace" );
|
||||
};
|
||||
$list[] =& $dontCallMe;
|
||||
$list[] = function () use ( &$y, &$wallClock ) {
|
||||
$wallClock += 15;
|
||||
|
||||
return ++$y;
|
||||
};
|
||||
$list[] = function () use ( &$z, &$wallClock ) {
|
||||
$wallClock += 0.1;
|
||||
|
||||
return ++$z;
|
||||
};
|
||||
|
||||
return $list;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue