rdbms: Add IDatabase::cancelAtomic()

Atomic sections are currently useful if you want to wrap some SQL
statements in a transaction when you might be called from inside someone
else's transaction, and you expect the caller to roll back everything if
you fail.

But there are some cases where you want to allow the caller to recover
from errors, in which case you need to roll back just the atomic
section. Savepoints are supported by all our databases and can be used
for this purpose, so let's do so.

Bug: T188660
Change-Id: Iee548619df89fd7fbd581b01106b8b41d3df71cc
This commit is contained in:
Brad Jorsch 2018-03-17 17:59:56 -04:00 committed by Aaron Schulz
parent 6c1cd929cb
commit 52aeaa7a5f
6 changed files with 271 additions and 30 deletions

View file

@ -66,6 +66,11 @@ production.
the SQL query. The ActorMigration class may also be used to get feature-flagged the SQL query. The ActorMigration class may also be used to get feature-flagged
information needed to access actor-related fields during the migration information needed to access actor-related fields during the migration
period. period.
* Added Wikimedia\Rdbms\IDatabase::cancelAtomic(), to roll back an atomic
section without having to roll back the whole transaction.
* Wikimedia\Rdbms\IDatabase::doAtomicSection(), non-native ::insertSelect(),
and non-MySQL ::replace() and ::upsert() no longer roll back the whole
transaction on failure.
=== External library changes in 1.31 === === External library changes in 1.31 ===

View file

@ -519,6 +519,10 @@ class DBConnRef implements IDatabase {
return $this->__call( __FUNCTION__, func_get_args() ); return $this->__call( __FUNCTION__, func_get_args() );
} }
public function cancelAtomic( $fname = __METHOD__ ) {
return $this->__call( __FUNCTION__, func_get_args() );
}
public function doAtomicSection( $fname, callable $callback ) { public function doAtomicSection( $fname, callable $callback ) {
return $this->__call( __FUNCTION__, func_get_args() ); return $this->__call( __FUNCTION__, func_get_args() );
} }

View file

@ -187,6 +187,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* @see Database::trxLevel * @see Database::trxLevel
*/ */
private $trxAutomatic = false; private $trxAutomatic = false;
/**
* Counter for atomic savepoint identifiers. Reset when a new transaction begins.
*
* @var int
*/
private $trxAtomicCounter = 0;
/** /**
* Array of levels of atomicity within transactions * Array of levels of atomicity within transactions
* *
@ -1241,6 +1247,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
*/ */
private function handleSessionLoss() { private function handleSessionLoss() {
$this->trxLevel = 0; $this->trxLevel = 0;
$this->trxAtomicCounter = 0;
$this->trxIdleCallbacks = []; // T67263; transaction already lost $this->trxIdleCallbacks = []; // T67263; transaction already lost
$this->trxPreCommitCallbacks = []; // T67263; transaction already lost $this->trxPreCommitCallbacks = []; // T67263; transaction already lost
$this->sessionTempTables = []; $this->sessionTempTables = [];
@ -2547,7 +2554,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->endAtomic( $fname ); $this->endAtomic( $fname );
$this->affectedRowCount = $affectedRowCount; $this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) { } catch ( Exception $e ) {
$this->rollback( $fname, self::FLUSHING_INTERNAL ); $this->cancelAtomic( $fname );
throw $e; throw $e;
} }
} }
@ -2630,7 +2637,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->endAtomic( $fname ); $this->endAtomic( $fname );
$this->affectedRowCount = $affectedRowCount; $this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) { } catch ( Exception $e ) {
$this->rollback( $fname, self::FLUSHING_INTERNAL ); $this->cancelAtomic( $fname );
throw $e; throw $e;
} }
@ -2798,11 +2805,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->endAtomic( $fname ); $this->endAtomic( $fname );
$this->affectedRowCount = $affectedRowCount; $this->affectedRowCount = $affectedRowCount;
} else { } else {
$this->rollback( $fname, self::FLUSHING_INTERNAL ); $this->cancelAtomic( $fname );
} }
return $ok; return $ok;
} catch ( Exception $e ) { } catch ( Exception $e ) {
$this->rollback( $fname, self::FLUSHING_INTERNAL ); $this->cancelAtomic( $fname );
throw $e; throw $e;
} }
} }
@ -3093,7 +3100,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
call_user_func( $callback ); call_user_func( $callback );
$this->endAtomic( __METHOD__ ); $this->endAtomic( __METHOD__ );
} catch ( Exception $e ) { } catch ( Exception $e ) {
$this->rollback( __METHOD__, self::FLUSHING_INTERNAL ); $this->cancelAtomic( __METHOD__ );
throw $e; throw $e;
} }
} }
@ -3230,6 +3237,48 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
} }
} }
/**
* Create a savepoint
*
* This is used internally to implement atomic sections. It should not be
* used otherwise.
*
* @since 1.31
* @param string $identifier Identifier for the savepoint
* @param string $fname Calling function name
*/
protected function doSavepoint( $identifier, $fname ) {
$this->query( 'SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
}
/**
* Release a savepoint
*
* This is used internally to implement atomic sections. It should not be
* used otherwise.
*
* @since 1.31
* @param string $identifier Identifier for the savepoint
* @param string $fname Calling function name
*/
protected function doReleaseSavepoint( $identifier, $fname ) {
$this->query( 'RELEASE SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
}
/**
* Rollback to a savepoint
*
* This is used internally to implement atomic sections. It should not be
* used otherwise.
*
* @since 1.31
* @param string $identifier Identifier for the savepoint
* @param string $fname Calling function name
*/
protected function doRollbackToSavepoint( $identifier, $fname ) {
$this->query( 'ROLLBACK TO SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
}
final public function startAtomic( $fname = __METHOD__ ) { final public function startAtomic( $fname = __METHOD__ ) {
if ( !$this->trxLevel ) { if ( !$this->trxLevel ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL ); $this->begin( $fname, self::TRANSACTION_INTERNAL );
@ -3238,32 +3287,68 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
if ( !$this->getFlag( self::DBO_TRX ) ) { if ( !$this->getFlag( self::DBO_TRX ) ) {
$this->trxAutomaticAtomic = true; $this->trxAutomaticAtomic = true;
} }
$savepointId = null;
} else {
$savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
if ( strlen( $savepointId ) > 30 ) { // 30 == Oracle's identifier length limit (pre 12c)
$this->queryLogger->warning(
'There have been an excessively large number of atomic sections in a transaction'
. " started by $this->trxFname, reusing IDs (at $fname)",
[ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
);
$this->trxAtomicCounter = 0;
$savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
}
$this->doSavepoint( $savepointId, $fname );
} }
$this->trxAtomicLevels[] = $fname; $this->trxAtomicLevels[] = [ $fname, $savepointId ];
} }
final public function endAtomic( $fname = __METHOD__ ) { final public function endAtomic( $fname = __METHOD__ ) {
if ( !$this->trxLevel ) { if ( !$this->trxLevel ) {
throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." ); throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
} }
if ( !$this->trxAtomicLevels ||
array_pop( $this->trxAtomicLevels ) !== $fname list( $savedFname, $savepointId ) = $this->trxAtomicLevels
) { ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
if ( $savedFname !== $fname ) {
throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." ); throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
} }
if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) { if ( !$savepointId ) {
$this->commit( $fname, self::FLUSHING_INTERNAL ); $this->commit( $fname, self::FLUSHING_INTERNAL );
} else {
$this->doReleaseSavepoint( $savepointId, $fname );
} }
} }
final public function cancelAtomic( $fname = __METHOD__ ) {
if ( !$this->trxLevel ) {
throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
}
list( $savedFname, $savepointId ) = $this->trxAtomicLevels
? array_pop( $this->trxAtomicLevels ) : [ null, null ];
if ( $savedFname !== $fname ) {
throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
}
if ( !$savepointId ) {
$this->rollback( $fname, self::FLUSHING_INTERNAL );
} else {
$this->doRollbackToSavepoint( $savepointId, $fname );
}
$this->affectedRowCount = 0; // for the sake of consistency
}
final public function doAtomicSection( $fname, callable $callback ) { final public function doAtomicSection( $fname, callable $callback ) {
$this->startAtomic( $fname ); $this->startAtomic( $fname );
try { try {
$res = call_user_func_array( $callback, [ $this, $fname ] ); $res = call_user_func_array( $callback, [ $this, $fname ] );
} catch ( Exception $e ) { } catch ( Exception $e ) {
$this->rollback( $fname, self::FLUSHING_INTERNAL ); $this->cancelAtomic( $fname );
throw $e; throw $e;
} }
$this->endAtomic( $fname ); $this->endAtomic( $fname );
@ -3275,7 +3360,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
// Protect against mismatched atomic section, transaction nesting, and snapshot loss // Protect against mismatched atomic section, transaction nesting, and snapshot loss
if ( $this->trxLevel ) { if ( $this->trxLevel ) {
if ( $this->trxAtomicLevels ) { if ( $this->trxAtomicLevels ) {
$levels = implode( ', ', $this->trxAtomicLevels ); $levels = array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
return $accum === null ? $v[0] : "$accum, " . $v[0];
} );
$msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open."; $msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
throw new DBUnexpectedError( $this, $msg ); throw new DBUnexpectedError( $this, $msg );
} elseif ( !$this->trxAutomatic ) { } elseif ( !$this->trxAutomatic ) {
@ -3294,6 +3381,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->assertOpen(); $this->assertOpen();
$this->doBegin( $fname ); $this->doBegin( $fname );
$this->trxAtomicCounter = 0;
$this->trxTimestamp = microtime( true ); $this->trxTimestamp = microtime( true );
$this->trxFname = $fname; $this->trxFname = $fname;
$this->trxDoneWrites = false; $this->trxDoneWrites = false;
@ -3331,7 +3419,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
final public function commit( $fname = __METHOD__, $flush = '' ) { final public function commit( $fname = __METHOD__, $flush = '' ) {
if ( $this->trxLevel && $this->trxAtomicLevels ) { if ( $this->trxLevel && $this->trxAtomicLevels ) {
// There are still atomic sections open. This cannot be ignored // There are still atomic sections open. This cannot be ignored
$levels = implode( ', ', $this->trxAtomicLevels ); $levels = array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
return $accum === null ? $v[0] : "$accum, " . $v[0];
} );
throw new DBUnexpectedError( throw new DBUnexpectedError(
$this, $this,
"$fname: Got COMMIT while atomic sections $levels are still open." "$fname: Got COMMIT while atomic sections $levels are still open."

View file

@ -1053,6 +1053,19 @@ class DatabaseMssql extends Database {
return false; return false;
} }
protected function doSavepoint( $identifier, $fname ) {
$this->query( 'SAVE TRANSACTION ' . $this->addIdentifierQuotes( $identifier ), $fname );
}
protected function doReleaseSavepoint( $identifier, $fname ) {
// Not supported. Also not really needed, a new doSavepoint() for the
// same identifier will overwrite the old.
}
protected function doRollbackToSavepoint( $identifier, $fname ) {
$this->query( 'ROLLBACK TRANSACTION ' . $this->addIdentifierQuotes( $identifier ), $fname );
}
/** /**
* Begin a transaction, committing any previously open transaction * Begin a transaction, committing any previously open transaction
* @param string $fname * @param string $fname

View file

@ -1580,19 +1580,19 @@ interface IDatabase {
/** /**
* Begin an atomic section of statements * Begin an atomic section of statements
* *
* If a transaction has been started already, just keep track of the given * If a transaction has been started already, sets a savepoint and tracks
* section name to make sure the transaction is not committed pre-maturely. * the given section name to make sure the transaction is not committed
* This function can be used in layers (with sub-sections), so use a stack * pre-maturely. This function can be used in layers (with sub-sections),
* to keep track of the different atomic sections. If there is no transaction, * so use a stack to keep track of the different atomic sections. If there
* start one implicitly. * is no transaction, one is started implicitly.
* *
* The goal of this function is to create an atomic section of SQL queries * The goal of this function is to create an atomic section of SQL queries
* without having to start a new transaction if it already exists. * without having to start a new transaction if it already exists.
* *
* All atomic levels *must* be explicitly closed using IDatabase::endAtomic(), * All atomic levels *must* be explicitly closed using IDatabase::endAtomic()
* and any database transactions cannot be began or committed until all atomic * or IDatabase::cancelAtomic(), and any database transactions cannot be
* levels are closed. There is no such thing as implicitly opening or closing * began or committed until all atomic levels are closed. There is no such
* an atomic section. * thing as implicitly opening or closing an atomic section.
* *
* @since 1.23 * @since 1.23
* @param string $fname * @param string $fname
@ -1613,6 +1613,26 @@ interface IDatabase {
*/ */
public function endAtomic( $fname = __METHOD__ ); public function endAtomic( $fname = __METHOD__ );
/**
* Cancel an atomic section of SQL statements
*
* This will roll back only the statements executed since the start of the
* most recent atomic section, and close that section. If a transaction was
* open before the corresponding startAtomic() call, any statements before
* that call are *not* rolled back and the transaction remains open. If the
* corresponding startAtomic() implicitly started a transaction, that
* transaction is rolled back.
*
* Note that a call to IDatabase::rollback() will also roll back any open
* atomic sections.
*
* @since 1.31
* @see IDatabase::startAtomic
* @param string $fname
* @throws DBError
*/
public function cancelAtomic( $fname = __METHOD__ );
/** /**
* Run a callback to do an atomic set of updates for this database * Run a callback to do an atomic set of updates for this database
* *
@ -1620,17 +1640,18 @@ interface IDatabase {
* - This database object * - This database object
* - The value of $fname * - The value of $fname
* *
* If any exception occurs in the callback, then rollback() will be called and the error will * If any exception occurs in the callback, then cancelAtomic() will be
* be re-thrown. It may also be that the rollback itself fails with an exception before then. * called to back out any statements executed by the callback and the error
* In any case, such errors are expected to terminate the request, without any outside caller * will be re-thrown. It may also be that the cancel itself fails with an
* attempting to catch errors and commit anyway. Note that any rollback undoes all prior * exception before then. In any case, such errors are expected to
* atomic section and uncommitted updates, which trashes the current request, requiring an * terminate the request, without any outside caller attempting to catch
* error to be displayed. * errors and commit anyway.
* *
* This can be an alternative to explicit startAtomic()/endAtomic() calls. * This can be an alternative to explicit startAtomic()/endAtomic()/cancelAtomic() calls.
* *
* @see Database::startAtomic * @see Database::startAtomic
* @see Database::endAtomic * @see Database::endAtomic
* @see Database::cancelAtomic
* *
* @param string $fname Caller name (usually __METHOD__) * @param string $fname Caller name (usually __METHOD__)
* @param callable $callback Callback that issues DB updates * @param callable $callback Callback that issues DB updates
@ -1638,7 +1659,9 @@ interface IDatabase {
* @throws DBError * @throws DBError
* @throws RuntimeException * @throws RuntimeException
* @throws UnexpectedValueException * @throws UnexpectedValueException
* @since 1.27 * @since 1.27; prior to 1.31 this did a rollback() instead of
* cancelAtomic(), and assumed no callers up the stack would ever try to
* catch the exception.
*/ */
public function doAtomicSection( $fname, callable $callback ); public function doAtomicSection( $fname, callable $callback );

View file

@ -1352,4 +1352,110 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
$this->assertSame( 'CAST( fieldName AS INTEGER )', $output ); $this->assertSame( 'CAST( fieldName AS INTEGER )', $output );
} }
/**
* @covers \Wikimedia\Rdbms\Database::doSavepoint
* @covers \Wikimedia\Rdbms\Database::doReleaseSavepoint
* @covers \Wikimedia\Rdbms\Database::doRollbackToSavepoint
* @covers \Wikimedia\Rdbms\Database::startAtomic
* @covers \Wikimedia\Rdbms\Database::endAtomic
* @covers \Wikimedia\Rdbms\Database::cancelAtomic
* @covers \Wikimedia\Rdbms\Database::doAtomicSection
*/
public function testAtomicSections() {
$this->database->startAtomic( __METHOD__ );
$this->database->endAtomic( __METHOD__ );
$this->assertLastSql( 'BEGIN; COMMIT' );
$this->database->startAtomic( __METHOD__ );
$this->database->cancelAtomic( __METHOD__ );
$this->assertLastSql( 'BEGIN; ROLLBACK' );
$this->database->begin( __METHOD__ );
$this->database->startAtomic( __METHOD__ );
$this->database->endAtomic( __METHOD__ );
$this->database->commit( __METHOD__ );
// phpcs:ignore Generic.Files.LineLength
$this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; RELEASE SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
$this->database->begin( __METHOD__ );
$this->database->startAtomic( __METHOD__ );
$this->database->cancelAtomic( __METHOD__ );
$this->database->commit( __METHOD__ );
// phpcs:ignore Generic.Files.LineLength
$this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
$this->database->startAtomic( __METHOD__ );
$this->database->startAtomic( __METHOD__ );
$this->database->cancelAtomic( __METHOD__ );
$this->database->endAtomic( __METHOD__ );
// phpcs:ignore Generic.Files.LineLength
$this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
$this->database->doAtomicSection( __METHOD__, function () {
} );
$this->assertLastSql( 'BEGIN; COMMIT' );
$this->database->begin( __METHOD__ );
$this->database->doAtomicSection( __METHOD__, function () {
} );
$this->database->rollback( __METHOD__ );
// phpcs:ignore Generic.Files.LineLength
$this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; RELEASE SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK' );
$this->database->begin( __METHOD__ );
try {
$this->database->doAtomicSection( __METHOD__, function () {
throw new RuntimeException( 'Test exception' );
} );
$this->fail( 'Expected exception not thrown' );
} catch ( RuntimeException $ex ) {
$this->assertSame( 'Test exception', $ex->getMessage() );
}
$this->database->commit( __METHOD__ );
// phpcs:ignore Generic.Files.LineLength
$this->assertLastSql( 'BEGIN; SAVEPOINT wikimedia_rdbms_atomic1; ROLLBACK TO SAVEPOINT wikimedia_rdbms_atomic1; COMMIT' );
}
public static function provideAtomicSectionMethodsForErrors() {
return [
[ 'endAtomic' ],
[ 'cancelAtomic' ],
];
}
/**
* @dataProvider provideAtomicSectionMethodsForErrors
* @covers \Wikimedia\Rdbms\Database::endAtomic
* @covers \Wikimedia\Rdbms\Database::cancelAtomic
*/
public function testNoAtomicSection( $method ) {
try {
$this->database->$method( __METHOD__ );
$this->fail( 'Expected exception not thrown' );
} catch ( DBUnexpectedError $ex ) {
$this->assertSame(
'No atomic transaction is open (got ' . __METHOD__ . ').',
$ex->getMessage()
);
}
}
/**
* @dataProvider provideAtomicSectionMethodsForErrors
* @covers \Wikimedia\Rdbms\Database::endAtomic
* @covers \Wikimedia\Rdbms\Database::cancelAtomic
*/
public function testInvalidAtomicSectionEnded( $method ) {
$this->database->startAtomic( __METHOD__ . 'X' );
try {
$this->database->$method( __METHOD__ );
$this->fail( 'Expected exception not thrown' );
} catch ( DBUnexpectedError $ex ) {
$this->assertSame(
'Invalid atomic section ended (got ' . __METHOD__ . ').',
$ex->getMessage()
);
}
}
} }