Support masking the WRITE_SYNC latency from ChronologyProtector
* Use OutputPage::output() as the method to mask latency, since it takes a good
  while to run. By the time it runs, cache replication should have caught up, so
  the reap call will likely not block.
* For redirects emitted after changes in POST, instead of masking with OutputPage,
  add a parameter to the redirect and block on the positions appearing. This uses
  the redirection RTT to mask the replication latency.

Change-Id: Ib23690c302e8033610fef9a0ef451dafe8a5803e
parent 75f7a86b2e
commit a3dacac90f
11 changed files with 275 additions and 55 deletions
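In rough terms, the patch adds two ways to hide the cost of the synchronous (WRITE_SYNC) write of DB positions. The sketch below is illustrative only and is not code from the patch: the callables stand in for OutputPage::output( true ) and LBFactory::shutdown(), and the helper name is made up.

<?php
/**
 * Hypothetical stand-in for the decision MediaWiki::preOutputCommit() makes in this patch.
 *
 * @param bool $isPostWithLocalRedirect POST request whose response is a same-cluster redirect
 * @param string $redirectUrl Redirect target (only used in the redirect case)
 * @param callable $renderOutput Slow page rendering work (OutputPage::output( true ) in the patch)
 * @param callable $shutdown Position write, e.g. function ( $mode, callable $work = null ) { ... }
 * @return string|null Rewritten redirect URL, or null when no redirect is involved
 */
function maskPositionWriteLatency(
	$isPostWithLocalRedirect, $redirectUrl, callable $renderOutput, callable $shutdown
) {
	if ( $isPostWithLocalRedirect ) {
		// Redirect case: store the positions without waiting for cross-DC replication,
		// and tell the *next* request (via ?cpPosTime) which position timestamp to wait for.
		// The client's redirect round-trip is what hides the replication delay.
		$shutdown( 'async', null );
		$separator = ( strpos( $redirectUrl, '?' ) === false ) ? '?' : '&';

		return $redirectUrl . $separator . 'cpPosTime=' . microtime( true );
	}

	// Normal page view: wait for the synchronous position write, but hand the slow
	// rendering work to the store so it runs while that WRITE_SYNC barrier is pending.
	$shutdown( 'sync', $renderOutput );

	return null;
}

In the actual diff, the same decision lives in MediaWiki::preOutputCommit(), with LBFactory::SHUTDOWN_CHRONPROT_ASYNC/SYNC and appendPreShutdownTimeAsQuery() playing the roles of the $shutdown callable and the URL rewriting above.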
@@ -54,6 +54,11 @@ production.
* mw.Api has a new option, useUS, to use U+001F (Unit Separator) when
  appropriate for sending multi-valued parameters. This defaults to true when
  the mw.Api instance seems to be for the local wiki.
+* After a client performs an action which alters a database that has replica databases,
+  MediaWiki will wait for the replica databases to synchronize with the master database
+  while it renders the HTML output. However, if the output is a redirect, it will instead
+  alter the redirect URL to include a ?cpPosTime parameter that triggers the database
+  synchronization when the URL is followed by the client.

=== External library changes in 1.28 ===
@@ -535,10 +535,11 @@ class MediaWiki {
	/**
	 * @see MediaWiki::preOutputCommit()
+	 * @param callable $postCommitWork [default: null]
	 * @since 1.26
	 */
-	public function doPreOutputCommit() {
-		self::preOutputCommit( $this->context );
+	public function doPreOutputCommit( callable $postCommitWork = null ) {
+		self::preOutputCommit( $this->context, $postCommitWork );
	}

	/**
@@ -546,33 +547,61 @@ class MediaWiki {
	 * the user can receive a response (in case commit fails)
	 *
	 * @param IContextSource $context
+	 * @param callable $postCommitWork [default: null]
	 * @since 1.27
	 */
-	public static function preOutputCommit( IContextSource $context ) {
+	public static function preOutputCommit(
+		IContextSource $context, callable $postCommitWork = null
+	) {
		// Either all DBs should commit or none
		ignore_user_abort( true );

		$config = $context->getConfig();

+		$request = $context->getRequest();
+		$output = $context->getOutput();
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

		// Commit all changes
		$lbFactory->commitMasterChanges(
			__METHOD__,
			// Abort if any transaction was too big
			[ 'maxWriteDuration' => $config->get( 'MaxUserDBWriteDuration' ) ]
		);
		wfDebug( __METHOD__ . ': primary transaction round committed' );

		// Run updates that need to block the user or affect output (this is the last chance)
		DeferredUpdates::doUpdates( 'enqueue', DeferredUpdates::PRESEND );
		wfDebug( __METHOD__ . ': pre-send deferred updates completed' );

-		// Record ChronologyProtector positions
-		$lbFactory->shutdown();
-		wfDebug( __METHOD__ . ': all transactions committed' );
+		// Decide when clients block on ChronologyProtector DB position writes
+		if (
+			$request->wasPosted() &&
+			$output->getRedirect() &&
+			$lbFactory->hasOrMadeRecentMasterChanges( INF ) &&
+			self::isWikiClusterURL( $output->getRedirect(), $context )
+		) {
+			// OutputPage::output() will be fast; $postCommitWork will not be useful for
+			// masking the latency of syncing DB positions across all datacenters synchronously.
+			// Instead, make use of the RTT of the client following the redirect.
+			$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
+			// Client's next request should see 1+ positions with this DBMasterPos::asOf() time
+			$safeUrl = $lbFactory->appendPreShutdownTimeAsQuery(
+				$output->getRedirect(),
+				microtime( true )
+			);
+			$output->redirect( $safeUrl );
+		} else {
+			// OutputPage::output() is fairly slow; run it in $postCommitWork to mask
+			// the latency of syncing DB positions across all datacenters synchronously
+			$flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
+		}
+		// Record ChronologyProtector positions for DBs affected in this request at this point
+		$lbFactory->shutdown( $flags, $postCommitWork );
+		wfDebug( __METHOD__ . ': LBFactory shutdown completed' );

		// Set a cookie to tell all CDN edge nodes to "stick" the user to the DC that handles this
		// POST request (e.g. the "master" data center). Also have the user briefly bypass CDN so
		// ChronologyProtector works for cacheable URLs.
-		$request = $context->getRequest();
		if ( $request->wasPosted() && $lbFactory->hasOrMadeRecentMasterChanges() ) {
			$expires = time() + $config->get( 'DataCenterUpdateStickTTL' );
			$options = [ 'prefix' => '' ];
@@ -584,7 +613,7 @@ class MediaWiki {
		// also intimately related to the value of $wgCdnReboundPurgeDelay.
		if ( $lbFactory->laggedReplicaUsed() ) {
			$maxAge = $config->get( 'CdnMaxageLagged' );
-			$context->getOutput()->lowerCdnMaxage( $maxAge );
+			$output->lowerCdnMaxage( $maxAge );
			$request->response()->header( "X-Database-Lagged: true" );
			wfDebugLog( 'replication', "Lagged DB used; CDN cache TTL limited to $maxAge seconds" );
		}
@@ -592,11 +621,42 @@ class MediaWiki {
		// Avoid long-term cache pollution due to message cache rebuild timeouts (T133069)
		if ( MessageCache::singleton()->isDisabled() ) {
			$maxAge = $config->get( 'CdnMaxageSubstitute' );
-			$context->getOutput()->lowerCdnMaxage( $maxAge );
+			$output->lowerCdnMaxage( $maxAge );
			$request->response()->header( "X-Response-Substitute: true" );
		}
	}

+	/**
+	 * @param string $url
+	 * @param IContextSource $context
+	 * @return bool Whether $url is to something on this wiki farm
+	 */
+	private static function isWikiClusterURL( $url, IContextSource $context ) {
+		static $relevantKeys = [ 'host' => true, 'port' => true ];
+
+		$infoCandidate = wfParseUrl( $url );
+		if ( $infoCandidate === false ) {
+			return false;
+		}
+
+		$infoCandidate = array_intersect_key( $infoCandidate, $relevantKeys );
+		$clusterHosts = array_merge(
+			// Local wiki host (the most common case)
+			[ $context->getConfig()->get( 'CanonicalServer' ) ],
+			// Any local/remote wiki virtual hosts for this wiki farm
+			$context->getConfig()->get( 'LocalVirtualHosts' )
+		);
+
+		foreach ( $clusterHosts as $clusterHost ) {
+			$infoHost = array_intersect_key( wfParseUrl( $clusterHost ), $relevantKeys );
+			if ( $infoCandidate === $infoHost ) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
	/**
	 * This function does work that can be done *after* the
	 * user gets the HTTP response so they don't block on it
@@ -614,10 +674,9 @@ class MediaWiki {
		// Show visible profiling data if enabled (which cannot be post-send)
		Profiler::instance()->logDataPageOutputOnly();

-		$that = $this;
-		$callback = function () use ( $that, $mode ) {
+		$callback = function () use ( $mode ) {
			try {
-				$that->restInPeace( $mode );
+				$this->restInPeace( $mode );
			} catch ( Exception $e ) {
				MWExceptionHandler::handleException( $e );
			}
@@ -643,6 +702,7 @@ class MediaWiki {
	private function main() {
		global $wgTitle;

+		$output = $this->context->getOutput();
		$request = $this->context->getRequest();

		// Send Ajax requests to the Ajax dispatcher.
@@ -656,6 +716,7 @@ class MediaWiki {

			$dispatcher = new AjaxDispatcher( $this->config );
			$dispatcher->performAction( $this->context->getUser() );

			return;
		}
@@ -717,11 +778,11 @@ class MediaWiki {
				// Setup dummy Title, otherwise OutputPage::redirect will fail
				$title = Title::newFromText( 'REDIR', NS_MAIN );
				$this->context->setTitle( $title );
-				$output = $this->context->getOutput();
				// Since we only do this redir to change proto, always send a vary header
				$output->addVaryHeader( 'X-Forwarded-Proto' );
				$output->redirect( $redirUrl );
				$output->output();

				return;
			}
		}
@@ -733,14 +794,15 @@ class MediaWiki {
			if ( $cache->isCacheGood( /* Assume up to date */ ) ) {
				// Check incoming headers to see if client has this cached
				$timestamp = $cache->cacheTimestamp();
-				if ( !$this->context->getOutput()->checkLastModified( $timestamp ) ) {
+				if ( !$output->checkLastModified( $timestamp ) ) {
					$cache->loadFromFileCache( $this->context );
				}
				// Do any stats increment/watchlist stuff
				// Assume we're viewing the latest revision (this should always be the case with file cache)
				$this->context->getWikiPage()->doViewUpdates( $this->context->getUser() );
				// Tell OutputPage that output is taken care of
-				$this->context->getOutput()->disable();
+				$output->disable();

				return;
			}
		}
@@ -749,13 +811,24 @@ class MediaWiki {
		// Actually do the work of the request and build up any output
		$this->performRequest();

+		// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
+		// ChronologyProtector synchronizes DB positions or slaves across all datacenters.
+		$buffer = null;
+		$outputWork = function () use ( $output, &$buffer ) {
+			if ( $buffer === null ) {
+				$buffer = $output->output( true );
+			}
+
+			return $buffer;
+		};
+
		// Now commit any transactions, so that unreported errors after
		// output() don't roll back the whole DB transaction and so that
		// we avoid having both success and error text in the response
-		$this->doPreOutputCommit();
+		$this->doPreOutputCommit( $outputWork );

-		// Output everything!
-		$this->context->getOutput()->output();
+		// Now send the actual output
+		print $outputWork();
	}

	/**
@@ -2214,10 +2214,16 @@ class OutputPage extends ContextSource {
	/**
	 * Finally, all the text has been munged and accumulated into
	 * the object, let's actually output it:
+	 *
+	 * @param bool $return Set to true to get the result as a string rather than sending it
+	 * @return string|null
+	 * @throws Exception
+	 * @throws FatalError
+	 * @throws MWException
	 */
-	public function output() {
+	public function output( $return = false ) {
		if ( $this->mDoNothing ) {
-			return;
+			return $return ? '' : null;
		}

		$response = $this->getRequest()->response();
@@ -2253,7 +2259,7 @@ class OutputPage extends ContextSource {
				}
			}

-			return;
+			return $return ? '' : null;
		} elseif ( $this->mStatusCode ) {
			$response->statusHeader( $this->mStatusCode );
		}
@@ -2322,8 +2328,12 @@ class OutputPage extends ContextSource {

		$this->sendCacheControl();

-		ob_end_flush();
+		if ( $return ) {
+			return ob_get_clean();
+		} else {
+			ob_end_flush();
+			return null;
+		}
	}

	/**
@@ -33,6 +33,10 @@ class ChronologyProtector {
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
+	/** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
+	protected $waitForPosTime;
+	/** @var int Max seconds to wait on positions to appear */
+	protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
	/** @var bool Whether to no-op all method calls */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
@@ -47,15 +51,22 @@ class ChronologyProtector {
	/** @var float[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

+	/** @var integer Seconds to store positions */
+	const POSITION_TTL = 60;
+	/** @var integer Max time to wait for positions to appear */
+	const POS_WAIT_TIMEOUT = 5;
+
	/**
	 * @param BagOStuff $store
	 * @param array $client Map of (ip: <IP>, agent: <user-agent>)
+	 * @param float $posTime UNIX timestamp
	 * @since 1.27
	 */
-	public function __construct( BagOStuff $store, array $client ) {
+	public function __construct( BagOStuff $store, array $client, $posTime = null ) {
		$this->store = $store;
		$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId );
+		$this->waitForPosTime = $posTime;
	}

	/**
@@ -130,20 +141,23 @@ class ChronologyProtector {
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 * May commit chronology data to persistent storage.
	 *
+	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
+	 * @param string $mode One of (sync, async); whether to wait on remote datacenters
+	 * @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
	 */
-	public function shutdown() {
+	public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
		if ( !$this->enabled ) {
			return [];
		}

+		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
-			$this->store->set(
+			$store->set(
				$this->getTouchedKey( $this->store, $dbName ),
				microtime( true ),
-				BagOStuff::TTL_DAY
+				$store::TTL_DAY
			);
		}
@@ -159,29 +173,42 @@ class ChronologyProtector {
		// CP-protected writes should overwhelmingly go to the master datacenter, so get DC-local
		// lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
		// makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
-		if ( $this->store->lock( $this->key, 3 ) ) {
-			$ok = $this->store->set(
+		if ( $store->lock( $this->key, 3 ) ) {
+			if ( $workCallback ) {
+				// Let the store run the work before blocking on a replication sync barrier. By the
+				// time it's done with the work, the barrier should be fast if replication caught up.
+				$store->addBusyCallback( $workCallback );
+			}
+			$ok = $store->set(
				$this->key,
-				self::mergePositions( $this->store->get( $this->key ), $this->shutdownPositions ),
-				BagOStuff::TTL_MINUTE,
-				BagOStuff::WRITE_SYNC
+				self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
+				self::POSITION_TTL,
+				( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
			);
-			$this->store->unlock( $this->key );
+			$store->unlock( $this->key );
		} else {
			$ok = false;
		}

		if ( !$ok ) {
+			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			wfDebugLog( 'replication',
				__METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
			);
-
-			return $this->shutdownPositions;
+		} elseif ( $mode === 'sync' &&
+			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
+		) {
+			// Positions may not be in all datacenters, force LBFactory to play it safe
+			wfDebugLog( 'replication',
+				__METHOD__ . ": store does not report ability to sync replicas. " );
+			$bouncedPositions = $this->shutdownPositions;
+		} else {
+			$bouncedPositions = [];
		}

-		return [];
+		return $bouncedPositions;
	}

	/**
@@ -212,7 +239,33 @@ class ChronologyProtector {

		$this->initialized = true;
		if ( $this->wait ) {
-			$data = $this->store->get( $this->key );
+			// If there is an expectation to see master positions with a certain min
+			// timestamp, then block until they appear, or until a timeout is reached.
+			if ( $this->waitForPosTime ) {
+				$data = null;
+				$loop = new WaitConditionLoop(
+					function () use ( &$data ) {
+						$data = $this->store->get( $this->key );

+						return ( self::minPosTime( $data ) >= $this->waitForPosTime )
+							? WaitConditionLoop::CONDITION_REACHED
+							: WaitConditionLoop::CONDITION_CONTINUE;
+					},
+					$this->waitForPosTimeout
+				);
+				$result = $loop->invoke();
+				$waitedMs = $loop->getLastWaitTime() * 1e3;
+
+				if ( $result == $loop::CONDITION_REACHED ) {
+					$msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
+				} else {
+					$msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
+				}
+				wfDebugLog( 'replication', $msg );
+			} else {
+				$data = $this->store->get( $this->key );
+			}
+
			$this->startupPositions = $data ? $data['positions'] : [];
			wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
		} else {
@@ -221,6 +274,24 @@ class ChronologyProtector {
		}
	}

+	/**
+	 * @param array|bool $data
+	 * @return float|null
+	 */
+	private static function minPosTime( $data ) {
+		if ( !isset( $data['positions'] ) ) {
+			return null;
+		}
+
+		$min = null;
+		foreach ( $data['positions'] as $pos ) {
+			/** @var DBMasterPos $pos */
+			$min = $min ? min( $pos->asOfTime(), $min ) : $pos->asOfTime();
+		}
+
+		return $min;
+	}
+
	/**
	 * @param array|bool $curValue
	 * @param DBMasterPos[] $shutdownPositions
@@ -51,7 +51,9 @@ abstract class LBFactory implements DestructibleService {
	/** @var callable[] */
	protected $replicationWaitCallbacks = [];

-	const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
+	const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
+	const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
+	const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs

	/**
	 * Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
@@ -87,7 +89,7 @@ abstract class LBFactory implements DestructibleService {
	 * @see LoadBalancer::disable()
	 */
	public function destroy() {
-		$this->shutdown();
+		$this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
		$this->forEachLBCallMethod( 'disable' );
	}
@@ -199,12 +201,18 @@ abstract class LBFactory implements DestructibleService {

	/**
	 * Prepare all tracked load balancers for shutdown
-	 * @param integer $flags Supports SHUTDOWN_* flags
+	 * @param integer $mode One of the class SHUTDOWN_* constants
+	 * @param callable|null $workCallback Work to mask ChronologyProtector writes
	 */
-	public function shutdown( $flags = 0 ) {
-		if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
-			$this->shutdownChronologyProtector( $this->chronProt );
+	public function shutdown(
+		$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
+	) {
+		if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
+			$this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
+		} elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
+			$this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
		}

		$this->commitMasterChanges( __METHOD__ ); // sanity
	}
@@ -387,13 +395,14 @@ abstract class LBFactory implements DestructibleService {

	/**
	 * Determine if any master connection has pending/written changes from this request
+	 * @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
	 * @return bool
	 * @since 1.27
	 */
-	public function hasOrMadeRecentMasterChanges() {
+	public function hasOrMadeRecentMasterChanges( $age = null ) {
		$ret = false;
-		$this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
-			$ret = $ret || $lb->hasOrMadeRecentMasterChanges();
+		$this->forEachLB( function ( LoadBalancer $lb ) use ( $age, &$ret ) {
+			$ret = $ret || $lb->hasOrMadeRecentMasterChanges( $age );
		} );
		return $ret;
	}
@@ -584,8 +593,9 @@ abstract class LBFactory implements DestructibleService {
			ObjectCache::getMainStashInstance(),
			[
				'ip' => $request->getIP(),
-				'agent' => $request->getHeader( 'User-Agent' )
-			]
+				'agent' => $request->getHeader( 'User-Agent' ),
+			],
+			$request->getFloat( 'cpPosTime', null )
		);
		if ( PHP_SAPI === 'cli' ) {
			$chronProt->setEnabled( false );
@@ -599,15 +609,26 @@ abstract class LBFactory implements DestructibleService {
	}

	/**
	 * Get and record all of the staged DB positions into persistent memory storage
	 *
	 * @param ChronologyProtector $cp
+	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
+	 * @param string $mode One of (sync, async); whether to wait on remote datacenters
	 */
-	protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
-		// Get all the master positions needed
+	protected function shutdownChronologyProtector(
+		ChronologyProtector $cp, $workCallback, $mode
+	) {
+		// Record all the master positions needed
		$this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
			$cp->shutdownLB( $lb );
		} );
-		// Write them to the stash
-		$unsavedPositions = $cp->shutdown();
+		// Write them to the persistent stash. Try to do something useful by running $work
+		// while ChronologyProtector waits for the stash write to replicate to all DCs.
+		$unsavedPositions = $cp->shutdown( $workCallback, $mode );
+		if ( $unsavedPositions && $workCallback ) {
+			// Invoke callback in case it did not cache the result yet
+			$workCallback(); // work now to block for less time in waitForAll()
+		}
		// If the positions failed to write to the stash, at least wait on local datacenter
		// replica DBs to catch up before responding. Even if there are several DCs, this increases
		// the chance that the user will see their own changes immediately afterwards. As long
@@ -629,6 +650,29 @@ abstract class LBFactory implements DestructibleService {
		}
	}

+	/**
+	 * Append ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
+	 *
+	 * Note that unlike cookies, this works across domains
+	 *
+	 * @param string $url
+	 * @param float $time UNIX timestamp just before shutdown() was called
+	 * @return string
+	 * @since 1.28
+	 */
+	public function appendPreShutdownTimeAsQuery( $url, $time ) {
+		$usedCluster = 0;
+		$this->forEachLB( function ( LoadBalancer $lb ) use ( &$usedCluster ) {
+			$usedCluster |= ( $lb->getServerCount() > 1 );
+		} );
+
+		if ( !$usedCluster ) {
+			return $url; // no master/replica clusters touched
+		}
+
+		return wfAppendQuery( $url, [ 'cpPosTime' => $time ] );
+	}
+
	/**
	 * Close all open database connections on all open load balancers.
	 * @since 1.28
@@ -636,5 +680,4 @@ abstract class LBFactory implements DestructibleService {
	public function closeAll() {
		$this->forEachLBCallMethod( 'closeAll', [] );
	}
-
}
@@ -47,6 +47,12 @@ interface IExpiringStore {
	// Medium attributes constants related to emulation or media type
	const ATTR_EMULATION = 1;
	const QOS_EMULATION_SQL = 1;
+	// Medium attributes constants related to replica consistency
+	const ATTR_SYNCWRITES = 2; // SYNC_WRITES flag support
+	const QOS_SYNCWRITES_NONE = 1; // replication only supports eventual consistency or less
+	const QOS_SYNCWRITES_BE = 2; // best effort synchronous with limited retries
+	const QOS_SYNCWRITES_QC = 3; // write quorum applied directly to state machines where R+W > N
+	const QOS_SYNCWRITES_SS = 4; // strict-serializable, nodes refuse reads if possible stale
	// Generic "unknown" value that is useful for comparisons (e.g. always good enough)
	const QOS_UNKNOWN = INF;
}
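These new ATTR_SYNCWRITES / QOS_SYNCWRITES_* constants let a store advertise how trustworthy a WRITE_SYNC write is; the ChronologyProtector::shutdown() hunk above treats anything below QOS_SYNCWRITES_BE as "positions may not have reached all datacenters". A minimal sketch of that check, assuming only the BagOStuff::getQoS() call already used in that hunk (the helper function itself is made up):

<?php
// Illustrative helper (not part of the patch): decide whether a WRITE_SYNC set()
// on this store can be relied on to have reached the remote datacenters.
function storeSupportsSyncWrites( BagOStuff $store ) {
	// Per the constructor changes later in this commit: Memcached reports QOS_SYNCWRITES_BE,
	// RESTBagOStuff QOS_SYNCWRITES_QC, Redis QOS_SYNCWRITES_NONE, and SqlBagOStuff NONE
	// unless it falls back to the main wiki's own DB servers (then BE).
	return $store->getQoS( $store::ATTR_SYNCWRITES ) >= $store::QOS_SYNCWRITES_BE;
}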
@@ -30,6 +30,12 @@ class MemcachedBagOStuff extends BagOStuff {
	/** @var MemcachedClient|Memcached */
	protected $client;

+	function __construct( array $params ) {
+		parent::__construct( $params );
+
+		$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE; // unreliable
+	}
+
	/**
	 * Fill in some defaults for missing keys in $params.
	 *
@@ -51,6 +51,8 @@ class RESTBagOStuff extends BagOStuff {
		}
		// Make sure URL ends with /
		$this->url = rtrim( $params['url'], '/' ) . '/';
+		// Default config, R+W > N; no locks on reads though; writes go straight to state-machine
+		$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_QC;
	}

	/**
@@ -83,6 +83,8 @@ class RedisBagOStuff extends BagOStuff {
		} else {
			$this->automaticFailover = true;
		}
+
+		$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
	}

	protected function doGet( $key, $flags = 0 ) {
@@ -97,6 +97,7 @@ class SqlBagOStuff extends BagOStuff {
		parent::__construct( $params );

		$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
+		$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;

		if ( isset( $params['servers'] ) ) {
			$this->serverInfos = [];
@@ -119,6 +120,7 @@ class SqlBagOStuff extends BagOStuff {
			// Default to using the main wiki's database servers
			$this->serverInfos = false;
			$this->numServers = 1;
+			$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE;
		}
		if ( isset( $params['purgePeriod'] ) ) {
			$this->purgePeriod = intval( $params['purgePeriod'] );
@@ -121,4 +121,4 @@ wfLogProfilingData();
// Commit and close up!
$factory = wfGetLBFactory();
$factory->commitMasterChanges( 'doMaintenance' );
-$factory->shutdown();
+$factory->shutdown( $factory::SHUTDOWN_NO_CHRONPROT );
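Taken together with the ChronologyProtector changes above, the follow-up request's handling of ?cpPosTime is: LBFactory passes $request->getFloat( 'cpPosTime', null ) into the ChronologyProtector constructor, and initPositions() then polls the position stash with a WaitConditionLoop until the minimum DBMasterPos::asOfTime() of the stored positions reaches that timestamp, or POS_WAIT_TIMEOUT (5 seconds) passes. The self-contained sketch below models that wait with a plain polling loop instead of the real WaitConditionLoop; the function and parameter names are illustrative, not from the patch.

<?php
/**
 * Illustrative model of the wait performed by ChronologyProtector::initPositions().
 *
 * @param callable $minStoredPosTime Returns float|null, like ChronologyProtector::minPosTime()
 * @param float $cpPosTime Value of the ?cpPosTime query parameter from the redirect
 * @param int $timeoutSec Upper bound on the wait (POS_WAIT_TIMEOUT is 5 in the patch)
 * @return bool Whether the expected positions appeared in time
 */
function waitForChronologyPositions( callable $minStoredPosTime, $cpPosTime, $timeoutSec = 5 ) {
	$start = microtime( true );
	do {
		$minTime = $minStoredPosTime();
		if ( $minTime !== null && $minTime >= $cpPosTime ) {
			return true; // the POST's positions are visible; replica reads can be made safe
		}
		usleep( 50 * 1000 ); // the real WaitConditionLoop uses an adaptive back-off instead
	} while ( ( microtime( true ) - $start ) < $timeoutSec );

	return false; // timed out; the request proceeds and may still see lagged replicas
}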