rdbms: add unique key checks to upsert() and replace() in Database

Disallow changing unique key values of rows in the upsert() assignment.
The unique key values are used to decide which new and old rows collide.
Changing these values at the same time is too complex and ill-defined.

Disallow setting NULL values for columns of the identity key in upsert()
and replace(), unless they are all NULL for all of the rows. This edge
case can be treated as a plain INSERT, assuming that conflicts are not
triggered over weird DEFAULT column values.

Update the corresponding IDatabase documentation.

Change-Id: If6e0cecce32a5eedaf9a82b67ba41296bfa4a317
This commit is contained in:
Aaron Schulz 2021-05-27 12:18:56 -07:00
parent a466cacfd1
commit acd3a53799
5 changed files with 167 additions and 42 deletions

View file

@ -2313,6 +2313,77 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
}
/**
* @param array<int,array> $rows Normalized list of rows to insert
* @param string[] $identityKey Columns of the (unique) identity key to UPSERT upon
* @return bool Whether all the rows have NULL values for all identity key columns
* @since 1.37
*/
final protected function assertValidUpsertRowArray( array $rows, array $identityKey ) {
$keyColumnCountNull = 0;
foreach ( $rows as $row ) {
foreach ( $identityKey as $column ) {
$keyColumnCountNull += ( isset( $row[$column] ) ? 0 : 1 );
}
}
if (
$keyColumnCountNull &&
$keyColumnCountNull !== ( count( $rows ) * count( $identityKey ) )
) {
throw new DBUnexpectedError(
$this,
"NULL values for unique key (" . implode( ',', $identityKey ) . ")"
);
}
return (bool)$keyColumnCountNull;
}
/**
* @param array $set Combined column/literal assignment map and SQL assignment list
* @param string[] $identityKey Columns of the (unique) identity key to UPSERT upon
* @param array<int,array> $rows List of rows to upsert
* @since 1.37
*/
final protected function assertValidUpsertSetArray(
array $set,
array $identityKey,
array $rows
) {
// Sloppy callers might construct the SET array using the ROW array, leaving redundant
// column definitions for identity key columns. Detect this for backwards compatability.
$soleRow = ( count( $rows ) == 1 ) ? reset( $rows ) : null;
// Disallow value changes for any columns in the identity key. This avoids additional
// insertion order dependencies that are unwieldy and difficult to implement efficiently.
foreach ( $set as $k => $v ) {
if ( is_string( $k ) ) {
// Key is a column name and value is a literal (e.g. string, int, null, ...)
if ( in_array( $k, $identityKey, true ) ) {
if ( $soleRow && array_key_exists( $k, $soleRow ) && $soleRow[$k] === $v ) {
$this->queryLogger->warning(
__METHOD__ . " called with redundant assignment to column '$k'",
[ 'exception' => new RuntimeException() ]
);
} else {
throw new DBUnexpectedError(
$this,
"Cannot reassign column '$k' since it belongs to identity key"
);
}
}
} elseif ( preg_match( '/^([a-zA-Z0-9_]+)\s*=/', $v, $m ) ) {
// Value is of the form "<unquoted alphanumeric column> = <SQL expression>"
if ( in_array( $m[1], $identityKey, true ) ) {
throw new DBUnexpectedError(
$this,
"Cannot reassign column '{$m[1]}' since it belongs to identity key"
);
}
}
}
}
/**
* @param string $option Query option flag (e.g. "IGNORE" or "FOR UPDATE")
* @param array $options Combination option/value map and boolean option list
@ -2726,7 +2797,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
* @param string $sqlfunc Name of a SQL function
* @param string|string[] $fields Name(s) of column(s) with values to compare
* @param string|int|float|string[]|int[]|float[] $values Values to compare
* @return mixed
* @return string
* @since 1.35
*/
protected function buildSuperlative( $sqlfunc, $fields, $values ) {
@ -3350,33 +3421,52 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
if ( $uniqueKeys ) {
$uniqueKey = $this->normalizeUpsertKeys( $uniqueKeys );
$this->doReplace( $table, $uniqueKey, $rows, $fname );
$identityKey = $this->normalizeUpsertKeys( $uniqueKeys );
} else {
// For backwards compatibility, allow insertion of rows with no applicable key
$identityKey = null;
$this->queryLogger->warning(
__METHOD__ . " called with no unique keys",
__METHOD__ . " called with no unique key",
[ 'exception' => new RuntimeException() ]
);
}
if ( $identityKey ) {
$allDefaultKeyValues = $this->assertValidUpsertRowArray( $rows, $identityKey );
if ( $allDefaultKeyValues ) {
// For backwards compatibility, allow insertion of rows with all-NULL
// values for the unique columns (e.g. for an AUTOINCREMENT column)
$identityKey = null;
$this->queryLogger->warning(
__METHOD__ . " called with all-null values for unique key",
[ 'exception' => new RuntimeException() ]
);
}
}
if ( $identityKey ) {
$this->doReplace( $table, $identityKey, $rows, $fname );
} else {
$this->doInsert( $table, $rows, $fname );
}
}
/**
* @see Database::replace()
* @stable to override
* @param string $table
* @param string[] $uniqueKey List of columns defining a unique key
* @param string[] $identityKey List of columns defining a unique key
* @param array $rows Non-empty list of rows
* @param string $fname
* @see Database::replace()
* @stable to override
* @since 1.35
*/
protected function doReplace( $table, array $uniqueKey, array $rows, $fname ) {
protected function doReplace( $table, array $identityKey, array $rows, $fname ) {
$affectedRowCount = 0;
$this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
try {
foreach ( $rows as $row ) {
// Delete any conflicting rows (including ones inserted from $rows)
$sqlCondition = $this->makeConditionCollidesUponKeys( [ $row ], [ $uniqueKey ] );
$sqlCondition = $this->makeConditionCollidesUponKeys( [ $row ], [ $identityKey ] );
$this->delete( $table, [ $sqlCondition ], $fname );
$affectedRowCount += $this->affectedRows();
// Now insert the row
@ -3414,9 +3504,11 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
return $this->makeList( [ $column => $values ], self::LIST_AND );
}
$nullByUniqueKeyColumn = array_fill_keys( $uniqueKey, null );
$disjunctions = [];
foreach ( $rows as $row ) {
$rowKeyMap = array_intersect_key( $row, array_fill_keys( $uniqueKey, true ) );
$rowKeyMap = array_intersect_key( $row, $nullByUniqueKeyColumn );
if ( count( $rowKeyMap ) != count( $uniqueKey ) ) {
throw new DBUnexpectedError(
$this,
@ -3459,13 +3551,33 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
if ( $uniqueKeys ) {
$uniqueKey = $this->normalizeUpsertKeys( $uniqueKeys );
$this->doUpsert( $table, $rows, $uniqueKey, $set, $fname );
$identityKey = $this->normalizeUpsertKeys( $uniqueKeys );
} else {
// For backwards compatibility, allow insertion of rows with no applicable key
$identityKey = null;
$this->queryLogger->warning(
__METHOD__ . " called with no unique keys",
__METHOD__ . " called with no unique key",
[ 'exception' => new RuntimeException() ]
);
}
if ( $identityKey ) {
$allDefaultKeyValues = $this->assertValidUpsertRowArray( $rows, $identityKey );
$this->assertValidUpsertSetArray( $set, $identityKey, $rows );
if ( $allDefaultKeyValues ) {
// For backwards compatibility, allow insertion of rows with all-NULL
// values for the unique columns (e.g. for an AUTOINCREMENT column)
$identityKey = null;
$this->queryLogger->warning(
__METHOD__ . " called with all-null values for unique key",
[ 'exception' => new RuntimeException() ]
);
}
}
if ( $identityKey ) {
$this->doUpsert( $table, $rows, $identityKey, $set, $fname );
} else {
$this->doInsert( $table, $rows, $fname );
}
@ -3473,22 +3585,28 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
}
/**
* @see Database::upsert()
* @stable to override
* @param string $table
* @param array[] $rows Non-empty list of rows
* @param string[] $uniqueKey List of columns defining a unique key
* @param array $set
* @param string[] $identityKey List of columns defining a unique key
* @param string[] $set Non-empty combined column/literal map and SQL assignment list
* @param string $fname
* @see Database::upsert()
* @stable to override
* @since 1.35
*/
protected function doUpsert( $table, array $rows, array $uniqueKey, array $set, $fname ) {
protected function doUpsert(
string $table,
array $rows,
array $identityKey,
array $set,
string $fname
) {
$affectedRowCount = 0;
$this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
try {
foreach ( $rows as $row ) {
// Update any existing conflicting rows (including ones inserted from $rows)
$sqlConditions = $this->makeConditionCollidesUponKeys( [ $row ], [ $uniqueKey ] );
$sqlConditions = $this->makeConditionCollidesUponKeys( [ $row ], [ $identityKey ] );
$this->update( $table, $set, [ $sqlConditions ], $fname );
$rowsUpdated = $this->affectedRows();
$affectedRowCount += $rowsUpdated;

View file

@ -1178,7 +1178,13 @@ abstract class DatabaseMysqlBase extends Database {
$this->query( $sql, $fname, self::QUERY_CHANGE_ROWS );
}
protected function doUpsert( $table, array $rows, array $uniqueKey, array $set, $fname ) {
protected function doUpsert(
string $table,
array $rows,
array $identityKey,
array $set,
string $fname
) {
$encTable = $this->tableName( $table );
list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows );
$sqlColumnAssignments = $this->makeList( $set, self::LIST_SET );
@ -1190,7 +1196,7 @@ abstract class DatabaseMysqlBase extends Database {
$this->query( $sql, $fname, self::QUERY_CHANGE_ROWS );
}
protected function doReplace( $table, array $uniqueKey, array $rows, $fname ) {
protected function doReplace( $table, array $identityKey, array $rows, $fname ) {
$encTable = $this->tableName( $table );
list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows );

View file

@ -560,7 +560,7 @@ class DatabaseSqlite extends Database {
return [ 'INSERT OR IGNORE INTO', '' ];
}
protected function doReplace( $table, array $uniqueKey, array $rows, $fname ) {
protected function doReplace( $table, array $identityKey, array $rows, $fname ) {
$encTable = $this->tableName( $table );
list( $sqlColumns, $sqlTuples ) = $this->makeInsertLists( $rows );
// https://sqlite.org/lang_insert.html

View file

@ -980,8 +980,8 @@ interface IDatabase {
* The keys in each map must be identical to each other and in the same order.
* The rows must not collide with each other.
* @param string $fname Calling function name (use __METHOD__) for logs/profiling
* @param string|array $options Combination map/list where each string-keyed entry maps a
* non-boolean option to the option parameters and each integer-keyed value is the
* @param string|array $options Combination map/list where each string-keyed entry maps
* a non-boolean option to the option parameters and each integer-keyed value is the
* name of a boolean option. Supported options are:
* - IGNORE: Boolean: skip insertion of rows that would cause unique key conflicts.
* IDatabase::affectedRows() can be used to determine how many rows were inserted.
@ -1006,8 +1006,8 @@ interface IDatabase {
* accidentally, an empty condition for 'update' queries isn't allowed.
* IDatabase::ALL_ROWS should be passed explicitely in order to update all rows.
* @param string $fname Calling function name (use __METHOD__) for logs/profiling
* @param string|array $options Combination map/list where each string-keyed entry maps a
* non-boolean option to the option parameters and each integer-keyed value is the
* @param string|array $options Combination map/list where each string-keyed entry maps
* a non-boolean option to the option parameters and each integer-keyed value is the
* name of a boolean option. Supported options are:
* - IGNORE: Boolean: skip update of rows that would cause unique key conflicts.
* IDatabase::affectedRows() can be used to determine how many rows were updated.
@ -1369,14 +1369,14 @@ interface IDatabase {
*
* @param string $table The table name
* @param string|string[]|string[][] $uniqueKeys Column name or non-empty list of column
* name lists that define all applicable unique keys on the table. Each unique key on the
* table is "applicable" unless either:
* name lists that define all applicable unique keys on the table. There must only be
* one such key. Each unique key on the table is "applicable" unless either:
* - It involves an AUTOINCREMENT column for which no values are assigned in $rows
* - It involves a UUID column for which newly generated UUIDs are assigned in $rows
* @param array|array[] $rows Row(s) to insert, in the form of either:
* - A string-keyed map of (column name => value) defining a new row. Values are
* treated as literals and quoted appropriately; null is interpreted as NULL.
* Columns belonging to a key in $uniqueIndexes must be defined here and non-null.
* Columns belonging to a key in $uniqueKeys must be defined here and non-null.
* - An integer-keyed list of such string-keyed maps, defining a list of new rows.
* The keys in each map must be identical to each other and in the same order.
* The rows must not collide with each other.
@ -1396,23 +1396,24 @@ interface IDatabase {
* @param array|array[] $rows Row(s) to insert, in the form of either:
* - A string-keyed map of (column name => value) defining a new row. Values are
* treated as literals and quoted appropriately; null is interpreted as NULL.
* Columns belonging to a key in $uniqueIndexes must be defined here and non-null.
* Columns belonging to a key in $uniqueKeys must be defined here and non-null.
* - An integer-keyed list of such string-keyed maps, defining a list of new rows.
* The keys in each map must be identical to each other and in the same order.
* The rows must not collide with each other.
* @param string|string[]|string[][] $uniqueKeys Column name or non-empty list of column
* name lists that define all applicable unique keys on the table. Each unique key on the
* table is "applicable" unless either:
* name lists that define all applicable unique keys on the table. There must only be
* one such key. Each unique key on the table is "applicable" unless either:
* - It involves an AUTOINCREMENT column for which no values are assigned in $rows
* - It involves a UUID column for which newly generated UUIDs are assigned in $rows
* Passing string[] to $uniqueKeys is deprecated.
* @param array $set Combination map/list where each string-keyed entry maps a column
* to a literal assigned value and each integer-keyed value is a SQL expression in the
* format of a column assignment within UPDATE...SET. The (column => value) entries are
* convenient due to automatic value quoting and conversion of null to NULL. The SQL
* assignment format is useful for updates like "column = column + X". All assignments
* have no defined execution order, so they should not depend on each other. Do not
* modified AUTOINCREMENT or UUID columns in assignments.
* to a literal assigned value and each integer-keyed value is a SQL assignment expression
* of the form "<unquoted alphanumeric column> = <SQL expression>". The (column => value)
* entries are convenient due to automatic value quoting and conversion of null to NULL.
* The SQL assignment entries are useful for updates like "column = column + X". All of
* the assignments have no defined execution order, so callers should make sure that they
* not depend on each other. Do not modify AUTOINCREMENT or UUID columns in assignments,
* even if they are just "secondary" unique keys.
* @param string $fname Calling function name (use __METHOD__) for logs/profiling
* @return bool Return true if no exception was thrown (deprecated since 1.33)
* @throws DBError If an error occurs, {@see query}

View file

@ -658,11 +658,11 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
'table' => 'upsert_table',
'rows' => [ 'field' => 'text', 'field2' => 'text2' ],
'uniqueIndexes' => 'field',
'set' => [ 'field' => 'set' ],
'set' => [ 'field2' => 'set' ],
],
"BEGIN; " .
"UPDATE upsert_table " .
"SET field = 'set' " .
"SET field2 = 'set' " .
"WHERE (field = 'text'); " .
"INSERT INTO upsert_table " .
"(field,field2) " .
@ -674,11 +674,11 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
'table' => 'upsert_table',
'rows' => [ 'field' => 'text', 'field2' => 'text2' ],
'uniqueIndexes' => [ [ 'field' ] ],
'set' => [ 'field' => 'set' ],
'set' => [ 'field2' => 'set' ],
],
"BEGIN; " .
"UPDATE upsert_table " .
"SET field = 'set' " .
"SET field2 = 'set' " .
"WHERE (field = 'text'); " .
"INSERT INTO upsert_table " .
"(field,field2) " .