rdbms: optimize insert(), replace(), and upsert() for sqlite when possible

Change-Id: Ic884a4ce42a99333f5176f7b681f8a8bba15d2a1
This commit is contained in:
Aaron Schulz 2019-09-09 16:50:04 -07:00 committed by Krinkle
parent 76d0913a44
commit 4bd1b4b455
5 changed files with 318 additions and 63 deletions

View file

@ -365,7 +365,7 @@ class DBConnRef implements IDatabase {
return $this->__call( __FUNCTION__, func_get_args() );
}
public function insert( $table, $a, $fname = __METHOD__, $options = [] ) {
public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) {
$this->assertRoleAllowsWrites();
return $this->__call( __FUNCTION__, func_get_args() );

View file

@ -2104,9 +2104,17 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
return implode( ' ', $options );
}
public function insert( $table, $a, $fname = __METHOD__, $options = [] ) {
/**
* @param array $a A single (field => value) map or a list of such maps
* @return bool
*/
final protected function isMultiRowArray( array $a ) {
return ( isset( $a[0] ) && is_array( $a[0] ) );
}
public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) {
# No rows to insert, easy just return now
if ( !count( $a ) ) {
if ( !count( $rows ) ) {
return true;
}
@ -2118,12 +2126,12 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$options = $this->makeInsertOptions( $options );
if ( isset( $a[0] ) && is_array( $a[0] ) ) {
$multi = true;
$keys = array_keys( $a[0] );
$multi = $this->isMultiRowArray( $rows );
if ( $multi ) {
$firstRow = $rows[0];
$keys = array_keys( $firstRow );
} else {
$multi = false;
$keys = array_keys( $a );
$keys = array_keys( $rows );
}
$sql = 'INSERT ' . $options .
@ -2131,7 +2139,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
if ( $multi ) {
$first = true;
foreach ( $a as $row ) {
foreach ( $rows as $row ) {
if ( $first ) {
$first = false;
} else {
@ -2140,7 +2148,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
$sql .= '(' . $this->makeList( $row ) . ')';
}
} else {
$sql .= '(' . $this->makeList( $a ) . ')';
$sql .= '(' . $this->makeList( $rows ) . ')';
}
$this->query( $sql, $fname );

View file

@ -36,6 +36,9 @@ use stdClass;
* @ingroup Database
*/
class DatabaseSqlite extends Database {
/** @var FSLockManager (hopefully on the same server as the DB) */
protected $lockMgr;
/** @var string|null Directory for SQLite database files listed under their DB name */
protected $dbDir;
/** @var string|null Explicit path for the SQLite database file */
@ -43,20 +46,20 @@ class DatabaseSqlite extends Database {
/** @var string Transaction mode */
protected $trxMode;
/** @var PDO */
protected $conn;
/** @var string|null */
private $version;
/** @var array List of shared database already attached to this connection */
private $sessionAttachedDbs = [];
/** @var int The number of rows affected as an integer */
protected $lastAffectedRowCount;
/** @var resource */
protected $lastResultHandle;
/** @var PDO */
protected $conn;
/** @var FSLockManager (hopefully on the same server as the DB) */
protected $lockMgr;
/** @var array List of shared database already attached to this connection */
private $sessionAttachedDbs = [];
/** @var string[] See https://www.sqlite.org/lang_transaction.html */
private static $VALID_TRX_MODES = [ '', 'DEFERRED', 'IMMEDIATE', 'EXCLUSIVE' ];
@ -620,7 +623,7 @@ class DatabaseSqlite extends Database {
*/
protected function makeUpdateOptionsArray( $options ) {
$options = parent::makeUpdateOptionsArray( $options );
$options = self::fixIgnore( $options );
$options = $this->rewriteIgnoreKeyword( $options );
return $options;
}
@ -629,7 +632,7 @@ class DatabaseSqlite extends Database {
* @param array $options
* @return array
*/
static function fixIgnore( $options ) {
private function rewriteIgnoreKeyword( $options ) {
# SQLite uses OR IGNORE not just IGNORE
foreach ( $options as $k => $v ) {
if ( $v == 'IGNORE' ) {
@ -640,35 +643,28 @@ class DatabaseSqlite extends Database {
return $options;
}
/**
* @param array $options
* @return string
*/
function makeInsertOptions( $options ) {
$options = self::fixIgnore( $options );
protected function makeInsertOptions( $options ) {
$options = $this->rewriteIgnoreKeyword( $options );
return parent::makeInsertOptions( $options );
}
/**
* Based on generic method (parent) with some prior SQLite-sepcific adjustments
* @param string $table
* @param array $a
* @param string $fname
* @param array $options
* @return bool
*/
function insert( $table, $a, $fname = __METHOD__, $options = [] ) {
if ( !count( $a ) ) {
public function insert( $table, $rows, $fname = __METHOD__, $options = [] ) {
if ( version_compare( $this->getServerVersion(), '3.7.11', '>=' ) ) {
// Batch INSERT support per http://www.sqlite.org/releaselog/3_7_11.html
return parent::insert( $table, $rows, $fname, $options );
}
if ( !$rows ) {
return true;
}
# SQLite can't handle multi-row inserts, so divide up into multiple single-row inserts
if ( isset( $a[0] ) && is_array( $a[0] ) ) {
$multi = $this->isMultiRowArray( $rows );
if ( $multi ) {
$affectedRowCount = 0;
try {
$this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
foreach ( $a as $v ) {
foreach ( $rows as $v ) {
parent::insert( $table, $v, "$fname/multi-row", $options );
$affectedRowCount += $this->affectedRows();
}
@ -679,25 +675,25 @@ class DatabaseSqlite extends Database {
}
$this->affectedRowCount = $affectedRowCount;
} else {
parent::insert( $table, $a, "$fname/single-row", $options );
parent::insert( $table, $rows, "$fname/single-row", $options );
}
return true;
}
/**
* @param string $table
* @param array $uniqueIndexes Unused
* @param string|array $rows
* @param string $fname
*/
function replace( $table, $uniqueIndexes, $rows, $fname = __METHOD__ ) {
if ( !count( $rows ) ) {
public function replace( $table, $uniqueIndexes, $rows, $fname = __METHOD__ ) {
if ( version_compare( $this->getServerVersion(), '3.7.11', '>=' ) ) {
// REPLACE is an alias for "INSERT OR REPLACE" in sqlite
// Batch support for INSERT per http://www.sqlite.org/releaselog/3_7_11.html
$this->nativeReplace( $table, $rows, $fname );
return;
}
# SQLite can't handle multi-row replaces, so divide up into multiple single-row queries
if ( isset( $rows[0] ) && is_array( $rows[0] ) ) {
if ( !$rows ) {
return;
}
if ( $this->isMultiRowArray( $rows ) ) {
$affectedRowCount = 0;
try {
$this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
@ -716,6 +712,44 @@ class DatabaseSqlite extends Database {
}
}
public function upsert( $table, array $rows, $uniqueIndexes, array $set, $fname = __METHOD__ ) {
// Supports UPSERT-like clauses with INSERT, per http://www.sqlite.org/releaselog/3_24_0.html
if ( version_compare( $this->getServerVersion(), '3.24.0', '<' ) ) {
// Use inefficient fallback implementation
return parent::upsert( $table, $rows, $uniqueIndexes, $set, $fname );
}
if ( !$rows ) {
return true;
}
if ( $this->isMultiRowArray( $rows ) ) {
$firstRow = reset( $rows );
$keys = array_keys( $firstRow );
} else {
$keys = array_keys( $rows );
$rows = [ $rows ];
}
$table = $this->tableName( $table );
$first = true;
$sql = "INSERT INTO $table (" . implode( ',', $keys ) . ") VALUES ";
foreach ( $rows as $row ) {
if ( $first ) {
$first = false;
} else {
$sql .= ',';
}
$sql .= '(' . $this->makeList( $row ) . ')';
}
$sql .= ' ON CONFLICT DO UPDATE SET ' . $this->makeList( $set, self::LIST_SET );
$this->query( $sql, $fname );
return true;
}
/**
* Returns the size of a text field, or -1 for "unlimited"
* In SQLite this is SQLITE_MAX_LENGTH, by default 1GB. No way to query it though.
@ -791,9 +825,11 @@ class DatabaseSqlite extends Database {
* @return string Version information from the database
*/
function getServerVersion() {
$ver = $this->getBindingHandle()->getAttribute( PDO::ATTR_SERVER_VERSION );
if ( $this->version === null ) {
$this->version = $this->getBindingHandle()->getAttribute( PDO::ATTR_SERVER_VERSION );
}
return $ver;
return $this->version;
}
/**

View file

@ -886,13 +886,13 @@ interface IDatabase {
*
* @param string $table Table name. This will be passed through
* Database::tableName().
* @param array $a Array of rows to insert
* @param array $rows Array of rows to insert
* @param string $fname Calling function name (use __METHOD__) for logs/profiling
* @param array $options Array of options
* @return bool Return true if no exception was thrown (deprecated since 1.33)
* @throws DBError If an error occurs, see IDatabase::query()
*/
public function insert( $table, $a, $fname = __METHOD__, $options = [] );
public function insert( $table, $rows, $fname = __METHOD__, $options = [] );
/**
* UPDATE wrapper. Takes a condition array and a SET array.

View file

@ -23,11 +23,23 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase {
if ( !Sqlite::isPresent() ) {
$this->markTestSkipped( 'No SQLite support detected' );
}
$this->db = $this->getMockBuilder( DatabaseSqlite::class )
$this->db = $this->newMockDb();
if ( version_compare( $this->db->getServerVersion(), '3.6.0', '<' ) ) {
$this->markTestSkipped( "SQLite at least 3.6 required, {$this->db->getServerVersion()} found" );
}
}
/**
* @param null $version
* @param null $sqlDump
* @return \PHPUnit\Framework\MockObject\MockObject|DatabaseSqlite
*/
private function newMockDb( $version = null, &$sqlDump = null ) {
$mock = $this->getMockBuilder( DatabaseSqlite::class )
->setConstructorArgs( [ [
'dbFilePath' => ':memory:',
'dbname' => 'Foo',
'schema' => false,
'schema' => null,
'host' => false,
'user' => false,
'password' => false,
@ -42,13 +54,25 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase {
'queryLogger' => new NullLogger(),
'errorLogger' => null,
'deprecationLogger' => null,
] ] )->setMethods( [ 'query' ] )
->getMock();
$this->db->initConnection();
$this->db->method( 'query' )->willReturn( true );
if ( version_compare( $this->db->getServerVersion(), '3.6.0', '<' ) ) {
$this->markTestSkipped( "SQLite at least 3.6 required, {$this->db->getServerVersion()} found" );
] ] )->setMethods( array_merge(
[ 'query' ],
$version ? [ 'getServerVersion' ] : []
) )->getMock();
$mock->initConnection();
$sqlDump = '';
$mock->method( 'query' )->willReturnCallback( function ( $sql ) use ( &$sqlDump ) {
$sqlDump .= "$sql;";
return true;
} );
if ( $version ) {
$mock->method( 'getServerVersion' )->willReturn( $version );
}
return $mock;
}
/**
@ -551,4 +575,191 @@ class DatabaseSqliteTest extends \MediaWikiIntegrationTestCase {
$attributes = Database::attributesFromType( 'sqlite' );
$this->assertTrue( $attributes[Database::ATTR_DB_LEVEL_LOCKING] );
}
/**
* @covers \Wikimedia\Rdbms\DatabaseSqlite::insert()
* @param string $version
* @param string $table
* @param array $rows
* @param string $expectedSql
* @dataProvider provideNativeInserts
*/
public function testNativeInsertSupport( $version, $table, $rows, $expectedSql ) {
$sqlDump = '';
$db = $this->newMockDb( $version, $sqlDump );
$db->query( 'CREATE TABLE a ( a_1 )', __METHOD__ );
$sqlDump = '';
$db->insert( $table, $rows, __METHOD__ );
$this->assertEquals( $expectedSql, $sqlDump );
}
function provideNativeInserts() {
return [
[
'3.7.11',
'a',
[ 'a_1' => 1 ],
'INSERT INTO a (a_1) VALUES (\'1\');'
],
[
'3.7.10',
'a',
[ 'a_1' => 1 ],
'INSERT INTO a (a_1) VALUES (\'1\');'
],
[
'3.7.11',
'a',
[
[ 'a_1' => 2 ],
[ 'a_1' => 3 ]
],
'INSERT INTO a (a_1) VALUES (\'2\'),(\'3\');'
],
[
'3.7.10',
'a',
[
[ 'a_1' => 2 ],
[ 'a_1' => 3 ]
],
'BEGIN;' .
'INSERT INTO a (a_1) VALUES (\'2\');' .
'INSERT INTO a (a_1) VALUES (\'3\');' .
'COMMIT;'
]
];
}
/**
* @covers \Wikimedia\Rdbms\DatabaseSqlite::replace()
* @param string $version
* @param string $table
* @param array $ukeys
* @param array $rows
* @param string $expectedSql
* @dataProvider provideNativeReplaces
*/
public function testNativeReplaceSupport( $version, $table, $ukeys, $rows, $expectedSql ) {
$sqlDump = '';
$db = $this->newMockDb( $version, $sqlDump );
$db->query( 'CREATE TABLE a ( a_1 PRIMARY KEY, a_2 )', __METHOD__ );
$sqlDump = '';
$db->replace( $table, $ukeys, $rows, __METHOD__ );
$this->assertEquals( $expectedSql, $sqlDump );
}
function provideNativeReplaces() {
return [
[
'3.7.11',
'a',
[ 'a_1' ],
[ 'a_1' => 1, 'a_2' => 'x' ],
'REPLACE INTO a (a_1,a_2) VALUES (\'1\',\'x\');'
],
[
'3.7.10',
'a',
[ 'a_1' ],
[ 'a_1' => 1, 'a_2' => 'x' ],
'REPLACE INTO a (a_1,a_2) VALUES (\'1\',\'x\');'
],
[
'3.7.11',
'a',
[ 'a_1' ],
[
[ 'a_1' => 2, 'a_2' => 'x' ],
[ 'a_1' => 3, 'a_2' => 'y' ]
],
'REPLACE INTO a (a_1,a_2) VALUES (\'2\',\'x\'),(\'3\',\'y\');'
],
[
'3.7.10',
'a',
[ 'a_1' ],
[
[ 'a_1' => 2, 'a_2' => 'x' ],
[ 'a_1' => 3, 'a_2' => 'y' ]
],
'BEGIN;' .
'REPLACE INTO a (a_1,a_2) VALUES (\'2\',\'x\');' .
'REPLACE INTO a (a_1,a_2) VALUES (\'3\',\'y\');' .
'COMMIT;'
]
];
}
/**
* @covers \Wikimedia\Rdbms\DatabaseSqlite::upsert()
* @param string $version
* @param string $table
* @param array $rows
* @param array $ukeys
* @param array $set
* @param string $expectedSql
* @dataProvider provideNativeUpserts
*/
public function testNativeUpsertSupport( $version, $table, $rows, $ukeys, $set, $expectedSql ) {
$sqlDump = '';
$db = $this->newMockDb( $version, $sqlDump );
$db->query( 'CREATE TABLE a ( a_1 PRIMARY KEY, a_2 )', __METHOD__ );
$sqlDump = '';
$db->upsert( $table, $rows, $ukeys, $set, __METHOD__ );
$this->assertEquals( $expectedSql, $sqlDump );
}
function provideNativeUpserts() {
return [
[
'3.24.0',
'a',
[ 'a_1' => 1, 'a_2' => 'x' ],
[ 'a_1' ],
[ 'a_2 = a_2 + 1' ],
'INSERT INTO a (a_1,a_2) VALUES (\'1\',\'x\') ' .
'ON CONFLICT DO UPDATE SET a_2 = a_2 + 1;'
],
[
'3.23.0',
'a',
[ 'a_1' => 1, 'a_2' => 'x' ],
[ 'a_1' ],
[ 'a_2 = a_2 + 1' ],
'BEGIN;' .
'UPDATE a SET a_2 = a_2 + 1 WHERE ((a_1 = \'1\'));' .
'INSERT OR IGNORE INTO a (a_1,a_2) VALUES (\'1\',\'x\');' .
'COMMIT;'
],
[
'3.24.0',
'a',
[
[ 'a_1' => 2, 'a_2' => 'x' ],
[ 'a_1' => 3, 'a_2' => 'y' ]
],
[ 'a_1' ],
[ 'a_2 = a_2 + 1' ],
'INSERT INTO a (a_1,a_2) VALUES (\'2\',\'x\'),(\'3\',\'y\') ' .
'ON CONFLICT DO UPDATE SET a_2 = a_2 + 1;'
],
[
'3.23.0',
'a',
[
[ 'a_1' => 2, 'a_2' => 'x' ],
[ 'a_1' => 3, 'a_2' => 'y' ]
],
[ 'a_1' ],
[ 'a_2 = a_2 + 1' ],
'BEGIN;UPDATE a SET a_2 = a_2 + 1 WHERE ((a_1 = \'2\') OR (a_1 = \'3\'));' .
'INSERT OR IGNORE INTO a (a_1,a_2) VALUES (\'2\',\'x\'),(\'3\',\'y\');' .
'COMMIT;'
]
];
}
}