Add a JobQueueGroupFactory service

This replaces JobQueueGroup::singleton and ::destroySingletons.
A JobQueueGroup service has been added to serve as convenience
getter.

Bug: T277648
Change-Id: Ic8709119773ab788a07531193dc65d418536eda2
This commit is contained in:
mainframe98 2021-03-17 13:37:06 +01:00 committed by Mainframe98
parent bb285a709d
commit ae55be3000
8 changed files with 277 additions and 76 deletions

View file

@ -232,6 +232,10 @@ because of Phabricator reports.
post-initialization configuration.
* FileBackendGroup::singleton() and ::destroySingletons(), deprecated since
1.35, now emit deprecation warnings.
* JobQueueGroup::singleton was deprecated - use
MediaWikiServices::getJobQueueGroup instead.
* JobQueueGroup::destroySingletons was deprecated. JobQueueGroups are now
automatically destroyed after tests.
* …
=== Other changes in 1.37 ===

View file

@ -160,6 +160,7 @@ class AutoLoader {
'MediaWiki\\Edit\\' => __DIR__ . '/edit/',
'MediaWiki\\EditPage\\' => __DIR__ . '/editpage/',
'MediaWiki\\FileBackend\\LockManager\\' => __DIR__ . '/filebackend/lockmanager/',
'MediaWiki\\JobQueue\\' => __DIR__ . '/jobqueue/',
'MediaWiki\\Json\\' => __DIR__ . '/json/',
'MediaWiki\\Http\\' => __DIR__ . '/http/',
'MediaWiki\\Installer\\' => __DIR__ . '/installer/',

View file

@ -18,6 +18,7 @@ use GenderCache;
use GlobalVarConfig;
use HtmlCacheUpdater;
use IBufferingStatsdDataFactory;
use JobQueueGroup;
use JobRunner;
use Language;
use LinkCache;
@ -45,6 +46,7 @@ use MediaWiki\HookContainer\HookContainer;
use MediaWiki\HookContainer\HookRunner;
use MediaWiki\Http\HttpRequestFactory;
use MediaWiki\Interwiki\InterwikiLookup;
use MediaWiki\JobQueue\JobQueueGroupFactory;
use MediaWiki\Json\JsonCodec;
use MediaWiki\Languages\LanguageConverterFactory;
use MediaWiki\Languages\LanguageFactory;
@ -908,6 +910,22 @@ class MediaWikiServices extends ServiceContainer {
return $this->getService( 'InterwikiLookup' );
}
/**
* @since 1.37
* @return JobQueueGroup
*/
public function getJobQueueGroup() : JobQueueGroup {
return $this->getService( 'JobQueueGroup' );
}
/**
* @since 1.37
* @return JobQueueGroupFactory
*/
public function getJobQueueGroupFactory() : JobQueueGroupFactory {
return $this->getService( 'JobQueueGroupFactory' );
}
/**
* @since 1.35
* @return JobRunner

View file

@ -73,6 +73,7 @@ use MediaWiki\HookContainer\HookRunner;
use MediaWiki\Http\HttpRequestFactory;
use MediaWiki\Interwiki\ClassicInterwikiLookup;
use MediaWiki\Interwiki\InterwikiLookup;
use MediaWiki\JobQueue\JobQueueGroupFactory;
use MediaWiki\Json\JsonCodec;
use MediaWiki\Languages\LanguageConverterFactory;
use MediaWiki\Languages\LanguageFactory;
@ -585,11 +586,25 @@ return [
);
},
'JobQueueGroup' => static function ( MediaWikiServices $services ) : JobQueueGroup {
return $services->getJobQueueGroupFactory()->makeJobQueueGroup();
},
'JobQueueGroupFactory' => static function ( MediaWikiServices $services ) : JobQueueGroupFactory {
return new JobQueueGroupFactory(
new ServiceOptions( JobQueueGroupFactory::CONSTRUCTOR_OPTIONS, $services->getMainConfig() ),
$services->getConfiguredReadOnlyMode(),
$services->getStatsdDataFactory(),
$services->getMainWANObjectCache(),
$services->getGlobalIdGenerator()
);
},
'JobRunner' => static function ( MediaWikiServices $services ) : JobRunner {
return new JobRunner(
new ServiceOptions( JobRunner::CONSTRUCTOR_OPTIONS, $services->getMainConfig() ),
$services->getDBLoadBalancerFactory(),
JobQueueGroup::singleton(),
$services->getJobQueueGroup(),
$services->getReadOnlyMode(),
$services->getLinkCache(),
$services->getStatsdDataFactory(),
@ -1222,8 +1237,7 @@ return [
return new RevertedTagUpdateManager(
$editResultCache,
// TODO: should be replaced with proper service injection
JobQueueGroup::singleton()
$services->getJobQueueGroup()
);
},
@ -1506,12 +1520,10 @@ return [
},
'UserEditTracker' => static function ( MediaWikiServices $services ) : UserEditTracker {
$jobQueueGroup = JobQueueGroup::singleton();
return new UserEditTracker(
$services->getActorMigration(),
$services->getDBLoadBalancer(),
$jobQueueGroup
$services->getJobQueueGroup()
);
},
@ -1622,7 +1634,7 @@ return [
new ServiceOptions( WatchedItemStore::CONSTRUCTOR_OPTIONS,
$services->getMainConfig() ),
$services->getDBLoadBalancerFactory(),
JobQueueGroup::singleton(),
$services->getJobQueueGroup(),
$services->getMainObjectStash(),
new HashBagOStuff( [ 'maxKeys' => 100 ] ),
$services->getReadOnlyMode(),

View file

@ -19,7 +19,9 @@
*
* @file
*/
use MediaWiki\MediaWikiServices;
use Wikimedia\UUID\GlobalIdGenerator;
/**
* Class to handle enqueueing of background jobs
@ -28,7 +30,10 @@ use MediaWiki\MediaWikiServices;
* @since 1.21
*/
class JobQueueGroup {
/** @var JobQueueGroup[] */
/**
* @var JobQueueGroup[]
* @deprecated 1.37
*/
protected static $instances = [];
/** @var MapCacheLRU */
@ -36,10 +41,22 @@ class JobQueueGroup {
/** @var string Wiki domain ID */
protected $domain;
/** @var string|bool Read only rationale (or false if r/w) */
protected $readOnlyReason;
/** @var ConfiguredReadOnlyMode Read only mode */
protected $readOnlyMode;
/** @var bool Whether the wiki is not recognized in configuration */
protected $invalidDomain = false;
/** @var array */
private $jobClasses;
/** @var array */
private $jobTypeConfiguration;
/** @var array */
private $jobTypesExcludedFromDefaultQueue;
/** @var IBufferingStatsdDataFactory */
private $statsdDataFactory;
/** @var WANObjectCache */
private $wanCache;
/** @var GlobalIdGenerator */
private $globalIdGenerator;
/** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
protected $coalescedQueues;
@ -54,52 +71,57 @@ class JobQueueGroup {
private const CACHE_VERSION = 1; // integer; cache version
/**
* @internal Use MediaWikiServices::getJobQueueGroupFactory
*
* @param string $domain Wiki domain ID
* @param string|bool $readOnlyReason Read-only reason or false
* @param ConfiguredReadOnlyMode $readOnlyMode Read-only mode
* @param bool $invalidDomain Whether the wiki is not recognized in configuration
* @param array $jobClasses
* @param array $jobTypeConfiguration
* @param array $jobTypesExcludedFromDefaultQueue
* @param IBufferingStatsdDataFactory $statsdDataFactory
* @param WANObjectCache $wanCache
* @param GlobalIdGenerator $globalIdGenerator
*/
protected function __construct( $domain, $readOnlyReason ) {
public function __construct(
$domain,
ConfiguredReadOnlyMode $readOnlyMode,
bool $invalidDomain,
array $jobClasses,
array $jobTypeConfiguration,
array $jobTypesExcludedFromDefaultQueue,
IBufferingStatsdDataFactory $statsdDataFactory,
WANObjectCache $wanCache,
GlobalIdGenerator $globalIdGenerator
) {
$this->domain = $domain;
$this->readOnlyReason = $readOnlyReason;
$this->readOnlyMode = $readOnlyMode;
$this->cache = new MapCacheLRU( 10 );
$this->invalidDomain = $invalidDomain;
$this->jobClasses = $jobClasses;
$this->jobTypeConfiguration = $jobTypeConfiguration;
$this->jobTypesExcludedFromDefaultQueue = $jobTypesExcludedFromDefaultQueue;
$this->statsdDataFactory = $statsdDataFactory;
$this->wanCache = $wanCache;
$this->globalIdGenerator = $globalIdGenerator;
}
/**
* @deprecated 1.37 Use JobQueueGroupFactory::makeJobQueueGroupFactory
* @param bool|string $domain Wiki domain ID
* @return JobQueueGroup
*/
public static function singleton( $domain = false ) {
global $wgLocalDatabases;
if ( $domain === false ) {
$domain = WikiMap::getCurrentWikiDbDomain()->getId();
}
if ( !isset( self::$instances[$domain] ) ) {
$reason = MediaWikiServices::getInstance()
->getConfiguredReadOnlyMode()
->getReason();
self::$instances[$domain] = new self( $domain, $reason );
// Make sure jobs are not getting pushed to bogus wikis. This can confuse
// the job runner system into spawning endless RPC requests that fail (T171371).
$wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
if (
!WikiMap::isCurrentWikiDbDomain( $domain ) &&
!in_array( $wikiId, $wgLocalDatabases )
) {
self::$instances[$domain]->invalidDomain = true;
}
}
return self::$instances[$domain];
return MediaWikiServices::getInstance()->getJobQueueGroupFactory()->makeJobQueueGroup( $domain );
}
/**
* Destroy the singleton instances
*
* @deprecated 1.37
* @return void
*/
public static function destroySingletons() {
self::$instances = [];
}
/**
@ -109,31 +131,19 @@ class JobQueueGroup {
* @return JobQueue
*/
public function get( $type ) {
global $wgJobTypeConf;
$conf = [ 'domain' => $this->domain, 'type' => $type ];
if ( isset( $wgJobTypeConf[$type] ) ) {
$conf += $wgJobTypeConf[$type];
if ( isset( $this->jobTypeConfiguration[$type] ) ) {
$conf += $this->jobTypeConfiguration[$type];
} else {
$conf += $wgJobTypeConf['default'];
$conf += $this->jobTypeConfiguration['default'];
}
if ( !isset( $conf['readOnlyReason'] ) ) {
$conf['readOnlyReason'] = $this->readOnlyReason;
$conf['readOnlyReason'] = $this->readOnlyMode->getReason();
}
return $this->factoryJobQueue( $conf );
}
/**
* @param array $conf
* @return JobQueue
* @throws JobQueueError
*/
private function factoryJobQueue( array $conf ) {
$services = MediaWikiServices::getInstance();
$conf['stats'] = $services->getStatsdDataFactory();
$conf['wanCache'] = $services->getMainWANObjectCache();
$conf['idGenerator'] = $services->getGlobalIdGenerator();
$conf['stats'] = $this->statsdDataFactory;
$conf['wanCache'] = $this->wanCache;
$conf['idGenerator'] = $this->globalIdGenerator;
return JobQueue::factory( $conf );
}
@ -149,8 +159,6 @@ class JobQueueGroup {
* @return void
*/
public function push( $jobs ) {
global $wgJobTypesExcludedFromDefaultQueue;
if ( $this->invalidDomain ) {
// Do not enqueue job that cannot be run (T171371)
$e = new LogicException( "Domain '{$this->domain}' is not recognized." );
@ -187,7 +195,7 @@ class JobQueueGroup {
'true',
15
);
if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
if ( array_diff( array_keys( $jobsByType ), $this->jobTypesExcludedFromDefaultQueue ) ) {
$cache->set(
$cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
'true',
@ -234,14 +242,12 @@ class JobQueueGroup {
* @return RunnableJob|bool Returns false on failure
*/
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $ignored = [] ) {
global $wgJobClasses;
$job = false;
if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
throw new JobQueueError(
"Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
} elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) {
} elseif ( is_string( $qtype ) && !isset( $this->jobClasses[$qtype] ) ) {
// Do not pop jobs if there is no class for the queue type
throw new JobQueueError( "Unrecognized job type '$qtype'." );
}
@ -309,10 +315,8 @@ class JobQueueGroup {
* @return void
*/
public function waitForBackups() {
global $wgJobTypeConf;
// Try to avoid doing this more than once per queue storage medium
foreach ( $wgJobTypeConf as $type => $conf ) {
foreach ( $this->jobTypeConfiguration as $type => $conf ) {
$this->get( $type )->waitForBackups();
}
}
@ -332,9 +336,7 @@ class JobQueueGroup {
* @return string[]
*/
public function getDefaultQueueTypes() {
global $wgJobTypesExcludedFromDefaultQueue;
return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
return array_diff( $this->getQueueTypes(), $this->jobTypesExcludedFromDefaultQueue );
}
/**
@ -414,13 +416,16 @@ class JobQueueGroup {
* @phan-return array<string,array{queue:JobQueue,types:array<string,class-string>}>
*/
protected function getCoalescedQueues() {
global $wgJobTypeConf;
if ( $this->coalescedQueues === null ) {
$this->coalescedQueues = [];
foreach ( $wgJobTypeConf as $type => $conf ) {
$queue = $this->factoryJobQueue(
[ 'domain' => $this->domain, 'type' => 'null' ] + $conf );
foreach ( $this->jobTypeConfiguration as $type => $conf ) {
$conf['domain'] = $this->domain;
$conf['type'] = 'null';
$conf['stats'] = $this->statsdDataFactory;
$conf['wanCache'] = $this->wanCache;
$conf['idGenerator'] = $this->globalIdGenerator;
$queue = JobQueue::factory( $conf );
$loc = $queue->getCoalesceLocationInternal();
if ( !isset( $this->coalescedQueues[$loc] ) ) {
$this->coalescedQueues[$loc]['queue'] = $queue;
@ -429,7 +434,7 @@ class JobQueueGroup {
if ( $type === 'default' ) {
$this->coalescedQueues[$loc]['types'] = array_merge(
$this->coalescedQueues[$loc]['types'],
array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
array_diff( $this->getQueueTypes(), array_keys( $this->jobTypeConfiguration ) )
);
} else {
$this->coalescedQueues[$loc]['types'][] = $type;

View file

@ -0,0 +1,128 @@
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace MediaWiki\JobQueue;
use ConfiguredReadOnlyMode;
use IBufferingStatsdDataFactory;
use JobQueueGroup;
use MediaWiki\Config\ServiceOptions;
use WANObjectCache;
use WikiMap;
use Wikimedia\UUID\GlobalIdGenerator;
/**
* Class to construct JobQueueGroups
*
* @ingroup JobQueue
* @since 1.37
*/
class JobQueueGroupFactory {
/**
* @internal For use by ServiceWiring
*/
public const CONSTRUCTOR_OPTIONS = [
'JobClasses',
'JobTypeConf',
'JobTypesExcludedFromDefaultQueue',
'LocalDatabases',
];
/** @var JobQueueGroup[] */
private $instances;
/** @var ServiceOptions */
private $options;
/** @var ConfiguredReadOnlyMode */
private $readOnlyMode;
/** @var IBufferingStatsdDataFactory */
private $statsdDataFactory;
/** @var WANObjectCache */
private $wanCache;
/** @var GlobalIdGenerator */
private $globalIdGenerator;
/**
* @param ServiceOptions $options
* @param ConfiguredReadOnlyMode $readOnlyMode
* @param IBufferingStatsdDataFactory $statsdDataFactory
* @param WANObjectCache $wanCache
* @param GlobalIdGenerator $globalIdGenerator
*/
public function __construct(
ServiceOptions $options,
ConfiguredReadOnlyMode $readOnlyMode,
IBufferingStatsdDataFactory $statsdDataFactory,
WANObjectCache $wanCache,
GlobalIdGenerator $globalIdGenerator
) {
$options->assertRequiredOptions( self::CONSTRUCTOR_OPTIONS );
$this->instances = [];
$this->options = $options;
$this->readOnlyMode = $readOnlyMode;
$this->statsdDataFactory = $statsdDataFactory;
$this->wanCache = $wanCache;
$this->globalIdGenerator = $globalIdGenerator;
}
/**
* @since 1.37
*
* @param false|string $domain Wiki domain ID. False uses the current wiki domain ID
* @return JobQueueGroup
*/
public function makeJobQueueGroup( $domain = false ) : JobQueueGroup {
if ( $domain === false ) {
$domain = WikiMap::getCurrentWikiDbDomain()->getId();
}
if ( !isset( $this->instances[$domain] ) ) {
// Make sure jobs are not getting pushed to bogus wikis. This can confuse
// the job runner system into spawning endless RPC requests that fail (T171371).
$wikiId = WikiMap::getWikiIdFromDbDomain( $domain );
if (
!WikiMap::isCurrentWikiDbDomain( $domain ) &&
!in_array( $wikiId, $this->options->get( 'LocalDatabases' ) )
) {
$invalidDomain = true;
} else {
$invalidDomain = false;
}
$this->instances[$domain] = new JobQueueGroup(
$domain,
$this->readOnlyMode,
$invalidDomain,
$this->options->get( 'JobClasses' ),
$this->options->get( 'JobTypeConf' ),
$this->options->get( 'JobTypesExcludedFromDefaultQueue' ),
$this->statsdDataFactory,
$this->wanCache,
$this->globalIdGenerator
);
}
return $this->instances[$domain];
}
}

View file

@ -385,10 +385,11 @@ abstract class MediaWikiIntegrationTestCase extends PHPUnit\Framework\TestCase {
public static function resetNonServiceCaches() {
global $wgRequest, $wgJobClasses;
$jobQueueFactory = MediaWikiServices::getInstance()->getJobQueueGroupFactory();
foreach ( $wgJobClasses as $type => $class ) {
JobQueueGroup::singleton()->get( $type )->delete();
$jobQueueFactory->makeJobQueueGroup()->get( $type )->delete();
}
JobQueueGroup::destroySingletons();
ObjectCache::clear();
DeferredUpdates::clearPendingUpdates();

View file

@ -0,0 +1,32 @@
<?php
use MediaWiki\Config\ServiceOptions;
use MediaWiki\JobQueue\JobQueueGroupFactory;
use Wikimedia\UUID\GlobalIdGenerator;
/**
* @covers \MediaWiki\JobQueue\JobQueueGroupFactory
*/
class JobQueueGroupFactoryTest extends MediaWikiUnitTestCase {
public function testMakeJobQueueGroupReturnsSameInstance() : void {
$factory = new JobQueueGroupFactory(
new ServiceOptions(
JobQueueGroupFactory::CONSTRUCTOR_OPTIONS,
new HashConfig( [
'LocalDatabases' => [],
'JobClasses' => [],
'JobTypeConf' => [],
'JobTypesExcludedFromDefaultQueue' => []
] )
),
$this->createMock( ConfiguredReadOnlyMode::class ),
$this->createMock( IBufferingStatsdDataFactory::class ),
$this->createMock( WANObjectCache::class ),
$this->createMock( GlobalIdGenerator::class )
);
$group = $factory->makeJobQueueGroup();
$this->assertSame( $group, $factory->makeJobQueueGroup() );
}
}