Merge "Convert JobRunner into a service and use DI"

jenkins-bot 2020-02-27 16:36:46 +00:00 committed by Gerrit Code Review
commit 9e3f782cf0
10 changed files with 151 additions and 80 deletions

View file

@ -529,6 +529,11 @@ because of Phabricator reports.
class in 1.31. ::userCanBitfield was individually hard deprecated; use
RevisionRecord::userCanBitfield instead.
* RecentChange::markPatrolled was deprecated. Use ::doMarkPatrolled instead.
* The JobRunner class has been converted to a service class. Direct
construction is deprecated; use MediaWikiServices::getJobRunner() instead.
* JobRunner::setLogger has been deprecated, and using JobRunner as a
LoggerAwareInterface is therefore deprecated as well. Rely on the logger
passed in the constructor instead.
* …
=== Other changes in 1.35 ===
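
For code outside core that still constructs the runner directly, the migration described in the notes above is mechanical. The sketch below is illustrative only (the call site is hypothetical); it contrasts the now-deprecated construction with the service accessor introduced by this change:

use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;

// Before 1.35: direct construction, optionally with a custom logger.
$runner = new JobRunner( LoggerFactory::getInstance( 'runJobs' ) );

// 1.35 and later: fetch the fully wired service instead.
$runner = MediaWikiServices::getInstance()->getJobRunner();

// The run() interface itself is unchanged.
$runner->run( [ 'maxJobs' => 10 ] );

Code that previously swapped the logger via JobRunner::setLogger() should instead pass a LoggerInterface through the constructor (or rely on the 'runJobs' channel the service wiring provides), since the setter now raises a deprecation warning.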

View file

@ -623,10 +623,10 @@ class MediaWiki {
if ( !$invokedWithSuccess ) {
// Fall back to blocking on running the job(s)
$logger->warning( "Jobs switched to blocking; Special:RunJobs disabled" );
$this->triggerSyncJobs( $n, $logger );
$this->triggerSyncJobs( $n );
}
} else {
$this->triggerSyncJobs( $n, $logger );
$this->triggerSyncJobs( $n );
}
} );
}
@ -1128,10 +1128,10 @@ class MediaWiki {
if ( !$invokedWithSuccess ) {
// Fall back to blocking on running the job(s)
$logger->warning( "Jobs switched to blocking; Special:RunJobs disabled" );
$this->triggerSyncJobs( $n, $logger );
$this->triggerSyncJobs( $n );
}
} else {
$this->triggerSyncJobs( $n, $logger );
$this->triggerSyncJobs( $n );
}
} catch ( JobQueueError $e ) {
// Do not make the site unavailable (T88312)
@ -1141,13 +1141,12 @@ class MediaWiki {
/**
* @param int $n Number of jobs to try to run
* @param LoggerInterface $runJobsLogger
*/
private function triggerSyncJobs( $n, LoggerInterface $runJobsLogger ) {
private function triggerSyncJobs( $n ) {
$trxProfiler = Profiler::instance()->getTransactionProfiler();
$old = $trxProfiler->setSilenced( true );
try {
$runner = new JobRunner( $runJobsLogger );
$runner = MediaWikiServices::getInstance()->getJobRunner();
$runner->run( [ 'maxJobs' => $n ] );
} finally {
$trxProfiler->setSilenced( $old );

View file

@ -18,6 +18,7 @@ use GenderCache;
use GlobalVarConfig;
use Hooks;
use IBufferingStatsdDataFactory;
use JobRunner;
use Language;
use LinkCache;
use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
@ -695,6 +696,14 @@ class MediaWikiServices extends ServiceContainer {
return $this->getService( 'InterwikiLookup' );
}
/**
* @since 1.35
* @return JobRunner
*/
public function getJobRunner() : JobRunner {
return $this->getService( 'JobRunner' );
}
/**
* @since 1.35
* @return LanguageConverterFactory

View file

@ -350,6 +350,18 @@ return [
);
},
'JobRunner' => function ( MediaWikiServices $services ) : JobRunner {
return new JobRunner(
new ServiceOptions( JobRunner::CONSTRUCTOR_OPTIONS, $services->getMainConfig() ),
$services->getDBLoadBalancerFactory(),
JobQueueGroup::singleton(),
$services->getReadOnlyMode(),
$services->getLinkCache(),
$services->getStatsdDataFactory(),
LoggerFactory::getInstance( 'runJobs' )
);
},
'LanguageConverterFactory' => function ( MediaWikiServices $services ) : LanguageConverterFactory {
$usePigLatinVariant = $services->getMainConfig()->get( 'UsePigLatinVariant' );
return new LanguageConverterFactory( $usePigLatinVariant, function () use ( $services ) {

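The ServiceOptions wrapper used in the wiring above is the standard pattern for new-style services: the class lists the configuration keys it reads in CONSTRUCTOR_OPTIONS, and the options object exposes only those keys. A minimal sketch of the idea, assuming access to the global service locator; the commented-out line shows the expected failure mode for an undeclared key:

use MediaWiki\Config\ServiceOptions;
use MediaWiki\MediaWikiServices;

$options = new ServiceOptions(
	JobRunner::CONSTRUCTOR_OPTIONS,
	MediaWikiServices::getInstance()->getMainConfig()
);

$jobClasses = $options->get( 'JobClasses' ); // declared in CONSTRUCTOR_OPTIONS
// $options->get( 'Sitename' ); // not declared, so this is expected to throw
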
View file

@ -22,13 +22,14 @@
*/
use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
use MediaWiki\Config\ServiceOptions;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\DBReadOnlyError;
use Wikimedia\Rdbms\LBFactory;
use Wikimedia\Rdbms\ILBFactory;
use Wikimedia\ScopedCallback;
/**
@ -38,13 +39,38 @@ use Wikimedia\ScopedCallback;
* @since 1.24
*/
class JobRunner implements LoggerAwareInterface {
/** @var Config */
protected $config;
public const CONSTRUCTOR_OPTIONS = [
'JobBackoffThrottling',
'JobClasses',
'JobSerialCommitThreshold',
'MaxJobDBWriteDuration',
'TrxProfilerLimits'
];
/** @var ServiceOptions */
private $options;
/** @var ILBFactory */
private $lbFactory;
/** @var JobQueueGroup */
private $jobQueueGroup;
/** @var ReadOnlyMode */
private $readOnlyMode;
/** @var LinkCache */
private $linkCache;
/** @var StatsdDataFactoryInterface */
private $stats;
/** @var callable|null Debug output handler */
protected $debug;
private $debug;
/** @var LoggerInterface */
protected $logger;
private $logger;
/** @var int Abort if more than this much DB lag is present */
private const MAX_ALLOWED_LAG = 3;
@ -67,20 +93,49 @@ class JobRunner implements LoggerAwareInterface {
/**
* @param LoggerInterface $logger
* @return void
* @deprecated since 1.35. Rely on the logger passed in the constructor.
*/
public function setLogger( LoggerInterface $logger ) {
wfDeprecated( __METHOD__, '1.35' );
$this->logger = $logger;
}
/**
* Calling this directly is deprecated.
* Obtain an instance via MediaWikiServices instead.
* @param ServiceOptions|LoggerInterface|null $serviceOptions
* @param ILBFactory|null $lbFactory
* @param JobQueueGroup|null $jobQueueGroup The JobQueueGroup for this wiki
* @param ReadOnlyMode|null $readOnlyMode
* @param LinkCache|null $linkCache
* @param StatsdDataFactoryInterface|null $statsdDataFactory
* @param LoggerInterface|null $logger
*/
public function __construct( LoggerInterface $logger = null ) {
if ( $logger === null ) {
$logger = LoggerFactory::getInstance( 'runJobs' );
public function __construct(
$serviceOptions = null,
ILBFactory $lbFactory = null,
JobQueueGroup $jobQueueGroup = null,
ReadOnlyMode $readOnlyMode = null,
LinkCache $linkCache = null,
StatsdDataFactoryInterface $statsdDataFactory = null,
LoggerInterface $logger = null
) {
if ( !$serviceOptions || $serviceOptions instanceof LoggerInterface ) {
// TODO: wfDeprecated( __METHOD__ . 'called directly. Use MediaWikiServices instead', '1.35' );
$logger = $serviceOptions;
$serviceOptions = new ServiceOptions(
static::CONSTRUCTOR_OPTIONS,
MediaWikiServices::getInstance()->getMainConfig()
);
}
$this->setLogger( $logger );
$this->config = MediaWikiServices::getInstance()->getMainConfig();
$this->options = $serviceOptions;
$this->lbFactory = $lbFactory ?? MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$this->jobQueueGroup = $jobQueueGroup ?? JobQueueGroup::singleton();
$this->readOnlyMode = $readOnlyMode ?: MediaWikiServices::getInstance()->getReadOnlyMode();
$this->linkCache = $linkCache ?? MediaWikiServices::getInstance()->getLinkCache();
$this->stats = $statsdDataFactory ?? MediaWikiServices::getInstance()->getStatsdDataFactory();
$this->logger = $logger ?? LoggerFactory::getInstance( 'runJobs' );
}
/**
@ -114,9 +169,8 @@ class JobRunner implements LoggerAwareInterface {
$maxTime = $options['maxTime'] ?? false;
$throttle = $options['throttle'] ?? true;
$services = MediaWikiServices::getInstance();
$jobClasses = $this->config->get( 'JobClasses' );
$profilerLimits = $this->config->get( 'TrxProfilerLimits' );
$jobClasses = $this->options->get( 'JobClasses' );
$profilerLimits = $this->options->get( 'TrxProfilerLimits' );
$response = [ 'jobs' => [], 'reached' => 'none-ready' ];
@ -126,14 +180,13 @@ class JobRunner implements LoggerAwareInterface {
return $response;
}
if ( $services->getReadOnlyMode()->isReadOnly() ) {
if ( $this->readOnlyMode->isReadOnly() ) {
// Any jobs popped off the queue might fail to run and thus might end up lost
$response['reached'] = 'read-only';
return $response;
}
$lbFactory = $services->getDBLoadBalancerFactory();
list( , $maxLag ) = $lbFactory->getMainLB()->getMaxLag();
list( , $maxLag ) = $this->lbFactory->getMainLB()->getMaxLag();
if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
// DB lag is already too high; caller can immediately try other wikis if applicable
$response['reached'] = 'replica-lag-limit';
@ -141,12 +194,11 @@ class JobRunner implements LoggerAwareInterface {
}
// Narrow DB query expectations for this HTTP request
$trxProfiler = Profiler::instance()->getTransactionProfiler();
$trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
$trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
$this->lbFactory->getTransactionProfiler()
->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
// Error out if an explicit DB transaction round is somehow active
if ( $lbFactory->hasTransactionRound() ) {
if ( $this->lbFactory->hasTransactionRound() ) {
throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
}
@ -155,9 +207,6 @@ class JobRunner implements LoggerAwareInterface {
$backoffDeltas = []; // map of (type => seconds)
$wait = 'wait'; // block to read backoffs the first time
$group = JobQueueGroup::singleton();
$stats = $services->getStatsdDataFactory();
$loopStartTime = microtime( true );
$jobsPopped = 0;
$timeMsTotal = 0;
@ -171,11 +220,12 @@ class JobRunner implements LoggerAwareInterface {
if ( $type === false ) {
// Treat the default job type queues as a single queue and pop off a job
$job = $group->pop( $group::TYPE_DEFAULT, $group::USE_CACHE, $blacklist );
$job = $this->jobQueueGroup
->pop( JobQueueGroup::TYPE_DEFAULT, JobQueueGroup::USE_CACHE, $blacklist );
} else {
// Pop off a job from the specified job type queue unless the execution of
// that type of job is currently rate-limited by the back-off blacklist
$job = in_array( $type, $blacklist ) ? false : $group->pop( $type );
$job = in_array( $type, $blacklist ) ? false : $this->jobQueueGroup->pop( $type );
}
if ( $job ) {
@ -191,11 +241,11 @@ class JobRunner implements LoggerAwareInterface {
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
}
$info = $this->executeJob( $job, $lbFactory, $stats );
$info = $this->executeJob( $job );
// Mark completed or "one shot only" jobs as resolved
if ( $info['status'] !== false || !$job->allowRetries() ) {
$group->ack( $job );
$this->jobQueueGroup->ack( $job );
}
// Back off of certain jobs for a while (for throttling and for errors)
@ -229,7 +279,7 @@ class JobRunner implements LoggerAwareInterface {
$timePassed = microtime( true ) - $lastSyncTime;
if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
$opts = [ 'ifWritesSince' => $lastSyncTime, 'timeout' => self::SYNC_TIMEOUT ];
if ( !$lbFactory->waitForReplication( $opts ) ) {
if ( !$this->lbFactory->waitForReplication( $opts ) ) {
$response['reached'] = 'replica-lag-limit';
break;
}
@ -266,8 +316,6 @@ class JobRunner implements LoggerAwareInterface {
* This should never be called if there are explicit transaction rounds or pending DB writes
*
* @param RunnableJob $job
* @param LBFactory $lbFactory
* @param StatsdDataFactoryInterface $stats
* @return array Map of:
* - status: boolean; whether the job succeeded
* - error: error string; empty if there was no error specified
@ -275,39 +323,29 @@ class JobRunner implements LoggerAwareInterface {
* - timeMs: float; job execution time in milliseconds
* @since 1.35
*/
public function executeJob(
RunnableJob $job,
LBFactory $lbFactory,
StatsdDataFactoryInterface $stats
) {
public function executeJob( RunnableJob $job ) {
$oldRequestId = WebRequest::getRequestId();
// Temporarily inherit the original ID of the web request that spawned this job
WebRequest::overrideRequestId( $job->getRequestId() );
// Use an appropriate timeout to balance lag avoidance and job progress
$oldTimeout = $lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
$oldTimeout = $this->lbFactory->setDefaultReplicationWaitTimeout( self::SYNC_TIMEOUT );
try {
return $this->doExecuteJob( $job, $lbFactory, $stats );
return $this->doExecuteJob( $job );
} finally {
$lbFactory->setDefaultReplicationWaitTimeout( $oldTimeout );
$this->lbFactory->setDefaultReplicationWaitTimeout( $oldTimeout );
WebRequest::overrideRequestId( $oldRequestId );
}
}
/**
* @param RunnableJob $job
* @param LBFactory $lbFactory
* @param StatsdDataFactoryInterface $stats
* @return array Map of:
* - status: boolean; whether the job succeeded
* - error: error string; empty if there was no error specified
* - caught: list of FQCNs corresponding to any exceptions caught
* - timeMs: float; job execution time in milliseconds
*/
private function doExecuteJob(
RunnableJob $job,
LBFactory $lbFactory,
StatsdDataFactoryInterface $stats
) {
private function doExecuteJob( RunnableJob $job ) {
$jType = $job->getType();
$msg = $job->toString() . " STARTING";
$this->logger->debug( $msg, [ 'job_type' => $job->getType() ] );
@ -315,7 +353,7 @@ class JobRunner implements LoggerAwareInterface {
// Clear out title cache data from prior snapshots
// (e.g. from before JobRunner was invoked in this process)
MediaWikiServices::getInstance()->getLinkCache()->clear();
$this->linkCache->clear();
// Run the job...
$caught = [];
@ -325,16 +363,16 @@ class JobRunner implements LoggerAwareInterface {
$fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
// Flush any pending changes left over from an implicit transaction round
if ( $job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
$lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
$this->lbFactory->commitMasterChanges( $fnameTrxOwner ); // new implicit round
} else {
$lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
$this->lbFactory->beginMasterChanges( $fnameTrxOwner ); // new explicit round
}
// Clear any stale REPEATABLE-READ snapshots from replica DB connections
$lbFactory->flushReplicaSnapshots( $fnameTrxOwner );
$this->lbFactory->flushReplicaSnapshots( $fnameTrxOwner );
$status = $job->run();
$error = $job->getLastError();
// Commit all pending changes from this job
$this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
$this->commitMasterChanges( $job, $fnameTrxOwner );
// Run any deferred update tasks; doUpdates() manages transactions itself
DeferredUpdates::doUpdates();
} catch ( Throwable $e ) {
@ -357,20 +395,20 @@ class JobRunner implements LoggerAwareInterface {
$readyTs = $job->getReadyTimestamp();
if ( $readyTs ) {
$pickupDelay = max( 0, $jobStartTime - $readyTs );
$stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
$stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
$this->stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
$this->stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
}
// Record root job age for jobs being run
$rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
if ( $rootTimestamp ) {
$age = max( 0, $jobStartTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
$stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
$this->stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
}
// Track the execution time for jobs
$stats->timing( "jobqueue.run.$jType", $timeMs );
$this->stats->timing( "jobqueue.run.$jType", $timeMs );
// Track RSS increases for jobs (in case of memory leaks)
if ( $rssStart && $rssEnd ) {
$stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
$this->stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
}
if ( $status === false ) {
@ -427,7 +465,7 @@ class JobRunner implements LoggerAwareInterface {
* @see $wgJobBackoffThrottling
*/
private function getBackoffTimeToWait( RunnableJob $job ) {
$throttling = $this->config->get( 'JobBackoffThrottling' );
$throttling = $this->options->get( 'JobBackoffThrottling' );
if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
return 0; // not throttled
@ -580,16 +618,15 @@ class JobRunner implements LoggerAwareInterface {
* local wiki's replica DBs to catch up. See the documentation for
* $wgJobSerialCommitThreshold for more.
*
* @param LBFactory $lbFactory
* @param RunnableJob $job
* @param string $fnameTrxOwner
* @throws DBError
*/
private function commitMasterChanges( LBFactory $lbFactory, RunnableJob $job, $fnameTrxOwner ) {
$syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
private function commitMasterChanges( RunnableJob $job, $fnameTrxOwner ) {
$syncThreshold = $this->options->get( 'JobSerialCommitThreshold' );
$time = false;
$lb = $lbFactory->getMainLB();
$lb = $this->lbFactory->getMainLB();
if ( $syncThreshold !== false && $lb->hasStreamingReplicaServers() ) {
// Generally, there is one master connection to the local DB
$dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
@ -608,10 +645,10 @@ class JobRunner implements LoggerAwareInterface {
}
if ( !$dbwSerial ) {
$lbFactory->commitMasterChanges(
$this->lbFactory->commitMasterChanges(
$fnameTrxOwner,
// Abort if any transaction was too big
[ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
[ 'maxWriteDuration' => $this->options->get( 'MaxJobDBWriteDuration' ) ]
);
return;
@ -644,10 +681,10 @@ class JobRunner implements LoggerAwareInterface {
}
// Actually commit the DB master changes
$lbFactory->commitMasterChanges(
$this->lbFactory->commitMasterChanges(
$fnameTrxOwner,
// Abort if any transaction was too big
[ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
[ 'maxWriteDuration' => $this->options->get( 'MaxJobDBWriteDuration' ) ]
);
ScopedCallback::consume( $unlocker );
}

View file

@ -433,4 +433,12 @@ interface ILBFactory {
* @since 1.35
*/
public function setDomainAliases( array $aliases );
/**
* Get a TransactionProfiler used by this instance.
*
* @return TransactionProfiler
* @since 1.35
*/
public function getTransactionProfiler(): TransactionProfiler;
}
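
The new accessor is what lets JobRunner set its transaction expectations through the injected ILBFactory rather than reaching for Profiler::instance(). A minimal sketch of the same pattern from calling code, assuming the factory comes from the service container and using illustrative limit values:

use MediaWiki\MediaWikiServices;

$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

// Narrow DB expectations for the current code path, as JobRunner::run() now
// does with the 'JobRunner' entry of $wgTrxProfilerLimits.
$lbFactory->getTransactionProfiler()->setExpectations(
	[ 'writes' => 0, 'readQueryTime' => 5 ], // illustrative limits only
	__METHOD__ // assumes this runs inside a class method
);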

View file

@ -678,6 +678,10 @@ abstract class LBFactory implements ILBFactory {
$this->domainAliases = $aliases;
}
public function getTransactionProfiler(): TransactionProfiler {
return $this->trxProfiler;
}
public function setLocalDomainPrefix( $prefix ) {
$this->localDomain = new DatabaseDomain(
$this->localDomain->getDatabase(),

View file

@ -21,7 +21,7 @@
* @ingroup SpecialPage
*/
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
/**
* Special page designed for running background tasks (internal use only)
@ -108,7 +108,7 @@ class SpecialRunJobs extends UnlistedSpecialPage {
}
protected function doRun( array $params ) {
$runner = new JobRunner( LoggerFactory::getInstance( 'runJobs' ) );
$runner = MediaWikiServices::getInstance()->getJobRunner();
return $runner->run( [
'type' => $params['type'],
'maxJobs' => $params['maxjobs'] ?: 1,

View file

@ -29,7 +29,7 @@ if ( !defined( 'MEDIAWIKI' ) ) {
require_once __DIR__ . '/Maintenance.php';
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;
/**
* Maintenance script that runs pending jobs.
@ -74,7 +74,7 @@ class RunJobs extends Maintenance {
$outputJSON = ( $this->getOption( 'result' ) === 'json' );
$wait = $this->hasOption( 'wait' );
$runner = new JobRunner( LoggerFactory::getInstance( 'runJobs' ) );
$runner = MediaWikiServices::getInstance()->getJobRunner();
if ( !$outputJSON ) {
$runner->setDebugHandler( [ $this, 'debugInternal' ] );
}

View file

@ -33,7 +33,7 @@ class JobRunnerTest extends MediaWikiIntegrationTestCase {
$this->assertTrue( $this->page->exists(), 'Sanity: The created page exists' );
$this->jobRunner = new JobRunner();
$this->jobRunner = MediaWikiServices::getInstance()->getJobRunner();
$jobParams = [
'namespace' => $this->page->getNamespace(),
'title' => $this->page->getDBkey(),
@ -70,10 +70,7 @@ class JobRunnerTest extends MediaWikiIntegrationTestCase {
}
public function testExecuteJob() {
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
$stats = new NullStatsdDataFactory();
$results = $this->jobRunner->executeJob( $this->deletePageJob, $lbFactory, $stats );
$results = $this->jobRunner->executeJob( $this->deletePageJob );
$this->assertIsInt( $results['timeMs'] );
$this->assertTrue( $results['status'] );
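
Since the runner is now an injectable service, a test (or other special-purpose caller) that needs different collaborators can build one with explicit dependencies and swap it into the container. The sketch below is an assumption-labelled illustration, not code from this change: it presumes a MediaWikiIntegrationTestCase context where setService() is available, and it silences stats and logging with the no-op implementations core already ships.

// Hypothetical test helper, written against the new constructor signature.
$services = \MediaWiki\MediaWikiServices::getInstance();
$runner = new \JobRunner(
	new \MediaWiki\Config\ServiceOptions(
		\JobRunner::CONSTRUCTOR_OPTIONS, $services->getMainConfig() ),
	$services->getDBLoadBalancerFactory(),
	\JobQueueGroup::singleton(),
	$services->getReadOnlyMode(),
	$services->getLinkCache(),
	new \NullStatsdDataFactory(), // no-op stats sink
	new \Psr\Log\NullLogger() // silent logger instead of the 'runJobs' channel
);
$this->setService( 'JobRunner', $runner );

// executeJob() now takes only the job; $someRunnableJob is a placeholder for
// any RunnableJob, e.g. the deletePageJob used in the test above.
$result = $runner->executeJob( $someRunnableJob );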