[JobQueue] Cleanups for JobQueueRedis.
* Cleaned up 'server' option to not fragment the pool. Also made it actually match the documentation. * Made it use doGetPeriodicTasks() for job recycling. * Made it so that other job queue classes can be tested. * Renamed "redisConf" => "redisConfig". * Tweaked comments about the "random" order option. Change-Id: I7823d90010e6bc9d581435c3be92830c5ba68480
This commit is contained in:
parent
04e0d75f86
commit
9cc7e6a39a
5 changed files with 91 additions and 51 deletions
|
|
@ -6213,7 +6213,7 @@ $wgMaxShellTime = 180;
|
|||
$wgMaxShellWallClockTime = 180;
|
||||
|
||||
/**
|
||||
* Under Linux: a cgroup directory used to constrain memory usage of shell
|
||||
* Under Linux: a cgroup directory used to constrain memory usage of shell
|
||||
* commands. The directory must be writable by the user which runs MediaWiki.
|
||||
*
|
||||
* If specified, this is used instead of ulimit, which is inaccurate, and
|
||||
|
|
@ -6221,7 +6221,7 @@ $wgMaxShellWallClockTime = 180;
|
|||
* them segfault or deadlock.
|
||||
*
|
||||
* A wrapper script will create a cgroup for each shell command that runs, as
|
||||
* a subgroup of the specified cgroup. If the memory limit is exceeded, the
|
||||
* a subgroup of the specified cgroup. If the memory limit is exceeded, the
|
||||
* kernel will send a SIGKILL signal to a process in the subgroup.
|
||||
*
|
||||
* @par Example:
|
||||
|
|
@ -6231,7 +6231,7 @@ $wgMaxShellWallClockTime = 180;
|
|||
* echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php
|
||||
* @endcode
|
||||
*
|
||||
* The reliability of cgroup cleanup can be improved by installing a
|
||||
* The reliability of cgroup cleanup can be improved by installing a
|
||||
* notify_on_release script in the root cgroup, see e.g.
|
||||
* https://gerrit.wikimedia.org/r/#/c/40784
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -62,6 +62,8 @@ abstract class JobQueue {
|
|||
* by timestamp, allowing for some jobs to be popped off out of order.
|
||||
* If "random" is used, pop() will pick jobs in random order. This might be
|
||||
* useful for improving concurrency depending on the queue storage medium.
|
||||
* Note that "random" really means "don't care", so it may actually be FIFO
|
||||
* or only weakly random (e.g. pop() takes one of the first X jobs randomly).
|
||||
* - claimTTL : If supported, the queue will recycle jobs that have been popped
|
||||
* but not acknowledged as completed after this many seconds. Recycling
|
||||
* of jobs simple means re-inserting them into the queue. Jobs can be
|
||||
|
|
@ -371,4 +373,15 @@ abstract class JobQueue {
|
|||
* @return void
|
||||
*/
|
||||
protected function doFlushCaches() {}
|
||||
|
||||
/**
|
||||
* Namespace the queue with a key to isolate it for testing
|
||||
*
|
||||
* @param $key string
|
||||
* @return void
|
||||
* @throws MWException
|
||||
*/
|
||||
public function setTestingPrefix( $key ) {
|
||||
throw new MWException( "Queue namespacing not support for this queue type." );
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,18 +36,20 @@ class JobQueueRedis extends JobQueue {
|
|||
const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
|
||||
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
|
||||
|
||||
protected $key; // string; key to prefix the queue keys with (used for testing)
|
||||
|
||||
/**
|
||||
* @params include:
|
||||
* - redisConf : An array of parameters to RedisConnectionPool::__construct().
|
||||
* - server : A hostname/port combination or the absolute path of a UNIX socket.
|
||||
* If a hostname is specified but no port, the standard port number
|
||||
* 6379 will be used. Required.
|
||||
* - redisConfig : An array of parameters to RedisConnectionPool::__construct().
|
||||
* - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
|
||||
* If a hostname is specified but no port, the standard port number
|
||||
* 6379 will be used. Required.
|
||||
* @param array $params
|
||||
*/
|
||||
public function __construct( array $params ) {
|
||||
parent::__construct( $params );
|
||||
$this->server = $params['redisConf']['server'];
|
||||
$this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
|
||||
$this->server = $params['redisServer'];
|
||||
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -56,10 +58,6 @@ class JobQueueRedis extends JobQueue {
|
|||
* @throws MWException
|
||||
*/
|
||||
protected function doIsEmpty() {
|
||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
||||
$this->doInternalMaintenance();
|
||||
}
|
||||
|
||||
$conn = $this->getConnection();
|
||||
try {
|
||||
return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
|
||||
|
|
@ -74,10 +72,6 @@ class JobQueueRedis extends JobQueue {
|
|||
* @throws MWException
|
||||
*/
|
||||
protected function doGetSize() {
|
||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
||||
$this->doInternalMaintenance();
|
||||
}
|
||||
|
||||
$conn = $this->getConnection();
|
||||
try {
|
||||
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
|
||||
|
|
@ -92,17 +86,12 @@ class JobQueueRedis extends JobQueue {
|
|||
* @throws MWException
|
||||
*/
|
||||
protected function doGetAcquiredCount() {
|
||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
||||
$this->doInternalMaintenance();
|
||||
if ( $this->claimTTL <= 0 ) {
|
||||
return 0; // no acknowledgements
|
||||
}
|
||||
|
||||
$conn = $this->getConnection();
|
||||
try {
|
||||
if ( $this->claimTTL > 0 ) {
|
||||
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
|
||||
} catch ( RedisException $e ) {
|
||||
$this->throwRedisException( $this->server, $conn, $e );
|
||||
}
|
||||
|
|
@ -190,8 +179,8 @@ class JobQueueRedis extends JobQueue {
|
|||
protected function doPop() {
|
||||
$job = false;
|
||||
|
||||
if ( mt_rand( 0, 99 ) == 0 ) {
|
||||
$this->doInternalMaintenance();
|
||||
if ( $this->claimTTL <= 0 && mt_rand( 0, 99 ) == 0 ) {
|
||||
$this->cleanupClaimedJobs(); // prune jobs and IDs from the "garbage" list
|
||||
}
|
||||
|
||||
$conn = $this->getConnection();
|
||||
|
|
@ -340,25 +329,16 @@ class JobQueueRedis extends JobQueue {
|
|||
return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
|
||||
}
|
||||
|
||||
/**
|
||||
* Do any job recycling or queue cleanup as needed
|
||||
*
|
||||
* @return void
|
||||
* @return integer Number of jobs recycled/deleted
|
||||
* @throws MWException
|
||||
*/
|
||||
protected function doInternalMaintenance() {
|
||||
return ( $this->claimTTL > 0 ) ?
|
||||
$this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Recycle or destroy any jobs that have been claimed for too long
|
||||
*
|
||||
* @return integer Number of jobs recycled/deleted
|
||||
* @throws MWException
|
||||
*/
|
||||
protected function recycleAndDeleteStaleJobs() {
|
||||
public function recycleAndDeleteStaleJobs() {
|
||||
if ( $this->claimTTL <= 0 ) { // sanity
|
||||
throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
|
||||
}
|
||||
$count = 0;
|
||||
// For each job item that can be retried, we need to add it back to the
|
||||
// main queue and remove it from the list of currenty claimed job items.
|
||||
|
|
@ -488,6 +468,22 @@ class JobQueueRedis extends JobQueue {
|
|||
return $count;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Array
|
||||
*/
|
||||
protected function doGetPeriodicTasks() {
|
||||
if ( $this->claimTTL > 0 ) {
|
||||
return array(
|
||||
'recycleAndDeleteStaleJobs' => array(
|
||||
'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
|
||||
'period' => ceil( $this->claimTTL / 2 )
|
||||
)
|
||||
);
|
||||
} else {
|
||||
return array();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $job Job
|
||||
* @return array
|
||||
|
|
@ -560,7 +556,11 @@ class JobQueueRedis extends JobQueue {
|
|||
*/
|
||||
private function getQueueKey( $prop ) {
|
||||
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
|
||||
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
|
||||
if ( strlen( $this->key ) ) { // namespaced queue (for testing)
|
||||
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
|
||||
} else {
|
||||
return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -606,4 +606,12 @@ class JobQueueRedis extends JobQueue {
|
|||
}
|
||||
return $res;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $key string
|
||||
* @return void
|
||||
*/
|
||||
public function setTestingPrefix( $key ) {
|
||||
$this->key = $key;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ class MediaWikiPHPUnitCommand extends PHPUnit_TextUI_Command {
|
|||
'file=' => false,
|
||||
'use-filebackend=' => false,
|
||||
'use-bagostuff=' => false,
|
||||
'use-jobqueue=' => false,
|
||||
'keep-uploads' => false,
|
||||
'use-normal-tables' => false,
|
||||
'reuse-db' => false,
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@
|
|||
* @group Database
|
||||
*/
|
||||
class JobQueueTest extends MediaWikiTestCase {
|
||||
protected $key;
|
||||
protected $queueRand, $queueRandTTL, $queueFifo, $queueFifoTTL;
|
||||
protected $old = array();
|
||||
|
||||
function __construct( $name = null, array $data = array(), $dataName = '' ) {
|
||||
|
|
@ -15,18 +17,34 @@ class JobQueueTest extends MediaWikiTestCase {
|
|||
}
|
||||
|
||||
protected function setUp() {
|
||||
global $wgMemc;
|
||||
global $wgMemc, $wgJobTypeConf;
|
||||
parent::setUp();
|
||||
$this->old['wgMemc'] = $wgMemc;
|
||||
$wgMemc = new HashBagOStuff();
|
||||
$this->queueRand = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random' ) );
|
||||
$this->queueRandTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'random', 'claimTTL' => 10 ) );
|
||||
$this->queueFifo = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo' ) );
|
||||
$this->queueFifoTTL = JobQueue::factory( array( 'class' => 'JobQueueDB',
|
||||
'wiki' => wfWikiID(), 'type' => 'null', 'order' => 'fifo', 'claimTTL' => 10 ) );
|
||||
if ( $this->getCliArg( 'use-jobqueue=' ) ) {
|
||||
$name = $this->getCliArg( 'use-jobqueue=' );
|
||||
if ( !isset( $wgJobTypeConf[$name] ) ) {
|
||||
throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
|
||||
}
|
||||
$baseConfig = $wgJobTypeConf[$name];
|
||||
} else {
|
||||
$baseConfig = array( 'class' => 'JobQueueDB' );
|
||||
}
|
||||
$baseConfig['type'] = 'null';
|
||||
$baseConfig['wiki'] = wfWikiID();
|
||||
$this->queueRand = JobQueue::factory(
|
||||
array( 'order' => 'random', 'claimTTL' => 0 ) + $baseConfig );
|
||||
$this->queueRandTTL = JobQueue::factory(
|
||||
array( 'order' => 'random', 'claimTTL' => 10 ) + $baseConfig );
|
||||
$this->queueFifo = JobQueue::factory(
|
||||
array( 'order' => 'fifo', 'claimTTL' => 0 ) + $baseConfig );
|
||||
$this->queueFifoTTL = JobQueue::factory(
|
||||
array( 'order' => 'fifo', 'claimTTL' => 10 ) + $baseConfig );
|
||||
if ( $baseConfig['class'] !== 'JobQueueDB' ) { // DB namespace with prefix or temp tables
|
||||
foreach ( array( 'queueRand', 'queueRandTTL', 'queueFifo', 'queueFifoTTL' ) as $q ) {
|
||||
$this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected function tearDown() {
|
||||
|
|
@ -239,7 +257,7 @@ class JobQueueTest extends MediaWikiTestCase {
|
|||
$queue->ack( $job );
|
||||
}
|
||||
|
||||
$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );
|
||||
$this->assertFalse( $queue->pop(), "Queue is not empty ($desc)" );
|
||||
|
||||
$queue->flushCaches();
|
||||
$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
|
||||
|
|
|
|||
Loading…
Reference in a new issue