rdbms: clean up LoadBalancer/ChronologyProtector primary pos methods
In LoadBalancer: * Make the "chronologyCallback" return the DBPrimaryPos and make loadSessionPrimaryPos() set the "waitForPos" more directly by calling setSessionPrimaryPosIfHigher(). Previously, it relied on the callback calling waitFor() to set the position as a side effect. * Remove redundant debug log entry in loadSessionPrimaryPos(). * Use type hints for waitFor()/waitForAll(). All callers already check this for before invocation. * Mark getReplicaResumePos() as @internal. In ChronologyProtector: * Update applySessionReplicationPosition() to return the position. * Rename applySessionReplicationPosition() to yieldSessionPrimaryPos() and stageSessionReplicationPosition() to stageSessionPrimaryPos() for for consistency LoadBalancer/DBPrimaryPos. Bug: T314434 Change-Id: I32aa784b424e7534047c9240e32fa5e0a2ac90b0
This commit is contained in:
parent
50d6048daa
commit
8e1bb24eed
7 changed files with 58 additions and 46 deletions
|
|
@ -261,7 +261,7 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
}
|
||||
|
||||
/**
|
||||
* Apply client "session consistency" replication position to a new ILoadBalancer
|
||||
* Yield client "session consistency" replication position for a new ILoadBalancer
|
||||
*
|
||||
* If the stash has a previous primary position recorded, this will try to make
|
||||
* sure that the next query to a replica server of that primary will see changes up
|
||||
|
|
@ -271,11 +271,11 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
* @internal This method should only be called from LBFactory.
|
||||
*
|
||||
* @param ILoadBalancer $lb
|
||||
* @return void
|
||||
* @return DBPrimaryPos|null
|
||||
*/
|
||||
public function applySessionReplicationPosition( ILoadBalancer $lb ) {
|
||||
public function yieldSessionPrimaryPos( ILoadBalancer $lb ) {
|
||||
if ( !$this->enabled || !$this->positionWaitsEnabled ) {
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
$cluster = $lb->getClusterName();
|
||||
|
|
@ -284,10 +284,11 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
$pos = $this->getStartupSessionPositions()[$primaryName] ?? null;
|
||||
if ( $pos instanceof DBPrimaryPos ) {
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position is '$pos'" );
|
||||
$lb->waitFor( $pos );
|
||||
} else {
|
||||
$this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no position" );
|
||||
}
|
||||
|
||||
return $pos;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -301,7 +302,7 @@ class ChronologyProtector implements LoggerAwareInterface {
|
|||
* @param ILoadBalancer $lb
|
||||
* @return void
|
||||
*/
|
||||
public function stageSessionReplicationPosition( ILoadBalancer $lb ) {
|
||||
public function stageSessionPrimaryPos( ILoadBalancer $lb ) {
|
||||
if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) {
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -679,7 +679,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
) {
|
||||
// Remark all of the relevant DB primary positions
|
||||
foreach ( $this->getLBsForOwner() as $lb ) {
|
||||
$cp->stageSessionReplicationPosition( $lb );
|
||||
$cp->stageSessionPrimaryPos( $lb );
|
||||
}
|
||||
// Write the positions to the persistent stash
|
||||
$cp->persistSessionReplicationPositions( $cpIndex );
|
||||
|
|
@ -717,7 +717,7 @@ abstract class LBFactory implements ILBFactory {
|
|||
'chronologyCallback' => function ( ILoadBalancer $lb ) {
|
||||
// Defer ChronologyProtector construction in case setRequestInfo() ends up
|
||||
// being called later (but before the first connection attempt) (T192611)
|
||||
$this->getChronologyProtector()->applySessionReplicationPosition( $lb );
|
||||
return $this->getChronologyProtector()->yieldSessionPrimaryPos( $lb );
|
||||
},
|
||||
'roundStage' => $initStage,
|
||||
'criticalSectionProvider' => $this->csProvider
|
||||
|
|
|
|||
|
|
@ -225,9 +225,9 @@ interface ILoadBalancer {
|
|||
* will return true. This is useful for discouraging clients from taking further actions
|
||||
* if session consistency could not be maintained with respect to their last actions.
|
||||
*
|
||||
* @param DBPrimaryPos|false $pos Primary position or false
|
||||
* @param DBPrimaryPos $pos Primary position
|
||||
*/
|
||||
public function waitFor( $pos );
|
||||
public function waitFor( DBPrimaryPos $pos );
|
||||
|
||||
/**
|
||||
* Set the primary wait position and wait for ALL replica DBs to catch up to it
|
||||
|
|
@ -235,11 +235,11 @@ interface ILoadBalancer {
|
|||
* This method is only intended for use a throttling mechanism for high-volume updates.
|
||||
* Unlike waitFor(), failure does not effect laggedReplicaUsed().
|
||||
*
|
||||
* @param DBPrimaryPos|false $pos Primary position or false
|
||||
* @param DBPrimaryPos $pos Primary position
|
||||
* @param int|null $timeout Max seconds to wait; default is mWaitTimeout
|
||||
* @return bool Success (able to connect and no timeouts reached)
|
||||
*/
|
||||
public function waitForAll( $pos, $timeout = null );
|
||||
public function waitForAll( DBPrimaryPos $pos, $timeout = null );
|
||||
|
||||
/**
|
||||
* Get an existing DB handle to the given server index (on any domain)
|
||||
|
|
@ -480,6 +480,7 @@ interface ILoadBalancer {
|
|||
* This can be useful for implementing session consistency, where the session
|
||||
* will be resumed across multiple HTTP requests or CLI script instances.
|
||||
*
|
||||
* @internal For use by Rdbms classes only
|
||||
* @return DBPrimaryPos|false Replication position or false if not applicable
|
||||
* @since 1.34
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -52,7 +52,9 @@ interface ILoadBalancerForOwner extends ILoadBalancer {
|
|||
* - srvCache : BagOStuff object for server cache [optional]
|
||||
* - wanCache : WANObjectCache object [optional]
|
||||
* - databaseFactory: DatabaseFactory object [optional]
|
||||
* - chronologyCallback: Callback to run before the first connection attempt [optional]
|
||||
* - chronologyCallback: Callback to run before the first connection attempt.
|
||||
* It takes this ILoadBalancerForOwner instance and yields the relevant DBPrimaryPos
|
||||
* for session (null if not applicable). [optional]
|
||||
* - defaultGroup: Default query group; the generic group if not specified [optional]
|
||||
* - hostname : The name of the current server [optional]
|
||||
* - cliMode: Whether the execution context is a CLI script [optional]
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
|
|||
private $trxRoundStage = self::ROUND_CURSORY;
|
||||
/** @var int[] The group replica server indexes keyed by group */
|
||||
private $readIndexByGroup = [];
|
||||
/** @var DBPrimaryPos|false Replication sync position or false if not set */
|
||||
/** @var DBPrimaryPos|null Replication sync position or false if not set */
|
||||
private $waitForPos;
|
||||
/** @var bool Whether a lagged replica DB was used */
|
||||
private $laggedReplicaMode = false;
|
||||
|
|
@ -594,7 +594,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
|
|||
return $i;
|
||||
}
|
||||
|
||||
public function waitFor( $pos ) {
|
||||
public function waitFor( DBPrimaryPos $pos ) {
|
||||
$oldPos = $this->waitForPos;
|
||||
try {
|
||||
$this->waitForPos = $pos;
|
||||
|
|
@ -609,16 +609,13 @@ class LoadBalancer implements ILoadBalancerForOwner {
|
|||
}
|
||||
} finally {
|
||||
// Restore the older position if it was higher since this is used for lag-protection
|
||||
if ( !$oldPos ) {
|
||||
return;
|
||||
}
|
||||
if ( !$this->waitForPos || $oldPos->hasReached( $this->waitForPos ) ) {
|
||||
$this->waitForPos = $oldPos;
|
||||
if ( $oldPos ) {
|
||||
$this->setSessionPrimaryPosIfHigher( $oldPos );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function waitForAll( $pos, $timeout = null ) {
|
||||
public function waitForAll( DBPrimaryPos $pos, $timeout = null ) {
|
||||
$timeout = $timeout ?: self::MAX_WAIT_DEFAULT;
|
||||
|
||||
$oldPos = $this->waitForPos;
|
||||
|
|
@ -658,6 +655,17 @@ class LoadBalancer implements ILoadBalancerForOwner {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the session "waitForPos" if the given position is newer
|
||||
*
|
||||
* @param DBPrimaryPos $pos
|
||||
*/
|
||||
private function setSessionPrimaryPosIfHigher( DBPrimaryPos $pos ) {
|
||||
if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) {
|
||||
$this->waitForPos = $pos;
|
||||
}
|
||||
}
|
||||
|
||||
public function getAnyOpenConnection( $i, $flags = 0 ) {
|
||||
$i = ( $i === self::DB_PRIMARY ) ? $this->getWriterIndex() : $i;
|
||||
|
||||
|
|
@ -1217,8 +1225,11 @@ class LoadBalancer implements ILoadBalancerForOwner {
|
|||
private function loadSessionPrimaryPos() {
|
||||
if ( !$this->chronologyCallbackTriggered && $this->chronologyCallback ) {
|
||||
$this->chronologyCallbackTriggered = true;
|
||||
( $this->chronologyCallback )( $this ); // generally calls waitFor()
|
||||
$pos = ( $this->chronologyCallback )( $this );
|
||||
$this->logger->debug( __METHOD__ . ': executed chronology callback.' );
|
||||
if ( $pos ) {
|
||||
$this->setSessionPrimaryPosIfHigher( $pos );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -306,14 +306,16 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
|
|||
$mockDB2->expects( $this->once() )->method( 'lastDoneWrites' );
|
||||
|
||||
// Nothing to wait for on first HTTP request start
|
||||
$cp->applySessionReplicationPosition( $lb1 );
|
||||
$cp->applySessionReplicationPosition( $lb2 );
|
||||
$sPos1 = $cp->yieldSessionPrimaryPos( $lb1 );
|
||||
$sPos2 = $cp->yieldSessionPrimaryPos( $lb2 );
|
||||
// Record positions in stash on first HTTP request end
|
||||
$cp->stageSessionReplicationPosition( $lb1 );
|
||||
$cp->stageSessionReplicationPosition( $lb2 );
|
||||
$cp->stageSessionPrimaryPos( $lb1 );
|
||||
$cp->stageSessionPrimaryPos( $lb2 );
|
||||
$cpIndex = null;
|
||||
$cp->persistSessionReplicationPositions( $cpIndex );
|
||||
|
||||
$this->assertNull( $sPos1 );
|
||||
$this->assertNull( $sPos2 );
|
||||
$this->assertSame( 1, $cpIndex, "CP write index set" );
|
||||
|
||||
// (b) Second HTTP request
|
||||
|
|
@ -324,16 +326,12 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
|
|||
$lb1->method( 'hasReplicaServers' )->willReturn( true );
|
||||
$lb1->method( 'hasStreamingReplicaServers' )->willReturn( true );
|
||||
$lb1->method( 'getServerName' )->with( 0 )->willReturn( 'master1' );
|
||||
$lb1->expects( $this->once() )
|
||||
->method( 'waitFor' )->with( $m1Pos );
|
||||
// Load balancer for primary DB 2
|
||||
$lb2 = $this->createMock( LoadBalancer::class );
|
||||
$lb2->method( 'getServerCount' )->willReturn( 2 );
|
||||
$lb2->method( 'hasReplicaServers' )->willReturn( true );
|
||||
$lb2->method( 'hasStreamingReplicaServers' )->willReturn( true );
|
||||
$lb2->method( 'getServerName' )->with( 0 )->willReturn( 'master2' );
|
||||
$lb2->expects( $this->once() )
|
||||
->method( 'waitFor' )->with( $m2Pos );
|
||||
|
||||
$cp = new ChronologyProtector(
|
||||
$bag,
|
||||
|
|
@ -344,15 +342,19 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
|
|||
$cpIndex
|
||||
);
|
||||
|
||||
// Wait for last positions to be reached on second HTTP request start
|
||||
$cp->applySessionReplicationPosition( $lb1 );
|
||||
$cp->applySessionReplicationPosition( $lb2 );
|
||||
// Get last positions to be reached on second HTTP request start
|
||||
$sPos1 = $cp->yieldSessionPrimaryPos( $lb1 );
|
||||
$sPos2 = $cp->yieldSessionPrimaryPos( $lb2 );
|
||||
// Shutdown (nothing to record)
|
||||
$cp->stageSessionReplicationPosition( $lb1 );
|
||||
$cp->stageSessionReplicationPosition( $lb2 );
|
||||
$cp->stageSessionPrimaryPos( $lb1 );
|
||||
$cp->stageSessionPrimaryPos( $lb2 );
|
||||
$cpIndex = null;
|
||||
$cp->persistSessionReplicationPositions( $cpIndex );
|
||||
|
||||
$this->assertNotNull( $sPos1 );
|
||||
$this->assertNotNull( $sPos2 );
|
||||
$this->assertSame( $m1Pos->__toString(), $sPos1->__toString() );
|
||||
$this->assertSame( $m2Pos->__toString(), $sPos2->__toString() );
|
||||
$this->assertNull( $cpIndex, "CP write index retained" );
|
||||
|
||||
$this->assertEquals( '45e93a9c215c031d38b7c42d8e4700ca', $cp->getClientId() );
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@
|
|||
*/
|
||||
|
||||
use Wikimedia\Rdbms\ChronologyProtector;
|
||||
use Wikimedia\Rdbms\DBPrimaryPos;
|
||||
use Wikimedia\Rdbms\ILoadBalancer;
|
||||
use Wikimedia\Rdbms\MySQLPrimaryPos;
|
||||
|
||||
|
|
@ -110,23 +109,19 @@ class ChronologyProtectorTest extends PHPUnit\Framework\TestCase {
|
|||
$cp = new ChronologyProtector( $bag, $client, null, $secret );
|
||||
|
||||
$clientPostIndex = 0;
|
||||
$cp->stageSessionReplicationPosition( $lb );
|
||||
$cp->stageSessionPrimaryPos( $lb );
|
||||
$cp->persistSessionReplicationPositions( $clientPostIndex );
|
||||
|
||||
// Do it a second time so the values that were written the first
|
||||
// time get read from the cache.
|
||||
$replicationPos = '3-4-5';
|
||||
$time++;
|
||||
$cp->stageSessionReplicationPosition( $lb );
|
||||
$cp->stageSessionPrimaryPos( $lb );
|
||||
$cp->persistSessionReplicationPositions( $clientPostIndex );
|
||||
|
||||
$lb->method( 'waitFor' )->willReturnCallback(
|
||||
function ( DBPrimaryPos $pos ) use ( &$replicationPos, &$time ) {
|
||||
$this->assertSame( $time, $pos->asOfTime() );
|
||||
$this->assertSame( "$replicationPos", "$pos" );
|
||||
}
|
||||
);
|
||||
$cp->applySessionReplicationPosition( $lb );
|
||||
$waitForPos = $cp->yieldSessionPrimaryPos( $lb );
|
||||
$this->assertNotNull( $waitForPos );
|
||||
$this->assertSame( $time, $waitForPos->asOfTime() );
|
||||
$this->assertSame( "$replicationPos", "$waitForPos" );
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue