Support masking the WRITE_SYNC latency from ChronologyProtector

* Use OutputPage::output() as the method to mask latency, since it
  takes a good while to run. By the time it runs, cache replication
  should have caught up, so the reap call will likely not block.
* For redirects emitted after changes in POST, instead of masking
  with OutputPage, add a parameter to the redirect and block on
  the positions appearing. This uses the redirection RTT to mask
  the replication latency.

Change-Id: Ib23690c302e8033610fef9a0ef451dafe8a5803e
This commit is contained in:
Aaron Schulz 2016-08-24 14:22:11 -07:00 committed by Tim Starling
parent 75f7a86b2e
commit a3dacac90f
11 changed files with 275 additions and 55 deletions

View file

@ -54,6 +54,11 @@ production.
* mw.Api has a new option, useUS, to use U+001F (Unit Separator) when
appropriate for sending multi-valued parameters. This defaults to true when
the mw.Api instance seems to be for the local wiki.
* After a client performs an action which alters a database that has replica databases,
MediaWiki will wait for the replica databases to synchronize with the master database
while it renders the HTML output. However, if the output is a redirect, it will instead
alter the redirect URL to include a ?cpPosTime parameter that triggers the database
synchronization when the URL is followed by the client.
=== External library changes in 1.28 ===

View file

@ -535,10 +535,11 @@ class MediaWiki {
/**
* @see MediaWiki::preOutputCommit()
* @param callable $postCommitWork [default: null]
* @since 1.26
*/
public function doPreOutputCommit() {
self::preOutputCommit( $this->context );
public function doPreOutputCommit( callable $postCommitWork = null ) {
self::preOutputCommit( $this->context, $postCommitWork );
}
/**
@ -546,33 +547,61 @@ class MediaWiki {
* the user can receive a response (in case commit fails)
*
* @param IContextSource $context
* @param callable $postCommitWork [default: null]
* @since 1.27
*/
public static function preOutputCommit( IContextSource $context ) {
public static function preOutputCommit(
IContextSource $context, callable $postCommitWork = null
) {
// Either all DBs should commit or none
ignore_user_abort( true );
$config = $context->getConfig();
$request = $context->getRequest();
$output = $context->getOutput();
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
// Commit all changes
$lbFactory->commitMasterChanges(
__METHOD__,
// Abort if any transaction was too big
[ 'maxWriteDuration' => $config->get( 'MaxUserDBWriteDuration' ) ]
);
wfDebug( __METHOD__ . ': primary transaction round committed' );
// Run updates that need to block the user or affect output (this is the last chance)
DeferredUpdates::doUpdates( 'enqueue', DeferredUpdates::PRESEND );
wfDebug( __METHOD__ . ': pre-send deferred updates completed' );
// Record ChronologyProtector positions
$lbFactory->shutdown();
wfDebug( __METHOD__ . ': all transactions committed' );
// Decide when clients block on ChronologyProtector DB position writes
if (
$request->wasPosted() &&
$output->getRedirect() &&
$lbFactory->hasOrMadeRecentMasterChanges( INF ) &&
self::isWikiClusterURL( $output->getRedirect(), $context )
) {
// OutputPage::output() will be fast; $postCommitWork will not be useful for
// masking the latency of syncing DB positions across all datacenters synchronously.
// Instead, make use of the client's round-trip time in following the redirect.
$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
// Client's next request should see 1+ positions with this DBMasterPos::asOf() time
$safeUrl = $lbFactory->appendPreShutdownTimeAsQuery(
$output->getRedirect(),
microtime( true )
);
$output->redirect( $safeUrl );
} else {
// OutputPage::output() is fairly slow; run it in $postCommitWork to mask
// the latency of syncing DB positions across all datacenters synchronously
$flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
}
// Record ChronologyProtector positions for DBs affected in this request at this point
$lbFactory->shutdown( $flags, $postCommitWork );
wfDebug( __METHOD__ . ': LBFactory shutdown completed' );
// Set a cookie to tell all CDN edge nodes to "stick" the user to the DC that handles this
// POST request (e.g. the "master" data center). Also have the user briefly bypass CDN so
// ChronologyProtector works for cacheable URLs.
$request = $context->getRequest();
if ( $request->wasPosted() && $lbFactory->hasOrMadeRecentMasterChanges() ) {
$expires = time() + $config->get( 'DataCenterUpdateStickTTL' );
$options = [ 'prefix' => '' ];
@ -584,7 +613,7 @@ class MediaWiki {
// also intimately related to the value of $wgCdnReboundPurgeDelay.
if ( $lbFactory->laggedReplicaUsed() ) {
$maxAge = $config->get( 'CdnMaxageLagged' );
$context->getOutput()->lowerCdnMaxage( $maxAge );
$output->lowerCdnMaxage( $maxAge );
$request->response()->header( "X-Database-Lagged: true" );
wfDebugLog( 'replication', "Lagged DB used; CDN cache TTL limited to $maxAge seconds" );
}
@ -592,11 +621,42 @@ class MediaWiki {
// Avoid long-term cache pollution due to message cache rebuild timeouts (T133069)
if ( MessageCache::singleton()->isDisabled() ) {
$maxAge = $config->get( 'CdnMaxageSubstitute' );
$context->getOutput()->lowerCdnMaxage( $maxAge );
$output->lowerCdnMaxage( $maxAge );
$request->response()->header( "X-Response-Substitute: true" );
}
}
/**
 * Check whether a URL points to a host within this wiki farm
 *
 * Used to decide whether a post-edit redirect can safely carry the
 * cpPosTime query parameter that ChronologyProtector understands.
 *
 * @param string $url Absolute URL to check
 * @param IContextSource $context Source of the site configuration
 * @return bool Whether $url is to something on this wiki farm
 */
private static function isWikiClusterURL( $url, IContextSource $context ) {
	static $relevantKeys = [ 'host' => true, 'port' => true ];

	$infoCandidate = wfParseUrl( $url );
	if ( $infoCandidate === false ) {
		// Not a parseable absolute URL; cannot be a known farm host
		return false;
	}

	// Compare only on host/port; scheme and path are irrelevant here
	$infoCandidate = array_intersect_key( $infoCandidate, $relevantKeys );
	$clusterHosts = array_merge(
		// Local wiki host (the most common case)
		[ $context->getConfig()->get( 'CanonicalServer' ) ],
		// Any local/remote wiki virtual hosts for this wiki farm
		$context->getConfig()->get( 'LocalVirtualHosts' )
	);

	foreach ( $clusterHosts as $clusterHost ) {
		$infoHost = wfParseUrl( $clusterHost );
		if ( $infoHost === false ) {
			// Skip malformed entries in site configuration
			continue;
		}
		$infoHost = array_intersect_key( $infoHost, $relevantKeys );
		if ( $infoCandidate === $infoHost ) {
			return true;
		}
	}

	return false;
}
/**
* This function does work that can be done *after* the
* user gets the HTTP response so they don't block on it
@ -614,10 +674,9 @@ class MediaWiki {
// Show visible profiling data if enabled (which cannot be post-send)
Profiler::instance()->logDataPageOutputOnly();
$that = $this;
$callback = function () use ( $that, $mode ) {
$callback = function () use ( $mode ) {
try {
$that->restInPeace( $mode );
$this->restInPeace( $mode );
} catch ( Exception $e ) {
MWExceptionHandler::handleException( $e );
}
@ -643,6 +702,7 @@ class MediaWiki {
private function main() {
global $wgTitle;
$output = $this->context->getOutput();
$request = $this->context->getRequest();
// Send Ajax requests to the Ajax dispatcher.
@ -656,6 +716,7 @@ class MediaWiki {
$dispatcher = new AjaxDispatcher( $this->config );
$dispatcher->performAction( $this->context->getUser() );
return;
}
@ -717,11 +778,11 @@ class MediaWiki {
// Setup dummy Title, otherwise OutputPage::redirect will fail
$title = Title::newFromText( 'REDIR', NS_MAIN );
$this->context->setTitle( $title );
$output = $this->context->getOutput();
// Since we only do this redir to change proto, always send a vary header
$output->addVaryHeader( 'X-Forwarded-Proto' );
$output->redirect( $redirUrl );
$output->output();
return;
}
}
@ -733,14 +794,15 @@ class MediaWiki {
if ( $cache->isCacheGood( /* Assume up to date */ ) ) {
// Check incoming headers to see if client has this cached
$timestamp = $cache->cacheTimestamp();
if ( !$this->context->getOutput()->checkLastModified( $timestamp ) ) {
if ( !$output->checkLastModified( $timestamp ) ) {
$cache->loadFromFileCache( $this->context );
}
// Do any stats increment/watchlist stuff
// Assume we're viewing the latest revision (this should always be the case with file cache)
$this->context->getWikiPage()->doViewUpdates( $this->context->getUser() );
// Tell OutputPage that output is taken care of
$this->context->getOutput()->disable();
$output->disable();
return;
}
}
@ -749,13 +811,24 @@ class MediaWiki {
// Actually do the work of the request and build up any output
$this->performRequest();
// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
// ChronologyProtector synchronizes the DB positions of replicas across all datacenters.
$buffer = null;
$outputWork = function () use ( $output, &$buffer ) {
if ( $buffer === null ) {
$buffer = $output->output( true );
}
return $buffer;
};
// Now commit any transactions, so that unreported errors after
// output() don't roll back the whole DB transaction and so that
// we avoid having both success and error text in the response
$this->doPreOutputCommit();
$this->doPreOutputCommit( $outputWork );
// Output everything!
$this->context->getOutput()->output();
// Now send the actual output
print $outputWork();
}
/**

View file

@ -2214,10 +2214,16 @@ class OutputPage extends ContextSource {
/**
* Finally, all the text has been munged and accumulated into
* the object, let's actually output it:
*
* @param bool $return Set to true to get the result as a string rather than sending it
* @return string|null
* @throws Exception
* @throws FatalError
* @throws MWException
*/
public function output() {
public function output( $return = false ) {
if ( $this->mDoNothing ) {
return;
return $return ? '' : null;
}
$response = $this->getRequest()->response();
@ -2253,7 +2259,7 @@ class OutputPage extends ContextSource {
}
}
return;
return $return ? '' : null;
} elseif ( $this->mStatusCode ) {
$response->statusHeader( $this->mStatusCode );
}
@ -2322,8 +2328,12 @@ class OutputPage extends ContextSource {
$this->sendCacheControl();
ob_end_flush();
if ( $return ) {
return ob_get_clean();
} else {
ob_end_flush();
return null;
}
}
/**

View file

@ -33,6 +33,10 @@ class ChronologyProtector {
protected $key;
/** @var string Hash of client parameters */
protected $clientId;
/** @var float|null Minimum UNIX timestamp of 1+ expected startup positions */
protected $waitForPosTime;
/** @var int Max seconds to wait on positions to appear */
protected $waitForPosTimeout = self::POS_WAIT_TIMEOUT;
/** @var bool Whether to no-op all method calls */
protected $enabled = true;
/** @var bool Whether to check and wait on positions */
@ -47,15 +51,22 @@ class ChronologyProtector {
/** @var float[] Map of (DB master name => 1) */
protected $shutdownTouchDBs = [];
/** @var integer Seconds to store positions */
const POSITION_TTL = 60;
/** @var integer Max time to wait for positions to appear */
const POS_WAIT_TIMEOUT = 5;
/**
* @param BagOStuff $store
* @param array $client Map of (ip: <IP>, agent: <user-agent>)
* @param float $posTime UNIX timestamp
* @since 1.27
*/
public function __construct( BagOStuff $store, array $client ) {
public function __construct( BagOStuff $store, array $client, $posTime = null ) {
$this->store = $store;
$this->clientId = md5( $client['ip'] . "\n" . $client['agent'] );
$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId );
$this->waitForPosTime = $posTime;
}
/**
@ -130,20 +141,23 @@ class ChronologyProtector {
* Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
* May commit chronology data to persistent storage.
*
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
* @param string $mode One of (sync, async); whether to wait on remote datacenters
* @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
*/
public function shutdown() {
public function shutdown( callable $workCallback = null, $mode = 'sync' ) {
if ( !$this->enabled ) {
return [];
}
$store = $this->store;
// Some callers might want to know if a user recently touched a DB.
// These writes do not need to block on all datacenters receiving them.
foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
$this->store->set(
$store->set(
$this->getTouchedKey( $this->store, $dbName ),
microtime( true ),
BagOStuff::TTL_DAY
$store::TTL_DAY
);
}
@ -159,29 +173,42 @@ class ChronologyProtector {
// CP-protected writes should overwhelmingly go to the master datacenter, so get DC-local
// lock to merge the values. Use a DC-local get() and a synchronous all-DC set(). This
// makes it possible for the BagOStuff class to write in parallel to all DCs with one RTT.
if ( $this->store->lock( $this->key, 3 ) ) {
$ok = $this->store->set(
if ( $store->lock( $this->key, 3 ) ) {
if ( $workCallback ) {
// Let the store run the work before blocking on a replication sync barrier. By the
// time it's done with the work, the barrier should be fast if replication caught up.
$store->addBusyCallback( $workCallback );
}
$ok = $store->set(
$this->key,
self::mergePositions( $this->store->get( $this->key ), $this->shutdownPositions ),
BagOStuff::TTL_MINUTE,
BagOStuff::WRITE_SYNC
self::mergePositions( $store->get( $this->key ), $this->shutdownPositions ),
self::POSITION_TTL,
( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
);
$this->store->unlock( $this->key );
$store->unlock( $this->key );
} else {
$ok = false;
}
if ( !$ok ) {
$bouncedPositions = $this->shutdownPositions;
// Raced out too many times or stash is down
wfDebugLog( 'replication',
__METHOD__ . ": failed to save master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
);
return $this->shutdownPositions;
} elseif ( $mode === 'sync' &&
$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
) {
// Positions may not be in all datacenters, force LBFactory to play it safe
wfDebugLog( 'replication',
__METHOD__ . ": store does not report ability to sync replicas. " );
$bouncedPositions = $this->shutdownPositions;
} else {
$bouncedPositions = [];
}
return [];
return $bouncedPositions;
}
/**
@ -212,7 +239,33 @@ class ChronologyProtector {
$this->initialized = true;
if ( $this->wait ) {
$data = $this->store->get( $this->key );
// If there is an expectation to see master positions with a certain min
// timestamp, then block until they appear, or until a timeout is reached.
if ( $this->waitForPosTime ) {
$data = null;
$loop = new WaitConditionLoop(
function () use ( &$data ) {
$data = $this->store->get( $this->key );
return ( self::minPosTime( $data ) >= $this->waitForPosTime )
? WaitConditionLoop::CONDITION_REACHED
: WaitConditionLoop::CONDITION_CONTINUE;
},
$this->waitForPosTimeout
);
$result = $loop->invoke();
$waitedMs = $loop->getLastWaitTime() * 1e3;
if ( $result == $loop::CONDITION_REACHED ) {
$msg = "expected and found pos time {$this->waitForPosTime} ({$waitedMs}ms)";
} else {
$msg = "expected but missed pos time {$this->waitForPosTime} ({$waitedMs}ms)";
}
wfDebugLog( 'replication', $msg );
} else {
$data = $this->store->get( $this->key );
}
$this->startupPositions = $data ? $data['positions'] : [];
wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
} else {
@ -221,6 +274,24 @@ class ChronologyProtector {
}
}
/**
 * Get the earliest UNIX timestamp among the stored master positions
 *
 * @param array|bool $data Value from the position store (false on a cache miss)
 * @return float|null Minimum DBMasterPos::asOfTime() value, or null if none
 */
private static function minPosTime( $data ) {
	if ( !isset( $data['positions'] ) ) {
		return null;
	}

	$min = null;
	foreach ( $data['positions'] as $pos ) {
		/** @var DBMasterPos $pos */
		$asOfTime = $pos->asOfTime();
		// Compare against null explicitly; a truthiness check would treat a
		// (theoretical) epoch-zero position time as "unset" and reset the minimum
		if ( $min === null || $asOfTime < $min ) {
			$min = $asOfTime;
		}
	}

	return $min;
}
/**
* @param array|bool $curValue
* @param DBMasterPos[] $shutdownPositions

View file

@ -51,7 +51,9 @@ abstract class LBFactory implements DestructibleService {
/** @var callable[] */
protected $replicationWaitCallbacks = [];
const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs
/**
* Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
@ -87,7 +89,7 @@ abstract class LBFactory implements DestructibleService {
* @see LoadBalancer::disable()
*/
public function destroy() {
$this->shutdown();
$this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
$this->forEachLBCallMethod( 'disable' );
}
@ -199,12 +201,18 @@ abstract class LBFactory implements DestructibleService {
/**
* Prepare all tracked load balancers for shutdown
* @param integer $flags Supports SHUTDOWN_* flags
* @param integer $mode One of the class SHUTDOWN_* constants
* @param callable|null $workCallback Work to mask ChronologyProtector writes
*/
public function shutdown( $flags = 0 ) {
if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
$this->shutdownChronologyProtector( $this->chronProt );
public function shutdown(
$mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
) {
if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
$this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
} elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
$this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
}
$this->commitMasterChanges( __METHOD__ ); // sanity
}
@ -387,13 +395,14 @@ abstract class LBFactory implements DestructibleService {
/**
* Determine if any master connection has pending/written changes from this request
* @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
* @return bool
* @since 1.27
*/
public function hasOrMadeRecentMasterChanges() {
public function hasOrMadeRecentMasterChanges( $age = null ) {
$ret = false;
$this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
$ret = $ret || $lb->hasOrMadeRecentMasterChanges();
$this->forEachLB( function ( LoadBalancer $lb ) use ( $age, &$ret ) {
$ret = $ret || $lb->hasOrMadeRecentMasterChanges( $age );
} );
return $ret;
}
@ -584,8 +593,9 @@ abstract class LBFactory implements DestructibleService {
ObjectCache::getMainStashInstance(),
[
'ip' => $request->getIP(),
'agent' => $request->getHeader( 'User-Agent' )
]
'agent' => $request->getHeader( 'User-Agent' ),
],
$request->getFloat( 'cpPosTime', null )
);
if ( PHP_SAPI === 'cli' ) {
$chronProt->setEnabled( false );
@ -599,15 +609,26 @@ abstract class LBFactory implements DestructibleService {
}
/**
* Get and record all of the staged DB positions into persistent memory storage
*
* @param ChronologyProtector $cp
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
* @param string $mode One of (sync, async); whether to wait on remote datacenters
*/
protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
// Get all the master positions needed
protected function shutdownChronologyProtector(
ChronologyProtector $cp, $workCallback, $mode
) {
// Record all the master positions needed
$this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
$cp->shutdownLB( $lb );
} );
// Write them to the stash
$unsavedPositions = $cp->shutdown();
// Write them to the persistent stash. Try to do something useful by running $work
// while ChronologyProtector waits for the stash write to replicate to all DCs.
$unsavedPositions = $cp->shutdown( $workCallback, $mode );
if ( $unsavedPositions && $workCallback ) {
// Invoke callback in case it did not cache the result yet
$workCallback(); // work now to block for less time in waitForAll()
}
// If the positions failed to write to the stash, at least wait on local datacenter
// replica DBs to catch up before responding. Even if there are several DCs, this increases
// the chance that the user will see their own changes immediately afterwards. As long
@ -629,6 +650,29 @@ abstract class LBFactory implements DestructibleService {
}
}
/**
 * Append a ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
 *
 * Note that unlike cookies, this works across domains
 *
 * @param string $url
 * @param float $time UNIX timestamp just before shutdown() was called
 * @return string
 * @since 1.28
 */
public function appendPreShutdownTimeAsQuery( $url, $time ) {
	// Determine if any load balancer in this request had a multi-server setup
	$touchedCluster = false;
	$this->forEachLB( function ( LoadBalancer $lb ) use ( &$touchedCluster ) {
		if ( $lb->getServerCount() > 1 ) {
			$touchedCluster = true;
		}
	} );

	if ( !$touchedCluster ) {
		// No master/replica clusters were touched, so the client has no
		// replication to wait on; leave the redirect URL unchanged
		return $url;
	}

	return wfAppendQuery( $url, [ 'cpPosTime' => $time ] );
}
/**
* Close all open database connections on all open load balancers.
* @since 1.28
@ -636,5 +680,4 @@ abstract class LBFactory implements DestructibleService {
public function closeAll() {
$this->forEachLBCallMethod( 'closeAll', [] );
}
}

View file

@ -47,6 +47,12 @@ interface IExpiringStore {
// Medium attributes constants related to emulation or media type
const ATTR_EMULATION = 1;
const QOS_EMULATION_SQL = 1;
// Medium attributes constants related to replica consistency
const ATTR_SYNCWRITES = 2; // SYNC_WRITES flag support
const QOS_SYNCWRITES_NONE = 1; // replication only supports eventual consistency or less
const QOS_SYNCWRITES_BE = 2; // best effort synchronous with limited retries
const QOS_SYNCWRITES_QC = 3; // write quorum applied directly to state machines where R+W > N
const QOS_SYNCWRITES_SS = 4; // strict-serializable, nodes refuse reads if possible stale
// Generic "unknown" value that is useful for comparisons (e.g. always good enough)
const QOS_UNKNOWN = INF;
}

View file

@ -30,6 +30,12 @@ class MemcachedBagOStuff extends BagOStuff {
/** @var MemcachedClient|Memcached */
protected $client;
function __construct( array $params ) {
parent::__construct( $params );
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE; // unreliable
}
/**
* Fill in some defaults for missing keys in $params.
*

View file

@ -51,6 +51,8 @@ class RESTBagOStuff extends BagOStuff {
}
// Make sure URL ends with /
$this->url = rtrim( $params['url'], '/' ) . '/';
// Default config, R+W > N; no locks on reads though; writes go straight to state-machine
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_QC;
}
/**

View file

@ -83,6 +83,8 @@ class RedisBagOStuff extends BagOStuff {
} else {
$this->automaticFailover = true;
}
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
}
protected function doGet( $key, $flags = 0 ) {

View file

@ -97,6 +97,7 @@ class SqlBagOStuff extends BagOStuff {
parent::__construct( $params );
$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
if ( isset( $params['servers'] ) ) {
$this->serverInfos = [];
@ -119,6 +120,7 @@ class SqlBagOStuff extends BagOStuff {
// Default to using the main wiki's database servers
$this->serverInfos = false;
$this->numServers = 1;
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE;
}
if ( isset( $params['purgePeriod'] ) ) {
$this->purgePeriod = intval( $params['purgePeriod'] );

View file

@ -121,4 +121,4 @@ wfLogProfilingData();
// Commit and close up!
$factory = wfGetLBFactory();
$factory->commitMasterChanges( 'doMaintenance' );
$factory->shutdown();
$factory->shutdown( $factory::SHUTDOWN_NO_CHRONPROT );