diff --git a/includes/libs/objectcache/BagOStuff.php b/includes/libs/objectcache/BagOStuff.php index d65b0382849..63464c51858 100644 --- a/includes/libs/objectcache/BagOStuff.php +++ b/includes/libs/objectcache/BagOStuff.php @@ -51,7 +51,7 @@ use Wikimedia\ScopedCallback; * among datacenters. * * Subclasses should override the default "segmentationSize" field with an appropriate value. - * The value should not be larger than what the storage backend (by default) supports. It also + * The value should not be larger than what the backing store (by default) supports. It also * should be roughly informed by common performance bottlenecks (e.g. values over a certain size * having poor scalability). The same goes for the "segmentedValueMaxSize" member, which limits * the maximum size and chunk count (indirectly) of values. diff --git a/includes/objectcache/SqlBagOStuff.php b/includes/objectcache/SqlBagOStuff.php index e236a7b1737..c41e67b3629 100644 --- a/includes/objectcache/SqlBagOStuff.php +++ b/includes/objectcache/SqlBagOStuff.php @@ -36,7 +36,16 @@ use Wikimedia\Timestamp\ConvertibleTimestamp; use Wikimedia\WaitConditionLoop; /** - * Class to store objects in the database + * RDBMS-based caching module + * + * The following database sharding schemes are supported: + * - None; all keys map to the same shard + * - Hash; keys map to shards via consistent hashing + * - Keyspace; global keys map to the global shard and non-global keys map to the local shard + * + * The following database replication topologies are supported: + * - A primary database server for each shard, all within one datacenter + * - A co-primary database server for each shard within each datacenter * * @ingroup Cache */ @@ -52,16 +61,18 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { protected $serverTags = []; /** @var int UNIX timestamp */ protected $lastGarbageCollect = 0; - /** @var int */ + /** @var int Average number of writes required to trigger garbage collection */ protected $purgePeriod = 10; - /** @var int */ + /** @var int Max expired rows to purge during randomized garbage collection */ protected $purgeLimit = 100; - /** @var int */ + /** @var int Number of table shards to use on each server */ protected $numTableShards = 1; /** @var string */ protected $tableName = 'objectcache'; - /** @var bool */ + /** @var bool Whether to use replicas instead of primaries (if using LoadBalancer) */ protected $replicaOnly; + /** @var string|null Multi-primary mode DB type ("mysql",...); null if not enabled */ + protected $multiPrimaryModeType; /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */ protected $conns; @@ -70,16 +81,23 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** @var Exception[] Map of (shard index => Exception) */ protected $connFailureErrors = []; + private const SHARD_LOCAL = 'local'; + private const SHARD_GLOBAL = 'global'; + + /** A number of seconds well above any expected clock skew */ + private const SAFE_CLOCK_BOUND_SEC = 15; + /** A number of seconds well above any expected clock skew and replication lag */ + private const SAFE_PURGE_DELAY_SEC = 3600; + /** Distinct string for tombstones stored in the "serialized" value column */ + private const TOMB_SERIAL = ''; + /** Relative seconds-to-live to use for tombstones */ + private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC; /** How many seconds must pass before triggering a garbage collection */ private const GC_DELAY_SEC = 1; - private const OP_SET = 'set'; - private const OP_ADD = 'add'; - private const OP_TOUCH = 'touch'; - private const OP_DELETE = 'delete'; - - private const SHARD_LOCAL = 'local'; - private const SHARD_GLOBAL = 'global'; + private const BLOB_VALUE = 0; + private const BLOB_EXPIRY = 1; + private const BLOB_CASTOKEN = 2; /** * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMs types. @@ -90,37 +108,46 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959'; /** - * Constructor. Parameters are: - * - server: Server config map for Database::factory() that describes the database - * to use for all key operations. This overrides "localKeyLB" and "globalKeyLB". - * - servers: Array of server config maps, each for Database::factory(), describing a set - * of database servers to distribute keys to. If this is specified, the "server" option - * will be ignored. If string keys are used, then they will be used for consistent - * hashing *instead* of the host name (from the server config). This is useful - * when a cluster is replicated to another site (with different host names) - * but each server has a corresponding replica in the other cluster. + * Create a new backend instance from configuration + * + * The database servers must be provided by *either* the "server" parameter, the "servers" + * parameter, the "globalKeyLB" parameter, or both the "globalKeyLB"/"localKeyLB" paramters. + * + * Parameters include: + * - server: Server config map for Database::factory() that describes the database to + * use for all key operations in the current region. This is overriden by "servers". + * - servers: Map of tag strings to server config maps, each for Database::factory(), + * describing the set of database servers on which to distribute key operations in the + * current region. Data is distributed among the servers via key hashing based on the + * server tags. Therefore, each tag represents a shard of the dataset. Tags are useful + * for failover using cold-standby servers and for managing shards with replica servers + * in multiple regions (each having different hostnames). * - localKeyLB: ObjectFactory::getObjectFromSpec array yielding ILoadBalancer. * This load balancer is used for local keys, e.g. those using makeKey(). - * This is overriden by 'server'/'servers'. + * This is overriden by "server" and "servers". * - globalKeyLB: ObjectFactory::getObjectFromSpec array yielding ILoadBalancer. * This load balancer is used for local keys, e.g. those using makeGlobalKey(). - * This is overriden by 'server'/'servers'. + * This is overriden by "server" and "servers". + * - multiPrimaryMode: Whether the portion of the dataset belonging to each tag/shard is + * replicated among one or more regions, with one "co-primary" server in each region. + * Queries are issued in a manner that provides Last-Write-Wins eventual consistency. + * This option requires the "server" or "servers" options. Only MySQL, with statment + * based replication (log_bin='ON' and binlog_format='STATEMENT') is supported. Also, + * the `modtoken` column must exist on the `objectcache` table(s). [optional] * - purgePeriod: The average number of object cache writes in between garbage collection * operations, where expired entries are removed from the database. Or in other words, * the reciprocal of the probability of purging on any given write. If this is set to * zero, purging will never be done. [optional] * - purgeLimit: Maximum number of rows to purge at once. [optional] * - tableName: The table name to use, default is "objectcache". [optional] - * - shards: The number of tables to use for data storage on each server. - * If this is more than 1, table names will be formed in the style objectcacheNNN where - * NNN is the shard index, between 0 and shards-1. The number of digits will be the - * minimum number required to hold the largest shard index. Data will be distributed - * across all tables by key hash. This is for MySQL bugs 61735 - * and 61736 - * . [optional] - * - replicaOnly: Whether to only use replica DBs and avoid triggering garbage collection - * logic of expired items. This only makes sense if the primary DB is used and only if - * get() calls will be used. This is used by ReplicatedBagOStuff. [optional] + * - shards: The number of tables to use for data storage on each server. If greater than + * 1, table names are formed in the style objectcacheNNN where NNN is the shard index, + * between 0 and shards-1. The number of digits used in the suffix is the minimum number + * required to hold the largest shard index. Data is distributed among the tables via + * key hashing. This helps mitigate MySQL bugs 61735 and 61736. [optional] + * - replicaOnly: Whether to only use replica servers and only support read operations. + * This option requires the use of LoadBalancer ("localKeyLB"/"globalKeyLB") and + * should only be used by ReplicatedBagOStuff. [optional] * - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC. [optional] * * @param array $params @@ -128,28 +155,33 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { public function __construct( $params ) { parent::__construct( $params ); - $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL; - + $dbType = null; if ( isset( $params['servers'] ) || isset( $params['server'] ) ) { + // Configuration uses a direct list of servers $index = 0; foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) { $this->serverInfos[$index] = $info; + // Allow integer-indexes arrays for b/c $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index"; + $dbType = $info['type']; ++$index; } $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE; } else { - if ( isset( $params['localKeyLB'] ) ) { - $this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer ) - ? $params['localKeyLB'] - : ObjectFactory::getObjectFromSpec( $params['localKeyLB'] ); - } + // Configuration uses the servers defined in LoadBalancer instances if ( isset( $params['globalKeyLB'] ) ) { $this->globalKeyLb = ( $params['globalKeyLB'] instanceof ILoadBalancer ) ? $params['globalKeyLB'] : ObjectFactory::getObjectFromSpec( $params['globalKeyLB'] ); } - $this->localKeyLb = $this->localKeyLb ?: $this->globalKeyLb; + if ( isset( $params['localKeyLB'] ) ) { + $this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer ) + ? $params['localKeyLB'] + : ObjectFactory::getObjectFromSpec( $params['localKeyLB'] ); + } else { + $this->localKeyLb = $this->globalKeyLb; + } + // When using LoadBalancer instances, one *must* be defined for local keys if ( !$this->localKeyLb ) { throw new InvalidArgumentException( "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'" @@ -157,20 +189,22 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE; } - if ( isset( $params['purgePeriod'] ) ) { - $this->purgePeriod = intval( $params['purgePeriod'] ); - } - if ( isset( $params['purgeLimit'] ) ) { - $this->purgeLimit = intval( $params['purgeLimit'] ); - } - if ( isset( $params['tableName'] ) ) { - $this->tableName = $params['tableName']; - } - if ( isset( $params['shards'] ) ) { - $this->numTableShards = intval( $params['shards'] ); - } + + $this->purgePeriod = intval( $params['purgePeriod'] ?? $this->purgePeriod ); + $this->purgeLimit = intval( $params['purgeLimit'] ?? $this->purgeLimit ); + $this->tableName = $params['tableName'] ?? $this->tableName; + $this->numTableShards = intval( $params['shards'] ?? $this->numTableShards ); $this->replicaOnly = $params['replicaOnly'] ?? false; + if ( $params['multiPrimaryMode'] ?? false ) { + if ( $dbType !== 'mysql' ) { + throw new InvalidArgumentException( "Multi-primary mode only supports MySQL" ); + } + + $this->multiPrimaryModeType = $dbType; + } + + $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL; $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS; } @@ -178,14 +212,13 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $getToken = ( $casToken === self::PASS_BY_REF ); $casToken = null; - $blobs = $this->fetchBlobMulti( [ $key ] ); - if ( array_key_exists( $key, $blobs ) ) { - $blob = $blobs[$key]; - $result = $this->unserialize( $blob ); - if ( $getToken && $blob !== false ) { - $casToken = $blob; + $data = $this->fetchBlobs( [ $key ], $getToken )[$key]; + if ( $data ) { + $result = $this->unserialize( $data[self::BLOB_VALUE] ); + if ( $getToken && $result !== false ) { + $casToken = $data[self::BLOB_CASTOKEN]; } - $valueSize = strlen( $blob ); + $valueSize = strlen( $data[self::BLOB_VALUE] ); } else { $result = false; $valueSize = false; @@ -197,122 +230,184 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) { - return $this->modifyMulti( [ $key => $value ], $exptime, $flags, self::OP_SET ); + $mtime = $this->getCurrentTime(); + + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForSet' ], + $mtime, + [ $key => [ $value, $exptime ] ], + $flags + ); } protected function doDelete( $key, $flags = 0 ) { - return $this->modifyMulti( [ $key => null ], 0, $flags, self::OP_DELETE ); + $mtime = $this->getCurrentTime(); + + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForDelete' ], + $mtime, + [ $key => [] ], + $flags + ); } protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) { - return $this->modifyMulti( [ $key => $value ], $exptime, $flags, self::OP_ADD ); - } - - protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) { - list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); - $expiry = $this->getExpirationAsTimestamp( $exptime ); - $serialized = $this->getSerialized( $value, $key ); - - /** @noinspection PhpUnusedLocalVariableInspection */ - $silenceScope = $this->silenceTransactionProfiler(); - $db = null; // in case of connection failure - try { - $db = $this->getConnection( $shardIndex ); - // (T26425) use a replace if the db supports it instead of - // delete/insert to avoid clashes with conflicting keynames - $db->update( - $tableName, - [ - 'keyname' => $key, - 'value' => $db->encodeBlob( $serialized ), - 'exptime' => $this->encodeDbExpiry( $db, $expiry ) - ], - [ - 'keyname' => $key, - 'value' => $db->encodeBlob( $casToken ), - 'exptime > ' . $db->addQuotes( $db->timestamp() ) - ], - __METHOD__ - ); - } catch ( DBQueryError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); - + $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope ); + if ( $mtime === null ) { + // Timeout or I/O error during lock acquisition return false; } - $success = (bool)$db->affectedRows(); - if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) { - $success = $this->waitForReplication( $shardIndex ) && $success; + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForAdd' ], + $mtime, + [ $key => [ $value, $exptime ] ], + $flags + ); + } + + protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) { + $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope ); + if ( $mtime === null ) { + // Timeout or I/O error during lock acquisition + return false; } - $this->updateOpStats( self::METRIC_OP_CAS, [ $key => [ strlen( $serialized ), null ] ] ); - - return $success; + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForCas' ], + $mtime, + [ $key => [ $value, $exptime, $casToken ] ], + $flags + ); } protected function doChangeTTL( $key, $exptime, $flags ) { - return $this->modifyMulti( [ $key => null ], $exptime, $flags, self::OP_TOUCH ); + $mtime = $this->getCurrentTime(); + + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForChangeTTL' ], + $mtime, + [ $key => [ $exptime ] ], + $flags + ); } - public function incr( $key, $step = 1, $flags = 0 ) { - list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); + public function incrWithInit( $key, $exptime, $value = 1, $init = null, $flags = 0 ) { + $value = (int)$value; + $init = is_int( $init ) ? $init : $value; - $newCount = false; - /** @noinspection PhpUnusedLocalVariableInspection */ - $silenceScope = $this->silenceTransactionProfiler(); - $db = null; // in case of connection failure - try { - $db = $this->getConnection( $shardIndex ); - $encTimestamp = $db->addQuotes( $db->timestamp() ); - $db->update( - $tableName, - [ 'value = value + ' . (int)$step ], - [ 'keyname' => $key, "exptime > $encTimestamp" ], - __METHOD__ - ); - if ( $db->affectedRows() > 0 ) { - $newValue = $db->selectField( - $tableName, - 'value', - [ 'keyname' => $key, "exptime > $encTimestamp" ], - __METHOD__ - ); - if ( $this->isInteger( $newValue ) ) { - $newCount = (int)$newValue; - } - } - } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); - } + $mtime = $this->getCurrentTime(); - $this->updateOpStats( $step >= 0 ? self::METRIC_OP_INCR : self::METRIC_OP_DECR, [ $key ] ); + $result = $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForIncrInit' ], + $mtime, + [ $key => [ $value, $init, $exptime ] ], + $flags, + $resByKey + ) ? $resByKey[$key] : false; - return $newCount; + return $result; + } + + public function incr( $key, $value = 1, $flags = 0 ) { + return $this->doIncr( $key, $value, $flags ); } public function decr( $key, $value = 1, $flags = 0 ) { - return $this->incr( $key, -$value, $flags ); + return $this->doIncr( $key, -$value, $flags ); + } + + private function doIncr( $key, $value = 1, $flags = 0 ) { + $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $scope ); + if ( $mtime === null ) { + // Timeout or I/O error during lock acquisition + return false; + } + + $data = $this->fetchBlobs( [ $key ] )[$key]; + if ( $data ) { + $serialValue = $data[self::BLOB_VALUE]; + if ( $this->isInteger( $serialValue ) ) { + $newValue = max( (int)$serialValue + (int)$value, 0 ); + $result = $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForSet' ], + $mtime, + // Preserve the old expiry timestamp + [ $key => [ $newValue, $data[self::BLOB_EXPIRY] ] ], + $flags + ) ? $newValue : false; + } else { + $result = false; + $this->logger->warning( __METHOD__ . ": $key is a non-integer" ); + } + } else { + $result = false; + $this->logger->debug( __METHOD__ . ": $key does not exists" ); + } + + return $result; + } + + protected function doGetMulti( array $keys, $flags = 0 ) { + $result = []; + $valueSizeByKey = []; + + $dataByKey = $this->fetchBlobs( $keys ); + foreach ( $keys as $key ) { + $data = $dataByKey[$key]; + if ( $data ) { + $serialValue = $data[self::BLOB_VALUE]; + $value = $this->unserialize( $serialValue ); + if ( $value !== false ) { + $result[$key] = $value; + } + $valueSize = strlen( $serialValue ); + } else { + $valueSize = false; + } + $valueSizeByKey[$key] = [ null, $valueSize ]; + } + + $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey ); + + return $result; } protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) { - return $this->modifyMulti( $data, $exptime, $flags, self::OP_SET ); + $mtime = $this->getCurrentTime(); + + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForSet' ], + $mtime, + array_map( + static function ( $value ) use ( $exptime ) { + return [ $value, $exptime ]; + }, + $data + ), + $flags + ); } protected function doDeleteMulti( array $keys, $flags = 0 ) { - return $this->modifyMulti( - array_fill_keys( $keys, null ), - 0, - $flags, - self::OP_DELETE + $mtime = $this->getCurrentTime(); + + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForDelete' ], + $mtime, + array_fill_keys( $keys, [] ), + $flags ); } public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) { - return $this->modifyMulti( - array_fill_keys( $keys, null ), - $exptime, - $flags, - self::OP_TOUCH + $mtime = $this->getCurrentTime(); + + return $this->modifyBlobs( + [ $this, 'modifyTableSpecificBlobsForChangeTTL' ], + $mtime, + array_fill_keys( $keys, [ $exptime ] ), + $flags ); } @@ -399,232 +494,734 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { return $this->tableName; } - protected function doGetMulti( array $keys, $flags = 0 ) { - $values = []; - $valueSizeByKey = []; + /** + * @param string[] $keys + * @param bool $getCasToken Whether to get a CAS token + * @return array Order-preserved map of (key => (value,expiry,token) or null) + */ + private function fetchBlobs( array $keys, bool $getCasToken = false ) { + // Initialize order-preserved per-key results; set values for live keys below + $dataByKey = array_fill_keys( $keys, null ); - $blobsByKey = $this->fetchBlobMulti( $keys ); + $readTime = (int)$this->getCurrentTime(); + + $keysByTableByShard = []; foreach ( $keys as $key ) { - if ( array_key_exists( $key, $blobsByKey ) ) { - $blob = $blobsByKey[$key]; - $value = $this->unserialize( $blob ); - if ( $value !== false ) { - $values[$key] = $value; - } - $valueSize = strlen( $blob ); - } else { - $valueSize = false; - } - $valueSizeByKey[$key] = [ null, $valueSize ]; + list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key ); + $keysByTableByShard[$shardIndex][$partitionTable][] = $key; } - $this->updateOpStats( self::METRIC_OP_GET, $valueSizeByKey ); - - return $values; - } - - private function fetchBlobMulti( array $keys ) { - $values = []; // array of (key => value) - - $keysByTableByShardIndex = []; - foreach ( $keys as $key ) { - list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); - $keysByTableByShardIndex[$shardIndex][$tableName][] = $key; - } - - $now = $this->getCurrentTime(); - $dataRows = []; - foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) { + foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) { try { $db = $this->getConnection( $shardIndex ); - foreach ( $serverKeys as $tableName => $tableKeys ) { + foreach ( $serverKeys as $partitionTable => $tableKeys ) { $res = $db->select( - $tableName, - [ 'keyname', 'value', 'exptime' ], - [ 'keyname' => $tableKeys ], - __METHOD__, - // Approximate write-on-the-fly BagOStuff API via blocking. - // This approximation fails if a ROLLBACK happens (which is rare). - // We do not want to flush the TRX as that can break callers. - $db->trxLevel() ? [ 'LOCK IN SHARE MODE' ] : [] + $partitionTable, + $getCasToken + ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] ) + : [ 'keyname', 'value', 'exptime' ], + $this->buildExistenceConditions( $db, $tableKeys, $readTime ), + __METHOD__ ); - if ( $res === false ) { - continue; - } foreach ( $res as $row ) { $row->shardIndex = $shardIndex; - $row->tableName = $tableName; - $dataRows[$row->keyname] = $row; + $row->tableName = $partitionTable; + $dataByKey[$row->keyname] = $row; } } } catch ( DBError $e ) { - $this->handleReadError( $e, $shardIndex ); + $this->handleDBError( $e, $shardIndex ); } } foreach ( $keys as $key ) { - if ( isset( $dataRows[$key] ) ) { // HIT? - $row = $dataRows[$key]; - $this->debug( "get: retrieved data; expiry time is " . $row->exptime ); - $db = null; // in case of connection failure - try { - $db = $this->getConnection( $row->shardIndex ); - $expiry = $this->decodeDbExpiry( $db, $row->exptime ); - if ( $expiry !== self::TTL_INDEFINITE && $expiry < $now ) { // MISS - $this->debug( "get: key has expired" ); - } else { // HIT - $values[$key] = $db->decodeBlob( $row->value ); - } - } catch ( DBQueryError $e ) { - $this->handleWriteError( $e, $db, $row->shardIndex ); - } - } else { // MISS - $this->debug( 'get: no matching rows' ); - } - } - - return $values; - } - - /** - * @param mixed[]|null[] $data Map of (key => new value or null) - * @param int $exptime UNIX timestamp, TTL in seconds, or 0 (no expiration) - * @param int $flags Bitfield of BagOStuff::WRITE_* constants - * @param string $op Cache operation - * @return bool - */ - private function modifyMulti( array $data, $exptime, $flags, $op ) { - $keysByTableByShardIndex = []; - foreach ( $data as $key => $value ) { - list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); - $keysByTableByShardIndex[$shardIndex][$tableName][] = $key; - } - - $expiry = $this->getExpirationAsTimestamp( $exptime ); - - $result = true; - /** @noinspection PhpUnusedLocalVariableInspection */ - $silenceScope = $this->silenceTransactionProfiler(); - foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) { - $db = null; // in case of connection failure - try { - $db = $this->getConnection( $shardIndex ); - $this->occasionallyGarbageCollect( $db ); // expire old entries if any - $dbExpiry = $this->encodeDbExpiry( $db, $expiry ); - } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); - $result = false; + $row = $dataByKey[$key] ?? null; + if ( !$row ) { continue; } - foreach ( $serverKeys as $tableName => $tableKeys ) { - try { - $result = $this->updateTable( - $op, - $db, - $tableName, - $tableKeys, - $data, - $dbExpiry - ) && $result; - } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); - $result = false; - } - + $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" ); + try { + $db = $this->getConnection( $row->shardIndex ); + $dataByKey[$key] = [ + self::BLOB_VALUE => $db->decodeBlob( $row->value ), + self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ), + self::BLOB_CASTOKEN => $getCasToken + ? $this->getCasTokenFromRow( $db, $row ) + : null + ]; + } catch ( DBQueryError $e ) { + $this->handleDBError( $e, $row->shardIndex ); } } - if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) { - foreach ( $keysByTableByShardIndex as $shardIndex => $unused ) { - $result = $this->waitForReplication( $shardIndex ) && $result; - } - } - - return $result; + return $dataByKey; } /** - * @param string $op - * @param IDatabase $db - * @param string $table - * @param string[] $tableKeys Keys in $data to update - * @param mixed[]|null[] $data Map of (key => new value or null) - * @param string $dbExpiry DB-encoded expiry - * @return bool - * @throws DBError - * @throws InvalidArgumentException + * @param callable $tableWriteCallback Callback the takes the following arguments: + * - IDatabase instance + * - Partition table name string + * - UNIX modification timestamp + * - Map of (key => list of arguments) for keys belonging to the server/table partition + * - Map of (key => result) [returned] + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Map of (key => list of arguments) + * @param int $flags Bitfield of BagOStuff::WRITE_* constants + * @param array &$resByKey Order-preserved map of (key => result) [returned] + * @return bool Whether all keys were processed + * @param-taint $argsByKey none */ - private function updateTable( $op, $db, $table, $tableKeys, $data, $dbExpiry ) { - $success = true; + private function modifyBlobs( + callable $tableWriteCallback, + float $mtime, + array $argsByKey, + int $flags, + &$resByKey = [] + ) { + // Initialize order-preserved per-key results; callbacks mark successful results + $resByKey = array_fill_keys( array_keys( $argsByKey ), false ); - if ( $op === self::OP_ADD ) { - $valueSizesByKey = []; + /** @noinspection PhpUnusedLocalVariableInspection */ + $silenceScope = $this->silenceTransactionProfiler(); - $rows = []; - foreach ( $tableKeys as $key ) { - $serialized = $this->getSerialized( $data[$key], $key ); - $rows[] = [ - 'keyname' => $key, - 'value' => $db->encodeBlob( $serialized ), - 'exptime' => $dbExpiry - ]; - $valueSizesByKey[$key] = [ strlen( $serialized ), null ]; + $argsByKeyByTableByShard = []; + foreach ( $argsByKey as $key => $args ) { + list( $shardIndex, $partitionTable ) = $this->getKeyLocation( $key ); + $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args; + } + + $shardIndexesAffected = []; + foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) { + foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) { + try { + $db = $this->getConnection( $shardIndex ); + $shardIndexesAffected[] = $shardIndex; + $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey ); + } catch ( DBError $e ) { + $this->handleDBError( $e, $shardIndex ); + continue; + } } - $db->delete( - $table, - [ - 'keyname' => $tableKeys, - 'exptime <= ' . $db->addQuotes( $db->timestamp() ) - ], - __METHOD__ - ); - $db->insert( $table, $rows, __METHOD__, [ 'IGNORE' ] ); + } - $success = ( $db->affectedRows() == count( $rows ) ); + $success = !in_array( false, $resByKey, true ); - $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey ); - } elseif ( $op === self::OP_SET ) { - $valueSizesByKey = []; - - $rows = []; - foreach ( $tableKeys as $key ) { - $serialized = $this->getSerialized( $data[$key], $key ); - $rows[] = [ - 'keyname' => $key, - 'value' => $db->encodeBlob( $serialized ), - 'exptime' => $dbExpiry - ]; - $valueSizesByKey[$key] = [ strlen( $serialized ), null ]; + if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) { + foreach ( $shardIndexesAffected as $shardIndex ) { + if ( !$this->waitForReplication( $shardIndex ) ) { + $success = false; + } } - $db->replace( $table, 'keyname', $rows, __METHOD__ ); + } - $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey ); - } elseif ( $op === self::OP_DELETE ) { - $db->delete( $table, [ 'keyname' => $tableKeys ], __METHOD__ ); - - $this->updateOpStats( self::METRIC_OP_DELETE, $tableKeys ); - } elseif ( $op === self::OP_TOUCH ) { - $db->update( - $table, - [ 'exptime' => $dbExpiry ], - [ - 'keyname' => $tableKeys, - 'exptime > ' . $db->addQuotes( $db->timestamp() ) - ], - __METHOD__ - ); - - $success = ( $db->affectedRows() == count( $tableKeys ) ); - - $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, $tableKeys ); - } else { - throw new InvalidArgumentException( "Invalid operation '$op'" ); + foreach ( $shardIndexesAffected as $shardIndex ) { + try { + $db = $this->getConnection( $shardIndex ); + $this->occasionallyGarbageCollect( $db ); + } catch ( DBError $e ) { + $this->handleDBError( $e, $shardIndex ); + } } return $success; } + /** + * Set key/value pairs belonging to a partition table on the the given server + * + * In multi-primary mode, if the current row for a key exists and has a modification token + * with a greater integral UNIX timestamp than that of the provided modification timestamp, + * then the write to that key will be aborted with a "false" result. Successfully modified + * key rows will be assigned a new modification token using the provided timestamp. + * + * @param IDatabase $db Handle to the database server where the argument keys belong + * @param string $ptable Name of the partition table where the argument keys belong + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Non-empty (key => (value,exptime)) map + * @param array &$resByKey Map of (key => result) for succesful writes [returned] + * @throws DBError + */ + private function modifyTableSpecificBlobsForSet( + IDatabase $db, + string $ptable, + float $mtime, + array $argsByKey, + array &$resByKey + ) { + $valueSizesByKey = []; + + $mt = $this->makeTimestampedModificationToken( $mtime, $db ); + + // @TODO: use multi-row upsert() with VALUES() once supported in Database + foreach ( $argsByKey as $key => list( $value, $exptime ) ) { + $serialValue = $this->getSerialized( $value, $key ); + $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); + $db->upsert( + $ptable, + $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ), + [ [ 'keyname' ] ], + $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ), + __METHOD__ + ); + $resByKey[$key] = true; + + $valueSizesByKey[$key] = [ strlen( $serialValue ), null ]; + } + + $this->updateOpStats( self::METRIC_OP_SET, $valueSizesByKey ); + } + + /** + * Purge/tombstone key/value pairs belonging to a partition table on the the given server + * + * In multi-primary mode, if the current row for a key exists and has a modification token + * with a greater integral UNIX timestamp than that of the provided modification timestamp, + * then the write to that key will be aborted with a "false" result. Successfully modified + * key rows will be assigned a new modification token/timestamp, an empty value, and an + * expiration timestamp dated slightly before the new modification timestamp. + * + * @param IDatabase $db Handle to the database server where the argument keys belong + * @param string $ptable Name of the partition table where the argument keys belong + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Non-empty (key => []) map + * @param array &$resByKey Map of (key => result) prefilled with false [returned] + * @throws DBError + */ + private function modifyTableSpecificBlobsForDelete( + IDatabase $db, + string $ptable, + float $mtime, + array $argsByKey, + array &$resByKey + ) { + if ( $this->isMultiPrimaryModeEnabled() ) { + // Tombstone keys in order to respect eventual consistency + $mt = $this->makeTimestampedModificationToken( $mtime, $db ); + $expiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime ); + $rows = []; + foreach ( $argsByKey as $key => $arg ) { + $rows[] = $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $expiry, $mt ); + } + $db->upsert( + $ptable, + $rows, + [ [ 'keyname' ] ], + $this->buildUpsertSetForOverwrite( $db, self::TOMB_SERIAL, $expiry, $mt ), + __METHOD__ + ); + } else { + // Just purge the keys since there is only one primary (e.g. "source of truth") + $db->delete( $ptable, [ 'keyname' => array_keys( $argsByKey ) ], __METHOD__ ); + } + + foreach ( $argsByKey as $key => $arg ) { + $resByKey[$key] = true; + } + + $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) ); + } + + /** + * Insert key/value pairs belonging to a partition table on the the given server + * + * If the current row for a key exists and has an integral UNIX timestamp of expiration + * greater than that of the provided modification timestamp, then the write to that key + * will be aborted with a "false" result. Aquisition of advisory key locks must be handled + * by calling functions. + * + * In multi-primary mode, if the current row for a key exists and has a modification token + * with a greater integral UNIX timestamp than that of the provided modification timestamp, + * then the write to that key will be aborted with a "false" result. Successfully modified + * key rows will be assigned a new modification token/timestamp. + * + * @param IDatabase $db Handle to the database server where the argument keys belong + * @param string $ptable Name of the partition table where the argument keys belong + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Non-empty (key => (value,exptime)) map + * @param array &$resByKey Map of (key => result) prefilled with false [returned] + * @throws DBError + */ + private function modifyTableSpecificBlobsForAdd( + IDatabase $db, + string $ptable, + float $mtime, + array $argsByKey, + array &$resByKey + ) { + $valueSizesByKey = []; + + $mt = $this->makeTimestampedModificationToken( $mtime, $db ); + + // This check must happen outside the write query to respect eventual consistency + $existingKeys = $db->selectFieldValues( + $ptable, + 'keyname', + $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ), + __METHOD__ + ); + $existingByKey = array_fill_keys( $existingKeys, true ); + + // @TODO: use multi-row upsert() with VALUES() once supported in Database + foreach ( $argsByKey as $key => list( $value, $exptime ) ) { + if ( isset( $existingByKey[$key] ) ) { + $this->logger->debug( __METHOD__ . ": $key already exists" ); + continue; + } + + $serialValue = $this->getSerialized( $value, $key ); + $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); + $db->upsert( + $ptable, + $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ), + [ [ 'keyname' ] ], + $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ), + __METHOD__ + ); + $resByKey[$key] = true; + + $valueSizesByKey[$key] = [ strlen( $serialValue ), null ]; + } + + $this->updateOpStats( self::METRIC_OP_ADD, $valueSizesByKey ); + } + + /** + * Insert key/value pairs belonging to a partition table on the the given server + * + * If the current row for a key exists, has an integral UNIX timestamp of expiration greater + * than that of the provided modification timestamp, and the CAS token does not match, then + * the write to that key will be aborted with a "false" result. Aquisition of advisory key + * locks must be handled by calling functions. + * + * In multi-primary mode, if the current row for a key exists and has a modification token + * with a greater integral UNIX timestamp than that of the provided modification timestamp, + * then the write to that key will be aborted with a "false" result. Successfully modified + * key rows will be assigned a new modification token/timestamp. + * + * @param IDatabase $db Handle to the database server where the argument keys belong + * @param string $ptable Name of the partition table where the argument keys belong + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Non-empty (key => (value, exptime, CAS token)) map + * @param array &$resByKey Map of (key => result) prefilled with false [returned] + * @throws DBError + */ + private function modifyTableSpecificBlobsForCas( + IDatabase $db, + string $ptable, + float $mtime, + array $argsByKey, + array &$resByKey + ) { + $valueSizesByKey = []; + + $mt = $this->makeTimestampedModificationToken( $mtime, $db ); + + // This check must happen outside the write query to respect eventual consistency + $res = $db->select( + $ptable, + $this->addCasTokenFields( $db, [ 'keyname' ] ), + $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ), + __METHOD__ + ); + + $curTokensByKey = []; + foreach ( $res as $row ) { + $curTokensByKey[$row->keyname] = $this->getCasTokenFromRow( $db, $row ); + } + + // @TODO: use multi-row upsert() with VALUES() once supported in Database + foreach ( $argsByKey as $key => list( $value, $exptime, $casToken ) ) { + $curToken = $curTokensByKey[$key] ?? null; + if ( $curToken === null ) { + $this->logger->debug( __METHOD__ . ": $key does not exists" ); + continue; + } + + if ( $curToken !== $casToken ) { + $this->logger->debug( __METHOD__ . ": $key does not have a matching token" ); + continue; + } + + $serialValue = $this->getSerialized( $value, $key ); + $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); + $db->upsert( + $ptable, + $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ), + [ [ 'keyname' ] ], + $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ), + __METHOD__ + ); + $resByKey[$key] = true; + + $valueSizesByKey[$key] = [ strlen( $serialValue ), null ]; + } + + $this->updateOpStats( self::METRIC_OP_CAS, $valueSizesByKey ); + } + + /** + * Update the TTL for keys belonging to a partition table on the the given server + * + * If no current row for a key exists or the current row has an integral UNIX timestamp of + * expiration less than that of the provided modification timestamp, then the write to that + * key will be aborted with a "false" result. + * + * In multi-primary mode, if the current row for a key exists and has a modification token + * with a greater integral UNIX timestamp than that of the provided modification timestamp, + * then the write to that key will be aborted with a "false" result. Successfully modified + * key rows will be assigned a new modification token/timestamp. + * + * @param IDatabase $db Handle to the database server where the argument keys belong + * @param string $ptable Name of the partition table where the argument keys belong + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Non-empty (key => (exptime)) map + * @param array &$resByKey Map of (key => result) prefilled with false [returned] + * @throws DBError + */ + private function modifyTableSpecificBlobsForChangeTTL( + IDatabase $db, + string $ptable, + float $mtime, + array $argsByKey, + array &$resByKey + ) { + if ( $this->isMultiPrimaryModeEnabled() ) { + $mt = $this->makeTimestampedModificationToken( $mtime, $db ); + + $res = $db->select( + $ptable, + [ 'keyname', 'value' ], + $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ), + __METHOD__ + ); + // @TODO: use multi-row upsert() with VALUES() once supported in Database + foreach ( $res as $curRow ) { + $key = $curRow->keyname; + $serialValue = $db->decodeBlob( $curRow->value ); + list( $exptime ) = $argsByKey[$key]; + $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); + + $db->upsert( + $ptable, + $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $mt ), + [ [ 'keyname' ] ], + $this->buildUpsertSetForOverwrite( $db, $serialValue, $expiry, $mt ), + __METHOD__ + ); + $resByKey[$key] = true; + } + } else { + $keysBatchesByExpiry = []; + foreach ( $argsByKey as $key => list( $exptime ) ) { + $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); + $keysBatchesByExpiry[$expiry][] = $key; + } + + $existingCount = 0; + foreach ( $keysBatchesByExpiry as $expiry => $keyBatch ) { + $db->update( + $ptable, + [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ], + $this->buildExistenceConditions( $db, $keyBatch, (int)$mtime ), + __METHOD__ + ); + $existingCount += $db->affectedRows(); + } + if ( $existingCount === count( $argsByKey ) ) { + foreach ( $argsByKey as $key => $args ) { + $resByKey[$key] = true; + } + } + } + + $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, array_keys( $argsByKey ) ); + } + + /** + * Either increment a counter key, if it exists, or initialize it, otherwise + * + * If no current row for a key exists or the current row has an integral UNIX timestamp of + * expiration less than that of the provided modification timestamp, then the key row will + * be set to the initial value. Otherwise, the current row will be incremented. + * + * In multi-primary mode, if the current row for a key exists and has a modification token + * with a greater integral UNIX timestamp than that of the provided modification timestamp, + * then the write to that key will be aborted with a "false" result. Successfully initialized + * key rows will be assigned a new modification token/timestamp. + * + * @param IDatabase $db Handle to the database server where the argument keys belong + * @param string $ptable Name of the partition table where the argument keys belong + * @param float $mtime UNIX modification timestamp + * @param array $argsByKey Non-empty (key => (step, init, exptime) map + * @param array &$resByKey Map of (key => result) prefilled with false [returned] + * @throws DBError + */ + private function modifyTableSpecificBlobsForIncrInit( + IDatabase $db, + string $ptable, + float $mtime, + array $argsByKey, + array &$resByKey + ) { + foreach ( $argsByKey as $key => list( $step, $init, $exptime ) ) { + $mt = $this->makeTimestampedModificationToken( $mtime, $db ); + $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime ); + + // Use a transaction so that changes from other threads are not visible due to + // "consistent reads". This way, the exact post-increment value can be returned. + // The "live key exists" check can go inside the write query and remain safe for + // replication since the TTL for such keys is either indefinite or very short. + $db->startAtomic( __METHOD__ ); + $db->upsert( + $ptable, + $this->buildUpsertRow( $db, $key, $init, $expiry, $mt ), + [ [ 'keyname' ] ], + $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $mt, (int)$mtime ), + __METHOD__ + ); + $affectedCount = $db->affectedRows(); + $row = $db->selectRow( $ptable, 'value', [ 'keyname' => $key ], __METHOD__ ); + $db->endAtomic( __METHOD__ ); + + if ( !$affectedCount || $row === false ) { + $this->logger->warning( __METHOD__ . ": failed to set new $key value" ); + continue; + } + + $serialValue = $db->decodeBlob( $row->value ); + if ( !$this->isInteger( $serialValue ) ) { + $this->logger->warning( __METHOD__ . ": got non-integer $key value" ); + continue; + } + + $resByKey[$key] = (int)$serialValue; + } + + $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) ); + } + + /** + * @param int $exptime Relative or absolute expiration + * @param int $nowTsUnix Current UNIX timestamp + * @return int UNIX timestamp or TTL_INDEFINITE + */ + private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) { + $expiry = $this->getExpirationAsTimestamp( $exptime ); + // Eventual consistency requires the preservation of recently modified keys. + // Do not create rows with `exptime` fields so low that they might get garbage + // collected before being replicated. + if ( $expiry !== self::TTL_INDEFINITE ) { + $expiry = max( $expiry, $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC ); + } + + return $expiry; + } + + /** + * Get a scoped lock and modification timestamp for a critical section of reads/writes + * + * This is used instead of BagOStuff::getCurrentTime() for certain writes (such as "add", + * "incr", and "cas"), for which we want to support tight race conditions where the same + * key is repeatedly written to by multiple web servers that each get to see the previous + * value, act on it, and modify it in some way. + * + * It is assumed that this method is normally only invoked from the primary datacenter. + * A lock is acquired on the primary server of the local datacenter in order to avoid race + * conditions within the critical section. The clock on the SQL server is used to get the + * modification timestamp in order to minimize issues with clock drift between web servers; + * thus key writes will not be rejected due to some web servers having lagged clocks. + * + * @param string $key + * @param ?ScopedCallback &$scope Unlocker callback; null on failure [returned] + * @return string|null UNIX timestamp with 6 decimal places; null on failure + */ + private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) { + if ( !$this->lock( $key, 0 ) ) { + return null; + } + + $scope = new ScopedCallback( function () use ( $key ) { + $this->unlock( $key ); + } ); + + return sprintf( '%.6f', $this->locks[$key][self::LOCK_TIME] ); + } + + /** + * Make a `modtoken` column value with the original time and source database server of a write + * + * @param int|float $mtime UNIX modification timestamp + * @param IDatabase $db Handle to the primary database server sourcing the write + * @return string String of the form "", where SECONDS_SOURCE + * is "<35 bit seconds portion of UNIX time><32 bit database server ID>" as 13 base 36 chars, + * and MICROSECONDS is "<20 bit microseconds portion of UNIX time>" as 4 base 36 chars + */ + private function makeTimestampedModificationToken( $mtime, IDatabase $db ) { + // We have reserved space for upto 6 digits in the microsecond portion of the token. + // This is for future use only (maybe CAS tokens) and not currently used. + // It is currently populated by the microsecond portion returned by microtime, + // which generally has fewer than 6 digits of meaningful precision but can still be useful + // in debugging (to see the token continuously change even during rapid testing). + $seconds = (int)$mtime; + list( , $microseconds ) = explode( '.', sprintf( '%.6f', $mtime ) ); + + $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) ); + + $token = implode( '', [ + // 67 bit integral portion of UNIX timestamp, qualified + \Wikimedia\base_convert( + // 35 bit integral seconds portion of UNIX timestamp + str_pad( base_convert( $seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) . + // 32 bit ID of the primary database server handling the write + str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ), + 2, + 36, + 13 + ), + // 20 bit fractional portion of UNIX timestamp, as integral microseconds + str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT ) + ] ); + + if ( strlen( $token ) !== 17 ) { + throw new RuntimeException( "Modification timestamp overflow detected" ); + } + + return $token; + } + + /** + * WHERE conditions that check for existence and liveness of keys + * + * @param IDatabase $db + * @param string[]|string $keys + * @param string $time UNIX modification timestamp with 6 decimal places + * @return array + */ + private function buildExistenceConditions( IDatabase $db, $keys, string $time ) { + // Note that tombstones always have past expiration dates + return [ + 'keyname' => $keys, + 'exptime >= ' . $db->addQuotes( $db->timestamp( (int)$time ) ) + ]; + } + + /** + * INSERT array for handling key writes/overwrites when no live nor stale key exists + * + * @param IDatabase $db + * @param string $key + * @param string|int $serialValue New value + * @param int $expiry Expiration timestamp or TTL_INDEFINITE + * @param string $mt Modification token + * @return array + */ + private function buildUpsertRow( + IDatabase $db, + $key, + $serialValue, + int $expiry, + string $mt + ) { + $row = [ + 'keyname' => $key, + 'value' => $db->encodeBlob( $serialValue ), + 'exptime' => $this->encodeDbExpiry( $db, $expiry ) + ]; + if ( $this->isMultiPrimaryModeEnabled() ) { + $row['modtoken'] = $mt; + } + + return $row; + } + + /** + * SET array for handling key overwrites when a live or stale key exists + * + * @param IDatabase $db + * @param string|int $serialValue New value + * @param int $expiry Expiration timestamp or TTL_INDEFINITE + * @param string $mt Modification token + * @return array + */ + private function buildUpsertSetForOverwrite( + IDatabase $db, + $serialValue, + int $expiry, + string $mt + ) { + $expressionsByColumn = [ + 'value' => $db->addQuotes( $db->encodeBlob( $serialValue ) ), + 'exptime' => $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) ) + ]; + + $set = []; + if ( $this->isMultiPrimaryModeEnabled() ) { + // The query might take a while to replicate, during which newer values might get + // written. Qualify the query so that it does not override such values. Note that + // duplicate tokens generated around the same time for a key should still result + // in convergence given the use of server_id in modtoken (providing a total order + // among primary DB servers) and MySQL binlog ordering (providing a total order + // for writes replicating from a given primary DB server). + $expressionsByColumn['modtoken'] = $db->addQuotes( $mt ); + foreach ( $expressionsByColumn as $column => $updateExpression ) { + $rhs = $db->conditional( + $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= SUBSTR(modtoken,0,13)', + $updateExpression, + $column + ); + $set[] = "{$column}=" . trim( $rhs ); + } + } else { + foreach ( $expressionsByColumn as $column => $updateExpression ) { + $set[] = "{$column}={$updateExpression}"; + } + } + + return $set; + } + + /** + * SET array for handling key overwrites when a live or stale key exists + * + * @param IDatabase $db + * @param int $step Positive counter incrementation value + * @param int $init Positive initial counter value + * @param int $expiry Expiration timestamp or TTL_INDEFINITE + * @param string $mt Modification token + * @param int $mtUnixTs UNIX timestamp of modification token + * @return array + */ + private function buildIncrUpsertSet( + IDatabase $db, + int $step, + int $init, + int $expiry, + string $mt, + int $mtUnixTs + ) { + // Map of (column => (SQL for non-expired key rows, SQL for expired key rows)) + $expressionsByColumn = [ + 'value' => [ "value + {$db->addQuotes( $step )}", $db->addQuotes( $init ) ], + 'exptime' => [ 'exptime', $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) ) ] + ]; + if ( $this->isMultiPrimaryModeEnabled() ) { + $expressionsByColumn['modtoken'] = [ 'modtoken', $db->addQuotes( $mt ) ]; + } + + $set = []; + foreach ( $expressionsByColumn as $column => list( $updateExpression, $initExpression ) ) { + $rhs = $db->conditional( + 'exptime >= ' . $db->addQuotes( $db->timestamp( $mtUnixTs ) ), + $updateExpression, + $initExpression + ); + $set[] = "{$column}=" . trim( $rhs ); + } + + return $set; + } + /** * @param IDatabase $db * @param int $expiry UNIX timestamp of expiration or TTL_INDEFINITE @@ -641,12 +1238,64 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** * @param IDatabase $db * @param string $dbExpiry DB timestamp of expiration - * @return string UNIX timestamp of expiration or TTL_INDEFINITE + * @return int UNIX timestamp of expiration or TTL_INDEFINITE */ private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) { return ( $dbExpiry === $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER ) ) ? self::TTL_INDEFINITE - : ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry ); + : (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry ); + } + + /** + * Either append a 'castoken' field or append the fields needed to compute the CAS token + * + * @param IDatabase $db + * @param string[] $fields SELECT field array + * @return string[] SELECT field array + */ + private function addCasTokenFields( IDatabase $db, array $fields ) { + $type = $db->getType(); + + if ( $type === 'mysql' ) { + $fields['castoken'] = $db->buildConcat( [ + 'SHA1(value)', + $db->addQuotes( '@' ), + 'exptime' + ] ); + } elseif ( $type === 'postgres' ) { + $fields['castoken'] = $db->buildConcat( [ + 'md5(value)', + $db->addQuotes( '@' ), + 'exptime' + ] ); + } else { + if ( !in_array( 'value', $fields, true ) ) { + $fields[] = 'value'; + } + if ( !in_array( 'exptime', $fields, true ) ) { + $fields[] = 'exptime'; + } + } + + return $fields; + } + + /** + * Get a CAS token from a SELECT result row + * + * @param IDatabase $db + * @param stdClass $row A row for a key + * @return string CAS token + */ + private function getCasTokenFromRow( IDatabase $db, stdClass $row ) { + if ( isset( $row->castoken ) ) { + $curToken = $row->castoken; + } else { + $curToken = sha1( $db->decodeBlob( $row->value ) ) . '@' . $row->exptime; + $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" ); + } + + return $curToken; } /** @@ -665,7 +1314,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $garbageCollector = function () use ( $db ) { $this->deleteServerObjectsExpiringBefore( $db, - $this->getCurrentTime(), + (int)$this->getCurrentTime(), $this->purgeLimit ); $this->lastGarbageCollect = time(); @@ -680,7 +1329,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } public function expireAll() { - $this->deleteObjectsExpiringBefore( $this->getCurrentTime() ); + $this->deleteObjectsExpiringBefore( (int)$this->getCurrentTime() ); } public function deleteObjectsExpiringBefore( @@ -705,7 +1354,6 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $keysDeletedCount = 0; foreach ( $shardIndexes as $numServersDone => $shardIndex ) { - $db = null; // in case of connection failure try { $db = $this->getConnection( $shardIndex ); $this->deleteServerObjectsExpiringBefore( @@ -716,7 +1364,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ] ); } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); + $this->handleDBError( $e, $shardIndex ); $ok = false; } } @@ -740,6 +1388,18 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { array $progress = null ) { $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp ); + if ( $this->isMultiPrimaryModeEnabled() ) { + // Eventual consistency requires the preservation of any key that was recently + // modified. The key must exist on this database server long enough for the server + // to receive, via replication, all writes to the key with lower timestamps. Such + // writes should be no-ops since the existing key value should "win". If the network + // partitions between datacenters A and B for 30 minutes, the database servers in + // each datacenter will see an initial burst of writes with "old" timestamps via + // replication. This might include writes with lower timestamps that the existing + // key value. Therefore, clock skew and replication delay are both factors. + $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC; + $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe ); + } $tableIndexes = range( 0, $this->numTableShards - 1 ); shuffle( $tableIndexes ); @@ -825,65 +1485,57 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** @noinspection PhpUnusedLocalVariableInspection */ $silenceScope = $this->silenceTransactionProfiler(); foreach ( $this->getServerShardIndexes() as $shardIndex ) { - $db = null; // in case of connection failure try { $db = $this->getConnection( $shardIndex ); for ( $i = 0; $i < $this->numTableShards; $i++ ) { $db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ ); } } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); + $this->handleDBError( $e, $shardIndex ); return false; } } return true; } - protected function doLock( $key, $timeout = 6, $exptime = 6 ) { - list( $shardIndex ) = $this->getKeyLocation( $key ); + public function doLock( $key, $timeout = 6, $exptime = 6 ) { + /** @noinspection PhpUnusedLocalVariableInspection */ + $silenceScope = $this->silenceTransactionProfiler(); $lockTsUnix = null; - $db = null; // in case of connection failure + list( $shardIndex ) = $this->getKeyLocation( $key ); try { $db = $this->getConnection( $shardIndex ); $lockTsUnix = $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP ); - if ( $lockTsUnix === null ) { - $this->logger->warning( - __METHOD__ . " failed due to timeout for {key}.", - [ 'key' => $key, 'timeout' => $timeout ] - ); - } } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); + $this->handleDBError( $e, $shardIndex ); + $this->logger->warning( + __METHOD__ . ' failed due to I/O error for {key}.', + [ 'key' => $key ] + ); } return $lockTsUnix; } - protected function doUnlock( $key ) { + public function doUnlock( $key ) { + /** @noinspection PhpUnusedLocalVariableInspection */ + $silenceScope = $this->silenceTransactionProfiler(); + list( $shardIndex ) = $this->getKeyLocation( $key ); - $db = null; // in case of connection failure try { $db = $this->getConnection( $shardIndex ); $released = $db->unlock( $key, __METHOD__ ); } catch ( DBError $e ) { - $this->handleWriteError( $e, $db, $shardIndex ); + $this->handleDBError( $e, $shardIndex ); $released = false; } return $released; } - /** - * Construct a cache key. - * - * @since 1.35 - * @param string $keyspace - * @param array $components - * @return string - */ public function makeKeyInternal( $keyspace, $components ) { // SQL schema for 'objectcache' specifies keys as varchar(255). From that, // subtract the number of characters we need for the keyspace and for @@ -910,14 +1562,6 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { return $keyspace . ':' . implode( ':', $components ); } - /** - * Serialize an object and, if possible, compress the representation. - * On typical message and page data, this can provide a 3X decrease - * in storage requirements. - * - * @param mixed $value - * @return string|int - */ protected function serialize( $value ) { if ( is_int( $value ) ) { return $value; @@ -925,18 +1569,18 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $serial = serialize( $value ); if ( function_exists( 'gzdeflate' ) ) { + // On typical message and page data, this can provide a 3X storage savings $serial = gzdeflate( $serial ); } return $serial; } - /** - * Unserialize and, if necessary, decompress an object. - * @param string $value - * @return mixed - */ protected function unserialize( $value ) { + if ( $value === self::TOMB_SERIAL ) { + return false; // tombstone + } + if ( $this->isInteger( $value ) ) { return (int)$value; } @@ -986,17 +1630,23 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { private function getConnectionFromServerInfo( $shardIndex, array $server ) { if ( !isset( $this->conns[$shardIndex] ) ) { /** @var IMaintainableDatabase Auto-commit connection to the server */ - $conn = Database::factory( $server['type'], array_merge( - $server, - [ - 'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX, - 'connLogger' => $this->logger, - 'queryLogger' => $this->logger - ] - ) ); + $conn = Database::factory( + $server['type'], + array_merge( + $server, + [ + // Make sure the handle uses autocommit mode + 'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX, + 'connLogger' => $this->logger, + 'queryLogger' => $this->logger + ] + ) + ); // Automatically create the objectcache table for sqlite as needed - if ( $conn->getType() === 'sqlite' && !$conn->tableExists( 'objectcache', __METHOD__ ) ) { - $this->initSqliteDatabase( $conn ); + if ( $conn->getType() === 'sqlite' ) { + if ( !$conn->tableExists( 'objectcache', __METHOD__ ) ) { + $this->initSqliteDatabase( $conn ); + } } $this->conns[$shardIndex] = $conn; } @@ -1008,9 +1658,9 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { * Handle a DBError which occurred during a read operation. * * @param DBError $exception - * @param int $shardIndex + * @param int|string $shardIndex Server index or self::SHARD_LOCAL/self::SHARD_GLOBAL */ - private function handleReadError( DBError $exception, $shardIndex ) { + private function handleDBError( DBError $exception, $shardIndex ) { if ( $exception instanceof DBConnectionError ) { $this->markServerDown( $exception, $shardIndex ); } @@ -1018,22 +1668,6 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $this->setAndLogDBError( $exception ); } - /** - * Handle a DBQueryError which occurred during a write operation. - * - * @param DBError $exception - * @param IDatabase|null $db DB handle or null if connection failed - * @param int $shardIndex - * @throws Exception - */ - private function handleWriteError( DBError $exception, $db, $shardIndex ) { - if ( !( $db instanceof IDatabase ) ) { - $this->markServerDown( $exception, $shardIndex ); - } - - $this->setAndLogDBError( $exception ); - } - /** * @param DBError $exception */ @@ -1041,10 +1675,10 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $this->logger->error( "DBError: {$exception->getMessage()}" ); if ( $exception instanceof DBConnectionError ) { $this->setLastError( BagOStuff::ERR_UNREACHABLE ); - $this->logger->debug( __METHOD__ . ": ignoring connection error" ); + $this->logger->warning( __METHOD__ . ": ignoring connection error" ); } else { $this->setLastError( BagOStuff::ERR_UNEXPECTED ); - $this->logger->debug( __METHOD__ . ": ignoring query error" ); + $this->logger->warning( __METHOD__ . ": ignoring query error" ); } } @@ -1052,7 +1686,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { * Mark a server down due to a DBConnectionError exception * * @param DBError $exception - * @param int $shardIndex + * @param int|string $shardIndex Server index or self::SHARD_LOCAL/self::SHARD_GLOBAL */ private function markServerDown( DBError $exception, $shardIndex ) { unset( $this->conns[$shardIndex] ); // bug T103435 @@ -1156,6 +1790,13 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { throw new InvalidArgumentException( "Unknown server tag: $tag" ); } + /** + * @return bool Whether queries should be multi-primary safe + */ + private function isMultiPrimaryModeEnabled() { + return ( $this->multiPrimaryModeType !== null ); + } + /** * Wait for replica DBs to catch up to the primary DB * diff --git a/tests/phpunit/includes/libs/objectcache/BagOStuffTestBase.php b/tests/phpunit/includes/libs/objectcache/BagOStuffTestBase.php index debce9ca05f..6a67866de2f 100644 --- a/tests/phpunit/includes/libs/objectcache/BagOStuffTestBase.php +++ b/tests/phpunit/includes/libs/objectcache/BagOStuffTestBase.php @@ -13,14 +13,16 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { private $cache; private const TEST_KEY = 'test'; + private const TEST_TIME = 1563892142; protected function setUp(): void { parent::setUp(); $this->cache = $this->newCacheInstance(); - - $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) ); - $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) . ':lock' ); + $this->cache->deleteMulti( [ + $this->cache->makeKey( self::TEST_KEY ), + $this->cache->makeKey( self::TEST_KEY ) . ':lock' + ] ); } /** @@ -123,9 +125,6 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { * @covers MediumSpecificBagOStuff::changeTTL */ public function testChangeTTLRenew() { - $now = microtime( true ); // need real time - $this->cache->setMockTime( $now ); - $key = $this->cache->makeKey( self::TEST_KEY ); $value = 'meow'; @@ -144,39 +143,36 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { * @covers MediumSpecificBagOStuff::changeTTL */ public function testChangeTTLExpireRel() { - $now = microtime( true ); // need real time - $this->cache->setMockTime( $now ); - $key = $this->cache->makeKey( self::TEST_KEY ); $value = 'meow'; $this->cache->add( $key, $value, 5 ); + $this->assertSame( $value, $this->cache->get( $key ) ); $this->assertTrue( $this->cache->changeTTL( $key, -3600 ) ); $this->assertFalse( $this->cache->get( $key ) ); + $this->assertFalse( $this->cache->changeTTL( $key, -3600 ) ); } /** * @covers MediumSpecificBagOStuff::changeTTL */ public function testChangeTTLExpireAbs() { - $now = microtime( true ); // need real time - $this->cache->setMockTime( $now ); - $key = $this->cache->makeKey( self::TEST_KEY ); $value = 'meow'; $this->cache->add( $key, $value, 5 ); - $this->assertTrue( $this->cache->changeTTL( $key, $now - 3600 ) ); + $this->assertSame( $value, $this->cache->get( $key ) ); + + $now = $this->cache->getCurrentTime(); + $this->assertTrue( $this->cache->changeTTL( $key, (int)$now - 3600 ) ); $this->assertFalse( $this->cache->get( $key ) ); + $this->assertFalse( $this->cache->changeTTL( $key, (int)$now - 3600 ) ); } /** * @covers MediumSpecificBagOStuff::changeTTLMulti */ public function testChangeTTLMulti() { - $now = 1563892142; - $this->cache->setMockTime( $now ); - $key1 = $this->cache->makeKey( 'test-key1' ); $key2 = $this->cache->makeKey( 'test-key2' ); $key3 = $this->cache->makeKey( 'test-key3' ); @@ -206,11 +202,11 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { $this->assertFalse( $ok, "One key missing" ); $this->assertSame( 1, $this->cache->get( $key1 ), "Key still live" ); - $now = microtime( true ); // real time $ok = $this->cache->setMulti( [ $key1 => 1, $key2 => 2, $key3 => 3 ] ); $this->assertTrue( $ok, "setMulti() succeeded" ); - $ok = $this->cache->changeTTLMulti( [ $key1, $key2, $key3 ], $now + 86400 ); + $now = $this->cache->getCurrentTime(); + $ok = $this->cache->changeTTLMulti( [ $key1, $key2, $key3 ], (int)$now + 86400 ); $this->assertTrue( $ok, "Expiry set for all keys" ); $this->assertSame( 1, $this->cache->get( $key1 ), "Key still live" ); @@ -240,7 +236,7 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { $key = $this->cache->makeKey( self::TEST_KEY ); $this->cache->add( $key, $value, 5 ); - $this->assertEquals( $this->cache->get( $key ), $value ); + $this->assertSame( $this->cache->get( $key ), $value ); } /** @@ -249,7 +245,7 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { * @covers MediumSpecificBagOStuff::getWithSetCallback */ public function testGetWithSetCallback() { - $now = 1563892142; + $now = self::TEST_TIME; $cache = new HashBagOStuff( [] ); $cache->setMockTime( $now ); $key = $cache->makeKey( self::TEST_KEY ); @@ -504,11 +500,4 @@ abstract class BagOStuffTestBase extends MediaWikiIntegrationTestCase { $this->assertTrue( $this->cache->unlock( $key2 ) ); $this->assertTrue( $this->cache->unlock( $key2 ) ); } - - protected function tearDown(): void { - $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) ); - $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) . ':lock' ); - - parent::tearDown(); - } }