rdbms: Introduce ServerInfoHolder to limit access to servers in LB

For example, any part of LB can write and change number of servers.

Bug: T326274
Change-Id: I177d57e2e34aea176a53c4d96d58f428b9a16634
This commit is contained in:
Amir Sarabadani 2023-02-02 15:31:41 +01:00
parent 805d63fa4a
commit 3909c1440a
7 changed files with 426 additions and 175 deletions

View file

@ -2854,6 +2854,7 @@ $wgAutoloadLocalClasses = [
'Wikimedia\\Rdbms\\SchemaBuilder' => __DIR__ . '/includes/libs/rdbms/dbal/SchemaBuilder.php',
'Wikimedia\\Rdbms\\SchemaChangeBuilder' => __DIR__ . '/includes/libs/rdbms/dbal/SchemaChangeBuilder.php',
'Wikimedia\\Rdbms\\SelectQueryBuilder' => __DIR__ . '/includes/libs/rdbms/querybuilder/SelectQueryBuilder.php',
'Wikimedia\\Rdbms\\ServerInfoHolder' => __DIR__ . '/includes/libs/rdbms/loadbalancer/ServerInfoHolder.php',
'Wikimedia\\Rdbms\\SessionConsistentConnectionManager' => __DIR__ . '/includes/libs/rdbms/connectionmanager/SessionConsistentConnectionManager.php',
'Wikimedia\\Rdbms\\SqliteResultWrapper' => __DIR__ . '/includes/libs/rdbms/database/resultwrapper/SqliteResultWrapper.php',
'Wikimedia\\Rdbms\\Subquery' => __DIR__ . '/includes/libs/rdbms/encasing/Subquery.php',

View file

@ -383,9 +383,7 @@ abstract class LBFactory implements ILBFactory {
$lb->commitPrimaryChanges( $fname );
}
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
$e = $this->executePostTransactionCallbacks();
$this->trxRoundStage = self::ROUND_CURSORY;
// Throw any last post-commit callback error
if ( $e instanceof Exception ) {
throw $e;
@ -403,9 +401,7 @@ abstract class LBFactory implements ILBFactory {
$lb->rollbackPrimaryChanges( $fname );
}
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
$this->executePostTransactionCallbacks();
$this->trxRoundStage = self::ROUND_CURSORY;
}
final public function flushPrimarySessions( $fname = __METHOD__ ) {
@ -424,6 +420,7 @@ abstract class LBFactory implements ILBFactory {
* @return Exception|null
*/
private function executePostTransactionCallbacks() {
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
$fname = __METHOD__;
// Run all post-commit callbacks until new ones stop getting added
$e = null; // first callback exception
@ -438,6 +435,7 @@ abstract class LBFactory implements ILBFactory {
$ex = $lb->runPrimaryTransactionListenerCallbacks( $fname );
$e = $e ?: $ex;
}
$this->trxRoundStage = self::ROUND_CURSORY;
return $e;
}

View file

@ -76,8 +76,8 @@ class LoadBalancer implements ILoadBalancerForOwner {
/** @var string|null The name of the DB cluster */
private $clusterName;
/** @var array[] Map of (server index => server config array) */
private $servers;
/** @var ServerInfoHolder */
private $serverInfoHolder;
/** @var array[] Map of (group => server index => weight) */
private $groupLoads;
/** @var array The LoadMonitor configuration */
@ -141,7 +141,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
* Default 'maxLag' when unspecified
* @internal Only for use within LoadBalancer/LoadMonitor
*/
public const MAX_LAG_DEFAULT = 6;
public const MAX_LAG_DEFAULT = ServerInfoHolder::MAX_LAG_DEFAULT;
/** Warn when this many connection are held */
private const CONN_HELD_WARN_THRESHOLD = 10;
@ -190,10 +190,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
? DatabaseDomain::newFromId( $params['localDomain'] )
: DatabaseDomain::newUnspecified();
$this->servers = [];
$this->serverInfoHolder = new ServerInfoHolder();
$this->groupLoads = [ self::GROUP_GENERIC => [] ];
foreach ( $this->normalizeServerMaps( $params['servers'] ?? [] ) as $i => $server ) {
$this->servers[$i] = $server;
foreach ( $this->serverInfoHolder->normalizeServerMaps( $params['servers'] ?? [] ) as $i => $server ) {
$this->serverInfoHolder->addServer( $i, $server );
foreach ( $server['groupLoads'] as $group => $weight ) {
$this->groupLoads[$group][$i] = $weight;
}
@ -254,7 +254,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
public function getClusterName(): string {
// Fallback to the current primary name if not specified
return $this->clusterName ?? $this->getServerName( $this->getWriterIndex() );
return $this->clusterName ?? $this->getServerName( ServerInfoHolder::WRITER_INDEX );
}
public function getLocalDomainID(): string {
@ -330,7 +330,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
*/
private function sanitizeConnectionFlags( $flags, $i, $domain ) {
// Whether an outside caller is explicitly requesting the primary database server
if ( $i === self::DB_PRIMARY || $i === $this->getWriterIndex() ) {
if ( $i === self::DB_PRIMARY || $i === ServerInfoHolder::WRITER_INDEX ) {
$flags |= self::CONN_INTENT_WRITABLE;
}
@ -342,14 +342,14 @@ class LoadBalancer implements ILoadBalancerForOwner {
// the case, use the primary server information to get the attributes. The information
// for $i cannot be used since it might be DB_REPLICA, which might require connection
// attempts in order to be resolved into a real server index.
$attributes = $this->getServerAttributes( $this->getWriterIndex() );
$attributes = $this->getServerAttributes( ServerInfoHolder::WRITER_INDEX );
if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
// The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
// to use separate connections would just cause self-deadlocks. Note that
// REPEATABLE-READ staleness is not an issue since DB-level locking means
// that transactions are Strict Serializable anyway.
$flags &= ~self::CONN_TRX_AUTOCOMMIT;
$type = $this->getServerType( $this->getWriterIndex() );
$type = $this->serverInfoHolder->getServerType( ServerInfoHolder::WRITER_INDEX );
$this->logger->info( __METHOD__ . ": CONN_TRX_AUTOCOMMIT disallowed ($type)" );
} elseif ( isset( $this->tempTablesOnlyMode[$domain] ) ) {
// T202116: integration tests are active and queries should be all be using
@ -422,13 +422,13 @@ class LoadBalancer implements ILoadBalancerForOwner {
# Unset excessively lagged servers
foreach ( $lags as $i => $lag ) {
if ( $i !== $this->getWriterIndex() ) {
if ( $i !== ServerInfoHolder::WRITER_INDEX ) {
# How much lag this server nominally is allowed to have
$maxServerLag = $this->servers[$i]['max lag'] ?? self::MAX_LAG_DEFAULT; // default
$maxServerLag = $this->serverInfoHolder->getServerMaxLag( $i ); // default
# Constrain that further by $maxLag argument
$maxServerLag = min( $maxServerLag, $maxLag );
$srvName = $this->getServerName( $i );
$srvName = $this->serverInfoHolder->getServerName( $i );
if ( $lag === false && !is_infinite( $maxServerLag ) ) {
$this->logger->debug(
__METHOD__ . ": server {db_server} is not replicating?",
@ -459,9 +459,9 @@ class LoadBalancer implements ILoadBalancerForOwner {
public function getReaderIndex( $group = false ) {
$group = is_string( $group ) ? $group : self::GROUP_GENERIC;
if ( !$this->hasReplicaServers() ) {
if ( !$this->serverInfoHolder->hasReplicaServers() ) {
// There is only one possible server to use (the primary)
return $this->getWriterIndex();
return ServerInfoHolder::WRITER_INDEX;
}
$index = $this->getExistingReaderIndex( $group );
@ -623,7 +623,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
$this->waitForPos = $pos;
$ok = true;
foreach ( $this->getStreamingReplicaIndexes() as $i ) {
foreach ( $this->serverInfoHolder->getStreamingReplicaIndexes() as $i ) {
if ( $this->serverHasLoadInAnyGroup( $i ) ) {
$start = microtime( true );
$ok = $this->awaitSessionPrimaryPos( $i, $timeout ) && $ok;
@ -667,7 +667,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
public function getAnyOpenConnection( $i, $flags = 0 ) {
$i = ( $i === self::DB_PRIMARY ) ? $this->getWriterIndex() : $i;
$i = ( $i === self::DB_PRIMARY ) ? ServerInfoHolder::WRITER_INDEX : $i;
// Connection handles required to be in auto-commit mode use a separate connection
// pool since the main pool is effected by implicit and explicit transaction rounds
@ -754,7 +754,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
private function awaitSessionPrimaryPos( $index, $timeout = null ) {
$timeout = max( 1, intval( $timeout ?: self::MAX_WAIT_DEFAULT ) );
if ( !$this->waitForPos || $index === $this->getWriterIndex() ) {
if ( !$this->waitForPos || $index === ServerInfoHolder::WRITER_INDEX ) {
return true;
}
@ -841,7 +841,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
// The use of getServerConnection() instead of getConnection() avoids infinite loops.
$serverIndex = $i;
if ( $i === self::DB_PRIMARY ) {
$serverIndex = $this->getWriterIndex();
$serverIndex = ServerInfoHolder::WRITER_INDEX;
} elseif ( $i === self::DB_REPLICA ) {
foreach ( $groups as $group ) {
$groupIndex = $this->getReaderIndex( $group );
@ -853,7 +853,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
if ( $serverIndex < 0 ) {
$this->reportConnectionError( 'could not connect to any replica DB server' );
}
} elseif ( !isset( $this->servers[$i] ) ) {
} elseif ( !$this->serverInfoHolder->hasServerIndex( $i ) ) {
throw new UnexpectedValueException( "Invalid server index index #$i" );
}
// Get an open connection to that server (might trigger a new connection)
@ -899,7 +899,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
// or the primary database server is running in server-side read-only mode. Note that
// replica DB handles are always read-only via Database::assertIsWritablePrimary().
// Read-only mode due to replication lag is *avoided* here to avoid recursion.
if ( $i === $this->getWriterIndex() ) {
if ( $i === ServerInfoHolder::WRITER_INDEX ) {
if ( $this->readOnlyReason !== false ) {
$readOnlyReason = $this->readOnlyReason;
} elseif ( $this->isPrimaryConnectionReadOnly( $conn, $flags ) ) {
@ -953,7 +953,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
* @return int One of DB_PRIMARY/DB_REPLICA
*/
private function getRoleFromIndex( $i ) {
return ( $i === self::DB_PRIMARY || $i === $this->getWriterIndex() )
return ( $i === self::DB_PRIMARY || $i === ServerInfoHolder::WRITER_INDEX )
? self::DB_PRIMARY
: self::DB_REPLICA;
}
@ -1062,7 +1062,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
public function getServerAttributes( $i ) {
return $this->databaseFactory->attributesFromType(
$this->getServerType( $i ),
$this->servers[$i]['driver'] ?? null
$this->serverInfoHolder->getServerDriver( $i )
);
}
@ -1083,7 +1083,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
throw new DBAccessError();
}
$server = $this->getServerInfoStrict( $i );
$server = $this->serverInfoHolder->getServerInfoStrict( $i );
// Use low connection/read timeouts for connection used for gauging server health.
// Gauge information should be cached and used to avoid outages. Indefinite hanging
// while gauging servers would do the opposite.
@ -1128,7 +1128,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
$conn->setTableAliases( $this->tableAliases );
$conn->setIndexAliases( $this->indexAliases );
// Account for any active transaction round and listeners
if ( $i === $this->getWriterIndex() ) {
if ( $i === ServerInfoHolder::WRITER_INDEX ) {
if ( $this->trxRoundId !== false ) {
$this->applyTransactionRoundFlags( $conn );
}
@ -1169,7 +1169,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
$conn,
[
'connections' => $count,
'primarydb' => $this->getPrimaryServerName(),
'primarydb' => $this->serverInfoHolder->getPrimaryServerName(),
'db_domain' => $domain->getId()
]
)
@ -1191,7 +1191,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
return IDatabase::ROLE_STATIC_CLONE;
}
return ( $i === $this->getWriterIndex() )
return ( $i === ServerInfoHolder::WRITER_INDEX )
? IDatabase::ROLE_STREAMING_MASTER
: IDatabase::ROLE_STREAMING_REPLICA;
}
@ -1283,86 +1283,40 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
public function getWriterIndex() {
return 0;
return ServerInfoHolder::WRITER_INDEX;
}
public function getServerCount() {
return count( $this->servers );
return $this->serverInfoHolder->getServerCount();
}
public function hasReplicaServers() {
return ( $this->getServerCount() > 1 );
}
/**
* @return int[] List of replica server indexes
*/
private function getStreamingReplicaIndexes() {
$indexes = [];
foreach ( $this->servers as $i => $server ) {
if ( $i !== $this->getWriterIndex() && empty( $server['is static'] ) ) {
$indexes[] = $i;
}
}
return $indexes;
return $this->serverInfoHolder->hasReplicaServers();
}
public function hasStreamingReplicaServers() {
return (bool)$this->getStreamingReplicaIndexes();
}
private function getServerNameFromConfig( $config ) {
$name = $config['serverName'] ?? ( $config['host'] ?? '' );
return ( $name !== '' ) ? $name : 'localhost';
}
private function normalizeServerMaps( array $servers, array &$indexBySrvName = null ) {
if ( !$servers ) {
throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
}
$listKey = -1;
$indexBySrvName = [];
foreach ( $servers as $i => $server ) {
if ( ++$listKey !== $i ) {
throw new UnexpectedValueException( 'List expected for "servers" parameter' );
}
$srvName = $server['serverName'] ?? $server['host'] ?? '';
$srvName = ( $srvName !== '' ) ? $srvName : 'localhost';
if ( isset( $indexBySrvName[$srvName] ) ) {
// Duplicate server names confuse caching, logging, and reconfigure()
throw new UnexpectedValueException( 'Duplicate server name "' . $srvName . '"' );
}
$indexBySrvName[$srvName] = $i;
$servers[$i]['serverName'] = $srvName;
$servers[$i]['groupLoads'] ??= [];
}
return $servers;
return $this->serverInfoHolder->hasStreamingReplicaServers();
}
public function getServerName( $i ): string {
return $this->servers[$i]['serverName'] ?? 'localhost';
return $this->serverInfoHolder->getServerName( $i );
}
public function getServerInfo( $i ) {
return $this->servers[$i] ?? false;
return $this->serverInfoHolder->getServerInfo( $i );
}
public function getServerType( $i ) {
return $this->servers[$i]['type'] ?? 'unknown';
return $this->serverInfoHolder->getServerType( $i );
}
public function getPrimaryPos() {
$index = $this->getWriterIndex();
$conn = $this->getAnyOpenConnection( $index );
$conn = $this->getAnyOpenConnection( ServerInfoHolder::WRITER_INDEX );
if ( $conn ) {
return $conn->getPrimaryPos();
}
$conn = $this->getConnectionInternal( $index, self::CONN_SILENCE_ERRORS );
$conn = $this->getConnectionInternal( ServerInfoHolder::WRITER_INDEX, self::CONN_SILENCE_ERRORS );
// @phan-suppress-next-line PhanRedundantCondition
if ( !$conn ) {
$this->reportConnectionError();
@ -1377,14 +1331,14 @@ class LoadBalancer implements ILoadBalancerForOwner {
public function getReplicaResumePos() {
// Get the position of any existing primary DB server connection
$primaryConn = $this->getAnyOpenConnection( $this->getWriterIndex() );
$primaryConn = $this->getAnyOpenConnection( ServerInfoHolder::WRITER_INDEX );
if ( $primaryConn ) {
return $primaryConn->getPrimaryPos();
}
// Get the highest position of any existing replica server connection
$highestPos = false;
foreach ( $this->getStreamingReplicaIndexes() as $i ) {
foreach ( $this->serverInfoHolder->getStreamingReplicaIndexes() as $i ) {
$conn = $this->getAnyOpenConnection( $i );
$pos = $conn ? $conn->getReplicaPos() : false;
if ( !$pos ) {
@ -1418,43 +1372,28 @@ class LoadBalancer implements ILoadBalancerForOwner {
* @return void
*/
public function reconfigure( array $params ) {
$anyServerDepooled = $this->serverInfoHolder->reconfigureServers( $params['servers'] );
if ( !$anyServerDepooled ) {
return;
}
// NOTE: We could close all connection here, but some may be in the middle of
// a transaction. So instead, we leave it to DBConnRef to close the
// connection when it detects that the modcount has changed and no
// transaction is open.
if ( count( $params['servers'] ) == count( $this->servers ) ) {
return;
}
$this->logger->info( 'Reconfiguring dbs!' );
$newServers = [];
foreach ( $params['servers'] as $server ) {
$newServers[] = $this->getServerNameFromConfig( $server );
}
$closeConnections = false;
foreach ( $this->servers as $i => $server ) {
if ( !in_array( $this->getServerNameFromConfig( $server ), $newServers ) ) {
// db depooled, remove it from list of servers
unset( $this->servers[$i] );
$this->groupLoads = [ self::GROUP_GENERIC => [] ];
foreach ( $params['servers'] as $j => $serverNew ) {
foreach ( ( $serverNew['groupLoads'] ?? [] ) as $group => $ratio ) {
$this->groupLoads[ $group ][ $j ] = $ratio;
}
$this->groupLoads[ self::GROUP_GENERIC ][ $j ] = $serverNew['load'];
}
$closeConnections = true;
$this->readIndexByGroup = [];
$this->conns = self::newTrackedConnectionsArray();
$this->readIndexByGroup = [];
$this->conns = self::newTrackedConnectionsArray();
$this->groupLoads = [ self::GROUP_GENERIC => [] ];
foreach ( $params['servers'] as $j => $serverNew ) {
foreach ( ( $serverNew['groupLoads'] ?? [] ) as $group => $ratio ) {
$this->groupLoads[ $group ][ $j ] = $ratio;
}
$this->groupLoads[ self::GROUP_GENERIC ][ $j ] = $serverNew['load'];
}
if ( $closeConnections ) {
// Bump modification counter to invalidate the connections held by DBConnRef
// instances. This will cause the next call to a method on the DBConnRef
// to get a new connection from getConnectionInternal()
$this->modcount++;
}
// Bump modification counter to invalidate the connections held by DBConnRef
// instances. This will cause the next call to a method on the DBConnRef
// to get a new connection from getConnectionInternal()
$this->modcount++;
}
public function disable( $fname = __METHOD__ ) {
@ -1484,7 +1423,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
}
$srvName = $this->getServerName( $serverIndex );
$srvName = $this->serverInfoHolder->getServerName( $serverIndex );
$found = false;
foreach ( $this->conns as $type => $poolConnsByServer ) {
@ -1846,7 +1785,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
public function hasPrimaryConnection() {
return (bool)$this->getAnyOpenConnection( $this->getWriterIndex() );
return (bool)$this->getAnyOpenConnection( ServerInfoHolder::WRITER_INDEX );
}
public function hasPrimaryChanges() {
@ -1960,17 +1899,16 @@ class LoadBalancer implements ILoadBalancerForOwner {
// Note that table prefixes are not related to server-side read-only mode
$this->wanCache->makeGlobalKey(
'rdbms-server-readonly',
$this->getPrimaryServerName()
$this->serverInfoHolder->getPrimaryServerName()
),
self::TTL_CACHE_READONLY,
function () {
$scope = $this->trxProfiler->silenceForScope();
$index = $this->getWriterIndex();
// Refresh the local server cache as well. This is done in order to avoid
// backfilling the WANCache with data that is already significantly stale
$flags = self::CONN_SILENCE_ERRORS | self::CONN_REFRESH_READ_ONLY;
$conn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
$conn = $this->getServerConnection( ServerInfoHolder::WRITER_INDEX, self::DOMAIN_ANY, $flags );
if ( $conn ) {
try {
$readOnly = (int)$this->isPrimaryConnectionReadOnly( $conn );
@ -2022,10 +1960,9 @@ class LoadBalancer implements ILoadBalancerForOwner {
* @return \Generator|Database[]
*/
private function getOpenPrimaryConnections() {
$primaryIndex = $this->getWriterIndex();
foreach ( $this->conns as $poolConnsByServer ) {
/** @var IDatabase $conn */
foreach ( ( $poolConnsByServer[$primaryIndex] ?? [] ) as $conn ) {
foreach ( ( $poolConnsByServer[ServerInfoHolder::WRITER_INDEX] ?? [] ) as $conn ) {
yield $conn;
}
}
@ -2038,7 +1975,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
private function getOpenReplicaConnections() {
foreach ( $this->conns as $poolConnsByServer ) {
foreach ( $poolConnsByServer as $serverIndex => $serverConns ) {
if ( $serverIndex === $this->getWriterIndex() ) {
if ( $serverIndex === ServerInfoHolder::WRITER_INDEX ) {
continue; // skip primary
}
foreach ( $serverConns as $conn ) {
@ -2053,12 +1990,12 @@ class LoadBalancer implements ILoadBalancerForOwner {
$maxLag = -1;
$maxIndex = 0;
if ( $this->hasReplicaServers() ) {
if ( $this->serverInfoHolder->hasReplicaServers() ) {
$lagTimes = $this->getLagTimes();
foreach ( $lagTimes as $i => $lag ) {
if ( $this->groupLoads[self::GROUP_GENERIC][$i] > 0 && $lag > $maxLag ) {
$maxLag = $lag;
$host = $this->getServerInfoStrict( $i, 'host' );
$host = $this->serverInfoHolder->getServerInfoStrict( $i, 'host' );
$maxIndex = $i;
}
}
@ -2069,35 +2006,24 @@ class LoadBalancer implements ILoadBalancerForOwner {
public function getLagTimes() {
if ( !$this->hasReplicaServers() ) {
return [ $this->getWriterIndex() => 0 ]; // no replication = no lag
return [ ServerInfoHolder::WRITER_INDEX => 0 ]; // no replication = no lag
}
$knownLagTimes = []; // map of (server index => 0 seconds)
$indexesWithLag = [];
foreach ( $this->servers as $i => $server ) {
if ( empty( $server['is static'] ) ) {
$indexesWithLag[] = $i; // DB server might have replication lag
} else {
$knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
}
}
[ $indexesWithLag, $knownLagTimes ] = $this->serverInfoHolder->getLagTimes();
return $this->getLoadMonitor()->getLagTimes( $indexesWithLag ) + $knownLagTimes;
}
public function waitForPrimaryPos( IDatabase $conn ) {
if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === $this->getWriterIndex() ) {
if ( $conn->getLBInfo( self::INFO_SERVER_INDEX ) === ServerInfoHolder::WRITER_INDEX ) {
return true; // not a replica DB server
}
// Get the current primary DB position, opening a connection only if needed
$index = $this->getWriterIndex();
$flags = self::CONN_SILENCE_ERRORS;
$primaryConn = $this->getAnyOpenConnection( $index, $flags );
$primaryConn = $this->getAnyOpenConnection( ServerInfoHolder::WRITER_INDEX, $flags );
if ( $primaryConn ) {
$pos = $primaryConn->getPrimaryPos();
} else {
$primaryConn = $this->getServerConnection( $index, self::DOMAIN_ANY, $flags );
$primaryConn = $this->getServerConnection( ServerInfoHolder::WRITER_INDEX, self::DOMAIN_ANY, $flags );
if ( !$primaryConn ) {
throw new DBReplicationWaitError(
null,
@ -2184,28 +2110,6 @@ class LoadBalancer implements ILoadBalancerForOwner {
return $old;
}
/**
* @param int $i Server index
* @param string|null $field Server index field [optional]
* @return mixed
* @throws InvalidArgumentException
*/
private function getServerInfoStrict( $i, $field = null ) {
if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No server with index '$i'" );
}
if ( $field !== null ) {
if ( !array_key_exists( $field, $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
}
return $this->servers[$i][$field];
}
return $this->servers[$i];
}
/**
* @param IDatabase $conn
* @return string Desciption of a connection handle for log messages
@ -2215,13 +2119,6 @@ class LoadBalancer implements ILoadBalancerForOwner {
return $conn->getLBInfo( self::INFO_SERVER_INDEX ) . '/' . $conn->getDomainID();
}
/**
* @return string Name of the primary DB server of the relevant DB cluster (e.g. "db1052")
*/
private function getPrimaryServerName() {
return $this->getServerName( $this->getWriterIndex() );
}
/**
* @param int $flags A bitfield of flags
* @param int $bit Bit flag constant

View file

@ -0,0 +1,170 @@
<?php
namespace Wikimedia\Rdbms;
use InvalidArgumentException;
use UnexpectedValueException;
/**
* Holds dbs information
* @internal
* @ingroup Database
*/
class ServerInfoHolder {
/**
* Default 'maxLag' when unspecified
* @internal Only for use within LoadBalancer/LoadMonitor
*/
public const MAX_LAG_DEFAULT = 6;
public const WRITER_INDEX = 0;
/** @var array[] Map of (server index => server config array) */
private $servers;
public function addServer( $i, $server ) {
$this->servers[$i] = $server;
}
public function getServerMaxLag( $i ) {
return $this->servers[$i]['max lag'] ?? self::MAX_LAG_DEFAULT;
}
public function getServerDriver( $i ) {
return $this->servers[$i]['driver'] ?? null;
}
public function getServerType( $i ) {
return $this->servers[$i]['type'] ?? 'unknown';
}
public function getServerName( $i ): string {
return $this->servers[$i]['serverName'] ?? 'localhost';
}
public function getServerInfo( $i ) {
return $this->servers[$i] ?? false;
}
public function getServerCount() {
return count( $this->servers );
}
public function hasServerIndex( $i ) {
return isset( $this->servers[$i] );
}
public function getLagTimes() {
$knownLagTimes = []; // map of (server index => 0 seconds)
$indexesWithLag = [];
foreach ( $this->servers as $i => $server ) {
if ( empty( $server['is static'] ) ) {
$indexesWithLag[] = $i; // DB server might have replication lag
} else {
$knownLagTimes[$i] = 0; // DB server is a non-replicating and read-only archive
}
}
return [ $indexesWithLag, $knownLagTimes ];
}
/**
* @param int $i Server index
* @param string|null $field Server index field [optional]
* @return mixed
* @throws InvalidArgumentException
*/
public function getServerInfoStrict( $i, $field = null ) {
if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No server with index '$i'" );
}
if ( $field !== null ) {
if ( !array_key_exists( $field, $this->servers[$i] ) ) {
throw new InvalidArgumentException( "No field '$field' in server index '$i'" );
}
return $this->servers[$i][$field];
}
return $this->servers[$i];
}
/**
* @return int[] List of replica server indexes
*/
public function getStreamingReplicaIndexes() {
$indexes = [];
foreach ( $this->servers as $i => $server ) {
if ( $i !== self::WRITER_INDEX && empty( $server['is static'] ) ) {
$indexes[] = $i;
}
}
return $indexes;
}
public function hasStreamingReplicaServers() {
return (bool)$this->getStreamingReplicaIndexes();
}
public function reconfigureServers( $paramServers ) {
if ( count( $paramServers ) == $this->getServerCount() ) {
return false;
}
$newServers = [];
foreach ( $paramServers as $i => $server ) {
$newServers[] = $this->getServerNameFromConfig( $server );
}
$closeConnections = false;
foreach ( $this->servers as $i => $server ) {
if ( !in_array( $this->getServerNameFromConfig( $server ), $newServers ) ) {
// db depooled, remove it from list of servers
unset( $this->servers[$i] );
$closeConnections = true;
}
}
return $closeConnections;
}
public function getServerNameFromConfig( $config ) {
$name = $config['serverName'] ?? ( $config['host'] ?? '' );
return ( $name !== '' ) ? $name : 'localhost';
}
public function normalizeServerMaps( array $servers, array &$indexBySrvName = null ) {
if ( !$servers ) {
throw new InvalidArgumentException( 'Missing or empty "servers" parameter' );
}
$listKey = -1;
$indexBySrvName = [];
foreach ( $servers as $i => $server ) {
if ( ++$listKey !== $i ) {
throw new UnexpectedValueException( 'List expected for "servers" parameter' );
}
$srvName = $server['serverName'] ?? $server['host'] ?? '';
$srvName = ( $srvName !== '' ) ? $srvName : 'localhost';
if ( isset( $indexBySrvName[$srvName] ) ) {
// Duplicate server names confuse caching, logging, and reconfigure()
throw new UnexpectedValueException( 'Duplicate server name "' . $srvName . '"' );
}
$indexBySrvName[$srvName] = $i;
$servers[$i]['serverName'] = $srvName;
$servers[$i]['groupLoads'] ??= [];
}
return $servers;
}
/**
* @return string Name of the primary DB server of the relevant DB cluster (e.g. "db1052")
*/
public function getPrimaryServerName() {
return $this->getServerName( self::WRITER_INDEX );
}
public function hasReplicaServers() {
return ( $this->getServerCount() > 1 );
}
}

View file

@ -37,6 +37,7 @@ use Wikimedia\TestingAccessWrapper;
* @group Database
* @group medium
* @covers \Wikimedia\Rdbms\LoadBalancer
* @covers \Wikimedia\Rdbms\ServerInfoHolder
*/
class LoadBalancerTest extends MediaWikiIntegrationTestCase {
private function makeServerConfig( $flags = DBO_DEFAULT ) {

View file

@ -12,10 +12,10 @@ class SqlBagOStuffServerArrayTest extends BagOStuffTestBase {
protected function newCacheInstance() {
// Extract server config from main load balancer
$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
$servers = TestingAccessWrapper::newFromObject( $lb )->servers;
$serverInfoHolder = TestingAccessWrapper::newFromObject( $lb )->serverInfoHolder;
return ObjectCache::newFromParams( [
'class' => SqlBagOStuff::class,
'servers' => [ $servers[0] ]
'servers' => [ $serverInfoHolder->getServerInfo( 0 ) ]
] );
}
}

View file

@ -0,0 +1,184 @@
<?php
use Wikimedia\Rdbms\ServerInfoHolder;
/**
* @covers \Wikimedia\Rdbms\ServerInfoHolder
*/
class ServerInfoHolderTest extends \PHPUnit\Framework\TestCase {
use MediaWikiCoversValidator;
private function serverConfigs(
$extra = [], $flags = DBO_DEFAULT
) {
return [
// Primary DB
0 => $extra + [
'host' => 'db0',
'serverName' => 'testhost0',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 0,
],
// Main replica DBs
1 => $extra + [
'host' => 'db1',
'serverName' => 'testhost1',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 100,
],
2 => $extra + [
'host' => 'db2',
'serverName' => 'testhost2',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 100,
],
// RC replica DBs
3 => $extra + [
'host' => 'db3',
'serverName' => 'testhost3',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 0,
'groupLoads' => [
'foo' => 100,
'bar' => 100
],
],
// Logging replica DBs
4 => $extra + [
'host' => 'db4',
'serverName' => 'testhost4',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 0,
'groupLoads' => [
'baz' => 100
],
],
5 => $extra + [
'host' => 'db5',
'serverName' => 'testhost5',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 0,
'groupLoads' => [
'baz' => 100
],
],
// Maintenance query replica DBs
6 => $extra + [
'host' => 'db6',
'serverName' => 'testhost6',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 0,
'groupLoads' => [
'vslow' => 100
],
],
// Replica DB that only has a copy of some static tables
7 => $extra + [
'host' => 'db7',
'serverName' => 'testhost7',
'dbname' => 'testwiki',
'tablePrefix' => 'prefix',
'user' => 'dbuser',
'password' => 'verystrongpassword',
'type' => 'mysql',
'flags' => $flags,
'load' => 0,
'groupLoads' => [
'archive' => 100
],
'is static' => true
]
];
}
private function createInfoHolderFromArray( $servers ): ServerInfoHolder {
$holder = new ServerInfoHolder();
foreach ( $holder->normalizeServerMaps( $servers ) as $i => $server ) {
$holder->addServer( $i, $server );
}
return $holder;
}
public function testWithoutReplica() {
$primaryInfo = $this->serverConfigs()[0];
$holder = $this->createInfoHolderFromArray( [ $primaryInfo ] );
$primaryInfo['groupLoads'] = [];
$this->assertSame( 1, $holder->getServerCount() );
$this->assertFalse( $holder->hasReplicaServers() );
$this->assertFalse( $holder->hasStreamingReplicaServers() );
$this->assertTrue( $holder->hasServerIndex( 0 ) );
$this->assertFalse( $holder->hasServerIndex( 1 ) );
$this->assertFalse( $holder->hasServerIndex( 999 ) );
$this->assertSame( $primaryInfo, $holder->getServerInfo( 0 ) );
$this->assertSame( $primaryInfo, $holder->getServerInfoStrict( 0 ) );
$this->assertSame( 'testhost0', $holder->getPrimaryServerName() );
}
public function testWithReplica() {
// Simulate web request with DBO_TRX
$holder = $this->createInfoHolderFromArray( $this->serverConfigs( [], DBO_TRX ) );
$this->assertSame( 8, $holder->getServerCount() );
$this->assertTrue( $holder->hasReplicaServers() );
$this->assertTrue( $holder->hasStreamingReplicaServers() );
$this->assertSame( 'testhost0', $holder->getPrimaryServerName() );
for ( $i = 0; $i < $holder->getServerCount(); ++$i ) {
$this->assertIsString( $holder->getServerName( $i ) );
$this->assertIsArray( $holder->getServerInfo( $i ) );
$this->assertIsString( $holder->getServerType( $i ) );
}
$this->assertFalse( $holder->hasServerIndex( 999 ) );
}
public function testNonExistentServerInfoStrict() {
$holder = $this->createInfoHolderFromArray( $this->serverConfigs( [], DBO_TRX ) );
$this->expectException( InvalidArgumentException::class );
$holder->getServerInfoStrict( 999 );
}
public function testNonExistentFieldInfoStrict() {
$holder = $this->createInfoHolderFromArray( $this->serverConfigs( [], DBO_TRX ) );
$this->expectException( InvalidArgumentException::class );
$holder->getServerInfoStrict( 0, 'non-existent-field' );
}
}