diff --git a/includes/deferred/DeferredUpdates.php b/includes/deferred/DeferredUpdates.php index 42086395bc5..5ddfc021924 100644 --- a/includes/deferred/DeferredUpdates.php +++ b/includes/deferred/DeferredUpdates.php @@ -55,15 +55,30 @@ use Wikimedia\Rdbms\LoadBalancer; * for post-send). Updates enqueued *during* doUpdate() of a "top" update go into the "sub-queue" * for that update. After that method finishes, the sub-queue is run until drained. This continues * for each top-queue job until the entire top queue is drained. This happens for the pre-send - * top-queue, and later on, the post-send top-queue, in execute(). + * top-queue, and later on, the post-send top-queue, in doUpdates(). * * @since 1.19 */ class DeferredUpdates { - /** @var DeferrableUpdate[] Updates to be deferred until before request end */ + /** + * @var DeferrableUpdate[] Updates to be deferred until just before HTTP response emission. + * Integer-keyed entries form a list of FIFO updates and a string-keyed entries form a map + * of (class => MergeableUpdate) for updates that absorb the work of any already pending + * updates of the same class. + */ private static $preSendUpdates = []; - /** @var DeferrableUpdate[] Updates to be deferred until after request end */ + /** + * @var DeferrableUpdate[] Updates to be deferred until just after HTTP response emission. + * Integer-keyed entries form a list of FIFO updates and a string-keyed entries form a map + * of (class => MergeableUpdate) for updates that absorb the work of any already pending + * updates of the same class. + */ private static $postSendUpdates = []; + /** + * @var array[] Execution stack of currently running updates + * @phan-var array + */ + private static $executionStack = []; const ALL = 0; // all updates; in web requests, use only after flushing the output buffer const PRESEND = 1; // for updates that should run before flushing output buffer @@ -71,32 +86,37 @@ class DeferredUpdates { const BIG_QUEUE_SIZE = 100; - /** @var array|null Information about the current execute() call or null if not running */ - private static $executeContext; - /** - * Add an update to the deferred list to be run later by execute() + * Add an update to the deferred update queue for execution at the appropriate time * * In CLI mode, callback magic will also be used to run updates when safe * + * If an update is already in progress, then what happens to this update is as follows: + * - MergeableUpdate instances always go on the top-queue for the specified stage, with + * existing updates melding into the newly added instance at the end of the queue. + * - Non-MergeableUpdate instances with a "defer until" stage at/before the actual run + * stage of the innermost in-progress update go into the sub-queue of that in-progress + * update. They are executed right after the update finishes to maximize isolation. + * - Non-MergeableUpdate instances with a "defer until" stage after the actual run stage + * of the innermost in-progress update go into the normal top-queue for that stage. + * * @param DeferrableUpdate $update Some object that implements doUpdate() * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27) */ public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) { global $wgCommandLineMode; - if ( - self::$executeContext !== null && - // @phan-suppress-next-line PhanTypeArraySuspiciousNullable - self::$executeContext['stage'] >= $stage && - !( $update instanceof MergeableUpdate ) - ) { - // This is a sub-DeferredUpdate; run it right after its parent update. - // Also, while post-send updates are running, push any "pre-send" jobs to the - // active post-send queue to make sure they get run this round (or at all). - self::$executeContext['subqueue'][] = $update; + // Special handling for updates pushed while another update is in progress + if ( self::$executionStack && !( $update instanceof MergeableUpdate ) ) { + // Get the innermost in-progress update + end( self::$executionStack ); + $topStackPos = key( self::$executionStack ); + if ( self::$executionStack[$topStackPos]['stage'] >= $stage ) { + // Put this update into the sub-queue of that in-progress update + self::push( self::$executionStack[$topStackPos]['subqueue'], $update ); - return; + return; + } } if ( $stage === self::PRESEND ) { @@ -129,16 +149,40 @@ class DeferredUpdates { } /** - * Do any deferred updates and clear the list + * Consume the list of deferred updates and execute them * - * If $stage is self::ALL then the queue of PRESEND updates will be resolved, - * followed by the queue of POSTSEND updates + * Note that it is rarely the case that this method should be called outside of a few + * select entry points. For simplicity, that kind of recursion is discouraged. Recursion + * cannot happen if an explicit transaction round is active, which limits usage to updates + * with TRX_ROUND_ABSENT that do not leave open an transactions round of their own during + * the call to this method. + * + * In the less-common case of this being called within an in-progress DeferrableUpdate, + * this will not see any top-queue updates (since they were consumed and are being run + * inside an outer execution loop). In that case, it will instead operate on the sub-queue + * of the innermost in-progress update on the stack. + * + * If $stage is self::ALL then the queue of PRESEND updates will be resolved, followed + * by the queue of POSTSEND updates. * * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"] * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27) */ public static function doUpdates( $mode = 'run', $stage = self::ALL ) { $stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage; + // Special handling for when an in-progress update triggers this method + if ( self::$executionStack ) { + // Run the sub-queue updates for the innermost in-progress update + end( self::$executionStack ); + $topStackPos = key( self::$executionStack ); + self::handleUpdateQueue( + self::$executionStack[$topStackPos]['subqueue'], + $mode, + $stageEffective + ); + + return; + } // For ALL mode, make sure that any PRESEND updates added along the way get run. // Normally, these use the subqueue, but that isn't true for MergeableUpdate items. do { @@ -153,7 +197,7 @@ class DeferredUpdates { } /** - * @param DeferrableUpdate[] &$queue + * @param DeferrableUpdate[] &$queue Combined FIFO update list and MergeableUpdate map * @param DeferrableUpdate $update */ private static function push( array &$queue, DeferrableUpdate $update ) { @@ -186,7 +230,6 @@ class DeferredUpdates { * @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28) * @throws ErrorPageError Happens on top-level calls * @throws Exception Happens on second-level calls - * @suppress PhanTypeArraySuspiciousNullable False positives */ protected static function handleUpdateQueue( array &$queue, $mode, $stage ) { $services = MediaWikiServices::getInstance(); @@ -221,23 +264,25 @@ class DeferredUpdates { } // Execute all DataUpdate queue followed by the DeferrableUpdate queue... foreach ( [ $dataUpdateQueue, $genericUpdateQueue ] as $updateQueue ) { - foreach ( $updateQueue as $du ) { - // Enqueue the task into the job queue system instead if applicable - if ( $mode === 'enqueue' && $du instanceof EnqueueableDataUpdate ) { - self::jobify( $du, $lbf, $logger, $stats, $httpMethod ); + foreach ( $updateQueue as $curUpdate ) { + // Enqueue the update into the job queue system instead if applicable + if ( $mode === 'enqueue' && $curUpdate instanceof EnqueueableDataUpdate ) { + self::jobify( $curUpdate, $lbf, $logger, $stats, $httpMethod ); continue; } - // Otherwise, execute the task and any subtasks that it spawns - self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ]; + // Otherwise, execute the update, followed by any sub-updates that it spawns + $stackEntry = [ 'stage' => $stage, 'update' => $curUpdate, 'subqueue' => [] ]; + $stackKey = count( self::$executionStack ); + self::$executionStack[$stackKey] =& $stackEntry; try { - $e = self::run( $du, $lbf, $logger, $stats, $httpMethod ); + $e = self::run( $curUpdate, $lbf, $logger, $stats, $httpMethod ); $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null ); $exception = $exception ?: $e; // Do the subqueue updates for $update until there are none - while ( self::$executeContext['subqueue'] ) { - $duChild = reset( self::$executeContext['subqueue'] ); - $firstKey = key( self::$executeContext['subqueue'] ); - unset( self::$executeContext['subqueue'][$firstKey] ); + while ( $stackEntry['subqueue'] ) { + $duChild = reset( $stackEntry['subqueue'] ); + $duChildKey = key( $stackEntry['subqueue'] ); + unset( $stackEntry['subqueue'][$duChildKey] ); $e = self::run( $duChild, $lbf, $logger, $stats, $httpMethod ); $guiEx = $guiEx ?: ( $e instanceof ErrorPageError ? $e : null ); @@ -247,7 +292,7 @@ class DeferredUpdates { // Make sure we always clean up the context. // Losing updates while rewinding the stack is acceptable, // losing updates that are added later is not. - self::$executeContext = null; + unset( self::$executionStack[$stackKey] ); } } } @@ -271,7 +316,7 @@ class DeferredUpdates { } /** - * Run a task and catch/log any throwables + * Run an update, and, if an error was thrown, catch/log it and fallback to the job queue * * @param DeferrableUpdate $update * @param LBFactory $lbFactory @@ -337,7 +382,7 @@ class DeferredUpdates { } /** - * Push a task into the job queue system and catch/log any exceptions + * Push a update into the job queue system and catch/log any exceptions * * @param EnqueueableDataUpdate $update * @param LBFactory $lbFactory @@ -424,15 +469,15 @@ class DeferredUpdates { * * If there are many deferred updates pending, $mode is 'run', and there * are still busy LBFactory database handles, then any EnqueueableDataUpdate - * tasks might be enqueued as jobs to be executed later. + * updates might be enqueued as jobs to be executed later. * * @param string $mode Use "enqueue" to use the job queue when possible * @return bool Whether updates were allowed to run * @since 1.28 */ public static function tryOpportunisticExecute( $mode = 'run' ) { - // execute() loop is already running - if ( self::$executeContext ) { + // An update is already in progress + if ( self::$executionStack ) { return false; } @@ -475,14 +520,28 @@ class DeferredUpdates { } /** - * @return int Number of enqueued updates + * Get the number of currently enqueued updates in the top-queues + * + * Calling this while an update is in-progress produces undefined results + * + * @return int * @since 1.28 */ public static function pendingUpdatesCount() { + if ( self::$executionStack ) { + throw new LogicException( "Called during execution of a DeferrableUpdate" ); + } + return count( self::$preSendUpdates ) + count( self::$postSendUpdates ); } /** + * Get the list of pending updates in the top-queues + * + * Calling this while an update is in-progress produces undefined results + * + * This method should only be used for unit tests + * * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) * @return DeferrableUpdate[] * @since 1.29 @@ -495,12 +554,16 @@ class DeferredUpdates { if ( $stage === self::ALL || $stage === self::POSTSEND ) { $updates = array_merge( $updates, self::$postSendUpdates ); } + return $updates; } /** - * Clear all pending updates without performing them. Generally, you don't - * want or need to call this. Unit tests need it though. + * Clear all pending updates without performing them + * + * Calling this while an update is in-progress produces undefined results + * + * This method should only be used for unit tests */ public static function clearPendingUpdates() { self::$preSendUpdates = []; diff --git a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php index 540c31d9cff..4ef4b356e84 100644 --- a/tests/phpunit/includes/deferred/DeferredUpdatesTest.php +++ b/tests/phpunit/includes/deferred/DeferredUpdatesTest.php @@ -396,4 +396,51 @@ class DeferredUpdatesTest extends MediaWikiTestCase { $this->assertTrue( $called, "Callback ran" ); } + + /** + * @covers DeferredUpdates::doUpdates + */ + public function testNestedExecution() { + // No immediate execution + $this->setMwGlobals( 'wgCommandLineMode', false ); + + $res = null; + $resSub = null; + $resSubSub = null; + $resA = null; + + // T249069: TransactionRoundDefiningUpdate => JobRunner => DeferredUpdates::doUpdates() + DeferredUpdates::addUpdate( new TransactionRoundDefiningUpdate( + function () use ( &$res, &$resSub, &$resSubSub, &$resA ) { + $res = 1; + // Add update to subqueue of in-progress top-queue job + DeferredUpdates::addCallableUpdate( function () use ( &$resSub, &$resSubSub ) { + $resSub = 'a'; + // Add update to subqueue of in-progress top-queue job (not recursive) + DeferredUpdates::addCallableUpdate( function () use ( &$resSubSub ) { + $resSubSub = 'b'; + } ); + } ); + if ( $resSub === null && $resA === null && $resSubSub === null ) { + $res = 418; + } + DeferredUpdates::doUpdates(); + } + ) ); + DeferredUpdates::addCallableUpdate( function () use ( &$resA ) { + $resA = 93; + } ); + + $this->assertSame( null, $resA ); + $this->assertSame( null, $res ); + $this->assertSame( null, $resSub ); + $this->assertSame( null, $resSubSub ); + + DeferredUpdates::doUpdates(); + + $this->assertSame( 418, $res ); + $this->assertSame( 'a', $resSub ); + $this->assertSame( 'b', $resSubSub ); + $this->assertSame( 93, $resA ); + } }