deferredupdates: support limited DeferredUpdates::doUpdates()
Bug: T249069 Change-Id: I8b9ce062764c7993bf47aeb15f995b4257f24ee0
This commit is contained in:
parent
3963d7256a
commit
ba6490aa1e
2 changed files with 153 additions and 43 deletions
|
|
@ -55,15 +55,30 @@ use Wikimedia\Rdbms\LoadBalancer;
|
|||
* for post-send). Updates enqueued *during* doUpdate() of a "top" update go into the "sub-queue"
|
||||
* for that update. After that method finishes, the sub-queue is run until drained. This continues
|
||||
* for each top-queue job until the entire top queue is drained. This happens for the pre-send
|
||||
* top-queue, and later on, the post-send top-queue, in execute().
|
||||
* top-queue, and later on, the post-send top-queue, in doUpdates().
|
||||
*
|
||||
* @since 1.19
|
||||
*/
|
||||
class DeferredUpdates {
|
||||
/** @var DeferrableUpdate[] Updates to be deferred until before request end */
|
||||
/**
|
||||
* @var DeferrableUpdate[] Updates to be deferred until just before HTTP response emission.
|
||||
* Integer-keyed entries form a list of FIFO updates and a string-keyed entries form a map
|
||||
* of (class => MergeableUpdate) for updates that absorb the work of any already pending
|
||||
* updates of the same class.
|
||||
*/
|
||||
private static $preSendUpdates = [];
|
||||
/** @var DeferrableUpdate[] Updates to be deferred until after request end */
|
||||
/**
|
||||
* @var DeferrableUpdate[] Updates to be deferred until just after HTTP response emission.
|
||||
* Integer-keyed entries form a list of FIFO updates and a string-keyed entries form a map
|
||||
* of (class => MergeableUpdate) for updates that absorb the work of any already pending
|
||||
* updates of the same class.
|
||||
*/
|
||||
private static $postSendUpdates = [];
|
||||
/**
|
||||
* @var array[] Execution stack of currently running updates
|
||||
* @phan-var array<array{stage:int,update:DeferrableUpdate,subqueue:DeferrableUpdate[]}>
|
||||
*/
|
||||
private static $executionStack = [];
|
||||
|
||||
const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
|
||||
const PRESEND = 1; // for updates that should run before flushing output buffer
|
||||
|
|
@ -71,32 +86,37 @@ class DeferredUpdates {
|
|||
|
||||
const BIG_QUEUE_SIZE = 100;
|
||||
|
||||
/** @var array|null Information about the current execute() call or null if not running */
|
||||
private static $executeContext;
|
||||
|
||||
/**
|
||||
* Add an update to the deferred list to be run later by execute()
|
||||
* Add an update to the deferred update queue for execution at the appropriate time
|
||||
*
|
||||
* In CLI mode, callback magic will also be used to run updates when safe
|
||||
*
|
||||
* If an update is already in progress, then what happens to this update is as follows:
|
||||
* - MergeableUpdate instances always go on the top-queue for the specified stage, with
|
||||
* existing updates melding into the newly added instance at the end of the queue.
|
||||
* - Non-MergeableUpdate instances with a "defer until" stage at/before the actual run
|
||||
* stage of the innermost in-progress update go into the sub-queue of that in-progress
|
||||
* update. They are executed right after the update finishes to maximize isolation.
|
||||
* - Non-MergeableUpdate instances with a "defer until" stage after the actual run stage
|
||||
* of the innermost in-progress update go into the normal top-queue for that stage.
|
||||
*
|
||||
* @param DeferrableUpdate $update Some object that implements doUpdate()
|
||||
* @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
|
||||
*/
|
||||
public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
|
||||
global $wgCommandLineMode;
|
||||
|
||||
if (
|
||||
self::$executeContext !== null &&
|
||||
// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
|
||||
self::$executeContext['stage'] >= $stage &&
|
||||
!( $update instanceof MergeableUpdate )
|
||||
) {
|
||||
// This is a sub-DeferredUpdate; run it right after its parent update.
|
||||
// Also, while post-send updates are running, push any "pre-send" jobs to the
|
||||
// active post-send queue to make sure they get run this round (or at all).
|
||||
self::$executeContext['subqueue'][] = $update;
|
||||
// Special handling for updates pushed while another update is in progress
|
||||
if ( self::$executionStack && !( $update instanceof MergeableUpdate ) ) {
|
||||
// Get the innermost in-progress update
|
||||
end( self::$executionStack );
|
||||
$topStackPos = key( self::$executionStack );
|
||||
if ( self::$executionStack[$topStackPos]['stage'] >= $stage ) {
|
||||
// Put this update into the sub-queue of that in-progress update
|
||||
self::push( self::$executionStack[$topStackPos]['subqueue'], $update );
|
||||
|
||||
return;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if ( $stage === self::PRESEND ) {
|
||||
|
|
@ -129,16 +149,40 @@ class DeferredUpdates {
|
|||
}
|
||||
|
||||
/**
|
||||
* Do any deferred updates and clear the list
|
||||
* Consume the list of deferred updates and execute them
|
||||
*
|
||||
* If $stage is self::ALL then the queue of PRESEND updates will be resolved,
|
||||
* followed by the queue of POSTSEND updates
|
||||
* Note that it is rarely the case that this method should be called outside of a few
|
||||
* select entry points. For simplicity, that kind of recursion is discouraged. Recursion
|
||||
* cannot happen if an explicit transaction round is active, which limits usage to updates
|
||||
* with TRX_ROUND_ABSENT that do not leave open an transactions round of their own during
|
||||
* the call to this method.
|
||||
*
|
||||
* In the less-common case of this being called within an in-progress DeferrableUpdate,
|
||||
* this will not see any top-queue updates (since they were consumed and are being run
|
||||
* inside an outer execution loop). In that case, it will instead operate on the sub-queue
|
||||
* of the innermost in-progress update on the stack.
|
||||
*
|
||||
* If $stage is self::ALL then the queue of PRESEND updates will be resolved, followed
|
||||
* by the queue of POSTSEND updates.
|
||||
*
|
||||
* @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
|
||||
* @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
|
||||
*/
|
||||
public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
|
||||
$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;
|
||||
// Special handling for when an in-progress update triggers this method
|
||||
if ( self::$executionStack ) {
|
||||
// Run the sub-queue updates for the innermost in-progress update
|
||||
end( self::$executionStack );
|
||||
$topStackPos = key( self::$executionStack );
|
||||
self::handleUpdateQueue(
|
||||
self::$executionStack[$topStackPos]['subqueue'],
|
||||
$mode,
|
||||
$stageEffective
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
// For ALL mode, make sure that any PRESEND updates added along the way get run.
|
||||
// Normally, these use the subqueue, but that isn't true for MergeableUpdate items.
|
||||
do {
|
||||
|
|
@ -153,7 +197,7 @@ class DeferredUpdates {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param DeferrableUpdate[] &$queue
|
||||
* @param DeferrableUpdate[] &$queue Combined FIFO update list and MergeableUpdate map
|
||||
* @param DeferrableUpdate $update
|
||||
*/
|
||||
private static function push( array &$queue, DeferrableUpdate $update ) {
|
||||
|
|
@ -186,7 +230,6 @@ class DeferredUpdates {
|
|||
* @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
|
||||
* @throws ErrorPageError Happens on top-level calls
|
||||
* @throws Exception Happens on second-level calls
|
||||
* @suppress PhanTypeArraySuspiciousNullable False positives
|
||||
*/
|
||||
protected static function handleUpdateQueue( array &$queue, $mode, $stage ) {
|
||||
$services = MediaWikiServices::getInstance();
|
||||
|
|
@ -221,23 +264,25 @@ class DeferredUpdates {
|
|||
}
|
||||
// Execute all DataUpdate queue followed by the DeferrableUpdate queue...
|
||||
foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) {
|
||||
foreach ( $updateQueue as $du ) {
|
||||
// Enqueue the task into the job queue system instead if applicable
|
||||
if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) {
|
||||
self::jobify( $du, $lbf, $logger, $stats, $httpMethod );
|
||||
foreach ( $updateQueue as $curUpdate ) {
|
||||
// Enqueue the update into the job queue system instead if applicable
|
||||
if ( $mode === 'enqueue' && $curUpdate instanceof EnqueueableDataUpdate ) {
|
||||
self::jobify( $curUpdate, $lbf, $logger, $stats, $httpMethod );
|
||||
continue;
|
||||
}
|
||||
// Otherwise, execute the task and any subtasks that it spawns
|
||||
self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
|
||||
// Otherwise, execute the update, followed by any sub-updates that it spawns
|
||||
$stackEntry = [ 'stage' => $stage, 'update' => $curUpdate, 'subqueue' => [] ];
|
||||
$stackKey = count( self::$executionStack );
|
||||
self::$executionStack[$stackKey] =& $stackEntry;
|
||||
try {
|
||||
$e = self::run( $du, $lbf, $logger, $stats, $httpMethod );
|
||||
$e = self::run( $curUpdate, $lbf, $logger, $stats, $httpMethod );
|
||||
$guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
|
||||
$exception = $exception ?: $e;
|
||||
// Do the subqueue updates for $update until there are none
|
||||
while ( self::$executeContext['subqueue'] ) {
|
||||
$duChild = reset( self::$executeContext['subqueue'] );
|
||||
$firstKey = key( self::$executeContext['subqueue'] );
|
||||
unset( self::$executeContext['subqueue'][$firstKey] );
|
||||
while ( $stackEntry['subqueue'] ) {
|
||||
$duChild = reset( $stackEntry['subqueue'] );
|
||||
$duChildKey = key( $stackEntry['subqueue'] );
|
||||
unset( $stackEntry['subqueue'][$duChildKey] );
|
||||
|
||||
$e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod );
|
||||
$guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null );
|
||||
|
|
@ -247,7 +292,7 @@ class DeferredUpdates {
|
|||
// Make sure we always clean up the context.
|
||||
// Losing updates while rewinding the stack is acceptable,
|
||||
// losing updates that are added later is not.
|
||||
self::$executeContext = null;
|
||||
unset( self::$executionStack[$stackKey] );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -271,7 +316,7 @@ class DeferredUpdates {
|
|||
}
|
||||
|
||||
/**
|
||||
* Run a task and catch/log any throwables
|
||||
* Run an update, and, if an error was thrown, catch/log it and fallback to the job queue
|
||||
*
|
||||
* @param DeferrableUpdate $update
|
||||
* @param LBFactory $lbFactory
|
||||
|
|
@ -337,7 +382,7 @@ class DeferredUpdates {
|
|||
}
|
||||
|
||||
/**
|
||||
* Push a task into the job queue system and catch/log any exceptions
|
||||
* Push a update into the job queue system and catch/log any exceptions
|
||||
*
|
||||
* @param EnqueueableDataUpdate $update
|
||||
* @param LBFactory $lbFactory
|
||||
|
|
@ -424,15 +469,15 @@ class DeferredUpdates {
|
|||
*
|
||||
* If there are many deferred updates pending, $mode is 'run', and there
|
||||
* are still busy LBFactory database handles, then any EnqueueableDataUpdate
|
||||
* tasks might be enqueued as jobs to be executed later.
|
||||
* updates might be enqueued as jobs to be executed later.
|
||||
*
|
||||
* @param string $mode Use "enqueue" to use the job queue when possible
|
||||
* @return bool Whether updates were allowed to run
|
||||
* @since 1.28
|
||||
*/
|
||||
public static function tryOpportunisticExecute( $mode = 'run' ) {
|
||||
// execute() loop is already running
|
||||
if ( self::$executeContext ) {
|
||||
// An update is already in progress
|
||||
if ( self::$executionStack ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -475,14 +520,28 @@ class DeferredUpdates {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return int Number of enqueued updates
|
||||
* Get the number of currently enqueued updates in the top-queues
|
||||
*
|
||||
* Calling this while an update is in-progress produces undefined results
|
||||
*
|
||||
* @return int
|
||||
* @since 1.28
|
||||
*/
|
||||
public static function pendingUpdatesCount() {
|
||||
if ( self::$executionStack ) {
|
||||
throw new LogicException( "Called during execution of a DeferrableUpdate" );
|
||||
}
|
||||
|
||||
return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of pending updates in the top-queues
|
||||
*
|
||||
* Calling this while an update is in-progress produces undefined results
|
||||
*
|
||||
* This method should only be used for unit tests
|
||||
*
|
||||
* @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
|
||||
* @return DeferrableUpdate[]
|
||||
* @since 1.29
|
||||
|
|
@ -495,12 +554,16 @@ class DeferredUpdates {
|
|||
if ( $stage === self::ALL || $stage === self::POSTSEND ) {
|
||||
$updates = array_merge( $updates, self::$postSendUpdates );
|
||||
}
|
||||
|
||||
return $updates;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all pending updates without performing them. Generally, you don't
|
||||
* want or need to call this. Unit tests need it though.
|
||||
* Clear all pending updates without performing them
|
||||
*
|
||||
* Calling this while an update is in-progress produces undefined results
|
||||
*
|
||||
* This method should only be used for unit tests
|
||||
*/
|
||||
public static function clearPendingUpdates() {
|
||||
self::$preSendUpdates = [];
|
||||
|
|
|
|||
|
|
@ -396,4 +396,51 @@ class DeferredUpdatesTest extends MediaWikiTestCase {
|
|||
|
||||
$this->assertTrue( $called, "Callback ran" );
|
||||
}
|
||||
|
||||
/**
|
||||
* @covers DeferredUpdates::doUpdates
|
||||
*/
|
||||
public function testNestedExecution() {
|
||||
// No immediate execution
|
||||
$this->setMwGlobals( 'wgCommandLineMode', false );
|
||||
|
||||
$res = null;
|
||||
$resSub = null;
|
||||
$resSubSub = null;
|
||||
$resA = null;
|
||||
|
||||
// T249069: TransactionRoundDefiningUpdate => JobRunner => DeferredUpdates::doUpdates()
|
||||
DeferredUpdates::addUpdate( new TransactionRoundDefiningUpdate(
|
||||
function () use ( &$res, &$resSub, &$resSubSub, &$resA ) {
|
||||
$res = 1;
|
||||
// Add update to subqueue of in-progress top-queue job
|
||||
DeferredUpdates::addCallableUpdate( function () use ( &$resSub, &$resSubSub ) {
|
||||
$resSub = 'a';
|
||||
// Add update to subqueue of in-progress top-queue job (not recursive)
|
||||
DeferredUpdates::addCallableUpdate( function () use ( &$resSubSub ) {
|
||||
$resSubSub = 'b';
|
||||
} );
|
||||
} );
|
||||
if ( $resSub === null && $resA === null && $resSubSub === null ) {
|
||||
$res = 418;
|
||||
}
|
||||
DeferredUpdates::doUpdates();
|
||||
}
|
||||
) );
|
||||
DeferredUpdates::addCallableUpdate( function () use ( &$resA ) {
|
||||
$resA = 93;
|
||||
} );
|
||||
|
||||
$this->assertSame( null, $resA );
|
||||
$this->assertSame( null, $res );
|
||||
$this->assertSame( null, $resSub );
|
||||
$this->assertSame( null, $resSubSub );
|
||||
|
||||
DeferredUpdates::doUpdates();
|
||||
|
||||
$this->assertSame( 418, $res );
|
||||
$this->assertSame( 'a', $resSub );
|
||||
$this->assertSame( 'b', $resSubSub );
|
||||
$this->assertSame( 93, $resA );
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue