Moved some JobQueueAggregator logic out of JobQueueGroup

Change-Id: I28ba1a25db225d4cf5f503a6c0f4405f13118151
This commit is contained in:
Aaron Schulz 2015-02-16 15:34:53 -08:00
parent 82ce368f80
commit 37042262e3
5 changed files with 17 additions and 12 deletions

View file

@ -49,6 +49,8 @@ abstract class JobQueue {
/** @var BagOStuff */
protected $dupCache;
/** @var JobQueueAggregator */
protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
@ -76,6 +78,9 @@ abstract class JobQueue {
throw new MWException( __CLASS__ . " does not support delayed jobs." );
}
$this->dupCache = wfGetCache( CACHE_ANYTHING );
$this->aggr = isset( $params['aggregator'] )
? $params['aggregator']
: new JobQueueAggregatorNull( array() );
}
/**
@ -298,7 +303,8 @@ abstract class JobQueue {
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
$this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
$this->batchPush( $jobs, $flags );
}
/**
@ -327,6 +333,7 @@ abstract class JobQueue {
}
$this->doBatchPush( $jobs, $flags );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
/**
@ -356,6 +363,10 @@ abstract class JobQueue {
$job = $this->doPop();
if ( !$job ) {
$this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
}
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {

View file

@ -686,7 +686,9 @@ class JobQueueDB extends JobQueue {
$affected = $dbw->affectedRows();
$count += $affected;
JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
// The tasks recycled jobs or release delayed jobs into the queue
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}

View file

@ -94,6 +94,7 @@ class JobQueueGroup {
} else {
$conf = $conf + $wgJobTypeConf['default'];
}
$conf['aggregator'] = JobQueueAggregator::singleton();
return JobQueue::factory( $conf );
}
@ -125,7 +126,6 @@ class JobQueueGroup {
foreach ( $jobsByType as $type => $jobs ) {
$this->get( $type )->push( $jobs );
JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
}
if ( $this->cache->has( 'queues-ready', 'list' ) ) {
@ -153,9 +153,6 @@ class JobQueueGroup {
if ( is_string( $qtype ) ) { // specific job type
if ( !in_array( $qtype, $blacklist ) ) {
$job = $this->get( $qtype )->pop();
if ( !$job ) {
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
}
}
} else { // any job in the "default" jobs types
if ( $flags & self::USE_CACHE ) {
@ -179,7 +176,6 @@ class JobQueueGroup {
if ( $job ) { // found
break;
} else { // not found
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
$this->cache->clear( 'queues-ready' );
}
}
@ -381,10 +377,6 @@ class JobQueueGroup {
}
}
}
// The tasks may have recycled jobs or release delayed jobs into the queue
if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
}
}
if ( $count === 0 ) {

View file

@ -34,7 +34,7 @@ abstract class JobQueueAggregator {
/**
* @param array $params
*/
protected function __construct( array $params ) {
public function __construct( array $params ) {
}
/**

View file

@ -44,7 +44,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
* If a hostname is specified but no port, the standard port number
* 6379 will be used. Required.
*/
protected function __construct( array $params ) {
public function __construct( array $params ) {
parent::__construct( $params );
$this->servers = isset( $params['redisServers'] )
? $params['redisServers']