Merge "rdbms: improve LoadBalancer connection pool reuse"

This commit is contained in:
jenkins-bot 2022-09-23 08:03:29 +00:00 committed by Gerrit Code Review
commit a48afaa50c
9 changed files with 196 additions and 276 deletions

View file

@ -16,7 +16,7 @@ use InvalidArgumentException;
* @par Example:
* @code
* function getRowData() {
* $conn = $this->lb->getConnectedRef( DB_REPLICA );
* $conn = $this->lb->getConnectionRef( DB_REPLICA );
* $row = $conn->select( ... );
* return $row ? (array)$row : false;
* // $conn falls out of scope and $this->lb->reuseConnection() gets called
@ -31,7 +31,10 @@ class DBConnRef implements IMaintainableDatabase {
private $lb;
/** @var Database|null Live connection handle */
private $conn;
/** @var array N-tuple of (server index, group, DatabaseDomain|string) */
/**
* @var array Map of (DBConnRef::FLD_* constant => connection parameter)
* @phan-var array{0:int,1:array|string|false,2:DatabaseDomain,3:int}
*/
private $params;
/** @var int One of DB_PRIMARY/DB_REPLICA */
private $role;
@ -49,7 +52,7 @@ class DBConnRef implements IMaintainableDatabase {
private $modCountFix;
private const FLD_INDEX = 0;
private const FLD_GROUP = 1;
private const FLD_GROUPS = 1;
private const FLD_DOMAIN = 2;
private const FLD_FLAGS = 3;
@ -66,6 +69,8 @@ class DBConnRef implements IMaintainableDatabase {
throw new InvalidArgumentException( "Missing lazy connection arguments." );
}
$params[self::FLD_DOMAIN] = DatabaseDomain::newFromId( $params[self::FLD_DOMAIN] );
$this->lb = $lb;
$this->params = $params;
$this->role = $role;
@ -91,10 +96,21 @@ class DBConnRef implements IMaintainableDatabase {
}
if ( $this->conn === null ) {
[ $index, $groups, $wiki, $flags ] = $this->params;
$this->conn = $this->lb->getConnectionInternal( $index, $groups, $wiki, $flags );
$this->conn = $this->lb->getConnectionInternal(
$this->params[self::FLD_INDEX],
$this->params[self::FLD_GROUPS],
$this->params[self::FLD_DOMAIN]->getId(),
$this->params[self::FLD_FLAGS]
);
$this->modCountFix = $this->modCountRef;
}
if ( !$this->params[self::FLD_DOMAIN]->equals( $this->conn->getDomainID() ) ) {
// The underlying connection handle is likely being shared by other DBConnRef
// instances in a load balancer. Make sure that each one routes queries by their
// owner function to the domain that the owner expects.
$this->conn->selectDomain( $this->params[self::FLD_DOMAIN] );
}
}
public function __call( $name, array $arguments ) {
@ -143,8 +159,7 @@ class DBConnRef implements IMaintainableDatabase {
if ( $this->conn === null ) {
// Avoid triggering a database connection
$domain = DatabaseDomain::newFromId( $this->params[self::FLD_DOMAIN] );
$prefix = $domain->getTablePrefix();
$prefix = $this->params[self::FLD_DOMAIN]->getTablePrefix();
} else {
// This will just return the prefix
$prefix = $this->__call( __FUNCTION__, func_get_args() );
@ -161,8 +176,7 @@ class DBConnRef implements IMaintainableDatabase {
if ( $this->conn === null ) {
// Avoid triggering a database connection
$domain = DatabaseDomain::newFromId( $this->params[self::FLD_DOMAIN] );
$schema = (string)$domain->getSchema();
$schema = (string)( $this->params[self::FLD_DOMAIN]->getSchema() );
} else {
// This will just return the schema
$schema = $this->__call( __FUNCTION__, func_get_args() );
@ -235,9 +249,8 @@ class DBConnRef implements IMaintainableDatabase {
public function getDomainID() {
if ( $this->conn === null ) {
$domain = $this->params[self::FLD_DOMAIN];
// Avoid triggering a database connection
return $domain instanceof DatabaseDomain ? $domain->getId() : $domain;
return $this->params[self::FLD_DOMAIN]->getId();
}
return $this->__call( __FUNCTION__, func_get_args() );
@ -474,9 +487,8 @@ class DBConnRef implements IMaintainableDatabase {
public function getDBname() {
if ( $this->conn === null ) {
$domain = DatabaseDomain::newFromId( $this->params[self::FLD_DOMAIN] );
// Avoid triggering a database connection
return $domain->getDatabase();
return $this->params[self::FLD_DOMAIN]->getDatabase();
}
return $this->__call( __FUNCTION__, func_get_args() );

View file

@ -74,10 +74,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
/** @var callable Deprecation logger */
private $deprecationLogger;
/** @var DatabaseDomain Local DB domain ID and default for selectDB() calls */
/** @var DatabaseDomain Local DB domain ID and default for new connections */
private $localDomain;
/** @var IDatabase[][][] Map of (pool category => server index => domain => Database) */
/** @var Database[][][] Map of (pool category => server index => Database[]) */
private $conns;
/** @var string|null The name of the DB cluster */
@ -143,10 +143,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
*/
private $modcount = 0;
/** IDatabase handle LB info key; the "server index" of the handle */
private const INFO_SERVER_INDEX = 'serverIndex';
/** IDatabase handle LB info key; whether the handle belongs to the auto-commit pool */
private const INFO_AUTOCOMMIT_ONLY = 'autoCommitOnly';
private const INFO_FORIEGN = 'foreign';
private const INFO_FOREIGN_REF_COUNT = 'foreignPoolRefCount';
/**
* Default 'maxLag' when unspecified
@ -162,15 +162,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
/** Seconds to cache primary DB server read-only status */
private const TTL_CACHE_READONLY = 5;
private const KEY_LOCAL = 'local';
private const KEY_FOREIGN_FREE = 'foreignFree';
private const KEY_FOREIGN_INUSE = 'foreignInUse';
private const KEY_LOCAL_NOROUND = 'localAutoCommit';
private const KEY_FOREIGN_FREE_NOROUND = 'foreignFreeAutoCommit';
private const KEY_FOREIGN_INUSE_NOROUND = 'foreignInUseAutoCommit';
private const KEY_LOCAL_DOMAIN = '__local__';
/** @var string Key to the pool of transaction round connections */
private const POOL_ROUND = 'round';
/** @var string Key to the pool of auto-commit connections */
private const POOL_AUTOCOMMIT = 'auto-commit';
/** Transaction round, explicit or implicit, has not finished writing */
private const ROUND_CURSORY = 'cursory';
@ -276,14 +271,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
private static function newTrackedConnectionsArray() {
return [
// Connection were transaction rounds may be applied
self::KEY_LOCAL => [],
self::KEY_FOREIGN_INUSE => [],
self::KEY_FOREIGN_FREE => [],
// Auto-committing counterpart connections that ignore transaction rounds
self::KEY_LOCAL_NOROUND => [],
self::KEY_FOREIGN_INUSE_NOROUND => [],
self::KEY_FOREIGN_FREE_NOROUND => []
// Connection handles that participate in transaction rounds
self::POOL_ROUND => [],
// Auto-committing connection handles that ignore transaction rounds
self::POOL_AUTOCOMMIT => []
];
}
@ -377,8 +368,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
// Callers use CONN_TRX_AUTOCOMMIT to bypass REPEATABLE-READ staleness without
// resorting to row locks (e.g. FOR UPDATE) or to make small out-of-band commits
// during larger transactions. This is useful for avoiding lock contention.
// Primary DB server attributes (should match those of the replica DB servers)
// Assuming all servers are of the same type (or similar), which is overwhelmingly
// the case, use the primary server information to get the attributes. The information
// for $i cannot be used since it might be DB_REPLICA, which might require connection
// attempts in order to be resolved into a real server index.
$attributes = $this->getServerAttributes( $this->getWriterIndex() );
if ( $attributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
// The RDBMS does not support concurrent writes (e.g. SQLite), so attempts
@ -628,7 +621,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
/** @var int|false $i Index of selected server */
$i = false;
/** @var bool $laggedReplicaMode Whether server is considered lagged */
$laggedReplicaMode = false;
// Quickly look through the available servers for a server that meets criteria...
@ -787,14 +780,14 @@ class LoadBalancer implements ILoadBalancerForOwner {
$autoCommitOnly = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
$conn = false;
foreach ( $this->conns as $type => $connsByServer ) {
foreach ( $this->conns as $type => $poolConnsByServer ) {
if ( $i === self::DB_REPLICA ) {
// Consider all existing connections to any server
$applicableConnsByServer = $connsByServer;
$applicableConnsByServer = $poolConnsByServer;
} else {
// Consider all existing connections to a specific server
$applicableConnsByServer = isset( $connsByServer[$i] )
? [ $i => $connsByServer[$i] ]
$applicableConnsByServer = isset( $poolConnsByServer[$i] )
? [ $i => $poolConnsByServer[$i] ]
: [];
}
@ -826,7 +819,6 @@ class LoadBalancer implements ILoadBalancerForOwner {
": pooled DB handle for {db_server} (#$i) has no open connection.",
$this->getConnLogContext( $conn )
);
continue; // some sort of error occurred?
}
@ -844,7 +836,6 @@ class LoadBalancer implements ILoadBalancerForOwner {
": pooled DB handle for {db_server} (#$i) has a pending transaction.",
$this->getConnLogContext( $conn )
);
continue;
}
}
@ -970,12 +961,16 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
public function getServerConnection( $i, $domain, $flags = 0 ) {
$domainInstance = DatabaseDomain::newFromId( $domain );
// Number of connections made before getting the server index and handle
$priorConnectionsMade = $this->connectionCounter;
// Get an open connection to this server (might trigger a new connection)
$conn = $this->localDomain->equals( $domain )
? $this->getLocalConnection( $i, $flags )
: $this->getForeignConnection( $i, $domain, $flags );
[ $conn, $domainChanged ] = $this->reuseOrOpenConnectionForNewRef(
$i,
$domainInstance,
$flags
);
// Throw an error or otherwise bail out if the connection attempt failed
if ( !( $conn instanceof IDatabase ) ) {
if ( !self::fieldHasBit( $flags, self::CONN_SILENCE_ERRORS ) ) {
@ -1030,11 +1025,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
public function reuseConnectionInternal( IDatabase $conn ) {
$serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
$refCount = $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT );
if ( $serverIndex === null || $refCount === null ) {
return; // non-foreign connection; no domain-use tracking to update
} elseif ( $conn instanceof DBConnRef ) {
if ( $conn instanceof DBConnRef ) {
// DBConnRef already handles calling reuseConnection() and only passes the live
// Database instance to this method. Any caller passing in a DBConnRef is broken.
$this->connLogger->error(
@ -1042,43 +1033,12 @@ class LoadBalancer implements ILoadBalancerForOwner {
[ 'db_domain' => $conn->getDomainID(), 'exception' => new RuntimeException() ]
);
return;
return; // DBConnRef::__destruct() will call this method (passing a Database)
}
if ( $this->disabled ) {
return; // DBConnRef handle probably survived longer than the LoadBalancer
}
if ( $conn->getLBInfo( self::INFO_AUTOCOMMIT_ONLY ) ) {
$connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
$connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
} else {
$connFreeKey = self::KEY_FOREIGN_FREE;
$connInUseKey = self::KEY_FOREIGN_INUSE;
}
$domain = $conn->getDomainID();
$existingDomainConn = $this->conns[$connInUseKey][$serverIndex][$domain] ?? null;
if ( !$existingDomainConn ) {
throw new InvalidArgumentException(
"Connection $serverIndex/$domain not found; it may have already been freed" );
} elseif ( $existingDomainConn !== $conn ) {
throw new InvalidArgumentException(
"Connection $serverIndex/$domain mismatched; it may have already been freed" );
}
$existingDomainConn->setLBInfo( self::INFO_FOREIGN_REF_COUNT, --$refCount );
if ( $refCount <= 0 ) {
$this->conns[$connFreeKey][$serverIndex][$domain] = $existingDomainConn;
unset( $this->conns[$connInUseKey][$serverIndex][$domain] );
if ( !$this->conns[$connInUseKey][$serverIndex] ) {
unset( $this->conns[$connInUseKey][$serverIndex] ); // clean up
}
$this->connLogger->debug( __METHOD__ . ": freed connection $serverIndex/$domain" );
} else {
$this->connLogger->debug( __METHOD__ .
": reference count for $serverIndex/$domain reduced to $refCount" );
}
}
public function getConnectionRef( $i, $groups = [], $domain = false, $flags = 0 ): IDatabase {
@ -1128,176 +1088,102 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
/**
* Open a connection to a local DB, or return one if it is already open.
* Get a live connection handle to the given domain
*
* On error, returns false, and the connection which caused the
* error will be available via $this->errorConnection.
* This will reuse an existing tracked connection when possible. In some cases, this
* involves switching the DB domain of an existing handle in order to reuse it. If no
* existing handles can be reused, then a new connection will be made.
*
* @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
* On error, the offending DB handle will be available via $this->errorConnection.
*
* @param int $i Specific server index
* @param int $flags Class CONN_* constant bitfield
* @return Database
* @throws InvalidArgumentException When the server index is invalid
* @throws UnexpectedValueException When the DB domain of the connection is corrupted
*/
private function getLocalConnection( $i, $flags = 0 ) {
$autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
// Connection handles required to be in auto-commit mode use a separate connection
// pool since the main pool is effected by implicit and explicit transaction rounds
$connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
if ( isset( $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN] ) ) {
$conn = $this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN];
$this->connLogger->debug( __METHOD__ . ": reused a connection for $connKey/$i" );
} else {
$conn = $this->reallyOpenConnection(
$i,
$this->localDomain,
[ self::INFO_AUTOCOMMIT_ONLY => $autoCommit ]
);
if ( $conn->isOpen() ) {
$this->connLogger->debug( __METHOD__ . ": opened new connection for $connKey/$i" );
$this->conns[$connKey][$i][self::KEY_LOCAL_DOMAIN] = $conn;
} else {
$this->connLogger->warning( __METHOD__ . ": connection error for $connKey/$i" );
$this->errorConnection = $conn;
$conn = false;
}
}
// Check to make sure that the right domain is selected
if (
$conn instanceof IDatabase &&
!$this->localDomain->isCompatible( $conn->getDomainID() )
) {
throw new UnexpectedValueException(
"Got connection to '{$conn->getDomainID()}', " .
"but expected local domain ('{$this->localDomain}')"
);
}
return $conn;
}
/**
* Open a connection to a foreign DB, or return one if it is already open.
*
* Increments a reference count on the returned connection which locks the
* connection to the requested domain. This reference count can be
* decremented by calling reuseConnection().
*
* If a connection is open to the appropriate server already, but with the wrong
* database, it will be switched to the right database and returned, as long as
* it has been freed first with reuseConnection().
*
* On error, returns false, and the connection which caused the
* error will be available via $this->errorConnection.
*
* @note If disable() was called on this LoadBalancer, this method will throw a DBAccessError.
*
* @param int $i Specific server index
* @param string $domain Domain ID to open
* @param int $flags Class CONN_* constant bitfield
* @return Database|false Returns false on connection error
* @param DatabaseDomain $domain Database domain ID required by the reference
* @param int $flags Bit field of class CONN_* constants
* @return array (Database or null on error, whether the DB domain changed)
* @throws DBError When database selection fails
* @throws InvalidArgumentException When the server index is invalid
* @throws UnexpectedValueException When the DB domain of the connection is corrupted
* @throws DBAccessError If disable() was called
*/
private function getForeignConnection( $i, $domain, $flags = 0 ) {
$domainInstance = DatabaseDomain::newFromId( $domain );
$autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
private function reuseOrOpenConnectionForNewRef( $i, DatabaseDomain $domain, $flags = 0 ) {
// Connection handles required to be in auto-commit mode use a separate connection
// pool since the main pool is effected by implicit and explicit transaction rounds
if ( $autoCommit ) {
$connFreeKey = self::KEY_FOREIGN_FREE_NOROUND;
$connInUseKey = self::KEY_FOREIGN_INUSE_NOROUND;
} else {
$connFreeKey = self::KEY_FOREIGN_FREE;
$connInUseKey = self::KEY_FOREIGN_INUSE;
}
$autoCommit = self::fieldHasBit( $flags, self::CONN_TRX_AUTOCOMMIT );
// Decide which pool of connection handles to use (segregated by CONN_TRX_AUTOCOMMIT)
$poolKey = $autoCommit ? self::POOL_AUTOCOMMIT : self::POOL_ROUND;
/** @var Database $conn */
$conn = null;
if ( isset( $this->conns[$connInUseKey][$i][$domain] ) ) {
// Reuse an in-use connection for the same domain
$conn = $this->conns[$connInUseKey][$i][$domain];
$this->connLogger->debug( __METHOD__ . ": reusing connection $connInUseKey/$i/$domain" );
} elseif ( isset( $this->conns[$connFreeKey][$i][$domain] ) ) {
// Reuse a free connection for the same domain
$conn = $this->conns[$connFreeKey][$i][$domain];
unset( $this->conns[$connFreeKey][$i][$domain] );
$this->conns[$connInUseKey][$i][$domain] = $conn;
$this->connLogger->debug( __METHOD__ . ": reusing free connection $connInUseKey/$i/$domain" );
} elseif ( !empty( $this->conns[$connFreeKey][$i] ) ) {
// Reuse a free connection from another domain if possible
foreach ( $this->conns[$connFreeKey][$i] as $oldDomain => $oldConn ) {
if ( $domainInstance->getDatabase() !== null ) {
// Check if changing the database will require a new connection.
// In that case, leave the connection handle alone and keep looking.
// This prevents connections from being closed mid-transaction and can
// also avoid overhead if the same database will later be requested.
if (
$oldConn->databasesAreIndependent() &&
$oldConn->getDBname() !== $domainInstance->getDatabase()
) {
continue;
$domainChanged = false;
// Reuse a free connection in the pool from any domain if possible. There should only
// be one connection in this pool unless either:
// - a) IDatabase::databasesAreIndependent() returns true (e.g. postgres) and two
// or more database domains have been used during the load balancer's lifetime
// - b) Two or more nested function calls used getConnection() on different domains.
// Normally, callers should use getConnectionRef() instead of getConnection().
foreach ( ( $this->conns[$poolKey][$i] ?? [] ) as $poolConn ) {
// Check if any required DB domain changes for the new reference are possible
// Calling selectDomain() would trigger a reconnect, which will break if a
// transaction is active or if there is any other meaningful session state.
$isShareable = !(
$poolConn->databasesAreIndependent() &&
$domain->getDatabase() !== null &&
$domain->getDatabase() !== $poolConn->getDBname()
);
if ( $isShareable ) {
$conn = $poolConn;
// Make any required DB domain changes for the new reference
if ( !$domain->equals( $conn->getDomainID() ) ) {
if ( $domain->getDatabase() !== null ) {
// Select the new database, schema, and prefix
$conn->selectDomain( $domain );
} else {
// Stay on the current database, but update the schema/prefix
$conn->dbSchema( $domain->getSchema() );
$conn->tablePrefix( $domain->getTablePrefix() );
}
// Select the new database, schema, and prefix
$conn = $oldConn;
$conn->selectDomain( $domainInstance );
} else {
// Stay on the current database, but update the schema/prefix
$conn = $oldConn;
$conn->dbSchema( $domainInstance->getSchema() );
$conn->tablePrefix( $domainInstance->getTablePrefix() );
$domainChanged = true;
}
unset( $this->conns[$connFreeKey][$i][$oldDomain] );
// Note that if $domain is an empty string, getDomainID() might not match it
$this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
$this->connLogger->debug( __METHOD__ .
": reusing free connection from $oldDomain for $domain" );
$this->connLogger->debug( __METHOD__ . ": reusing connection for $i/$domain" );
break;
}
}
// If necessary, try to open a new connection and add it to the pool
if ( !$conn ) {
$conn = $this->reallyOpenConnection(
$i,
$domainInstance,
[
self::INFO_AUTOCOMMIT_ONLY => $autoCommit,
self::INFO_FORIEGN => true,
self::INFO_FOREIGN_REF_COUNT => 0
]
$domain,
[ self::INFO_AUTOCOMMIT_ONLY => $autoCommit ]
);
if ( $conn->isOpen() ) {
// Note that if $domain is an empty string, getDomainID() might not match it
$this->conns[$connInUseKey][$i][$conn->getDomainID()] = $conn;
$this->connLogger->debug( __METHOD__ . ": opened new connection for $connInUseKey/$i/$domain" );
$this->conns[$poolKey][$i][] = $conn;
} else {
$this->connLogger->warning(
__METHOD__ . ": connection error for $connInUseKey/$i/{db_domain}",
[ 'db_domain' => $domain ]
);
$this->errorConnection = $conn;
$conn = false;
$conn = null;
}
}
// Check to make sure that the right domain is selected
if ( $conn instanceof IDatabase ) {
// Check to make sure that the right domain is selected
if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
throw new UnexpectedValueException(
"Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
}
// Increment reference count
$refCount = $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT );
$conn->setLBInfo( self::INFO_FOREIGN_REF_COUNT, $refCount + 1 );
$this->assertConnectionDomain( $conn, $domain );
}
return $conn;
return [ $conn, $domainChanged ];
}
/**
* Sanity check to make sure that the right domain is selected
*
* @param Database $conn
* @param DatabaseDomain $domain
* @throws DBUnexpectedError
*/
private function assertConnectionDomain( Database $conn, DatabaseDomain $domain ) {
if ( !$domain->isCompatible( $conn->getDomainID() ) ) {
throw new UnexpectedValueException(
"Got connection to '{$conn->getDomainID()}', but expected one for '{$domain}'"
);
}
}
public function getServerAttributes( $i ) {
@ -1325,7 +1211,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
* @param int $i Specific server index
* @param DatabaseDomain $domain Domain the connection is for, possibly unspecified
* @param array $lbInfo Additional information for setLBInfo()
* @return IDatabase
* @return Database
* @throws DBAccessError
* @throws InvalidArgumentException
*/
@ -1401,7 +1287,13 @@ class LoadBalancer implements ILoadBalancerForOwner {
// in a request or script and then return soon after in another request or script.
// This requires cooperation with ChronologyProtector and the application wiring.
if ( $conn->isOpen() ) {
$this->connLogger->debug( __METHOD__ . ": opened new connection for $i/$domain" );
$this->lazyLoadReplicationPositions();
} else {
$this->connLogger->warning(
__METHOD__ . ": connection error for $i/{db_domain}",
[ 'db_domain' => $domain->getId() ]
);
}
// Log when many connection are made during a single request/script
@ -1420,6 +1312,8 @@ class LoadBalancer implements ILoadBalancerForOwner {
);
}
$this->assertConnectionDomain( $conn, $domain );
return $conn;
}
@ -1662,17 +1556,17 @@ class LoadBalancer implements ILoadBalancerForOwner {
throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
}
$domain = $conn->getDomainID();
$serverIndex = $conn->getLBInfo( self::INFO_SERVER_INDEX );
if ( $serverIndex === null ) {
throw new RuntimeException( 'Database handle is missing server index' );
throw new UnexpectedValueException( "Handle on '$domain' missing server index" );
}
$srvName = $this->getServerName( $serverIndex );
$domain = $conn->getDomainID();
$found = false;
foreach ( $this->conns as $type => $connsByServer ) {
$key = array_search( $conn, $connsByServer[$serverIndex] ?? [], true );
foreach ( $this->conns as $type => $poolConnsByServer ) {
$key = array_search( $conn, $poolConnsByServer[$serverIndex] ?? [], true );
if ( $key !== false ) {
$found = true;
unset( $this->conns[$type][$serverIndex][$key] );
@ -1682,13 +1576,13 @@ class LoadBalancer implements ILoadBalancerForOwner {
if ( !$found ) {
$this->connLogger->warning(
__METHOD__ .
": got orphaned connection to database $serverIndex/$domain at '$srvName'."
": orphaned connection to database {$this->stringifyConn( $conn )} at '$srvName'."
);
}
$this->connLogger->debug(
__METHOD__ .
": closing connection to database $serverIndex/$domain at '$srvName'."
": closing connection to database {$this->stringifyConn( $conn )} at '$srvName'."
);
$conn->close( __METHOD__ );
@ -2251,8 +2145,8 @@ class LoadBalancer implements ILoadBalancerForOwner {
* @return \Generator|Database[]
*/
private function getOpenConnections() {
foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $serverConns ) {
foreach ( $this->conns as $poolConnsByServer ) {
foreach ( $poolConnsByServer as $serverConns ) {
foreach ( $serverConns as $conn ) {
yield $conn;
}
@ -2266,12 +2160,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
*/
private function getOpenPrimaryConnections() {
$primaryIndex = $this->getWriterIndex();
foreach ( $this->conns as $connsByServer ) {
if ( isset( $connsByServer[$primaryIndex] ) ) {
/** @var IDatabase $conn */
foreach ( $connsByServer[$primaryIndex] as $conn ) {
yield $conn;
}
foreach ( $this->conns as $poolConnsByServer ) {
/** @var IDatabase $conn */
foreach ( ( $poolConnsByServer[$primaryIndex] ?? [] ) as $conn ) {
yield $conn;
}
}
}
@ -2281,10 +2173,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
* @return \Generator|Database[]
*/
private function getOpenReplicaConnections() {
foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $i => $serverConns ) {
if ( $i === $this->getWriterIndex() ) {
continue; // skip primary DB
foreach ( $this->conns as $poolConnsByServer ) {
foreach ( $poolConnsByServer as $serverIndex => $serverConns ) {
if ( $serverIndex === $this->getWriterIndex() ) {
continue; // skip primary
}
foreach ( $serverConns as $conn ) {
yield $conn;
@ -2298,8 +2190,8 @@ class LoadBalancer implements ILoadBalancerForOwner {
*/
private function getCurrentConnectionCount() {
$count = 0;
foreach ( $this->conns as $connsByServer ) {
foreach ( $connsByServer as $serverConns ) {
foreach ( $this->conns as $poolConnsByServer ) {
foreach ( $poolConnsByServer as $serverConns ) {
$count += count( $serverConns );
}
}
@ -2420,22 +2312,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
}
public function setLocalDomainPrefix( $prefix ) {
// Find connections to explicit foreign domains still marked as in-use...
$domainsInUse = [];
foreach ( $this->getOpenConnections() as $conn ) {
// Once reuseConnection() is called on a handle, its reference count goes from 1 to 0.
// Until then, it is still in use by the caller (explicitly or via DBConnRef scope).
if ( $conn->getLBInfo( self::INFO_FOREIGN_REF_COUNT ) > 0 ) {
$domainsInUse[] = $conn->getDomainID();
}
}
// Do not switch connections to explicit foreign domains unless marked as safe
if ( $domainsInUse ) {
$domains = implode( ', ', $domainsInUse );
throw new DBUnexpectedError( null,
"Foreign domain connections are still in use ($domains)" );
}
$oldLocalDomain = $this->localDomain;
$this->setLocalDomain( new DatabaseDomain(
$this->localDomain->getDatabase(),
@ -2443,9 +2320,10 @@ class LoadBalancer implements ILoadBalancerForOwner {
$prefix
) );
// Update the prefix for all local connections...
// Update the prefix for existing connections.
// Existing DBConnRef handles will not be affected.
foreach ( $this->getOpenConnections() as $conn ) {
if ( !$conn->getLBInfo( self::INFO_FORIEGN ) ) {
if ( $oldLocalDomain->equals( $conn->getDomainID() ) ) {
$conn->tablePrefix( $prefix );
}
}
@ -2478,7 +2356,7 @@ class LoadBalancer implements ILoadBalancerForOwner {
/**
* @param int $i Server index
* @param string|null $field Server index field [optional]
* @return array|mixed
* @return mixed
* @throws InvalidArgumentException
*/
private function getServerInfoStrict( $i, $field = null ) {
@ -2497,6 +2375,15 @@ class LoadBalancer implements ILoadBalancerForOwner {
return $this->servers[$i];
}
/**
* @param IDatabase $conn
* @return string Desciption of a connection handle for log messages
* @throws InvalidArgumentException
*/
private function stringifyConn( IDatabase $conn ) {
return $conn->getLBInfo( self::INFO_SERVER_INDEX ) . '/' . $conn->getDomainID();
}
/**
* @return string Name of the primary DB server of the relevant DB cluster (e.g. "db1052")
*/

View file

@ -27,8 +27,8 @@ use InvalidArgumentException;
* @ingroup Database
*/
class LoadBalancerSingle extends LoadBalancer {
/** @var IDatabase */
private $db;
/** @var Database */
private $conn;
/**
* You probably want to use {@link newFromConnection} instead.
@ -37,13 +37,13 @@ class LoadBalancerSingle extends LoadBalancer {
* - connection: An IDatabase connection object
*/
public function __construct( array $params ) {
/** @var IDatabase $conn */
/** @var Database $conn */
$conn = $params['connection'] ?? null;
if ( !$conn ) {
throw new InvalidArgumentException( "Missing 'connection' argument." );
}
$this->db = $conn;
$this->conn = $conn;
parent::__construct( [
'servers' => [ [
@ -55,7 +55,7 @@ class LoadBalancerSingle extends LoadBalancer {
'trxProfiler' => $params['trxProfiler'] ?? null,
'srvCache' => $params['srvCache'] ?? null,
'wanCache' => $params['wanCache'] ?? null,
'localDomain' => $params['localDomain'] ?? $this->db->getDomainID(),
'localDomain' => $params['localDomain'] ?? $this->conn->getDomainID(),
'readOnlyReason' => $params['readOnlyReason'] ?? false,
'clusterName' => $params['clusterName'] ?? null,
] );
@ -80,7 +80,15 @@ class LoadBalancerSingle extends LoadBalancer {
}
protected function reallyOpenConnection( $i, DatabaseDomain $domain, array $lbInfo = [] ) {
return $this->db;
foreach ( $lbInfo as $k => $v ) {
$this->conn->setLBInfo( $k, $v );
}
return $this->conn;
}
public function reuseConnection( IDatabase $conn ) {
// do nothing since the connection was injected
}
public function __destruct() {

View file

@ -483,6 +483,7 @@ abstract class MediaWikiIntegrationTestCase extends PHPUnit\Framework\TestCase {
$useTemporaryTables = !$this->getCliArg( 'use-normal-tables' );
$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
// Need a Database where the DB domain changes during table cloning
$this->db = $lb->getConnectionInternal( DB_PRIMARY );
$this->checkDbIsSupported();

View file

@ -139,8 +139,13 @@ class RevisionStoreDbTest extends MediaWikiIntegrationTestCase {
->getMock();
$lb->method( 'reallyOpenConnection' )->willReturnCallback(
function () use ( $server ) {
return $this->getDatabaseMock( $server );
function ( $i, DatabaseDomain $domain, array $lbInfo ) use ( $server ) {
$conn = $this->getDatabaseMock( $server );
foreach ( $lbInfo as $k => $v ) {
$conn->setLBInfo( $k, $v );
}
return $conn;
}
);

View file

@ -60,6 +60,8 @@ class RevisionStoreTest extends MediaWikiIntegrationTestCase {
->disableAutoReturnValueGeneration()
->disableOriginalConstructor()->getMock();
$db->method( 'getDomainId' )->willReturn( 'fake' );
$this->installMockLoadBalancer( $db );
return $db;
}
@ -86,6 +88,7 @@ class RevisionStoreTest extends MediaWikiIntegrationTestCase {
$db = $this->installMockDatabase();
// First query is by page ID. Return result
$db->expects( $this->at( 0 ) )
->method( 'selectRow' )
->with(
@ -136,6 +139,7 @@ class RevisionStoreTest extends MediaWikiIntegrationTestCase {
->willReturn( false );
// Retrying on master...
// Third query, by page_id again
$db->expects( $this->at( 2 ) )
->method( 'selectRow' )
@ -173,6 +177,7 @@ class RevisionStoreTest extends MediaWikiIntegrationTestCase {
->willReturn( false );
// Second select using rev_id, faking no result (db lag?)
$db->expects( $this->at( 1 ) )
->method( 'selectRow' )
->with(
@ -219,6 +224,7 @@ class RevisionStoreTest extends MediaWikiIntegrationTestCase {
->willReturn( false );
// Retrying on master...
// Third query, by page_id again, still no result
$db->expects( $this->at( 2 ) )
->method( 'selectRow' )
@ -229,7 +235,7 @@ class RevisionStoreTest extends MediaWikiIntegrationTestCase {
)
->willReturn( false );
// Forth query, by rev_id agin
// Forth query, by rev_id again
$db->expects( $this->at( 3 ) )
->method( 'selectRow' )
->with(

View file

@ -598,7 +598,7 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
$conn2->ensureConnection();
$count = iterator_count( $lbWrapper->getOpenPrimaryConnections() );
$this->assertSame( 2, $count, 'Connection handle count' );
$this->assertSame( 1, $count, 'Connection handle count' );
$tlCalls = 0;
$lb->setTransactionListener( 'test-listener', static function () use ( &$tlCalls ) {
@ -626,7 +626,7 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
$lb->runPrimaryTransactionListenerCallbacks();
$this->assertSame( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $bc );
$this->assertSame( 2, $tlCalls );
$this->assertSame( 1, $tlCalls );
$tlCalls = 0;
$lb->beginPrimaryChanges( __METHOD__ );
@ -650,7 +650,7 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
$lb->runPrimaryTransactionListenerCallbacks();
$this->assertSame( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $ac );
$this->assertSame( 2, $tlCalls );
$this->assertSame( 1, $tlCalls );
$conn1->lock( 'test_lock_' . mt_rand(), __METHOD__, 0 );
$lb->flushPrimarySessions( __METHOD__ );

View file

@ -337,6 +337,7 @@ class MediaWikiIntegrationTestCaseTest extends MediaWikiIntegrationTestCase {
// Make an untracked DB_PRIMARY connection
$lb = $this->getServiceContainer()->getDBLoadBalancerFactory()->newMainLB();
// Need a Database where the DB domain changes during table cloning
$db = $lb->getConnectionInternal( DB_PRIMARY );
$this->assertNotSame( $this->db, $db );

View file

@ -111,7 +111,7 @@ class DBConnRefTest extends PHPUnit\Framework\TestCase {
$ref = new DBConnRef(
$lb,
[ DB_PRIMARY, [ 'test' ], 'dummy', ILoadBalancer::CONN_TRX_AUTOCOMMIT ],
[ DB_PRIMARY, [ 'test' ], 'dummy', $lb::CONN_TRX_AUTOCOMMIT ],
DB_PRIMARY
);
@ -120,7 +120,7 @@ class DBConnRefTest extends PHPUnit\Framework\TestCase {
$ref2 = new DBConnRef(
$lb,
[ DB_PRIMARY, [ 'test' ], 'dummy', ILoadBalancer::CONN_TRX_AUTOCOMMIT ],
[ DB_PRIMARY, [ 'test' ], 'dummy', $lb::CONN_TRX_AUTOCOMMIT ],
DB_REPLICA
);
$this->assertEquals( DB_REPLICA, $ref2->getReferenceRole() );