diff --git a/includes/libs/rdbms/ChronologyProtector.php b/includes/libs/rdbms/ChronologyProtector.php index 2041c4b2866..5e299fe0d18 100644 --- a/includes/libs/rdbms/ChronologyProtector.php +++ b/includes/libs/rdbms/ChronologyProtector.php @@ -261,7 +261,7 @@ class ChronologyProtector implements LoggerAwareInterface { } /** - * Apply client "session consistency" replication position to a new ILoadBalancer + * Yield client "session consistency" replication position for a new ILoadBalancer * * If the stash has a previous primary position recorded, this will try to make * sure that the next query to a replica server of that primary will see changes up @@ -271,11 +271,11 @@ class ChronologyProtector implements LoggerAwareInterface { * @internal This method should only be called from LBFactory. * * @param ILoadBalancer $lb - * @return void + * @return DBPrimaryPos|null */ - public function applySessionReplicationPosition( ILoadBalancer $lb ) { + public function yieldSessionPrimaryPos( ILoadBalancer $lb ) { if ( !$this->enabled || !$this->positionWaitsEnabled ) { - return; + return null; } $cluster = $lb->getClusterName(); @@ -284,10 +284,11 @@ class ChronologyProtector implements LoggerAwareInterface { $pos = $this->getStartupSessionPositions()[$primaryName] ?? null; if ( $pos instanceof DBPrimaryPos ) { $this->logger->debug( __METHOD__ . ": $cluster ($primaryName) position is '$pos'" ); - $lb->waitFor( $pos ); } else { $this->logger->debug( __METHOD__ . ": $cluster ($primaryName) has no position" ); } + + return $pos; } /** @@ -301,7 +302,7 @@ class ChronologyProtector implements LoggerAwareInterface { * @param ILoadBalancer $lb * @return void */ - public function stageSessionReplicationPosition( ILoadBalancer $lb ) { + public function stageSessionPrimaryPos( ILoadBalancer $lb ) { if ( !$this->enabled || !$lb->hasOrMadeRecentPrimaryChanges( INF ) ) { return; } diff --git a/includes/libs/rdbms/lbfactory/LBFactory.php b/includes/libs/rdbms/lbfactory/LBFactory.php index 83aca100092..293fd897ba0 100644 --- a/includes/libs/rdbms/lbfactory/LBFactory.php +++ b/includes/libs/rdbms/lbfactory/LBFactory.php @@ -679,7 +679,7 @@ abstract class LBFactory implements ILBFactory { ) { // Remark all of the relevant DB primary positions foreach ( $this->getLBsForOwner() as $lb ) { - $cp->stageSessionReplicationPosition( $lb ); + $cp->stageSessionPrimaryPos( $lb ); } // Write the positions to the persistent stash $cp->persistSessionReplicationPositions( $cpIndex ); @@ -717,7 +717,7 @@ abstract class LBFactory implements ILBFactory { 'chronologyCallback' => function ( ILoadBalancer $lb ) { // Defer ChronologyProtector construction in case setRequestInfo() ends up // being called later (but before the first connection attempt) (T192611) - $this->getChronologyProtector()->applySessionReplicationPosition( $lb ); + return $this->getChronologyProtector()->yieldSessionPrimaryPos( $lb ); }, 'roundStage' => $initStage, 'criticalSectionProvider' => $this->csProvider diff --git a/includes/libs/rdbms/loadbalancer/ILoadBalancer.php b/includes/libs/rdbms/loadbalancer/ILoadBalancer.php index 218dbf0003d..de74ee65de3 100644 --- a/includes/libs/rdbms/loadbalancer/ILoadBalancer.php +++ b/includes/libs/rdbms/loadbalancer/ILoadBalancer.php @@ -225,9 +225,9 @@ interface ILoadBalancer { * will return true. This is useful for discouraging clients from taking further actions * if session consistency could not be maintained with respect to their last actions. * - * @param DBPrimaryPos|false $pos Primary position or false + * @param DBPrimaryPos $pos Primary position */ - public function waitFor( $pos ); + public function waitFor( DBPrimaryPos $pos ); /** * Set the primary wait position and wait for ALL replica DBs to catch up to it @@ -235,11 +235,11 @@ interface ILoadBalancer { * This method is only intended for use a throttling mechanism for high-volume updates. * Unlike waitFor(), failure does not effect laggedReplicaUsed(). * - * @param DBPrimaryPos|false $pos Primary position or false + * @param DBPrimaryPos $pos Primary position * @param int|null $timeout Max seconds to wait; default is mWaitTimeout * @return bool Success (able to connect and no timeouts reached) */ - public function waitForAll( $pos, $timeout = null ); + public function waitForAll( DBPrimaryPos $pos, $timeout = null ); /** * Get an existing DB handle to the given server index (on any domain) @@ -480,6 +480,7 @@ interface ILoadBalancer { * This can be useful for implementing session consistency, where the session * will be resumed across multiple HTTP requests or CLI script instances. * + * @internal For use by Rdbms classes only * @return DBPrimaryPos|false Replication position or false if not applicable * @since 1.34 */ diff --git a/includes/libs/rdbms/loadbalancer/ILoadBalancerForOwner.php b/includes/libs/rdbms/loadbalancer/ILoadBalancerForOwner.php index b6e1cffdfda..949b46e3b73 100644 --- a/includes/libs/rdbms/loadbalancer/ILoadBalancerForOwner.php +++ b/includes/libs/rdbms/loadbalancer/ILoadBalancerForOwner.php @@ -52,7 +52,9 @@ interface ILoadBalancerForOwner extends ILoadBalancer { * - srvCache : BagOStuff object for server cache [optional] * - wanCache : WANObjectCache object [optional] * - databaseFactory: DatabaseFactory object [optional] - * - chronologyCallback: Callback to run before the first connection attempt [optional] + * - chronologyCallback: Callback to run before the first connection attempt. + * It takes this ILoadBalancerForOwner instance and yields the relevant DBPrimaryPos + * for session (null if not applicable). [optional] * - defaultGroup: Default query group; the generic group if not specified [optional] * - hostname : The name of the current server [optional] * - cliMode: Whether the execution context is a CLI script [optional] diff --git a/includes/libs/rdbms/loadbalancer/LoadBalancer.php b/includes/libs/rdbms/loadbalancer/LoadBalancer.php index 66b3556086a..83348b66764 100644 --- a/includes/libs/rdbms/loadbalancer/LoadBalancer.php +++ b/includes/libs/rdbms/loadbalancer/LoadBalancer.php @@ -107,7 +107,7 @@ class LoadBalancer implements ILoadBalancerForOwner { private $trxRoundStage = self::ROUND_CURSORY; /** @var int[] The group replica server indexes keyed by group */ private $readIndexByGroup = []; - /** @var DBPrimaryPos|false Replication sync position or false if not set */ + /** @var DBPrimaryPos|null Replication sync position or false if not set */ private $waitForPos; /** @var bool Whether a lagged replica DB was used */ private $laggedReplicaMode = false; @@ -594,7 +594,7 @@ class LoadBalancer implements ILoadBalancerForOwner { return $i; } - public function waitFor( $pos ) { + public function waitFor( DBPrimaryPos $pos ) { $oldPos = $this->waitForPos; try { $this->waitForPos = $pos; @@ -609,16 +609,13 @@ class LoadBalancer implements ILoadBalancerForOwner { } } finally { // Restore the older position if it was higher since this is used for lag-protection - if ( !$oldPos ) { - return; - } - if ( !$this->waitForPos || $oldPos->hasReached( $this->waitForPos ) ) { - $this->waitForPos = $oldPos; + if ( $oldPos ) { + $this->setSessionPrimaryPosIfHigher( $oldPos ); } } } - public function waitForAll( $pos, $timeout = null ) { + public function waitForAll( DBPrimaryPos $pos, $timeout = null ) { $timeout = $timeout ?: self::MAX_WAIT_DEFAULT; $oldPos = $this->waitForPos; @@ -658,6 +655,17 @@ class LoadBalancer implements ILoadBalancerForOwner { return false; } + /** + * Update the session "waitForPos" if the given position is newer + * + * @param DBPrimaryPos $pos + */ + private function setSessionPrimaryPosIfHigher( DBPrimaryPos $pos ) { + if ( !$this->waitForPos || $pos->hasReached( $this->waitForPos ) ) { + $this->waitForPos = $pos; + } + } + public function getAnyOpenConnection( $i, $flags = 0 ) { $i = ( $i === self::DB_PRIMARY ) ? $this->getWriterIndex() : $i; @@ -1217,8 +1225,11 @@ class LoadBalancer implements ILoadBalancerForOwner { private function loadSessionPrimaryPos() { if ( !$this->chronologyCallbackTriggered && $this->chronologyCallback ) { $this->chronologyCallbackTriggered = true; - ( $this->chronologyCallback )( $this ); // generally calls waitFor() + $pos = ( $this->chronologyCallback )( $this ); $this->logger->debug( __METHOD__ . ': executed chronology callback.' ); + if ( $pos ) { + $this->setSessionPrimaryPosIfHigher( $pos ); + } } } diff --git a/tests/phpunit/includes/db/LBFactoryTest.php b/tests/phpunit/includes/db/LBFactoryTest.php index 758cad4f759..93bfdd1bbf8 100644 --- a/tests/phpunit/includes/db/LBFactoryTest.php +++ b/tests/phpunit/includes/db/LBFactoryTest.php @@ -306,14 +306,16 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase { $mockDB2->expects( $this->once() )->method( 'lastDoneWrites' ); // Nothing to wait for on first HTTP request start - $cp->applySessionReplicationPosition( $lb1 ); - $cp->applySessionReplicationPosition( $lb2 ); + $sPos1 = $cp->yieldSessionPrimaryPos( $lb1 ); + $sPos2 = $cp->yieldSessionPrimaryPos( $lb2 ); // Record positions in stash on first HTTP request end - $cp->stageSessionReplicationPosition( $lb1 ); - $cp->stageSessionReplicationPosition( $lb2 ); + $cp->stageSessionPrimaryPos( $lb1 ); + $cp->stageSessionPrimaryPos( $lb2 ); $cpIndex = null; $cp->persistSessionReplicationPositions( $cpIndex ); + $this->assertNull( $sPos1 ); + $this->assertNull( $sPos2 ); $this->assertSame( 1, $cpIndex, "CP write index set" ); // (b) Second HTTP request @@ -324,16 +326,12 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase { $lb1->method( 'hasReplicaServers' )->willReturn( true ); $lb1->method( 'hasStreamingReplicaServers' )->willReturn( true ); $lb1->method( 'getServerName' )->with( 0 )->willReturn( 'master1' ); - $lb1->expects( $this->once() ) - ->method( 'waitFor' )->with( $m1Pos ); // Load balancer for primary DB 2 $lb2 = $this->createMock( LoadBalancer::class ); $lb2->method( 'getServerCount' )->willReturn( 2 ); $lb2->method( 'hasReplicaServers' )->willReturn( true ); $lb2->method( 'hasStreamingReplicaServers' )->willReturn( true ); $lb2->method( 'getServerName' )->with( 0 )->willReturn( 'master2' ); - $lb2->expects( $this->once() ) - ->method( 'waitFor' )->with( $m2Pos ); $cp = new ChronologyProtector( $bag, @@ -344,15 +342,19 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase { $cpIndex ); - // Wait for last positions to be reached on second HTTP request start - $cp->applySessionReplicationPosition( $lb1 ); - $cp->applySessionReplicationPosition( $lb2 ); + // Get last positions to be reached on second HTTP request start + $sPos1 = $cp->yieldSessionPrimaryPos( $lb1 ); + $sPos2 = $cp->yieldSessionPrimaryPos( $lb2 ); // Shutdown (nothing to record) - $cp->stageSessionReplicationPosition( $lb1 ); - $cp->stageSessionReplicationPosition( $lb2 ); + $cp->stageSessionPrimaryPos( $lb1 ); + $cp->stageSessionPrimaryPos( $lb2 ); $cpIndex = null; $cp->persistSessionReplicationPositions( $cpIndex ); + $this->assertNotNull( $sPos1 ); + $this->assertNotNull( $sPos2 ); + $this->assertSame( $m1Pos->__toString(), $sPos1->__toString() ); + $this->assertSame( $m2Pos->__toString(), $sPos2->__toString() ); $this->assertNull( $cpIndex, "CP write index retained" ); $this->assertEquals( '45e93a9c215c031d38b7c42d8e4700ca', $cp->getClientId() ); diff --git a/tests/phpunit/unit/includes/libs/rdbms/ChronologyProtectorTest.php b/tests/phpunit/unit/includes/libs/rdbms/ChronologyProtectorTest.php index 811395f7d6d..2aa418d22df 100644 --- a/tests/phpunit/unit/includes/libs/rdbms/ChronologyProtectorTest.php +++ b/tests/phpunit/unit/includes/libs/rdbms/ChronologyProtectorTest.php @@ -22,7 +22,6 @@ */ use Wikimedia\Rdbms\ChronologyProtector; -use Wikimedia\Rdbms\DBPrimaryPos; use Wikimedia\Rdbms\ILoadBalancer; use Wikimedia\Rdbms\MySQLPrimaryPos; @@ -110,23 +109,19 @@ class ChronologyProtectorTest extends PHPUnit\Framework\TestCase { $cp = new ChronologyProtector( $bag, $client, null, $secret ); $clientPostIndex = 0; - $cp->stageSessionReplicationPosition( $lb ); + $cp->stageSessionPrimaryPos( $lb ); $cp->persistSessionReplicationPositions( $clientPostIndex ); // Do it a second time so the values that were written the first // time get read from the cache. $replicationPos = '3-4-5'; $time++; - $cp->stageSessionReplicationPosition( $lb ); + $cp->stageSessionPrimaryPos( $lb ); $cp->persistSessionReplicationPositions( $clientPostIndex ); - $lb->method( 'waitFor' )->willReturnCallback( - function ( DBPrimaryPos $pos ) use ( &$replicationPos, &$time ) { - $this->assertSame( $time, $pos->asOfTime() ); - $this->assertSame( "$replicationPos", "$pos" ); - } - ); - $cp->applySessionReplicationPosition( $lb ); + $waitForPos = $cp->yieldSessionPrimaryPos( $lb ); + $this->assertNotNull( $waitForPos ); + $this->assertSame( $time, $waitForPos->asOfTime() ); + $this->assertSame( "$replicationPos", "$waitForPos" ); } - }