deferred: Decouple DeferredUpdates from MediaWikiServices

* Create task-specific methods with simple defaults that require no
  mocking or stubbing of any kind, as used by the pure unit tests
  where service container (and by extent, storage services) are
  disabled.

* Remove all use of global variables, LBFactory, JobQueue,
  StatsdFactory, and RequestContext.

Bug: T265749
Change-Id: If85c448d2d1b806e70f641f06263680d49c6eeec
This commit is contained in:
Timo Tijhof 2023-08-25 00:38:02 +01:00
parent 47cdcbac8b
commit e56552557f
8 changed files with 220 additions and 107 deletions

View file

@ -344,6 +344,7 @@ $wgAutoloadLocalClasses = [
'DeferrableUpdate' => __DIR__ . '/includes/deferred/DeferrableUpdate.php',
'DeferredUpdates' => __DIR__ . '/includes/deferred/DeferredUpdates.php',
'DeferredUpdatesScope' => __DIR__ . '/includes/deferred/DeferredUpdatesScope.php',
'DeferredUpdatesScopeMediaWikiStack' => __DIR__ . '/includes/deferred/DeferredUpdatesScopeMediaWikiStack.php',
'DeferredUpdatesScopeStack' => __DIR__ . '/includes/deferred/DeferredUpdatesScopeStack.php',
'Deflate' => __DIR__ . '/includes/libs/Deflate.php',
'DeleteAction' => __DIR__ . '/includes/actions/DeleteAction.php',

View file

@ -1827,7 +1827,7 @@ class DerivedPageDataUpdater implements IDBAccessObject, LoggerAwareInterface, P
$update->setCause( $causeAction, $causeAgent );
if ( $options['defer'] === false ) {
DeferredUpdates::attemptUpdate( $update, $this->loadbalancerFactory );
DeferredUpdates::attemptUpdate( $update );
} else {
DeferredUpdates::addUpdate( $update, $options['defer'] );
}

View file

@ -19,10 +19,7 @@
*/
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\DBTransactionError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\ILBFactory;
use Wikimedia\ScopedCallback;
/**
@ -119,13 +116,21 @@ class DeferredUpdates {
* @return DeferredUpdatesScopeStack
*/
private static function getScopeStack(): DeferredUpdatesScopeStack {
if ( self::$scopeStack === null ) {
self::$scopeStack = new DeferredUpdatesScopeStack();
}
self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
return self::$scopeStack;
}
/**
* @param DeferredUpdatesScopeStack $scopeStack
* @internal Only for use in tests.
*/
public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
}
self::$scopeStack = $scopeStack;
}
/**
* Add an update to the pending update queue for execution at the appropriate time
*
@ -147,12 +152,8 @@ class DeferredUpdates {
* @since 1.28 Added the $stage parameter
*/
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
global $wgCommandLineMode;
self::getScopeStack()->current()->addUpdate( $update, $stage );
if ( $wgCommandLineMode ) {
self::tryOpportunisticExecute();
}
self::tryOpportunisticExecute();
}
/**
@ -174,24 +175,13 @@ class DeferredUpdates {
* a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
*
* @param DeferrableUpdate $update
* @param string $httpMethod
* @return Throwable|null
*/
private static function run(
DeferrableUpdate $update,
$httpMethod
): ?Throwable {
private static function run( DeferrableUpdate $update ): ?Throwable {
$logger = LoggerFactory::getInstance( 'DeferredUpdates' );
$services = MediaWikiServices::getInstance();
$stats = $services->getStatsdDataFactory();
$lbFactory = $services->getDBLoadBalancerFactory();
$jobQueueGroupFactory = $services->getJobQueueGroupFactory();
$suffix = $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '';
$type = get_class( $update ) . $suffix;
$stats->increment( "deferred_updates.$httpMethod.$type" );
$type = get_class( $update )
. ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
$updateId = spl_object_id( $update );
$logger->debug( __METHOD__ . ": started $type #$updateId" );
@ -199,7 +189,7 @@ class DeferredUpdates {
$startTime = microtime( true );
try {
self::attemptUpdate( $update, $lbFactory );
self::attemptUpdate( $update );
} catch ( Throwable $updateException ) {
MWExceptionHandler::logException( $updateException );
$logger->error(
@ -209,7 +199,7 @@ class DeferredUpdates {
'exception' => $updateException,
]
);
$lbFactory->rollbackPrimaryChanges( __METHOD__ );
self::getScopeStack()->onRunUpdateFailed( $update );
} finally {
$walltime = microtime( true ) - $startTime;
$logger->debug( __METHOD__ . ": ended $type #$updateId, processing time: $walltime" );
@ -218,8 +208,7 @@ class DeferredUpdates {
// Try to push the update as a job so it can run later if possible
if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
try {
$spec = $update->getAsJobSpecification();
$jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
self::getScopeStack()->queueDataUpdate( $update );
} catch ( Throwable $jobException ) {
MWExceptionHandler::logException( $jobException );
$logger->error(
@ -229,7 +218,7 @@ class DeferredUpdates {
'exception' => $jobException,
]
);
$lbFactory->rollbackPrimaryChanges( __METHOD__ );
self::getScopeStack()->onRunUpdateFailed( $update );
}
}
@ -255,12 +244,6 @@ class DeferredUpdates {
* (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
*/
public static function doUpdates( $stage = self::ALL ) {
global $wgCommandLineMode;
$httpMethod = $wgCommandLineMode
? 'cli'
: strtolower( RequestContext::getMain()->getRequest()->getMethod() );
/** @var ErrorPageError $guiError First presentable client-level error thrown */
$guiError = null;
/** @var Throwable $exception First of any error thrown */
@ -286,13 +269,11 @@ class DeferredUpdates {
$scope->processUpdates(
$stage,
static function ( DeferrableUpdate $update, $activeStage )
use ( $httpMethod, &$guiError, &$exception )
{
static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
$scopeStack = self::getScopeStack();
$childScope = $scopeStack->descend( $activeStage, $update );
try {
$e = self::run( $update, $httpMethod );
$e = self::run( $update );
$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
$exception = $exception ?: $e;
// Any addUpdate() calls between descend() and ascend() used the sub-queue.
@ -302,10 +283,8 @@ class DeferredUpdates {
// queues as appropriate...
$childScope->processUpdates(
$activeStage,
static function ( DeferrableUpdate $subUpdate )
use ( $httpMethod, &$guiError, &$exception )
{
$e = self::run( $subUpdate, $httpMethod );
static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
$e = self::run( $sub );
$guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
$exception = $exception ?: $e;
}
@ -331,26 +310,6 @@ class DeferredUpdates {
}
}
/**
* @return bool If a transaction round is active or connection is not ready for commit()
*/
private static function areDatabaseTransactionsActive() {
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
if ( $lbFactory->hasTransactionRound()
|| !$lbFactory->isReadyForRoundOperations()
) {
return true;
}
foreach ( $lbFactory->getAllLBs() as $lb ) {
if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
return true;
}
}
return false;
}
/**
* Consume and execute pending updates now if possible, instead of waiting.
*
@ -396,25 +355,25 @@ class DeferredUpdates {
return false;
}
// Run the updates for this context if they will have outer transaction scope
if ( !self::areDatabaseTransactionsActive() ) {
if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
self::doUpdates( self::ALL );
return true;
}
if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
// There are a large number of pending updates and none of them can run yet.
// The odds of losing updates due to an error increase when executing long queues
// The odds of losing updates due to an error increases when executing long queues
// and when large amounts of time pass while tasks are queued. Mitigate this by
// trying to migrate updates to the job queue system (where applicable).
// trying to eagerly move updates to the JobQueue when possible.
//
// TODO: Do we still need this now maintenance scripts automatically call
// tryOpportunisticExecute from addUpdate, from every commit, and every
// waitForReplication call?
self::getScopeStack()->current()->consumeMatchingUpdates(
self::ALL,
EnqueueableDataUpdate::class,
static function ( EnqueueableDataUpdate $update ) {
$spec = $update->getAsJobSpecification();
$jobQueueGroupFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory();
$jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
self::getScopeStack()->queueDataUpdate( $update );
}
);
}
@ -496,39 +455,13 @@ class DeferredUpdates {
* transaction round.
*
* @param DeferrableUpdate $update
* @param ILBFactory $lbFactory
* @since 1.34
*/
public static function attemptUpdate( DeferrableUpdate $update, ILBFactory $lbFactory ) {
$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
if ( !$ticket || $lbFactory->hasTransactionRound() ) {
throw new DBTransactionError( null, "A database transaction round is pending." );
}
public static function attemptUpdate( DeferrableUpdate $update ) {
self::getScopeStack()->onRunUpdateStart( $update );
if ( $update instanceof DataUpdate ) {
$update->setTransactionTicket( $ticket );
}
// Designate $update::doUpdate() as the write round owner
$fnameTrxOwner = ( $update instanceof DeferrableCallback )
? $update->getOrigin()
: get_class( $update ) . '::doUpdate';
// Determine whether the write round will be explicit or implicit
$useExplicitTrxRound = !(
$update instanceof TransactionRoundAwareUpdate &&
$update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
);
// Flush any pending changes left over from an implicit transaction round
if ( $useExplicitTrxRound ) {
$lbFactory->beginPrimaryChanges( $fnameTrxOwner ); // new explicit round
} else {
$lbFactory->commitPrimaryChanges( $fnameTrxOwner ); // new implicit round
}
// Run the update after any stale primary DB view snapshots have been flushed
$update->doUpdate();
// Commit any pending changes from the explicit or implicit transaction round
$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
}
self::getScopeStack()->onRunUpdateEnd( $update );
}
}

View file

@ -0,0 +1,133 @@
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\DBTransactionError;
/**
* This class decouples DeferredUpdates's awareness of MediaWikiServices to ease unit testing.
*
* NOTE: As a process-level utility, it is important that MediaWikiServices::getInstance() is
* referenced explicitly each time, so as to not cache potentially stale references.
* For example after the Installer, or MediaWikiIntegrationTestCase, replace the service container.
*
* @internal For use by DeferredUpdates only
* @since 1.41
*/
class DeferredUpdatesScopeMediaWikiStack extends DeferredUpdatesScopeStack {
private function areDatabaseTransactionsActive(): bool {
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
if ( $lbFactory->hasTransactionRound()
|| !$lbFactory->isReadyForRoundOperations()
) {
return true;
}
foreach ( $lbFactory->getAllLBs() as $lb ) {
if ( $lb->hasPrimaryChanges() || $lb->explicitTrxActive() ) {
return true;
}
}
return false;
}
public function allowOpportunisticUpdates(): bool {
global $wgCommandLineMode;
if ( !$wgCommandLineMode ) {
// In web req
return false;
}
// Run the updates only if they will have outer transaction scope
if ( $this->areDatabaseTransactionsActive() ) {
// transaction round is active or connection is not ready for commit()
return false;
}
return true;
}
public function queueDataUpdate( EnqueueableDataUpdate $update ): void {
$spec = $update->getAsJobSpecification();
$jobQueueGroupFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory();
$jobQueueGroupFactory->makeJobQueueGroup( $spec['domain'] )->push( $spec['job'] );
}
public function onRunUpdateStart( DeferrableUpdate $update ): void {
global $wgCommandLineMode;
// Increment a counter metric
$type = get_class( $update )
. ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
$httpMethod = $wgCommandLineMode ? 'cli' : strtolower( $_SERVER['REQUEST_METHOD'] ?? 'GET' );
$stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
$stats->increment( "deferred_updates.$httpMethod.$type" );
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
if ( !$ticket || $lbFactory->hasTransactionRound() ) {
throw new DBTransactionError( null, "A database transaction round is pending." );
}
if ( $update instanceof DataUpdate ) {
$update->setTransactionTicket( $ticket );
}
// Designate $update::doUpdate() as the write round owner
$fnameTrxOwner = ( $update instanceof DeferrableCallback )
? $update->getOrigin()
: get_class( $update ) . '::doUpdate';
// Determine whether the write round will be explicit or implicit
$useExplicitTrxRound = !(
$update instanceof TransactionRoundAwareUpdate &&
$update->getTransactionRoundRequirement() == $update::TRX_ROUND_ABSENT
);
// Ensure any stale repeatable-read snapshot on the primary DB have been flushed
// before running the update. E.g. left-over from an implicit transaction round
if ( $useExplicitTrxRound ) {
// new explicit round
$lbFactory->beginPrimaryChanges( $fnameTrxOwner );
} else {
// new implicit round
$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
}
}
public function onRunUpdateEnd( DeferrableUpdate $update ): void {
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$fnameTrxOwner = ( $update instanceof DeferrableCallback )
? $update->getOrigin()
: get_class( $update ) . '::doUpdate';
// Commit any pending changes from the explicit or implicit transaction round
$lbFactory->commitPrimaryChanges( $fnameTrxOwner );
}
public function onRunUpdateFailed( DeferrableUpdate $update ): void {
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$lbFactory->rollbackPrimaryChanges( __METHOD__ );
}
}

View file

@ -75,4 +75,48 @@ class DeferredUpdatesScopeStack {
public function getRecursiveDepth() {
return count( $this->stack ) - 1;
}
/**
* Whether DeferredUpdates::addUpdate() may run the update right away
*
* @return bool
*/
public function allowOpportunisticUpdates(): bool {
// Overridden in DeferredUpdatesScopeMediaWikiStack::allowOpportunisticUpdates
return false;
}
/**
* Queue an EnqueueableDataUpdate as a job instead
*
* @see JobQueueGroup::push
* @param EnqueueableDataUpdate $update
*/
public function queueDataUpdate( EnqueueableDataUpdate $update ): void {
throw new LogicException( 'Cannot queue jobs from DeferredUpdates in standalone mode' );
}
/**
* @param DeferrableUpdate $update
*/
public function onRunUpdateStart( DeferrableUpdate $update ): void {
// No-op
// Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateStart
}
/**
* @param DeferrableUpdate $update
*/
public function onRunUpdateEnd( DeferrableUpdate $update ): void {
// No-op
// Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateEnd
}
/**
* @param DeferrableUpdate $update
*/
public function onRunUpdateFailed( DeferrableUpdate $update ): void {
// No-op
// Overridden in DeferredUpdatesScopeMediaWikiStack::onRunUpdateFailed
}
}

View file

@ -100,7 +100,7 @@ class RefreshSecondaryDataUpdate extends DataUpdate
$e = null;
foreach ( $updates as $update ) {
try {
DeferredUpdates::attemptUpdate( $update, $this->lbFactory );
DeferredUpdates::attemptUpdate( $update );
} catch ( Exception $e ) {
// Try as many updates as possible on the first pass
MWExceptionHandler::rollbackPrimaryChangesAndLog( $e );

View file

@ -103,6 +103,8 @@ abstract class MediaWikiUnitTestCase extends TestCase {
$GLOBALS[ $key ] = $value;
}
// Set DeferredUpdates into standalone mode
DeferredUpdates::setScopeStack( new DeferredUpdatesScopeStack() );
MediaWikiServices::disallowGlobalInstanceInUnitTests();
}
@ -160,6 +162,7 @@ abstract class MediaWikiUnitTestCase extends TestCase {
unset( $value );
MediaWikiServices::allowGlobalInstanceAfterUnitTests();
DeferredUpdates::setScopeStack( new DeferredUpdatesScopeMediaWikiStack() );
}
}

View file

@ -356,8 +356,7 @@ class DeferredUpdatesTest extends MediaWikiIntegrationTestCase {
$called = true;
},
$fname
),
$lbFactory
)
);
$this->assertTrue( $called, "Callback ran" );