Add $wgChronologyProtectorStash and improve $wgMainStash comments
Remove WRITE_SYNC flag from ChronologyProtector since the current plan is to simply use a
datacenter-local storage cluster. Move the touched timestamps into the same stash key that
holds the replication positions. Update the ChronologyProtector::getTouched() comments.

Also:
* Use $wgMainCacheType as a $wgChronologyProtectorStash fallback since the main stash
  will be 'db-replicated' for most sites.
* Remove HashBagOStuff default for the position store since that can result in timeouts
  waiting on a write position index to appear, given that the data does not actually
  persist across requests.
* Rename ChronologyProtector::saveSessionReplicationPosition() since it does not actually
  save replication positions to storage.
* Make ChronologyProtector::getTouched() check the "enabled" field.
* Allow mocking the current time in ChronologyProtector.
* Mark some internal methods with @internal.
* Migrate various comments from $wgMainStash to BagOStuff.
* Update some other ObjectCache related comments.

Bug: T254634
Change-Id: I0456f5d40a558122a1b50baf4ab400c5cf0b623d
This commit is contained in:
parent 5b6782b1ff
commit bd7cf4dce9

11 changed files with 441 additions and 342 deletions
@@ -2277,6 +2277,10 @@ $wgLBFactoryConf = [ 'class' => \Wikimedia\Rdbms\LBFactorySimple::class ];
  * After a state-changing request is done by a client, this determines
  * how many seconds that client should keep using the master datacenter.
  * This avoids unexpected stale or 404 responses due to replication lag.
  *
+ * This must be greater than or equal to
+ * Wikimedia\Rdbms\ChronologyProtector::POSITION_COOKIE_TTL.
+ *
  * @since 1.27
  */
 $wgDataCenterUpdateStickTTL = 10;
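The constraint documented above can be illustrated with a short configuration sketch (a hypothetical LocalSettings.php fragment, not part of this change; the value 30 is an arbitrary example):

```php
// Hypothetical LocalSettings.php fragment. Pin clients to the master
// datacenter for 30 seconds after a state-changing request; the value must be
// >= Wikimedia\Rdbms\ChronologyProtector::POSITION_COOKIE_TTL (10 seconds).
$wgDataCenterUpdateStickTTL = 30;
```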
@@ -2616,16 +2620,16 @@ $wgObjectCaches = [
  * By default, this will wrap $wgMainCacheType (which is disabled, since the basic
  * stock default of CACHE_DB is not fast enough to make it worthwhile).
  *
- * For single server or single data-center setup, setting $wgMainCacheType
+ * For single server or single datacenter setup, setting $wgMainCacheType
  * is enough.
  *
- * For a multiple data-center setup, WANObjectCache should be configured to
+ * For a multiple datacenter setup, WANObjectCache should be configured to
  * broadcast some of its operations using Mcrouter or Dynomite.
  * See @ref wanobjectcache-deployment "Deploying WANObjectCache".
  *
  * The options are:
  * - false: Configure the cache using $wgMainCacheType, without using
- *   a relayer (only matters if there are multiple data-centers)
+ *   a relayer (only matters if there are multiple datacenters)
  * - CACHE_NONE: Do not cache
  * - (other): A string may be used which identifies a cache
  *   configuration in $wgWANObjectCaches
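As a sketch of the single-datacenter case described above (hedged: the Memcached server address is an assumption for illustration, and leaving $wgMainWANCache as false simply restates the stock default):

```php
// Illustrative LocalSettings.php fragment. With $wgMainWANCache left as
// false, the WAN cache wraps $wgMainCacheType, so configuring the latter is
// enough for a single server or single datacenter setup.
$wgMainCacheType = CACHE_MEMCACHED;
$wgMemCachedServers = [ '127.0.0.1:11211' ]; // example server address
$wgMainWANCache = false; // wrap $wgMainCacheType (the stock default)
```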
@@ -2681,40 +2685,47 @@ $wgEnableWANCacheReaper = false;
 /**
  * The object store type of the main stash.
  *
- * This store should be a very fast storage system optimized for holding lightweight data
- * like incrementable hit counters and current user activity. The store should replicate the
- * dataset among all data-centers. Any add(), merge(), lock(), and unlock() operations should
- * maintain "best effort" linearizability; as long as connectivity is strong, latency is low,
- * and there is no eviction pressure prompted by low free space, those operations should be
- * linearizable. In terms of PACELC (https://en.wikipedia.org/wiki/PACELC_theorem), the store
- * should act as a PA/EL distributed system for these operations. One optimization for these
- * operations is to route them to a "primary" data-center (e.g. one that serves HTTP POST) for
- * synchronous execution and then replicate to the others asynchronously. This means that at
- * least calls to these operations during HTTP POST requests would quickly return.
+ * This should be a fast storage system optimized for lightweight data, both ephemeral and
+ * permanent, for things like counters, tokens, and blobs. The dataset access scope should
+ * include all the application servers in all datacenters. Thus, the data must be replicated
+ * among all datacenters. The store should have "Last Write Wins" eventual consistency. Per
+ * https://en.wikipedia.org/wiki/PACELC_theorem, the store should act as a PA/EL distributed
+ * system for these operations.
  *
- * All other operations, such as get(), set(), delete(), changeTTL(), incr(), and decr(),
- * should be synchronous in the local data-center, replicating asynchronously to the others.
- * This behavior can be overridden by the use of the WRITE_SYNC and READ_LATEST flags.
+ * The multi-datacenter strategy for MediaWiki is to have CDN route HTTP POST requests to the
+ * master datacenter and HTTP GET/HEAD/OPTIONS requests to the closest datacenter to the client.
+ * The stash accepts write operations from any datacenter, but cross-datacenter replication is
+ * asynchronous.
  *
- * The store should *preferably* have eventual consistency to handle network partitions.
- *
- * Modules that rely on the stash should be prepared for:
- * - add(), merge(), lock(), and unlock() to be slower than other write operations,
- *   at least in "secondary" data-centers (e.g. one that only serves HTTP GET/HEAD)
- * - Other write operations to have race conditions across data-centers
- * - Read operations to have race conditions across data-centers
- * - Consistency to be either eventual (with Last-Write-Wins) or just "best effort"
- *
- * In general, this means avoiding updates during idempotent HTTP requests (GET/HEAD) and
- * avoiding assumptions of true linearizability (e.g. accepting anomalies). Modules that need
- * these kinds of guarantees should use other storage mediums.
+ * Modules that use the main stash can expect race conditions to occur if a key can receive
+ * write operations originating from multiple datacenters. To improve consistency, callers
+ * should avoid main stash updates during non-POST requests. In any case, callers should
+ * gracefully tolerate occasional key evictions, temporary inconsistencies among datacenters,
+ * and violations of linearizability (e.g. during timeouts). Modules that can never handle
+ * these kinds of anomalies should use other storage mediums.
  *
  * Valid options are the keys of $wgObjectCaches, e.g. CACHE_* constants.
  *
  * @see BagOStuff
  * @since 1.26
  */
 $wgMainStash = 'db-replicated';

+/**
+ * The object store type of the ChronologyProtector position store.
+ *
+ * This should be a fast storage system optimized for lightweight ephemeral data.
+ * The dataset access scope should include all application servers in the local datacenter.
+ * A set of single-key operations should maintain "best effort" linearizability (e.g. they
+ * observe linearizability unless connectivity/latency/disk problems arise).
+ *
+ * Valid options are the keys of $wgObjectCaches, e.g. CACHE_* constants.
+ *
+ * @see BagOStuff
+ * @since 1.36
+ */
+$wgChronologyProtectorStash = null;
+
 /**
  * The expiry time for the parser cache, in seconds.
  * The default is 86400 (one day).
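To make the division of labor between the two settings concrete, a multi-datacenter site might configure them as follows (a hypothetical LocalSettings.php fragment; the choice of CACHE_MEMCACHED here is an assumption for illustration):

```php
// Hypothetical LocalSettings.php fragment. The main stash must be replicated
// to all datacenters, while the ChronologyProtector position store only needs
// to be shared among application servers in the local datacenter.
$wgMainStash = 'db-replicated';                // cross-datacenter, Last Write Wins
$wgChronologyProtectorStash = CACHE_MEMCACHED; // datacenter-local cluster
// If left as null, the position store falls back to the local-cluster cache
// ($wgMainCacheType), as wired up in ServiceWiring.
```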
@@ -28,7 +28,6 @@ use Psr\Log\LoggerInterface;
 use Wikimedia\AtEase;
 use Wikimedia\Rdbms\ChronologyProtector;
 use Wikimedia\Rdbms\DBConnectionError;
-use Wikimedia\Rdbms\ILBFactory;

 /**
  * The MediaWiki class is the helper class for the index.php entry point.
@@ -576,8 +575,6 @@ class MediaWiki {
				exit;
			}
		}
-		// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
-		// ChronologyProtector synchronizes DB positions or replicas across all datacenters.
		MWExceptionHandler::handleException( $e, MWExceptionHandler::CAUGHT_BY_ENTRYPOINT );
	} catch ( Throwable $e ) {
		// Type errors and such: at least handle it now and clean up the LBFactory state
@@ -687,31 +684,44 @@ class MediaWiki {
		// Run updates that need to block the client or affect output (this is the last chance)
		DeferredUpdates::doUpdates( 'run', DeferredUpdates::PRESEND );
		wfDebug( __METHOD__ . ': pre-send deferred updates completed' );

		// Persist the session to avoid race conditions on subsequent requests by the client
		$request->getSession()->save(); // T214471
		wfDebug( __METHOD__ . ': session changes committed' );

-		// Figure out whether to wait for DB replication now or to use some method that assures
-		// that subsequent requests by the client will use the DB replication positions written
-		// during the shutdown() call below; the latter requires working around replication lag
-		// of the store containing DB replication positions (e.g. dynomite, mcrouter).
-		list( $flags, $strategy ) = self::getChronProtStrategy( $lbFactory, $output );
-		// Record ChronologyProtector positions for DBs affected in this request at this point
+		// Subsequent requests by the client should see the DB replication positions written
+		// during the shutdown() call below, even if the position store itself has asynchronous
+		// replication. Setting the cpPosIndex cookie is normally enough. However, this might not
+		// work for cross-domain redirects to foreign wikis, so set the ?cpPosIndex in that case.
+		$isCrossWikiRedirect = (
+			$output->getRedirect() &&
+			$lbFactory->hasOrMadeRecentMasterChanges( INF ) &&
+			self::getUrlDomainDistance( $output->getRedirect() ) === 'remote'
+		);
+
+		// Persist replication positions for DBs modified by this request (at this point).
+		// These help provide "session consistency" for the client on their next requests.
		$cpIndex = null;
		$cpClientId = null;
-		$lbFactory->shutdown( $flags, $postCommitWork, $cpIndex, $cpClientId );
+		$lbFactory->shutdown(
+			$lbFactory::SHUTDOWN_NORMAL,
+			$postCommitWork,
+			$cpIndex,
+			$cpClientId
+		);
+		$now = time();

		$allowHeaders = !( $output->isDisabled() || headers_sent() );

		if ( $cpIndex > 0 ) {
			if ( $allowHeaders ) {
-				$now = time();
				$expires = $now + ChronologyProtector::POSITION_COOKIE_TTL;
				$options = [ 'prefix' => '' ];
				$value = $lbFactory::makeCookieValueFromCPIndex( $cpIndex, $now, $cpClientId );
				$request->response()->setCookie( 'cpPosIndex', $value, $expires, $options );
			}

-			if ( $strategy === 'cookie+url' ) {
+			if ( $isCrossWikiRedirect ) {
				if ( $output->getRedirect() ) { // sanity
					$safeUrl = $lbFactory->appendShutdownCPIndexAsQuery(
						$output->getRedirect(),
@@ -732,7 +742,10 @@ class MediaWiki {
			// handles this POST request (e.g. the "master" data center). Also have the user
			// briefly bypass CDN so ChronologyProtector works for cacheable URLs.
			if ( $request->wasPosted() && $lbFactory->hasOrMadeRecentMasterChanges() ) {
-				$expires = time() + $config->get( 'DataCenterUpdateStickTTL' );
+				$expires = $now + max(
+					ChronologyProtector::POSITION_COOKIE_TTL,
+					$config->get( 'DataCenterUpdateStickTTL' )
+				);
				$options = [ 'prefix' => '' ];
				$request->response()->setCookie( 'UseDC', 'master', $expires, $options );
				$request->response()->setCookie( 'UseCDNCache', 'false', $expires, $options );
@@ -773,38 +786,6 @@ class MediaWiki {
		}
	}

-	/**
-	 * @param ILBFactory $lbFactory
-	 * @param OutputPage $output
-	 * @return array
-	 */
-	private static function getChronProtStrategy( ILBFactory $lbFactory, OutputPage $output ) {
-		// Should the client return, their request should observe the new ChronologyProtector
-		// DB positions. This request might be on a foreign wiki domain, so synchronously update
-		// the DB positions in all datacenters to be safe. If this output is not a redirect,
-		// then OutputPage::output() will be relatively slow, meaning that running it in
-		// $postCommitWork should help mask the latency of those updates.
-		$flags = $lbFactory::SHUTDOWN_CHRONPROT_SYNC;
-		$strategy = 'cookie+sync';
-
-		$allowHeaders = !( $output->isDisabled() || headers_sent() );
-		if ( $output->getRedirect() && $lbFactory->hasOrMadeRecentMasterChanges( INF ) ) {
-			// OutputPage::output() will be fast, so $postCommitWork is useless for masking
-			// the latency of synchronously updating the DB positions in all datacenters.
-			// Try to make use of the time the client spends following redirects instead.
-			$domainDistance = self::getUrlDomainDistance( $output->getRedirect() );
-			if ( $domainDistance === 'local' && $allowHeaders ) {
-				$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
-				$strategy = 'cookie'; // use same-domain cookie and keep the URL uncluttered
-			} elseif ( $domainDistance === 'remote' ) {
-				$flags = $lbFactory::SHUTDOWN_CHRONPROT_ASYNC;
-				$strategy = 'cookie+url'; // cross-domain cookie might not work
-			}
-		}
-
-		return [ $flags, $strategy ];
-	}
-
	/**
	 * @param string $url
	 * @return string Either "local", "remote" if in the farm, "external" otherwise
@@ -943,8 +924,7 @@ class MediaWiki {
		// Actually do the work of the request and build up any output
		$this->performRequest();

-		// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit() while
-		// ChronologyProtector synchronizes DB positions or replicas across all datacenters.
+		// GUI-ify and stash the page output in MediaWiki::doPreOutputCommit()
		$buffer = null;
		$outputWork = static function () use ( $output, &$buffer ) {
			if ( $buffer === null ) {
@@ -382,15 +382,15 @@ return [
	static function ( MediaWikiServices $services ) : Wikimedia\Rdbms\LBFactory {
		$mainConfig = $services->getMainConfig();

-		try {
-			$stash = $services->getMainObjectStash();
-		} catch ( RecursiveServiceDependencyException $e ) {
-			$stash = new EmptyBagOStuff(); // T141804: handle cases like CACHE_DB
-		}
-
-		if ( $stash instanceof EmptyBagOStuff ) {
-			// Use process cache if the main stash is disabled or there was recursion
-			$stash = new HashBagOStuff( [ 'maxKeys' => 100 ] );
+		$cpStashType = $mainConfig->get( 'ChronologyProtectorStash' );
+		if ( is_string( $cpStashType ) ) {
+			$cpStash = ObjectCache::getInstance( $cpStashType );
+		} else {
+			try {
+				$cpStash = ObjectCache::getLocalClusterInstance();
+			} catch ( RecursiveServiceDependencyException $e ) {
+				$cpStash = new EmptyBagOStuff(); // T141804: handle cases like CACHE_DB
+			}
		}

		try {
@@ -409,8 +409,8 @@ return [
			$mainConfig->get( 'LBFactoryConf' ),
			new ServiceOptions( MWLBFactory::APPLY_DEFAULT_CONFIG_OPTIONS, $mainConfig ),
			$services->getConfiguredReadOnlyMode(),
+			$cpStash,
			$srvCache,
			$stash,
			$wanCache
		);
@@ -63,8 +63,8 @@ abstract class MWLBFactory {
	 * @param array $lbConf Config for LBFactory::__construct()
	 * @param ServiceOptions $options
	 * @param ConfiguredReadOnlyMode $readOnlyMode
+	 * @param BagOStuff $cpStash
	 * @param BagOStuff $srvCache
	 * @param BagOStuff $mainStash
	 * @param WANObjectCache $wanCache
	 * @return array
	 * @internal For use with service wiring
@@ -73,8 +73,8 @@ abstract class MWLBFactory {
		array $lbConf,
		ServiceOptions $options,
		ConfiguredReadOnlyMode $readOnlyMode,
+		BagOStuff $cpStash,
		BagOStuff $srvCache,
		BagOStuff $mainStash,
		WANObjectCache $wanCache
	) {
		$options->assertRequiredOptions( self::APPLY_DEFAULT_CONFIG_OPTIONS );
@@ -156,8 +156,8 @@ abstract class MWLBFactory {
			$options->get( 'DBprefix' )
		);

+		$lbConf['cpStash'] = $cpStash;
		$lbConf['srvCache'] = $srvCache;
		$lbConf['memStash'] = $mainStash;
		$lbConf['wanCache'] = $wanCache;

		return $lbConf;
@@ -39,21 +39,16 @@ use Wikimedia\ScopedCallback;
 *
 * This interface is intended to be more or less compatible with the PHP memcached client.
 *
- * Instances of this class should be created with an intended access scope, such as:
+ * Class instances should be created with an intended access scope for the dataset, such as:
 *   - a) A single PHP thread on a server (e.g. stored in a PHP variable)
 *   - b) A single application server (e.g. stored in APC or sqlite)
 *   - c) All application servers in a datacenter (e.g. stored in memcached or mysql)
 *   - d) All application servers in all datacenters (e.g. stored via mcrouter or dynomite)
 *
 * Callers should use the proper factory methods that yield BagOStuff instances. Site admins
- * should make sure the configuration for those factory methods matches their access scope.
- * BagOStuff subclasses have widely varying levels of support for replication features.
- *
- * For any given instance, methods like lock(), unlock(), merge(), and set() with WRITE_SYNC
- * should semantically operate over its entire access scope; any nodes/threads in that scope
- * should serialize appropriately when using them. Likewise, a call to get() with READ_LATEST
- * from one node in its access scope should reflect the prior changes of any other node in its
- * access scope. Any get() should reflect the changes of any prior set() with WRITE_SYNC.
+ * should make sure that the configuration for those factory methods matches their access scope.
+ * BagOStuff subclasses have widely varying levels of support for replication features within
+ * and among datacenters.
 *
 * Subclasses should override the default "segmentationSize" field with an appropriate value.
 * The value should not be larger than what the storage backend (by default) supports. It also
@@ -61,6 +56,25 @@ use Wikimedia\ScopedCallback;
 * having poor scalability). The same goes for the "segmentedValueMaxSize" member, which limits
 * the maximum size and chunk count (indirectly) of values.
 *
+ * A few notes about data consistency for BagOStuff instances:
+ *   - Read operation methods, e.g. get(), should be synchronous in the local datacenter.
+ *     When used with READ_LATEST, such operations should reflect any prior writes originating
+ *     from the local datacenter (e.g. by avoiding replica DBs or invoking quorum reads).
+ *   - Write operation methods, e.g. set(), should be synchronous in the local datacenter, with
+ *     asynchronous cross-datacenter replication. This replication can be either "best effort"
+ *     or eventually consistent. When used with WRITE_SYNC, such operations will wait until all
+ *     datacenters are updated or a timeout occurs. If the write succeeded, then any subsequent
+ *     get() operations with READ_LATEST, regardless of datacenter, should reflect the changes.
+ *   - Locking operation methods, e.g. lock(), unlock(), and getScopedLock(), should only apply
+ *     to the local datacenter.
+ *   - Any set of single-key write operation method calls originating from a single datacenter
+ *     should observe "best effort" linearizability. Any set of single-key write operations
+ *     using WRITE_SYNC, regardless of the datacenter, should observe "best effort"
+ *     linearizability. In this context, "best effort" means that consistency holds as long as
+ *     connectivity is strong, network latency is low, and there are no relevant storage server
+ *     failures. Per https://en.wikipedia.org/wiki/PACELC_theorem, the store should act as a
+ *     PA/EL distributed system for these operations.
 *
 * @stable to extend
 * @ingroup Cache
 */
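The consistency notes above can be illustrated with a short usage sketch (hedged: `$cache` stands for any configured BagOStuff instance obtained from the ObjectCache factory; this snippet is illustrative and not code from the change itself):

```php
// Sketch, assuming $cache is a BagOStuff from e.g. ObjectCache::getLocalClusterInstance().
$key = $cache->makeKey( 'example', 'counter' );

// Synchronous write in the local datacenter; replication elsewhere is asynchronous.
$cache->set( $key, 42, $cache::TTL_MINUTE );

// A plain get() may race with cross-datacenter replication...
$lagged = $cache->get( $key );
// ...while READ_LATEST should reflect prior local-datacenter writes.
$latest = $cache->get( $key, $cache::READ_LATEST );

// Locking only applies within the local datacenter.
if ( $cache->lock( $key, 3, 6 ) ) {
	// ... critical section ...
	$cache->unlock( $key );
}
```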
@@ -52,28 +52,41 @@ class ChronologyProtector implements LoggerAwareInterface {
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;
-	/** @var int Max seconds to wait on positions to appear */
-	protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
-	/** @var bool Whether to no-op all method calls */

+	/** @var bool Whether reading/writing session consistency replication positions is enabled */
	protected $enabled = true;
-	/** @var bool Whether to check and wait on positions */
-	protected $wait = true;
+	/** @var bool Whether waiting on DB servers to reach replication positions is enabled */
+	protected $positionWaitsEnabled = true;
	/** @var float|null UNIX timestamp when the client data was loaded */
	protected $startupTimestamp;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
-	/** @var DBMasterPos[] Map of (DB master name => position) */
-	protected $startupPositions = [];
-	/** @var DBMasterPos[] Map of (DB master name => position) */
-	protected $shutdownPositions = [];
-	/** @var float[] Map of (DB master name => 1) */
-	protected $shutdownTouchDBs = [];
+	/** @var array<string,DBMasterPos> Map of (DB master name => position) */
+	protected $startupPositionsByMaster = [];
+	/** @var array<string,DBMasterPos> Map of (DB master name => position) */
+	protected $shutdownPositionsByMaster = [];
+	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
+	protected $startupTimestampsByCluster = [];
+	/** @var array<string,float> Map of (DB cluster name => UNIX timestamp) */
+	protected $shutdownTimestampsByCluster = [];

-	/** @var int Seconds to store positions */
-	public const POSITION_TTL = 60;
-	/** @var int Seconds to store position write index cookies (safely less than POSITION_TTL) */
+	/** @var float|null */
+	private $wallClockOverride;

+	/** Seconds to store position write index cookies (safely less than POSITION_STORE_TTL) */
	public const POSITION_COOKIE_TTL = 10;
-	/** @var int Max time to wait for positions to appear */
-	private const POS_STORE_WAIT_TIMEOUT = 5;
+	/** Seconds to store replication positions */
+	private const POSITION_STORE_TTL = 60;
+	/** Max seconds to wait for position write indexes to appear (e.g. replicate) in storage */
+	private const POSITION_INDEX_WAIT_TIMEOUT = 5;

+	/** Lock timeout to use for key updates */
+	private const LOCK_TIMEOUT = 3;
+	/** Lock expiry to use for key updates */
+	private const LOCK_TTL = 6;

+	private const FLD_POSITIONS = 'positions';
+	private const FLD_TIMESTAMPS = 'timestamps';
+	private const FLD_WRITE_INDEX = 'writeIndex';

	/**
	 * @param BagOStuff $store
@@ -84,6 +97,7 @@ class ChronologyProtector implements LoggerAwareInterface {
	 */
	public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
		$this->store = $store;

		if ( isset( $client['clientId'] ) ) {
			$this->clientId = $client['clientId'];
		} else {
@@ -116,7 +130,7 @@ class ChronologyProtector implements LoggerAwareInterface {
	}

	/**
-	 * @param bool $enabled Whether to no-op all method calls
+	 * @param bool $enabled Whether reading/writing session replication positions is enabled
	 * @since 1.27
	 */
	public function setEnabled( $enabled ) {
@@ -124,11 +138,11 @@ class ChronologyProtector implements LoggerAwareInterface {
	}

	/**
-	 * @param bool $enabled Whether to check and wait on positions
+	 * @param bool $enabled Whether session replication position wait barriers are enabled
	 * @since 1.27
	 */
	public function setWaitEnabled( $enabled ) {
-		$this->wait = $enabled;
+		$this->positionWaitsEnabled = $enabled;
	}

	/**
@ -139,254 +153,319 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
* to that position by delaying execution. The delay may timeout and allow stale
|
||||
* data if no non-lagged replica DBs are available.
|
||||
*
|
||||
* This method should only be called from LBFactory.
|
||||
* @internal This method should only be called from LBFactory.
|
||||
*
|
||||
* @param ILoadBalancer $lb
|
||||
* @return void
|
||||
*/
|
||||
public function applySessionReplicationPosition( ILoadBalancer $lb ) {
|
||||
if ( !$this->enabled ) {
|
||||
return; // disabled
|
||||
if ( !$this->enabled || !$this->positionWaitsEnabled ) {
|
||||
return;
|
||||
}
|
||||
|
||||
$cluster = $lb->getClusterName();
|
||||
$masterName = $lb->getServerName( $lb->getWriterIndex() );
|
||||
$startupPositions = $this->getStartupMasterPositions();
|
||||
|
||||
$pos = $startupPositions[$masterName] ?? null;
|
||||
$pos = $this->getStartupSessionPositions()[$masterName] ?? null;
|
||||
if ( $pos instanceof DBMasterPos ) {
|
||||
$this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'" );
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position is '$pos'" );
|
||||
$lb->waitFor( $pos );
|
||||
} else {
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no position" );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save the "session consistency" DB replication position for an end-of-life ILoadBalancer
|
||||
* Update the "session consistency" DB replication position for an end-of-life ILoadBalancer
|
||||
*
|
||||
* This saves the replication position of the master DB if this request made writes to it.
|
||||
* This remarks the replication position of the master DB if this request made writes to
|
||||
* it using the provided ILoadBalancer instance.
|
||||
*
|
||||
* This method should only be called from LBFactory.
|
||||
* @internal This method should only be called from LBFactory.
|
||||
*
|
||||
* @param ILoadBalancer $lb
|
||||
* @return void
|
||||
*/
|
||||
public function storeSessionReplicationPosition( ILoadBalancer $lb ) {
|
||||
if ( !$this->enabled ) {
|
||||
return; // disabled
|
||||
} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
|
||||
// Only save the position if writes have been done on the connection
|
||||
public function stageSessionReplicationPosition( ILoadBalancer $lb ) {
|
||||
if ( !$this->enabled || !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
|
||||
return;
|
||||
}
|
||||
|
||||
$cluster = $lb->getClusterName();
|
||||
$masterName = $lb->getServerName( $lb->getWriterIndex() );
|
||||
|
||||
if ( $lb->hasStreamingReplicaServers() ) {
|
||||
$pos = $lb->getReplicaResumePos();
|
||||
if ( $pos ) {
|
||||
$this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos" );
|
||||
$this->shutdownPositions[$masterName] = $pos;
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position now '$pos'" );
|
||||
$this->shutdownPositionsByMaster[$masterName] = $pos;
|
||||
$this->shutdownTimestampsByCluster[$cluster] = $pos->asOfTime();
|
||||
} else {
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) position unknown" );
|
||||
$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
|
||||
}
|
||||
} else {
|
||||
$this->logger->debug( __METHOD__ . ": DB '$masterName' touched" );
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($masterName) has no replication" );
|
||||
$this->shutdownTimestampsByCluster[$cluster] = $this->getCurrentTime();
|
||||
}
|
||||
$this->shutdownTouchDBs[$masterName] = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
|
||||
* May commit chronology data to persistent storage.
|
||||
* Save any remarked "session consistency" DB replication positions to persistent storage
|
||||
*
|
||||
* @internal This method should only be called from LBFactory.
|
||||
*
|
||||
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
|
||||
* @param string $mode One of (sync, async); whether to wait on remote datacenters
|
||||
* @param int|null &$cpIndex DB position key write counter; incremented on update
|
||||
* @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
|
||||
* @return DBMasterPos[] Empty on success; map of (db name => unsaved position) on failure
|
||||
*/
|
||||
public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
|
||||
public function shutdown( &$cpIndex = null ) {
|
||||
if ( !$this->enabled ) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$store = $this->store;
|
||||
// Some callers might want to know if a user recently touched a DB.
|
||||
// These writes do not need to block on all datacenters receiving them.
|
||||
foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
|
||||
$store->set(
|
||||
$this->getTouchedKey( $this->store, $dbName ),
|
||||
microtime( true ),
|
||||
$store::TTL_DAY
|
||||
);
|
||||
if ( !$this->shutdownTimestampsByCluster ) {
|
||||
$this->logger->debug( __METHOD__ . ": no master positions/timestamps to save" );
|
||||
|
||||
return [];
}

if ( $this->shutdownPositions === [] ) {
$this->logger->debug( __METHOD__ . ": no master positions to save" );

return []; // nothing to save
}

$this->logger->debug(
__METHOD__ . ": saving master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) )
);

// CP-protected writes should overwhelmingly go to the master datacenter, so merge the
// positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
// If set() returns success, then any get() should be able to see the new positions.
if ( $store->lock( $this->key, 3 ) ) {
if ( $workCallback ) {
// Let the store run the work before blocking on a replication sync barrier.
// If replication caught up while the work finished, the barrier will be fast.
$store->addBusyCallback( $workCallback );
}
$ok = $store->set(
$scopeLock = $this->store->getScopedLock( $this->key, self::LOCK_TIMEOUT, self::LOCK_TTL );
if ( $scopeLock ) {
$ok = $this->store->set(
$this->key,
$this->mergePositions(
$store->get( $this->key ),
$this->shutdownPositions,
$this->store->get( $this->key ),
$this->shutdownPositionsByMaster,
$this->shutdownTimestampsByCluster,
$cpIndex
),
self::POSITION_TTL,
( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
self::POSITION_STORE_TTL
);
$store->unlock( $this->key );
unset( $scopeLock );
} else {
$ok = false;
}

if ( !$ok ) {
$cpIndex = null; // nothing saved
$bouncedPositions = $this->shutdownPositions;
// Raced out too many times or stash is down
$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
implode( ', ', array_keys( $this->shutdownPositions ) )
);
} elseif ( $mode === 'sync' &&
$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
) {
// Positions may not be in all datacenters, force LBFactory to play it safe
$this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
$bouncedPositions = $this->shutdownPositions;
} else {
$clusterList = implode( ', ', array_keys( $this->shutdownTimestampsByCluster ) );

if ( $ok ) {
$bouncedPositions = [];
$this->logger->debug(
__METHOD__ . ": saved master positions/timestamp for DB cluster(s) $clusterList"
);

} else {
$cpIndex = null; // nothing saved
$bouncedPositions = $this->shutdownPositionsByMaster;
// Raced out too many times or stash is down
$this->logger->warning(
__METHOD__ . ": failed to save master positions for DB cluster(s) $clusterList"
);
}

return $bouncedPositions;
}
/**
* @param ILoadBalancer $lb The load balancer. Prior to 1.35, the first parameter was the
* master name.
* @return float|bool UNIX timestamp when client last touched the DB; false if not on record
* Get the UNIX timestamp when the client last touched the DB, if they did so recently
*
* @internal This method should only be called from LBFactory.
*
* @param ILoadBalancer $lb
* @return float|false UNIX timestamp; false if not recent or on record
* @since 1.35
*/
public function getTouched( ILoadBalancer $lb ) {
$masterName = $lb->getServerName( $lb->getWriterIndex() );
return $this->store->get( $this->getTouchedKey( $this->store, $masterName ) );
}

/**
* @param BagOStuff $store
* @param string $masterName
* @return string
*/
private function getTouchedKey( BagOStuff $store, $masterName ) {
return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $masterName );
}

/**
* Load in previous master positions for the client
* @return DBMasterPos[]
*/
protected function getStartupMasterPositions() {
if ( $this->initialized ) {
return $this->startupPositions;
if ( !$this->enabled ) {
return false;
}

$this->initialized = true;
$this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)" );
$cluster = $lb->getClusterName();

if ( $this->wait ) {
// If there is an expectation to see master positions from a certain write
// index or higher, then block until it appears, or until a timeout is reached.
// Since the write index restarts each time the key is created, it is possible that
// a lagged store has a matching key write index. However, in that case, it should
// already be expired and thus treated as non-existing, maintaining correctness.
if ( $this->waitForPosIndex > 0 ) {
$data = null;
$indexReached = null; // highest index reached in the position store
$loop = new WaitConditionLoop(
function () use ( &$data, &$indexReached ) {
$data = $this->store->get( $this->key );
if ( !is_array( $data ) ) {
return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
} elseif ( !isset( $data['writeIndex'] ) ) {
return WaitConditionLoop::CONDITION_REACHED; // b/c
}
$indexReached = max( $data['writeIndex'], $indexReached );

return ( $data['writeIndex'] >= $this->waitForPosIndex )
? WaitConditionLoop::CONDITION_REACHED
: WaitConditionLoop::CONDITION_CONTINUE;
},
$this->waitForPosStoreTimeout
);
$result = $loop->invoke();
$waitedMs = $loop->getLastWaitTime() * 1e3;

if ( $result == $loop::CONDITION_REACHED ) {
$this->logger->debug(
__METHOD__ . ": expected and found position index.",
[
'cpPosIndex' => $this->waitForPosIndex,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
} else {
$this->logger->warning(
__METHOD__ . ": expected but failed to find position index.",
[
'cpPosIndex' => $this->waitForPosIndex,
'indexReached' => $indexReached,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
}
} else {
$data = $this->store->get( $this->key );
}

$this->startupPositions = $data ? $data['positions'] : [];
$this->logger->debug( __METHOD__ . ": key is {$this->key} (read)" );
$timestampsByCluster = $this->getStartupSessionTimestamps();
$timestamp = $timestampsByCluster[$cluster] ?? null;
if ( $timestamp === null ) {
$recentTouchTimestamp = false;
} elseif ( ( $this->startupTimestamp - $timestamp ) > self::POSITION_COOKIE_TTL ) {
// If the position store is not replicated among datacenters and the cookie that
// sticks the client to the primary datacenter expires, then the touch timestamp
// will be found for requests in one datacenter but not others. For consistency,
// return false once the user is no longer routed to the primary datacenter.
$recentTouchTimestamp = false;
$this->logger->debug( __METHOD__ . ": old timestamp ($timestamp) for $cluster" );
} else {
$this->startupPositions = [];
$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)" );
$recentTouchTimestamp = $timestamp;
$this->logger->debug( __METHOD__ . ": recent timestamp ($timestamp) for $cluster" );
}

return $this->startupPositions;
return $recentTouchTimestamp;
}
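The recency check above returns the stored touch timestamp only while the client is still within the sticky-datacenter window. A language-neutral sketch of that rule (Python here, not the actual PHP; the constant value and function name are illustrative stand-ins for `ChronologyProtector::POSITION_COOKIE_TTL` and the check inside `getTouched()`):

```python
# Hypothetical value standing in for ChronologyProtector::POSITION_COOKIE_TTL (seconds)
POSITION_COOKIE_TTL = 10

def recent_touch_timestamp(startup_timestamp, timestamps_by_cluster, cluster):
    """Return the touch timestamp for a cluster, or False if absent or stale."""
    timestamp = timestamps_by_cluster.get(cluster)
    if timestamp is None:
        # Client has no touch timestamp on record for this cluster
        return False
    if (startup_timestamp - timestamp) > POSITION_COOKIE_TTL:
        # Sticky-DC cookie window has passed; treat as "not recent" so all
        # datacenters answer consistently once the cookie expires
        return False
    return timestamp
```

The same three-way outcome (no record, stale record, recent record) drives the `float|false` return type documented on `getTouched()`.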
/**
* @param array|bool $curValue
* @param DBMasterPos[] $shutdownPositions
* @param int|null &$cpIndex
* @return array
* @return array<string,DBMasterPos>
*/
protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
/** @var DBMasterPos[] $curPositions */
$curPositions = $curValue['positions'] ?? [];
// Use the newest positions for each DB master
foreach ( $shutdownPositions as $db => $pos ) {
if (
!isset( $curPositions[$db] ) ||
!( $curPositions[$db] instanceof DBMasterPos ) ||
$pos->asOfTime() > $curPositions[$db]->asOfTime()
) {
$curPositions[$db] = $pos;
protected function getStartupSessionPositions() {
$this->lazyStartup();

return $this->startupPositionsByMaster;
}

/**
* @return array<string,float>
*/
protected function getStartupSessionTimestamps() {
$this->lazyStartup();

return $this->startupTimestampsByCluster;
}

/**
* Load the stored DB replication positions and touch timestamps for the client
*
* @return void
*/
protected function lazyStartup() {
if ( $this->startupTimestamp !== null ) {
return;
}

$this->startupTimestamp = $this->getCurrentTime();
$this->logger->debug(
__METHOD__ .
": client ID is {$this->clientId}; key is {$this->key}"
);

// If there is an expectation to see master positions from a certain write
// index or higher, then block until it appears, or until a timeout is reached.
// Since the write index restarts each time the key is created, it is possible that
// a lagged store has a matching key write index. However, in that case, it should
// already be expired and thus treated as non-existing, maintaining correctness.
if ( $this->positionWaitsEnabled && $this->waitForPosIndex > 0 ) {
$data = null;
$indexReached = null; // highest index reached in the position store
$loop = new WaitConditionLoop(
function () use ( &$data, &$indexReached ) {
$data = $this->store->get( $this->key );
if ( !is_array( $data ) ) {
return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
} elseif ( !isset( $data[self::FLD_WRITE_INDEX] ) ) {
return WaitConditionLoop::CONDITION_REACHED; // b/c
}
$indexReached = max( $data[self::FLD_WRITE_INDEX], $indexReached );

return ( $data[self::FLD_WRITE_INDEX] >= $this->waitForPosIndex )
? WaitConditionLoop::CONDITION_REACHED
: WaitConditionLoop::CONDITION_CONTINUE;
},
self::POSITION_INDEX_WAIT_TIMEOUT
);
$result = $loop->invoke();
$waitedMs = $loop->getLastWaitTime() * 1e3;

if ( $result == $loop::CONDITION_REACHED ) {
$this->logger->debug(
__METHOD__ . ": expected and found position index {cpPosIndex}.",
[
'cpPosIndex' => $this->waitForPosIndex,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
} else {
$this->logger->warning(
__METHOD__ . ": expected but failed to find position index {cpPosIndex}.",
[
'cpPosIndex' => $this->waitForPosIndex,
'indexReached' => $indexReached,
'waitTimeMs' => $waitedMs
] + $this->clientLogInfo
);
}
} else {
$data = $this->store->get( $this->key );
$indexReached = $data[self::FLD_WRITE_INDEX] ?? null;
if ( $indexReached ) {
$this->logger->debug(
__METHOD__ . ": found position/timestamp data with index {indexReached}.",
[ 'indexReached' => $indexReached ] + $this->clientLogInfo
);
}
}

$cpIndex = $curValue['writeIndex'] ?? 0;
$this->startupPositionsByMaster = $data ? $data[self::FLD_POSITIONS] : [];
$this->startupTimestampsByCluster = $data[self::FLD_TIMESTAMPS] ?? [];
}
/**
* Merge the new DB replication positions with the currently stored ones (highest wins)
*
* @param array<string,mixed>|false $storedValue Current DB replication position data
* @param array<string,DBMasterPos> $shutdownPositions New DB replication positions
* @param array<string,float> $shutdownTimestamps New DB post-commit shutdown timestamps
* @param int|null &$cpIndex New position write index
* @return array<string,mixed> Combined DB replication position data
*/
protected function mergePositions(
$storedValue,
array $shutdownPositions,
array $shutdownTimestamps,
&$cpIndex = null
) {
/** @var array<string,DBMasterPos> $mergedPositions */
$mergedPositions = $storedValue[self::FLD_POSITIONS] ?? [];
// Use the newest positions for each DB master
foreach ( $shutdownPositions as $masterName => $pos ) {
if (
!isset( $mergedPositions[$masterName] ) ||
!( $mergedPositions[$masterName] instanceof DBMasterPos ) ||
$pos->asOfTime() > $mergedPositions[$masterName]->asOfTime()
) {
$mergedPositions[$masterName] = $pos;
}
}

/** @var array<string,float> $mergedTimestamps */
$mergedTimestamps = $storedValue[self::FLD_TIMESTAMPS] ?? [];
// Use the newest touch timestamp for each DB master
foreach ( $shutdownTimestamps as $cluster => $timestamp ) {
if (
!isset( $mergedTimestamps[$cluster] ) ||
$timestamp > $mergedTimestamps[$cluster]
) {
$mergedTimestamps[$cluster] = $timestamp;
}
}

$cpIndex = $storedValue[self::FLD_WRITE_INDEX] ?? 0;

return [
'positions' => $curPositions,
'writeIndex' => ++$cpIndex
self::FLD_POSITIONS => $mergedPositions,
self::FLD_TIMESTAMPS => $mergedTimestamps,
self::FLD_WRITE_INDEX => ++$cpIndex
];
}
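Stripped of the BagOStuff and DBMasterPos details, the "newest value wins" merge done by `mergePositions()` reduces to a per-key maximum plus an incremented write index. A Python sketch of that logic (plain comparable numbers stand in for `DBMasterPos::asOfTime()`, and the dict keys mirror the `FLD_*` field names):

```python
def merge_positions(stored, shutdown_positions, shutdown_timestamps):
    """Merge new replication positions/timestamps into stored data; newest wins."""
    merged_pos = dict(stored.get('positions', {}))
    for master, pos in shutdown_positions.items():
        # Keep whichever position is newer for each DB master
        if master not in merged_pos or pos > merged_pos[master]:
            merged_pos[master] = pos

    merged_ts = dict(stored.get('timestamps', {}))
    for cluster, ts in shutdown_timestamps.items():
        # Keep the newest touch timestamp for each cluster
        if cluster not in merged_ts or ts > merged_ts[cluster]:
            merged_ts[cluster] = ts

    # Bump the write index so waiting readers can detect the new value
    write_index = stored.get('writeIndex', 0) + 1
    return {
        'positions': merged_pos,
        'timestamps': merged_ts,
        'writeIndex': write_index,
    }
```

Because the merge happens under a scoped lock on the same key, concurrent requests from one client cannot lose each other's newest positions.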
/**
* @internal For testing only
* @return float UNIX timestamp
* @codeCoverageIgnore
*/
protected function getCurrentTime() {
if ( $this->wallClockOverride ) {
return $this->wallClockOverride;
}

$clockTime = (float)time(); // call this first
// microtime() can severely drift from time() and the microtime() value of other threads.
// Instead of seeing the current time as being in the past, use the value of time().
return max( microtime( true ), $clockTime );
}

/**
* @internal For testing only
* @param float|null &$time Mock UNIX timestamp
* @codeCoverageIgnore
*/
public function setMockTime( &$time ) {
$this->wallClockOverride =& $time;
}
}
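The mockable-clock pattern added above (an override slot consulted before the real clock, with a whole-second floor guarding against sub-second drift) is language-agnostic. A Python sketch under those assumptions (class and attribute names are illustrative, not from the patch):

```python
import time

class Clock:
    """Wall clock with an optional test override, mirroring getCurrentTime()/setMockTime()."""

    def __init__(self):
        self.override = None  # tests set this to freeze or advance time

    def now(self):
        if self.override is not None:
            return self.override
        # Read the whole-second clock first, then clamp the sub-second reading
        # so the returned time never lags behind it
        whole = float(int(time.time()))
        return max(time.time(), whole)
```

Tests can then assign `clock.override` and advance it deterministically, which is what the new `LBFactory::setMockTime()` pass-through enables for ChronologyProtector.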
@@ -31,12 +31,10 @@ use InvalidArgumentException;
* @since 1.28
*/
interface ILBFactory {
/** @var int Don't save DB positions at all */
public const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
/** @var int Save DB positions, but don't wait on remote DCs */
public const SHUTDOWN_CHRONPROT_ASYNC = 1;
/** @var int Save DB positions, waiting on all DCs */
public const SHUTDOWN_CHRONPROT_SYNC = 2;
/** Idiom for "no special shutdown flags" */
public const SHUTDOWN_NORMAL = 0;
/** Do not save "session consistency" DB replication positions */
public const SHUTDOWN_NO_CHRONPROT = 1;

/** @var string Default main LB cluster name (do not change this) */
public const CLUSTER_MAIN_DEFAULT = 'DEFAULT';
@@ -50,7 +48,7 @@ interface ILBFactory {
* - localDomain: A DatabaseDomain or domain ID string.
* - readOnlyReason: Reason the master DB is read-only if so [optional]
* - srvCache: BagOStuff object for server cache [optional]
* - memStash: BagOStuff object for cross-datacenter memory storage [optional]
* - cpStash: BagOStuff object for ChronologyProtector store [optional]
* - wanCache: WANObjectCache object [optional]
* - hostname: The name of the current server [optional]
* - cliMode: Whether the execution context is a CLI script. [optional]

@@ -177,13 +175,13 @@ interface ILBFactory {
/**
* Prepare all currently tracked (instantiated) load balancers for shutdown
*
* @param int $mode One of the class SHUTDOWN_* constants
* @param int $flags Bit field of ILBFactory::SHUTDOWN_* constants
* @param callable|null $workCallback Work to mask ChronologyProtector writes
* @param int|null &$cpIndex Position key write counter for ChronologyProtector
* @param string|null &$cpClientId Client ID hash for ChronologyProtector
* @param int|null &$cpIndex Position key write counter for ChronologyProtector [returned]
* @param string|null &$cpClientId Client ID hash for ChronologyProtector [returned]
*/
public function shutdown(
$mode = self::SHUTDOWN_CHRONPROT_SYNC,
$flags = self::SHUTDOWN_NORMAL,
callable $workCallback = null,
&$cpIndex = null,
&$cpClientId = null

@@ -330,8 +328,10 @@ interface ILBFactory {
public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] );

/**
* Get the UNIX timestamp when the client last touched the DB, if they did so recently
*
* @param DatabaseDomain|string|bool $domain Domain ID, or false for the current domain
* @return float|bool UNIX timestamp when client last touched the DB or false if not recent
* @return float|false UNIX timestamp; false if not recent or on record
*/
public function getChronologyProtectorTouched( $domain = false );
@@ -62,9 +62,9 @@ abstract class LBFactory implements ILBFactory {
private $deprecationLogger;

/** @var BagOStuff */
protected $srvCache;
protected $cpStash;
/** @var BagOStuff */
protected $memStash;
protected $srvCache;
/** @var WANObjectCache */
protected $wanCache;

@@ -134,8 +134,8 @@ abstract class LBFactory implements ILBFactory {
$this->readOnlyReason = $conf['readOnlyReason'];
}

$this->cpStash = $conf['cpStash'] ?? new EmptyBagOStuff();
$this->srvCache = $conf['srvCache'] ?? new EmptyBagOStuff();
$this->memStash = $conf['memStash'] ?? new EmptyBagOStuff();
$this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();

foreach ( self::$loggerFields as $key ) {

@@ -215,7 +215,7 @@ abstract class LBFactory implements ILBFactory {
}

public function shutdown(
$mode = self::SHUTDOWN_CHRONPROT_SYNC,
$flags = self::SHUTDOWN_NORMAL,
callable $workCallback = null,
&$cpIndex = null,
&$cpClientId = null
@@ -224,12 +224,10 @@ abstract class LBFactory implements ILBFactory {
$scope = ScopedCallback::newScopedIgnoreUserAbort();

$chronProt = $this->getChronologyProtector();
if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
$this->shutdownChronologyProtector( $chronProt, $workCallback, 'sync', $cpIndex );
} elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
$this->shutdownChronologyProtector( $chronProt, null, 'async', $cpIndex );
if ( ( $flags & self::SHUTDOWN_NO_CHRONPROT ) != self::SHUTDOWN_NO_CHRONPROT ) {
$this->shutdownChronologyProtector( $chronProt, $workCallback, $cpIndex );
$this->replLogger->debug( __METHOD__ . ': finished ChronologyProtector shutdown' );
}

$cpClientId = $chronProt->getClientId();

$this->commitMasterChanges( __METHOD__ ); // sanity

@@ -554,7 +552,7 @@ abstract class LBFactory implements ILBFactory {
}

$this->chronProt = new ChronologyProtector(
$this->memStash,
$this->cpStash,
[
'ip' => $this->requestInfo['IPAddress'],
'agent' => $this->requestInfo['UserAgent'],

@@ -571,10 +569,10 @@ abstract class LBFactory implements ILBFactory {
// Request opted out of using position wait logic. This is useful for requests
// done by the job queue or background ETL that do not have a meaningful session.
$this->chronProt->setWaitEnabled( false );
} elseif ( $this->memStash instanceof EmptyBagOStuff ) {
} elseif ( $this->cpStash instanceof EmptyBagOStuff ) {
// Nowhere to store any DB positions and wait for them to appear
$this->chronProt->setEnabled( false );
$this->replLogger->info( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
$this->replLogger->debug( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
}

$this->replLogger->debug(
@@ -590,27 +588,25 @@ abstract class LBFactory implements ILBFactory {
*
* @param ChronologyProtector $cp
* @param callable|null $workCallback Work to do instead of waiting on syncing positions
* @param string $mode One of (sync, async); whether to wait on remote datacenters
* @param int|null &$cpIndex DB position key write counter; incremented on update
* @param int|null &$cpIndex DB position key write counter; incremented on update [returned]
*/
protected function shutdownChronologyProtector(
ChronologyProtector $cp, $workCallback, $mode, &$cpIndex = null
ChronologyProtector $cp, $workCallback, &$cpIndex = null
) {
// Record all the master positions needed
// Remark all of the relevant DB master positions
$this->forEachLB( static function ( ILoadBalancer $lb ) use ( $cp ) {
$cp->storeSessionReplicationPosition( $lb );
$cp->stageSessionReplicationPosition( $lb );
} );
// Write them to the persistent stash. Try to do something useful by running $work
// while ChronologyProtector waits for the stash write to replicate to all DCs.
$unsavedPositions = $cp->shutdown( $workCallback, $mode, $cpIndex );
// Write the positions to the persistent stash
$unsavedPositions = $cp->shutdown( $cpIndex );
if ( $unsavedPositions && $workCallback ) {
// Invoke callback in case it did not cache the result yet
$workCallback(); // work now to block for less time in waitForAll()
$workCallback();
}
// If the positions failed to write to the stash, at least wait on local datacenter
// replica DBs to catch up before responding. Even if there are several DCs, this increases
// the chance that the user will see their own changes immediately afterwards. As long
// as the sticky DC cookie applies (same domain), this is not even an issue.
// If the positions failed to write to the stash, then wait on the local datacenter
// replica DBs to catch up before sending an HTTP response. As long as the request that
// caused such DB writes occurred in the master datacenter, and clients are temporarily
// pinned to the master datacenter after causing DB writes, then this should suffice.
$this->forEachLB( static function ( ILoadBalancer $lb ) use ( $unsavedPositions ) {
$masterName = $lb->getServerName( $lb->getWriterIndex() );
if ( isset( $unsavedPositions[$masterName] ) ) {

@@ -814,4 +810,12 @@ abstract class LBFactory implements ILBFactory {
public function __destruct() {
$this->destroy();
}

/**
* @param float|null &$time Mock UNIX timestamp for testing
* @codeCoverageIgnore
*/
public function setMockTime( &$time ) {
$this->getChronologyProtector()->setMockTime( $time );
}
}
@@ -37,10 +37,6 @@ use MediaWiki\MediaWikiServices;
* A place to store lightweight data that is not canonically
* stored anywhere else (e.g. a "hoard" of objects).
*
* The former should always use strongly consistent stores, so callers don't
* have to deal with stale reads. The latter may be eventually consistent, but
* callers can use BagOStuff::READ_LATEST to see the latest available data.
*
* Primary entry points:
*
* - ObjectCache::getLocalServerInstance( $fallbackType )

@@ -27,7 +27,7 @@ class TestSetup {
*/
public static function applyInitialConfig() {
global $wgMainCacheType, $wgMessageCacheType, $wgParserCacheType, $wgMainWANCache, $wgSessionCacheType;
global $wgMainStash;
global $wgMainStash, $wgChronologyProtectorStash;
global $wgObjectCaches;
global $wgLanguageConverterCacheType, $wgUseDatabaseMessages;
global $wgLocaltimezone, $wgLocalisationCacheConf;

@@ -59,6 +59,7 @@ class TestSetup {
$wgLanguageConverterCacheType = 'hash';
// Uses db-replicated in DefaultSettings
$wgMainStash = 'hash';
$wgChronologyProtectorStash = 'hash';
// Use hash instead of db
$wgObjectCaches['db-replicated'] = $wgObjectCaches['hash'];
// Use memory job queue
@@ -338,10 +338,10 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
$cp->applySessionReplicationPosition( $lb1 );
$cp->applySessionReplicationPosition( $lb2 );
// Record positions in stash on first HTTP request end
$cp->storeSessionReplicationPosition( $lb1 );
$cp->storeSessionReplicationPosition( $lb2 );
$cp->stageSessionReplicationPosition( $lb1 );
$cp->stageSessionReplicationPosition( $lb2 );
$cpIndex = null;
$cp->shutdown( null, 'sync', $cpIndex );
$cp->shutdown( $cpIndex );

$this->assertSame( 1, $cpIndex, "CP write index set" );

@@ -381,10 +381,10 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
$cp->applySessionReplicationPosition( $lb1 );
$cp->applySessionReplicationPosition( $lb2 );
// Shutdown (nothing to record)
$cp->storeSessionReplicationPosition( $lb1 );
$cp->storeSessionReplicationPosition( $lb2 );
$cp->stageSessionReplicationPosition( $lb1 );
$cp->stageSessionReplicationPosition( $lb2 );
$cpIndex = null;
$cp->shutdown( null, 'sync', $cpIndex );
$cp->shutdown( $cpIndex );

$this->assertNull( $cpIndex, "CP write index retained" );

@@ -809,13 +809,27 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
public function testGetChronologyProtectorTouched() {
$store = new HashBagOStuff;
$lbFactory = $this->newLBFactoryMulti( [
'memStash' => $store
'cpStash' => $store,
'cliMode' => false
] );
$lbFactory->setRequestInfo( [ 'ChronologyClientId' => 'ii' ] );
$key = $store->makeGlobalKey( ChronologyProtector::class,
'mtime', 'ii', 'test-db1' );
$store->set( $key, 2 );

$mockWallClock = 1549343530.2053;
$priorTime = $mockWallClock; // reference time
$lbFactory->setMockTime( $mockWallClock );

$key = $store->makeGlobalKey( ChronologyProtector::class, 'ii', 'v2' );
$store->set(
$key,
[
'positions' => [],
'timestamps' => [ $lbFactory::CLUSTER_MAIN_DEFAULT => $priorTime ]
],
3600
);

$mockWallClock += 1.0;
$touched = $lbFactory->getChronologyProtectorTouched();
$this->assertEquals( 2, $touched );
$this->assertEquals( $priorTime, $touched );
}
}