.
 *
 * PoolCounter provides semaphore semantics that cap how many workers may
 * perform a given task concurrently. Across all PoolCounter instances of a
 * process, only one key may be locked at a time, with the exception of keys
 * that begin with "nowait:". Keys with the "nowait:" prefix may only be used
 * with non-blocking requests (timeout=0).
 *
 * PoolCounterNull is used by default and performs no locking. Install the
 * poolcounterd service to enable this feature.
 *
 * @since 1.16
 * @stable to extend
 */
abstract class PoolCounter {
	/* Return codes */

	/** Lock acquired */
	public const LOCKED = 1;
	/** Lock released */
	public const RELEASED = 2;
	/** Another worker did the work for you */
	public const DONE = 3;

	/** Indeterminate error */
	public const ERROR = -1;
	/** Called release() with no lock held */
	public const NOT_LOCKED = -2;
	/** There are already maxqueue workers on this lock */
	public const QUEUE_FULL = -3;
	/** Timeout exceeded */
	public const TIMEOUT = -4;
	/** Cannot acquire another lock while you have one lock held */
	public const LOCK_HELD = -5;

	/** @var string Lock key; every worker using the same key shares the lock */
	protected $key;

	/** @var int Upper bound on workers simultaneously working on tasks with the same key */
	protected $workers;

	/**
	 * Upper bound on workers for this task type, irrespective of key.
	 * 0 means unlimited; the largest allowed value is 65536.
	 * Enforcement of the slot limit is overzealous, so use this option with caution.
	 * @var int
	 */
	protected $slots = 0;

	/** @var int Once this many workers are working/waiting, fail rather than wait */
	protected $maxqueue;

	/** @var int Longest time, in seconds, to wait for the lock */
	protected $timeout;

	/**
	 * @var bool True when the key is a "might wait" key, i.e. it does not start with "nowait:"
	 */
	private $isMightWaitKey;

	/**
	 * @var int Non-zero while this process holds a "might wait" lock key
	 */
	private static $acquiredMightWaitKey = 0;

	/**
	 * @var bool Fast stale mode switch (T250248). The work class may override it.
	 */
	private $fastStale;

	/**
	 * @param array $conf Settings; reads 'workers', 'maxqueue' and 'timeout', plus the
	 *   optional 'slots' and 'fastStale' entries
	 * @param string $type The class of actions to limit concurrency for (task type)
	 * @param string $key Lock key; replaced by a slot key when slots are configured
	 */
	public function __construct( array $conf, string $type, string $key ) {
		$this->workers = $conf['workers'];
		$this->maxqueue = $conf['maxqueue'];
		$this->timeout = $conf['timeout'];
		// Keep the property's default when no (non-null) 'slots' entry is supplied.
		$this->slots = $conf['slots'] ?? $this->slots;
		$this->fastStale = $conf['fastStale'] ?? false;

		// With a slot limit in force, the lock key becomes "<type>:<slot number>".
		$this->key = $this->slots
			? $this->hashKeyIntoSlots( $type, $key, $this->slots )
			: $key;

		// Whatever does not carry the "nowait:" prefix is a key that might wait.
		$this->isMightWaitKey = preg_match( '/^nowait:/', $this->key ) !== 1;
	}

	/**
	 * Get the key used for locking (the slot key when slots are configured).
	 *
	 * @return string
	 */
	public function getKey() {
		return $this->key;
	}

	/**
	 * I want to do this task and I need to do it myself.
	 *
	 * @param int|null $timeout Wait timeout, or null to use the value given to
	 *   the constructor
	 * @return Status Value is one of Locked/Error
	 */
	abstract public function acquireForMe( $timeout = null );

	/**
	 * I want to do this task, but it is equally fine if somebody else does it
	 * instead; in that case I will read its cached data.
	 *
	 * @param int|null $timeout Wait timeout, or null to use the value given to
	 *   the constructor
	 * @return Status Value is one of Locked/Done/Error
	 */
	abstract public function acquireForAnyone( $timeout = null );

	/**
	 * I have successfully finished my task.
	 * Lets another worker grab the lock, and returns the workers
	 * waiting on acquireForAnyone()
	 *
	 * @return Status Value is one of Released/NotLocked/Error
	 */
	abstract public function release();

	/**
	 * Check that the lock request is sensible.
	 *
	 * @return Status Good for sensible requests, fatal for the not so sensible
	 * @since 1.25
	 */
	final protected function precheckAcquire() {
		if ( !$this->isMightWaitKey ) {
			// "nowait:" keys are only valid for non-blocking requests.
			if ( $this->timeout !== 0 ) {
				return Status::newFatal(
					'poolcounter-usage-error',
					'Locks starting in nowait: must have 0 timeout.'
				);
			}
			return Status::newGood();
		}

		if ( self::$acquiredMightWaitKey ) {
			/*
			 * The poolcounter itself would happily let you wait on a second
			 * lock while you already hold one you waited on, but that is
			 * unlikely to be a good idea, so it is treated as an error here.
			 * If you are _really_ _really_ sure it is a good idea, feel free
			 * to implement an unsafe flag or something.
			 */
			return Status::newFatal(
				'poolcounter-usage-error',
				'You may only aquire a single non-nowait lock.'
			);
		}

		return Status::newGood();
	}

	/**
	 * Update the lock tracking information once the lock has been acquired.
	 *
	 * @since 1.25
	 */
	final protected function onAcquire() {
		// Sets the flag for a "might wait" key; leaves it untouched otherwise.
		self::$acquiredMightWaitKey |= ( $this->isMightWaitKey ? 1 : 0 );
	}

	/**
	 * Update the lock tracking information once the lock has been released.
	 *
	 * @since 1.25
	 */
	final protected function onRelease() {
		// Clears the flag for a "might wait" key; masks with 1 otherwise.
		self::$acquiredMightWaitKey &= ( $this->isMightWaitKey ? 0 : 1 );
	}

	/**
	 * Map a key (any string) and a number of slots to a slot key: the type as prefix,
	 * followed by an integer suffix from the [0..($slots-1)] range. This is used for a
	 * global limit on the number of instances of a given type that can acquire a lock.
	 * The hashing is deterministic, so PoolCounter::$workers always remains an upper
	 * limit on how many instances with the same key can acquire a lock.
	 *
	 * @param string $type The class of actions to limit concurrency for (task type)
	 * @param string $key PoolCounter instance key (any string)
	 * @param int $slots The number of slots (max allowed value is 65536)
	 * @return string Slot key with the type and slot number
	 */
	protected function hashKeyIntoSlots( $type, $key, $slots ) {
		// The first four hex digits of the SHA-1 digest give a value in 0..65535.
		$hashPrefix = substr( sha1( $key ), 0, 4 );
		$slotNumber = hexdec( $hashPrefix ) % $slots;

		return $type . ':' . $slotNumber;
	}

	/**
	 * Whether fast stale mode (T250248) is enabled. The PoolCounterWork
	 * subclass may override this.
	 *
	 * @return bool
	 */
	public function isFastStaleEnabled() {
		return $this->fastStale;
	}
}