objectcache: dependency inject LoadBalancer into SqlBagOStuff

Clean up the recursive DB dependency mitigation logic by having
ServiceContainer detect recursion and throw an appropriate error.
Catch the error and use EmptyBagOStuff in such cases. This works
better than checking getQoS() since that begs the question by
requiring the cache instance to begin with.

Also add support for using different LoadBalancer instances for
local and global keys in SqlBagOStuff. This makes it easier to
share keys between projects.

Bug: T229062
Change-Id: Ib8ec1845bcf1b86cbb3bededa0ca7621a0ca293a
This commit is contained in:
Aaron Schulz 2019-11-04 14:02:33 -08:00
parent a5a6cc3495
commit 6a8943d8c5
5 changed files with 261 additions and 183 deletions

View file

@ -104,6 +104,7 @@ use Wikimedia\DependencyStore\KeyValueDependencyStore;
use Wikimedia\DependencyStore\SqlModuleDependencyStore;
use Wikimedia\Message\IMessageFormatterFactory;
use Wikimedia\ObjectFactory;
use Wikimedia\Services\RecursiveServiceDependencyException;
use Wikimedia\UUID\GlobalIdGenerator;
return [
@ -245,13 +246,30 @@ return [
function ( MediaWikiServices $services ) : Wikimedia\Rdbms\LBFactory {
$mainConfig = $services->getMainConfig();
try {
$stash = $services->getMainObjectStash();
} catch ( RecursiveServiceDependencyException $e ) {
$stash = new EmptyBagOStuff(); // T141804: handle cases like CACHE_DB
}
if ( $stash instanceof EmptyBagOStuff ) {
// Use process cache if the main stash is disabled or there was recursion
$stash = new HashBagOStuff( [ 'maxKeys' => 100 ] );
}
try {
$wanCache = $services->getMainWANObjectCache();
} catch ( RecursiveServiceDependencyException $e ) {
$wanCache = WANObjectCache::newEmpty(); // T141804: handle cases like CACHE_DB
}
$lbConf = MWLBFactory::applyDefaultConfig(
$mainConfig->get( 'LBFactoryConf' ),
new ServiceOptions( MWLBFactory::APPLY_DEFAULT_CONFIG_OPTIONS, $mainConfig ),
$services->getConfiguredReadOnlyMode(),
$services->getLocalServerObjectCache(),
$services->getMainObjectStash(),
$services->getMainWANObjectCache()
$stash,
$wanCache
);
$class = MWLBFactory::getLBFactoryClass( $lbConf );

View file

@ -157,7 +157,9 @@ abstract class MWLBFactory {
$options->get( 'DBprefix' )
);
$lbConf = self::injectObjectCaches( $lbConf, $srvCache, $mainStash, $wanCache );
$lbConf['srvCache'] = $srvCache;
$lbConf['memStash'] = $mainStash;
$lbConf['wanCache'] = $wanCache;
return $lbConf;
}
@ -214,35 +216,6 @@ abstract class MWLBFactory {
return $server;
}
/**
* @param array $lbConf
* @param BagOStuff $sCache
* @param BagOStuff $mStash
* @param WANObjectCache $wCache
* @return array
*/
private static function injectObjectCaches(
array $lbConf, BagOStuff $sCache, BagOStuff $mStash, WANObjectCache $wCache
) {
// Fallback if APC style caching is not an option
if ( $sCache instanceof EmptyBagOStuff ) {
$sCache = new HashBagOStuff( [ 'maxKeys' => 100 ] );
}
// Use APC/memcached style caching, but avoids loops with CACHE_DB (T141804)
if ( $sCache->getQoS( $sCache::ATTR_EMULATION ) > $sCache::QOS_EMULATION_SQL ) {
$lbConf['srvCache'] = $sCache;
}
if ( $mStash->getQoS( $mStash::ATTR_EMULATION ) > $mStash::QOS_EMULATION_SQL ) {
$lbConf['memStash'] = $mStash;
}
if ( $wCache->getQoS( $wCache::ATTR_EMULATION ) > $wCache::QOS_EMULATION_SQL ) {
$lbConf['wanCache'] = $wCache;
}
return $lbConf;
}
/**
* @param array $servers
* @param string $ldDB Local domain database name

View file

@ -175,6 +175,12 @@ class ObjectCache {
$server['dbDirectory'] = $conf->get( 'SQLiteDataDir' );
}
}
} elseif ( !isset( $params['localKeyLB'] ) ) {
$params['localKeyLB'] = [
'factory' => function () {
return MediaWikiServices::getInstance()->getDBLoadBalancer();
}
];
}
}

View file

@ -21,13 +21,14 @@
* @ingroup Cache
*/
use MediaWiki\MediaWikiServices;
use Wikimedia\AtEase\AtEase;
use Wikimedia\ObjectFactory;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\DBQueryError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\ILoadBalancer;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\ScopedCallback;
use Wikimedia\Timestamp\ConvertibleTimestamp;
@ -39,11 +40,16 @@ use Wikimedia\WaitConditionLoop;
* @ingroup Cache
*/
class SqlBagOStuff extends MediumSpecificBagOStuff {
/** @var ILoadBalancer|null */
protected $localKeyLb;
/** @var ILoadBalancer|null */
protected $globalKeyLb;
/** @var array[] (server index => server config) */
protected $serverInfos;
protected $serverInfos = [];
/** @var string[] (server index => tag/host name) */
protected $serverTags;
/** @var int */
protected $serverTags = [];
/** @var int Number of database servers shards (e.g. horizontal/vertical partitions) */
protected $numServerShards;
/** @var int UNIX timestamp */
protected $lastGarbageCollect = 0;
@ -58,11 +64,11 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
/** @var bool */
protected $replicaOnly = false;
/** @var array */
/** @var IMaintainableDatabase[] Map of (shard index => DB handle) */
protected $conns;
/** @var array UNIX timestamps */
/** @var float[] Map of (shard index => UNIX timestamps) */
protected $connFailureTimes = [];
/** @var array Exceptions */
/** @var Exception[] Map of (shard index => Exception) */
protected $connFailureErrors = [];
/** @var int */
@ -77,42 +83,44 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
/** @var string */
private static $OP_DELETE = 'delete';
/** @var string */
private const SHARD_LOCAL = 'local';
/** @var string */
private const SHARD_GLOBAL = 'global';
/**
* Constructor. Parameters are:
* - server: A server info structure in the format required by each
* element in $wgDBServers.
*
* - servers: An array of server info structures describing a set of database servers
* to distribute keys to. If this is specified, the "server" option will be
* ignored. If string keys are used, then they will be used for consistent
* hashing *instead* of the host name (from the server config). This is useful
* when a cluster is replicated to another site (with different host names)
* but each server has a corresponding replica in the other cluster.
*
* - purgePeriod: The average number of object cache writes in between
* garbage collection operations, where expired entries
* are removed from the database. Or in other words, the
* reciprocal of the probability of purging on any given
* write. If this is set to zero, purging will never be done.
*
* - purgeLimit: Maximum number of rows to purge at once.
*
* - tableName: The table name to use, default is "objectcache".
*
* - shards: The number of tables to use for data storage on each server.
* If this is more than 1, table names will be formed in the style
* objectcacheNNN where NNN is the shard index, between 0 and
* shards-1. The number of digits will be the minimum number
* required to hold the largest shard index. Data will be
* distributed across all tables by key hash. This is for
* MySQL bugs 61735 <https://bugs.mysql.com/bug.php?id=61735>
* and 61736 <https://bugs.mysql.com/bug.php?id=61736>.
*
* - replicaOnly: Whether to only use replica DBs and avoid triggering
* garbage collection logic of expired items. This only
* makes sense if the primary DB is used and only if get()
* calls will be used. This is used by ReplicatedBagOStuff.
* - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC.
* - server: Server config map for Database::factory() that describes the database
* to use for all key operations. This overrides "localKeyLB" and "globalKeyLB".
* - servers: Array of server config maps, each for Database::factory(), describing a set
* of database servers to distribute keys to. If this is specified, the "server" option
* will be ignored. If string keys are used, then they will be used for consistent
* hashing *instead* of the host name (from the server config). This is useful
* when a cluster is replicated to another site (with different host names)
* but each server has a corresponding replica in the other cluster.
* - localKeyLB: ObjectFactory::getObjectFromSpec array yeilding ILoadBalancer.
* This load balancer is used for local keys, e.g. those using makeKey().
* This is overriden by 'server'/'servers'.
* - globalKeyLB: ObjectFactory::getObjectFromSpec array yeilding ILoadBalancer.
* This load balancer is used for local keys, e.g. those using makeGlobalKey().
* This is overriden by 'server'/'servers'.
* - purgePeriod: The average number of object cache writes in between arbage collection
* operations, where expired entries are removed from the database. Or in other words,
* the reciprocal of the probability of purging on any given write. If this is set to
* zero, purging will never be done. [optional]
* - purgeLimit: Maximum number of rows to purge at once. [optional]
* - tableName: The table name to use, default is "objectcache". [optional]
* - shards: The number of tables to use for data storage on each server.
* If this is more than 1, table names will be formed in the style objectcacheNNN where
* NNN is the shard index, between 0 and shards-1. The number of digits will be the
* minimum number required to hold the largest shard index. Data will be distributed
* across all tables by key hash. This is for MySQL bugs 61735
* <https://bugs.mysql.com/bug.php?id=61735> and 61736
* <https://bugs.mysql.com/bug.php?id=61736>. [optional]
* - replicaOnly: Whether to only use replica DBs and avoid triggering garbage collection
* logic of expired items. This only makes sense if the primary DB is used and only if
* get() calls will be used. This is used by ReplicatedBagOStuff. [optional]
* - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC. [optional]
*
* @param array $params
*/
@ -120,29 +128,36 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
parent::__construct( $params );
$this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL;
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
if ( isset( $params['servers'] ) ) {
$this->serverInfos = [];
$this->serverTags = [];
$this->numServerShards = count( $params['servers'] );
if ( isset( $params['servers'] ) || isset( $params['server'] ) ) {
$index = 0;
foreach ( $params['servers'] as $tag => $info ) {
foreach ( ( $params['servers'] ?? [ $params['server'] ] ) as $tag => $info ) {
$this->serverInfos[$index] = $info;
if ( is_string( $tag ) ) {
$this->serverTags[$index] = $tag;
} else {
$this->serverTags[$index] = $info['host'] ?? "#$index";
}
$this->serverTags[$index] = is_string( $tag ) ? $tag : "#$index";
++$index;
}
} elseif ( isset( $params['server'] ) ) {
$this->serverInfos = [ $params['server'] ];
// Horizontal partitioning by key hash (if any)
$this->numServerShards = count( $this->serverInfos );
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE;
} else {
// Default to using the main wiki's database servers
$this->serverInfos = [];
$this->numServerShards = 1;
if ( isset( $params['localKeyLB'] ) ) {
$this->localKeyLb = ( $params['localKeyLB'] instanceof ILoadBalancer )
? $params['localKeyLB']
: ObjectFactory::getObjectFromSpec( $params['localKeyLB'] );
}
if ( isset( $params['globalKeyLB'] ) ) {
$this->globalKeyLb = ( $params['globalKeyLB'] instanceof ILoadBalancer )
? $params['globalKeyLB']
: ObjectFactory::getObjectFromSpec( $params['globalKeyLB'] );
}
$this->localKeyLb = $this->localKeyLb ?: $this->globalKeyLb;
if ( !$this->localKeyLb ) {
throw new InvalidArgumentException(
"Config requires 'server', 'servers', or 'localKeyLB'/'globalKeyLB'"
);
}
// Verticle partitioning by global vs local keys (if any)
$this->numServerShards = ( $this->localKeyLb === $this->globalKeyLb ) ? 1 : 2;
$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE;
}
if ( isset( $params['purgePeriod'] ) ) {
@ -164,16 +179,13 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
/**
* Get a connection to the specified database
*
* @param int $shardIndex
* @param int|string $shardIndex Server index or self::SHARD_LOCAL/self::SHARD_GLOBAL
* @return IMaintainableDatabase
* @throws MWException
* @throws DBConnectionError
* @throws UnexpectedValueException
*/
private function getConnection( $shardIndex ) {
if ( $shardIndex >= $this->numServerShards ) {
throw new MWException( __METHOD__ . ": Invalid server index \"$shardIndex\"" );
}
# Don't keep timing out trying to connect for each call if the DB is down
// Don't keep timing out trying to connect if the server is down
if (
isset( $this->connFailureErrors[$shardIndex] ) &&
( $this->getCurrentTime() - $this->connFailureTimes[$shardIndex] ) < 60
@ -181,75 +193,70 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
throw $this->connFailureErrors[$shardIndex];
}
if ( $this->serverInfos ) {
if ( !isset( $this->conns[$shardIndex] ) ) {
// Use custom database defined by server connection info
$info = $this->serverInfos[$shardIndex];
$type = $info['type'] ?? 'mysql';
$host = $info['host'] ?? '[unknown]';
$this->logger->debug( __CLASS__ . ": connecting to $host" );
$conn = Database::factory( $type, $info );
$conn->clearFlag( DBO_TRX ); // auto-commit mode
$this->conns[$shardIndex] = $conn;
// Automatically create the objectcache table for sqlite as needed
if ( $conn->getType() === 'sqlite' ) {
$this->initSqliteDatabase( $conn );
}
if ( $shardIndex === self::SHARD_LOCAL ) {
$conn = $this->getConnectionViaLoadBalancer( $shardIndex );
} elseif ( $shardIndex === self::SHARD_GLOBAL ) {
$conn = $this->getConnectionViaLoadBalancer( $shardIndex );
} elseif ( is_int( $shardIndex ) ) {
if ( isset( $this->serverInfos[$shardIndex] ) ) {
$server = $this->serverInfos[$shardIndex];
$conn = $this->getConnectionFromServerInfo( $shardIndex, $server );
} else {
throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
}
$conn = $this->conns[$shardIndex];
} else {
// Use the main LB database
$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
$index = $this->replicaOnly ? DB_REPLICA : DB_MASTER;
// If the RDBMS has row-level locking, use the autocommit connection to avoid
// contention and deadlocks. Do not do this if it only has DB-level locking since
// that would just cause deadlocks.
$attribs = $lb->getServerAttributes( $lb->getWriterIndex() );
$flags = $attribs[Database::ATTR_DB_LEVEL_LOCKING] ? 0 : $lb::CONN_TRX_AUTOCOMMIT;
$conn = $lb->getMaintenanceConnectionRef( $index, [], false, $flags );
throw new UnexpectedValueException( "Invalid server index '$shardIndex'" );
}
$this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $conn ) );
return $conn;
}
/**
* Get the server index and table name for a given key
* @param string $key
* @return array Server index and table name
* @return array (server index or self::SHARD_LOCAL/self::SHARD_GLOBAL, table name)
*/
private function getTableByKey( $key ) {
private function getKeyLocation( $key ) {
if ( $this->serverTags ) {
// Striped array of database servers
if ( count( $this->serverTags ) == 1 ) {
$shardIndex = 0; // short-circuit
} else {
$sortedServers = $this->serverTags;
ArrayUtils::consistentHashSort( $sortedServers, $key );
reset( $sortedServers );
$shardIndex = key( $sortedServers );
}
} else {
// LoadBalancer based configuration
$shardIndex = ( strpos( $key, 'global:' ) === 0 && $this->globalKeyLb )
? self::SHARD_GLOBAL
: self::SHARD_LOCAL;
}
if ( $this->numTableShards > 1 ) {
$hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
$tableIndex = $hash % $this->numTableShards;
} else {
$tableIndex = 0;
}
if ( $this->numServerShards > 1 ) {
$sortedServers = $this->serverTags;
ArrayUtils::consistentHashSort( $sortedServers, $key );
reset( $sortedServers );
$shardIndex = key( $sortedServers );
} else {
$shardIndex = 0;
$tableIndex = null;
}
return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
}
/**
* Get the table name for a given shard index
* @param int $index
* @param int|null $index
* @return string
*/
private function getTableNameByShard( $index ) {
if ( $this->numTableShards > 1 ) {
if ( $index !== null && $this->numTableShards > 1 ) {
$decimals = strlen( $this->numTableShards - 1 );
return $this->tableName .
sprintf( "%0{$decimals}d", $index );
} else {
return $this->tableName;
return $this->tableName . sprintf( "%0{$decimals}d", $index );
}
return $this->tableName;
}
protected function doGet( $key, $flags = 0, &$casToken = null ) {
@ -279,21 +286,22 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
return $values;
}
private function fetchBlobMulti( array $keys, $flags = 0 ) {
private function fetchBlobMulti( array $keys ) {
$values = []; // array of (key => value)
$keysByTable = [];
$keysByTableByShardIndex = [];
foreach ( $keys as $key ) {
list( $shardIndex, $tableName ) = $this->getTableByKey( $key );
$keysByTable[$shardIndex][$tableName][] = $key;
list( $shardIndex, $tableName ) = $this->getKeyLocation( $key );
$keysByTableByShardIndex[$shardIndex][$tableName][] = $key;
}
$dataRows = [];
foreach ( $keysByTable as $shardIndex => $serverKeys ) {
foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) {
try {
$db = $this->getConnection( $shardIndex );
foreach ( $serverKeys as $tableName => $tableKeys ) {
$res = $db->select( $tableName,
$res = $db->select(
$tableName,
[ 'keyname', 'value', 'exptime' ],
[ 'keyname' => $tableKeys ],
__METHOD__,
@ -351,10 +359,10 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
* @return bool
*/
private function modifyMulti( array $data, $exptime, $flags, $op ) {
$keysByTable = [];
$keysByTableByShardIndex = [];
foreach ( $data as $key => $value ) {
list( $shardIndex, $tableName ) = $this->getTableByKey( $key );
$keysByTable[$shardIndex][$tableName][] = $key;
list( $shardIndex, $tableName ) = $this->getKeyLocation( $key );
$keysByTableByShardIndex[$shardIndex][$tableName][] = $key;
}
$exptime = $this->getExpirationAsTimestamp( $exptime );
@ -362,7 +370,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
$result = true;
/** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
foreach ( $keysByTable as $shardIndex => $serverKeys ) {
foreach ( $keysByTableByShardIndex as $shardIndex => $serverKeys ) {
$db = null; // in case of connection failure
try {
$db = $this->getConnection( $shardIndex );
@ -376,7 +384,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
foreach ( $serverKeys as $tableName => $tableKeys ) {
try {
$result = $this->updateTableKeys(
$result = $this->updateTable(
$op,
$db,
$tableName,
@ -393,7 +401,9 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
}
if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) {
$result = $this->waitForReplication() && $result;
foreach ( $keysByTableByShardIndex as $shardIndex => $unused ) {
$result = $this->waitForReplication( $shardIndex ) && $result;
}
}
return $result;
@ -410,7 +420,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
* @throws DBError
* @throws InvalidArgumentException
*/
private function updateTableKeys( $op, $db, $table, $tableKeys, $data, $dbExpiry ) {
private function updateTable( $op, $db, $table, $tableKeys, $data, $dbExpiry ) {
$success = true;
if ( $op === self::$OP_ADD ) {
@ -473,7 +483,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
}
protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
list( $shardIndex, $tableName ) = $this->getTableByKey( $key );
list( $shardIndex, $tableName ) = $this->getKeyLocation( $key );
$exptime = $this->getExpirationAsTimestamp( $exptime );
/** @noinspection PhpUnusedLocalVariableInspection */
@ -507,7 +517,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
$success = (bool)$db->affectedRows();
if ( $this->fieldHasFlags( $flags, self::WRITE_SYNC ) ) {
$success = $this->waitForReplication() && $success;
$success = $this->waitForReplication( $shardIndex ) && $success;
}
return $success;
@ -527,7 +537,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
}
public function incr( $key, $step = 1, $flags = 0 ) {
list( $shardIndex, $tableName ) = $this->getTableByKey( $key );
list( $shardIndex, $tableName ) = $this->getKeyLocation( $key );
$newCount = false;
/** @noinspection PhpUnusedLocalVariableInspection */
@ -643,7 +653,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
/** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
$shardIndexes = range( 0, $this->numServerShards - 1 );
$shardIndexes = $this->getServerShardIndexes();
shuffle( $shardIndexes );
$ok = true;
@ -688,15 +698,15 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
&$keysDeletedCount = 0
) {
$cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
$shardIndexes = range( 0, $this->numTableShards - 1 );
shuffle( $shardIndexes );
$tableIndexes = range( 0, $this->numTableShards - 1 );
shuffle( $tableIndexes );
foreach ( $shardIndexes as $numShardsDone => $shardIndex ) {
foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
$continue = null; // last exptime
$lag = null; // purge lag
do {
$res = $db->select(
$this->getTableNameByShard( $shardIndex ),
$this->getTableNameByShard( $tableIndex ),
[ 'keyname', 'exptime' ],
array_merge(
[ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
@ -720,7 +730,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
}
$db->delete(
$this->getTableNameByShard( $shardIndex ),
$this->getTableNameByShard( $tableIndex ),
[
'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
'keyname' => $keys
@ -735,7 +745,8 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
$continueUnix = ConvertibleTimestamp::convert( TS_UNIX, $continue );
$remainingLag = $cutoffUnix - $continueUnix;
$processedLag = max( $lag - $remainingLag, 0 );
$doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->numTableShards;
$doneRatio =
( $numShardsDone + $processedLag / $lag ) / $this->numTableShards;
} else {
$doneRatio = 1;
}
@ -756,7 +767,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
public function deleteAll() {
/** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
for ( $shardIndex = 0; $shardIndex < $this->numServerShards; $shardIndex++ ) {
foreach ( $this->getServerShardIndexes() as $shardIndex ) {
$db = null; // in case of connection failure
try {
$db = $this->getConnection( $shardIndex );
@ -782,7 +793,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
}
}
list( $shardIndex ) = $this->getTableByKey( $key );
list( $shardIndex ) = $this->getKeyLocation( $key );
$db = null; // in case of connection failure
try {
@ -814,7 +825,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
if ( --$this->locks[$key]['depth'] <= 0 ) {
unset( $this->locks[$key] );
list( $shardIndex ) = $this->getTableByKey( $key );
list( $shardIndex ) = $this->getKeyLocation( $key );
$db = null; // in case of connection failure
try {
@ -915,6 +926,56 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
return unserialize( $serial );
}
/**
* @param string $shardIndex self::SHARD_LOCAL/self::SHARD_GLOBAL
* @return IMaintainableDatabase
* @throws DBError
*/
private function getConnectionViaLoadBalancer( $shardIndex ) {
$lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
if ( $lb->getServerAttributes( $lb->getWriterIndex() )[Database::ATTR_DB_LEVEL_LOCKING] ) {
// Use the main connection to avoid transaction deadlocks
$conn = $lb->getMaintenanceConnectionRef( DB_MASTER );
} else {
// If the RDBMs has row/table/page level locking, then use separate auto-commit
// connection to avoid needless contention and deadlocks.
$conn = $lb->getMaintenanceConnectionRef(
$this->replicaOnly ? DB_REPLICA : DB_MASTER, [],
false,
$lb::CONN_TRX_AUTOCOMMIT
);
}
return $conn;
}
/**
* @param int $shardIndex
* @param array $server Server config map
* @return IMaintainableDatabase
* @throws DBError
*/
private function getConnectionFromServerInfo( $shardIndex, array $server ) {
if ( !isset( $this->conns[$shardIndex] ) ) {
/** @var IMaintainableDatabase Auto-commit connection to the server */
$conn = Database::factory( $server['type'], array_merge(
$server,
[
'flags' => ( $server['flags'] ?? 0 ) & ~IDatabase::DBO_TRX,
'connLogger' => $this->logger,
'queryLogger' => $this->logger
]
) );
// Automatically create the objectcache table for sqlite as needed
if ( $conn->getType() === 'sqlite' && !$conn->tableExists( 'objectcache' ) ) {
$this->initSqliteDatabase( $conn );
}
$this->conns[$shardIndex] = $conn;
}
return $this->conns[$shardIndex];
}
/**
* Handle a DBError which occurred during a read operation.
*
@ -1017,7 +1078,7 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
* Create the shard tables on all databases (e.g. via eval.php/shell.php)
*/
public function createTables() {
for ( $shardIndex = 0; $shardIndex < $this->numServerShards; $shardIndex++ ) {
foreach ( $this->getServerShardIndexes() as $shardIndex ) {
$db = $this->getConnection( $shardIndex );
if ( in_array( $db->getType(), [ 'mysql', 'postgres' ], true ) ) {
for ( $i = 0; $i < $this->numTableShards; $i++ ) {
@ -1030,25 +1091,44 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
}
/**
* @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER )
* @return string[]|int[] List of server indexes or self::SHARD_LOCAL/self::SHARD_GLOBAL
*/
private function usesMainDB() {
return !$this->serverInfos;
private function getServerShardIndexes() {
if ( $this->serverTags ) {
// Striped array of database servers
$shardIndexes = range( 0, $this->numServerShards - 1 );
} else {
// LoadBalancer based configuration
$shardIndexes = [];
if ( $this->localKeyLb ) {
$shardIndexes[] = self::SHARD_LOCAL;
}
if ( $this->globalKeyLb ) {
$shardIndexes[] = self::SHARD_GLOBAL;
}
}
return $shardIndexes;
}
private function waitForReplication() {
if ( !$this->usesMainDB() ) {
// Custom DB server list; probably doesn't use replication
/**
* Wait for replica DBs to catch up to the master DB
*
* @param int|string $shardIndex Server index or self::SHARD_LOCAL/self::SHARD_GLOBAL
* @return bool Success
*/
private function waitForReplication( $shardIndex ) {
if ( is_int( $shardIndex ) ) {
return true; // striped only, no LoadBalancer
}
$lb = ( $shardIndex === self::SHARD_LOCAL ) ? $this->localKeyLb : $this->globalKeyLb;
if ( !$lb->hasStreamingReplicaServers() ) {
return true;
}
$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
if ( $lb->getServerCount() <= 1 ) {
return true; // no replica DBs
}
// Main LB is used; wait for any replica DBs to catch up
try {
// Wait for any replica DBs to catch up
$masterPos = $lb->getMasterPos();
if ( !$masterPos ) {
return true; // not applicable
@ -1076,10 +1156,8 @@ class SqlBagOStuff extends MediumSpecificBagOStuff {
* @return ScopedCallback|null
*/
private function silenceTransactionProfiler() {
if ( !$this->usesMainDB() ) {
// Custom DB is configured which either has no TransactionProfiler injected,
// or has one specific for cache use, which we shouldn't silence
return null;
if ( $this->serverInfos ) {
return null; // no TransactionProfiler injected anyway
}
$trxProfiler = Profiler::instance()->getTransactionProfiler();

View file

@ -32,7 +32,10 @@ class SqlBagOStuffTest extends MediaWikiIntegrationTestCase {
array $components,
string $expected
) {
$cache = new SqlBagOStuff( [ 'keyspace' => $keyspace ] );
$cache = new SqlBagOStuff( [
'keyspace' => $keyspace,
'servers' => []
] );
$this->assertSame( $expected, $cache->makeKey( $class, ...$components ) );
}
}