rdbms: Moving replication-related code to its own component

Remove 'insertSelectIsSafe' option, unused.

Remove 'topologicalPrimaryConnRef' option, no longer used as of two
months ago with I41a57247503 (8c9398f7f9).

Remove the unneeded DatabaseSqlite::getTopologyBasedServerId
implementation, which can inherit null instead of overriding with the
string "0". The only caller is SqlBagOStuff::makeTimestampedModificationToken,
which can be used as the MainStash DB, where it's important that a given
server always has the same unique name within a set of db hosts that
may replicate to each other. By inheriting null as the topology server ID,
SqlBagOStuff will use IDatabase::getServerName instead. That in turn
uses the 'host' connection parameter, which defaults to null in
DatabaseFactory, and then falls back to the string "unknown", which is
as good as "0" for this purpose.

Bug: T299691
Change-Id: Iceb65c28cdd3c4a89b3c8b34c3f95d3285718ec0
This commit is contained in:
Amir Sarabadani 2022-10-13 16:03:27 +02:00 committed by Krinkle
parent 06c79bc445
commit 9b078129d2
14 changed files with 775 additions and 727 deletions

View file

@ -2811,6 +2811,8 @@ $wgAutoloadLocalClasses = [
'Wikimedia\\Rdbms\\PostgresField' => __DIR__ . '/includes/libs/rdbms/field/PostgresField.php',
'Wikimedia\\Rdbms\\PostgresResultWrapper' => __DIR__ . '/includes/libs/rdbms/database/resultwrapper/PostgresResultWrapper.php',
'Wikimedia\\Rdbms\\QueryStatus' => __DIR__ . '/includes/libs/rdbms/database/utils/QueryStatus.php',
'Wikimedia\\Rdbms\\Replication\\MysqlReplicationReporter' => __DIR__ . '/includes/libs/rdbms/database/replication/MysqlReplicationReporter.php',
'Wikimedia\\Rdbms\\Replication\\ReplicationReporter' => __DIR__ . '/includes/libs/rdbms/database/replication/ReplicationReporter.php',
'Wikimedia\\Rdbms\\ResultWrapper' => __DIR__ . '/includes/libs/rdbms/database/resultwrapper/ResultWrapper.php',
'Wikimedia\\Rdbms\\SQLiteField' => __DIR__ . '/includes/libs/rdbms/field/SQLiteField.php',
'Wikimedia\\Rdbms\\SchemaBuilder' => __DIR__ . '/includes/libs/rdbms/dbal/SchemaBuilder.php',

View file

@ -30,6 +30,7 @@ use Throwable;
use Wikimedia\AtEase\AtEase;
use Wikimedia\Rdbms\Database\DatabaseFlags;
use Wikimedia\Rdbms\Platform\SQLPlatform;
use Wikimedia\Rdbms\Replication\ReplicationReporter;
use Wikimedia\RequestTimeout\CriticalSectionProvider;
use Wikimedia\RequestTimeout\CriticalSectionScope;
use Wikimedia\ScopedCallback;
@ -50,8 +51,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
protected $connLogger;
/** @var LoggerInterface */
protected $queryLogger;
/** @var LoggerInterface */
protected $replLogger;
/** @var callable Error logging callback */
protected $errorLogger;
/** @var callable Deprecation logging callback */
@ -70,9 +69,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
/** @var object|resource|null Database connection */
protected $conn;
/** @var ?IDatabase Lazy handle to the most authoritative primary server for the dataset */
protected $topologicalPrimaryConnRef;
/** @var string|null Server that this instance is currently connected to */
protected $server;
/** @var string|null User that this instance is currently connected under the name of */
@ -85,8 +81,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
protected $cliMode;
/** @var string Agent name for query profiling */
protected $agent;
/** @var string Replication topology role of the server; one of the class ROLE_* constants */
protected $topologyRole;
/** @var array<string,mixed> Connection parameters used by initConnection() and open() */
protected $connectionParams;
/** @var string[]|int[]|float[] SQL variables values to use for all new connections */
@ -109,9 +103,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
/** @var array<string,array> Map of (name => (type,pristine,trx ID)) for current temp tables */
protected $sessionTempTables = [];
/** @var array|null Replication lag estimate at the time of BEGIN for the last transaction */
private $trxReplicaLagStatus = null;
/** @var int|null Affected row count for the last query statement */
protected $affectedRowCount;
@ -198,6 +189,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
/** @var SQLPlatform */
protected $platform;
/** @var ReplicationReporter */
protected $replicationReporter;
/**
* @note exceptions for missing libraries/drivers should be thrown in initConnection()
* @stable to call
@ -226,7 +220,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
];
$this->lbInfo = $params['lbInfo'] ?? [];
$this->topologicalPrimaryConnRef = $params['topologicalPrimaryConnRef'] ?? null;
$this->connectionVariables = $params['variables'] ?? [];
// Set SQL mode, default is turning them all off, can be overridden or skipped with null
if ( is_string( $params['sqlMode'] ?? null ) ) {
@ -238,14 +231,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->cliMode = (bool)$params['cliMode'];
$this->agent = (string)$params['agent'];
$this->serverName = $params['serverName'];
$this->topologyRole = $params['topologyRole'];
$this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'] ?? 10000;
$this->srvCache = $params['srvCache'];
$this->profiler = is_callable( $params['profiler'] ) ? $params['profiler'] : null;
$this->connLogger = $params['connLogger'];
$this->queryLogger = $params['queryLogger'];
$this->replLogger = $params['replLogger'];
$this->errorLogger = $params['errorLogger'];
$this->deprecationLogger = $params['deprecationLogger'];
@ -263,6 +254,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$this->currentDomain,
$this->errorLogger
);
// Children classes must set $this->replicationReporter.
}
/**
@ -343,9 +335,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* - lbInfo: Optional map of field/values for the managing load balancer instance.
* The "master" and "replica" fields are used to flag the replication role of this
* database server and whether methods like getLag() should actually issue queries.
* - topologicalPrimaryConnRef: lazy-connecting IDatabase handle to the most authoritative
* primary database server for the cluster that this database belongs to. This handle is
* used for replication status purposes. This is generally managed by LoadBalancer.
* - connLogger: Optional PSR-3 logger interface instance.
* - queryLogger: Optional PSR-3 logger interface instance.
* - profiler : Optional callback that takes a section name argument and returns
@ -406,14 +395,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
return $this->getServerVersion();
}
/**
 * Base implementation: reports that no topology-based server ID is available.
 *
 * @return null
 */
public function getTopologyBasedServerId() {
return null;
}
/**
 * Get the replication topology role held by this handle.
 *
 * @return string|null The value of the $topologyRole property
 */
public function getTopologyRole() {
return $this->topologyRole;
}
/**
* Get important session state that cannot be recovered upon connection loss
*
@ -1096,7 +1077,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$priorSessInfo = $this->getCriticalSessionInfo();
// Get the transaction-aware SQL string used for profiling
$prefix = ( $this->topologyRole === self::ROLE_STREAMING_MASTER ) ? 'role-primary: ' : '';
$prefix = ( $this->getTopologyRole() === self::ROLE_STREAMING_MASTER ) ? 'role-primary: ' : '';
$generalizedSql = new GeneralizedSql( $summarySql, $prefix );
// Start profile section
@ -2319,34 +2300,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
}
/**
 * Base stub that performs no waiting and always returns 0.
 *
 * @inheritDoc
 * @since 1.37
 * @stable to override
 */
public function primaryPosWait( DBPrimaryPos $pos, $timeout ) {
// Real waits are implemented in the subclass; both arguments are unused here.
return 0;
}
/**
 * Base stub that always returns false.
 *
 * @inheritDoc
 * @stable to override
 */
public function getReplicaPos() {
// Stub; this base implementation issues no queries
return false;
}
/**
 * Base stub that always returns false.
 *
 * @inheritDoc
 * @stable to override
 */
public function getPrimaryPos() {
// Stub; this base implementation issues no queries
return false;
}
/**
* @inheritDoc
* @stable to override
@ -2753,13 +2706,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
// Treat "BEGIN" as a trivial query to gauge the RTT delay
$rtt = max( $timeEnd - $timeStart, 0.0 );
$this->transactionManager->newTrxId( $mode, $fname, $rtt );
// With REPEATABLE-READ isolation, the first SELECT establishes the read snapshot,
// so get the replication lag estimate before any transaction SELECT queries come in.
// This way, the lag estimate reflects what will actually be read. Also, if heartbeat
// tables are used, this avoids counting snapshot lag as part of replication lag.
$this->trxReplicaLagStatus = null; // clear cached value first
$this->trxReplicaLagStatus = $this->getApproximateLagStatus();
$this->replicationReporter->resetReplicationLagStatus( $this );
$this->completeCriticalSection( __METHOD__, $cs );
}
@ -3043,52 +2990,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
return $ok;
}
/**
 * Get the lag status to use for this session.
 *
 * The status recorded for the current transaction is preferred; when that is
 * not available (null or otherwise falsy), a fresh approximate status is used.
 *
 * @return array Result of getRecordedTransactionLagStatus() when truthy, otherwise
 *  the result of getApproximateLagStatus()
 */
public function getSessionLagStatus() {
	$recordedStatus = $this->getRecordedTransactionLagStatus();
	if ( $recordedStatus ) {
		return $recordedStatus;
	}

	return $this->getApproximateLagStatus();
}
/**
 * Get the replica DB lag status that was recorded when the current transaction started
 *
 * Transactions might use point-in-time read snapshots, so the lag estimate is
 * meant to be recorded just before the transaction establishes its snapshot
 * (either BEGIN or the first SELECT/write query). Being pessimistic is still
 * safe when snapshots are not used.
 *
 * Null is returned when trxLevel() is falsy (no transaction); otherwise the
 * stored value is returned, which is itself null if nothing was recorded yet.
 *
 * @return array|null ('lag': seconds or false, 'since': UNIX timestamp of BEGIN) or null
 * @since 1.27
 */
final protected function getRecordedTransactionLagStatus() {
	if ( !$this->trxLevel() ) {
		return null;
	}

	return $this->trxReplicaLagStatus;
}
/**
 * Get a replica DB lag estimate for this server at the start of a transaction
 *
 * getLag() is only consulted when the topology role is ROLE_STREAMING_REPLICA;
 * for every other role the lag is reported as 0 without calling it.
 *
 * @stable to override
 * @return array ('lag': seconds or false on error, 'since': UNIX timestamp of estimate)
 * @since 1.27
 */
protected function getApproximateLagStatus() {
	// Anything that is not a streaming replica is reported as lag-free
	$lag = 0;

	if ( $this->topologyRole === self::ROLE_STREAMING_REPLICA ) {
		// Avoid exceptions as this is used internally in critical sections
		try {
			$lag = $this->getLag();
		} catch ( DBError $e ) {
			$lag = false;
		}
	}

	return [ 'lag' => $lag, 'since' => microtime( true ) ];
}
/**
* Merge the result of getSessionLagStatus() for several DBs
* using the most pessimistic values to estimate the lag of
@ -3131,31 +3032,6 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
return $res;
}
/**
 * Get the replication lag of this server.
 *
 * Primary servers (ROLE_STREAMING_MASTER) and static datasets (ROLE_STATIC_CLONE)
 * report 0 without calling doGetLag(); every other role defers to doGetLag().
 *
 * @return float|int|false Lag in seconds, or whatever doGetLag() reports
 */
public function getLag() {
	$lagFreeRoles = [
		self::ROLE_STREAMING_MASTER, // this is the primary DB
		self::ROLE_STATIC_CLONE, // static dataset
	];
	if ( in_array( $this->topologyRole, $lagFreeRoles, true ) ) {
		return 0;
	}

	return $this->doGetLag();
}
/**
 * Get the amount of replication lag for this database server
 *
 * Callers should avoid using this method while a transaction is active
 *
 * This base implementation is a stub that always returns 0.
 *
 * @see getLag()
 *
 * @stable to override
 * @return float|int|false Database replication lag in seconds or false on error
 * @throws DBError
 */
protected function doGetLag() {
return 0;
}
/**
* @inheritDoc
* @stable to override
@ -3499,10 +3375,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* @return array|false Tuple of (read-only reason, "role" or "lb") or false if it is not
*/
protected function getReadOnlyReason() {
if ( $this->topologyRole === self::ROLE_STREAMING_REPLICA ) {
return [ 'Server is configured as a read-only replica database.', 'role' ];
} elseif ( $this->topologyRole === self::ROLE_STATIC_CLONE ) {
return [ 'Server is configured as a read-only static clone database.', 'role' ];
$reason = $this->replicationReporter->getTopologyBasedReadOnlyReason();
if ( $reason ) {
return $reason;
}
$reason = $this->getLBInfo( self::LB_READ_ONLY_REASON );
@ -3999,6 +3874,41 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
/* End of methods delegated to SQLPlatform. */
/* Start of methods delegated to ReplicationReporter. */
/**
 * Forwards to the replication reporter, passing this handle along with the arguments.
 *
 * @inheritDoc
 */
public function primaryPosWait( DBPrimaryPos $pos, $timeout ) {
return $this->replicationReporter->primaryPosWait( $this, $pos, $timeout );
}
/**
 * Forwards to the replication reporter, passing this handle.
 *
 * @inheritDoc
 */
public function getReplicaPos() {
return $this->replicationReporter->getReplicaPos( $this );
}
/**
 * Forwards to the replication reporter, passing this handle.
 *
 * @inheritDoc
 */
public function getPrimaryPos() {
return $this->replicationReporter->getPrimaryPos( $this );
}
/**
 * Forwards to the replication reporter; no handle is passed for this call.
 *
 * @inheritDoc
 */
public function getTopologyRole() {
return $this->replicationReporter->getTopologyRole();
}
/**
 * Forwards to the replication reporter, passing this handle.
 *
 * @inheritDoc
 */
public function getTopologyBasedServerId() {
return $this->replicationReporter->getTopologyBasedServerId( $this );
}
/**
 * Forwards to the replication reporter, passing this handle.
 *
 * @return mixed Whatever the reporter's getApproximateLagStatus() returns
 */
protected function getApproximateLagStatus() {
return $this->replicationReporter->getApproximateLagStatus( $this );
}
/**
 * Forwards to the replication reporter, passing this handle.
 *
 * @inheritDoc
 */
public function getLag() {
return $this->replicationReporter->getLag( $this );
}
/**
 * Forwards to the replication reporter, passing this handle.
 *
 * @inheritDoc
 */
public function getSessionLagStatus() {
return $this->replicationReporter->getSessionLagStatus( $this );
}
/* End of methods delegated to ReplicationReporter. */
}
/**

View file

@ -63,9 +63,6 @@ class DatabaseFactory {
* - lbInfo: Optional map of field/values for the managing load balancer instance.
* The "master" and "replica" fields are used to flag the replication role of this
* database server and whether methods like getLag() should actually issue queries.
* - topologicalPrimaryConnRef: lazy-connecting IDatabase handle to the most authoritative
* primary database server for the cluster that this database belongs to. This handle is
* used for replication status purposes. This is generally managed by LoadBalancer.
* - connLogger: Optional PSR-3 logger interface instance.
* - queryLogger: Optional PSR-3 logger interface instance.
* - profiler : Optional callback that takes a section name argument and returns
@ -104,7 +101,6 @@ class DatabaseFactory {
'serverName' => null,
'topologyRole' => null,
// Objects and callbacks
'topologicalPrimaryConnRef' => $params['topologicalPrimaryConnRef'] ?? null,
'srvCache' => $params['srvCache'] ?? new HashBagOStuff(),
'profiler' => $params['profiler'] ?? null,
'trxProfiler' => $params['trxProfiler'] ?? new TransactionProfiler(),

View file

@ -19,11 +19,10 @@
*/
namespace Wikimedia\Rdbms;
use InvalidArgumentException;
use RuntimeException;
use stdClass;
use Wikimedia\Rdbms\Platform\ISQLPlatform;
use Wikimedia\Rdbms\Platform\MySQLPlatform;
use Wikimedia\Rdbms\Replication\MysqlReplicationReporter;
/**
* MySQL database abstraction layer.
@ -40,14 +39,6 @@ use Wikimedia\Rdbms\Platform\MySQLPlatform;
* @see Database
*/
abstract class DatabaseMysqlBase extends Database {
/** @var MySQLPrimaryPos|null Position remembered by primaryPosWait(); null until one is stored */
protected $lastKnownReplicaPos;
/** @var string Method to detect replica DB lag */
protected $lagDetectionMethod;
/** @var array Options for the replica DB lag detection method (the 'conds' key is read for pt-heartbeat) */
protected $lagDetectionOptions = [];
/** @var bool Whether to use GTID methods */
protected $useGTIDs = false;
/** @var string|null */
protected $sslKeyPath;
/** @var string|null */
@ -67,20 +58,12 @@ abstract class DatabaseMysqlBase extends Database {
/** @var bool|null */
protected $defaultBigSelects;
/** @var bool|null */
private $insertSelectIsSafe;
/** @var stdClass|null */
private $replicationInfoRow;
// Cache getServerId() for 24 hours
private const SERVER_ID_CACHE_TTL = 86400;
/** @var float Warn if lag estimates are made for transactions older than this many seconds */
private const LAG_STALE_WARN_THRESHOLD = 0.100;
/** @var ISQLPlatform */
protected $platform;
/** @var MysqlReplicationReporter */
protected $replicationReporter;
/**
* Additional $params include:
* - lagDetectionMethod : set to one of (Seconds_Behind_Master,pt-heartbeat).
@ -93,7 +76,6 @@ abstract class DatabaseMysqlBase extends Database {
* row having a server_id matching that of the immediate replication source server
* for the given replica.
* - useGTIDs : use GTID methods like MASTER_GTID_WAIT() when possible.
* - insertSelectIsSafe : force that native INSERT SELECT is or is not safe [default: null]
* - sslKeyPath : path to key file [default: null]
* - sslCertPath : path to certificate file [default: null]
* - sslCAFile: path to a single certificate authority PEM file [default: null]
@ -102,9 +84,6 @@ abstract class DatabaseMysqlBase extends Database {
* @param array $params
*/
public function __construct( array $params ) {
$this->lagDetectionMethod = $params['lagDetectionMethod'] ?? 'Seconds_Behind_Master';
$this->lagDetectionOptions = $params['lagDetectionOptions'] ?? [];
$this->useGTIDs = !empty( $params['useGTIDs' ] );
foreach ( [ 'KeyPath', 'CertPath', 'CAFile', 'CAPath', 'Ciphers' ] as $name ) {
$var = "ssl{$name}";
if ( isset( $params[$var] ) ) {
@ -112,8 +91,6 @@ abstract class DatabaseMysqlBase extends Database {
}
}
$this->utf8Mode = !empty( $params['utf8Mode'] );
$this->insertSelectIsSafe = isset( $params['insertSelectIsSafe'] )
? (bool)$params['insertSelectIsSafe'] : null;
parent::__construct( $params );
$this->platform = new MySQLPlatform(
$this,
@ -121,6 +98,14 @@ abstract class DatabaseMysqlBase extends Database {
$this->currentDomain,
$this->errorLogger
);
$this->replicationReporter = new MysqlReplicationReporter(
$params['topologyRole'],
$params['replLogger'],
$params['srvCache'],
$params['lagDetectionMethod'] ?? 'Seconds_Behind_Master',
$params['lagDetectionOptions'] ?? [],
!empty( $params['useGTIDs' ] )
);
}
/**
@ -261,7 +246,7 @@ abstract class DatabaseMysqlBase extends Database {
abstract protected function mysqlError( $conn = null );
protected function isInsertSelectSafe( array $insertOptions, array $selectOptions ) {
$row = $this->getReplicationSafetyInfo();
$row = $this->replicationReporter->getReplicationSafetyInfo( $this );
// For row-based-replication, the resulting changes will be relayed, not the query
if ( $row->binlog_format === 'ROW' ) {
return true;
@ -283,25 +268,6 @@ abstract class DatabaseMysqlBase extends Database {
);
}
/**
 * Get the @@innodb_autoinc_lock_mode and @@binlog_format server variables.
 *
 * The row is selected only while the stored value is still null; afterwards
 * the stored value is handed back without querying again.
 *
 * @return stdClass Process cached row
 */
protected function getReplicationSafetyInfo() {
	if ( $this->replicationInfoRow !== null ) {
		return $this->replicationInfoRow;
	}

	$variables = [
		'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
		'binlog_format' => '@@binlog_format',
	];
	$this->replicationInfoRow = $this->selectRow( false, $variables, [], __METHOD__ );

	return $this->replicationInfoRow;
}
/**
* Estimate rows in dataset
* Returns estimated count, based on EXPLAIN output
@ -447,381 +413,6 @@ abstract class DatabaseMysqlBase extends Database {
*/
abstract protected function mysqlRealEscapeString( $s );
/**
 * Measure replication lag with the configured detection method.
 *
 * 'pt-heartbeat' selects the heartbeat-table approach; any other value
 * selects the SHOW SLAVE STATUS approach.
 *
 * @return float|int|false Lag in seconds or false on failure
 */
protected function doGetLag() {
	return ( $this->lagDetectionMethod === 'pt-heartbeat' )
		? $this->getLagFromPtHeartbeat()
		: $this->getLagFromSlaveStatus();
}
/**
 * Read the lag from SHOW SLAVE STATUS.
 *
 * The result is Seconds_Behind_Master plus SQL_Remaining_Delay (when that
 * field is present), truncated to an integer.
 *
 * @return int|false Seconds of lag; false if there is no status row or
 *  Seconds_Behind_Master is empty
 */
protected function getLagFromSlaveStatus() {
	$flags = self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$res = $this->query( 'SHOW SLAVE STATUS', __METHOD__, $flags );
	$status = $res ? $res->fetchObject() : false;

	// If the server is not replicating, there will be no row
	if ( !$status || strval( $status->Seconds_Behind_Master ) === '' ) {
		return false;
	}

	// Account for intentionally delayed replication:
	// https://mariadb.com/kb/en/delayed-replication/
	// https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
	$remainingDelay = $status->SQL_Remaining_Delay ?? 0;

	return intval( $status->Seconds_Behind_Master + $remainingDelay );
}
/**
 * Estimate the lag from the pt-heartbeat table.
 *
 * When a lag status was already recorded for the active transaction, that
 * recorded lag is returned instead of querying (a warning is logged once the
 * recorded value is older than LAG_STALE_WARN_THRESHOLD). Otherwise the age of
 * the newest heartbeat is used, clamped so that it is never negative.
 *
 * @return float|false Seconds of lag; false if no heartbeat row was found
 */
protected function getLagFromPtHeartbeat() {
	$trxLagStatus = $this->getRecordedTransactionLagStatus();
	if ( $trxLagStatus ) {
		// There is an active transaction and the initial lag was already queried
		$age = microtime( true ) - $trxLagStatus['since'];
		if ( $age > self::LAG_STALE_WARN_THRESHOLD ) {
			// Avoid returning higher and higher lag value due to snapshot age
			// given that the isolation level will typically be REPEATABLE-READ
			// but UTC_TIMESTAMP() is not affected by point-in-time snapshots
			$context = $this->getLogContext( [
				'method' => __METHOD__,
				'age' => $age,
				'exception' => new RuntimeException()
			] );
			$this->queryLogger->warning(
				"Using cached lag value for {db_server} due to active transaction",
				$context
			);
		}

		return $trxLagStatus['lag'];
	}

	$secondsSinceBeat = $this->fetchSecondsSinceHeartbeat();
	if ( $secondsSinceBeat === null ) {
		$this->queryLogger->error(
			"Unable to find pt-heartbeat row for {db_server}",
			$this->getLogContext( [ 'method' => __METHOD__ ] )
		);

		return false;
	}

	return max( $secondsSinceBeat, 0.0 );
}
/**
 * Query heartbeat.heartbeat for the age of the most recent matching beat.
 *
 * @return float|null Elapsed seconds since the newest beat or null if none was found
 * @see https://www.percona.com/doc/percona-toolkit/2.1/pt-heartbeat.html
 */
protected function fetchSecondsSinceHeartbeat() {
	// Some setups might have pt-heartbeat running on each replica server.
	// Exclude the row for events originating on this DB server. Assume that
	// there is only one active replication channel and that any other row
	// getting updates must be the row for the primary DB server.
	$conds = $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ];
	$where = $this->makeList( $conds, self::LIST_AND );

	// Use the MySQL server time so that query time and trip time are not counted.
	// Use ORDER BY for channel based queries since that field might not be UNIQUE.
	$sql = "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
		"FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1";
	$flags = self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;

	$res = $this->query( $sql, __METHOD__, $flags );
	if ( !$res ) {
		return null;
	}

	$beat = $res->fetchObject();
	if ( !$beat ) {
		return null;
	}

	// us_ago is in microseconds
	return $beat->us_ago / 1e6;
}
/**
 * Get a lag estimate, using a short-lived server cache entry unless pt-heartbeat is in use.
 *
 * @return array Lag status as produced by the parent implementation
 */
protected function getApproximateLagStatus() {
	if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
		// Disable caching since this is fast enough and we don't want
		// to be *too* pessimistic by having both the cache TTL and the
		// pt-heartbeat interval count as lag in getSessionLagStatus()
		return parent::getApproximateLagStatus();
	}

	$cache = $this->srvCache;
	$cacheKey = $cache->makeGlobalKey( 'mysql-lag', $this->getServerName() );

	$cachedStatus = $cache->get( $cacheKey );
	if ( $cachedStatus ) {
		return $cachedStatus;
	}

	// Nothing usable in the cache; compute and store with a TTL of 1
	$freshStatus = parent::getApproximateLagStatus();
	$cache->set( $cacheKey, $freshStatus, 1 );

	return $freshStatus;
}
/**
 * Wait for this server's replication to reach the given primary position.
 *
 * The wait is skipped (returning 0) when the topology role is ROLE_STATIC_CLONE or when
 * a previously remembered position has already reached $pos. For positions that carry
 * GTIDs, WAIT_FOR_EXECUTED_GTID_SET() or MASTER_GTID_WAIT() is used; otherwise
 * MASTER_POS_WAIT() is used with the binlog file and event position.
 *
 * @param DBPrimaryPos $pos Must be a MySQLPrimaryPos
 * @param int|float $timeout Timeout, interpolated as-is into the SQL wait function call
 * @return int|null 0 if the wait was bypassed; -1 if no usable position comparison could
 *  be made or the wait function reported -1; otherwise the integer result of the SQL
 *  wait function; null if that result was NULL
 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
 */
public function primaryPosWait( DBPrimaryPos $pos, $timeout ) {
if ( !( $pos instanceof MySQLPrimaryPos ) ) {
throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
}
if ( $this->topologyRole === self::ROLE_STATIC_CLONE ) {
$this->queryLogger->debug(
"Bypassed replication wait; database has a static dataset",
$this->getLogContext( [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
);
return 0; // this is a copy of a read-only dataset with no primary DB
} elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
$this->queryLogger->debug(
"Bypassed replication wait; replication known to have reached {raw_pos}",
$this->getLogContext( [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
);
return 0; // already reached this point for sure
}
// The wait below is issued via query() with QUERY_IGNORE_DBO_TRX,
// to avoid opening a transaction if DBO_TRX is set
if ( $pos->getGTIDs() ) {
// Get the GTIDs from this replica server to see the domains (channels)
$refPos = $this->getReplicaPos();
if ( !$refPos ) {
$this->queryLogger->error(
"Could not get replication position on replica DB to compare to {raw_pos}",
$this->getLogContext( [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
);
return -1; // this is the primary DB itself?
}
// GTIDs with domains (channels) that are active and are present on the replica
$gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
if ( !$gtidsWait ) {
$this->queryLogger->error(
"No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
$this->getLogContext( [
'method' => __METHOD__,
'raw_pos' => $pos,
'current_pos' => $refPos
] )
);
return -1; // $pos is from the wrong cluster?
}
// Wait on the GTID set
$gtidArg = $this->addQuotes( implode( ',', $gtidsWait ) );
if ( strpos( $gtidArg, ':' ) !== false ) {
// MySQL GTIDs, e.g "source_id:transaction_id"
$sql = "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
} else {
// MariaDB GTIDs, e.g."domain:server:sequence"
$sql = "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)";
}
// Unquoted form of the GTID list, used only for log context
$waitPos = implode( ',', $gtidsWait );
} else {
// Wait on the binlog coordinates
$encFile = $this->addQuotes( $pos->getLogFile() );
// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
$encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
$sql = "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
$waitPos = $pos->__toString();
}
// Time the wait query so the duration can be included in the log context
$start = microtime( true );
$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
$res = $this->query( $sql, __METHOD__, $flags );
$row = $res->fetchRow();
$seconds = max( microtime( true ) - $start, 0 );
// Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual
$status = ( $row[0] !== null ) ? intval( $row[0] ) : null;
if ( $status === null ) {
$this->replLogger->error(
"An error occurred while waiting for replication to reach {wait_pos}",
$this->getLogContext( [
'raw_pos' => $pos,
'wait_pos' => $waitPos,
'sql' => $sql,
'seconds_waited' => $seconds,
'exception' => new RuntimeException()
] )
);
} elseif ( $status < 0 ) {
$this->replLogger->error(
"Timed out waiting for replication to reach {wait_pos}",
$this->getLogContext( [
'raw_pos' => $pos,
'wait_pos' => $waitPos,
'timeout' => $timeout,
'sql' => $sql,
'seconds_waited' => $seconds,
'exception' => new RuntimeException()
] )
);
} elseif ( $status >= 0 ) {
// Only remaining case: the earlier branches handled null and negative values
$this->replLogger->debug(
"Replication has reached {wait_pos}",
$this->getLogContext( [
'raw_pos' => $pos,
'wait_pos' => $waitPos,
'seconds_waited' => $seconds,
] )
);
// Remember that this position was reached to save queries next time
$this->lastKnownReplicaPos = $pos;
}
return $status;
}
/**
 * Get the position of the primary DB as applied on this replica
 *
 * When useGTIDs() is true, the gtid_slave_pos / gtid_executed variables are tried first.
 * If they yield nothing (or GTIDs are not used), the binlog coordinates from
 * SHOW SLAVE STATUS are used instead.
 *
 * @return MySQLPrimaryPos|false False if no position could be determined
 */
public function getReplicaPos() {
$now = microtime( true ); // as-of-time *before* fetching GTID variables
if ( $this->useGTIDs() ) {
// Try to use GTIDs, falling back to binlog positions if not possible
$data = $this->getServerGTIDs( __METHOD__ );
// Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $name ) {
if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
return new MySQLPrimaryPos( $data[$name], $now );
}
}
}
// Binlog coordinates, in "<file>/<position>" form
$data = $this->getServerRoleStatus( 'SLAVE', __METHOD__ );
if ( $data && strlen( $data['Relay_Master_Log_File'] ) ) {
return new MySQLPrimaryPos(
"{$data['Relay_Master_Log_File']}/{$data['Exec_Master_Log_Pos']}",
$now
);
}
return false;
}
/**
 * Get the current position of this server acting as a primary DB
 *
 * When useGTIDs() is true, the gtid_binlog_pos / gtid_executed variables are tried first
 * and the resulting position is annotated with this server's ID, UUID and (when present)
 * gtid_domain_id. If no GTID position was obtained, the binlog coordinates from
 * SHOW MASTER STATUS are used instead.
 *
 * @return MySQLPrimaryPos|false False if no position could be determined
 */
public function getPrimaryPos() {
$now = microtime( true ); // as-of-time *before* fetching GTID variables
$pos = false;
if ( $this->useGTIDs() ) {
// Try to use GTIDs, falling back to binlog positions if not possible
$data = $this->getServerGTIDs( __METHOD__ );
// Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $name ) {
if ( isset( $data[$name] ) && strlen( $data[$name] ) ) {
$pos = new MySQLPrimaryPos( $data[$name], $now );
break;
}
}
// Filter domains that are inactive or not relevant to the session
if ( $pos ) {
$pos->setActiveOriginServerId( $this->getServerId() );
$pos->setActiveOriginServerUUID( $this->getServerUUID() );
if ( isset( $data['gtid_domain_id'] ) ) {
$pos->setActiveDomain( $data['gtid_domain_id'] );
}
}
}
if ( !$pos ) {
// Binlog coordinates, in "<file>/<position>" form
$data = $this->getServerRoleStatus( 'MASTER', __METHOD__ );
if ( $data && strlen( $data['File'] ) ) {
$pos = new MySQLPrimaryPos( "{$data['File']}/{$data['Position']}", $now );
}
}
return $pos;
}
/**
 * Returns the value from getServerId(), i.e. the server's @@server_id.
 *
 * @inheritDoc
 * @return string|null 32 bit integer ID; null if not applicable or unknown
 */
public function getTopologyBasedServerId() {
return $this->getServerId();
}
/**
 * Get the server's @@server_id, going through the server cache.
 *
 * The cache entry is keyed on getServerName() and kept for SERVER_ID_CACHE_TTL;
 * the query only runs when the cache invokes the callback.
 *
 * @return string Value of server_id (32-bit integer, unique to the replication topology)
 * @throws DBQueryError
 */
protected function getServerId() {
	$caller = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-id', $this->getServerName() );

	$fetchServerId = function () use ( $caller ) {
		$res = $this->query(
			"SELECT @@server_id AS id",
			$caller,
			self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
		);

		return $res->fetchObject()->id;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$fetchServerId
	);
}
/**
 * Get the server's server_uuid variable, going through the server cache.
 *
 * The cache entry is keyed on getServerName() and kept for SERVER_ID_CACHE_TTL;
 * the query only runs when the cache invokes the callback.
 *
 * @return string|null Value of server_uuid (hyphenated 128-bit hex string, globally unique);
 *  null if the server reports no such variable
 * @throws DBQueryError
 */
protected function getServerUUID() {
	$caller = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $this->getServerName() );

	$fetchServerUuid = function () use ( $caller ) {
		$res = $this->query(
			"SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
			$caller,
			self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE
		);
		$variable = $res->fetchObject();
		if ( !$variable ) {
			return null;
		}

		return $variable->Value;
	};

	return $this->srvCache->getWithSetCallback(
		$cacheKey,
		self::SERVER_ID_CACHE_TTL,
		$fetchServerUuid
	);
}
/**
 * Collect the server's gtid_* variables into a name => value map.
 *
 * Global variables are read first and session variables second, so a session
 * value replaces the global value of the same name.
 *
 * @param string $fname Caller name passed through to query()
 * @return string[] Map of (variable name => value)
 */
protected function getServerGTIDs( $fname = __METHOD__ ) {
	$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$statements = [
		// Get global-only variables like gtid_executed
		"SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
		// Get session-specific (e.g. gtid_domain_id since that is where writes will log)
		"SHOW SESSION VARIABLES LIKE 'gtid_%'",
	];

	$variables = [];
	foreach ( $statements as $sql ) {
		$res = $this->query( $sql, $fname, $flags );
		foreach ( $res as $row ) {
			$variables[$row->Variable_name] = $row->Value;
		}
	}

	return $variables;
}
/**
 * Run "SHOW <role> STATUS" and return its first row.
 *
 * @param string $role One of "MASTER"/"SLAVE"
 * @param string $fname Caller name passed through to query()
 * @return array<string,mixed>|null Latest available server status row; null if the
 *  query gave no result or no row
 */
protected function getServerRoleStatus( $role, $fname = __METHOD__ ) {
	$flags = self::QUERY_SILENCE_ERRORS | self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
	$res = $this->query( "SHOW $role STATUS", $fname, $flags );
	if ( !$res ) {
		return null;
	}

	$statusRow = $res->fetchRow();

	return $statusRow ?: null;
}
public function serverIsReadOnly() {
// Avoid SHOW to avoid internal temporary tables
$flags = self::QUERY_IGNORE_DBO_TRX | self::QUERY_CHANGE_NONE;
@ -1209,13 +800,6 @@ abstract class DatabaseMysqlBase extends Database {
return $sql;
}
/**
 * Accessor for the $useGTIDs property.
 *
 * @return bool Whether GTID support is used (mockable for testing)
 */
protected function useGTIDs() {
return $this->useGTIDs;
}
}
/**

View file

@ -21,6 +21,7 @@ namespace Wikimedia\Rdbms;
use RuntimeException;
use Wikimedia\Rdbms\Platform\PostgresPlatform;
use Wikimedia\Rdbms\Replication\ReplicationReporter;
use Wikimedia\WaitConditionLoop;
/**
@ -61,6 +62,11 @@ class DatabasePostgres extends Database {
$this->keywordTableMap = $params['keywordTableMap'];
}
$this->replicationReporter = new ReplicationReporter(
$params['topologyRole'],
$params['replLogger'],
$params['srvCache']
);
parent::__construct( $params );
$this->platform = new PostgresPlatform(

View file

@ -28,6 +28,7 @@ use PDOStatement;
use RuntimeException;
use Wikimedia\Rdbms\Platform\ISQLPlatform;
use Wikimedia\Rdbms\Platform\SqlitePlatform;
use Wikimedia\Rdbms\Replication\ReplicationReporter;
/**
* This is the SQLite database abstraction layer.
@ -103,6 +104,11 @@ class DatabaseSqlite extends Database {
$this->currentDomain,
$this->errorLogger
);
$this->replicationReporter = new ReplicationReporter(
$params['topologyRole'],
$params['replLogger'],
$params['srvCache']
);
}
public static function getAttributes() {
@ -584,11 +590,6 @@ class DatabaseSqlite extends Database {
return false;
}
/**
 * @inheritDoc
 * @return string Always the string '0'
 */
public function getTopologyBasedServerId() {
// Sqlite topologies trivially consist of single primary server for the dataset
return '0';
}
public function serverIsReadOnly() {
$this->assertHasConnectionHandle();

View file

@ -0,0 +1,479 @@
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace Wikimedia\Rdbms\Replication;
use InvalidArgumentException;
use RuntimeException;
use stdClass;
use Wikimedia\Rdbms\DBPrimaryPos;
use Wikimedia\Rdbms\DBQueryError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\MySQLPrimaryPos;
use Wikimedia\Rdbms\Platform\ISQLPlatform;
/**
* @internal
* @ingroup Database
* @since 1.40
*/
class MysqlReplicationReporter extends ReplicationReporter {
/** @var MySQLPrimaryPos|null Last position that primaryPosWait() confirmed this server has reached */
protected $lastKnownReplicaPos;
/** @var string Method to detect replica DB lag; 'pt-heartbeat' selects the heartbeat table query */
protected $lagDetectionMethod;
/** @var array Options for the lag detection method; 'conds' overrides the heartbeat row filter */
protected $lagDetectionOptions = [];
/** @var bool Whether to use GTID methods */
protected $useGTIDs = false;
/** @var stdClass|null Process cache of the row fetched by getReplicationSafetyInfo() */
private $replicationInfoRow;
// Cache getServerId() and getServerUUID() results for 24 hours
private const SERVER_ID_CACHE_TTL = 86400;
/** @var float Warn if lag estimates are made for transactions older than this many seconds */
private const LAG_STALE_WARN_THRESHOLD = 0.100;
/**
 * @param string $topologyRole Replication topology role of the server
 * @param \Psr\Log\LoggerInterface $logger Logger for replication related messages
 * @param \BagOStuff $srvCache Cache used for lag estimates and server identifiers
 * @param string $lagDetectionMethod Method to detect replica DB lag
 * @param array $lagDetectionOptions Options for the lag detection method
 * @param bool $useGTIDs Whether to use GTID methods
 */
public function __construct(
	$topologyRole,
	$logger,
	$srvCache,
	$lagDetectionMethod,
	$lagDetectionOptions,
	$useGTIDs
) {
	// Let the generic reporter store the role, logger and cache
	parent::__construct( $topologyRole, $logger, $srvCache );

	// MySQL/MariaDB specific settings
	$this->useGTIDs = $useGTIDs;
	$this->lagDetectionOptions = $lagDetectionOptions;
	$this->lagDetectionMethod = $lagDetectionMethod;
}
/**
 * Measure replication lag using the configured detection method
 *
 * @param IDatabase $conn To make queries
 * @return float|int|false Seconds of lag or false if it could not be determined
 */
protected function doGetLag( IDatabase $conn ) {
	// Anything other than 'pt-heartbeat' falls back to SHOW SLAVE STATUS
	return ( $this->lagDetectionMethod === 'pt-heartbeat' )
		? $this->getLagFromPtHeartbeat( $conn )
		: $this->getLagFromSlaveStatus( $conn );
}
/**
 * Estimate replication lag from the output of SHOW SLAVE STATUS
 *
 * @param IDatabase $conn To make queries
 * @return int|false Seconds of lag; false if there is no status row or no lag value in it
 */
protected function getLagFromSlaveStatus( IDatabase $conn ) {
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;
	$res = $conn->query( 'SHOW SLAVE STATUS', __METHOD__, $flags );
	$status = $res ? $res->fetchObject() : false;

	// If the server is not replicating, there will be no row
	if ( !$status || strval( $status->Seconds_Behind_Master ) === '' ) {
		return false;
	}

	// Count any configured replication delay that is still pending as lag too
	// https://mariadb.com/kb/en/delayed-replication/
	// https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
	$remainingDelay = $status->SQL_Remaining_Delay ?? 0;

	return intval( $status->Seconds_Behind_Master + $remainingDelay );
}
/**
 * Estimate replication lag from the age of the newest pt-heartbeat row
 *
 * While a transaction with a recorded lag estimate is active, that recorded
 * estimate is returned instead of querying the heartbeat table again.
 *
 * @param IDatabase $conn To make queries
 * @return float|false Seconds of lag; false if no heartbeat row could be found
 */
protected function getLagFromPtHeartbeat( IDatabase $conn ) {
	$recorded = $this->getRecordedTransactionLagStatus( $conn );
	if ( $recorded ) {
		// There is an active transaction and the initial lag was already queried
		$age = microtime( true ) - $recorded['since'];
		if ( $age > self::LAG_STALE_WARN_THRESHOLD ) {
			// Avoid returning higher and higher lag value due to snapshot age
			// given that the isolation level will typically be REPEATABLE-READ
			// but UTC_TIMESTAMP() is not affected by point-in-time snapshots
			$this->logger->warning(
				"Using cached lag value for {db_server} due to active transaction",
				$this->getLogContext( $conn, [
					'method' => __METHOD__,
					'age' => $age,
					'exception' => new RuntimeException()
				] )
			);
		}

		return $recorded['lag'];
	}

	$secondsAgo = $this->fetchSecondsSinceHeartbeat( $conn );
	if ( $secondsAgo === null ) {
		$this->logger->error(
			"Unable to find pt-heartbeat row for {db_server}",
			$this->getLogContext( $conn, [
				'method' => __METHOD__
			] )
		);

		return false;
	}

	// Never report a negative lag value
	return max( $secondsAgo, 0.0 );
}
/**
 * Query the heartbeat table for the age of the most recent beat
 *
 * @param IDatabase $conn To make queries
 * @return float|null Elapsed seconds since the newest beat or null if none was found
 * @see https://www.percona.com/doc/percona-toolkit/2.1/pt-heartbeat.html
 */
protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
	// Some setups might have pt-heartbeat running on each replica server.
	// Exclude the row for events originating on this DB server. Assume that
	// there is only one active replication channel and that any other row
	// getting updates must be the row for the primary DB server.
	$conds = $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ];
	$where = $conn->makeList( $conds, ISQLPlatform::LIST_AND );

	// Use mysql server time so that query time and trip time are not counted.
	// Use ORDER BY for channel based queries since that field might not be UNIQUE.
	$sql = "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
		"FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1";
	$flags = ISQLPlatform::QUERY_SILENCE_ERRORS
		| ISQLPlatform::QUERY_IGNORE_DBO_TRX
		| ISQLPlatform::QUERY_CHANGE_NONE;

	$res = $conn->query( $sql, __METHOD__, $flags );
	$beat = $res ? $res->fetchObject() : false;
	if ( !$beat ) {
		return null;
	}

	// Convert microseconds to seconds
	return $beat->us_ago / 1e6;
}
/**
 * Get a replica DB lag estimate, using a short-lived server cache unless
 * pt-heartbeat is the lag detection method
 *
 * @param IDatabase $conn To make queries
 * @return array ('lag': seconds or false on error, 'since': UNIX timestamp of estimate)
 */
public function getApproximateLagStatus( IDatabase $conn ) {
	if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
		// Disable caching since this is fast enough and we don't want
		// to be *too* pessimistic by having both the cache TTL and the
		// pt-heartbeat interval count as lag in getSessionLagStatus()
		return parent::getApproximateLagStatus( $conn );
	}

	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
	$cached = $this->srvCache->get( $cacheKey );
	if ( $cached ) {
		return $cached;
	}

	// Nothing usable in the cache; take a new estimate and keep it for one second
	$fresh = parent::getApproximateLagStatus( $conn );
	$this->srvCache->set( $cacheKey, $fresh, 1 );

	return $fresh;
}
/**
 * Fetch the @@innodb_autoinc_lock_mode and @@binlog_format server variables
 *
 * The row is queried at most once per instance, as long as the lookup yields
 * a non-null result.
 *
 * @param IDatabase $conn To make queries
 * @return stdClass Process cached row
 */
public function getReplicationSafetyInfo( IDatabase $conn ) {
	if ( $this->replicationInfoRow !== null ) {
		return $this->replicationInfoRow;
	}

	$fields = [
		'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
		'binlog_format' => '@@binlog_format',
	];
	// No tables and no conditions; only server variables are selected
	$this->replicationInfoRow = $conn->selectRow( [], $fields, [], __METHOD__ );

	return $this->replicationInfoRow;
}
/**
* Whether position lookups should try GTID variables before binlog coordinates
*
* This is a method rather than a direct property read so that tests can mock it.
*
* @return bool Whether GTID support is used (mockable for testing)
*/
protected function useGTIDs() {
return $this->useGTIDs;
}
/**
 * Wait for this replica DB server to catch up to a given primary DB position
 *
 * The wait is bypassed when the server is a static clone, or when an earlier
 * wait already confirmed that the position was reached.
 *
 * @param IDatabase $conn To make queries
 * @param DBPrimaryPos $pos Position to wait for; must be a MySQLPrimaryPos
 * @param int $timeout Timeout handed to the SQL wait function
 * @return int|null Zero if the wait was bypassed; -1 if the positions could not be
 *  compared; otherwise the result of the SQL wait function, which is null on error,
 *  -1 on timeout, and zero or more on success
 * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
 */
public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
	if ( !( $pos instanceof MySQLPrimaryPos ) ) {
		throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
	}

	if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
		$this->logger->debug(
			"Bypassed replication wait; database has a static dataset",
			$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
		);

		return 0; // this is a copy of a read-only dataset with no primary DB
	} elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
		$this->logger->debug(
			"Bypassed replication wait; replication known to have reached {raw_pos}",
			$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
		);

		return 0; // already reached this point for sure
	}

	if ( $pos->getGTIDs() ) {
		// Get the GTIDs from this replica server to see the domains (channels)
		$refPos = $this->getReplicaPos( $conn );
		if ( !$refPos ) {
			$this->logger->error(
				"Could not get replication position on replica DB to compare to {raw_pos}",
				$this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
			);

			return -1; // this is the primary DB itself?
		}
		// GTIDs with domains (channels) that are active and are present on the replica
		$gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
		if ( !$gtidsWait ) {
			$this->logger->error(
				"No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
				$this->getLogContext( $conn, [
					'method' => __METHOD__,
					'raw_pos' => $pos,
					'current_pos' => $refPos
				] )
			);

			return -1; // $pos is from the wrong cluster?
		}
		// Wait on the GTID set
		$gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
		if ( strpos( $gtidArg, ':' ) !== false ) {
			// MySQL GTIDs, e.g "source_id:transaction_id"
			$sql = "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)";
		} else {
			// MariaDB GTIDs, e.g."domain:server:sequence"
			$sql = "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)";
		}
		$waitPos = implode( ',', $gtidsWait );
	} else {
		// Wait on the binlog coordinates
		$encFile = $conn->addQuotes( $pos->getLogFile() );
		// @phan-suppress-next-line PhanTypeArraySuspiciousNullable
		$encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
		$sql = "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)";
		$waitPos = $pos->__toString();
	}

	$start = microtime( true );
	// Use QUERY_IGNORE_DBO_TRX so that the wait does not open a transaction if DBO_TRX is set
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
	$res = $conn->query( $sql, __METHOD__, $flags );
	$row = $res->fetchRow();
	$seconds = max( microtime( true ) - $start, 0 );

	// Result can be NULL (error), -1 (timeout), or 0+ per the MySQL manual.
	// A missing result row is handled like a NULL result instead of being
	// dereferenced, which would raise an "array offset on bool" warning.
	$rawStatus = $row[0] ?? null;
	$status = ( $rawStatus !== null ) ? intval( $rawStatus ) : null;
	if ( $status === null ) {
		$this->logger->error(
			"An error occurred while waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'sql' => $sql,
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} elseif ( $status < 0 ) {
		$this->logger->error(
			"Timed out waiting for replication to reach {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'timeout' => $timeout,
				'sql' => $sql,
				'seconds_waited' => $seconds,
				'exception' => new RuntimeException()
			] )
		);
	} else {
		$this->logger->debug(
			"Replication has reached {wait_pos}",
			$this->getLogContext( $conn, [
				'raw_pos' => $pos,
				'wait_pos' => $waitPos,
				'seconds_waited' => $seconds,
			] )
		);
		// Remember that this position was reached to save queries next time
		$this->lastKnownReplicaPos = $pos;
	}

	return $status;
}
/**
 * Get the replication position that this replica server has applied
 *
 * GTID variables are consulted first when GTID support is enabled; otherwise,
 * or if they are empty, the binlog coordinates from SHOW SLAVE STATUS are used.
 *
 * @param IDatabase $conn To make queries
 * @return MySQLPrimaryPos|false
 */
public function getReplicaPos( IDatabase $conn ) {
	$asOfTime = microtime( true ); // as-of-time *before* fetching GTID variables

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		// Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $varName ) {
			if ( isset( $gtidVars[$varName] ) && strlen( $gtidVars[$varName] ) ) {
				return new MySQLPrimaryPos( $gtidVars[$varName], $asOfTime );
			}
		}
	}

	$status = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
	if ( !$status || !strlen( $status['Relay_Master_Log_File'] ) ) {
		return false;
	}

	$logFile = $status['Relay_Master_Log_File'];
	$logPos = $status['Exec_Master_Log_Pos'];

	return new MySQLPrimaryPos( "{$logFile}/{$logPos}", $asOfTime );
}
/**
 * Get the current binlog/GTID position of this server acting as a primary DB
 *
 * GTID variables are consulted first when GTID support is enabled; otherwise,
 * or if they are empty, the binlog coordinates from SHOW MASTER STATUS are used.
 *
 * @param IDatabase $conn To make queries
 * @return MySQLPrimaryPos|false
 */
public function getPrimaryPos( IDatabase $conn ) {
	$asOfTime = microtime( true ); // as-of-time *before* fetching GTID variables

	if ( $this->useGTIDs() ) {
		// Try to use GTIDs, falling back to binlog positions if not possible
		$gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
		$gtidPos = false;
		// Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
		foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $varName ) {
			if ( isset( $gtidVars[$varName] ) && strlen( $gtidVars[$varName] ) ) {
				$gtidPos = new MySQLPrimaryPos( $gtidVars[$varName], $asOfTime );
				break;
			}
		}

		if ( $gtidPos ) {
			// Filter domains that are inactive or not relevant to the session
			$gtidPos->setActiveOriginServerId( $this->getServerId( $conn ) );
			$gtidPos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
			if ( isset( $gtidVars['gtid_domain_id'] ) ) {
				$gtidPos->setActiveDomain( $gtidVars['gtid_domain_id'] );
			}

			return $gtidPos;
		}
	}

	$status = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
	if ( $status && strlen( $status['File'] ) ) {
		return new MySQLPrimaryPos( "{$status['File']}/{$status['Position']}", $asOfTime );
	}

	return false;
}
/**
* Get the ID of this server within its replication topology
*
* This delegates to getServerId(), which reads the server_id variable.
*
* @inheritDoc
* @param IDatabase $conn To make queries
* @return string|null 32 bit integer ID; null if not applicable or unknown
*/
public function getTopologyBasedServerId( IDatabase $conn ) {
return $this->getServerId( $conn );
}
/**
 * Get the server_id variable of the server, cached per server name
 *
 * @param IDatabase $conn To make queries
 * @return string Value of server_id (32-bit integer, unique to the replication topology)
 * @throws DBQueryError
 */
protected function getServerId( IDatabase $conn ) {
	// Captured here since __METHOD__ would name the closure inside the callback
	$fname = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() );

	$fetchId = static function () use ( $conn, $fname ) {
		$row = $conn->query(
			"SELECT @@server_id AS id",
			$fname,
			ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
		)->fetchObject();

		return $row->id;
	};

	return $this->srvCache->getWithSetCallback( $cacheKey, self::SERVER_ID_CACHE_TTL, $fetchId );
}
/**
 * Get the server_uuid variable of the server, cached per server name
 *
 * @param IDatabase $conn To make queries
 * @return string|null Value of server_uuid (hyphenated 128-bit hex string, globally unique)
 * @throws DBQueryError
 */
protected function getServerUUID( IDatabase $conn ) {
	// Captured here since __METHOD__ would name the closure inside the callback
	$fname = __METHOD__;
	$cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() );

	$fetchUuid = static function () use ( $conn, $fname ) {
		$row = $conn->query(
			"SHOW GLOBAL VARIABLES LIKE 'server_uuid'",
			$fname,
			ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE
		)->fetchObject();
		if ( !$row ) {
			// The server does not expose this variable
			return null;
		}

		return $row->Value;
	};

	return $this->srvCache->getWithSetCallback( $cacheKey, self::SERVER_ID_CACHE_TTL, $fetchUuid );
}
/**
 * Get the GTID related server variables, with session values taking precedence
 *
 * @param IDatabase $conn To make queries
 * @param string $fname
 * @return string[] Map of (variable name => value)
 */
protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
	$flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
	$queries = [
		// Get global-only variables like gtid_executed
		"SHOW GLOBAL VARIABLES LIKE 'gtid_%'",
		// Get session-specific (e.g. gtid_domain_id since that is where writes will log)
		"SHOW SESSION VARIABLES LIKE 'gtid_%'",
	];

	$vars = [];
	foreach ( $queries as $sql ) {
		// Later queries overwrite same-named entries from earlier ones
		foreach ( $conn->query( $sql, $fname, $flags ) as $row ) {
			$vars[$row->Variable_name] = $row->Value;
		}
	}

	return $vars;
}
/**
 * Run "SHOW <role> STATUS" and return its first row
 *
 * @param IDatabase $conn To make queries
 * @param string $role One of "MASTER"/"SLAVE"
 * @param string $fname
 * @return array<string,mixed>|null Latest available server status row; null if the
 *  query failed or returned no row
 */
protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
	$res = $conn->query(
		"SHOW $role STATUS",
		$fname,
		ISQLPlatform::QUERY_SILENCE_ERRORS
			| ISQLPlatform::QUERY_IGNORE_DBO_TRX
			| ISQLPlatform::QUERY_CHANGE_NONE
	);
	if ( !$res ) {
		// Errors are silenced, so a failed query yields no result object
		return null;
	}

	$statusRow = $res->fetchRow();

	return $statusRow ?: null;
}
}

View file

@ -0,0 +1,184 @@
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace Wikimedia\Rdbms\Replication;
use BagOStuff;
use Psr\Log\LoggerInterface;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\DBPrimaryPos;
use Wikimedia\Rdbms\IDatabase;
/**
* @internal
* @ingroup Database
* @since 1.40
*/
class ReplicationReporter {
/** @var string Replication topology role of the server; one of the IDatabase::ROLE_* constants */
protected $topologyRole;
/** @var LoggerInterface Logger for replication related messages */
protected $logger;
/** @var BagOStuff Cache handed to the constructor; not used by this base class itself */
protected $srvCache;
/**
* @var array|null Replication lag estimate at the time of BEGIN for the last transaction;
*  set by resetReplicationLagStatus() and read by getRecordedTransactionLagStatus()
*/
private $trxReplicaLagStatus = null;
/**
 * @param string $topologyRole Replication topology role of the server;
 *  one of the IDatabase::ROLE_* constants
 * @param LoggerInterface $logger Logger for replication related messages
 * @param BagOStuff $srvCache Cache made available to subclasses
 */
public function __construct( $topologyRole, $logger, $srvCache ) {
	$this->srvCache = $srvCache;
	$this->logger = $logger;
	$this->topologyRole = $topologyRole;
}
/**
* Get the replication topology role that was given to the constructor
*
* @return string Replication topology role of the server
*/
public function getTopologyRole() {
return $this->topologyRole;
}
/**
 * Get the amount of replication lag for this database server
 *
 * Servers in the primary or static clone role always report zero lag; for any
 * other role the measurement is delegated to doGetLag().
 *
 * @param IDatabase $conn To make queries
 * @return float|int|false Database replication lag in seconds or false on error
 */
public function getLag( IDatabase $conn ) {
	$lagFreeRoles = [
		// this is the primary DB
		IDatabase::ROLE_STREAMING_MASTER,
		// static dataset
		IDatabase::ROLE_STATIC_CLONE,
	];
	if ( in_array( $this->topologyRole, $lagFreeRoles, true ) ) {
		return 0;
	}

	return $this->doGetLag( $conn );
}
/**
* Get the amount of replication lag for this database server
*
* Callers should avoid using this method while a transaction is active
*
* This base implementation always reports zero lag; subclasses can override it
* with an actual measurement.
*
* @see getLag()
*
* @param IDatabase $conn To make queries
* @return float|int|false Database replication lag in seconds or false on error
* @throws DBError
*/
protected function doGetLag( IDatabase $conn ) {
return 0;
}
/**
 * Get a replica DB lag estimate for this server, stamped with the current time
 *
 * Lag is only measured when the server has the streaming replica role;
 * every other role is reported as having zero lag.
 *
 * @param IDatabase $conn To make queries
 * @return array ('lag': seconds or false on error, 'since': UNIX timestamp of estimate)
 * @since 1.27 in Database, moved to ReplicationReporter in 1.40
 */
public function getApproximateLagStatus( IDatabase $conn ) {
	$lag = 0;

	if ( $this->topologyRole === IDatabase::ROLE_STREAMING_REPLICA ) {
		try {
			$lag = $this->getLag( $conn );
		} catch ( DBError $e ) {
			// Avoid exceptions as this is used internally in critical sections
			$lag = false;
		}
	}

	return [ 'lag' => $lag, 'since' => microtime( true ) ];
}
/**
* Wait for this server to catch up to a given primary DB position
*
* This base implementation does not wait and always returns zero.
*
* @param IDatabase $conn To make queries
* @param DBPrimaryPos $pos Position to wait for
* @param int $timeout Wait timeout; unused here
* @return int|null Always 0 here; subclasses may return other values
*/
public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
// Real waits are implemented in the subclass.
return 0;
}
/**
* Get the replication position of this replica DB
*
* This base implementation has no position to report and always returns false.
*
* @param IDatabase $conn To make queries
* @return DBPrimaryPos|false Always false here; subclasses may return a position
*/
public function getReplicaPos( IDatabase $conn ) {
// Stub
return false;
}
/**
* Get the position of this primary DB
*
* This base implementation has no position to report and always returns false.
*
* @param IDatabase $conn To make queries
* @return DBPrimaryPos|false Always false here; subclasses may return a position
*/
public function getPrimaryPos( IDatabase $conn ) {
// Stub
return false;
}
/**
 * Get the reason, if any, that the topology role makes this server read-only
 *
 * @return array Tuple of (reason text, 'role') for the replica and static clone
 *  roles; empty array for any other role
 */
public function getTopologyBasedReadOnlyReason() {
	$role = $this->topologyRole;

	if ( $role === IDatabase::ROLE_STREAMING_REPLICA ) {
		$reason = 'Server is configured as a read-only replica database.';
	} elseif ( $role === IDatabase::ROLE_STATIC_CLONE ) {
		$reason = 'Server is configured as a read-only static clone database.';
	} else {
		// The role does not impose read-only mode
		return [];
	}

	return [ $reason, 'role' ];
}
/**
* Discard the recorded lag estimate and record a fresh one
*
* @param IDatabase $conn To make queries
*/
public function resetReplicationLagStatus( IDatabase $conn ) {
// With REPEATABLE-READ isolation, the first SELECT establishes the read snapshot,
// so get the replication lag estimate before any transaction SELECT queries come in.
// This way, the lag estimate reflects what will actually be read. Also, if heartbeat
// tables are used, this avoids counting snapshot lag as part of replication lag.
// The old value must be cleared before estimating again, since lag lookups can read the
// recorded value through getRecordedTransactionLagStatus() and would otherwise reuse it.
$this->trxReplicaLagStatus = null; // clear cached value first
$this->trxReplicaLagStatus = $this->getApproximateLagStatus( $conn );
}
/**
 * Get the replica DB lag that was recorded when the current transaction started
 *
 * Transactions might use point-in-time read snapshots, so the lag estimate
 * should be recorded just before the transaction establishes its snapshot
 * (either BEGIN or the first SELECT/write query). If snapshots are not used,
 * it is still safe to be pessimistic.
 *
 * This returns null if there is no transaction or the lag status was not yet recorded.
 *
 * @param IDatabase $conn To make queries
 * @return array|null ('lag': seconds or false, 'since': UNIX timestamp of BEGIN) or null
 * @since 1.27 in Database, moved to ReplicationReporter in 1.40
 */
final protected function getRecordedTransactionLagStatus( IDatabase $conn ) {
	if ( !$conn->trxLevel() ) {
		// No transaction is active, so no recorded estimate applies
		return null;
	}

	return $this->trxReplicaLagStatus;
}
/**
 * Get the lag estimate that applies to the current session
 *
 * The estimate recorded for the active transaction is preferred; without one,
 * a new approximate estimate is taken.
 *
 * @param IDatabase $conn To make queries
 * @return array ('lag': seconds or false on error, 'since': UNIX timestamp of estimate)
 */
public function getSessionLagStatus( IDatabase $conn ) {
	$recorded = $this->getRecordedTransactionLagStatus( $conn );
	if ( $recorded ) {
		return $recorded;
	}

	return $this->getApproximateLagStatus( $conn );
}
/**
 * Build the context array to pass to PSR-3 logger functions
 *
 * The server and database names of the connection are always included;
 * entries in $extras replace same-named defaults.
 *
 * @param IDatabase $conn To make queries
 * @param array $extras Additional data to add to context
 * @return array
 */
protected function getLogContext( IDatabase $conn, array $extras = [] ) {
	// TODO: Add db_user
	$defaults = [
		'db_server' => $conn->getServerName(),
		'db_name' => $conn->getDBname(),
	];

	return array_merge( $defaults, $extras );
}
/**
* Get the ID of this server within its replication topology
*
* This base implementation knows of no such ID and always returns null.
*
* @param IDatabase $conn To make queries
* @return string|null 32 bit integer ID; null if not applicable or unknown
*/
public function getTopologyBasedServerId( IDatabase $conn ) {
return null;
}
}

View file

@ -1177,12 +1177,6 @@ class LoadBalancer implements ILoadBalancerForOwner {
// Inject the PHP execution mode and the agent string
'cliMode' => $this->cliMode,
'agent' => $this->agent,
// Inject object and callback dependencies
'topologicalPrimaryConnRef' => $this->getConnectionRef(
self::DB_PRIMARY,
[],
$domain->getId()
),
'srvCache' => $this->srvCache,
'connLogger' => $this->connLogger,
'queryLogger' => $this->queryLogger,

View file

@ -8,6 +8,7 @@ use Wikimedia\Rdbms\Database\DatabaseFlags;
use Wikimedia\Rdbms\DatabaseDomain;
use Wikimedia\Rdbms\FakeResultWrapper;
use Wikimedia\Rdbms\QueryStatus;
use Wikimedia\Rdbms\Replication\ReplicationReporter;
use Wikimedia\Rdbms\TransactionProfiler;
use Wikimedia\RequestTimeout\RequestTimeout;
@ -46,7 +47,7 @@ class DatabaseTestHelper extends Database {
protected $forcedAffectedCountQueue = [];
public function __construct( $testName, array $opts = [] ) {
parent::__construct( $opts + [
$params = $opts + [
'host' => null,
'user' => null,
'password' => null,
@ -72,11 +73,17 @@ class DatabaseTestHelper extends Database {
},
'criticalSectionProvider' =>
RequestTimeout::singleton()->createCriticalSectionProvider( 120 )
] );
];
parent::__construct( $params );
$this->testName = $testName;
$this->platform = new SQLPlatformTestHelper( new AddQuoterMock() );
$this->flagsHolder = new DatabaseFlags( 0 );
$this->replicationReporter = new ReplicationReporter(
$params['topologyRole'],
$params['replLogger'],
$params['srvCache']
);
$this->currentDomain = DatabaseDomain::newUnspecified();
$this->open( 'localhost', 'testuser', 'password', 'testdb', null, '' );

View file

@ -83,8 +83,6 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
}
public function testLBFactorySimpleServers() {
global $wgDBserver;
$primaryConfig = $this->getPrimaryServerConfig();
$fakeReplica = [ 'load' => 100, ] + $primaryConfig;
@ -101,30 +99,15 @@ class LBFactoryTest extends MediaWikiIntegrationTestCase {
$dbw = $lb->getConnection( DB_PRIMARY );
$dbw->ensureConnection();
$wConn = TestingAccessWrapper::newFromObject( $dbw )->conn;
$wConnWrap = TestingAccessWrapper::newFromObject( $wConn );
$this->assertEquals(
$dbw::ROLE_STREAMING_MASTER, $dbw->getTopologyRole(), 'primary shows as primary' );
$this->assertInstanceOf( IDatabase::class, $wConnWrap->topologicalPrimaryConnRef );
$this->assertEquals(
( $wgDBserver != '' ) ? $wgDBserver : 'localhost',
$wConnWrap->topologicalPrimaryConnRef->getServerName(),
'cluster primary is set' );
$dbr = $lb->getConnection( DB_REPLICA );
$dbr->ensureConnection();
$rConn = TestingAccessWrapper::newFromObject( $dbr )->conn;
$rConnWrap = TestingAccessWrapper::newFromObject( $rConn );
$this->assertEquals(
$dbr::ROLE_STREAMING_REPLICA, $dbr->getTopologyRole(), 'replica shows as replica' );
$this->assertInstanceOf( IDatabase::class, $rConnWrap->topologicalPrimaryConnRef );
$this->assertEquals(
( $wgDBserver != '' ) ? $wgDBserver : 'localhost',
$rConnWrap->topologicalPrimaryConnRef->getServerName(),
'cluster primary is set'
);
$factory->shutdown();
}

View file

@ -148,8 +148,6 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
* @covers \Wikimedia\Rdbms\LoadBalancer::getClusterName()
*/
public function testWithReplica() {
global $wgDBserver;
// Simulate web request with DBO_TRX
$lb = $this->newMultiServerLocalLoadBalancer( [], [ 'flags' => DBO_TRX ] );
@ -172,11 +170,6 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
$this->assertSame(
$dbw::ROLE_STREAMING_MASTER, $dbw->getTopologyRole(), 'primary shows as primary' );
$this->assertSame(
( $wgDBserver != '' ) ? $wgDBserver : 'localhost',
$wConnWrap->topologicalPrimaryConnRef->getServerName(),
'cluster primary is set'
);
$this->assertTrue( $dbw->getFlag( $dbw::DBO_TRX ), "DBO_TRX set on primary" );
$this->assertWriteAllowed( $dbw );
@ -188,11 +181,6 @@ class LoadBalancerTest extends MediaWikiIntegrationTestCase {
$this->assertSame(
$dbr::ROLE_STREAMING_REPLICA, $dbr->getTopologyRole(), 'replica shows as replica' );
$this->assertTrue( $dbr->isReadOnly(), 'replica shows as replica' );
$this->assertSame(
( $wgDBserver != '' ) ? $wgDBserver : 'localhost',
$rConnWrap->topologicalPrimaryConnRef->getServerName(),
'cluster master is set'
);
$this->assertTrue( $dbr->getFlag( $dbw::DBO_TRX ), "DBO_TRX set on replica" );
$this->assertSame( $dbr->getLBInfo( 'serverIndex' ), $lb->getReaderIndex() );

View file

@ -32,6 +32,7 @@ use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\Rdbms\MySQLPrimaryPos;
use Wikimedia\Rdbms\Platform\MySQLPlatform;
use Wikimedia\Rdbms\Replication\MysqlReplicationReporter;
use Wikimedia\TestingAccessWrapper;
/**
@ -252,20 +253,21 @@ class DatabaseMysqlBaseTest extends PHPUnit\Framework\TestCase {
* @dataProvider provideLagAmounts
*/
public function testPtHeartbeat( $lag ) {
$db = $this->getMockBuilder( DatabaseMysqli::class )
/** @var IDatabase $db */
$db = $this->getMockBuilder( IDatabase::class )
->disableOriginalConstructor()
->getMock();
$db->setLBInfo( 'replica', true );
$replicationReporter = $this->getMockBuilder( MysqlReplicationReporter::class )
->disableOriginalConstructor()
->onlyMethods( [ 'fetchSecondsSinceHeartbeat' ] )
->getMock();
TestingAccessWrapper::newFromObject( $db )->lagDetectionMethod = 'pt-heartbeat';
TestingAccessWrapper::newFromObject( $replicationReporter )->lagDetectionMethod = 'pt-heartbeat';
$replicationReporter->method( 'fetchSecondsSinceHeartbeat' )->willReturn( $lag );
$db->setLBInfo( 'replica', true );
$db->method( 'fetchSecondsSinceHeartbeat' )->willReturn( $lag );
/** @var IDatabase $db */
$db->setLBInfo( 'clusterMasterHost', 'db1052' );
$lagEst = $db->getLag();
$lagEst = $replicationReporter->getLag( $db );
$this->assertGreaterThan( $lag - 0.010, $lagEst, "Correct heatbeat lag" );
$this->assertLessThan( $lag + 0.010, $lagEst, "Correct heatbeat lag" );
@ -290,7 +292,10 @@ class DatabaseMysqlBaseTest extends PHPUnit\Framework\TestCase {
* @covers \Wikimedia\Rdbms\DatabaseMysqlBase
*/
public function testServerGtidTable( $gtable, $rBLtable, $mBLtable, $rGTIDs, $mGTIDs ) {
$db = $this->getMockBuilder( DatabaseMysqli::class )
$db = $this->getMockBuilder( IDatabase::class )
->disableOriginalConstructor()
->getMock();
$replicationReporter = $this->getMockBuilder( MysqlReplicationReporter::class )
->disableOriginalConstructor()
->onlyMethods( [
'useGTIDs',
@ -301,10 +306,10 @@ class DatabaseMysqlBaseTest extends PHPUnit\Framework\TestCase {
] )
->getMock();
$db->method( 'useGTIDs' )->willReturn( true );
$db->method( 'getServerGTIDs' )->willReturn( $gtable );
$db->method( 'getServerRoleStatus' )->willReturnCallback(
static function ( $role ) use ( $rBLtable, $mBLtable ) {
$replicationReporter->method( 'useGTIDs' )->willReturn( true );
$replicationReporter->method( 'getServerGTIDs' )->willReturn( $gtable );
$replicationReporter->method( 'getServerRoleStatus' )->willReturnCallback(
static function ( $db, $role ) use ( $rBLtable, $mBLtable ) {
if ( $role === 'SLAVE' ) {
return $rBLtable;
} elseif ( $role === 'MASTER' ) {
@ -314,19 +319,19 @@ class DatabaseMysqlBaseTest extends PHPUnit\Framework\TestCase {
return null;
}
);
$db->method( 'getServerId' )->willReturn( 1 );
$db->method( 'getServerUUID' )->willReturn( '2E11FA47-71CA-11E1-9E33-C80AA9429562' );
$replicationReporter->method( 'getServerId' )->willReturn( 1 );
$replicationReporter->method( 'getServerUUID' )->willReturn( '2E11FA47-71CA-11E1-9E33-C80AA9429562' );
/** @var DatabaseMysqlBase $db */
/** @var MysqlReplicationReporter $replicationReporter */
if ( is_array( $rGTIDs ) ) {
$this->assertEquals( $rGTIDs, $db->getReplicaPos()->getGTIDs() );
$this->assertEquals( $rGTIDs, $replicationReporter->getReplicaPos( $db )->getGTIDs() );
} else {
$this->assertFalse( $db->getReplicaPos() );
$this->assertFalse( $replicationReporter->getReplicaPos( $db ) );
}
if ( is_array( $mGTIDs ) ) {
$this->assertEquals( $mGTIDs, $db->getPrimaryPos()->getGTIDs() );
$this->assertEquals( $mGTIDs, $replicationReporter->getPrimaryPos( $db )->getGTIDs() );
} else {
$this->assertFalse( $db->getPrimaryPos() );
$this->assertFalse( $replicationReporter->getPrimaryPos( $db ) );
}
}
@ -453,99 +458,6 @@ class DatabaseMysqlBaseTest extends PHPUnit\Framework\TestCase {
$this->assertEquals( $pos, $roundtripPos );
}
/**
* @dataProvider provideInsertSelectCases
*/
public function testInsertSelectIsSafe( $insertOpts, $selectOpts, $row, $safe ) {
$db = $this->getMockBuilder( DatabaseMysqli::class )
->disableOriginalConstructor()
->onlyMethods( [ 'getReplicationSafetyInfo' ] )
->getMock();
$db->method( 'getReplicationSafetyInfo' )->willReturn( (object)$row );
$dbw = TestingAccessWrapper::newFromObject( $db );
/** @var Database $dbw */
$this->assertEquals( $safe, $dbw->isInsertSelectSafe( $insertOpts, $selectOpts ) );
}
public function provideInsertSelectCases() {
return [
[
[],
[],
[
'innodb_autoinc_lock_mode' => '2',
'binlog_format' => 'ROW',
],
true
],
[
[],
[ 'LIMIT' => 100 ],
[
'innodb_autoinc_lock_mode' => '2',
'binlog_format' => 'ROW',
],
true
],
[
[],
[ 'LIMIT' => 100 ],
[
'innodb_autoinc_lock_mode' => '0',
'binlog_format' => 'STATEMENT',
],
false
],
[
[],
[],
[
'innodb_autoinc_lock_mode' => '2',
'binlog_format' => 'STATEMENT',
],
false
],
[
[ 'NO_AUTO_COLUMNS' ],
[ 'LIMIT' => 100 ],
[
'innodb_autoinc_lock_mode' => '0',
'binlog_format' => 'STATEMENT',
],
false
],
[
[],
[],
[
'innodb_autoinc_lock_mode' => 0,
'binlog_format' => 'STATEMENT',
],
true
],
[
[ 'NO_AUTO_COLUMNS' ],
[],
[
'innodb_autoinc_lock_mode' => 2,
'binlog_format' => 'STATEMENT',
],
true
],
[
[ 'NO_AUTO_COLUMNS' ],
[],
[
'innodb_autoinc_lock_mode' => 0,
'binlog_format' => 'STATEMENT',
],
true
],
];
}
public function testBuildIntegerCast() {
$db = $this->createPartialMock( DatabaseMysqli::class, [] );
TestingAccessWrapper::newFromObject( $db )->platform = new MySQLPlatform( new AddQuoterMock() );

View file

@ -2,6 +2,7 @@
use MediaWiki\Tests\Unit\Libs\Rdbms\AddQuoterMock;
use MediaWiki\Tests\Unit\Libs\Rdbms\SQLPlatformTestHelper;
use Psr\Log\NullLogger;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\Database\DatabaseFlags;
use Wikimedia\Rdbms\DatabaseDomain;
@ -14,6 +15,7 @@ use Wikimedia\Rdbms\IResultWrapper;
use Wikimedia\Rdbms\LBFactorySingle;
use Wikimedia\Rdbms\Platform\SQLPlatform;
use Wikimedia\Rdbms\QueryStatus;
use Wikimedia\Rdbms\Replication\ReplicationReporter;
use Wikimedia\Rdbms\TransactionManager;
use Wikimedia\RequestTimeout\CriticalSectionScope;
use Wikimedia\TestingAccessWrapper;
@ -445,9 +447,8 @@ class DatabaseTest extends PHPUnit\Framework\TestCase {
) ) ) )
->getMock();
$wdb = TestingAccessWrapper::newFromObject( $db );
$wdb->connLogger = new \Psr\Log\NullLogger();
$wdb->queryLogger = new \Psr\Log\NullLogger();
$wdb->replLogger = new \Psr\Log\NullLogger();
$wdb->connLogger = new NullLogger();
$wdb->queryLogger = new NullLogger();
$wdb->errorLogger = static function ( Throwable $e ) {
};
$wdb->deprecationLogger = static function ( $msg ) {
@ -460,6 +461,7 @@ class DatabaseTest extends PHPUnit\Framework\TestCase {
'host' => 'localhost',
'user' => 'testuser'
];
$wdb->replicationReporter = new ReplicationReporter( IDatabase::ROLE_STREAMING_MASTER, new NullLogger(), new HashBagOStuff() );
$db->method( 'getServer' )->willReturn( '*dummy*' );
$db->setTransactionManager( new TransactionManager() );