Add $wgChronologyProtectorStash and improve $wgMainStash comments

Remove WRITE_SYNC flag from ChronologyProtector since the current
plan is to simply use a datacenter-local storage cluster.

Move the touched timestamps into the same stash key that holds the
replication positions. Update the ChronologyProtector::getTouched()
comments.

Also:
* Use $wgMainCacheType as a $wgChronologyProtectorStash fallback
  since the main stash will be 'db-replicated' for most sites.
* Remove HashBagOStuff default for position store since that can
  result in timeouts waiting on a write position index to appear
  since the data does not actually persist accress requests.
* Rename ChronologyProtector::saveSessionReplicationPosition()
  since it does not actually save replication positions to storage.
* Make ChronologyProtector::getTouched() check the "enabled" field.
* Allow mocking the current time in ChronologyProtector.
* Mark some internal methods with @internal.
* Migrate various comments from $wgMainStash to BagOStuff.
* Update some other ObjectCache related comments.

Bug: T254634
Change-Id: I0456f5d40a558122a1b50baf4ab400c5cf0b623d
This commit is contained in:
Aaron Schulz 2021-02-04 20:18:47 -08:00 committed by Krinkle
parent 5b6782b1ff
commit bd7cf4dce9
11 changed files with 441 additions and 342 deletions

View file

@ -2277,6 +2277,10 @@ $wgLBFactoryConf = [ 'class' => \Wikimedia\Rdbms\LBFactorySimple::class ];
* After a state-changing request is done by a client, this determines
* how many seconds that client should keep using the master datacenter.
* This avoids unexpected stale or 404 responses due to replication lag.
*
* This must be greater than or equal to
* Wikimedia\Rdbms\ChronologyProtector::POSITION_COOKIE_TTL.
*
* @since 1.27
*/
$wgDataCenterUpdateStickTTL = 10;
@ -2616,16 +2620,16 @@ $wgObjectCaches = [
* By default, this will wrap $wgMainCacheType (which is disabled, since the basic
* stock default of CACHE_DB is not fast enough to make it worthwhile).
*
* For single server or single data-center setup, setting $wgMainCacheType
* For single server or single datacenter setup, setting $wgMainCacheType
* is enough.
*
* For a multiple data-center setup, WANObjectCache should be configured to
* For a multiple datacenter setup, WANObjectCache should be configured to
* broadcast some if its operations using Mcrouter or Dynomite.
* See @ref wanobjectcache-deployment "Deploying WANObjectCache".
*
* The options are:
* - false: Configure the cache using $wgMainCacheType, without using
* a relayer (only matters if there are multiple data-centers)
* a relayer (only matters if there are multiple datacenters)
* - CACHE_NONE: Do not cache
* - (other): A string may be used which identifies a cache
* configuration in $wgWANObjectCaches
@ -2681,40 +2685,47 @@ $wgEnableWANCacheReaper = false;
/**
* The object store type of the main stash.
*
* This store should be a very fast storage system optimized for holding lightweight data
* like incrementable hit counters and current user activity. The store should replicate the
* dataset among all data-centers. Any add(), merge(), lock(), and unlock() operations should
* maintain "best effort" linearizability; as long as connectivity is strong, latency is low,
* and there is no eviction pressure prompted by low free space, those operations should be
* linearizable. In terms of PACELC (https://en.wikipedia.org/wiki/PACELC_theorem), the store
* should act as a PA/EL distributed system for these operations. One optimization for these
* operations is to route them to a "primary" data-center (e.g. one that serves HTTP POST) for
* synchronous execution and then replicate to the others asynchronously. This means that at
* least calls to these operations during HTTP POST requests would quickly return.
* This should be a fast storage system optimized for lightweight data, both ephemeral and
* permanent, for things like counters, tokens, and blobs. The dataset access scope should
* include all the application servers in all datacenters. Thus, the data must be replicated
* among all datacenters. The store should have "Last Write Wins" eventual consistency. Per
* https://en.wikipedia.org/wiki/PACELC_theorem, the store should act as a PA/EL distributed
* system for these operations.
*
* All other operations, such as get(), set(), delete(), changeTTL(), incr(), and decr(),
* should be synchronous in the local data-center, replicating asynchronously to the others.
* This behavior can be overriden by the use of the WRITE_SYNC and READ_LATEST flags.
* The multi-datacenter strategy for MediaWiki is to have CDN route HTTP POST requests to the
* master datacenter and HTTP GET/HEAD/OPTIONS requests to the closest datacenter to the client.
* The stash accepts write operations from any datacenter, but cross-datacenter replication is
* asynchronous.
*
* The store should *preferably* have eventual consistency to handle network partitions.
*
* Modules that rely on the stash should be prepared for:
* - add(), merge(), lock(), and unlock() to be slower than other write operations,
* at least in "secondary" data-centers (e.g. one that only serves HTTP GET/HEAD)
* - Other write operations to have race conditions accross data-centers
* - Read operations to have race conditions accross data-centers
* - Consistency to be either eventual (with Last-Write-Wins) or just "best effort"
*
* In general, this means avoiding updates during idempotent HTTP requests (GET/HEAD) and
* avoiding assumptions of true linearizability (e.g. accepting anomalies). Modules that need
* these kind of guarantees should use other storage mediums.
* Modules that use the main stash can expect race conditions to occur if a key can receive
* write operations originating from multiple datacenters. To improve consistency, callers
* should avoid main stash updates during non-POST requests. In any case, callers should
* gracefully tolerate occasional key evictions, temporary inconsistencies among datacenters,
* and violations of linearizability (e.g. during timeouts). Modules that can never handle these
* kind of anamolies should use other storage mediums.
*
* Valid options are the keys of $wgObjectCaches, e.g. CACHE_* constants.
*
* @see BagOStuff
* @since 1.26
*/
$wgMainStash = 'db-replicated';
/**
* The object store type of the ChronologyProtector position store.
*
* This should be a fast storage system optimized for lightweight ephemeral data.
* The dataset access scope should include all application servers in the local datacenter.
* A set of single-key operations should maintain "best effort" linearizability (e.g. they
* observe linearizability unless connectivity/latency/disk problems arise).
*
* Valid options are the keys of $wgObjectCaches, e.g. CACHE_* constants.
*
* @see BagOStuff
* @since 1.36
*/
$wgChronologyProtectorStash = null;
/**
* The expiry time for the parser cache, in seconds.
* The default is 86400 (one day).

View file

@ -28,7 +28,6 @@ use Psr\Log\LoggerInterface;
use Wikimedia\AtEase;
use Wikimedia\Rdbms\ChronologyProtector;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\ILBFactory;
/**
* The MediaWiki class is the helper class for the index.php entry point.
@ -576,8 +575,6 @@ class MediaWiki {
exit;
}
}
// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
// ChronologyProtector synchronizes DB positions or replicas across all datacenters.
MWExceptionHandler::handleException( $e, MWExceptionHandler::CAUGHT_BY_ENTRYPOINT );
} catch ( Throwable $e ) {
// Type errors and such: at least handle it now and clean up the LBFactory state
@ -687,31 +684,44 @@ class MediaWiki {
// Run updates that need to block the client or affect output (this is the last chance)
DeferredUpdates::doUpdates( 'run', DeferredUpdates::PRESEND );
wfDebug( __METHOD__ . ': pre-send deferred updates completed' );
// Persist the session to avoid race conditions on subsequent requests by the client
$request->getSession()->save(); // T214471
wfDebug( __METHOD__ . ': session changes committed' );
// Figure out whether to wait for DB replication now or to use some method that assures
// that subsequent requests by the client will use the DB replication positions written
// during the shutdown() call below; the later requires working around replication lag
// of the store containing DB replication positions (e.g. dynomite, mcrouter).
list( $flags, $strategy ) = self::getChronProtStrategy( $lbFactory, $output );
// Record ChronologyProtector positions for DBs affected in this request at this point
// Subsequent requests by the client should see the DB replication positions written
// during the shutdown() call below, even if the position store itself has asynchronous
// replication. Setting the cpPosIndex cookie is normally enough. However, this might not
// work for cross-domain redirects to foreign wikis, so set the ?cpPoxIndex in that case.
$isCrossWikiRedirect = (
$output->getRedirect() &&
$lbFactory->hasOrMadeRecentMasterChanges( INF ) &&
self::getUrlDomainDistance( $output->getRedirect() ) === 'remote'
);
// Persist replication positions for DBs modified by this request (at this point).
// These help provide "session consistency" for the client on their next requests.
$cpIndex = null;
$cpClientId = null;
$lbFactory->shutdown( $flags, $postCommitWork, $cpIndex, $cpClientId );
$lbFactory->shutdown(
$lbFactory::SHUTDOWN_NORMAL,
$postCommitWork,
$cpIndex,
$cpClientId
);
$now = time();
$allowHeaders = !( $output->isDisabled() || headers_sent() );
if ( $cpIndex > 0 ) {
if ( $allowHeaders ) {
$now = time();
$expires = $now + ChronologyProtector::POSITION_COOKIE_TTL;
$options = [ 'prefix' => '' ];
$value = $lbFactory::makeCookieValueFromCPIndex( $cpIndex, $now, $cpClientId );
$request->response()->setCookie( 'cpPosIndex', $value, $expires, $options );
}
if ( $strategy === 'cookie+url' ) {
if ( $isCrossWikiRedirect ) {
if ( $output->getRedirect() ) { // sanity
$safeUrl = $lbFactory->appendShutdownCPIndexAsQuery(
$output->getRedirect(),
@ -732,7 +742,10 @@ class MediaWiki {
// handles this POST request (e.g. the "master" data center). Also have the user
// briefly bypass CDN so ChronologyProtector works for cacheable URLs.
if ( $request->wasPosted() && $lbFactory->hasOrMadeRecentMasterChanges() ) {
$expires = time() + $config->get( 'DataCenterUpdateStickTTL' );
$expires = $now + max(
ChronologyProtector::POSITION_COOKIE_TTL,
$config->get( 'DataCenterUpdateStickTTL' )
);
$options = [ 'prefix' => '' ];
$request->response()->setCookie( 'UseDC', 'master', $expires, $options );
$request->response()->setCookie( 'UseCDNCache', 'false', $expires, $options );
@ -773,38 +786,6 @@ class MediaWiki {
}
}
/**
* @param ILBFactory $lbFactory
* @param OutputPage $output
* @return array
*/
private static function getChronProtStrategy( ILBFactory $lbFactory, OutputPage $output ) {
// Should the client return, their request should observe the new ChronologyProtector
// DB positions. This request might be on a foreign wiki domain, so synchronously update
// the DB positions in all datacenters to be safe. If this output is not a redirect,
// then OutputPage::output() will be relatively slow, meaning that running it in
// $postCommitWork should help mask the latency of those updates.
$flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
$strategy = 'cookie+sync';
$allowHeaders = !( $output->isDisabled() || headers_sent() );
if ( $output->getRedirect() && $lbFactory->hasOrMadeRecentMasterChanges( INF ) ) {
// OutputPage::output() will be fast, so $postCommitWork is useless for masking
// the latency of synchronously updating the DB positions in all datacenters.
// Try to make use of the time the client spends following redirects instead.
$domainDistance = self::getUrlDomainDistance( $output->getRedirect() );
if ( $domainDistance === 'local' && $allowHeaders ) {
$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
$strategy = 'cookie'; // use same-domain cookie and keep the URL uncluttered
} elseif ( $domainDistance === 'remote' ) {
$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
$strategy = 'cookie+url'; // cross-domain cookie might not work
}
}
return [ $flags, $strategy ];
}
/**
* @param string $url
* @return string Either "local", "remote" if in the farm, "external" otherwise
@ -943,8 +924,7 @@ class MediaWiki {
// Actually do the work of the request and build up any output
$this->performRequest();
// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
// ChronologyProtector synchronizes DB positions or replicas across all datacenters.
// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit()
$buffer = null;
$outputWork = static function () use ( $output, &$buffer ) {
if ( $buffer === null ) {

View file

@ -382,15 +382,15 @@ return [
static function ( MediaWikiServices $services ) : Wikimedia\Rdbms\LBFactory {
$mainConfig = $services->getMainConfig();
try {
$stash = $services->getMainObjectStash();
} catch ( RecursiveServiceDependencyException $e ) {
$stash = new EmptyBagOStuff(); // T141804: handle cases like CACHE_DB
}
if ( $stash instanceof EmptyBagOStuff ) {
// Use process cache if the main stash is disabled or there was recursion
$stash = new HashBagOStuff( [ 'maxKeys' => 100 ] );
$cpStashType = $mainConfig->get( 'ChronologyProtectorStash' );
if ( is_string( $cpStashType ) ) {
$cpStash = ObjectCache::getInstance( $cpStashType );
} else {
try {
$cpStash = ObjectCache::getLocalClusterInstance();
} catch ( RecursiveServiceDependencyException $e ) {
$cpStash = new EmptyBagOStuff(); // T141804: handle cases like CACHE_DB
}
}
try {
@ -409,8 +409,8 @@ return [
$mainConfig->get( 'LBFactoryConf' ),
new ServiceOptions( MWLBFactory::APPLY_DEFAULT_CONFIG_OPTIONS, $mainConfig ),
$services->getConfiguredReadOnlyMode(),
$cpStash,
$srvCache,
$stash,
$wanCache
);

View file

@ -63,8 +63,8 @@ abstract class MWLBFactory {
* @param array $lbConf Config for LBFactory::__construct()
* @param ServiceOptions $options
* @param ConfiguredReadOnlyMode $readOnlyMode
* @param BagOStuff $cpStash
* @param BagOStuff $srvCache
* @param BagOStuff $mainStash
* @param WANObjectCache $wanCache
* @return array
* @internal For use with service wiring
@ -73,8 +73,8 @@ abstract class MWLBFactory {
array $lbConf,
ServiceOptions $options,
ConfiguredReadOnlyMode $readOnlyMode,
BagOStuff $cpStash,
BagOStuff $srvCache,
BagOStuff $mainStash,
WANObjectCache $wanCache
) {
$options->assertRequiredOptions( self::APPLY_DEFAULT_CONFIG_OPTIONS );
@ -156,8 +156,8 @@ abstract class MWLBFactory {
$options->get( 'DBprefix' )
);
$lbConf['cpStash'] = $cpStash;
$lbConf['srvCache'] = $srvCache;
$lbConf['memStash'] = $mainStash;
$lbConf['wanCache'] = $wanCache;
return $lbConf;

View file

@ -39,21 +39,16 @@ use Wikimedia\ScopedCallback;
*
* This interface is intended to be more or less compatible with the PHP memcached client.
*
* Instances of this class should be created with an intended access scope, such as:
* Class instances should be created with an intended access scope for the dataset, such as:
* - a) A single PHP thread on a server (e.g. stored in a PHP variable)
* - b) A single application server (e.g. stored in APC or sqlite)
* - c) All application servers in datacenter (e.g. stored in memcached or mysql)
* - d) All application servers in all datacenters (e.g. stored via mcrouter or dynomite)
*
* Callers should use the proper factory methods that yield BagOStuff instances. Site admins
* should make sure the configuration for those factory methods matches their access scope.
* BagOStuff subclasses have widely varying levels of support for replication features.
*
* For any given instance, methods like lock(), unlock(), merge(), and set() with WRITE_SYNC
* should semantically operate over its entire access scope; any nodes/threads in that scope
* should serialize appropriately when using them. Likewise, a call to get() with READ_LATEST
* from one node in its access scope should reflect the prior changes of any other node its
* access scope. Any get() should reflect the changes of any prior set() with WRITE_SYNC.
* should make sure that the configuration for those factory methods match their access scope.
* BagOStuff subclasses have widely varying levels of support replication features within and
* among datacenters.
*
* Subclasses should override the default "segmentationSize" field with an appropriate value.
* The value should not be larger than what the storage backend (by default) supports. It also
@ -61,6 +56,25 @@ use Wikimedia\ScopedCallback;
* having poor scalability). The same goes for the "segmentedValueMaxSize" member, which limits
* the maximum size and chunk count (indirectly) of values.
*
* A few notes about data consistency for BagOStuff instances:
* - Read operation methods, e.g. get(), should be synchronous in the local datacenter.
* When used with READ_LATEST, such operations should reflect any prior writes originating
* from the local datacenter (e.g. by avoiding replica DBs or invoking quorom reads).
* - Write operation methods, e.g. set(), should be synchronous in the local datacenter, with
* asynchronous cross-datacenter replication. This replication can be either "best effort"
* or eventually consistent. When used with WRITE_SYNC, such operations will wait until all
* datacenters are updated or a timeout occurs. If the write succeeded, then any subsequent
* get() operations with READ_LATEST, regardless of datacenter, should reflect the changes.
* - Locking operation methods, e.g. lock(), unlock(), and getScopedLock(), should only apply
* to the local datacenter.
* - Any set of single-key write operation method calls originating from a single datacenter
* should observe "best effort" linearizability. Any set of single-key write operations using
* WRITE_SYNC, regardless of the datacenter, should observe "best effort" linearizability.
* In this context, "best effort" means that consistency holds as long as connectivity is
* strong, network latency is low, and there are no relevant storage server failures.
* Per https://en.wikipedia.org/wiki/PACELC_theorem, the store should act as a PA/EL
* distributed system for these operations.
*
* @stable to extend
* @ingroup Cache
*/

View file

@ -52,28 +52,41 @@ class ChronologyProtector implements LoggerAwareInterface {
protected $clientLogInfo;
/** @var int|null Expected minimum index of the last write to the position store */
protected $waitForPosIndex;
/** @var int Max seconds to wait on positions to appear */
protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
/** @var bool Whether to no-op all method calls */
/** @var bool Whether reading/writing session consistency replication positions is enabled */
protected $enabled = true;
/** @var bool Whether to check and wait on positions */
protected $wait = true;
/** @var bool Whether waiting on DB servers to reach replication positions is enabled */
protected $positionWaitsEnabled = true;
/** @var float|null UNIX timestamp when the client data was loaded */
protected $startupTimestamp;
/** @var bool Whether the client data was loaded */
protected $initialized = false;
/** @var DBMasterPos[] Map of (DB master name => position) */
protected $startupPositions = [];
/** @var DBMasterPos[] Map of (DB master name => position) */
protected $shutdownPositions = [];
/** @var float[] Map of (DB master name => 1) */
protected $shutdownTouchDBs = [];
/** @var array<string,DBMasterPos> Map of (DB master name => position) */
protected $startupPositionsByMaster = [];
/** @var array<string,DBMasterPos> Map of (DB master name => position) */
protected $shutdownPositionsByMaster = [];
/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
protected $startupTimestampsByCluster = [];
/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
protected $shutdownTimestampsByCluster = [];
/** @var int Seconds to store positions */
public const POSITION_TTL = 60;
/** @var int Seconds to store position write index cookies (safely less than POSITION_TTL) */
/** @var float|null */
private $wallClockOverride;
/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
public const POSITION_COOKIE_TTL = 10;
/** @var int Max time to wait for positions to appear */
private const POS_STORE_WAIT_TIMEOUT = 5;
/** Seconds to store replication positions */
private const POSITION_STORE_TTL = 60;
/** Max seconds to wait for positions write indexes to appear (e.g. replicate) in storage */
private const POSITION_INDEX_WAIT_TIMEOUT = 5;
/** Lock timeout to use for key updates */
private const LOCK_TIMEOUT = 3;
/** Lock expiry to use for key updates */
private const LOCK_TTL = 6;
private const FLD_POSITIONS = 'positions';
private const FLD_TIMESTAMPS = 'timestamps';
private const FLD_WRITE_INDEX = 'writeIndex';
/**
* @param BagOStuff $store
@ -84,6 +97,7 @@ class ChronologyProtector implements LoggerAwareInterface {
*/
public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
$this->store = $store;
if ( isset( $client['clientId'] ) ) {
$this->clientId = $client['clientId'];
} else {
@ -116,7 +130,7 @@ class ChronologyProtector implements LoggerAwareInterface {
}
/**
* @param bool $enabled Whether to no-op all method calls
* @param bool $enabled Whether reading/writing session replication positions is enabled
* @since 1.27
*/
public function setEnabled( $enabled ) {
@ -124,11 +138,11 @@ class ChronologyProtector implements LoggerAwareInterface {
}
/**
* @param bool $enabled Whether to check and wait on positions
* @param bool $enabled Whether session replication position wait barriers are enable
* @since 1.27
*/
public function setWaitEnabled( $enabled ) {
$this->wait = $enabled;
$this->positionWaitsEnabled = $enabled;
}
/**
@ -139,254 +153,319 @@ class ChronologyProtector implements LoggerAwareInterface {
* to that position by delaying execution. The delay may timeout and allow stale
* data if no non-lagged replica DBs are available.
*
* This method should only be called from LBFactory.
* @internal This method should only be called from LBFactory.
*
* @param ILoadBalancer $lb
* @return void
*/
public function applySessionReplicationPosition( ILoadBalancer $lb ) {
if ( !$this->enabled ) {
return; // disabled
if ( !$this->enabled || !$this->positionWaitsEnabled ) {
return;
}
$cluster = $lb->getClusterName();
$masterName = $lb->getServerName( $lb->getWriterIndex() );
$startupPositions = $this->getStartupMasterPositions();
$pos = $startupPositions[$masterName] ?? null;
$pos = $this->getStartupSessionPositions()[$masterName] ?? null;
if ( $pos instanceof DBMasterPos ) {
$this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'" );
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
$lb->waitFor( $pos );
} else {
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
}
}
/**
* Save the "session consistency" DB replication position for an end-of-life ILoadBalancer
* Update the "session consistency" DB replication position for an end-of-life ILoadBalancer
*
* This saves the replication position of the master DB if this request made writes to it.
* This remarks the replication position of the master DB if this request made writes to
* it using the provided ILoadBalancer instance.
*
* This method should only be called from LBFactory.
* @internal This method should only be called from LBFactory.
*
* @param ILoadBalancer $lb
* @return void
*/
public function storeSessionReplicationPosition( ILoadBalancer $lb ) {
if ( !$this->enabled ) {
return; // disabled
} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
// Only save the position if writes have been done on the connection
public function stageSessionReplicationPosition( ILoadBalancer $lb ) {
if ( !$this->enabled || !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
return;
}
$cluster = $lb->getClusterName();
$masterName = $lb->getServerName( $lb->getWriterIndex() );
if ( $lb->hasStreamingReplicaServers() ) {
$pos = $lb->getReplicaResumePos();
if ( $pos ) {
$this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos" );
$this->shutdownPositions[$masterName] = $pos;
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
$this->shutdownPositionsByMaster[$masterName] = $pos;
$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
} else {
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
}
} else {
$this->logger->debug( __METHOD__ . ": DB '$masterName' touched" );
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
}
$this->shutdownTouchDBs[$masterName] = 1;
}
/**
* Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
* May commit chronology data to persistent storage.
* Save any remarked "session consistency" DB replication positions to persistent storage
*
* @internal This method should only be called from LBFactory.
*
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
* @param string $mode One of (sync, async); whether to wait on remote datacenters
* @param int|null &$cpIndex DB position key write counter; incremented on update
* @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
* @return DBMasterPos[] Empty on success; map of (db name => unsaved position) on failure
*/
public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
public function shutdown( &$cpIndex = null ) {
if ( !$this->enabled ) {
return [];
}
$store = $this->store;
// Some callers might want to know if a user recently touched a DB.
// These writes do not need to block on all datacenters receiving them.
foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
$store->set(
$this->getTouchedKey( $this->store, $dbName ),
microtime( true ),
$store::TTL_DAY
);
if ( !$this->shutdownTimestampsByCluster ) {
$this->logger->debug( __METHOD__ . ": no master positions/timestamps to save" );
return [];
}
if ( $this->shutdownPositions === [] ) {
$this->logger->debug( __METHOD__ . ": no master positions to save" );
return []; // nothing to save
}
$this->logger->debug(
__METHOD__ . ": saving master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) )
);
// CP-protected writes should overwhelmingly go to the master datacenter, so merge the
// positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
// If set() returns success, then any get() should be able to see the new positions.
if ( $store->lock( $this->key, 3 ) ) {
if ( $workCallback ) {
// Let the store run the work before blocking on a replication sync barrier.
// If replication caught up while the work finished, the barrier will be fast.
$store->addBusyCallback( $workCallback );
}
$ok = $store->set(
$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
if ( $scopeLock ) {
$ok = $this->store->set(
$this->key,
$this->mergePositions(
$store->get( $this->key ),
$this->shutdownPositions,
$this->store->get( $this->key ),
$this->shutdownPositionsByMaster,
$this->shutdownTimestampsByCluster,
$cpIndex
),
self::POSITION_TTL,
( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
self::POSITION_STORE_TTL
);
$store->unlock( $this->key );
unset( $scopeLock );
} else {
$ok = false;
}
if ( !$ok ) {
$cpIndex = null; // nothing saved
$bouncedPositions = $this->shutdownPositions;
// Raced out too many times or stash is down
$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) )
);
} elseif ( $mode === 'sync' &&
$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
) {
// Positions may not be in all datacenters, force LBFactory to play it safe
$this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
$bouncedPositions = $this->shutdownPositions;
} else {
$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );
if ( $ok ) {
$bouncedPositions = [];
$this->logger->debug(
__METHOD__ . ": saved master positions/timestamp for DB cluster(s) $clusterList"
);
} else {
$cpIndex = null; // nothing saved
$bouncedPositions = $this->shutdownPositionsByMaster;
// Raced out too many times or stash is down
$this->logger->warning(
__METHOD__ . ": failed to save master positions for DB cluster(s) $clusterList"
);
}
return $bouncedPositions;
}
/**
* @param ILoadBalancer $lb The load balancer. Prior to 1.35, the first parameter was the
* master name.
* @return float|bool UNIX timestamp when client last touched the DB; false if not on record
* Get the UNIX timestamp when the client last touched the DB, if they did so recently
*
* @internal This method should only be called from LBFactory.
*
* @param ILoadBalancer $lb
* @return float|false UNIX timestamp; false if not recent or on record
* @since 1.35
*/
public function getTouched( ILoadBalancer $lb ) {
$masterName = $lb->getServerName( $lb->getWriterIndex() );
return $this->store->get( $this->getTouchedKey( $this->store, $masterName ) );
}
/**
* @param BagOStuff $store
* @param string $masterName
* @return string
*/
private function getTouchedKey( BagOStuff $store, $masterName ) {
return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $masterName );
}
/**
* Load in previous master positions for the client
* @return DBMasterPos[]
*/
protected function getStartupMasterPositions() {
if ( $this->initialized ) {
return $this->startupPositions;
if ( !$this->enabled ) {
return false;
}
$this->initialized = true;
$this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)" );
$cluster = $lb->getClusterName();
if ( $this->wait ) {
// If there is an expectation to see master positions from a certain write
// index or higher, then block until it appears, or until a timeout is reached.
// Since the write index restarts each time the key is created, it is possible that
// a lagged store has a matching key write index. However, in that case, it should
// already be expired and thus treated as non-existing, maintaining correctness.
if ( $this->waitForPosIndex > 0 ) {
$data = null;
$indexReached = null; // highest index reached in the position store
$loop = new WaitConditionLoop(
function () use ( &$data, &$indexReached ) {
$data = $this->store->get( $this->key );
if ( !is_array( $data ) ) {
return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
} elseif ( !isset( $data['writeIndex'] ) ) {
return WaitConditionLoop::CONDITION_REACHED; // b/c
}
$indexReached = max( $data['writeIndex'], $indexReached );
return ( $data['writeIndex'] >= $this->waitForPosIndex )
? WaitConditionLoop::CONDITION_REACHED
: WaitConditionLoop::CONDITION_CONTINUE;
},
$this->waitForPosStoreTimeout
);
$result = $loop->invoke();
$waitedMs = $loop->getLastWaitTime() * 1e3;
if ( $result == $loop::CONDITION_REACHED ) {
$this->logger->debug(
__METHOD__ . ": expected and found position index.",
[
'cpPosIndex' => $this->waitForPosIndex,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
} else {
$this->logger->warning(
__METHOD__ . ": expected but failed to find position index.",
[
'cpPosIndex' => $this->waitForPosIndex,
'indexReached' => $indexReached,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
}
} else {
$data = $this->store->get( $this->key );
}
$this->startupPositions = $data ? $data['positions'] : [];
$this->logger->debug( __METHOD__ . ": key is {$this->key} (read)" );
$timestampsByCluster = $this->getStartupSessionTimestamps();
$timestamp = $timestampsByCluster[$cluster] ?? null;
if ( $timestamp === null ) {
$recentTouchTimestamp = false;
} elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
// If the position store is not replicated among datacenters and the cookie that
// sticks the client to the primary datacenter expires, then the touch timestamp
// will be found for requests in one datacenter but not others. For consistency,
// return false once the user is no longer routed to the primary datacenter.
$recentTouchTimestamp = false;
$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
} else {
$this->startupPositions = [];
$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)" );
$recentTouchTimestamp = $timestamp;
$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
}
return $this->startupPositions;
return $recentTouchTimestamp;
}
/**
* @param array|bool $curValue
* @param DBMasterPos[] $shutdownPositions
* @param int|null &$cpIndex
* @return array
* @return array<string,DBMasterPos>
*/
protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
/** @var DBMasterPos[] $curPositions */
$curPositions = $curValue['positions'] ?? [];
// Use the newest positions for each DB master
foreach ( $shutdownPositions as $db => $pos ) {
if (
!isset( $curPositions[$db] ) ||
!( $curPositions[$db] instanceof DBMasterPos ) ||
$pos->asOfTime() > $curPositions[$db]->asOfTime()
) {
$curPositions[$db] = $pos;
protected function getStartupSessionPositions() {
$this->lazyStartup();
return $this->startupPositionsByMaster;
}
/**
* @return array<string,float>
*/
protected function getStartupSessionTimestamps() {
$this->lazyStartup();
return $this->startupTimestampsByCluster;
}
/**
* Load the stored DB replication positions and touch timestamps for the client
*
* @return void
*/
protected function lazyStartup() {
if ( $this->startupTimestamp !== null ) {
return;
}
$this->startupTimestamp = $this->getCurrentTime();
$this->logger->debug(
__METHOD__ .
": client ID is {$this->clientId}; key is {$this->key}"
);
// If there is an expectation to see master positions from a certain write
// index or higher, then block until it appears, or until a timeout is reached.
// Since the write index restarts each time the key is created, it is possible that
// a lagged store has a matching key write index. However, in that case, it should
// already be expired and thus treated as non-existing, maintaining correctness.
if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
$data = null;
$indexReached = null; // highest index reached in the position store
$loop = new WaitConditionLoop(
function () use ( &$data, &$indexReached ) {
$data = $this->store->get( $this->key );
if ( !is_array( $data ) ) {
return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
} elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
return WaitConditionLoop::CONDITION_REACHED; // b/c
}
$indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );
return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
? WaitConditionLoop::CONDITION_REACHED
: WaitConditionLoop::CONDITION_CONTINUE;
},
self::POSITION_INDEX_WAIT_TIMEOUT
);
$result = $loop->invoke();
$waitedMs = $loop->getLastWaitTime() * 1e3;
if ( $result == $loop::CONDITION_REACHED ) {
$this->logger->debug(
__METHOD__ . ": expected and found position index {cpPosIndex}.",
[
'cpPosIndex' => $this->waitForPosIndex,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
} else {
$this->logger->warning(
__METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
[
'cpPosIndex' => $this->waitForPosIndex,
'indexReached' => $indexReached,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
}
} else {
$data = $this->store->get( $this->key );
$indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
if ( $indexReached ) {
$this->logger->debug(
__METHOD__ . ": found position/timestamp data with index {indexReached}.",
[ 'indexReached' => $indexReached ] + $this->clientLogInfo
);
}
}
$cpIndex = $curValue['writeIndex'] ?? 0;
$this->startupPositionsByMaster = $data ? $data[self::FLD_POSITIONS] : [];
$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
}
/**
* Merge the new DB replication positions with the currently stored ones (highest wins)
*
* @param array<string,mixed>|false $storedValue Current DB replication position data
* @param array<string,DBMasterPos> $shutdownPositions New DB replication positions
* @param array<string,float> $shutdownTimestamps New DB post-commit shutdown timestamps
* @param int|null &$cpIndex New position write index
* @return array<string,mixed> Combined DB replication position data
*/
protected function mergePositions(
$storedValue,
array $shutdownPositions,
array $shutdownTimestamps,
&$cpIndex = null
) {
/** @var array<string,DBMasterPos> $mergedPositions */
$mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
// Use the newest positions for each DB master
foreach ( $shutdownPositions as $masterName => $pos ) {
if (
!isset( $mergedPositions[$masterName] ) ||
!( $mergedPositions[$masterName] instanceof DBMasterPos ) ||
$pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
) {
$mergedPositions[$masterName] = $pos;
}
}
/** @var array<string,float> $mergedTimestamps */
$mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
// Use the newest touch timestamp for each DB master
foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
if (
!isset( $mergedTimestamps[$cluster] ) ||
$timestamp > $mergedTimestamps[$cluster]
) {
$mergedTimestamps[$cluster] = $timestamp;
}
}
$cpIndex = $storedValue[self::FLD_WRITE_INDEX] ?? 0;
return [
'positions' => $curPositions,
'writeIndex' => ++$cpIndex
self::FLD_POSITIONS => $mergedPositions,
self::FLD_TIMESTAMPS => $mergedTimestamps,
self::FLD_WRITE_INDEX => ++$cpIndex
];
}
/**
* @internal For testing only
* @return float UNIX timestamp
* @codeCoverageIgnore
*/
protected function getCurrentTime() {
if ( $this->wallClockOverride ) {
return $this->wallClockOverride;
}
$clockTime = (float)time(); // call this first
// microtime() can severely drift from time() and the microtime() value of other threads.
// Instead of seeing the current time as being in the past, use the value of time().
return max( microtime( true ), $clockTime );
}
/**
* @internal For testing only
* @param float|null &$time Mock UNIX timestamp
* @codeCoverageIgnore
*/
public function setMockTime( &$time ) {
$this->wallClockOverride =& $time;
}
}

View file

@ -31,12 +31,10 @@ use InvalidArgumentException;
* @since 1.28
*/
interface ILBFactory {
/** @var int Don't save DB positions at all */
public const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
/** @var int Save DB positions, but don't wait on remote DCs */
public const SHUTDOWN_CHRONPROT_ASYNC = 1;
/** @var int Save DB positions, waiting on all DCs */
public const SHUTDOWN_CHRONPROT_SYNC = 2;
/** Idion for "no special shutdown flags" */
public const SHUTDOWN_NORMAL = 0;
/** Do not save "session consistency" DB replication positions */
public const SHUTDOWN_NO_CHRONPROT = 1;
/** @var string Default main LB cluster name (do not change this) */
public const CLUSTER_MAIN_DEFAULT = 'DEFAULT';
@ -50,7 +48,7 @@ interface ILBFactory {
* - localDomain: A DatabaseDomain or domain ID string.
* - readOnlyReason: Reason the master DB is read-only if so [optional]
* - srvCache: BagOStuff object for server cache [optional]
* - memStash: BagOStuff object for cross-datacenter memory storage [optional]
* - cpStash: BagOStuff object for ChronologyProtector store [optional]
* - wanCache: WANObjectCache object [optional]
* - hostname: The name of the current server [optional]
* - cliMode: Whether the execution context is a CLI script. [optional]
@ -177,13 +175,13 @@ interface ILBFactory {
/**
* Prepare all currently tracked (instantiated) load balancers for shutdown
*
* @param int $mode One of the class SHUTDOWN_* constants
* @param int $flags Bit field of ILBFactory::SHUTDOWN_* constants
* @param callable|null $workCallback Work to mask ChronologyProtector writes
* @param int|null &$cpIndex Position key write counter for ChronologyProtector
* @param string|null &$cpClientId Client ID hash for ChronologyProtector
* @param int|null &$cpIndex Position key write counter for ChronologyProtector [returned]
* @param string|null &$cpClientId Client ID hash for ChronologyProtector [returned]
*/
public function shutdown(
$mode = self::SHUTDOWN_CHRONPROT_SYNC,
$flags = self::SHUTDOWN_NORMAL,
callable $workCallback = null,
&$cpIndex = null,
&$cpClientId = null
@ -330,8 +328,10 @@ interface ILBFactory {
public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] );
/**
* Get the UNIX timestamp when the client last touched the DB, if they did so recently
*
* @param DatabaseDomain|string|bool $domain Domain ID, or false for the current domain
* @return float|bool UNIX timestamp when client last touched the DB or false if not recent
* @return float|false UNIX timestamp; false if not recent or on record
*/
public function getChronologyProtectorTouched( $domain = false );

View file

@ -62,9 +62,9 @@ abstract class LBFactory implements ILBFactory {
private $deprecationLogger;
/** @var BagOStuff */
protected $srvCache;
protected $cpStash;
/** @var BagOStuff */
protected $memStash;
protected $srvCache;
/** @var WANObjectCache */
protected $wanCache;
@ -134,8 +134,8 @@ abstract class LBFactory implements ILBFactory {
$this->readOnlyReason = $conf['readOnlyReason'];
}
$this->cpStash = $conf['cpStash'] ?? new EmptyBagOStuff();
$this->srvCache = $conf['srvCache'] ?? new EmptyBagOStuff();
$this->memStash = $conf['memStash'] ?? new EmptyBagOStuff();
$this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();
foreach ( self::$loggerFields as $key ) {
@ -215,7 +215,7 @@ abstract class LBFactory implements ILBFactory {
}
public function shutdown(
$mode = self::SHUTDOWN_CHRONPROT_SYNC,
$flags = self::SHUTDOWN_NORMAL,
callable $workCallback = null,
&$cpIndex = null,
&$cpClientId = null
@ -224,12 +224,10 @@ abstract class LBFactory implements ILBFactory {
$scope = ScopedCallback::newScopedIgnoreUserAbort();
$chronProt = $this->getChronologyProtector();
if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
$this->shutdownChronologyProtector( $chronProt, $workCallback, 'sync', $cpIndex );
} elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
$this->shutdownChronologyProtector( $chronProt, null, 'async', $cpIndex );
if ( ( $flags & self::SHUTDOWN_NO_CHRONPROT ) != self::SHUTDOWN_NO_CHRONPROT ) {
$this->shutdownChronologyProtector( $chronProt, $workCallback, $cpIndex );
$this->replLogger->debug( __METHOD__ . ': finished ChronologyProtector shutdown' );
}
$cpClientId = $chronProt->getClientId();
$this->commitMasterChanges( __METHOD__ ); // sanity
@ -554,7 +552,7 @@ abstract class LBFactory implements ILBFactory {
}
$this->chronProt = new ChronologyProtector(
$this->memStash,
$this->cpStash,
[
'ip' => $this->requestInfo['IPAddress'],
'agent' => $this->requestInfo['UserAgent'],
@ -571,10 +569,10 @@ abstract class LBFactory implements ILBFactory {
// Request opted out of using position wait logic. This is useful for requests
// done by the job queue or background ETL that do not have a meaningful session.
$this->chronProt->setWaitEnabled( false );
} elseif ( $this->memStash instanceof EmptyBagOStuff ) {
} elseif ( $this->cpStash instanceof EmptyBagOStuff ) {
// No where to store any DB positions and wait for them to appear
$this->chronProt->setEnabled( false );
$this->replLogger->info( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
$this->replLogger->debug( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
}
$this->replLogger->debug(
@ -590,27 +588,25 @@ abstract class LBFactory implements ILBFactory {
*
* @param ChronologyProtector $cp
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
* @param string $mode One of (sync, async); whether to wait on remote datacenters
* @param int|null &$cpIndex DB position key write counter; incremented on update
* @param int|null &$cpIndex DB position key write counter; incremented on update [returned]
*/
protected function shutdownChronologyProtector(
ChronologyProtector $cp, $workCallback, $mode, &$cpIndex = null
ChronologyProtector $cp, $workCallback, &$cpIndex = null
) {
// Record all the master positions needed
// Remark all of the relevant DB master positions
$this->forEachLB( static function ( ILoadBalancer $lb ) use ( $cp ) {
$cp->storeSessionReplicationPosition( $lb );
$cp->stageSessionReplicationPosition( $lb );
} );
// Write them to the persistent stash. Try to do something useful by running $work
// while ChronologyProtector waits for the stash write to replicate to all DCs.
$unsavedPositions = $cp->shutdown( $workCallback, $mode, $cpIndex );
// Write the positions to the persistent stash
$unsavedPositions = $cp->shutdown( $cpIndex );
if ( $unsavedPositions && $workCallback ) {
// Invoke callback in case it did not cache the result yet
$workCallback(); // work now to block for less time in waitForAll()
$workCallback();
}
// If the positions failed to write to the stash, at least wait on local datacenter
// replica DBs to catch up before responding. Even if there are several DCs, this increases
// the chance that the user will see their own changes immediately afterwards. As long
// as the sticky DC cookie applies (same domain), this is not even an issue.
// If the positions failed to write to the stash, then wait on the local datacenter
// replica DBs to catch up before sending an HTTP response. As long as the request that
// caused such DB writes occurred in the master datacenter, and clients are temporarily
// pinned to the master datacenter after causing DB writes, then this should suffice.
$this->forEachLB( static function ( ILoadBalancer $lb ) use ( $unsavedPositions ) {
$masterName = $lb->getServerName( $lb->getWriterIndex() );
if ( isset( $unsavedPositions[$masterName] ) ) {
@ -814,4 +810,12 @@ abstract class LBFactory implements ILBFactory {
public function __destruct() {
$this->destroy();
}
/**
* @param float|null &$time Mock UNIX timestamp for testing
* @codeCoverageIgnore
*/
public function setMockTime( &$time ) {
$this->getChronologyProtector()->setMockTime( $time );
}
}

View file

@ -37,10 +37,6 @@ use MediaWiki\MediaWikiServices;
* A place to store lightweight data that is not canonically
* stored anywhere else (e.g. a "hoard" of objects).
*
* The former should always use strongly consistent stores, so callers don't
* have to deal with stale reads. The latter may be eventually consistent, but
* callers can use BagOStuff:READ_LATEST to see the latest available data.
*
* Primary entry points:
*
* - ObjectCache::getLocalServerInstance( $fallbackType )

View file

@ -27,7 +27,7 @@ class TestSetup {
*/
public static function applyInitialConfig() {
global $wgMainCacheType, $wgMessageCacheType, $wgParserCacheType, $wgMainWANCache, $wgSessionCacheType;
global $wgMainStash;
global $wgMainStash, $wgChronologyProtectorStash;
global $wgObjectCaches;
global $wgLanguageConverterCacheType, $wgUseDatabaseMessages;
global $wgLocaltimezone, $wgLocalisationCacheConf;
@ -59,6 +59,7 @@ class TestSetup {
$wgLanguageConverterCacheType = 'hash';
// Uses db-replicated in DefaultSettings
$wgMainStash = 'hash';
$wgChronologyProtectorStash = 'hash';
// Use hash instead of db
$wgObjectCaches['db-replicated'] = $wgObjectCaches['hash'];
// Use memory job queue

View file

@ -338,10 +338,10 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
$cp->applySessionReplicationPosition( $lb1 );
$cp->applySessionReplicationPosition( $lb2 );
// Record positions in stash on first HTTP request end
$cp->storeSessionReplicationPosition( $lb1 );
$cp->storeSessionReplicationPosition( $lb2 );
$cp->stageSessionReplicationPosition( $lb1 );
$cp->stageSessionReplicationPosition( $lb2 );
$cpIndex = null;
$cp->shutdown( null, 'sync', $cpIndex );
$cp->shutdown( $cpIndex );
$this->assertSame( 1, $cpIndex, "CP write index set" );
@ -381,10 +381,10 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
$cp->applySessionReplicationPosition( $lb1 );
$cp->applySessionReplicationPosition( $lb2 );
// Shutdown (nothing to record)
$cp->storeSessionReplicationPosition( $lb1 );
$cp->storeSessionReplicationPosition( $lb2 );
$cp->stageSessionReplicationPosition( $lb1 );
$cp->stageSessionReplicationPosition( $lb2 );
$cpIndex = null;
$cp->shutdown( null, 'sync', $cpIndex );
$cp->shutdown( $cpIndex );
$this->assertNull( $cpIndex, "CP write index retained" );
@ -809,13 +809,27 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
public function testGetChronologyProtectorTouched() {
$store = new HashBagOStuff;
$lbFactory = $this->newLBFactoryMulti( [
'memStash' => $store
'cpStash' => $store,
'cliMode' => false
] );
$lbFactory->setRequestInfo( [ 'ChronologyClientId' => 'ii' ] );
$key = $store->makeGlobalKey( ChronologyProtector::class,
'mtime', 'ii', 'test-db1' );
$store->set( $key, 2 );
$mockWallClock = 1549343530.2053;
$priorTime = $mockWallClock; // reference time
$lbFactory->setMockTime( $mockWallClock );
$key = $store->makeGlobalKey( ChronologyProtector::class, 'ii', 'v2' );
$store->set(
$key,
[
'positions' => [],
'timestamps' => [ $lbFactory::CLUSTER_MAIN_DEFAULT => $priorTime ]
],
3600
);
$mockWallClock += 1.0;
$touched = $lbFactory->getChronologyProtectorTouched();
$this->assertEquals( 2, $touched );
$this->assertEquals( $priorTime, $touched );
}
}