* Cleaned up some data structures into hashes, which get better compression and play well with the KEYS parameter in Lua scripts. The claim list is now a sorted set with O(log(N)) removal in ack() and O(log(N)+M) searching in recycleAndDeleteStaleJobs(). * Made the class itself control object serialization, so that Lua scripts have an easy time. Only the job data itself needs to be serialized, whereas other things just get bloated. * Used Lua scripts to get push(), pop() and ack() down to 1 RTT. * Likewise rewrote recycleAndDeleteStaleJobs() to use a script. * Fixed bug where claimed duplicate jobs removed the data on ack(), which meant that claimed duplicate jobs could no-op newer ones. De-duplication should only apply to unclaimed jobs, as in the JobQueueDB class, so that unfinished jobs don't no-op new ones. * Removed locking in recycleAndDeleteStaleJobs(), which would not do much since the exclusive set request would serialize on the Lua script anyway. Thanks to the sorted set, the Lua script will finish quickly on subsequent runs if it is run more than once in a row. Also made recycleAndDeleteStaleJobs() run randomly to reduce the chance of a single caller tying up the server. * Removed useless hDel() call in getJobFromUidInternal(). * Changed unit tests to handle the different supported orders better. Added tests for the 'timestamp' ordering. Change-Id: Ib2d7aff18753195248ab856afd4a46e18b301db9
318 lines
10 KiB
PHP
318 lines
10 KiB
PHP
<?php
|
|
|
|
/**
 * Exercises the JobQueue push/pop/ack cycle, de-duplication and ordering
 * against one queue instance per order ('random', 'timestamp', 'fifo'),
 * each with and without a claim TTL.
 *
 * @group JobQueue
 * @group medium
 * @group Database
 */
class JobQueueTest extends MediaWikiTestCase {
	protected $key;

	/**
	 * One property per queue variant built in setUp(). A variant whose
	 * construction failed stays null, which makes its tests skip. Every
	 * variant must be declared here: reading an undeclared property raises
	 * a notice instead of yielding null.
	 */
	protected $queueRand, $queueRandTTL, $queueTimestamp, $queueTimestampTTL;
	protected $queueFifo, $queueFifoTTL;

	/** Globals saved in setUp() and restored in tearDown() */
	protected $old = array();

	/**
	 * Registers the 'job' table in $this->tablesUsed.
	 *
	 * @param string|null $name
	 * @param array $data
	 * @param string $dataName
	 */
	public function __construct( $name = null, array $data = array(), $dataName = '' ) {
		parent::__construct( $name, $data, $dataName );

		$this->tablesUsed[] = 'job';
	}

	/**
	 * Swaps $wgMemc for an in-process HashBagOStuff and builds the queue variants.
	 *
	 * The base configuration comes from the $wgJobTypeConf entry named by the
	 * 'use-jobqueue=' CLI argument, or defaults to JobQueueDB when that
	 * argument is absent.
	 *
	 * @throws MWException If the CLI argument names an unknown $wgJobTypeConf entry
	 */
	protected function setUp() {
		global $wgMemc, $wgJobTypeConf;

		parent::setUp();

		$this->old['wgMemc'] = $wgMemc;
		$wgMemc = new HashBagOStuff();

		$name = $this->getCliArg( 'use-jobqueue=' );
		if ( $name ) {
			if ( !isset( $wgJobTypeConf[$name] ) ) {
				throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
			}
			$baseConfig = $wgJobTypeConf[$name];
		} else {
			$baseConfig = array( 'class' => 'JobQueueDB' );
		}
		$baseConfig['type'] = 'null';
		$baseConfig['wiki'] = wfWikiID();

		$variants = array(
			'queueRand' => array( 'order' => 'random', 'claimTTL' => 0 ),
			'queueRandTTL' => array( 'order' => 'random', 'claimTTL' => 10 ),
			'queueTimestamp' => array( 'order' => 'timestamp', 'claimTTL' => 0 ),
			'queueTimestampTTL' => array( 'order' => 'timestamp', 'claimTTL' => 10 ),
			'queueFifo' => array( 'order' => 'fifo', 'claimTTL' => 0 ),
			'queueFifoTTL' => array( 'order' => 'fifo', 'claimTTL' => 10 ),
		);
		foreach ( $variants as $q => $settings ) {
			try {
				// Variant settings win over the base config on key collisions
				$this->$q = JobQueue::factory( $settings + $baseConfig );
				if ( !( $this->$q instanceof JobQueueDB ) ) {
					$this->$q->setTestingPrefix( 'unittests-' . wfRandomString( 32 ) );
				}
			} catch ( MWException $e ) {
				// Presumably an unsupported variant for this backend; the
				// tests for a variant whose property is still null get skipped.
				// @todo What if it was another error?
			}
		}
	}

	/**
	 * Drains and acknowledges whatever is left in each queue, clears the
	 * queue properties and restores the original $wgMemc.
	 */
	protected function tearDown() {
		global $wgMemc;

		parent::tearDown();

		$queueNames = array(
			'queueRand', 'queueRandTTL', 'queueTimestamp', 'queueTimestampTTL',
			'queueFifo', 'queueFifoTTL'
		);
		foreach ( $queueNames as $q ) {
			if ( $this->$q ) {
				do {
					$job = $this->$q->pop();
					if ( $job ) {
						$this->$q->ack( $job );
					}
				} while ( $job );
			}
			$this->$q = null;
		}

		$wgMemc = $this->old['wgMemc'];
	}

	/**
	 * The queue reports the wiki ID and job type it was configured with.
	 *
	 * @dataProvider provider_queueLists
	 * @param string $queue Name of the property holding the queue
	 * @param bool $recycles Whether the variant has a non-zero claimTTL (unused here)
	 * @param string $desc Human-readable variant description for messages
	 */
	public function testProperties( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
		$this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" );
	}

	/**
	 * Pushes two jobs, pops both and acks both, checking the size and the
	 * acquired count after each step.
	 *
	 * @dataProvider provider_queueLists
	 * @param string $queue Name of the property holding the queue
	 * @param bool $recycles Whether popped jobs are expected to count as acquired until ack()
	 * @param string $desc Human-readable variant description for messages
	 */
	public function testBasicOperations( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$this->assertTrue( $queue->push( $this->newJob() ), "Push worked ($desc)" );
		$this->assertTrue( $queue->batchPush( array( $this->newJob() ) ), "Push worked ($desc)" );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 2, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		$job1 = $queue->pop();
		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( $recycles ? 1 : 0, $queue->getAcquiredCount(),
			"Active job count ($desc)" );

		$job2 = $queue->pop();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( $recycles ? 2 : 0, $queue->getAcquiredCount(),
			"Active job count ($desc)" );

		$queue->ack( $job1 );

		$queue->flushCaches();
		$this->assertEquals( $recycles ? 1 : 0, $queue->getAcquiredCount(),
			"Active job count ($desc)" );

		$queue->ack( $job2 );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
	}

	/**
	 * Identical jobs flagged with 'removeDuplicates' collapse into a single
	 * queued job, both within one batch and across batches.
	 *
	 * @dataProvider provider_queueLists
	 * @param string $queue Name of the property holding the queue
	 * @param bool $recycles Whether popped jobs are expected to count as acquired until ack()
	 * @param string $desc Human-readable variant description for messages
	 */
	public function testBasicDeduplication( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$this->assertTrue(
			$queue->batchPush(
				array( $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() )
			),
			"Push worked ($desc)"
		);

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		// A second identical batch must not grow the queue either
		$this->assertTrue(
			$queue->batchPush(
				array( $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() )
			),
			"Push worked ($desc)"
		);

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		$job1 = $queue->pop();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( $recycles ? 1 : 0, $queue->getAcquiredCount(),
			"Active job count ($desc)" );

		$queue->ack( $job1 );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
	}

	/**
	 * After a newer root job with the same task ID is registered, the five
	 * jobs of the older root job are popped as DuplicateJob instances.
	 *
	 * @dataProvider provider_queueLists
	 * @param string $queue Name of the property holding the queue
	 * @param bool $recycles Whether the variant has a non-zero claimTTL (unused here)
	 * @param string $desc Human-readable variant description for messages
	 */
	public function testRootDeduplication( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$id = wfRandomString( 32 );
		$root1 = Job::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp
		for ( $i = 0; $i < 5; ++$i ) {
			$this->assertTrue( $queue->push( $this->newJob( 0, $root1 ) ), "Push worked ($desc)" );
		}
		$queue->deduplicateRootJob( $this->newJob( 0, $root1 ) );
		sleep( 1 ); // root job timestamp will increase
		$root2 = Job::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp
		$this->assertNotEquals( $root1['rootJobTimestamp'], $root2['rootJobTimestamp'],
			"Root job signatures have different timestamps." );
		for ( $i = 0; $i < 5; ++$i ) {
			$this->assertTrue( $queue->push( $this->newJob( 0, $root2 ) ), "Push worked ($desc)" );
		}
		$queue->deduplicateRootJob( $this->newJob( 0, $root2 ) );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 10, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		// Drain the queue, counting how many jobs came back as duplicates
		$dupcount = 0;
		$jobs = array();
		do {
			$job = $queue->pop();
			if ( $job ) {
				$jobs[] = $job;
				$queue->ack( $job );
			}
			if ( $job instanceof DuplicateJob ) {
				++$dupcount;
			}
		} while ( $job );

		$this->assertEquals( 10, count( $jobs ), "Correct number of jobs popped ($desc)" );
		$this->assertEquals( 5, $dupcount, "Correct number of duplicate jobs popped ($desc)" );
	}

	/**
	 * Jobs pushed to a FIFO queue are popped in insertion order.
	 *
	 * @dataProvider provider_fifoQueueLists
	 * @param string $queue Name of the property holding the queue
	 * @param bool $recycles Whether the variant has a non-zero claimTTL (unused here)
	 * @param string $desc Human-readable variant description for messages
	 */
	public function testJobOrder( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		for ( $i = 0; $i < 10; ++$i ) {
			$this->assertTrue( $queue->push( $this->newJob( $i ) ), "Push worked ($desc)" );
		}

		for ( $i = 0; $i < 10; ++$i ) {
			$job = $queue->pop();
			$this->assertTrue( $job instanceof Job, "Jobs popped from queue ($desc)" );
			$params = $job->getParams();
			$this->assertEquals( $i, $params['i'], "Job popped from queue is FIFO ($desc)" );
			$queue->ack( $job );
		}

		// All ten jobs were consumed, so another pop() must come back empty
		$this->assertFalse( $queue->pop(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
	}

	/**
	 * Data provider covering every queue variant.
	 *
	 * @return array[] Rows of ( property name, whether claimTTL is non-zero, description )
	 */
	public function provider_queueLists() {
		return array(
			array( 'queueRand', false, 'Random queue without ack()' ),
			array( 'queueRandTTL', true, 'Random queue with ack()' ),
			array( 'queueTimestamp', false, 'Time ordered queue without ack()' ),
			array( 'queueTimestampTTL', true, 'Time ordered queue with ack()' ),
			array( 'queueFifo', false, 'FIFO ordered queue without ack()' ),
			array( 'queueFifoTTL', true, 'FIFO ordered queue with ack()' )
		);
	}

	/**
	 * Data provider covering only the FIFO queue variants.
	 *
	 * @return array[] Rows of ( property name, whether claimTTL is non-zero, description )
	 */
	public function provider_fifoQueueLists() {
		return array(
			array( 'queueFifo', false, 'Ordered queue without ack()' ),
			array( 'queueFifoTTL', true, 'Ordered queue with ack()' )
		);
	}

	/**
	 * Builds a NullJob on the main page with 'removeDuplicates' disabled.
	 *
	 * @param int $i Value stored in the job's 'i' parameter
	 * @param array $rootJob Extra parameters; keys already set here take precedence
	 * @return NullJob
	 */
	public function newJob( $i = 0, $rootJob = array() ) {
		return new NullJob( Title::newMainPage(),
			array( 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ) + $rootJob );
	}

	/**
	 * Builds a NullJob on the main page with 'removeDuplicates' enabled.
	 *
	 * @param int $i Value stored in the job's 'i' parameter
	 * @param array $rootJob Extra parameters; keys already set here take precedence
	 * @return NullJob
	 */
	public function newDedupedJob( $i = 0, $rootJob = array() ) {
		return new NullJob( Title::newMainPage(),
			array( 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ) + $rootJob );
	}
}
|