objectcache: allow for callbacks to mask SYNC_WRITE latency

Change-Id: I908222ad3788ebe330aa58831cda139da32becd8
This commit is contained in:
Aaron Schulz 2016-08-29 10:00:05 -07:00
parent 52511952de
commit e99bb1b7dc
5 changed files with 383 additions and 28 deletions

View file

@ -1498,6 +1498,7 @@ $wgAutoloadLocalClasses = [
'VirtualRESTService' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTService.php',
'VirtualRESTServiceClient' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTServiceClient.php',
'WANObjectCache' => __DIR__ . '/includes/libs/objectcache/WANObjectCache.php',
'WaitConditionLoop' => __DIR__ . '/includes/libs/WaitConditionLoop.php',
'WantedCategoriesPage' => __DIR__ . '/includes/specials/SpecialWantedcategories.php',
'WantedFilesPage' => __DIR__ . '/includes/specials/SpecialWantedfiles.php',
'WantedPagesPage' => __DIR__ . '/includes/specials/SpecialWantedpages.php',

View file

@ -0,0 +1,171 @@
<?php
/**
* Wait loop that reaches a condition or times out.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
* @author Aaron Schulz
*/
/**
* Wait loop that reaches a condition or times out
* @since 1.28
*/
class WaitConditionLoop {
/** @var callable */
private $condition;
/** @var callable[] */
private $busyCallbacks = [];
/** @var float Seconds */
private $timeout;
/** @var float Seconds */
private $lastWaitTime;
const CONDITION_REACHED = 1;
const CONDITION_CONTINUE = 0; // evaluates as falsey
const CONDITION_TIMED_OUT = -1;
const CONDITION_ABORTED = -2;
/**
* @param callable $condition Callback that returns a WaitConditionLoop::CONDITION_ constant
* @param float $timeout Timeout in seconds
* @param array &$busyCallbacks List of callbacks to do useful work (by reference)
*/
public function __construct( callable $condition, $timeout = 5.0, &$busyCallbacks = [] ) {
$this->condition = $condition;
$this->timeout = $timeout;
$this->busyCallbacks =& $busyCallbacks;
}
/**
* Invoke the loop and continue until either:
* - a) The condition callback does not return either CONDITION_CONTINUE or true
* - b) The timeout is reached
* This a condition callback can return true (stop) or false (continue) for convenience.
* In such cases, the halting result of "true" will be converted to CONDITION_REACHED.
*
* Exceptions in callbacks will be caught and the callback will be swapped with
* one that simply rethrows that exception back to the caller when invoked.
*
* @return integer WaitConditionLoop::CONDITION_* constant
* @throws Exception Any error from the condition callback
*/
public function invoke() {
$elapsed = 0.0; // seconds
$sleepUs = 0; // microseconds to sleep each time
$lastCheck = false;
$finalResult = self::CONDITION_TIMED_OUT;
do {
$checkStartTime = $this->getWallTime();
// Check if the condition is met yet
$realStart = $this->getWallTime();
$cpuStart = $this->getCpuTime();
$checkResult = call_user_func( $this->condition );
$cpu = $this->getCpuTime() - $cpuStart;
$real = $this->getWallTime() - $realStart;
// Exit if the condition is reached
if ( (int)$checkResult !== self::CONDITION_CONTINUE ) {
$finalResult = is_int( $checkResult ) ? $checkResult : self::CONDITION_REACHED;
break;
} elseif ( $lastCheck ) {
break; // timeout
}
// Detect if condition callback seems to block or if justs burns CPU
$conditionUsesInterrupts = ( $real > 0.100 && $cpu <= $real * .03 );
if ( !$this->popAndRunBusyCallback() && !$conditionUsesInterrupts ) {
// 10 queries = 10(10+100)/2 ms = 550ms, 14 queries = 1050ms
$sleepUs = min( $sleepUs + 10 * 1e3, 1e6 ); // stop incrementing at ~1s
$this->usleep( $sleepUs );
}
$checkEndTime = $this->getWallTime();
// The max() protects against the clock getting set back
$elapsed += max( $checkEndTime - $checkStartTime, 0.010 );
// Do not let slow callbacks timeout without checking the condition one more time
$lastCheck = ( $elapsed >= $this->timeout );
} while ( true );
$this->lastWaitTime = $elapsed;
return $finalResult;
}
/**
* @return float Seconds
*/
public function getLastWaitTime() {
return $this->lastWaitTime;
}
/**
* @param integer $microseconds
*/
protected function usleep( $microseconds ) {
usleep( $microseconds );
}
/**
* @return float
*/
protected function getWallTime() {
return microtime( true );
}
/**
* @return float Returns 0.0 if not supported (Windows on PHP < 7)
*/
protected function getCpuTime() {
$time = 0.0;
if ( defined( 'HHVM_VERSION' ) && PHP_OS === 'Linux' ) {
$ru = getrusage( 2 /* RUSAGE_THREAD */ );
} else {
$ru = getrusage( 0 /* RUSAGE_SELF */ );
}
if ( $ru ) {
$time += $ru['ru_utime.tv_sec'] + $ru['ru_utime.tv_usec'] / 1e6;
$time += $ru['ru_stime.tv_sec'] + $ru['ru_stime.tv_usec'] / 1e6;
}
return $time;
}
/**
* Run one of the callbacks that does work ahead of time for another caller
*
* @return bool Whether a callback was executed
*/
private function popAndRunBusyCallback() {
if ( $this->busyCallbacks ) {
reset( $this->busyCallbacks );
$key = key( $this->busyCallbacks );
/** @var callable $workCallback */
$workCallback =& $this->busyCallbacks[$key];
try {
$workCallback();
} catch ( Exception $e ) {
$workCallback = function () use ( $e ) {
throw $e;
};
}
unset( $this->busyCallbacks[$key] ); // consume
return true;
}
return false;
}
}

View file

@ -45,31 +45,29 @@ use Psr\Log\NullLogger;
abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
/** @var array[] Lock tracking */
protected $locks = [];
/** @var integer ERR_* class constant */
protected $lastError = self::ERR_NONE;
/** @var string */
protected $keyspace = 'local';
/** @var LoggerInterface */
protected $logger;
/** @var callback|null */
protected $asyncHandler;
/** @var integer Seconds */
protected $syncTimeout;
/** @var bool */
private $debugMode = false;
/** @var array */
private $duplicateKeyLookups = [];
/** @var bool */
private $reportDupes = false;
/** @var bool */
private $dupeTrackScheduled = false;
/** @var callable[] */
protected $busyCallbacks = [];
/** @var integer[] Map of (ATTR_* class constant => QOS_* class constant) */
protected $attrMap = [];
@ -94,6 +92,7 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
* In CLI mode, it should run the task immediately.
* - reportDupes: Whether to emit warning log messages for all keys that were
* requested more than once (requires an asyncHandler).
* - syncTimeout: How long to wait with WRITE_SYNC in seconds.
* @param array $params
*/
public function __construct( array $params = [] ) {
@ -114,6 +113,8 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
if ( !empty( $params['reportDupes'] ) && is_callable( $this->asyncHandler ) ) {
$this->reportDupes = true;
}
$this->syncTimeout = isset( $params['syncTimeout'] ) ? $params['syncTimeout'] : 3;
}
/**
@ -642,6 +643,30 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
$this->lastError = $err;
}
/**
* Let a callback be run to avoid wasting time on special blocking calls
*
* The callbacks may or may not be called ever, in any particular order.
* They are likely to be invoked when something WRITE_SYNC is used used.
* They should follow a caching pattern as shown below, so that any code
* using the word will get it's result no matter what happens.
* @code
* $result = null;
* $workCallback = function () use ( &$result ) {
* if ( !$result ) {
* $result = ....
* }
* return $result;
* }
* @endcode
*
* @param callable $workCallback
* @since 1.28
*/
public function addBusyCallback( callable $workCallback ) {
$this->busyCallbacks[] = $workCallback;
}
/**
* Modify a cache update operation array for EventRelayer::notify()
*

View file

@ -377,7 +377,7 @@ class SqlBagOStuff extends BagOStuff {
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
$ok = $this->setMulti( [ $key => $value ], $exptime );
if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
$ok = $ok && $this->waitForSlaves();
$ok = $this->waitForReplication() && $ok;
}
return $ok;
@ -489,7 +489,7 @@ class SqlBagOStuff extends BagOStuff {
public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
$ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
$ok = $ok && $this->waitForSlaves();
$ok = $this->waitForReplication() && $ok;
}
return $ok;
@ -797,26 +797,30 @@ class SqlBagOStuff extends BagOStuff {
return !$this->serverInfos;
}
protected function waitForSlaves() {
if ( $this->usesMainDB() ) {
$lb = $this->getSeparateMainLB()
?: MediaWikiServices::getInstance()->getDBLoadBalancer();
// Return if there are no slaves
if ( $lb->getServerCount() <= 1 ) {
return true;
}
// Main LB is used; wait for any slaves to catch up
try {
$pos = $lb->getMasterPos();
if ( $pos ) {
return $lb->waitForAll( $pos, 3 );
}
} catch ( DBReplicationWaitError $e ) {
return false;
}
protected function waitForReplication() {
if ( !$this->usesMainDB() ) {
// Custom DB server list; probably doesn't use replication
return true;
}
// Custom DB server list; probably doesn't use replication
return true;
$lb = $this->getSeparateMainLB()
?: MediaWikiServices::getInstance()->getDBLoadBalancer();
if ( $lb->getServerCount() <= 1 ) {
return true; // no slaves
}
// Main LB is used; wait for any slaves to catch up
$masterPos = $lb->getMasterPos();
$loop = new WaitConditionLoop(
function () use ( $lb, $masterPos ) {
return $lb->waitForAll( $masterPos, 1 );
},
$this->syncTimeout,
$this->busyCallbacks
);
return ( $loop->invoke() === $loop::CONDITION_REACHED );
}
}

View file

@ -0,0 +1,154 @@
<?php
class WaitConditionLoopFakeTime extends WaitConditionLoop {
protected $wallClock = 1;
function __construct( callable $condition, $timeout, array $busyCallbacks ) {
parent::__construct( $condition, $timeout, $busyCallbacks );
}
function usleep( $microseconds ) {
$this->wallClock += $microseconds / 1e6;
}
function getCpuTime() {
return 0.0;
}
function getWallTime() {
return $this->wallClock;
}
public function setWallClock( &$timestamp ) {
$this->wallClock =& $timestamp;
}
}
class WaitConditionLoopTest extends PHPUnit_Framework_TestCase {
public function testCallbackReached() {
$wallClock = microtime( true );
$count = 0;
$status = new StatusValue();
$loop = new WaitConditionLoopFakeTime(
function () use ( &$count, $status ) {
++$count;
$status->value = 'cookie';
return WaitConditionLoop::CONDITION_REACHED;
},
10.0,
$this->newBusyWork( $x, $y, $z )
);
$this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
$this->assertEquals( 1, $count );
$this->assertEquals( 'cookie', $status->value );
$this->assertEquals( [ 0, 0, 0 ], [ $x, $y, $z ], "No busy work done" );
$count = 0;
$loop = new WaitConditionLoopFakeTime(
function () use ( &$count, &$wallClock ) {
$wallClock += 1;
++$count;
return $count >= 2 ? WaitConditionLoop::CONDITION_REACHED : false;
},
7.0,
$this->newBusyWork( $x, $y, $z, $wallClock )
);
$this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke(),
"Busy work did not cause timeout" );
$this->assertEquals( [ 1, 0, 0 ], [ $x, $y, $z ] );
$count = 0;
$loop = new WaitConditionLoopFakeTime(
function () use ( &$count, &$wallClock ) {
$wallClock += .1;
++$count;
return $count > 80 ? true : false;
},
50.0,
$this->newBusyWork( $x, $y, $z, $wallClock, $dontCallMe, $badCalls )
);
$this->assertEquals( 0, $badCalls, "Callback exception not yet called" );
$this->assertEquals( $loop::CONDITION_REACHED, $loop->invoke() );
$this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
$this->assertEquals( 1, $badCalls, "Bad callback ran and was exception caught" );
try {
$e = null;
$dontCallMe();
} catch ( Exception $e ) {
}
$this->assertInstanceOf( 'RunTimeException', $e );
$this->assertEquals( 1, $badCalls, "Callback exception cached" );
}
public function testCallbackTimeout() {
$count = 0;
$wallClock = microtime( true );
$loop = new WaitConditionLoopFakeTime(
function () use ( &$count, &$wallClock ) {
$wallClock += 3;
++$count;
return $count > 300 ? true : false;
},
50.0,
$this->newBusyWork( $x, $y, $z, $wallClock )
);
$loop->setWallClock( $wallClock );
$this->assertEquals( $loop::CONDITION_TIMED_OUT, $loop->invoke() );
$this->assertEquals( [ 1, 1, 1 ], [ $x, $y, $z ], "Busy work done" );
}
public function testCallbackAborted() {
$x = 0;
$wallClock = microtime( true );
$loop = new WaitConditionLoopFakeTime(
function () use ( &$x, &$wallClock ) {
$wallClock += 2;
++$x;
return $x > 2 ? WaitConditionLoop::CONDITION_ABORTED : false;
},
10.0,
$this->newBusyWork( $x, $y, $z, $wallClock )
);
$loop->setWallClock( $wallClock );
$this->assertEquals( $loop::CONDITION_ABORTED, $loop->invoke() );
}
private function newBusyWork(
&$x, &$y, &$z, &$wallClock = 1, &$dontCallMe = null, &$badCalls = 0
) {
$x = $y = $z = 0;
$badCalls = 0;
$list = [];
$list[] = function () use ( &$x, &$wallClock ) {
$wallClock += 1;
return ++$x;
};
$dontCallMe = function () use ( &$badCalls ) {
++$badCalls;
throw new RuntimeException( "TrollyMcTrollFace" );
};
$list[] =& $dontCallMe;
$list[] = function () use ( &$y, &$wallClock ) {
$wallClock += 15;
return ++$y;
};
$list[] = function () use ( &$z, &$wallClock ) {
$wallClock += 0.1;
return ++$z;
};
return $list;
}
}