diff --git a/includes/ServiceWiring.php b/includes/ServiceWiring.php index 6a3951403ac..a3fe093ba5e 100644 --- a/includes/ServiceWiring.php +++ b/includes/ServiceWiring.php @@ -104,6 +104,7 @@ use Wikimedia\DependencyStore\KeyValueDependencyStore; use Wikimedia\DependencyStore\SqlModuleDependencyStore; use Wikimedia\Message\IMessageFormatterFactory; use Wikimedia\ObjectFactory; +use Wikimedia\Services\RecursiveServiceDependencyException; use Wikimedia\UUID\GlobalIdGenerator; return [ @@ -245,13 +246,30 @@ return [ function ( MediaWikiServices $services ) : Wikimedia\Rdbms\LBFactory { $mainConfig = $services->getMainConfig(); + try { + $stash = $services->getMainObjectStash(); + } catch ( RecursiveServiceDependencyException $e ) { + $stash = new EmptyBagOStuff(); // T141804: handle cases like CACHE_DB + } + + if ( $stash instanceof EmptyBagOStuff ) { + // Use process cache if the main stash is disabled or there was recursion + $stash = new HashBagOStuff( [ 'maxKeys' => 100 ] ); + } + + try { + $wanCache = $services->getMainWANObjectCache(); + } catch ( RecursiveServiceDependencyException $e ) { + $wanCache = WANObjectCache::newEmpty(); // T141804: handle cases like CACHE_DB + } + $lbConf = MWLBFactory::applyDefaultConfig( $mainConfig->get( 'LBFactoryConf' ), new ServiceOptions( MWLBFactory::APPLY_DEFAULT_CONFIG_OPTIONS, $mainConfig ), $services->getConfiguredReadOnlyMode(), $services->getLocalServerObjectCache(), - $services->getMainObjectStash(), - $services->getMainWANObjectCache() + $stash, + $wanCache ); $class = MWLBFactory::getLBFactoryClass( $lbConf ); diff --git a/includes/db/MWLBFactory.php b/includes/db/MWLBFactory.php index b5e70cdbaaa..75bc17a270c 100644 --- a/includes/db/MWLBFactory.php +++ b/includes/db/MWLBFactory.php @@ -157,7 +157,9 @@ abstract class MWLBFactory { $options->get( 'DBprefix' ) ); - $lbConf = self::injectObjectCaches( $lbConf, $srvCache, $mainStash, $wanCache ); + $lbConf['srvCache'] = $srvCache; + $lbConf['memStash'] = $mainStash; + $lbConf['wanCache'] = $wanCache; return $lbConf; } @@ -214,35 +216,6 @@ abstract class MWLBFactory { return $server; } - /** - * @param array $lbConf - * @param BagOStuff $sCache - * @param BagOStuff $mStash - * @param WANObjectCache $wCache - * @return array - */ - private static function injectObjectCaches( - array $lbConf, BagOStuff $sCache, BagOStuff $mStash, WANObjectCache $wCache - ) { - // Fallback if APC style caching is not an option - if ( $sCache instanceof EmptyBagOStuff ) { - $sCache = new HashBagOStuff( [ 'maxKeys' => 100 ] ); - } - - // Use APC/memcached style caching, but avoids loops with CACHE_DB (T141804) - if ( $sCache->getQoS( $sCache::ATTR_EMULATION ) > $sCache::QOS_EMULATION_SQL ) { - $lbConf['srvCache'] = $sCache; - } - if ( $mStash->getQoS( $mStash::ATTR_EMULATION ) > $mStash::QOS_EMULATION_SQL ) { - $lbConf['memStash'] = $mStash; - } - if ( $wCache->getQoS( $wCache::ATTR_EMULATION ) > $wCache::QOS_EMULATION_SQL ) { - $lbConf['wanCache'] = $wCache; - } - - return $lbConf; - } - /** * @param array $servers * @param string $ldDB Local domain database name diff --git a/includes/objectcache/ObjectCache.php b/includes/objectcache/ObjectCache.php index 053e6c7cc0e..6226f91b1ef 100644 --- a/includes/objectcache/ObjectCache.php +++ b/includes/objectcache/ObjectCache.php @@ -175,6 +175,12 @@ class ObjectCache { $server['dbDirectory'] = $conf->get( 'SQLiteDataDir' ); } } + } elseif ( !isset( $params['localKeyLB'] ) ) { + $params['localKeyLB'] = [ + 'factory' => function () { + return MediaWikiServices::getInstance()->getDBLoadBalancer(); + } + ]; } } diff --git a/includes/objectcache/SqlBagOStuff.php b/includes/objectcache/SqlBagOStuff.php index 0b61626fcc8..1455d0edeed 100644 --- a/includes/objectcache/SqlBagOStuff.php +++ b/includes/objectcache/SqlBagOStuff.php @@ -21,13 +21,14 @@ * @ingroup Cache */ -use MediaWiki\MediaWikiServices; use Wikimedia\AtEase\AtEase; +use Wikimedia\ObjectFactory; use Wikimedia\Rdbms\Database; use Wikimedia\Rdbms\DBConnectionError; use Wikimedia\Rdbms\DBError; use Wikimedia\Rdbms\DBQueryError; use Wikimedia\Rdbms\IDatabase; +use Wikimedia\Rdbms\ILoadBalancer; use Wikimedia\Rdbms\IMaintainableDatabase; use Wikimedia\ScopedCallback; use Wikimedia\Timestamp\ConvertibleTimestamp; @@ -39,11 +40,16 @@ use Wikimedia\WaitConditionLoop; * @ingroup Cache */ class SqlBagOStuff extends MediumSpecificBagOStuff { + /** @var ILoadBalancer|null */ + protected $localKeyLb; + /** @var ILoadBalancer|null */ + protected $globalKeyLb; + /** @var array[] (server index => server config) */ - protected $serverInfos; + protected $serverInfos = []; /** @var string[] (server index => tag/host name) */ - protected $serverTags; - /** @var int */ + protected $serverTags = []; + /** @var int Number of database servers shards (e.g. horizontal/vertical partitions) */ protected $numServerShards; /** @var int UNIX timestamp */ protected $lastGarbageCollect = 0; @@ -58,11 +64,11 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** @var bool */ protected $replicaOnly = false; - /** @var array */ + /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */ protected $conns; - /** @var array UNIX timestamps */ + /** @var float[] Map of (shard index => UNIX timestamps) */ protected $connFailureTimes = []; - /** @var array Exceptions */ + /** @var Exception[] Map of (shard index => Exception) */ protected $connFailureErrors = []; /** @var int */ @@ -77,42 +83,44 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** @var string */ private static $OP_DELETE = 'delete'; + /** @var string */ + private const SHARD_LOCAL = 'local'; + /** @var string */ + private const SHARD_GLOBAL = 'global'; + /** * Constructor. Parameters are: - * - server: A server info structure in the format required by each - * element in $wgDBServers. - * - * - servers: An array of server info structures describing a set of database servers - * to distribute keys to. If this is specified, the "server" option will be - * ignored. If string keys are used, then they will be used for consistent - * hashing *instead* of the host name (from the server config). This is useful - * when a cluster is replicated to another site (with different host names) - * but each server has a corresponding replica in the other cluster. - * - * - purgePeriod: The average number of object cache writes in between - * garbage collection operations, where expired entries - * are removed from the database. Or in other words, the - * reciprocal of the probability of purging on any given - * write. If this is set to zero, purging will never be done. - * - * - purgeLimit: Maximum number of rows to purge at once. - * - * - tableName: The table name to use, default is "objectcache". - * - * - shards: The number of tables to use for data storage on each server. - * If this is more than 1, table names will be formed in the style - * objectcacheNNN where NNN is the shard index, between 0 and - * shards-1. The number of digits will be the minimum number - * required to hold the largest shard index. Data will be - * distributed across all tables by key hash. This is for - * MySQL bugs 61735 - * and 61736 . - * - * - replicaOnly: Whether to only use replica DBs and avoid triggering - * garbage collection logic of expired items. This only - * makes sense if the primary DB is used and only if get() - * calls will be used. This is used by ReplicatedBagOStuff. - * - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC. + * - server: Server config map for Database::factory() that describes the database + * to use for all key operations. This overrides "localKeyLB" and "globalKeyLB". + * - servers: Array of server config maps, each for Database::factory(), describing a set + * of database servers to distribute keys to. If this is specified, the "server" option + * will be ignored. If string keys are used, then they will be used for consistent + * hashing *instead* of the host name (from the server config). This is useful + * when a cluster is replicated to another site (with different host names) + * but each server has a corresponding replica in the other cluster. + * - localKeyLB: ObjectFactory::getObjectFromSpec array yeilding ILoadBalancer. + * This load balancer is used for local keys, e.g. those using makeKey(). + * This is overriden by 'server'/'servers'. + * - globalKeyLB: ObjectFactory::getObjectFromSpec array yeilding ILoadBalancer. + * This load balancer is used for local keys, e.g. those using makeGlobalKey(). + * This is overriden by 'server'/'servers'. + * - purgePeriod: The average number of object cache writes in between arbage collection + * operations, where expired entries are removed from the database. Or in other words, + * the reciprocal of the probability of purging on any given write. If this is set to + * zero, purging will never be done. [optional] + * - purgeLimit: Maximum number of rows to purge at once. [optional] + * - tableName: The table name to use, default is "objectcache". [optional] + * - shards: The number of tables to use for data storage on each server. + * If this is more than 1, table names will be formed in the style objectcacheNNN where + * NNN is the shard index, between 0 and shards-1. The number of digits will be the + * minimum number required to hold the largest shard index. Data will be distributed + * across all tables by key hash. This is for MySQL bugs 61735 + * and 61736 + * . [optional] + * - replicaOnly: Whether to only use replica DBs and avoid triggering garbage collection + * logic of expired items. This only makes sense if the primary DB is used and only if + * get() calls will be used. This is used by ReplicatedBagOStuff. [optional] + * - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC. [optional] * * @param array $params */ @@ -120,29 +128,36 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { parent::__construct( $params ); $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL; - $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE; - if ( isset( $params['servers'] ) ) { - $this->serverInfos = []; - $this->serverTags = []; - $this->numServerShards = count( $params['servers'] ); + if ( isset( $params['servers'] ) || isset( $params['server'] ) ) { $index = 0; - foreach ( $params['servers'] as $tag => $info ) { + foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) { $this->serverInfos[$index] = $info; - if ( is_string( $tag ) ) { - $this->serverTags[$index] = $tag; - } else { - $this->serverTags[$index] = $info['host'] ?? "#$index"; - } + $this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index"; ++$index; } - } elseif ( isset( $params['server'] ) ) { - $this->serverInfos = [ $params['server'] ]; + // Horizontal partitioning by key hash (if any) $this->numServerShards = count( $this->serverInfos ); + $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE; } else { - // Default to using the main wiki's database servers - $this->serverInfos = []; - $this->numServerShards = 1; + if ( isset( $params['localKeyLB'] ) ) { + $this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer ) + ? $params['localKeyLB'] + : ObjectFactory::getObjectFromSpec( $params['localKeyLB'] ); + } + if ( isset( $params['globalKeyLB'] ) ) { + $this->globalKeyLb = ( $params['globalKeyLB'] instanceof ILoadBalancer ) + ? $params['globalKeyLB'] + : ObjectFactory::getObjectFromSpec( $params['globalKeyLB'] ); + } + $this->localKeyLb = $this->localKeyLb ?: $this->globalKeyLb; + if ( !$this->localKeyLb ) { + throw new InvalidArgumentException( + "Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'" + ); + } + // Verticle partitioning by global vs local keys (if any) + $this->numServerShards = ( $this->localKeyLb === $this->globalKeyLb ) ? 1 : 2; $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE; } if ( isset( $params['purgePeriod'] ) ) { @@ -164,16 +179,13 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** * Get a connection to the specified database * - * @param int $shardIndex + * @param int|string $shardIndex Server index or self::SHARD_LOCAL/self::SHARD_GLOBAL * @return IMaintainableDatabase - * @throws MWException + * @throws DBConnectionError + * @throws UnexpectedValueException */ private function getConnection( $shardIndex ) { - if ( $shardIndex >= $this->numServerShards ) { - throw new MWException( __METHOD__ . ": Invalid server index \"$shardIndex\"" ); - } - - # Don't keep timing out trying to connect for each call if the DB is down + // Don't keep timing out trying to connect if the server is down if ( isset( $this->connFailureErrors[$shardIndex] ) && ( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60 @@ -181,75 +193,70 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { throw $this->connFailureErrors[$shardIndex]; } - if ( $this->serverInfos ) { - if ( !isset( $this->conns[$shardIndex] ) ) { - // Use custom database defined by server connection info - $info = $this->serverInfos[$shardIndex]; - $type = $info['type'] ?? 'mysql'; - $host = $info['host'] ?? '[unknown]'; - $this->logger->debug( __CLASS__ . ": connecting to $host" ); - $conn = Database::factory( $type, $info ); - $conn->clearFlag( DBO_TRX ); // auto-commit mode - $this->conns[$shardIndex] = $conn; - // Automatically create the objectcache table for sqlite as needed - if ( $conn->getType() === 'sqlite' ) { - $this->initSqliteDatabase( $conn ); - } + if ( $shardIndex === self::SHARD_LOCAL ) { + $conn = $this->getConnectionViaLoadBalancer( $shardIndex ); + } elseif ( $shardIndex === self::SHARD_GLOBAL ) { + $conn = $this->getConnectionViaLoadBalancer( $shardIndex ); + } elseif ( is_int( $shardIndex ) ) { + if ( isset( $this->serverInfos[$shardIndex] ) ) { + $server = $this->serverInfos[$shardIndex]; + $conn = $this->getConnectionFromServerInfo( $shardIndex, $server ); + } else { + throw new UnexpectedValueException( "Invalid server index #$shardIndex" ); } - $conn = $this->conns[$shardIndex]; } else { - // Use the main LB database - $lb = MediaWikiServices::getInstance()->getDBLoadBalancer(); - $index = $this->replicaOnly ? DB_REPLICA : DB_MASTER; - // If the RDBMS has row-level locking, use the autocommit connection to avoid - // contention and deadlocks. Do not do this if it only has DB-level locking since - // that would just cause deadlocks. - $attribs = $lb->getServerAttributes( $lb->getWriterIndex() ); - $flags = $attribs[Database::ATTR_DB_LEVEL_LOCKING] ? 0 : $lb::CONN_TRX_AUTOCOMMIT; - $conn = $lb->getMaintenanceConnectionRef( $index, [], false, $flags ); + throw new UnexpectedValueException( "Invalid server index '$shardIndex'" ); } - $this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $conn ) ); - return $conn; } /** * Get the server index and table name for a given key * @param string $key - * @return array Server index and table name + * @return array (server index or self::SHARD_LOCAL/self::SHARD_GLOBAL, table name) */ - private function getTableByKey( $key ) { + private function getKeyLocation( $key ) { + if ( $this->serverTags ) { + // Striped array of database servers + if ( count( $this->serverTags ) == 1 ) { + $shardIndex = 0; // short-circuit + } else { + $sortedServers = $this->serverTags; + ArrayUtils::consistentHashSort( $sortedServers, $key ); + reset( $sortedServers ); + $shardIndex = key( $sortedServers ); + } + } else { + // LoadBalancer based configuration + $shardIndex = ( strpos( $key, 'global:' ) === 0 && $this->globalKeyLb ) + ? self::SHARD_GLOBAL + : self::SHARD_LOCAL; + } + if ( $this->numTableShards > 1 ) { $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff; $tableIndex = $hash % $this->numTableShards; } else { - $tableIndex = 0; - } - if ( $this->numServerShards > 1 ) { - $sortedServers = $this->serverTags; - ArrayUtils::consistentHashSort( $sortedServers, $key ); - reset( $sortedServers ); - $shardIndex = key( $sortedServers ); - } else { - $shardIndex = 0; + $tableIndex = null; } + return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ]; } /** * Get the table name for a given shard index - * @param int $index + * @param int|null $index * @return string */ private function getTableNameByShard( $index ) { - if ( $this->numTableShards > 1 ) { + if ( $index !== null && $this->numTableShards > 1 ) { $decimals = strlen( $this->numTableShards - 1 ); - return $this->tableName . - sprintf( "%0{$decimals}d", $index ); - } else { - return $this->tableName; + + return $this->tableName . sprintf( "%0{$decimals}d", $index ); } + + return $this->tableName; } protected function doGet( $key, $flags = 0, &$casToken = null ) { @@ -279,21 +286,22 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { return $values; } - private function fetchBlobMulti( array $keys, $flags = 0 ) { + private function fetchBlobMulti( array $keys ) { $values = []; // array of (key => value) - $keysByTable = []; + $keysByTableByShardIndex = []; foreach ( $keys as $key ) { - list( $shardIndex, $tableName ) = $this->getTableByKey( $key ); - $keysByTable[$shardIndex][$tableName][] = $key; + list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); + $keysByTableByShardIndex[$shardIndex][$tableName][] = $key; } $dataRows = []; - foreach ( $keysByTable as $shardIndex => $serverKeys ) { + foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) { try { $db = $this->getConnection( $shardIndex ); foreach ( $serverKeys as $tableName => $tableKeys ) { - $res = $db->select( $tableName, + $res = $db->select( + $tableName, [ 'keyname', 'value', 'exptime' ], [ 'keyname' => $tableKeys ], __METHOD__, @@ -351,10 +359,10 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { * @return bool */ private function modifyMulti( array $data, $exptime, $flags, $op ) { - $keysByTable = []; + $keysByTableByShardIndex = []; foreach ( $data as $key => $value ) { - list( $shardIndex, $tableName ) = $this->getTableByKey( $key ); - $keysByTable[$shardIndex][$tableName][] = $key; + list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); + $keysByTableByShardIndex[$shardIndex][$tableName][] = $key; } $exptime = $this->getExpirationAsTimestamp( $exptime ); @@ -362,7 +370,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $result = true; /** @noinspection PhpUnusedLocalVariableInspection */ $silenceScope = $this->silenceTransactionProfiler(); - foreach ( $keysByTable as $shardIndex => $serverKeys ) { + foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) { $db = null; // in case of connection failure try { $db = $this->getConnection( $shardIndex ); @@ -376,7 +384,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { foreach ( $serverKeys as $tableName => $tableKeys ) { try { - $result = $this->updateTableKeys( + $result = $this->updateTable( $op, $db, $tableName, @@ -393,7 +401,9 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) { - $result = $this->waitForReplication() && $result; + foreach ( $keysByTableByShardIndex as $shardIndex => $unused ) { + $result = $this->waitForReplication( $shardIndex ) && $result; + } } return $result; @@ -410,7 +420,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { * @throws DBError * @throws InvalidArgumentException */ - private function updateTableKeys( $op, $db, $table, $tableKeys, $data, $dbExpiry ) { + private function updateTable( $op, $db, $table, $tableKeys, $data, $dbExpiry ) { $success = true; if ( $op === self::$OP_ADD ) { @@ -473,7 +483,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) { - list( $shardIndex, $tableName ) = $this->getTableByKey( $key ); + list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); $exptime = $this->getExpirationAsTimestamp( $exptime ); /** @noinspection PhpUnusedLocalVariableInspection */ @@ -507,7 +517,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $success = (bool)$db->affectedRows(); if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) { - $success = $this->waitForReplication() && $success; + $success = $this->waitForReplication( $shardIndex ) && $success; } return $success; @@ -527,7 +537,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } public function incr( $key, $step = 1, $flags = 0 ) { - list( $shardIndex, $tableName ) = $this->getTableByKey( $key ); + list( $shardIndex, $tableName ) = $this->getKeyLocation( $key ); $newCount = false; /** @noinspection PhpUnusedLocalVariableInspection */ @@ -643,7 +653,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { /** @noinspection PhpUnusedLocalVariableInspection */ $silenceScope = $this->silenceTransactionProfiler(); - $shardIndexes = range( 0, $this->numServerShards - 1 ); + $shardIndexes = $this->getServerShardIndexes(); shuffle( $shardIndexes ); $ok = true; @@ -688,15 +698,15 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { &$keysDeletedCount = 0 ) { $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp ); - $shardIndexes = range( 0, $this->numTableShards - 1 ); - shuffle( $shardIndexes ); + $tableIndexes = range( 0, $this->numTableShards - 1 ); + shuffle( $tableIndexes ); - foreach ( $shardIndexes as $numShardsDone => $shardIndex ) { + foreach ( $tableIndexes as $numShardsDone => $tableIndex ) { $continue = null; // last exptime $lag = null; // purge lag do { $res = $db->select( - $this->getTableNameByShard( $shardIndex ), + $this->getTableNameByShard( $tableIndex ), [ 'keyname', 'exptime' ], array_merge( [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ], @@ -720,7 +730,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } $db->delete( - $this->getTableNameByShard( $shardIndex ), + $this->getTableNameByShard( $tableIndex ), [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ), 'keyname' => $keys @@ -735,7 +745,8 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { $continueUnix = ConvertibleTimestamp::convert( TS_UNIX, $continue ); $remainingLag = $cutoffUnix - $continueUnix; $processedLag = max( $lag - $remainingLag, 0 ); - $doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->numTableShards; + $doneRatio = + ( $numShardsDone + $processedLag / $lag ) / $this->numTableShards; } else { $doneRatio = 1; } @@ -756,7 +767,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { public function deleteAll() { /** @noinspection PhpUnusedLocalVariableInspection */ $silenceScope = $this->silenceTransactionProfiler(); - for ( $shardIndex = 0; $shardIndex < $this->numServerShards; $shardIndex++ ) { + foreach ( $this->getServerShardIndexes() as $shardIndex ) { $db = null; // in case of connection failure try { $db = $this->getConnection( $shardIndex ); @@ -782,7 +793,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } } - list( $shardIndex ) = $this->getTableByKey( $key ); + list( $shardIndex ) = $this->getKeyLocation( $key ); $db = null; // in case of connection failure try { @@ -814,7 +825,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { if ( --$this->locks[$key]['depth'] <= 0 ) { unset( $this->locks[$key] ); - list( $shardIndex ) = $this->getTableByKey( $key ); + list( $shardIndex ) = $this->getKeyLocation( $key ); $db = null; // in case of connection failure try { @@ -915,6 +926,56 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { return unserialize( $serial ); } + /** + * @param string $shardIndex self::SHARD_LOCAL/self::SHARD_GLOBAL + * @return IMaintainableDatabase + * @throws DBError + */ + private function getConnectionViaLoadBalancer( $shardIndex ) { + $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb; + if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) { + // Use the main connection to avoid transaction deadlocks + $conn = $lb->getMaintenanceConnectionRef( DB_MASTER ); + } else { + // If the RDBMs has row/table/page level locking, then use separate auto-commit + // connection to avoid needless contention and deadlocks. + $conn = $lb->getMaintenanceConnectionRef( + $this->replicaOnly ? DB_REPLICA : DB_MASTER, [], + false, + $lb::CONN_TRX_AUTOCOMMIT + ); + } + + return $conn; + } + + /** + * @param int $shardIndex + * @param array $server Server config map + * @return IMaintainableDatabase + * @throws DBError + */ + private function getConnectionFromServerInfo( $shardIndex, array $server ) { + if ( !isset( $this->conns[$shardIndex] ) ) { + /** @var IMaintainableDatabase Auto-commit connection to the server */ + $conn = Database::factory( $server['type'], array_merge( + $server, + [ + 'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX, + 'connLogger' => $this->logger, + 'queryLogger' => $this->logger + ] + ) ); + // Automatically create the objectcache table for sqlite as needed + if ( $conn->getType() === 'sqlite' && !$conn->tableExists( 'objectcache' ) ) { + $this->initSqliteDatabase( $conn ); + } + $this->conns[$shardIndex] = $conn; + } + + return $this->conns[$shardIndex]; + } + /** * Handle a DBError which occurred during a read operation. * @@ -1017,7 +1078,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { * Create the shard tables on all databases (e.g. via eval.php/shell.php) */ public function createTables() { - for ( $shardIndex = 0; $shardIndex < $this->numServerShards; $shardIndex++ ) { + foreach ( $this->getServerShardIndexes() as $shardIndex ) { $db = $this->getConnection( $shardIndex ); if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) { for ( $i = 0; $i < $this->numTableShards; $i++ ) { @@ -1030,25 +1091,44 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { } /** - * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER ) + * @return string[]|int[] List of server indexes or self::SHARD_LOCAL/self::SHARD_GLOBAL */ - private function usesMainDB() { - return !$this->serverInfos; + private function getServerShardIndexes() { + if ( $this->serverTags ) { + // Striped array of database servers + $shardIndexes = range( 0, $this->numServerShards - 1 ); + } else { + // LoadBalancer based configuration + $shardIndexes = []; + if ( $this->localKeyLb ) { + $shardIndexes[] = self::SHARD_LOCAL; + } + if ( $this->globalKeyLb ) { + $shardIndexes[] = self::SHARD_GLOBAL; + } + } + + return $shardIndexes; } - private function waitForReplication() { - if ( !$this->usesMainDB() ) { - // Custom DB server list; probably doesn't use replication + /** + * Wait for replica DBs to catch up to the master DB + * + * @param int|string $shardIndex Server index or self::SHARD_LOCAL/self::SHARD_GLOBAL + * @return bool Success + */ + private function waitForReplication( $shardIndex ) { + if ( is_int( $shardIndex ) ) { + return true; // striped only, no LoadBalancer + } + + $lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb; + if ( !$lb->hasStreamingReplicaServers() ) { return true; } - $lb = MediaWikiServices::getInstance()->getDBLoadBalancer(); - if ( $lb->getServerCount() <= 1 ) { - return true; // no replica DBs - } - - // Main LB is used; wait for any replica DBs to catch up try { + // Wait for any replica DBs to catch up $masterPos = $lb->getMasterPos(); if ( !$masterPos ) { return true; // not applicable @@ -1076,10 +1156,8 @@ class SqlBagOStuff extends MediumSpecificBagOStuff { * @return ScopedCallback|null */ private function silenceTransactionProfiler() { - if ( !$this->usesMainDB() ) { - // Custom DB is configured which either has no TransactionProfiler injected, - // or has one specific for cache use, which we shouldn't silence - return null; + if ( $this->serverInfos ) { + return null; // no TransactionProfiler injected anyway } $trxProfiler = Profiler::instance()->getTransactionProfiler(); diff --git a/tests/phpunit/includes/objectcache/SqlBagOStuffTest.php b/tests/phpunit/includes/objectcache/SqlBagOStuffTest.php index 9ab1edc8be4..f7b748d28dd 100644 --- a/tests/phpunit/includes/objectcache/SqlBagOStuffTest.php +++ b/tests/phpunit/includes/objectcache/SqlBagOStuffTest.php @@ -32,7 +32,10 @@ class SqlBagOStuffTest extends MediaWikiIntegrationTestCase { array $components, string $expected ) { - $cache = new SqlBagOStuff( [ 'keyspace' => $keyspace ] ); + $cache = new SqlBagOStuff( [ + 'keyspace' => $keyspace, + 'servers' => [] + ] ); $this->assertSame( $expected, $cache->makeKey( $class, ...$components ) ); } }