Replace cpPosTime cookie/parameter with cpPosIndex
This handles multi-DB transactions properly, instead of causing wait timeouts in the WaitConditionLoop. It also is more correct in using a counter instead of relying on wall clocks. In addition: * Refactor related code in MediaWiki.php to be comprehensible. * Always send the cookie even the "remote wiki redirect" case. * Renamed ChronologyProtector field and constant to avoid any confusion of "wait for server X to reach Y" with "wait for Y to show up in position store". * Add an "asOfTime" field to the position keys for debugging. Bug: T182322 Change-Id: I5c73cd07eaf664f02ba00c38fab9f49b609f4284
This commit is contained in:
parent
80f7ac1ccf
commit
d5aa846d84
6 changed files with 198 additions and 139 deletions
|
|
@ -603,51 +603,55 @@ class MediaWiki {
|
|||
DeferredUpdates::doUpdates( 'enqueue', DeferredUpdates::PRESEND );
|
||||
wfDebug( __METHOD__ . ': pre-send deferred updates completed' );
|
||||
|
||||
// Decide when clients block on ChronologyProtector DB position writes
|
||||
$urlDomainDistance = (
|
||||
$request->wasPosted() &&
|
||||
$output->getRedirect() &&
|
||||
$lbFactory->hasOrMadeRecentMasterChanges( INF )
|
||||
) ? self::getUrlDomainDistance( $output->getRedirect() ) : false;
|
||||
// Should the client return, their request should observe the new ChronologyProtector
|
||||
// DB positions. This request might be on a foreign wiki domain, so synchronously update
|
||||
// the DB positions in all datacenters to be safe. If this output is not a redirect,
|
||||
// then OutputPage::output() will be relatively slow, meaning that running it in
|
||||
// $postCommitWork should help mask the latency of those updates.
|
||||
$flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
|
||||
$strategy = 'cookie+sync';
|
||||
|
||||
$allowHeaders = !( $output->isDisabled() || headers_sent() );
|
||||
if ( $urlDomainDistance === 'local' || $urlDomainDistance === 'remote' ) {
|
||||
// OutputPage::output() will be fast; $postCommitWork will not be useful for
|
||||
// masking the latency of syncing DB positions accross all datacenters synchronously.
|
||||
// Instead, make use of the RTT time of the client follow redirects.
|
||||
$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
|
||||
$cpPosTime = microtime( true );
|
||||
// Client's next request should see 1+ positions with this DBMasterPos::asOf() time
|
||||
if ( $urlDomainDistance === 'local' && $allowHeaders ) {
|
||||
// Client will stay on this domain, so set an unobtrusive cookie
|
||||
$expires = time() + ChronologyProtector::POSITION_TTL;
|
||||
$options = [ 'prefix' => '' ];
|
||||
$request->response()->setCookie( 'cpPosTime', $cpPosTime, $expires, $options );
|
||||
} else {
|
||||
// Cookies may not work across wiki domains, so use a URL parameter
|
||||
$safeUrl = $lbFactory->appendPreShutdownTimeAsQuery(
|
||||
$output->getRedirect(),
|
||||
$cpPosTime
|
||||
);
|
||||
$output->redirect( $safeUrl );
|
||||
}
|
||||
} else {
|
||||
// OutputPage::output() is fairly slow; run it in $postCommitWork to mask
|
||||
// the latency of syncing DB positions accross all datacenters synchronously
|
||||
$flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
|
||||
if ( $lbFactory->hasOrMadeRecentMasterChanges( INF ) && $allowHeaders ) {
|
||||
$cpPosTime = microtime( true );
|
||||
// Set a cookie in case the DB position store cannot sync accross datacenters.
|
||||
// This will at least cover the common case of the user staying on the domain.
|
||||
$expires = time() + ChronologyProtector::POSITION_TTL;
|
||||
$options = [ 'prefix' => '' ];
|
||||
$request->response()->setCookie( 'cpPosTime', $cpPosTime, $expires, $options );
|
||||
if ( $output->getRedirect() && $lbFactory->hasOrMadeRecentMasterChanges( INF ) ) {
|
||||
// OutputPage::output() will be fast, so $postCommitWork is useless for masking
|
||||
// the latency of synchronously updating the DB positions in all datacenters.
|
||||
// Try to make use of the time the client spends following redirects instead.
|
||||
$domainDistance = self::getUrlDomainDistance( $output->getRedirect() );
|
||||
if ( $domainDistance === 'local' && $allowHeaders ) {
|
||||
$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
|
||||
$strategy = 'cookie'; // use same-domain cookie and keep the URL uncluttered
|
||||
} elseif ( $domainDistance === 'remote' ) {
|
||||
$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
|
||||
$strategy = 'cookie+url'; // cross-domain cookie might not work
|
||||
}
|
||||
}
|
||||
|
||||
// Record ChronologyProtector positions for DBs affected in this request at this point
|
||||
$lbFactory->shutdown( $flags, $postCommitWork );
|
||||
$cpIndex = null;
|
||||
$lbFactory->shutdown( $flags, $postCommitWork, $cpIndex );
|
||||
wfDebug( __METHOD__ . ': LBFactory shutdown completed' );
|
||||
|
||||
if ( $cpIndex > 0 ) {
|
||||
if ( $allowHeaders ) {
|
||||
$expires = time() + ChronologyProtector::POSITION_TTL;
|
||||
$options = [ 'prefix' => '' ];
|
||||
$request->response()->setCookie( 'cpPosIndex', $cpIndex, $expires, $options );
|
||||
}
|
||||
|
||||
if ( $strategy === 'cookie+url' ) {
|
||||
if ( $output->getRedirect() ) { // sanity
|
||||
$safeUrl = $lbFactory->appendShutdownCPIndexAsQuery(
|
||||
$output->getRedirect(),
|
||||
$cpIndex
|
||||
);
|
||||
$output->redirect( $safeUrl );
|
||||
} else {
|
||||
$e = new LogicException( "No redirect; cannot append cpPosIndex parameter." );
|
||||
MWExceptionHandler::logException( $e );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set a cookie to tell all CDN edge nodes to "stick" the user to the DC that handles this
|
||||
// POST request (e.g. the "master" data center). Also have the user briefly bypass CDN so
|
||||
// ChronologyProtector works for cacheable URLs.
|
||||
|
|
|
|||
|
|
@ -736,19 +736,19 @@ if ( !$wgDBerrorLogTZ ) {
|
|||
// Initialize the request object in $wgRequest
|
||||
$wgRequest = RequestContext::getMain()->getRequest(); // BackCompat
|
||||
// Set user IP/agent information for causal consistency purposes.
|
||||
// The cpPosTime cookie has no prefix and is set by MediaWiki::preOutputCommit().
|
||||
$cpPosTime = $wgRequest->getFloat( 'cpPosTime', $wgRequest->getCookie( 'cpPosTime', '' ) );
|
||||
// The cpPosIndex cookie has no prefix and is set by MediaWiki::preOutputCommit().
|
||||
$cpPosIndex = $wgRequest->getInt( 'cpPosIndex', (int)$wgRequest->getCookie( 'cpPosIndex', '' ) );
|
||||
MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->setRequestInfo( [
|
||||
'IPAddress' => $wgRequest->getIP(),
|
||||
'UserAgent' => $wgRequest->getHeader( 'User-Agent' ),
|
||||
'ChronologyProtection' => $wgRequest->getHeader( 'ChronologyProtection' ),
|
||||
'ChronologyPositionTime' => $cpPosTime
|
||||
'ChronologyPositionIndex' => $cpPosIndex
|
||||
] );
|
||||
// Make sure that caching does not compromise the consistency improvements
|
||||
if ( $cpPosTime ) {
|
||||
if ( $cpPosIndex ) {
|
||||
MediaWikiServices::getInstance()->getMainWANObjectCache()->useInterimHoldOffCaching( false );
|
||||
}
|
||||
unset( $cpPosTime );
|
||||
unset( $cpPosIndex );
|
||||
|
||||
// Useful debug output
|
||||
if ( $wgCommandLineMode ) {
|
||||
|
|
|
|||
|
|
@ -43,10 +43,10 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
protected $key;
|
||||
/** @var string Hash of client parameters */
|
||||
protected $clientId;
|
||||
/** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
|
||||
protected $waitForPosTime;
|
||||
/** @var int|null Expected minimum index of the last write to the position store */
|
||||
protected $waitForPosIndex;
|
||||
/** @var int Max seconds to wait on positions to appear */
|
||||
protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
|
||||
protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
|
||||
/** @var bool Whether to no-op all method calls */
|
||||
protected $enabled = true;
|
||||
/** @var bool Whether to check and wait on positions */
|
||||
|
|
@ -64,19 +64,19 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
/** @var int Seconds to store positions */
|
||||
const POSITION_TTL = 60;
|
||||
/** @var int Max time to wait for positions to appear */
|
||||
const POS_WAIT_TIMEOUT = 5;
|
||||
const POS_STORE_WAIT_TIMEOUT = 5;
|
||||
|
||||
/**
|
||||
* @param BagOStuff $store
|
||||
* @param array $client Map of (ip: <IP>, agent: <user-agent>)
|
||||
* @param float $posTime UNIX timestamp
|
||||
* @param array[] $client Map of (ip: <IP>, agent: <user-agent>)
|
||||
* @param int|null $posIndex Write counter index [optional]
|
||||
* @since 1.27
|
||||
*/
|
||||
public function __construct( BagOStuff $store, array $client, $posTime = null ) {
|
||||
public function __construct( BagOStuff $store, array $client, $posIndex = null ) {
|
||||
$this->store = $store;
|
||||
$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
|
||||
$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v1' );
|
||||
$this->waitForPosTime = $posTime;
|
||||
$this->waitForPosIndex = $posIndex;
|
||||
$this->logger = new NullLogger();
|
||||
}
|
||||
|
||||
|
|
@ -161,9 +161,10 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
*
|
||||
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
|
||||
* @param string $mode One of (sync, async); whether to wait on remote datacenters
|
||||
* @param int|null &$cpIndex DB position key write counter; incremented on update
|
||||
* @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
|
||||
*/
|
||||
public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
|
||||
public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
|
||||
if ( !$this->enabled ) {
|
||||
return [];
|
||||
}
|
||||
|
|
@ -198,13 +199,18 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
}
|
||||
$ok = $store->set(
|
||||
$this->key,
|
||||
self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
|
||||
$this->mergePositions(
|
||||
$store->get( $this->key ),
|
||||
$this->shutdownPositions,
|
||||
$cpIndex
|
||||
),
|
||||
self::POSITION_TTL,
|
||||
( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
|
||||
);
|
||||
$store->unlock( $this->key );
|
||||
} else {
|
||||
$ok = false;
|
||||
$cpIndex = null; // nothing saved
|
||||
}
|
||||
|
||||
if ( !$ok ) {
|
||||
|
|
@ -254,28 +260,36 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
|
||||
$this->initialized = true;
|
||||
if ( $this->wait ) {
|
||||
// If there is an expectation to see master positions with a certain min
|
||||
// timestamp, then block until they appear, or until a timeout is reached.
|
||||
if ( $this->waitForPosTime > 0.0 ) {
|
||||
// If there is an expectation to see master positions from a certain write
|
||||
// index or higher, then block until it appears, or until a timeout is reached.
|
||||
// Since the write index restarts each time the key is created, it is possible that
|
||||
// a lagged store has a matching key write index. However, in that case, it should
|
||||
// already be expired and thus treated as non-existing, maintaining correctness.
|
||||
if ( $this->waitForPosIndex > 0 ) {
|
||||
$data = null;
|
||||
$loop = new WaitConditionLoop(
|
||||
function () use ( &$data ) {
|
||||
$data = $this->store->get( $this->key );
|
||||
if ( !is_array( $data ) ) {
|
||||
return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
|
||||
} elseif ( !isset( $data['writeIndex'] ) ) {
|
||||
return WaitConditionLoop::CONDITION_REACHED; // b/c
|
||||
}
|
||||
|
||||
return ( self::minPosTime( $data ) >= $this->waitForPosTime )
|
||||
return ( $data['writeIndex'] >= $this->waitForPosIndex )
|
||||
? WaitConditionLoop::CONDITION_REACHED
|
||||
: WaitConditionLoop::CONDITION_CONTINUE;
|
||||
},
|
||||
$this->waitForPosTimeout
|
||||
$this->waitForPosStoreTimeout
|
||||
);
|
||||
$result = $loop->invoke();
|
||||
$waitedMs = $loop->getLastWaitTime() * 1e3;
|
||||
|
||||
if ( $result == $loop::CONDITION_REACHED ) {
|
||||
$msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
|
||||
$msg = "expected and found pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
|
||||
$this->logger->debug( $msg );
|
||||
} else {
|
||||
$msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
|
||||
$msg = "expected but missed pos index {$this->waitForPosIndex} ({$waitedMs}ms)";
|
||||
$this->logger->info( $msg );
|
||||
}
|
||||
} else {
|
||||
|
|
@ -290,48 +304,31 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array|bool $data
|
||||
* @return float|null
|
||||
*/
|
||||
private static function minPosTime( $data ) {
|
||||
if ( !isset( $data['positions'] ) ) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$min = null;
|
||||
foreach ( $data['positions'] as $pos ) {
|
||||
if ( $pos instanceof DBMasterPos ) {
|
||||
$min = $min ? min( $pos->asOfTime(), $min ) : $pos->asOfTime();
|
||||
}
|
||||
}
|
||||
|
||||
return $min;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array|bool $curValue
|
||||
* @param DBMasterPos[] $shutdownPositions
|
||||
* @param int|null &$cpIndex
|
||||
* @return array
|
||||
*/
|
||||
private static function mergePositions( $curValue, array $shutdownPositions ) {
|
||||
protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
|
||||
/** @var DBMasterPos[] $curPositions */
|
||||
if ( $curValue === false ) {
|
||||
$curPositions = $shutdownPositions;
|
||||
} else {
|
||||
$curPositions = $curValue['positions'];
|
||||
// Use the newest positions for each DB master
|
||||
foreach ( $shutdownPositions as $db => $pos ) {
|
||||
if (
|
||||
!isset( $curPositions[$db] ) ||
|
||||
!( $curPositions[$db] instanceof DBMasterPos ) ||
|
||||
$pos->asOfTime() > $curPositions[$db]->asOfTime()
|
||||
) {
|
||||
$curPositions[$db] = $pos;
|
||||
}
|
||||
$curPositions = isset( $curValue['positions'] ) ? $curValue['positions'] : [];
|
||||
// Use the newest positions for each DB master
|
||||
foreach ( $shutdownPositions as $db => $pos ) {
|
||||
if (
|
||||
!isset( $curPositions[$db] ) ||
|
||||
!( $curPositions[$db] instanceof DBMasterPos ) ||
|
||||
$pos->asOfTime() > $curPositions[$db]->asOfTime()
|
||||
) {
|
||||
$curPositions[$db] = $pos;
|
||||
}
|
||||
}
|
||||
|
||||
return [ 'positions' => $curPositions ];
|
||||
$cpIndex = isset( $curValue['writeIndex'] ) ? $curValue['writeIndex'] : 0;
|
||||
|
||||
return [
|
||||
'positions' => $curPositions,
|
||||
'writeIndex' => ++$cpIndex
|
||||
];
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,9 +140,10 @@ interface ILBFactory {
|
|||
* Prepare all tracked load balancers for shutdown
|
||||
* @param int $mode One of the class SHUTDOWN_* constants
|
||||
* @param callable|null $workCallback Work to mask ChronologyProtector writes
|
||||
* @param int|null &$cpIndex Position key write counter for ChronologyProtector
|
||||
*/
|
||||
public function shutdown(
|
||||
$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
|
||||
$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null, &$cpIndex = null
|
||||
);
|
||||
|
||||
/**
|
||||
|
|
@ -304,7 +305,7 @@ interface ILBFactory {
|
|||
public function setAgentName( $agent );
|
||||
|
||||
/**
|
||||
* Append ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
|
||||
* Append ?cpPosIndex parameter to a URL for ChronologyProtector purposes if needed
|
||||
*
|
||||
* Note that unlike cookies, this works accross domains
|
||||
*
|
||||
|
|
@ -312,14 +313,14 @@ interface ILBFactory {
|
|||
* @param float $time UNIX timestamp just before shutdown() was called
|
||||
* @return string
|
||||
*/
|
||||
public function appendPreShutdownTimeAsQuery( $url, $time );
|
||||
public function appendShutdownCPIndexAsQuery( $url, $time );
|
||||
|
||||
/**
|
||||
* @param array $info Map of fields, including:
|
||||
* - IPAddress : IP address
|
||||
* - UserAgent : User-Agent HTTP header
|
||||
* - ChronologyProtection : cookie/header value specifying ChronologyProtector usage
|
||||
* - ChronologyPositionTime: timestamp used to get up-to-date DB positions for the agent
|
||||
* - ChronologyPositionIndex: timestamp used to get up-to-date DB positions for the agent
|
||||
*/
|
||||
public function setRequestInfo( array $info );
|
||||
}
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
'IPAddress' => isset( $_SERVER[ 'REMOTE_ADDR' ] ) ? $_SERVER[ 'REMOTE_ADDR' ] : '',
|
||||
'UserAgent' => isset( $_SERVER['HTTP_USER_AGENT'] ) ? $_SERVER['HTTP_USER_AGENT'] : '',
|
||||
'ChronologyProtection' => 'true',
|
||||
'ChronologyPositionTime' => isset( $_GET['cpPosTime'] ) ? $_GET['cpPosTime'] : null
|
||||
'ChronologyPositionIndex' => isset( $_GET['cpPosIndex'] ) ? $_GET['cpPosIndex'] : null
|
||||
];
|
||||
|
||||
$this->cliMode = isset( $conf['cliMode'] ) ? $conf['cliMode'] : PHP_SAPI === 'cli';
|
||||
|
|
@ -132,13 +132,13 @@ abstract class LBFactory implements ILBFactory {
|
|||
}
|
||||
|
||||
public function shutdown(
|
||||
$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
|
||||
$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null, &$cpIndex = null
|
||||
) {
|
||||
$chronProt = $this->getChronologyProtector();
|
||||
if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
|
||||
$this->shutdownChronologyProtector( $chronProt, $workCallback, 'sync' );
|
||||
$this->shutdownChronologyProtector( $chronProt, $workCallback, 'sync', $cpIndex );
|
||||
} elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
|
||||
$this->shutdownChronologyProtector( $chronProt, null, 'async' );
|
||||
$this->shutdownChronologyProtector( $chronProt, null, 'async', $cpIndex );
|
||||
}
|
||||
|
||||
$this->commitMasterChanges( __METHOD__ ); // sanity
|
||||
|
|
@ -441,7 +441,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
'ip' => $this->requestInfo['IPAddress'],
|
||||
'agent' => $this->requestInfo['UserAgent'],
|
||||
],
|
||||
$this->requestInfo['ChronologyPositionTime']
|
||||
$this->requestInfo['ChronologyPositionIndex']
|
||||
);
|
||||
$this->chronProt->setLogger( $this->replLogger );
|
||||
|
||||
|
|
@ -465,9 +465,10 @@ abstract class LBFactory implements ILBFactory {
|
|||
* @param ChronologyProtector $cp
|
||||
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
|
||||
* @param string $mode One of (sync, async); whether to wait on remote datacenters
|
||||
* @param int|null &$cpIndex DB position key write counter; incremented on update
|
||||
*/
|
||||
protected function shutdownChronologyProtector(
|
||||
ChronologyProtector $cp, $workCallback, $mode
|
||||
ChronologyProtector $cp, $workCallback, $mode, &$cpIndex = null
|
||||
) {
|
||||
// Record all the master positions needed
|
||||
$this->forEachLB( function ( ILoadBalancer $lb ) use ( $cp ) {
|
||||
|
|
@ -475,7 +476,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
} );
|
||||
// Write them to the persistent stash. Try to do something useful by running $work
|
||||
// while ChronologyProtector waits for the stash write to replicate to all DCs.
|
||||
$unsavedPositions = $cp->shutdown( $workCallback, $mode );
|
||||
$unsavedPositions = $cp->shutdown( $workCallback, $mode, $cpIndex );
|
||||
if ( $unsavedPositions && $workCallback ) {
|
||||
// Invoke callback in case it did not cache the result yet
|
||||
$workCallback(); // work now to block for less time in waitForAll()
|
||||
|
|
@ -544,7 +545,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
$this->agent = $agent;
|
||||
}
|
||||
|
||||
public function appendPreShutdownTimeAsQuery( $url, $time ) {
|
||||
public function appendShutdownCPIndexAsQuery( $url, $index ) {
|
||||
$usedCluster = 0;
|
||||
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$usedCluster ) {
|
||||
$usedCluster |= ( $lb->getServerCount() > 1 );
|
||||
|
|
@ -554,7 +555,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
return $url; // no master/replica clusters touched
|
||||
}
|
||||
|
||||
return strpos( $url, '?' ) === false ? "$url?cpPosTime=$time" : "$url&cpPosTime=$time";
|
||||
return strpos( $url, '?' ) === false ? "$url?cpPosIndex=$index" : "$url&cpPosIndex=$index";
|
||||
}
|
||||
|
||||
public function setRequestInfo( array $info ) {
|
||||
|
|
|
|||
|
|
@ -192,33 +192,61 @@ class LBFactoryTest extends MediaWikiTestCase {
|
|||
*/
|
||||
public function testChronologyProtector() {
|
||||
// (a) First HTTP request
|
||||
$mPos = new MySQLMasterPos( 'db1034-bin.000976', '843431247' );
|
||||
$m1Pos = new MySQLMasterPos( 'db1034-bin.000976', '843431247' );
|
||||
$m2Pos = new MySQLMasterPos( 'db1064-bin.002400', '794074907' );
|
||||
|
||||
$now = microtime( true );
|
||||
$mockDB = $this->getMockBuilder( 'DatabaseMysqli' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$mockDB->method( 'writesOrCallbacksPending' )->willReturn( true );
|
||||
$mockDB->method( 'lastDoneWrites' )->willReturn( $now );
|
||||
$mockDB->method( 'getMasterPos' )->willReturn( $mPos );
|
||||
|
||||
$lb = $this->getMockBuilder( 'LoadBalancer' )
|
||||
// Master DB 1
|
||||
$mockDB1 = $this->getMockBuilder( 'DatabaseMysqli' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$lb->method( 'getConnection' )->willReturn( $mockDB );
|
||||
$lb->method( 'getServerCount' )->willReturn( 2 );
|
||||
$lb->method( 'parentInfo' )->willReturn( [ 'id' => "main-DEFAULT" ] );
|
||||
$lb->method( 'getAnyOpenConnection' )->willReturn( $mockDB );
|
||||
$lb->method( 'hasOrMadeRecentMasterChanges' )->will( $this->returnCallback(
|
||||
function () use ( $mockDB ) {
|
||||
$mockDB1->method( 'writesOrCallbacksPending' )->willReturn( true );
|
||||
$mockDB1->method( 'lastDoneWrites' )->willReturn( $now );
|
||||
$mockDB1->method( 'getMasterPos' )->willReturn( $m1Pos );
|
||||
// Load balancer for master DB 1
|
||||
$lb1 = $this->getMockBuilder( 'LoadBalancer' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$lb1->method( 'getConnection' )->willReturn( $mockDB1 );
|
||||
$lb1->method( 'getServerCount' )->willReturn( 2 );
|
||||
$lb1->method( 'getAnyOpenConnection' )->willReturn( $mockDB1 );
|
||||
$lb1->method( 'hasOrMadeRecentMasterChanges' )->will( $this->returnCallback(
|
||||
function () use ( $mockDB1 ) {
|
||||
$p = 0;
|
||||
$p |= call_user_func( [ $mockDB, 'writesOrCallbacksPending' ] );
|
||||
$p |= call_user_func( [ $mockDB, 'lastDoneWrites' ] );
|
||||
$p |= call_user_func( [ $mockDB1, 'writesOrCallbacksPending' ] );
|
||||
$p |= call_user_func( [ $mockDB1, 'lastDoneWrites' ] );
|
||||
|
||||
return (bool)$p;
|
||||
}
|
||||
) );
|
||||
$lb->method( 'getMasterPos' )->willReturn( $mPos );
|
||||
$lb1->method( 'getMasterPos' )->willReturn( $m1Pos );
|
||||
$lb1->method( 'getServerName' )->with( 0 )->willReturn( 'master1' );
|
||||
// Master DB 2
|
||||
$mockDB2 = $this->getMockBuilder( 'DatabaseMysqli' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$mockDB2->method( 'writesOrCallbacksPending' )->willReturn( true );
|
||||
$mockDB2->method( 'lastDoneWrites' )->willReturn( $now );
|
||||
$mockDB2->method( 'getMasterPos' )->willReturn( $m2Pos );
|
||||
// Load balancer for master DB 2
|
||||
$lb2 = $this->getMockBuilder( 'LoadBalancer' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$lb2->method( 'getConnection' )->willReturn( $mockDB2 );
|
||||
$lb2->method( 'getServerCount' )->willReturn( 2 );
|
||||
$lb2->method( 'getAnyOpenConnection' )->willReturn( $mockDB2 );
|
||||
$lb2->method( 'hasOrMadeRecentMasterChanges' )->will( $this->returnCallback(
|
||||
function () use ( $mockDB2 ) {
|
||||
$p = 0;
|
||||
$p |= call_user_func( [ $mockDB2, 'writesOrCallbacksPending' ] );
|
||||
$p |= call_user_func( [ $mockDB2, 'lastDoneWrites' ] );
|
||||
|
||||
return (bool)$p;
|
||||
}
|
||||
) );
|
||||
$lb2->method( 'getMasterPos' )->willReturn( $m2Pos );
|
||||
$lb2->method( 'getServerName' )->with( 0 )->willReturn( 'master2' );
|
||||
|
||||
$bag = new HashBagOStuff();
|
||||
$cp = new ChronologyProtector(
|
||||
|
|
@ -229,32 +257,60 @@ class LBFactoryTest extends MediaWikiTestCase {
|
|||
]
|
||||
);
|
||||
|
||||
$mockDB->expects( $this->exactly( 2 ) )->method( 'writesOrCallbacksPending' );
|
||||
$mockDB->expects( $this->exactly( 2 ) )->method( 'lastDoneWrites' );
|
||||
$mockDB1->expects( $this->exactly( 1 ) )->method( 'writesOrCallbacksPending' );
|
||||
$mockDB1->expects( $this->exactly( 1 ) )->method( 'lastDoneWrites' );
|
||||
$mockDB2->expects( $this->exactly( 1 ) )->method( 'writesOrCallbacksPending' );
|
||||
$mockDB2->expects( $this->exactly( 1 ) )->method( 'lastDoneWrites' );
|
||||
|
||||
// Nothing to wait for
|
||||
$cp->initLB( $lb );
|
||||
// Record in stash
|
||||
$cp->shutdownLB( $lb );
|
||||
$cp->shutdown();
|
||||
// Nothing to wait for on first HTTP request start
|
||||
$cp->initLB( $lb1 );
|
||||
$cp->initLB( $lb2 );
|
||||
// Record positions in stash on first HTTP request end
|
||||
$cp->shutdownLB( $lb1 );
|
||||
$cp->shutdownLB( $lb2 );
|
||||
$cpIndex = null;
|
||||
$cp->shutdown( null, 'sync', $cpIndex );
|
||||
|
||||
$this->assertEquals( 1, $cpIndex, "CP write index set" );
|
||||
|
||||
// (b) Second HTTP request
|
||||
|
||||
// Load balancer for master DB 1
|
||||
$lb1 = $this->getMockBuilder( 'LoadBalancer' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$lb1->method( 'getServerCount' )->willReturn( 2 );
|
||||
$lb1->method( 'getServerName' )->with( 0 )->willReturn( 'master1' );
|
||||
$lb1->expects( $this->once() )
|
||||
->method( 'waitFor' )->with( $this->equalTo( $m1Pos ) );
|
||||
// Load balancer for master DB 2
|
||||
$lb2 = $this->getMockBuilder( 'LoadBalancer' )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
$lb2->method( 'getServerCount' )->willReturn( 2 );
|
||||
$lb2->method( 'getServerName' )->with( 0 )->willReturn( 'master2' );
|
||||
$lb2->expects( $this->once() )
|
||||
->method( 'waitFor' )->with( $this->equalTo( $m2Pos ) );
|
||||
|
||||
$cp = new ChronologyProtector(
|
||||
$bag,
|
||||
[
|
||||
'ip' => '127.0.0.1',
|
||||
'agent' => "Totally-Not-FireFox"
|
||||
]
|
||||
],
|
||||
$cpIndex
|
||||
);
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'waitFor' )->with( $this->equalTo( $mPos ) );
|
||||
// Wait for last positions to be reached on second HTTP request start
|
||||
$cp->initLB( $lb1 );
|
||||
$cp->initLB( $lb2 );
|
||||
// Shutdown (nothing to record)
|
||||
$cp->shutdownLB( $lb1 );
|
||||
$cp->shutdownLB( $lb2 );
|
||||
$cpIndex = null;
|
||||
$cp->shutdown( null, 'sync', $cpIndex );
|
||||
|
||||
// Wait
|
||||
$cp->initLB( $lb );
|
||||
// Record in stash
|
||||
$cp->shutdownLB( $lb );
|
||||
$cp->shutdown();
|
||||
$this->assertEquals( null, $cpIndex, "CP write index retained" );
|
||||
}
|
||||
|
||||
private function newLBFactoryMulti( array $baseOverride = [], array $serverOverride = [] ) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue