Moved some JobQueueAggregator logic out of JobQueueGroup
Change-Id: I28ba1a25db225d4cf5f503a6c0f4405f13118151
This commit is contained in:
parent
82ce368f80
commit
37042262e3
5 changed files with 17 additions and 12 deletions
|
|
@ -49,6 +49,8 @@ abstract class JobQueue {
|
|||
|
||||
/** @var BagOStuff */
|
||||
protected $dupCache;
|
||||
/** @var JobQueueAggregator */
|
||||
protected $aggr;
|
||||
|
||||
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
|
||||
|
||||
|
|
@ -76,6 +78,9 @@ abstract class JobQueue {
|
|||
throw new MWException( __CLASS__ . " does not support delayed jobs." );
|
||||
}
|
||||
$this->dupCache = wfGetCache( CACHE_ANYTHING );
|
||||
$this->aggr = isset( $params['aggregator'] )
|
||||
? $params['aggregator']
|
||||
: new JobQueueAggregatorNull( array() );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -298,7 +303,8 @@ abstract class JobQueue {
|
|||
* @throws JobQueueError
|
||||
*/
|
||||
final public function push( $jobs, $flags = 0 ) {
|
||||
$this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
|
||||
$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
|
||||
$this->batchPush( $jobs, $flags );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -327,6 +333,7 @@ abstract class JobQueue {
|
|||
}
|
||||
|
||||
$this->doBatchPush( $jobs, $flags );
|
||||
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -356,6 +363,10 @@ abstract class JobQueue {
|
|||
|
||||
$job = $this->doPop();
|
||||
|
||||
if ( !$job ) {
|
||||
$this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
|
||||
}
|
||||
|
||||
// Flag this job as an old duplicate based on its "root" job...
|
||||
try {
|
||||
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
|
||||
|
|
|
|||
|
|
@ -686,7 +686,9 @@ class JobQueueDB extends JobQueue {
|
|||
$affected = $dbw->affectedRows();
|
||||
$count += $affected;
|
||||
JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
|
||||
// The tasks recycled jobs or release delayed jobs into the queue
|
||||
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
|
||||
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -94,6 +94,7 @@ class JobQueueGroup {
|
|||
} else {
|
||||
$conf = $conf + $wgJobTypeConf['default'];
|
||||
}
|
||||
$conf['aggregator'] = JobQueueAggregator::singleton();
|
||||
|
||||
return JobQueue::factory( $conf );
|
||||
}
|
||||
|
|
@ -125,7 +126,6 @@ class JobQueueGroup {
|
|||
|
||||
foreach ( $jobsByType as $type => $jobs ) {
|
||||
$this->get( $type )->push( $jobs );
|
||||
JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
|
||||
}
|
||||
|
||||
if ( $this->cache->has( 'queues-ready', 'list' ) ) {
|
||||
|
|
@ -153,9 +153,6 @@ class JobQueueGroup {
|
|||
if ( is_string( $qtype ) ) { // specific job type
|
||||
if ( !in_array( $qtype, $blacklist ) ) {
|
||||
$job = $this->get( $qtype )->pop();
|
||||
if ( !$job ) {
|
||||
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
|
||||
}
|
||||
}
|
||||
} else { // any job in the "default" jobs types
|
||||
if ( $flags & self::USE_CACHE ) {
|
||||
|
|
@ -179,7 +176,6 @@ class JobQueueGroup {
|
|||
if ( $job ) { // found
|
||||
break;
|
||||
} else { // not found
|
||||
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
|
||||
$this->cache->clear( 'queues-ready' );
|
||||
}
|
||||
}
|
||||
|
|
@ -381,10 +377,6 @@ class JobQueueGroup {
|
|||
}
|
||||
}
|
||||
}
|
||||
// The tasks may have recycled jobs or release delayed jobs into the queue
|
||||
if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
|
||||
JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
|
||||
}
|
||||
}
|
||||
|
||||
if ( $count === 0 ) {
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ abstract class JobQueueAggregator {
|
|||
/**
|
||||
* @param array $params
|
||||
*/
|
||||
protected function __construct( array $params ) {
|
||||
public function __construct( array $params ) {
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
|
|||
* If a hostname is specified but no port, the standard port number
|
||||
* 6379 will be used. Required.
|
||||
*/
|
||||
protected function __construct( array $params ) {
|
||||
public function __construct( array $params ) {
|
||||
parent::__construct( $params );
|
||||
$this->servers = isset( $params['redisServers'] )
|
||||
? $params['redisServers']
|
||||
|
|
|
|||
Loading…
Reference in a new issue