rdbms: make resolveDomainID() use more consistent in LoadBalancer
Use the method in getConnection()/openConnection() and make it handle the local domain alias as well. Also: * Move some code to a new getConnectionIndex() method. * Make the text of various index/domain exceptions more consistent. * Improve some auto-commit flag code comments. Change-Id: I7e0d4f2134ee91ad60b0d34bf01e05115193b04a
This commit is contained in:
parent
74881a7eac
commit
8669e0fbe6
3 changed files with 106 additions and 94 deletions
|
|
@ -131,7 +131,7 @@ interface ILoadBalancer {
|
|||
|
||||
/**
|
||||
* @param DatabaseDomain|string|bool $domain Database domain
|
||||
* @return string Value of $domain if provided or the local domain otherwise
|
||||
* @return string Value of $domain if it is foreign or the local domain otherwise
|
||||
* @since 1.32
|
||||
*/
|
||||
public function resolveDomainID( $domain );
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
/** @var callable Deprecation logger */
|
||||
private $deprecationLogger;
|
||||
|
||||
/** @var DatabaseDomain Local Domain ID and default for selectDB() calls */
|
||||
/** @var DatabaseDomain Local DB domain ID and default for selectDB() calls */
|
||||
private $localDomain;
|
||||
|
||||
/** @var Database[][][] Map of (connection category => server index => IDatabase[]) */
|
||||
|
|
@ -82,7 +82,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
private $waitTimeout;
|
||||
/** @var array The LoadMonitor configuration */
|
||||
private $loadMonitorConfig;
|
||||
/** @var string Alternate ID string for the domain instead of DatabaseDomain::getId() */
|
||||
/** @var string Alternate local DB domain instead of DatabaseDomain::getId() */
|
||||
private $localDomainIdAlias;
|
||||
/** @var int */
|
||||
private $maxLag;
|
||||
|
|
@ -163,7 +163,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
|
||||
public function __construct( array $params ) {
|
||||
if ( !isset( $params['servers'] ) ) {
|
||||
throw new InvalidArgumentException( __CLASS__ . ': missing servers parameter' );
|
||||
throw new InvalidArgumentException( __CLASS__ . ': missing "servers" parameter' );
|
||||
}
|
||||
$this->servers = $params['servers'];
|
||||
foreach ( $this->servers as $i => $server ) {
|
||||
|
|
@ -257,7 +257,12 @@ class LoadBalancer implements ILoadBalancer {
|
|||
}
|
||||
|
||||
public function resolveDomainID( $domain ) {
|
||||
return ( $domain !== false ) ? (string)$domain : $this->getLocalDomainID();
|
||||
if ( $domain === $this->localDomainIdAlias || $domain === false ) {
|
||||
// Local connection requested via some backwards-compatibility domain alias
|
||||
return $this->getLocalDomainID();
|
||||
}
|
||||
|
||||
return (string)$domain;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -341,6 +346,52 @@ class LoadBalancer implements ILoadBalancer {
|
|||
return ArrayUtils::pickRandom( $loads );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $i
|
||||
* @param array $groups
|
||||
* @param string|bool $domain
|
||||
* @return int
|
||||
*/
|
||||
private function getConnectionIndex( $i, $groups, $domain ) {
|
||||
// Check one "group" per default: the generic pool
|
||||
$defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
|
||||
|
||||
$groups = ( $groups === false || $groups === [] )
|
||||
? $defaultGroups
|
||||
: (array)$groups;
|
||||
|
||||
if ( $i == self::DB_MASTER ) {
|
||||
$i = $this->getWriterIndex();
|
||||
} elseif ( $i == self::DB_REPLICA ) {
|
||||
# Try to find an available server in any the query groups (in order)
|
||||
foreach ( $groups as $group ) {
|
||||
$groupIndex = $this->getReaderIndex( $group, $domain );
|
||||
if ( $groupIndex !== false ) {
|
||||
$i = $groupIndex;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Operation-based index
|
||||
if ( $i == self::DB_REPLICA ) {
|
||||
$this->lastError = 'Unknown error'; // reset error string
|
||||
# Try the general server pool if $groups are unavailable.
|
||||
$i = ( $groups === [ false ] )
|
||||
? false // don't bother with this if that is what was tried above
|
||||
: $this->getReaderIndex( false, $domain );
|
||||
# Couldn't find a working server in getReaderIndex()?
|
||||
if ( $i === false ) {
|
||||
$this->lastError = 'No working replica DB server: ' . $this->lastError;
|
||||
// Throw an exception
|
||||
$this->reportConnectionError();
|
||||
return null; // not reached
|
||||
}
|
||||
}
|
||||
|
||||
return $i;
|
||||
}
|
||||
|
||||
public function getReaderIndex( $group = false, $domain = false ) {
|
||||
if ( count( $this->servers ) == 1 ) {
|
||||
// Skip the load balancing if there's only one server
|
||||
|
|
@ -407,7 +458,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
*/
|
||||
private function pickReaderIndex( array $loads, $domain = false ) {
|
||||
if ( $loads === [] ) {
|
||||
throw new InvalidArgumentException( "Empty server array given to LoadBalancer" );
|
||||
throw new InvalidArgumentException( "Server configuration array is empty" );
|
||||
}
|
||||
|
||||
/** @var int|bool $i Index of selected server */
|
||||
|
|
@ -687,13 +738,11 @@ class LoadBalancer implements ILoadBalancer {
|
|||
|
||||
public function getConnection( $i, $groups = [], $domain = false, $flags = 0 ) {
|
||||
if ( $i === null || $i === false ) {
|
||||
throw new InvalidArgumentException( 'Attempt to call ' . __METHOD__ .
|
||||
' with invalid server index' );
|
||||
throw new InvalidArgumentException( "Cannot connect without a server index" );
|
||||
}
|
||||
|
||||
if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
|
||||
$domain = false; // local connection requested
|
||||
}
|
||||
$domain = $this->resolveDomainID( $domain );
|
||||
$masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
|
||||
|
||||
if ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) === self::CONN_TRX_AUTOCOMMIT ) {
|
||||
// Assuming all servers are of the same type (or similar), which is overwhelmingly
|
||||
|
|
@ -712,62 +761,28 @@ class LoadBalancer implements ILoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
// Check one "group" per default: the generic pool
|
||||
$defaultGroups = $this->defaultGroup ? [ $this->defaultGroup ] : [ false ];
|
||||
|
||||
$groups = ( $groups === false || $groups === [] )
|
||||
? $defaultGroups
|
||||
: (array)$groups;
|
||||
|
||||
$masterOnly = ( $i == self::DB_MASTER || $i == $this->getWriterIndex() );
|
||||
$oldConnsOpened = $this->connsOpened; // connections open now
|
||||
|
||||
if ( $i == self::DB_MASTER ) {
|
||||
$i = $this->getWriterIndex();
|
||||
} elseif ( $i == self::DB_REPLICA ) {
|
||||
# Try to find an available server in any the query groups (in order)
|
||||
foreach ( $groups as $group ) {
|
||||
$groupIndex = $this->getReaderIndex( $group, $domain );
|
||||
if ( $groupIndex !== false ) {
|
||||
$i = $groupIndex;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Operation-based index
|
||||
if ( $i == self::DB_REPLICA ) {
|
||||
$this->lastError = 'Unknown error'; // reset error string
|
||||
# Try the general server pool if $groups are unavailable.
|
||||
$i = ( $groups === [ false ] )
|
||||
? false // don't bother with this if that is what was tried above
|
||||
: $this->getReaderIndex( false, $domain );
|
||||
# Couldn't find a working server in getReaderIndex()?
|
||||
if ( $i === false ) {
|
||||
$this->lastError = 'No working replica DB server: ' . $this->lastError;
|
||||
// Throw an exception
|
||||
$this->reportConnectionError();
|
||||
return null; // not reached
|
||||
}
|
||||
}
|
||||
|
||||
# Now we have an explicit index into the servers array
|
||||
$conn = $this->openConnection( $i, $domain, $flags );
|
||||
// Number of connections made before getting the server index and handle
|
||||
$priorConnectionsMade = $this->connsOpened;
|
||||
// Decide which server to use (might trigger new connections)
|
||||
$serverIndex = $this->getConnectionIndex( $i, $groups, $domain );
|
||||
// Get an open connection to that server (might trigger a new connection)
|
||||
$conn = $this->openConnection( $serverIndex, $domain, $flags );
|
||||
if ( !$conn ) {
|
||||
// Throw an exception
|
||||
$this->reportConnectionError();
|
||||
return null; // not reached
|
||||
return null; // unreachable due to exception
|
||||
}
|
||||
|
||||
# Profile any new connections that happen
|
||||
if ( $this->connsOpened > $oldConnsOpened ) {
|
||||
// Profile any new connections caused by this method
|
||||
if ( $this->connsOpened > $priorConnectionsMade ) {
|
||||
$host = $conn->getServer();
|
||||
$dbname = $conn->getDBname();
|
||||
$this->trxProfiler->recordConnection( $host, $dbname, $masterOnly );
|
||||
}
|
||||
|
||||
if ( $masterOnly ) {
|
||||
# Make master-requested DB handles inherit any read-only mode setting
|
||||
// If the load balancer is read-only, perhaps due to replication lag, then master
|
||||
// DB handles will reflect that. Note that Database::assertIsWritableMaster() takes
|
||||
// care of replica DB handles whereas getReadOnlyReason() would cause infinite loops.
|
||||
$conn->setLBInfo( 'readOnlyReason', $this->getReadOnlyReason( $domain, $conn ) );
|
||||
}
|
||||
|
||||
|
|
@ -813,11 +828,11 @@ class LoadBalancer implements ILoadBalancer {
|
|||
|
||||
$domain = $conn->getDomainID();
|
||||
if ( !isset( $this->conns[$connInUseKey][$serverIndex][$domain] ) ) {
|
||||
throw new InvalidArgumentException( __METHOD__ .
|
||||
": connection $serverIndex/$domain not found; it may have already been freed." );
|
||||
throw new InvalidArgumentException(
|
||||
"Connection $serverIndex/$domain not found; it may have already been freed" );
|
||||
} elseif ( $this->conns[$connInUseKey][$serverIndex][$domain] !== $conn ) {
|
||||
throw new InvalidArgumentException( __METHOD__ .
|
||||
": connection $serverIndex/$domain mismatched; it may have already been freed." );
|
||||
throw new InvalidArgumentException(
|
||||
"Connection $serverIndex/$domain mismatched; it may have already been freed" );
|
||||
}
|
||||
|
||||
$conn->setLBInfo( 'foreignPoolRefCount', --$refCount );
|
||||
|
|
@ -867,45 +882,35 @@ class LoadBalancer implements ILoadBalancer {
|
|||
}
|
||||
|
||||
public function openConnection( $i, $domain = false, $flags = 0 ) {
|
||||
if ( $this->localDomain->equals( $domain ) || $domain === $this->localDomainIdAlias ) {
|
||||
$domain = false; // local connection requested
|
||||
}
|
||||
$domain = $this->resolveDomainID( $domain );
|
||||
|
||||
if ( !$this->connectionAttempted && $this->chronologyCallback ) {
|
||||
$this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
|
||||
// Load any "waitFor" positions before connecting so that doWait() is triggered
|
||||
$this->connLogger->debug( __METHOD__ . ': calling initLB() before first connection.' );
|
||||
$this->connectionAttempted = true;
|
||||
( $this->chronologyCallback )( $this );
|
||||
}
|
||||
|
||||
// Check if an auto-commit connection is being requested. If so, it will not reuse the
|
||||
// main set of DB connections but rather its own pool since:
|
||||
// a) those are usually set to implicitly use transaction rounds via DBO_TRX
|
||||
// b) those must support the use of explicit transaction rounds via beginMasterChanges()
|
||||
$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
|
||||
|
||||
if ( $domain !== false ) {
|
||||
// Connection is to a foreign domain
|
||||
$conn = $this->openForeignConnection( $i, $domain, $flags );
|
||||
} else {
|
||||
// Connection is to the local domain
|
||||
$conn = $this->openLocalConnection( $i, $flags );
|
||||
}
|
||||
$conn = $this->localDomain->equals( $domain )
|
||||
? $this->openLocalConnection( $i, $flags )
|
||||
: $this->openForeignConnection( $i, $domain, $flags );
|
||||
|
||||
if ( $conn instanceof IDatabase && !$conn->isOpen() ) {
|
||||
// Connection was made but later unrecoverably lost for some reason.
|
||||
// Do not return a handle that will just throw exceptions on use,
|
||||
// but let the calling code (e.g. getReaderIndex) try another server.
|
||||
// See DatabaseMysqlBase::ping() for how this can happen.
|
||||
$this->errorConnection = $conn;
|
||||
$conn = false;
|
||||
}
|
||||
|
||||
if ( $autoCommit && $conn instanceof IDatabase ) {
|
||||
if (
|
||||
$conn instanceof IDatabase &&
|
||||
( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT )
|
||||
) {
|
||||
if ( $conn->trxLevel() ) { // sanity
|
||||
throw new DBUnexpectedError(
|
||||
$conn,
|
||||
__METHOD__ . ': CONN_TRX_AUTOCOMMIT handle has a transaction.'
|
||||
'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction'
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -928,6 +933,8 @@ class LoadBalancer implements ILoadBalancer {
|
|||
* @return Database
|
||||
*/
|
||||
private function openLocalConnection( $i, $flags = 0 ) {
|
||||
// Connection handles required to be in auto-commit mode use a separate connection
|
||||
// pool since the main pool is effected by implicit and explicit transaction rounds
|
||||
$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
|
||||
|
||||
$connKey = $autoCommit ? self::KEY_LOCAL_NOROUND : self::KEY_LOCAL;
|
||||
|
|
@ -935,7 +942,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
$conn = $this->conns[$connKey][$i][0];
|
||||
} else {
|
||||
if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
|
||||
throw new InvalidArgumentException( "No server with index '$i'." );
|
||||
throw new InvalidArgumentException( "No server with index '$i'" );
|
||||
}
|
||||
// Open a new connection
|
||||
$server = $this->servers[$i];
|
||||
|
|
@ -962,7 +969,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
) {
|
||||
throw new UnexpectedValueException(
|
||||
"Got connection to '{$conn->getDomainID()}', " .
|
||||
"but expected local domain ('{$this->localDomain}')." );
|
||||
"but expected local domain ('{$this->localDomain}')" );
|
||||
}
|
||||
|
||||
return $conn;
|
||||
|
|
@ -992,6 +999,8 @@ class LoadBalancer implements ILoadBalancer {
|
|||
*/
|
||||
private function openForeignConnection( $i, $domain, $flags = 0 ) {
|
||||
$domainInstance = DatabaseDomain::newFromId( $domain );
|
||||
// Connection handles required to be in auto-commit mode use a separate connection
|
||||
// pool since the main pool is effected by implicit and explicit transaction rounds
|
||||
$autoCommit = ( ( $flags & self::CONN_TRX_AUTOCOMMIT ) == self::CONN_TRX_AUTOCOMMIT );
|
||||
|
||||
if ( $autoCommit ) {
|
||||
|
|
@ -1047,7 +1056,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
|
||||
if ( !$conn ) {
|
||||
if ( !isset( $this->servers[$i] ) || !is_array( $this->servers[$i] ) ) {
|
||||
throw new InvalidArgumentException( "No server with index '$i'." );
|
||||
throw new InvalidArgumentException( "No server with index '$i'" );
|
||||
}
|
||||
// Open a new connection
|
||||
$server = $this->servers[$i];
|
||||
|
|
@ -1071,7 +1080,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
// Final sanity check to make sure the right domain is selected
|
||||
if ( !$domainInstance->isCompatible( $conn->getDomainID() ) ) {
|
||||
throw new UnexpectedValueException(
|
||||
"Got connection to '{$conn->getDomainID()}', but expected '$domain'." );
|
||||
"Got connection to '{$conn->getDomainID()}', but expected '$domain'" );
|
||||
}
|
||||
// Increment reference count
|
||||
$refCount = $conn->getLBInfo( 'foreignPoolRefCount' );
|
||||
|
|
@ -1301,7 +1310,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
public function closeConnection( IDatabase $conn ) {
|
||||
if ( $conn instanceof DBConnRef ) {
|
||||
// Avoid calling close() but still leaving the handle in the pool
|
||||
throw new RuntimeException( __METHOD__ . ': got DBConnRef instance.' );
|
||||
throw new RuntimeException( 'Cannot close DBConnRef instance; it must be shareable' );
|
||||
}
|
||||
|
||||
$serverIndex = $conn->getLBInfo( 'serverIndex' );
|
||||
|
|
@ -1372,7 +1381,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
if ( $limit > 0 && $time > $limit ) {
|
||||
throw new DBTransactionSizeError(
|
||||
$conn,
|
||||
"Transaction spent $time second(s) in writes, exceeding the limit of $limit.",
|
||||
"Transaction spent $time second(s) in writes, exceeding the limit of $limit",
|
||||
[ $time, $limit ]
|
||||
);
|
||||
}
|
||||
|
|
@ -1381,7 +1390,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
if ( $conn->writesOrCallbacksPending() && !$conn->ping() ) {
|
||||
throw new DBTransactionError(
|
||||
$conn,
|
||||
"A connection to the {$conn->getDBname()} database was lost before commit."
|
||||
"A connection to the {$conn->getDBname()} database was lost before commit"
|
||||
);
|
||||
}
|
||||
} );
|
||||
|
|
@ -1392,7 +1401,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
if ( $this->trxRoundId !== false ) {
|
||||
throw new DBTransactionError(
|
||||
null,
|
||||
"$fname: Transaction round '{$this->trxRoundId}' already started."
|
||||
"$fname: Transaction round '{$this->trxRoundId}' already started"
|
||||
);
|
||||
}
|
||||
$this->assertTransactionRoundStage( self::ROUND_CURSORY );
|
||||
|
|
@ -1683,8 +1692,12 @@ class LoadBalancer implements ILoadBalancer {
|
|||
}
|
||||
|
||||
public function getLaggedReplicaMode( $domain = false ) {
|
||||
// No-op if there is only one DB (also avoids recursion)
|
||||
if ( !$this->laggedReplicaMode && $this->getServerCount() > 1 ) {
|
||||
if (
|
||||
// Avoid recursion if there is only one DB
|
||||
$this->getServerCount() > 1 &&
|
||||
// Stay in lagged replica mode during the load balancer instance lifetime
|
||||
!$this->laggedReplicaMode
|
||||
) {
|
||||
try {
|
||||
// See if laggedReplicaMode gets set
|
||||
$conn = $this->getConnection( self::DB_REPLICA, false, $domain );
|
||||
|
|
@ -1959,7 +1972,7 @@ class LoadBalancer implements ILoadBalancer {
|
|||
if ( $domainsInUse ) {
|
||||
$domains = implode( ', ', $domainsInUse );
|
||||
throw new DBUnexpectedError( null,
|
||||
"Foreign domain connections are still in use ($domains)." );
|
||||
"Foreign domain connections are still in use ($domains)" );
|
||||
}
|
||||
|
||||
$this->setLocalDomain( new DatabaseDomain(
|
||||
|
|
|
|||
|
|
@ -336,8 +336,7 @@ class LoadBalancerTest extends MediaWikiTestCase {
|
|||
$this->fail( "No exception thrown." );
|
||||
} catch ( DBUnexpectedError $e ) {
|
||||
$this->assertEquals(
|
||||
'Wikimedia\Rdbms\LoadBalancer::openConnection: ' .
|
||||
'CONN_TRX_AUTOCOMMIT handle has a transaction.',
|
||||
'Handle requested with CONN_TRX_AUTOCOMMIT yet it has a transaction',
|
||||
$e->getMessage()
|
||||
);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue