Add DB ConnectionManagers
This moves and refactors the ConsistentReadConnectionManager from Wikibase into the core rdbms lib. The refactoring also creates a generic ConnectionManager. This relates to Iff20a22f9f2bc7ceefd6defc0ed9a494a6fe62c0 which introduced a DB factory / connection manager in an extension revealing the need for this in multiple places. Change-Id: I0c58e15aed5bed88323d18cb95e5008f8d3381c5
This commit is contained in:
parent
6fce2ba116
commit
c3c3cf9696
5 changed files with 603 additions and 0 deletions
|
|
@ -1569,6 +1569,8 @@ $wgAutoloadLocalClasses = [
|
|||
'WikiRevision' => __DIR__ . '/includes/import/WikiRevision.php',
|
||||
'WikiStatsOutput' => __DIR__ . '/maintenance/language/StatOutputs.php',
|
||||
'WikiTextStructure' => __DIR__ . '/includes/content/WikiTextStructure.php',
|
||||
'Wikimedia\\Rdbms\\ConnectionManager' => __DIR__ . '/includes/libs/rdbms/connectionmanager/ConnectionManager.php',
|
||||
'Wikimedia\\Rdbms\\SessionConsistentConnectionManager' => __DIR__ . '/includes/libs/rdbms/connectionmanager/SessionConsistentConnectionManager.php',
|
||||
'WikitextContent' => __DIR__ . '/includes/content/WikitextContent.php',
|
||||
'WikitextContentHandler' => __DIR__ . '/includes/content/WikitextContentHandler.php',
|
||||
'WinCacheBagOStuff' => __DIR__ . '/includes/libs/objectcache/WinCacheBagOStuff.php',
|
||||
|
|
|
|||
180
includes/libs/rdbms/connectionmanager/ConnectionManager.php
Normal file
180
includes/libs/rdbms/connectionmanager/ConnectionManager.php
Normal file
|
|
@ -0,0 +1,180 @@
|
|||
<?php
|
||||
|
||||
namespace Wikimedia\Rdbms;
|
||||
|
||||
use Database;
|
||||
use DBConnRef;
|
||||
use IDatabase;
|
||||
use InvalidArgumentException;
|
||||
use LoadBalancer;
|
||||
|
||||
/**
|
||||
* Database connection manager.
|
||||
*
|
||||
* This manages access to master and replica databases.
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @license GPL-2.0+
|
||||
* @author Addshore
|
||||
*/
|
||||
class ConnectionManager {
|
||||
|
||||
/**
|
||||
* @var LoadBalancer
|
||||
*/
|
||||
private $loadBalancer;
|
||||
|
||||
/**
|
||||
* The symbolic name of the target database, or false for the local wiki's database.
|
||||
*
|
||||
* @var string|false
|
||||
*/
|
||||
private $domain;
|
||||
|
||||
/**
|
||||
* @var string[]
|
||||
*/
|
||||
private $groups = [];
|
||||
|
||||
/**
|
||||
* @param LoadBalancer $loadBalancer
|
||||
* @param string|bool $domain Optional logical DB name, defaults to current wiki.
|
||||
* This follows the convention for database names used by $loadBalancer.
|
||||
* @param string[] $groups see LoadBalancer::getConnection
|
||||
*
|
||||
* @throws InvalidArgumentException
|
||||
*/
|
||||
public function __construct( LoadBalancer $loadBalancer, $domain = false, array $groups = [] ) {
|
||||
if ( !is_string( $domain ) && $domain !== false ) {
|
||||
throw new InvalidArgumentException( '$dbName must be a string, or false.' );
|
||||
}
|
||||
|
||||
$this->loadBalancer = $loadBalancer;
|
||||
$this->domain = $domain;
|
||||
$this->groups = $groups;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $i
|
||||
* @param string[]|null $groups
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
private function getConnection( $i, array $groups = null ) {
|
||||
$groups = $groups === null ? $this->groups : $groups;
|
||||
return $this->loadBalancer->getConnection( $i, $groups, $this->domain );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $i
|
||||
* @param string[]|null $groups
|
||||
*
|
||||
* @return DBConnRef
|
||||
*/
|
||||
private function getConnectionRef( $i, array $groups = null ) {
|
||||
$groups = $groups === null ? $this->groups : $groups;
|
||||
return $this->loadBalancer->getConnectionRef( $i, $groups, $this->domain );
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a connection to the master DB, for updating. The connection should later be released
|
||||
* by calling releaseConnection().
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
public function getWriteConnection() {
|
||||
return $this->getConnection( DB_MASTER );
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a database connection for reading. The connection should later be released by
|
||||
* calling releaseConnection().
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @param string[]|null $groups
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
public function getReadConnection( array $groups = null ) {
|
||||
$groups = $groups === null ? $this->groups : $groups;
|
||||
return $this->getConnection( DB_REPLICA, $groups );
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @param IDatabase $db
|
||||
*/
|
||||
public function releaseConnection( IDatabase $db ) {
|
||||
$this->loadBalancer->reuseConnection( $db );
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a connection ref to the master DB, for updating.
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @return DBConnRef
|
||||
*/
|
||||
public function getWriteConnectionRef() {
|
||||
return $this->getConnectionRef( DB_MASTER );
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a database connection ref for reading.
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @param string[]|null $groups
|
||||
*
|
||||
* @return DBConnRef
|
||||
*/
|
||||
public function getReadConnectionRef( array $groups = null ) {
|
||||
$groups = $groups === null ? $this->groups : $groups;
|
||||
return $this->getConnectionRef( DB_REPLICA, $groups );
|
||||
}
|
||||
|
||||
/**
|
||||
* Begins an atomic section and returns a database connection to the master DB, for updating.
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @param string $fname
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
public function beginAtomicSection( $fname ) {
|
||||
$db = $this->getWriteConnection();
|
||||
$db->startAtomic( $fname );
|
||||
|
||||
return $db;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @param IDatabase $db
|
||||
* @param string $fname
|
||||
*/
|
||||
public function commitAtomicSection( IDatabase $db, $fname ) {
|
||||
$db->endAtomic( $fname );
|
||||
$this->releaseConnection( $db );
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @param IDatabase $db
|
||||
* @param string $fname
|
||||
*/
|
||||
public function rollbackAtomicSection( IDatabase $db, $fname ) {
|
||||
// FIXME: there does not seem to be a clean way to roll back an atomic section?!
|
||||
$db->rollback( $fname, 'flush' );
|
||||
$this->releaseConnection( $db );
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
<?php
|
||||
|
||||
namespace Wikimedia\Rdbms;
|
||||
|
||||
use Database;
|
||||
use DBConnRef;
|
||||
|
||||
/**
|
||||
* Database connection manager.
|
||||
*
|
||||
* This manages access to master and slave databases. It also manages state that indicates whether
|
||||
* the slave databases are possibly outdated after a write operation, and thus the master database
|
||||
* should be used for subsequent read operations.
|
||||
*
|
||||
* @note: Services that access overlapping sets of database tables, or interact with logically
|
||||
* related sets of data in the database, should share a SessionConsistentConnectionManager.
|
||||
* Services accessing unrelated sets of information may prefer to not share a
|
||||
* SessionConsistentConnectionManager, so they can still perform read operations against slave
|
||||
* databases after a (unrelated, per the assumption) write operation to the master database.
|
||||
* Generally, sharing a SessionConsistentConnectionManager improves consistency (by avoiding race
|
||||
* conditions due to replication lag), but can reduce performance (by directing more read
|
||||
* operations to the master database server).
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @license GPL-2.0+
|
||||
* @author Daniel Kinzler
|
||||
* @author Addshore
|
||||
*/
|
||||
class SessionConsistentConnectionManager extends ConnectionManager {
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
private $forceWriteConnection = false;
|
||||
|
||||
/**
|
||||
* Forces all future calls to getReadConnection() to return a write connection.
|
||||
* Use this before performing read operations that are critical for a future update.
|
||||
* Calling beginAtomicSection() implies a call to prepareForUpdates().
|
||||
*
|
||||
* @since 1.29
|
||||
*/
|
||||
public function prepareForUpdates() {
|
||||
$this->forceWriteConnection = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @param string[]|null $groups
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
public function getReadConnection( array $groups = null ) {
|
||||
if ( $this->forceWriteConnection ) {
|
||||
return parent::getWriteConnection();
|
||||
}
|
||||
|
||||
return parent::getReadConnection( $groups );
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
public function getWriteConnection() {
|
||||
$this->prepareForUpdates();
|
||||
return parent::getWriteConnection();
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @param string[]|null $groups
|
||||
*
|
||||
* @return DBConnRef
|
||||
*/
|
||||
public function getReadConnectionRef( array $groups = null ) {
|
||||
if ( $this->forceWriteConnection ) {
|
||||
return parent::getWriteConnectionRef();
|
||||
}
|
||||
|
||||
return parent::getReadConnectionRef( $groups );
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.29
|
||||
*
|
||||
* @return DBConnRef
|
||||
*/
|
||||
public function getWriteConnectionRef() {
|
||||
$this->prepareForUpdates();
|
||||
return parent::getWriteConnectionRef();
|
||||
}
|
||||
|
||||
/**
|
||||
* Begins an atomic section and returns a database connection to the master DB, for updating.
|
||||
*
|
||||
* @since 1.29
|
||||
*
|
||||
* @note: This causes all future calls to getReadConnection() to return a connection
|
||||
* to the master DB, even after commitAtomicSection() or rollbackAtomicSection() have
|
||||
* been called.
|
||||
*
|
||||
* @param string $fname
|
||||
*
|
||||
* @return Database
|
||||
*/
|
||||
public function beginAtomicSection( $fname ) {
|
||||
// Once we have written to master, do not read from replica.
|
||||
$this->prepareForUpdates();
|
||||
|
||||
return parent::beginAtomicSection( $fname );
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
<?php
|
||||
|
||||
namespace Wikimedia\Tests\Rdbms;
|
||||
|
||||
use IDatabase;
|
||||
use LoadBalancer;
|
||||
use PHPUnit_Framework_MockObject_MockObject;
|
||||
use Wikimedia\Rdbms\ConnectionManager;
|
||||
|
||||
/**
|
||||
* @covers Wikimedia\Rdbms\ConnectionManager
|
||||
*
|
||||
* @license GPL-2.0+
|
||||
* @author Daniel Kinzler
|
||||
*/
|
||||
class ConnectionManagerTest extends \PHPUnit_Framework_TestCase {
|
||||
|
||||
/**
|
||||
* @return IDatabase|PHPUnit_Framework_MockObject_MockObject
|
||||
*/
|
||||
private function getIDatabaseMock() {
|
||||
return $this->getMock( IDatabase::class );
|
||||
}
|
||||
|
||||
/**
|
||||
* @return LoadBalancer|PHPUnit_Framework_MockObject_MockObject
|
||||
*/
|
||||
private function getLoadBalancerMock() {
|
||||
$lb = $this->getMockBuilder( LoadBalancer::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
|
||||
return $lb;
|
||||
}
|
||||
|
||||
public function testGetReadConnection_nullGroups() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_REPLICA, [ 'group1' ], 'someDbName' )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb, 'someDbName', [ 'group1' ] );
|
||||
$actual = $manager->getReadConnection();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testGetReadConnection_withGroups() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_REPLICA, [ 'group2' ], 'someDbName' )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb, 'someDbName', [ 'group1' ] );
|
||||
$actual = $manager->getReadConnection( [ 'group2' ] );
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testGetWriteConnection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_MASTER, [ 'group1' ], 'someDbName' )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb, 'someDbName', [ 'group1' ] );
|
||||
$actual = $manager->getWriteConnection();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testReleaseConnection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'reuseConnection' )
|
||||
->with( $database )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb );
|
||||
$manager->releaseConnection( $database );
|
||||
}
|
||||
|
||||
public function testGetReadConnectionRef_nullGroups() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnectionRef' )
|
||||
->with( DB_REPLICA, [ 'group1' ], 'someDbName' )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb, 'someDbName', [ 'group1' ] );
|
||||
$actual = $manager->getReadConnectionRef();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testGetReadConnectionRef_withGroups() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnectionRef' )
|
||||
->with( DB_REPLICA, [ 'group2' ], 'someDbName' )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb, 'someDbName', [ 'group1' ] );
|
||||
$actual = $manager->getReadConnectionRef( [ 'group2' ] );
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testGetWriteConnectionRef() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnectionRef' )
|
||||
->with( DB_MASTER, [ 'group1' ], 'someDbName' )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new ConnectionManager( $lb, 'someDbName', [ 'group1' ] );
|
||||
$actual = $manager->getWriteConnectionRef();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,164 @@
|
|||
<?php
|
||||
|
||||
namespace Wikimedia\Tests\Rdbms;
|
||||
|
||||
use IDatabase;
|
||||
use LoadBalancer;
|
||||
use PHPUnit_Framework_MockObject_MockObject;
|
||||
use Wikimedia\Rdbms\SessionConsistentConnectionManager;
|
||||
|
||||
/**
|
||||
* @covers Wikimedia\Rdbms\SessionConsistentConnectionManager
|
||||
*
|
||||
* @license GPL-2.0+
|
||||
* @author Daniel Kinzler
|
||||
*/
|
||||
class ConsistentReadConnectionManagerTest extends \PHPUnit_Framework_TestCase {
|
||||
|
||||
/**
|
||||
* @return IDatabase|PHPUnit_Framework_MockObject_MockObject
|
||||
*/
|
||||
private function getIDatabaseMock() {
|
||||
return $this->getMock( IDatabase::class );
|
||||
}
|
||||
|
||||
/**
|
||||
* @return LoadBalancer|PHPUnit_Framework_MockObject_MockObject
|
||||
*/
|
||||
private function getLoadBalancerMock() {
|
||||
$lb = $this->getMockBuilder( LoadBalancer::class )
|
||||
->disableOriginalConstructor()
|
||||
->getMock();
|
||||
|
||||
return $lb;
|
||||
}
|
||||
|
||||
public function testGetReadConnection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_REPLICA )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$actual = $manager->getReadConnection();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testGetReadConnectionReturnsWriteDbOnForceMatser() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_MASTER )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$manager->prepareForUpdates();
|
||||
$actual = $manager->getReadConnection();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testGetWriteConnection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_MASTER )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$actual = $manager->getWriteConnection();
|
||||
|
||||
$this->assertSame( $database, $actual );
|
||||
}
|
||||
|
||||
public function testForceMaster() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_MASTER )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$manager->prepareForUpdates();
|
||||
$manager->getReadConnection();
|
||||
}
|
||||
|
||||
public function testReleaseConnection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'reuseConnection' )
|
||||
->with( $database )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$manager->releaseConnection( $database );
|
||||
}
|
||||
|
||||
public function testBeginAtomicSection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->exactly( 2 ) )
|
||||
->method( 'getConnection' )
|
||||
->with( DB_MASTER )
|
||||
->will( $this->returnValue( $database ) );
|
||||
|
||||
$database->expects( $this->once() )
|
||||
->method( 'startAtomic' )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$manager->beginAtomicSection( 'TEST' );
|
||||
|
||||
// Should also ask for a DB_MASTER connection.
|
||||
// This is asserted by the $lb mock.
|
||||
$manager->getReadConnection();
|
||||
}
|
||||
|
||||
public function testCommitAtomicSection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'reuseConnection' )
|
||||
->with( $database )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$database->expects( $this->once() )
|
||||
->method( 'endAtomic' )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$manager->commitAtomicSection( $database, 'TEST' );
|
||||
}
|
||||
|
||||
public function testRollbackAtomicSection() {
|
||||
$database = $this->getIDatabaseMock();
|
||||
$lb = $this->getLoadBalancerMock();
|
||||
|
||||
$lb->expects( $this->once() )
|
||||
->method( 'reuseConnection' )
|
||||
->with( $database )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$database->expects( $this->once() )
|
||||
->method( 'rollback' )
|
||||
->will( $this->returnValue( null ) );
|
||||
|
||||
$manager = new SessionConsistentConnectionManager( $lb );
|
||||
$manager->rollbackAtomicSection( $database, 'TEST' );
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue