rdbms: make automatic connection recovery more robust

Rename canRecoverFromDisconnect() in order to better describe
its function. Make it use the transaction ID and query walltime
as arguments and return an ERR_* class constant instead of a bool.
Avoid retries of slow queries that yield lost connection errors.

Track session state errors caused by the loss of named locks or
temp tables (e.g. during connection loss). Such errors will prevent
further queries except for rollback() and flushSession(), which must
be issued to resolve the error.

Add flushPrimarySessions() methods to LBFactory/LoadBalancer
and use it in places where session state loss is meant to be
safely aknowledged.

Change-Id: I60532f86e629c83b357d4832d1963eca17752944
This commit is contained in:
Aaron Schulz 2022-03-17 17:08:39 -07:00 committed by Tim Starling
parent f9e634ece8
commit db36853718
19 changed files with 848 additions and 131 deletions

View file

@ -1801,6 +1801,7 @@ $wgAutoloadLocalClasses = [
'Wikimedia\\Rdbms\\Blob' => __DIR__ . '/includes/libs/rdbms/encasing/Blob.php',
'Wikimedia\\Rdbms\\ChronologyProtector' => __DIR__ . '/includes/libs/rdbms/ChronologyProtector.php',
'Wikimedia\\Rdbms\\ConnectionManager' => __DIR__ . '/includes/libs/rdbms/connectionmanager/ConnectionManager.php',
'Wikimedia\\Rdbms\\CriticalSessionInfo' => __DIR__ . '/includes/libs/rdbms/database/utils/CriticalSessionInfo.php',
'Wikimedia\\Rdbms\\DBAccessError' => __DIR__ . '/includes/libs/rdbms/exception/DBAccessError.php',
'Wikimedia\\Rdbms\\DBConnRef' => __DIR__ . '/includes/libs/rdbms/database/DBConnRef.php',
'Wikimedia\\Rdbms\\DBConnectionError' => __DIR__ . '/includes/libs/rdbms/exception/DBConnectionError.php',
@ -1813,6 +1814,7 @@ $wgAutoloadLocalClasses = [
'Wikimedia\\Rdbms\\DBReadOnlyError' => __DIR__ . '/includes/libs/rdbms/exception/DBReadOnlyError.php',
'Wikimedia\\Rdbms\\DBReadOnlyRoleError' => __DIR__ . '/includes/libs/rdbms/exception/DBReadOnlyRoleError.php',
'Wikimedia\\Rdbms\\DBReplicationWaitError' => __DIR__ . '/includes/libs/rdbms/exception/DBReplicationWaitError.php',
'Wikimedia\\Rdbms\\DBSessionStateError' => __DIR__ . '/includes/libs/rdbms/exception/DBSessionStateError.php',
'Wikimedia\\Rdbms\\DBTransactionError' => __DIR__ . '/includes/libs/rdbms/exception/DBTransactionError.php',
'Wikimedia\\Rdbms\\DBTransactionSizeError' => __DIR__ . '/includes/libs/rdbms/exception/DBTransactionSizeError.php',
'Wikimedia\\Rdbms\\DBTransactionStateError' => __DIR__ . '/includes/libs/rdbms/exception/DBTransactionStateError.php',

View file

@ -133,7 +133,9 @@ class MWExceptionHandler {
// to rollback some databases due to connection issues or exceptions.
// However, any sensible DB driver will rollback implicitly anyway.
try {
$services->getDBLoadBalancerFactory()->rollbackPrimaryChanges( __METHOD__ );
$lbFactory = $services->getDBLoadBalancerFactory();
$lbFactory->rollbackPrimaryChanges( __METHOD__ );
$lbFactory->flushPrimarySessions( __METHOD__ );
} catch ( DBError $e2 ) {
// If the DB is unreachable, rollback() will throw an error
// and the error report() method might need messages from the DB,

View file

@ -617,6 +617,10 @@ class DBConnRef implements IDatabase {
return $this->__call( __FUNCTION__, func_get_args() );
}
public function flushSession( $fname = __METHOD__, $owner = null ) {
return $this->__call( __FUNCTION__, func_get_args() );
}
public function flushSnapshot( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
return $this->__call( __FUNCTION__, func_get_args() );
}

View file

@ -148,6 +148,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
/** @var DBUnexpectedError|null Last unresolved critical section error */
private $csmError;
/** var int An identifier for this class instance */
private $id;
/** @var int|null Integer ID of the managing LBFactory instance or null if none */
private $ownerId;
@ -163,6 +165,20 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
/** @var int New Database instance will already be connected when returned */
public const NEW_CONNECTED = 1;
/** No errors occurred during the query */
protected const ERR_NONE = 0;
/** Retry query due to a connection loss detected while sending the query (session intact) */
protected const ERR_RETRY_QUERY = 1;
/** Abort query (no retries) due to a statement rollback (session/transaction intact) */
protected const ERR_ABORT_QUERY = 2;
/** Abort any current transaction, by rolling it back, due to an error during the query */
protected const ERR_ABORT_TRX = 4;
/** Abort and reset session due to server-side session-level state loss (locks, temp tables) */
protected const ERR_ABORT_SESSION = 8;
/** Assume that queries taking this long to yield connection loss errors are at fault */
protected const DROPPED_CONN_BLAME_THRESHOLD_SEC = 3.0;
/** @var string Idiom used when a cancelable atomic section started the transaction */
private const NOT_APPLICABLE = 'n/a';
@ -264,6 +280,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$params['tablePrefix']
);
static $nextId;
$this->id = $nextId = ( is_int( $nextId ) ? $nextId++ : mt_rand() );
$this->ownerId = $params['ownerId'] ?? null;
}
@ -554,6 +572,22 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
return $this->transactionManager->trxStatus();
}
/**
* Get important session state that cannot be recovered upon connection loss
*
* @return CriticalSessionInfo
*/
private function getCriticalSessionInfo(): CriticalSessionInfo {
return new CriticalSessionInfo(
$this->transactionManager->getTrxId(),
$this->transactionManager->explicitTrxActive(),
$this->transactionManager->pendingWriteCallers(),
$this->transactionManager->pendingPreCommitCallbackCallers(),
$this->sessionNamedLocks,
$this->sessionTempTables
);
}
public function tablePrefix( $prefix = null ) {
$old = $this->currentDomain->getTablePrefix();
@ -834,7 +868,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
[
'db_server' => $this->getServerName(),
'db_name' => $this->getDBname(),
'db_user' => $this->connectionParams[self::CONN_USER],
'db_user' => $this->connectionParams[self::CONN_USER] ?? null,
],
$extras
);
@ -1167,15 +1201,20 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
public function query( $sql, $fname = __METHOD__, $flags = self::QUERY_NORMAL ) {
$flags = (int)$flags; // b/c; this field used to be a bool
// Double check that the SQL query is appropriate in the current context and is
// allowed for an outside caller (e.g. does not break transaction/session tracking).
// allowed for an outside caller (e.g. does not break session/transaction tracking).
$this->assertQueryIsCurrentlyAllowed( $sql, $fname );
// Send the query to the server and fetch any corresponding errors
list( $ret, $err, $errno, $unignorable ) = $this->executeQuery( $sql, $fname, $flags );
list( $ret, $err, $errno, $errFlags ) = $this->executeQuery( $sql, $fname, $flags );
if ( $ret === false ) {
$ignoreErrors = $this->fieldHasBit( $flags, self::QUERY_SILENCE_ERRORS );
// Throw an error unless both the ignore flag was set and a rollback is not needed
$this->reportQueryError( $err, $errno, $sql, $fname, $ignoreErrors && !$unignorable );
// An error occurred; log and report it as needed. Errors that corrupt the state of
// the transaction/session cannot be silenced from the client.
$ignore = (
$this->fieldHasBit( $flags, self::QUERY_SILENCE_ERRORS ) &&
!$this->fieldHasBit( $errFlags, self::ERR_ABORT_SESSION ) &&
!$this->fieldHasBit( $errFlags, self::ERR_ABORT_TRX )
);
$this->reportQueryError( $err, $errno, $sql, $fname, $ignore );
}
return $ret;
@ -1191,21 +1230,19 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
*
* This is meant for internal use with Database subclasses.
*
* @param string $sql Original SQL query
* @param string $sql Original SQL statement query
* @param string $fname Name of the calling function
* @param int $flags Bit field of class QUERY_* constants
* @return array An n-tuple of:
* - mixed|bool: An object, resource, or true on success; false on failure
* - string: The result of calling lastError()
* - int: The result of calling lastErrno()
* - bool: Whether a rollback is needed to allow future non-rollback queries
* - int: Bit field of ERR_* class constants
* @throws DBUnexpectedError
*/
final protected function executeQuery( $sql, $fname, $flags ) {
$this->assertHasConnectionHandle();
$priorTransaction = $this->trxLevel();
if ( $this->isWriteQuery( $sql, $flags ) ) {
// Do not treat temporary table writes as "meaningful writes" since they are only
// visible to one session and are not permanent. Profile them as reads. Integration
@ -1216,15 +1253,16 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
foreach ( $tempTableChanges as list( $tmpType ) ) {
$isPermWrite = $isPermWrite || ( $tmpType !== self::TEMP_NORMAL );
}
// Permit temporary table writes on replica DB connections
// but require a writable primary DB connection for any persistent writes.
if ( $isPermWrite ) {
$this->assertIsWritablePrimary();
// DBConnRef uses QUERY_REPLICA_ROLE to enforce the replica role for raw SQL queries
// DBConnRef uses QUERY_REPLICA_ROLE to enforce replica roles for raw SQL queries
if ( $this->fieldHasBit( $flags, self::QUERY_REPLICA_ROLE ) ) {
throw new DBReadOnlyRoleError( $this, "Cannot write; target role is DB_REPLICA" );
throw new DBReadOnlyRoleError(
$this,
"Cannot write; target role is DB_REPLICA"
);
}
}
} else {
@ -1239,8 +1277,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
// Whether a silent retry attempt is left for recoverable connection loss errors
$retryLeft = !$this->fieldHasBit( $flags, self::QUERY_NO_RETRY );
$corruptedTrx = false;
$cs = $this->commenceCriticalSection( __METHOD__ );
do {
@ -1250,46 +1286,43 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$retryLeft = false;
}
// Send the query to the server, fetching any results and corresponding errors
list( $ret, $err, $errno, $recoverableSR, $recoverableCL, $reconnected ) =
$this->executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags );
// Send the query to the server, fetching any results and corresponding errors.
// Silently retry the query if the error was just a recoverable connection loss.
list( $ret, $err, $errno, $errflags ) = $this->executeQueryAttempt(
$sql,
$commentedSql,
$isPermWrite,
$fname,
$flags
);
// Silently retry the query if the error was just a recoverable connection loss
if ( !$retryLeft ) {
break;
}
$retryLeft = false;
} while ( $ret === false && $recoverableCL && $reconnected );
} while ( $this->fieldHasBit( $errflags, self::ERR_RETRY_QUERY ) );
// Register creation and dropping of temporary tables
$this->registerTempWrites( $ret, $tempTableChanges );
if ( $ret === false && $priorTransaction ) {
if ( $recoverableSR ) {
# We're ignoring an error that caused just the current query to be aborted.
# But log the cause so we can log a deprecation notice if a caller actually
# does ignore it.
$this->transactionManager->setTrxStatusIgnoredCause( [ $err, $errno, $fname ] );
} elseif ( !$recoverableCL ) {
# Either the query was aborted or all queries after BEGIN where aborted.
# In the first case, the only options going forward are (a) ROLLBACK, or
# (b) ROLLBACK TO SAVEPOINT (if one was set). If the later case, the only
# option is ROLLBACK, since the snapshots would have been released.
$corruptedTrx = true; // cannot recover
$trxError = $this->getQueryException( $err, $errno, $sql, $fname );
$this->transactionManager->setTransactionError( $trxError );
}
}
$this->completeCriticalSection( __METHOD__, $cs );
return [ $ret, $err, $errno, $corruptedTrx ];
return [ $ret, $err, $errno, $errflags ];
}
/**
* Wrapper for doQuery() that handles DBO_TRX, profiling, logging, affected row count
* tracking, and reconnects (without retry) on query failure due to connection loss
* Wrapper for doQuery() that handles a single SQL statement query attempt
*
* @param string $sql Original SQL query
* @param string $commentedSql SQL query with debugging/trace comment
* This method handles profiling, debug logging, reconnection and the tracking of:
* - write callers
* - last write time
* - affected row count of the last write
* - whether writes occured in a transaction
*
* This method does *not* handle DBO_TRX transaction logic *nor* query retries.
*
* @param string $sql Original SQL statement query
* @param string $commentedSql SQL statement query with debugging/trace comment
* @param bool $isPermWrite Whether the query is a (non-temporary table) write
* @param string $fname Name of the calling function
* @param int $flags Bit field of class QUERY_* constants
@ -1297,13 +1330,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* - mixed|bool: An object, resource, or true on success; false on failure
* - string: The result of calling lastError()
* - int: The result of calling lastErrno()
* - bool: Whether a statement rollback error occurred
* - bool: Whether a disconnect *both* happened *and* was recoverable
* - bool: Whether a reconnection attempt was *both* made *and* succeeded
* - int: Bit field of ERR_* class constants
* @throws DBUnexpectedError
*/
private function executeQueryAttempt( $sql, $commentedSql, $isPermWrite, $fname, $flags ) {
$priorWritesPending = $this->writesOrCallbacksPending();
// Transaction attributes before issuing this query
$priorSessInfo = $this->getCriticalSessionInfo();
// Keep track of whether the transaction has write queries pending
if ( $isPermWrite ) {
@ -1314,29 +1346,25 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
);
}
$prefix = $this->topologyRole === IDatabase::ROLE_STREAMING_MASTER ? 'query-m: ' : 'query: ';
$prefix = ( $this->topologyRole === self::ROLE_STREAMING_MASTER ) ? 'query-m: ' : 'query: ';
$generalizedSql = new GeneralizedSql( $commentedSql, $prefix );
$startTime = microtime( true );
$ps = $this->profiler
? ( $this->profiler )( $generalizedSql->stringify() )
: null;
$ps = $this->profiler ? ( $this->profiler )( $generalizedSql->stringify() ) : null;
$this->affectedRowCount = null;
$this->lastQuery = $sql;
$ret = $this->doQuery( $commentedSql );
$lastError = $this->lastError();
$lastErrno = $this->lastErrno();
$this->affectedRowCount = $this->affectedRows();
unset( $ps ); // profile out (if set)
$queryRuntime = max( microtime( true ) - $startTime, 0.0 );
$recoverableSR = false; // recoverable statement rollback?
$recoverableCL = false; // recoverable connection loss?
$reconnected = false; // reconnection both attempted and succeeded?
$errflags = self::ERR_NONE;
if ( $ret !== false ) {
$this->lastPing = $startTime;
// Keep track of whether the transaction has write queries pending
if ( $isPermWrite && $this->trxLevel() ) {
$this->transactionManager->updateTrxWriteQueryReport(
$this->getQueryVerb( $sql ),
@ -1346,13 +1374,51 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
);
}
} elseif ( $this->isConnectionError( $lastErrno ) ) {
# Check if no meaningful session state was lost
$recoverableCL = $this->canRecoverFromDisconnect( $sql, $priorWritesPending );
# Update session state tracking and try to restore the connection
// Connection lost before or during the query...
// Determine how to proceed given the lost session state
$connLossFlag = $this->assessConnectionLoss( $sql, $queryRuntime, $priorSessInfo );
// Update session state tracking and try to reestablish a connection
$reconnected = $this->replaceLostConnection( $lastErrno, __METHOD__ );
// Check if important server-side session-level state was lost
if ( $connLossFlag >= self::ERR_ABORT_SESSION ) {
$sessionError = $this->getQueryException( $lastError, $lastErrno, $sql, $fname );
$this->transactionManager->setSessionError( $sessionError );
}
// Check if important server-side transaction-level state was lost
if ( $connLossFlag >= self::ERR_ABORT_TRX ) {
$trxError = $this->getQueryException( $lastError, $lastErrno, $sql, $fname );
$this->transactionManager->setTransactionError( $trxError );
}
// Check if the query should be retried (having made the reconnection attempt)
if ( $connLossFlag === self::ERR_RETRY_QUERY ) {
$errflags |= ( $reconnected ? self::ERR_RETRY_QUERY : self::ERR_ABORT_QUERY );
} else {
$errflags |= $connLossFlag;
}
} elseif ( $this->isKnownStatementRollbackError( $lastErrno ) ) {
// Query error triggered a server-side statement-only rollback...
$errflags |= self::ERR_ABORT_QUERY;
if ( $this->trxLevel() ) {
// Allow legacy callers to ignore such errors via QUERY_IGNORE_DBO_TRX and
// try/catch. However, a deprecation notice will be logged on the next query.
$cause = [ $lastError, $lastErrno, $fname ];
$this->transactionManager->setTrxStatusIgnoredCause( $cause );
}
} else {
# Check if only the last query was rolled back
$recoverableSR = $this->isKnownStatementRollbackError( $lastErrno );
// Some other error occurred during the query...
if ( $this->trxLevel() ) {
// Server-side handling of errors during transactions varies widely depending on
// the RDBMS type and configuration. There are several possible results: (a) the
// whole transaction is rolled back, (b) only the queries after BEGIN are rolled
// back, (c) the transaction is marked as "aborted" and a ROLLBACK is required
// before other queries are permitted. For compatibility reasons, pessimistically
// require a ROLLBACK query (not using SAVEPOINT) before allowing other queries.
$trxError = $this->getQueryException( $lastError, $lastErrno, $sql, $fname );
$this->transactionManager->setTransactionError( $trxError );
$errflags |= self::ERR_ABORT_TRX;
} else {
$errflags |= self::ERR_ABORT_QUERY;
}
}
if ( $sql === self::PING_QUERY ) {
@ -1391,7 +1457,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
static::class . '::doQuery() should return an IResultWrapper' );
}
return [ $ret, $lastError, $lastErrno, $recoverableSR, $recoverableCL, $reconnected ];
return [ $ret, $lastError, $lastErrno, $errflags ];
}
/**
@ -1463,6 +1529,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
);
}
$this->transactionManager->assertSessionStatus( $this, $fname );
if ( $verb !== 'ROLLBACK TO SAVEPOINT' ) {
$this->transactionManager->assertTransactionStatus(
$this,
@ -1473,39 +1541,113 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
/**
* Determine whether it is safe to retry queries after a database connection is lost
* Assure that if this instance is owned, the caller is either the owner or is internal
*
* @param string $sql SQL query
* @param bool $priorWritesPending Whether there is a transaction open with
* possible write queries or transaction pre-commit/idle callbacks
* waiting on it to finish.
* @return bool True if it is safe to retry the query, false otherwise
* If a LoadBalancer owns the Database, then certain methods should only called through
* that LoadBalancer to avoid broken contracts. Otherwise, those methods can publicly be
* called by anything. In any case, internal methods from the Database itself should
* always be allowed.
*
* @param string $fname
* @param int|null $owner Owner ID of the caller
* @throws DBTransactionError
*/
private function canRecoverFromDisconnect( $sql, $priorWritesPending ) {
private function assertOwnership( $fname, $owner ) {
if ( $this->ownerId !== null && $owner !== $this->ownerId && $owner !== $this->id ) {
throw new DBTransactionError(
null,
"$fname: Database is owned by ID '{$this->ownerId}' (got '$owner')."
);
}
}
/**
* Determine how to handle a connection lost discovered during a query attempt
*
* This checks if explicit transactions, pending transaction writes, and important
* session-level state (locks, temp tables) was lost. Point-in-time read snapshot loss
* is considered acceptable for DBO_TRX logic.
*
* If state was lost, but that loss was discovered during a ROLLBACK that would have
* destroyed that state anyway, treat the error as recoverable.
*
* @param string $sql SQL query statement that encountered or caused the connection loss
* @param float $walltime How many seconds passes while attempting the query
* @param CriticalSessionInfo $priorSessInfo Session state just before the query
* @return int Recovery approach. One of the following ERR_* class constants:
* - Database::ERR_RETRY_QUERY: reconnect silently, retry query
* - Database::ERR_ABORT_QUERY: reconnect silently, do not retry query
* - Database::ERR_ABORT_TRX: reconnect, throw error, enforce transaction rollback
* - Database::ERR_ABORT_SESSION: reconnect, throw error, enforce session rollback
*/
private function assessConnectionLoss(
string $sql,
float $walltime,
CriticalSessionInfo $priorSessInfo
) {
$verb = $this->getQueryVerb( $sql );
if ( $walltime < self::DROPPED_CONN_BLAME_THRESHOLD_SEC ) {
// Query failed quickly; the connection was probably lost before the query was sent
$res = self::ERR_RETRY_QUERY;
} else {
// Query took a long time; the connection was probably lost during query execution
$res = self::ERR_ABORT_QUERY;
}
// List of problems causing session/transaction state corruption
$blockers = [];
if ( $this->sessionNamedLocks ) {
// Named locks were automatically released, breaking the expectations
// of callers relying on those locks for critical section enforcement
$blockers[] = 'named locks';
// Loss of named locks breaks future callers relying on those locks for critical sections
foreach ( $priorSessInfo->namedLocks as $lockName => $lockInfo ) {
if ( $lockInfo['trxId'] && $lockInfo['trxId'] === $priorSessInfo->trxId ) {
// Treat lost locks acquired during the lost transaction as a transaction state
// problem. Connection loss on ROLLBACK (non-SAVEPOINT) is tolerable since
// rollback automatically triggered server-side.
if ( $verb !== 'ROLLBACK' ) {
$res = max( $res, self::ERR_ABORT_TRX );
$blockers[] = "named lock '$lockName'";
}
} else {
// Treat lost locks acquired either during prior transactions or during no
// transaction as a session state problem.
$res = max( $res, self::ERR_ABORT_SESSION );
$blockers[] = "named lock '$lockName'";
}
}
if ( $this->sessionTempTables ) {
// Temp tables were automatically dropped, breaking the expectations
// of callers relying on those tables having been created/populated
$blockers[] = 'temp tables';
// Loss of temp tables breaks future callers relying on those tables for queries
foreach ( $priorSessInfo->tempTables as $tableName => $tableInfo ) {
if ( $tableInfo['trxId'] && $tableInfo['trxId'] === $priorSessInfo->trxId ) {
// Treat lost temp tables created during the lost transaction as a transaction
// state problem. Connection loss on ROLLBACK (non-SAVEPOINT) is tolerable since
// rollback automatically triggered server-side.
if ( $verb !== 'ROLLBACK' ) {
$res = max( $res, self::ERR_ABORT_TRX );
$blockers[] = "temp table '$tableName'";
}
} else {
// Treat lost temp tables created either during prior transactions or during
// no transaction as a session state problem.
$res = max( $res, self::ERR_ABORT_SESSION );
$blockers[] = "temp table '$tableName'";
}
}
if ( $priorWritesPending && $sql !== 'ROLLBACK' ) {
// Transaction was automatically rolled back, breaking the expectations
// of callers and DBO_TRX semantics relying on that transaction to provide
// atomic writes (point-in-time snapshot loss is acceptable for DBO_TRX)
$blockers[] = 'transaction writes';
// Loss of transaction writes breaks future callers and DBO_TRX logic relying on those
// writes to be atomic and still pending. Connection loss on ROLLBACK (non-SAVEPOINT) is
// tolerable since rollback automatically triggered server-side.
if ( $priorSessInfo->trxWriteCallers && $verb !== 'ROLLBACK' ) {
$res = max( $res, self::ERR_ABORT_TRX );
$blockers[] = 'uncommitted writes';
}
if ( $this->transactionManager->explicitTrxActive() && $sql !== 'ROLLBACK' && $sql !== 'COMMIT' ) {
// Transaction was automatically rolled back, breaking the expectations of
// callers relying on that transaction to provide atomic writes, serializability,
// or read results consistent with a single point-in-time snapshot. Disconnection
// on ROLLBACK is not an issue, since the intended result of rolling back the
// transaction was in fact achieved. Disconnection on COMMIT of an empty transaction
// is also not an issue, for similar reasons (T127428).
if ( $priorSessInfo->trxPreCommitCbCallers && $verb !== 'ROLLBACK' ) {
$res = max( $res, self::ERR_ABORT_TRX );
$blockers[] = 'pre-commit callbacks';
}
if ( $priorSessInfo->trxExplicit && $verb !== 'ROLLBACK' && $sql !== 'COMMIT' ) {
// Transaction automatically rolled back, breaking the expectations of callers
// relying on that transaction to provide atomic writes, serializability, or use
// one point-in-time snapshot for all reads. Assume that connection loss is OK
// with ROLLBACK (non-SAVEPOINT). Likewise for COMMIT (T127428).
$res = max( $res, self::ERR_ABORT_TRX );
$blockers[] = 'explicit transaction';
}
@ -1518,11 +1660,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
'db_log_category' => 'connection'
] )
);
return false;
}
return true;
return $res;
}
/**
@ -1559,7 +1699,8 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
// Handle callbacks in trxEndCallbacks, e.g. onTransactionResolution().
// If callback suppression is set then the array will remain unhandled.
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
// Handle callbacks in trxRecurringCallbacks, e.g. setTransactionListener()
// Handle callbacks in trxRecurringCallbacks, e.g. setTransactionListener().
// If callback suppression is set then the array will remain unhandled.
$this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
}
@ -3949,6 +4090,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* @stable to override
* @param int|string $errno
* @return bool Whether the given query error was a connection drop
* @since 1.38
*/
protected function isConnectionError( $errno ) {
return false;
@ -4434,7 +4576,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
);
}
} else {
$this->transactionManager->setTransactionErrorFromStatus( $this, $fname );
// Put the transaction into an error state if it's not already in one
$trxError = new DBUnexpectedError(
$this,
"Uncancelable atomic section canceled (got $fname)"
);
$this->transactionManager->setTransactionError( $trxError );
}
} finally {
// Fix up callbacks owned by the sections that were just cancelled.
@ -4579,7 +4726,15 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
if ( !$this->trxLevel() ) {
$this->transactionManager->setTrxStatusToNone();
$this->transactionManager->clearPreEndCallbacks();
if ( $this->transactionManager->trxLevel() <= TransactionManager::STATUS_TRX_ERROR ) {
$this->connLogger->info(
"$fname: acknowledged server-side transaction loss on {db_server}",
$this->getLogContext()
);
}
return;
}
@ -4620,6 +4775,47 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->transactionManager = $transactionManager;
}
public function flushSession( $fname = __METHOD__, $owner = null ) {
$this->assertOwnership( $fname, $owner );
if ( $this->trxLevel() ) {
// Any existing transaction should have been rolled back already
throw new DBUnexpectedError(
$this,
"$fname: transaction still in progress (not yet rolled back)"
);
}
// If the session state was already lost due to either an unacknowledged session
// state loss error (e.g. dropped connection) or an explicit connection close call,
// then there is nothing to do here. Note that such cases, even temporary tables and
// server-side config variables are lost (the invocation of this method is assumed to
// imply that such losses are tolerable).
if ( $this->transactionManager->sessionStatus() <= TransactionManager::STATUS_SESS_ERROR ) {
$this->connLogger->info(
"$fname: acknowledged server-side session loss on {db_server}",
$this->getLogContext()
);
} elseif ( $this->isOpen() ) {
// Connection handle exists; server-side session state must be flushed
$this->doFlushSession( $fname );
$this->sessionNamedLocks = [];
}
$this->transactionManager->clearSessionError();
}
/**
* Reset the server-side session state for named locks and table locks
*
* Connection and query errors will be suppressed and logged
*
* @param string $fname
* @since 1.38
*/
protected function doFlushSession( $fname ) {
// no-op
}
public function flushSnapshot( $fname = __METHOD__, $flush = self::FLUSHING_ONE ) {
$this->transactionManager->onFlushSnapshot( $this, $fname, $flush, $this->getTransactionRoundId() );
$this->commit( $fname, self::FLUSHING_INTERNAL );
@ -4767,14 +4963,15 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
/**
* Get the replica DB lag when the current transaction started
*
* This is useful when transactions might use snapshot isolation
* (e.g. REPEATABLE-READ in innodb), so the "real" lag of that data
* is this lag plus transaction duration. If they don't, it is still
* safe to be pessimistic. This returns null if there is no transaction.
* This is useful given that transactions might use point-in-time read snapshots,
* in which case the lag estimate should be recorded just before the transaction
* establishes the read snapshot (either BEGIN or the first SELECT/write query).
*
* This returns null if the lag status for this transaction was not yet recorded.
* If snapshots are not used, it is still safe to be pessimistic.
*
* @return array|null ('lag': seconds or false on error, 'since': UNIX timestamp of BEGIN)
* This returns null if there is no transaction or the lag status was not yet recorded.
*
* @return array|null ('lag': seconds or false, 'since': UNIX timestamp of BEGIN) or null
* @since 1.27
*/
final protected function getRecordedTransactionLagStatus() {
@ -5458,17 +5655,13 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* try {
* //...send a query that changes the session/transaction state...
* } catch ( DBError $e ) {
* // Rely on assertQueryIsCurrentlyAllowed()/canRecoverFromDisconnect() to ensure
* // the rollback of incomplete transactions and the prohibition of reconnections
* // that mask a loss of session state (e.g. named locks and temp tables)
* $this->completeCriticalSection( __METHOD__, $cs );
* throw $expectedException;
* }
* try {
* //...send another query that changes the session/transaction state...
* } catch ( DBError $trxError ) {
* // Inform assertQueryIsCurrentlyAllowed() that the transaction must be rolled
* // back (e.g. even if the error was a pre-query check or normally recoverable)
* // Require ROLLBACK before allowing any other queries from outside callers
* $this->completeCriticalSection( __METHOD__, $cs, $trxError );
* throw $expectedException;
* }

View file

@ -263,12 +263,6 @@ abstract class DatabaseMysqlBase extends Database {
*/
abstract protected function mysqlError( $conn = null );
protected function isQueryTimeoutError( $errno ) {
// https://dev.mysql.com/doc/refman/8.0/en/client-error-reference.html
// https://phabricator.wikimedia.org/T170638
return in_array( $errno, [ 1028, 1969, 2062, 3024 ] );
}
protected function isInsertSelectSafe( array $insertOptions, array $selectOptions ) {
$row = $this->getReplicationSafetyInfo();
// For row-based-replication, the resulting changes will be relayed, not the query
@ -1122,6 +1116,31 @@ abstract class DatabaseMysqlBase extends Database {
return true;
}
protected function doFlushSession( $fname ) {
$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_ROWS | self::QUERY_NO_RETRY;
// In MySQL, ROLLBACK does not automatically release table locks;
// https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html
$sql = "UNLOCK TABLES";
list( $res, $err, $errno ) = $this->executeQuery( $sql, $fname, $flags );
if ( $res === false ) {
$this->reportQueryError( $err, $errno, $sql, $fname, true );
}
$releaseLockFields = [];
foreach ( $this->sessionNamedLocks as $name => $info ) {
$encName = $this->addQuotes( $this->makeLockName( $name ) );
$releaseLockFields[] = "RELEASE_LOCK($encName)";
}
if ( $releaseLockFields ) {
$sql = 'SELECT ' . implode( ',', $releaseLockFields ) . ')';
list( $res, $err, $errno ) = $this->executeQuery( $sql, __METHOD__, $flags );
if ( $res === false ) {
$this->reportQueryError( $err, $errno, $sql, $fname, true );
}
}
}
/**
* @param bool $value
*/
@ -1235,6 +1254,13 @@ abstract class DatabaseMysqlBase extends Database {
return in_array( $errno, [ 2013, 2006, 2003, 1927, 1053 ], true );
}
protected function isQueryTimeoutError( $errno ) {
// https://mariadb.com/kb/en/mariadb-error-codes/
// https://dev.mysql.com/doc/refman/8.0/en/client-error-reference.html
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
return in_array( $errno, [ 3024, 2062, 1969, 1028 ], true );
}
protected function isKnownStatementRollbackError( $errno ) {
// https://mariadb.com/kb/en/mariadb-error-codes/
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html

View file

@ -662,6 +662,11 @@ __INDEXATTR__;
return in_array( $errno, $codes, true );
}
protected function isQueryTimeoutError( $errno ) {
// https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
return ( $errno === '57014' );
}
protected function isKnownStatementRollbackError( $errno ) {
return false; // transaction has to be rolled-back from error state
}
@ -1330,6 +1335,20 @@ SQL;
return ( $row->released === 't' );
}
protected function doFlushSession( $fname ) {
$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_ROWS | self::QUERY_NO_RETRY;
// In Postgres, ROLLBACK already releases table locks;
// https://www.postgresql.org/docs/9.4/sql-lock.html
// https://www.postgresql.org/docs/9.1/functions-admin.html
$sql = "pg_advisory_unlock_all()";
list( $res, $err, $errno ) = $this->executeQuery( $sql, __METHOD__, $flags );
if ( $res === false ) {
$this->reportQueryError( $err, $errno, $sql, $fname, true );
}
}
public function serverIsReadOnly() {
$res = $this->query(
"SHOW default_transaction_read_only",

View file

@ -47,7 +47,7 @@ class DatabaseSqlite extends Database {
/** @var PDO|null */
protected $conn;
/** @var LockManager (hopefully on the same server as the DB) */
/** @var LockManager|null (hopefully on the same server as the DB) */
protected $lockMgr;
/** @var string|null */
@ -250,7 +250,6 @@ class DatabaseSqlite extends Database {
protected function closeConnection() {
$this->conn = null;
// Release all locks, via FSLockManager::__destruct, as the base class expects
// @phan-suppress-next-line PhanTypeMismatchPropertyProbablyReal Expected null
$this->lockMgr = null;
return true;
@ -1086,6 +1085,14 @@ class DatabaseSqlite extends Database {
protected function doHandleSessionLossPreconnect() {
$this->sessionAttachedDbs = [];
// Release all locks, via FSLockManager::__destruct, as the base class expects;
$this->lockMgr = null;
// Create a new lock manager instance
$this->lockMgr = $this->makeLockManager();
}
protected function doFlushSession( $fname ) {
// Release all locks, via FSLockManager::__destruct, as the base class expects
$this->lockMgr = null;
// Create a new lock manager instance
$this->lockMgr = $this->makeLockManager();
}

View file

@ -1977,14 +1977,30 @@ interface IDatabase {
*
* @param string $fname Calling function name
* @param string $flush Flush flag, set to a situationally valid IDatabase::FLUSHING_*
* constant to disable warnings about calling rollback when no transaction is in
* progress. This will silently break any ongoing explicit transaction. Only set the
* flush flag if you are sure that it is safe to ignore these warnings in your context.
* constant to disable warnings about explicitly rolling back implicit transactions.
* This will silently break any ongoing explicit transaction. Only set the flush flag
* if you are sure that it is safe to ignore these warnings in your context.
* @throws DBError If an error occurs, {@see query}
* @since 1.23 Added $flush parameter
*/
public function rollback( $fname = __METHOD__, $flush = self::FLUSHING_ONE );
/**
* Release important session-level state (named lock, table locks) as post-rollback cleanup
*
* This should only be called by a load balancer or if the handle is not attached to one.
* Also, there must be no chance that a future caller will still be expecting some of the
* lost session state.
*
* Connection and query errors will be suppressed and logged
*
* @param string $fname Calling function name
* @param int|null $owner ID of the calling instance (e.g. the LBFactory ID)
* @throws DBError If an error occurs, {@see query}
* @since 1.38
*/
public function flushSession( $fname = __METHOD__, $owner = null );
/**
* Commit any transaction but error out if writes or callbacks are pending
*

View file

@ -225,12 +225,11 @@ interface IMaintainableDatabase extends IDatabase {
);
/**
* Checks if table locks acquired by lockTables() are transaction-bound in their scope
* Check if lockTables() locks are transaction-level locks instead of session-level
*
* Transaction-bound table locks will be released when the current transaction terminates.
* Table locks that are not bound to a transaction are not effected by BEGIN/COMMIT/ROLLBACK
* and will last until either lockTables()/unlockTables() is called or the TCP connection to
* the database is closed.
* Transaction-level table locks can only be acquired within a transaction and get released
* when that transaction terminates. Session-level table locks are acquired outside of any
* transaction and are incompatible with transactions.
*
* @return bool
* @since 1.29

View file

@ -30,7 +30,7 @@ use UnexpectedValueException;
/**
* @ingroup Database
* @internal
* @internal This class should not be used outside of Database
*/
class TransactionManager {
/** @var int Transaction is in a error state requiring a full or savepoint rollback */
@ -40,6 +40,11 @@ class TransactionManager {
/** @var int No transaction is active */
public const STATUS_TRX_NONE = 3;
/** Session is in a error state requiring a reset */
public const STATUS_SESS_ERROR = 1;
/** Session is in a normal state */
public const STATUS_SESS_OK = 2;
/** @var float Guess of how many seconds it takes to replicate a small insert */
private const TINY_WRITE_SEC = 0.010;
/** @var float Consider a write slow if it took more than this many seconds */
@ -56,11 +61,14 @@ class TransactionManager {
private $trxTimestamp = null;
/** @var int Transaction status */
private $trxStatus = self::STATUS_TRX_NONE;
/** @var Throwable|null The last error that caused the status to become STATUS_TRX_ERROR */
/** @var Throwable|null The cause of any unresolved transaction state error, or, null */
private $trxStatusCause;
/** @var array|null Error details of the last statement-only rollback */
/** @var array|null Details of the last statement-rollback error for the last transaction */
private $trxStatusIgnoredCause;
/** @var Throwable|null The cause of any unresolved session state loss error, or, null */
private $sessionError;
/** @var string[] Write query callers of the current transaction */
private $trxWriteCallers = [];
/** @var float Seconds spent in write queries for the current transaction */
@ -126,6 +134,7 @@ class TransactionManager {
public function newTrxId( $mode, $fname ) {
$this->trxId = new TransactionIdentifier();
$this->trxStatus = self::STATUS_TRX_OK;
$this->trxStatusCause = null;
$this->trxStatusIgnoredCause = null;
$this->trxWriteDuration = 0.0;
$this->trxWriteQueryCount = 0;
@ -182,11 +191,13 @@ class TransactionManager {
public function setTrxStatusToOk() {
$this->trxStatus = self::STATUS_TRX_OK;
$this->trxStatusCause = null;
$this->trxStatusIgnoredCause = null;
}
public function setTrxStatusToNone() {
$this->trxStatus = self::STATUS_TRX_NONE;
$this->trxStatusCause = null;
$this->trxStatusIgnoredCause = null;
}
@ -208,14 +219,14 @@ class TransactionManager {
}
}
public function setTransactionErrorFromStatus( $db, $fname ) {
if ( $this->trxStatus > self::STATUS_TRX_ERROR ) {
// Put the transaction into an error state if it's not already in one
$trxError = new DBUnexpectedError(
public function assertSessionStatus( IDatabase $db, $fname ) {
if ( $this->sessionError ) {
throw new DBSessionStateError(
$db,
"Uncancelable atomic section canceled (got $fname)"
"Cannot execute query from $fname while session status is ERROR",
[],
$this->sessionError
);
$this->setTransactionError( $trxError );
}
}
@ -238,6 +249,32 @@ class TransactionManager {
$this->trxStatusIgnoredCause = $trxStatusIgnoredCause;
}
/**
* Get the status of the current session (ephemeral server-side state tied to the connection)
*
* @return int One of the STATUS_SESSION_* class constants
*/
public function sessionStatus() {
// Check if an unresolved error still exists
return ( $this->sessionError ) ? self::STATUS_SESS_ERROR : self::STATUS_SESS_OK;
}
/**
* Flag the session as needing a reset due to an error, if not already flagged
*
* @param Throwable $sessionError
*/
public function setSessionError( Throwable $sessionError ) {
$this->sessionError = $this->sessionError ?? $sessionError;
}
/**
* Unflag the session as needing a reset due to an error
*/
public function clearSessionError() {
$this->sessionError = null;
}
/**
* @param float $rtt
* @return float Time to apply writes to replicas based on trxWrite* fields
@ -831,6 +868,20 @@ class TransactionManager {
return $fnames;
}
/**
* List the methods that have precommit callbacks for the current transaction
*
* @return string[]
*/
public function pendingPreCommitCallbackCallers(): array {
$fnames = $this->pendingWriteCallers();
foreach ( $this->trxPreCommitOrIdleCallbacks as $callback ) {
$fnames[] = $callback[1];
}
return $fnames;
}
public function isEndCallbacksSuppressed(): bool {
return $this->trxEndCallbacksSuppressed;
}

View file

@ -0,0 +1,64 @@
<?php
/**
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace Wikimedia\Rdbms;
/**
* @ingroup Database
* @internal This class should not be used outside of Database
*/
class CriticalSessionInfo {
/** @var TransactionIdentifier|null */
public $trxId;
/** @var bool */
public $trxExplicit;
/** @var string[] */
public $trxWriteCallers;
/** @var string[] */
public $trxPreCommitCbCallers;
/** @var array<string,array> */
public $namedLocks;
/** @var array<string,array> */
public $tempTables;
/**
* @param TransactionIdentifier|null $trxId
* @param bool $trxExplicit
* @param array $trxWriteCallers
* @param array $trxPreCommitCbCallers
* @param array $namedLocks
* @param array $tempTables
*/
public function __construct(
?TransactionIdentifier $trxId,
bool $trxExplicit,
array $trxWriteCallers,
array $trxPreCommitCbCallers,
array $namedLocks,
array $tempTables
) {
$this->trxId = $trxId;
$this->trxExplicit = $trxExplicit;
$this->trxWriteCallers = $trxWriteCallers;
$this->trxPreCommitCbCallers = $trxPreCommitCbCallers;
$this->namedLocks = $namedLocks;
$this->tempTables = $tempTables;
}
}

View file

@ -0,0 +1,29 @@
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
* @ingroup Database
*/
namespace Wikimedia\Rdbms;
/**
* @newable
* @ingroup Database
*/
class DBSessionStateError extends DBTransactionError {
}

View file

@ -282,6 +282,19 @@ interface ILBFactory {
*/
public function rollbackPrimaryChanges( $fname = __METHOD__ );
/**
* Release important session-level state (named lock, table locks) as post-rollback cleanup
*
* This only applies to the instantiated tracked load balancer instances.
*
* This should only be called by application entry point functions, since there must be
* no chance that a future caller will still be expecting some of the lost session state.
*
* @param string $fname Caller name
* @since 1.38
*/
public function flushPrimarySessions( $fname = __METHOD__ );
/**
* Check if an explicit transaction round is active
*

View file

@ -121,6 +121,7 @@ abstract class LBFactory implements ILBFactory {
private const ROUND_ROLLING_BACK = 'within-rollback';
private const ROUND_COMMIT_CALLBACKS = 'within-commit-callbacks';
private const ROUND_ROLLBACK_CALLBACKS = 'within-rollback-callbacks';
private const ROUND_ROLLBACK_SESSIONS = 'within-rollback-session';
private static $loggerFields =
[ 'replLogger', 'connLogger', 'queryLogger', 'perfLogger' ];
@ -336,6 +337,16 @@ abstract class LBFactory implements ILBFactory {
$this->trxRoundStage = self::ROUND_CURSORY;
}
final public function flushPrimarySessions( $fname = __METHOD__ ) {
/** @noinspection PhpUnusedLocalVariableInspection */
$scope = ScopedCallback::newScopedIgnoreUserAbort();
// Release named locks and table locks on all primary DB connections
$this->trxRoundStage = self::ROUND_ROLLBACK_SESSIONS;
$this->forEachLBCallMethod( 'flushPrimarySessions', [ $fname, $this->id ] );
$this->trxRoundStage = self::ROUND_CURSORY;
}
/**
* @return Exception|null
*/

View file

@ -636,6 +636,7 @@ interface ILoadBalancer {
/**
* Issue ROLLBACK only on primary, only if queries were done on connection
*
* @param string $fname Caller name
* @param int|null $owner ID of the calling instance (e.g. the LBFactory ID)
* @throws DBExpectedError
@ -643,6 +644,18 @@ interface ILoadBalancer {
*/
public function rollbackPrimaryChanges( $fname = __METHOD__, $owner = null );
/**
* Release/destroy session-level named locks, table locks, and temp tables
*
* Only call this function right after calling rollbackPrimaryChanges()
*
* @param string $fname Caller name
* @param int|null $owner ID of the calling instance (e.g. the LBFactory ID)
* @throws DBExpectedError
* @since 1.38
*/
public function flushPrimarySessions( $fname = __METHOD__, $owner = null );
/**
* Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshots
*

View file

@ -1910,9 +1910,11 @@ class LoadBalancer implements ILoadBalancer {
$restore = ( $this->trxRoundId !== false );
$this->trxRoundId = false;
$this->trxRoundStage = self::ROUND_ERROR; // "failed" until proven otherwise
$this->forEachOpenPrimaryConnection( static function ( IDatabase $conn ) use ( $fname ) {
$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
} );
$this->forEachOpenPrimaryConnection(
static function ( IDatabase $conn ) use ( $fname ) {
$conn->rollback( $fname, $conn::FLUSHING_ALL_PEERS );
}
);
if ( $restore ) {
// Unmark handles as participating in this explicit transaction round
$this->forEachOpenPrimaryConnection( function ( Database $conn ) {
@ -1922,6 +1924,21 @@ class LoadBalancer implements ILoadBalancer {
$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
}
public function flushPrimarySessions( $fname = __METHOD__, $owner = null ) {
$this->assertOwnership( $fname, $owner );
$this->assertTransactionRoundStage( [ self::ROUND_CURSORY ] );
if ( $this->hasPrimaryChanges() ) {
// Any transaction should have been rolled back beforehand
throw new DBTransactionError( null, "Cannot reset session while writes are pending" );
}
$this->forEachOpenPrimaryConnection(
function ( IDatabase $conn ) use ( $fname ) {
$conn->flushSession( $fname, $this->id );
}
);
}
/**
* @param string|string[] $stage
* @throws DBTransactionError

View file

@ -498,6 +498,7 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
* @covers \Wikimedia\Rdbms\LoadBalancer::finalizePrimaryChanges()
* @covers \Wikimedia\Rdbms\LoadBalancer::approvePrimaryChanges()
* @covers \Wikimedia\Rdbms\LoadBalancer::commitPrimaryChanges()
* @covers \Wikimedia\Rdbms\LoadBalancer::flushPrimarySessions()
* @covers \Wikimedia\Rdbms\LoadBalancer::runPrimaryTransactionIdleCallbacks()
* @covers \Wikimedia\Rdbms\LoadBalancer::runPrimaryTransactionListenerCallbacks()
*/
@ -584,6 +585,11 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
$this->assertEquals( array_fill_keys( [ 'a', 'b', 'c', 'd' ], 1 ), $ac );
$this->assertEquals( 2, $tlCalls );
$conn1->lock( 'test_lock_' . mt_rand(), __METHOD__, 0 );
$lb->flushPrimarySessions( __METHOD__ );
$this->assertSame( TransactionManager::STATUS_TRX_NONE, $conn1->trxStatus() );
$this->assertSame( TransactionManager::STATUS_TRX_NONE, $conn2->trxStatus() );
$conn1->close();
$conn2->close();
}

View file

@ -0,0 +1,240 @@
<?php
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBQueryDisconnectedError;
use Wikimedia\Rdbms\DBQueryTimeoutError;
use Wikimedia\Rdbms\DBSessionStateError;
use Wikimedia\Rdbms\DBTransactionStateError;
use Wikimedia\Rdbms\DBUnexpectedError;
use Wikimedia\Rdbms\TransactionManager;
/**
* @group mysql
* @group Database
* @group medium
*/
class DatabaseMysqlTest extends \MediaWikiIntegrationTestCase {
/** @var DatabaseMysqlBase */
protected $conn;
protected function setUp(): void {
parent::setUp();
if ( !extension_loaded( 'mysqli' ) ) {
$this->markTestSkipped( 'No MySQL support detected' );
}
$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
if ( $lb->getServerType( 0 ) !== 'mysql' ) {
$this->markTestSkipped( 'No MySQL $wgLBFactoryConf config detected' );
}
$this->conn = $this->newConnection();
}
protected function tearDown(): void {
$this->conn->close( __METHOD__ );
parent::tearDown();
}
/**
* @covers Database::query()
*/
public function testQueryTimeout() {
try {
$this->conn->query(
'SET STATEMENT max_statement_time=0.001 FOR SELECT sleep(1) FROM dual',
__METHOD__
);
$this->fail( "No DBQueryTimeoutError caught" );
} catch ( DBQueryTimeoutError $e ) {
$this->assertInstanceOf( DBQueryTimeoutError::class, $e );
}
$row = $this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->assertSame( 'x', $row->v, "Still connected/usable" );
}
/**
* @covers Database::query()
*/
public function testConnectionKill() {
try {
$this->conn->query( 'KILL (SELECT connection_id())', __METHOD__ );
$this->fail( "No DBQueryDisconnectedError caught" );
} catch ( DBQueryDisconnectedError $e ) {
$this->assertInstanceOf( DBQueryDisconnectedError::class, $e );
}
$row = $this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->assertSame( 'x', $row->v, "Recovered" );
}
/**
* @covers Database::query()
* @covers Database::rollback()
* @covers Database::flushSession()
*/
public function testConnectionLoss() {
$row = $this->conn->query( 'SELECT connection_id() AS id', __METHOD__ )->fetchObject();
$encId = intval( $row->id );
$adminConn = $this->newConnection();
$adminConn->query( "KILL $encId", __METHOD__ );
$row = $this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->assertSame( 'x', $row->v, "Recovered" );
$this->conn->startAtomic( __METHOD__ );
$this->assertSame( 1, $this->conn->trxLevel(), "Transaction exists" );
$row = $this->conn->query( 'SELECT connection_id() AS id', __METHOD__ )->fetchObject();
$encId = intval( $row->id );
$adminConn->query( "KILL $encId", __METHOD__ );
try {
$this->conn->query( 'SELECT 1', __METHOD__ );
$this->fail( "No DBQueryDisconnectedError caught" );
} catch ( DBQueryDisconnectedError $e ) {
$this->assertTrue( $this->conn->isOpen(), "Reconnected" );
try {
$this->conn->endAtomic( __METHOD__ );
$this->fail( "No DBUnexpectedError caught" );
} catch ( DBUnexpectedError $e ) {
$this->assertInstanceOf( DBUnexpectedError::class, $e );
}
$this->assertSame( TransactionManager::STATUS_TRX_ERROR, $this->conn->trxStatus() );
try {
$this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->fail( "No DBTransactionStateError caught" );
} catch ( DBTransactionStateError $e ) {
$this->assertInstanceOf( DBTransactionStateError::class, $e );
}
$this->assertSame( 0, $this->conn->trxLevel(), "Transaction lost" );
$this->conn->rollback( __METHOD__ );
$this->assertSame( 0, $this->conn->trxLevel(), "No transaction" );
$row = $this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->assertSame( 'x', $row->v, "Recovered" );
}
$this->conn->lock( 'session_lock_' . mt_rand(), __METHOD__, 0 );
$row = $this->conn->query( 'SELECT connection_id() AS id', __METHOD__ )->fetchObject();
$encId = intval( $row->id );
$adminConn->query( "KILL $encId", __METHOD__ );
try {
$this->conn->query( 'SELECT 1', __METHOD__ );
$this->fail( "No DBQueryDisconnectedError caught" );
} catch ( DBQueryDisconnectedError $e ) {
$this->assertInstanceOf( DBQueryDisconnectedError::class, $e );
}
$this->assertTrue( $this->conn->isOpen(), "Reconnected" );
try {
$this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->fail( "No DBSessionStateError caught" );
} catch ( DBSessionStateError $e ) {
$this->assertInstanceOf( DBSessionStateError::class, $e );
}
$this->assertTrue( $this->conn->isOpen(), "Connection remains" );
$this->conn->rollback( __METHOD__ );
$this->conn->flushSession( __METHOD__ );
$row = $this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->assertSame( 'x', $row->v, "Recovered" );
$adminConn->close( __METHOD__ );
}
/**
* @covers Database::query()
* @covers Database::cancelAtomic()
* @covers Database::rollback()
* @covers Database::flushSession()
*/
public function testTransactionError() {
$this->assertSame( TransactionManager::STATUS_TRX_NONE, $this->conn->trxStatus() );
$this->conn->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
$this->assertSame( TransactionManager::STATUS_TRX_OK, $this->conn->trxStatus() );
$row = $this->conn->query( 'SELECT connection_id() AS id', __METHOD__ )->fetchObject();
$encId = intval( $row->id );
try {
$this->conn->lock( 'trx_lock_' . mt_rand(), __METHOD__, 0 );
$this->assertSame( TransactionManager::STATUS_TRX_OK, $this->conn->trxStatus() );
$this->conn->query( "SELECT invalid query()", __METHOD__ );
$this->fail( "No DBQueryError caught" );
} catch ( DBQueryError $e ) {
$this->assertInstanceOf( DBQueryError::class, $e );
}
$this->assertSame( TransactionManager::STATUS_TRX_ERROR, $this->conn->trxStatus() );
try {
$this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->fail( "No DBTransactionStateError caught" );
} catch ( DBTransactionStateError $e ) {
$this->assertInstanceOf( DBTransactionStateError::class, $e );
$this->assertSame( TransactionManager::STATUS_TRX_ERROR, $this->conn->trxStatus() );
$this->assertSame( 1, $this->conn->trxLevel(), "Transaction remains" );
$this->assertTrue( $this->conn->isOpen(), "Connection remains" );
}
$adminConn = $this->newConnection();
$adminConn->query( "KILL $encId", __METHOD__ );
$this->assertSame( TransactionManager::STATUS_TRX_ERROR, $this->conn->trxStatus() );
try {
$this->conn->query( 'SELECT 1', __METHOD__ );
$this->fail( "No DBTransactionStateError caught" );
} catch ( DBTransactionStateError $e ) {
$this->assertInstanceOf( DBTransactionStateError::class, $e );
}
$this->assertSame(
1,
$this->conn->trxLevel(),
"Transaction loss not yet detected (due to STATUS_TRX_ERROR)"
);
$this->assertTrue(
$this->conn->isOpen(),
"Connection loss not detected (due to STATUS_TRX_ERROR)"
);
$this->conn->cancelAtomic( __METHOD__ );
$this->assertSame( 0, $this->conn->trxLevel(), "No transaction remains" );
$this->assertTrue( $this->conn->isOpen(), "Reconnected" );
$row = $this->conn->query( 'SELECT "x" AS v', __METHOD__ )->fetchObject();
$this->assertSame( 'x', $row->v, "Recovered" );
$this->conn->rollback( __METHOD__ );
$adminConn->close( __METHOD__ );
}
/**
* @return DatabaseMysqlBase
*/
private function newConnection() {
$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
/** @var DatabaseMysqlBase $conn */
$conn = Database::factory(
'mysql',
array_merge(
$lb->getServerInfo( 0 ),
[
'dbname' => null,
'schema' => null,
'tablePrefix' => '',
]
)
);
return $conn;
}
}

View file

@ -284,6 +284,9 @@ class DatabaseTest extends PHPUnit\Framework\TestCase {
} catch ( RuntimeException $e ) {
$this->assertTrue( $db->getFlag( DBO_TRX ) );
}
$lbFactory->rollbackPrimaryChanges( __METHOD__ );
$lbFactory->flushPrimarySessions( __METHOD__ );
}
/**
@ -366,6 +369,8 @@ class DatabaseTest extends PHPUnit\Framework\TestCase {
$lbFactory->commitPrimaryChanges( __METHOD__ );
$this->assertFalse( $called, 'Not called in next round commit' );
$lbFactory->flushPrimarySessions( __METHOD__ );
}
/**