deferredupdates: support limited DeferredUpdates::doUpdates()

Bug: T249069
Change-Id: I8b9ce062764c7993bf47aeb15f995b4257f24ee0
This commit is contained in:
Aaron Schulz 2020-05-06 07:37:00 -07:00
parent 3963d7256a
commit ba6490aa1e
2 changed files with 153 additions and 43 deletions

View file

@ -55,15 +55,30 @@ use Wikimedia\Rdbms\LoadBalancer;
* for post-send). Updates enqueued *during* doUpdate() of a "top" update go into the "sub-queue"
* for that update. After that method finishes, the sub-queue is run until drained. This continues
* for each top-queue job until the entire top queue is drained. This happens for the pre-send
* top-queue, and later on, the post-send top-queue, in execute().
* top-queue, and later on, the post-send top-queue, in doUpdates().
*
* @since 1.19
*/
class DeferredUpdates {
/** @var DeferrableUpdate[] Updates to be deferred until before request end */
/**
* @var DeferrableUpdate[] Updates to be deferred until just before HTTP response emission.
* Integer-keyed entries form a list of FIFO updates and string-keyed entries form a map
* of (class => MergeableUpdate) for updates that absorb the work of any already pending
* updates of the same class.
*/
private static $preSendUpdates = [];
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
/**
* @var DeferrableUpdate[] Updates to be deferred until just after HTTP response emission.
* Integer-keyed entries form a list of FIFO updates and string-keyed entries form a map
* of (class => MergeableUpdate) for updates that absorb the work of any already pending
* updates of the same class.
*/
private static $postSendUpdates = [];
/**
* @var array[] Execution stack of currently running updates
* @phan-var array<array{stage:int,update:DeferrableUpdate,subqueue:DeferrableUpdate[]}>
*/
private static $executionStack = [];
const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
const PRESEND = 1; // for updates that should run before flushing output buffer
@ -71,32 +86,37 @@ class DeferredUpdates {
const BIG_QUEUE_SIZE = 100;
/** @var array|null Information about the current execute() call or null if not running */
private static $executeContext;
/**
* Add an update to the deferred list to be run later by execute()
* Add an update to the deferred update queue for execution at the appropriate time
*
* In CLI mode, callback magic will also be used to run updates when safe
*
* If an update is already in progress, then what happens to this update is as follows:
* - MergeableUpdate instances always go on the top-queue for the specified stage, with
* existing updates melding into the newly added instance at the end of the queue.
* - Non-MergeableUpdate instances with a "defer until" stage at/before the actual run
* stage of the innermost in-progress update go into the sub-queue of that in-progress
* update. They are executed right after the update finishes to maximize isolation.
* - Non-MergeableUpdate instances with a "defer until" stage after the actual run stage
* of the innermost in-progress update go into the normal top-queue for that stage.
*
* @param DeferrableUpdate $update Some object that implements doUpdate()
* @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
*/
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
global $wgCommandLineMode;
if (
self::$executeContext !== null &&
// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
self::$executeContext['stage'] >= $stage &&
!( $update instanceof MergeableUpdate )
) {
// This is a sub-DeferredUpdate; run it right after its parent update.
// Also, while post-send updates are running, push any "pre-send" jobs to the
// active post-send queue to make sure they get run this round (or at all).
self::$executeContext['subqueue'][] = $update;
// Special handling for updates pushed while another update is in progress
if ( self::$executionStack && !( $update instanceof MergeableUpdate ) ) {
// Get the innermost in-progress update
end( self::$executionStack );
$topStackPos = key( self::$executionStack );
if ( self::$executionStack[$topStackPos]['stage'] >= $stage ) {
// Put this update into the sub-queue of that in-progress update
self::push( self::$executionStack[$topStackPos]['subqueue'], $update );
return;
return;
}
}
if ( $stage === self::PRESEND ) {
@ -129,16 +149,40 @@ class DeferredUpdates {
}
/**
* Do any deferred updates and clear the list
* Consume the list of deferred updates and execute them
*
* If $stage is self::ALL then the queue of PRESEND updates will be resolved,
* followed by the queue of POSTSEND updates
* Note that it is rarely the case that this method should be called outside of a few
* select entry points. For simplicity, that kind of recursion is discouraged. Recursion
* cannot happen if an explicit transaction round is active, which limits usage to updates
* with TRX_ROUND_ABSENT that do not leave open a transaction round of their own during
* the call to this method.
*
* In the less-common case of this being called within an in-progress DeferrableUpdate,
* this will not see any top-queue updates (since they were consumed and are being run
* inside an outer execution loop). In that case, it will instead operate on the sub-queue
* of the innermost in-progress update on the stack.
*
* If $stage is self::ALL then the queue of PRESEND updates will be resolved, followed
* by the queue of POSTSEND updates.
*
* @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
* @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
*/
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
// Special handling for when an in-progress update triggers this method
if ( self::$executionStack ) {
// Run the sub-queue updates for the innermost in-progress update
end( self::$executionStack );
$topStackPos = key( self::$executionStack );
self::handleUpdateQueue(
self::$executionStack[$topStackPos]['subqueue'],
$mode,
$stageEffective
);
return;
}
// For ALL mode, make sure that any PRESEND updates added along the way get run.
// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
do {
@ -153,7 +197,7 @@ class DeferredUpdates {
}
/**
* @param DeferrableUpdate[] &$queue
* @param DeferrableUpdate[] &$queue Combined FIFO update list and MergeableUpdate map
* @param DeferrableUpdate $update
*/
private static function push( array &$queue, DeferrableUpdate $update ) {
@ -186,7 +230,6 @@ class DeferredUpdates {
* @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
* @throws ErrorPageError Happens on top-level calls
* @throws Exception Happens on second-level calls
* @suppress PhanTypeArraySuspiciousNullable False positives
*/
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
$services = MediaWikiServices::getInstance();
@ -221,23 +264,25 @@ class DeferredUpdates {
}
// Execute all DataUpdate queue followed by the DeferrableUpdate queue...
foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
foreach ( $updateQueue as $du ) {
// Enqueue the task into the job queue system instead if applicable
if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
foreach ( $updateQueue as $curUpdate ) {
// Enqueue the update into the job queue system instead if applicable
if ( $mode === 'enqueue' && $curUpdate instanceof EnqueueableDataUpdate ) {
self::jobify( $curUpdate, $lbf, $logger, $stats, $httpMethod );
continue;
}
// Otherwise, execute the task and any subtasks that it spawns
self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
// Otherwise, execute the update, followed by any sub-updates that it spawns
$stackEntry = [ 'stage' => $stage, 'update' => $curUpdate, 'subqueue' => [] ];
$stackKey = count( self::$executionStack );
self::$executionStack[$stackKey] =& $stackEntry;
try {
$e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
$e = self::run( $curUpdate, $lbf, $logger, $stats, $httpMethod );
$guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
$exception = $exception ?: $e;
// Do the subqueue updates for $curUpdate until there are none
while ( self::$executeContext['subqueue'] ) {
$duChild = reset( self::$executeContext['subqueue'] );
$firstKey = key( self::$executeContext['subqueue'] );
unset( self::$executeContext['subqueue'][$firstKey] );
while ( $stackEntry['subqueue'] ) {
$duChild = reset( $stackEntry['subqueue'] );
$duChildKey = key( $stackEntry['subqueue'] );
unset( $stackEntry['subqueue'][$duChildKey] );
$e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
$guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
@ -247,7 +292,7 @@ class DeferredUpdates {
// Make sure we always clean up the context.
// Losing updates while rewinding the stack is acceptable,
// losing updates that are added later is not.
self::$executeContext = null;
unset( self::$executionStack[$stackKey] );
}
}
}
@ -271,7 +316,7 @@ class DeferredUpdates {
}
/**
* Run a task and catch/log any throwables
* Run an update, and, if an error was thrown, catch/log it and fallback to the job queue
*
* @param DeferrableUpdate $update
* @param LBFactory $lbFactory
@ -337,7 +382,7 @@ class DeferredUpdates {
}
/**
* Push a task into the job queue system and catch/log any exceptions
* Push an update into the job queue system and catch/log any exceptions
*
* @param EnqueueableDataUpdate $update
* @param LBFactory $lbFactory
@ -424,15 +469,15 @@ class DeferredUpdates {
*
* If there are many deferred updates pending, $mode is 'run', and there
* are still busy LBFactory database handles, then any EnqueueableDataUpdate
* tasks might be enqueued as jobs to be executed later.
* updates might be enqueued as jobs to be executed later.
*
* @param string $mode Use "enqueue" to use the job queue when possible
* @return bool Whether updates were allowed to run
* @since 1.28
*/
public static function tryOpportunisticExecute( $mode = 'run' ) {
// execute() loop is already running
if ( self::$executeContext ) {
// An update is already in progress
if ( self::$executionStack ) {
return false;
}
@ -475,14 +520,28 @@ class DeferredUpdates {
}
/**
* @return int Number of enqueued updates
* Get the number of currently enqueued updates in the top-queues
*
* Calling this while an update is in-progress throws a LogicException
*
* @return int
* @since 1.28
*/
public static function pendingUpdatesCount() {
	// Refuse to report a count while any update is still on the execution stack
	if ( self::$executionStack !== [] ) {
		throw new LogicException( "Called during execution of a DeferrableUpdate" );
	}

	// Sum the sizes of the two top-queues: pre-send first, then post-send
	$preSendCount = count( self::$preSendUpdates );
	$postSendCount = count( self::$postSendUpdates );

	return $preSendCount + $postSendCount;
}
/**
* Get the list of pending updates in the top-queues
*
* Calling this while an update is in-progress produces undefined results
*
* This method should only be used for unit tests
*
* @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
* @return DeferrableUpdate[]
* @since 1.29
@ -495,12 +554,16 @@ class DeferredUpdates {
if ( $stage === self::ALL || $stage === self::POSTSEND ) {
$updates = array_merge( $updates, self::$postSendUpdates );
}
return $updates;
}
/**
* Clear all pending updates without performing them. Generally, you don't
* want or need to call this. Unit tests need it though.
* Clear all pending updates without performing them
*
* Calling this while an update is in-progress produces undefined results
*
* This method should only be used for unit tests
*/
public static function clearPendingUpdates() {
self::$preSendUpdates = [];

View file

@ -396,4 +396,51 @@ class DeferredUpdatesTest extends MediaWikiTestCase {
$this->assertTrue( $called, "Callback ran" );
}
/**
* Check that DeferredUpdates::doUpdates() can be called from within a running update.
*
* The outer update enqueues a nested callback and then calls doUpdates() itself. The
* final assertions require that the outer update, both nested callbacks, and a
* separately enqueued callback have all run once the top-level doUpdates() returns.
*
* @covers DeferredUpdates::doUpdates
*/
public function testNestedExecution() {
// No immediate execution
$this->setMwGlobals( 'wgCommandLineMode', false );
// Result markers, one per callback; null means that the callback has not run yet
$res = null;
$resSub = null;
$resSubSub = null;
$resA = null;
// T249069: TransactionRoundDefiningUpdate => JobRunner => DeferredUpdates::doUpdates()
DeferredUpdates::addUpdate( new TransactionRoundDefiningUpdate(
function () use ( &$res, &$resSub, &$resSubSub, &$resA ) {
$res = 1;
// Add update to subqueue of in-progress top-queue job
DeferredUpdates::addCallableUpdate( function () use ( &$resSub, &$resSubSub ) {
$resSub = 'a';
// Add update to subqueue of in-progress top-queue job (not recursive)
DeferredUpdates::addCallableUpdate( function () use ( &$resSubSub ) {
$resSubSub = 'b';
} );
} );
// Overwrite the marker with 418 only if none of the other callbacks ran so far
if ( $resSub === null && $resA === null && $resSubSub === null ) {
$res = 418;
}
// Nested call, made while this update itself is still in progress
DeferredUpdates::doUpdates();
}
) );
// A second, independent update enqueued after the outer one
DeferredUpdates::addCallableUpdate( function () use ( &$resA ) {
$resA = 93;
} );
// Nothing may have run before the top-level doUpdates() call
$this->assertSame( null, $resA );
$this->assertSame( null, $res );
$this->assertSame( null, $resSub );
$this->assertSame( null, $resSubSub );
DeferredUpdates::doUpdates();
// Every callback, including the nested ones, must have run by now
$this->assertSame( 418, $res );
$this->assertSame( 'a', $resSub );
$this->assertSame( 'b', $resSubSub );
$this->assertSame( 93, $resA );
}
}