wiki.techinc.nl/includes/jobqueue/JobQueueMemory.php
Timo Tijhof ce96bed119 jobqueue: combine Logger channels, improve docs, add missing ingroup tags
* combine `JobQueueFederated` and `JobQueueRedis` into a single
  `JobQueue` channel.

* Remove duplicate descriptions from file blocks in favour of class
  doc blocks. This reduces needless duplication and was often
  incorrect or outdated, and helps (ironically) to make the file header
  more consistently visually ignorable. Various files in this patch
  contained bogus copy-pasta descriptions from unrelated classes,
  and re-defined `defgroup JobQueue` many times, showing exactly
  how this is defacto ignored and counter-productive to maintain
  in two places.

  Remove `ingroup` from file blocks in class files as otherwise
  the file is indexed twice (e.g. in Doxygen) which makes navigation
  on doc.wikimedia.org rather messy for classes in this group.

  Ref <https://gerrit.wikimedia.org/r/q/message:ingroup+is:merged>

Change-Id: I926a3aec2bc98fefa1075c4a794c46108579ae3f
2024-03-25 15:00:38 -07:00

228 lines
4.9 KiB
PHP

<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
/**
* PHP memory-backed job queue storage, for testing.
*
* JobQueueGroup does not remember every queue instance, so statically track it here.
*
* @since 1.27
* @ingroup JobQueue
*/
class JobQueueMemory extends JobQueue {
/** @var array[] */
protected static $data = [];
public function __construct( array $params ) {
$params['wanCache'] = new WANObjectCache( [ 'cache' => new HashBagOStuff() ] );
parent::__construct( $params );
}
/**
* @see JobQueue::doBatchPush
*
* @param IJobSpecification[] $jobs
* @param int $flags
*/
protected function doBatchPush( array $jobs, $flags ) {
$unclaimed =& $this->getQueueData( 'unclaimed', [] );
foreach ( $jobs as $job ) {
if ( $job->ignoreDuplicates() ) {
$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
if ( !isset( $unclaimed[$sha1] ) ) {
$unclaimed[$sha1] = $job;
}
} else {
$unclaimed[] = $job;
}
}
}
/**
* @see JobQueue::supportedOrders
*
* @return string[]
*/
protected function supportedOrders() {
return [ 'random', 'timestamp', 'fifo' ];
}
/**
* @see JobQueue::optimalOrder
*
* @return string
*/
protected function optimalOrder() {
return 'fifo';
}
/**
* @see JobQueue::doIsEmpty
*
* @return bool
*/
protected function doIsEmpty() {
return ( $this->doGetSize() == 0 );
}
/**
* @see JobQueue::doGetSize
*
* @return int
*/
protected function doGetSize() {
$unclaimed = $this->getQueueData( 'unclaimed' );
return $unclaimed ? count( $unclaimed ) : 0;
}
/**
* @see JobQueue::doGetAcquiredCount
*
* @return int
*/
protected function doGetAcquiredCount() {
$claimed = $this->getQueueData( 'claimed' );
return $claimed ? count( $claimed ) : 0;
}
/**
* @see JobQueue::doPop
*
* @return RunnableJob|false
*/
protected function doPop() {
if ( $this->doGetSize() == 0 ) {
return false;
}
$unclaimed =& $this->getQueueData( 'unclaimed' );
$claimed =& $this->getQueueData( 'claimed', [] );
if ( $this->order === 'random' ) {
$key = array_rand( $unclaimed );
} else {
$key = array_key_first( $unclaimed );
}
$spec = $unclaimed[$key];
unset( $unclaimed[$key] );
$claimed[] = $spec;
$job = $this->jobFromSpecInternal( $spec );
$job->setMetadata( 'claimId', array_key_last( $claimed ) );
return $job;
}
/**
* @see JobQueue::doAck
*
* @param RunnableJob $job
*/
protected function doAck( RunnableJob $job ) {
if ( $this->getAcquiredCount() == 0 ) {
return;
}
$claimed =& $this->getQueueData( 'claimed' );
unset( $claimed[$job->getMetadata( 'claimId' )] );
}
/**
* @inheritDoc
*/
protected function doDelete() {
if ( isset( self::$data[$this->type][$this->domain] ) ) {
unset( self::$data[$this->type][$this->domain] );
if ( !self::$data[$this->type] ) {
unset( self::$data[$this->type] );
}
}
}
/**
* @see JobQueue::getAllQueuedJobs
*
* @return Iterator<RunnableJob> of Job objects.
*/
public function getAllQueuedJobs() {
$unclaimed = $this->getQueueData( 'unclaimed' );
if ( !$unclaimed ) {
return new ArrayIterator( [] );
}
return new MappedIterator(
$unclaimed,
function ( $value ) {
return $this->jobFromSpecInternal( $value );
}
);
}
/**
* @see JobQueue::getAllAcquiredJobs
*
* @return Iterator<RunnableJob> of Job objects.
*/
public function getAllAcquiredJobs() {
$claimed = $this->getQueueData( 'claimed' );
if ( !$claimed ) {
return new ArrayIterator( [] );
}
return new MappedIterator(
$claimed,
function ( $value ) {
return $this->jobFromSpecInternal( $value );
}
);
}
/**
* @param IJobSpecification $spec
* @return RunnableJob
*/
public function jobFromSpecInternal( IJobSpecification $spec ) {
return $this->factoryJob( $spec->getType(), $spec->getParams() );
}
/**
* @param string $field
* @param mixed|null $init
*
* @return mixed
*/
private function &getQueueData( $field, $init = null ) {
if ( !isset( self::$data[$this->type][$this->domain][$field] ) ) {
if ( $init !== null ) {
self::$data[$this->type][$this->domain][$field] = $init;
} else {
return $init;
}
}
return self::$data[$this->type][$this->domain][$field];
}
}