Use transaction listener to run DeferredUpdates in CLI mode

This sets triggers on master position waits typically called
after commitMasterChanges() or in commitAndWaitForReplication().

Change-Id: I127a8fe3cfc319abfa84fcd221ee2dae191c6d3b
This commit is contained in:
Aaron Schulz 2016-08-28 09:06:57 -07:00 committed by Krinkle
parent c9f1f64cad
commit 44f486ffac
3 changed files with 126 additions and 0 deletions

View file

@ -19,6 +19,7 @@
*
* @file
*/
use MediaWiki\MediaWikiServices;
/**
* Class for managing the deferred updates
@ -46,6 +47,8 @@ class DeferredUpdates {
const PRESEND = 1; // for updates that should run before flushing output buffer
const POSTSEND = 2; // for updates that should run after flushing output buffer
// Pending-update count at which tryOpportunisticExecute() starts pushing
// enqueueable updates to the job queue while DB connections are still busy
const BIG_QUEUE_SIZE = 100;
/**
* Add an update to the deferred list
*
@ -181,6 +184,71 @@ class DeferredUpdates {
}
}
/**
* Run all deferred updates immediately if there are no DB writes active
*
* If $mode is 'run' but there are busy databases, EnqueueableDataUpdate
* tasks will be enqueued anyway once at least BIG_QUEUE_SIZE updates are
* pending, for the sake of progress.
*
* @param string $mode Use "enqueue" to use the job queue when possible
* @return bool Whether updates were allowed to run
* @since 1.28
*/
public static function tryOpportunisticExecute( $mode = 'run' ) {
	// Re-entrancy flag: COMMITs trigger inside the update loop and inside
	// some updates, so a nested call must back off immediately.
	static $inProgress = false;

	if ( $inProgress ) {
		return false;
	}

	$inProgress = true;
	try {
		$busyConnections = self::getBusyDbConnections();

		if ( $busyConnections ) {
			// The updates cannot run with outer transaction context. Once the
			// backlog is big enough, at least hand every update that supports
			// queueing over to the job queue.
			if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
				self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
				self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
			}

			// Report success only if nothing is left waiting
			return self::pendingUpdatesCount() === 0;
		}

		// No connection is busy: run everything right away
		self::doUpdates( $mode );

		return true;
	} finally {
		// Always release the flag, even if an update threw
		$inProgress = false;
	}
}
/**
* Enqueue a job for each EnqueueableDataUpdate item and return the other items
*
* @param DeferrableUpdate[] $updates A list of deferred update instances
* @return DeferrableUpdate[] Remaining updates that do not support being queued
*/
private static function enqueueUpdates( array $updates ) {
	$notQueueable = [];

	foreach ( $updates as $item ) {
		if ( !( $item instanceof EnqueueableDataUpdate ) ) {
			// Has no job form; hand it back to the caller untouched
			$notQueueable[] = $item;
			continue;
		}

		// Convert the update to a job and push it to the queue of its wiki
		$jobSpec = $item->getAsJobSpecification();
		$queueGroup = JobQueueGroup::singleton( $jobSpec['wiki'] );
		$queueGroup->push( $jobSpec['job'] );
	}

	return $notQueueable;
}
/**
* @return integer Number of pending deferred updates (pre-send plus post-send)
* @since 1.28
*/
public static function pendingUpdatesCount() {
	// Both the pre-send and the post-send lists count as pending
	$preSendTotal = count( self::$preSendUpdates );
	$postSendTotal = count( self::$postSendUpdates );

	return $preSendTotal + $postSendTotal;
}
/**
* Clear all pending updates without performing them. Generally, you don't
* want or need to call this. Unit tests need it though.
@ -189,4 +257,44 @@ class DeferredUpdates {
self::$preSendUpdates = [];
self::$postSendUpdates = [];
}
/**
* Set the rollback/commit watcher on a DB to trigger update runs when safe
*
* @TODO: use this to replace DB logic in push()
* @param LoadBalancer $lb
* @since 1.28
*/
public static function installDBListener( LoadBalancer $lb ) {
	// Transaction events after which an update run is attempted
	static $watchedTriggers = [ IDatabase::TRIGGER_COMMIT, IDatabase::TRIGGER_ROLLBACK ];

	// Listener that looks for a moment where no writes are pending
	$onTransactionEvent = function ( $trigger, IDatabase $conn ) use ( $watchedTriggers ) {
		global $wgCommandLineMode;

		// Only act in command-line mode
		if ( !$wgCommandLineMode ) {
			return;
		}
		// Ignore any trigger other than commit/rollback
		if ( !in_array( $trigger, $watchedTriggers ) ) {
			return;
		}

		DeferredUpdates::tryOpportunisticExecute();
	};

	$lb->setTransactionListener( __METHOD__, $onTransactionEvent );
}
/**
* @return IDatabase[] Connections where commit() cannot be called yet
*/
private static function getBusyDbConnections() {
	$busy = [];

	// Appends a connection to $busy when it has pending writes/callbacks
	// or an explicit transaction is active on it
	$collectIfBusy = function ( IDatabase $conn ) use ( &$busy ) {
		if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
			$busy[] = $conn;
		}
	};

	// Visit every open master connection of every load balancer
	$services = MediaWikiServices::getInstance();
	$services->getDBLoadBalancerFactory()->forEachLB(
		function ( LoadBalancer $lb ) use ( $collectIfBusy ) {
			$lb->forEachOpenMasterConnection( $collectIfBusy );
		}
	);

	return $busy;
}
}

View file

@ -37,6 +37,7 @@ define( 'DO_MAINTENANCE', RUN_MAINTENANCE_IF_MAIN ); // original name, harmless
$maintClass = false;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
/**
* Abstract maintenance class for quickly writing and churning out
@ -548,6 +549,19 @@ abstract class Maintenance {
}
/**
* Set triggers like when to try to run deferred updates
* @since 1.28
*/
public function setTriggers() {
	// Periodic lag checks happen often in long-running scripts, so use them
	// as the hook for attempting to run the deferred updates
	$services = MediaWikiServices::getInstance();
	$onReplicationWait = [ 'DeferredUpdates', 'tryOpportunisticExecute' ];

	$services->getDBLoadBalancerFactory()->setWaitForReplicationListener(
		__METHOD__,
		$onReplicationWait
	);
}
/**
* Run a child maintenance script. Pass all of the current arguments
* to it.

View file

@ -102,6 +102,10 @@ $maintenance->setConfig( ConfigFactory::getDefaultInstance()->makeConfig( 'main'
// Sanity-check required extensions are installed
$maintenance->checkRequiredExtensions();
// A good time when no DBs have writes pending is around lag checks, so the
// deferred-update triggers are registered before the script starts its work.
// This avoids having long running scripts just OOM and lose all the updates.
$maintenance->setTriggers();
// Do the work
$maintenance->execute();