[JobQueue] Cleanups for JobQueueRedis.

* Cleaned up 'server' option to not fragment the pool.
  Also made it actually match the documentation.
* Made it use doGetPeriodicTasks() for job recycling.
* Made it so that other job queue classes can be tested.
* Renamed "redisConf" => "redisConfig".
* Tweaked comments about the "random" order option.

Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480
This commit is contained in:
Aaron Schulz 2013-02-08 10:52:54 -08:00 committed by Gerrit Code Review
parent 04e0d75f86
commit 9cc7e6a39a
5 changed files with 91 additions and 51 deletions

View file

@ -6213,7 +6213,7 @@ $wgMaxShellTime = 180;
$wgMaxShellWallClockTime = 180;
/**
* Under Linux: a cgroup directory used to constrain memory usage of shell
* Under Linux: a cgroup directory used to constrain memory usage of shell
* commands. The directory must be writable by the user which runs MediaWiki.
*
* If specified, this is used instead of ulimit, which is inaccurate, and
@ -6221,7 +6221,7 @@ $wgMaxShellWallClockTime = 180;
* them segfault or deadlock.
*
* A wrapper script will create a cgroup for each shell command that runs, as
* a subgroup of the specified cgroup. If the memory limit is exceeded, the
* a subgroup of the specified cgroup. If the memory limit is exceeded, the
* kernel will send a SIGKILL signal to a process in the subgroup.
*
* @par Example:
@ -6231,7 +6231,7 @@ $wgMaxShellWallClockTime = 180;
* echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php
* @endcode
*
* The reliability of cgroup cleanup can be improved by installing a
* The reliability of cgroup cleanup can be improved by installing a
* notify_on_release script in the root cgroup, see e.g.
* https://gerrit.wikimedia.org/r/#/c/40784
*/

View file

@ -62,6 +62,8 @@ abstract class JobQueue {
* by timestamp, allowing for some jobs to be popped off out of order.
* If "random" is used, pop() will pick jobs in random order. This might be
* useful for improving concurrency depending on the queue storage medium.
* Note that "random" really means "don't care", so it may actually be FIFO
* or only weakly random (e.g. pop() takes one of the first X jobs randomly).
* - claimTTL : If supported, the queue will recycle jobs that have been popped
* but not acknowledged as completed after this many seconds. Recycling
of jobs simply means re-inserting them into the queue. Jobs can be
@ -371,4 +373,15 @@ abstract class JobQueue {
* @return void
*/
protected function doFlushCaches() {}
/**
 * Namespace the queue with a key to isolate it for testing.
 *
 * This base implementation does not support namespacing; queue classes
 * that can isolate their storage (e.g. by prefixing their keys) should
 * override this method.
 *
 * @param $key string Namespace key to apply to the queue's storage keys
 * @return void
 * @throws MWException If this queue class does not support namespacing
 */
public function setTestingPrefix( $key ) {
	// Fixed grammar in the error message ("not support" -> "not supported")
	throw new MWException( "Queue namespacing not supported for this queue type." );
}
}

View file

@ -36,18 +36,20 @@ class JobQueueRedis extends JobQueue {
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
protected $key; // string; key to prefix the queue keys with (used for testing)
/**
* @params include:
* - redisConf : An array of parameters to RedisConnectionPool::__construct().
* - server : A hostname/port combination or the absolute path of a UNIX socket.
* If a hostname is specified but no port, the standard port number
* 6379 will be used. Required.
* - redisConfig : An array of parameters to RedisConnectionPool::__construct().
* - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
* If a hostname is specified but no port, the standard port number
* 6379 will be used. Required.
* @param array $params
*/
public function __construct( array $params ) {
parent::__construct( $params );
$this->server = $params['redisConf']['server'];
$this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
$this->server = $params['redisServer'];
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
}
/**
@ -56,10 +58,6 @@ class JobQueueRedis extends JobQueue {
* @throws MWException
*/
protected function doIsEmpty() {
if ( mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance();
}
$conn = $this->getConnection();
try {
return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
@ -74,10 +72,6 @@ class JobQueueRedis extends JobQueue {
* @throws MWException
*/
protected function doGetSize() {
if ( mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance();
}
$conn = $this->getConnection();
try {
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
@ -92,17 +86,12 @@ class JobQueueRedis extends JobQueue {
* @throws MWException
*/
protected function doGetAcquiredCount() {
if ( mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance();
if ( $this->claimTTL <= 0 ) {
return 0; // no acknowledgements
}
$conn = $this->getConnection();
try {
if ( $this->claimTTL > 0 ) {
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
} else {
return 0;
}
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
@ -190,8 +179,8 @@ class JobQueueRedis extends JobQueue {
protected function doPop() {
$job = false;
if ( mt_rand( 0, 99 ) == 0 ) {
$this->doInternalMaintenance();
if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
$this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
}
$conn = $this->getConnection();
@ -340,25 +329,16 @@ class JobQueueRedis extends JobQueue {
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
}
/**
* Do any job recycling or queue cleanup as needed
*
* @return void
* @return integer Number of jobs recycled/deleted
* @throws MWException
*/
protected function doInternalMaintenance() {
return ( $this->claimTTL > 0 ) ?
$this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
}
/**
* Recycle or destroy any jobs that have been claimed for too long
*
* @return integer Number of jobs recycled/deleted
* @throws MWException
*/
protected function recycleAndDeleteStaleJobs() {
public function recycleAndDeleteStaleJobs() {
if ( $this->claimTTL <= 0 ) { // sanity
throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
}
$count = 0;
// For each job item that can be retried, we need to add it back to the
// main queue and remove it from the list of currently claimed job items.
@ -488,6 +468,22 @@ class JobQueueRedis extends JobQueue {
return $count;
}
/**
 * Get the periodic maintenance tasks for this queue.
 *
 * When job acknowledgement is enabled (claimTTL > 0), this registers a
 * "recycleAndDeleteStaleJobs" task that invokes recycleAndDeleteStaleJobs()
 * every ceil(claimTTL / 2) seconds, so claimed-but-unacknowledged jobs are
 * recycled back into the queue. With no claim TTL there is nothing to
 * recycle periodically, so no tasks are returned.
 *
 * @return Array Map of (task name => array( 'callback' => callable, 'period' => seconds ))
 */
protected function doGetPeriodicTasks() {
if ( $this->claimTTL > 0 ) {
return array(
'recycleAndDeleteStaleJobs' => array(
'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
'period' => ceil( $this->claimTTL / 2 )
)
);
} else {
return array();
}
}
/**
* @param $job Job
* @return array
@ -560,7 +556,11 @@ class JobQueueRedis extends JobQueue {
*/
private function getQueueKey( $prop ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
if ( strlen( $this->key ) ) { // namespaced queue (for testing)
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
} else {
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
}
}
/**
@ -606,4 +606,12 @@ class JobQueueRedis extends JobQueue {
}
return $res;
}
/**
 * Namespace this queue's Redis keys with an extra key component so that
 * tests do not touch live queue data; getQueueKey() inserts this key into
 * every generated Redis key when it is set.
 *
 * @param $key string Namespace key (e.g. a random per-test string)
 * @return void
 */
public function setTestingPrefix( $key ) {
$this->key = $key;
}
}

View file

@ -7,6 +7,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command {
'file=' => false,
'use-filebackend=' => false,
'use-bagostuff=' => false,
'use-jobqueue=' => false,
'keep-uploads' => false,
'use-normal-tables' => false,
'reuse-db' => false,

View file

@ -6,6 +6,8 @@
* @group Database
*/
class JobQueueTest extends MediaWikiTestCase {
protected $key;
protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
protected $old = array();
function __construct( $name = null, array $data = array(), $dataName = '' ) {
@ -15,18 +17,34 @@ class JobQueueTest extends MediaWikiTestCase {
}
protected function setUp() {
global $wgMemc;
global $wgMemc, $wgJobTypeConf;
parent::setUp();
$this->old['wgMemc'] = $wgMemc;
$wgMemc = new HashBagOStuff();
$this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB',
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) );
$this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) );
$this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB',
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) );
$this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) );
if ( $this->getCliArg( 'use-jobqueue=' ) ) {
$name = $this->getCliArg( 'use-jobqueue=' );
if ( !isset( $wgJobTypeConf[$name] ) ) {
throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
}
$baseConfig = $wgJobTypeConf[$name];
} else {
$baseConfig = array( 'class' => 'JobQueueDB' );
}
$baseConfig['type'] = 'null';
$baseConfig['wiki'] = wfWikiID();
$this->queueRand = JobQueue::factory(
array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
$this->queueRandTTL = JobQueue::factory(
array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
$this->queueFifo = JobQueue::factory(
array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
$this->queueFifoTTL = JobQueue::factory(
array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
$this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
}
}
}
protected function tearDown() {
@ -239,7 +257,7 @@ class JobQueueTest extends MediaWikiTestCase {
$queue->ack( $job );
}
$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
$this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
$queue->flushCaches();
$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );