Add WRITE_SYNC flag to BagOStuff::set()/merge()
* This blocks on writing to all replicas and returns false if any failed. * This is useful if ChronologyProtector is to work across domains by having the writes go everywhere so that later reads will see them (and be local at the same time). * Redundant doc comments were also removed. Change-Id: I9ed098d563c64dba605e7809bc96731da3b3e79d
This commit is contained in:
parent
a8590172c8
commit
5c8ef13306
13 changed files with 84 additions and 157 deletions
|
|
@ -40,7 +40,7 @@ class APCBagOStuff extends BagOStuff {
|
|||
return $val;
|
||||
}
|
||||
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
apc_store( $key . self::KEY_SUFFIX, $value, $exptime );
|
||||
|
||||
return true;
|
||||
|
|
|
|||
|
|
@ -67,6 +67,8 @@ abstract class BagOStuff implements LoggerAwareInterface {
|
|||
/** Bitfield constants for get()/getMulti() */
|
||||
const READ_LATEST = 1; // use latest data for replicated stores
|
||||
const READ_VERIFIED = 2; // promise that caller can tell when keys are stale
|
||||
/** Bitfield constants for set()/merge() */
|
||||
const WRITE_SYNC = 1; // synchronously write to all locations for replicated stores
|
||||
|
||||
public function __construct( array $params = array() ) {
|
||||
if ( isset( $params['logger'] ) ) {
|
||||
|
|
@ -158,6 +160,7 @@ abstract class BagOStuff implements LoggerAwareInterface {
|
|||
* @param mixed $casToken
|
||||
* @param integer $flags Bitfield of BagOStuff::READ_* constants [optional]
|
||||
* @return mixed Returns false on failure and if the item does not exist
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function getWithToken( $key, &$casToken, $flags = 0 ) {
|
||||
throw new Exception( __METHOD__ . ' not implemented.' );
|
||||
|
|
@ -169,9 +172,10 @@ abstract class BagOStuff implements LoggerAwareInterface {
|
|||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime Either an interval in seconds or a unix timestamp for expiry
|
||||
* @param int $flags Bitfield of BagOStuff::WRITE_* constants
|
||||
* @return bool Success
|
||||
*/
|
||||
abstract public function set( $key, $value, $exptime = 0 );
|
||||
abstract public function set( $key, $value, $exptime = 0, $flags = 0 );
|
||||
|
||||
/**
|
||||
* Delete an item
|
||||
|
|
@ -191,15 +195,16 @@ abstract class BagOStuff implements LoggerAwareInterface {
|
|||
* @param callable $callback Callback method to be executed
|
||||
* @param int $exptime Either an interval in seconds or a unix timestamp for expiry
|
||||
* @param int $attempts The amount of times to attempt a merge in case of failure
|
||||
* @param int $flags Bitfield of BagOStuff::WRITE_* constants
|
||||
* @return bool Success
|
||||
* @throws InvalidArgumentException
|
||||
*/
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
if ( !is_callable( $callback ) ) {
|
||||
throw new InvalidArgumentException( "Got invalid callback." );
|
||||
}
|
||||
|
||||
return $this->mergeViaLock( $key, $callback, $exptime, $attempts );
|
||||
return $this->mergeViaLock( $key, $callback, $exptime, $attempts, $flags );
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -262,9 +267,10 @@ abstract class BagOStuff implements LoggerAwareInterface {
|
|||
* @param callable $callback Callback method to be executed
|
||||
* @param int $exptime Either an interval in seconds or a unix timestamp for expiry
|
||||
* @param int $attempts The amount of times to attempt a merge in case of failure
|
||||
* @param int $flags Bitfield of BagOStuff::WRITE_* constants
|
||||
* @return bool Success
|
||||
*/
|
||||
protected function mergeViaLock( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
protected function mergeViaLock( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
if ( !$this->lock( $key, 6 ) ) {
|
||||
return false;
|
||||
}
|
||||
|
|
@ -279,7 +285,7 @@ abstract class BagOStuff implements LoggerAwareInterface {
|
|||
if ( $value === false ) {
|
||||
$success = true; // do nothing
|
||||
} else {
|
||||
$success = $this->set( $key, $value, $exptime ); // set the new value
|
||||
$success = $this->set( $key, $value, $exptime, $flags ); // set the new value
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class EmptyBagOStuff extends BagOStuff {
|
|||
return false;
|
||||
}
|
||||
|
||||
public function set( $key, $value, $exp = 0 ) {
|
||||
public function set( $key, $value, $exp = 0, $flags = 0 ) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -39,7 +39,7 @@ class EmptyBagOStuff extends BagOStuff {
|
|||
return true;
|
||||
}
|
||||
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
return true; // faster
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ class HashBagOStuff extends BagOStuff {
|
|||
return $this->bag[$key][0];
|
||||
}
|
||||
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
$this->bag[$key] = array( $value, $this->convertExpiry( $exptime ) );
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,8 +84,8 @@ class ReplicatedBagOStuff extends BagOStuff {
|
|||
: $this->readStore->getMulti( $keys, $flags );
|
||||
}
|
||||
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
return $this->writeStore->set( $key, $value, $exptime );
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
return $this->writeStore->set( $key, $value, $exptime, $flags );
|
||||
}
|
||||
|
||||
public function delete( $key ) {
|
||||
|
|
@ -112,8 +112,8 @@ class ReplicatedBagOStuff extends BagOStuff {
|
|||
return $this->writeStore->unlock( $key );
|
||||
}
|
||||
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
return $this->writeStore->merge( $key, $callback, $exptime, $attempts );
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
return $this->writeStore->merge( $key, $callback, $exptime, $attempts, $flags );
|
||||
}
|
||||
|
||||
public function getLastError() {
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class WinCacheBagOStuff extends BagOStuff {
|
|||
return $val;
|
||||
}
|
||||
|
||||
public function set( $key, $value, $expire = 0 ) {
|
||||
public function set( $key, $value, $expire = 0, $flags = 0 ) {
|
||||
$result = wincache_ucache_set( $key, serialize( $value ), $expire );
|
||||
|
||||
/* wincache_ucache_set returns an empty array on success if $value
|
||||
|
|
@ -64,7 +64,7 @@ class WinCacheBagOStuff extends BagOStuff {
|
|||
return true;
|
||||
}
|
||||
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
if ( !is_callable( $callback ) ) {
|
||||
throw new Exception( "Got invalid callback." );
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class XCacheBagOStuff extends BagOStuff {
|
|||
return $val;
|
||||
}
|
||||
|
||||
public function set( $key, $value, $expire = 0 ) {
|
||||
public function set( $key, $value, $expire = 0, $flags = 0 ) {
|
||||
if ( !$this->isInteger( $value ) ) {
|
||||
$value = serialize( $value );
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@
|
|||
* @ingroup Cache
|
||||
*/
|
||||
class MemcachedBagOStuff extends BagOStuff {
|
||||
/** @var MWMemcached|Memcached */
|
||||
protected $client;
|
||||
|
||||
/**
|
||||
|
|
@ -67,49 +68,26 @@ class MemcachedBagOStuff extends BagOStuff {
|
|||
return $this->client->get( $this->encodeKey( $key ), $casToken );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
return $this->client->set( $this->encodeKey( $key ), $value,
|
||||
$this->fixExpiry( $exptime ) );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $casToken
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
protected function cas( $casToken, $key, $value, $exptime = 0 ) {
|
||||
return $this->client->cas( $casToken, $this->encodeKey( $key ),
|
||||
$value, $this->fixExpiry( $exptime ) );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return bool
|
||||
*/
|
||||
public function delete( $key ) {
|
||||
return $this->client->delete( $this->encodeKey( $key ) );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @param int $exptime (default 0)
|
||||
* @return mixed
|
||||
*/
|
||||
public function add( $key, $value, $exptime = 0 ) {
|
||||
return $this->client->add( $this->encodeKey( $key ), $value,
|
||||
$this->fixExpiry( $exptime ) );
|
||||
}
|
||||
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
if ( !is_callable( $callback ) ) {
|
||||
throw new Exception( "Got invalid callback." );
|
||||
}
|
||||
|
|
|
|||
|
|
@ -122,33 +122,16 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
|
|||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
$this->debugLog( "set($key)" );
|
||||
return $this->checkResult( $key, parent::set( $key, $value, $exptime ) );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param float $casToken
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
protected function cas( $casToken, $key, $value, $exptime = 0 ) {
|
||||
$this->debugLog( "cas($key)" );
|
||||
return $this->checkResult( $key, parent::cas( $casToken, $key, $value, $exptime ) );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return bool
|
||||
*/
|
||||
public function delete( $key ) {
|
||||
$this->debugLog( "delete($key)" );
|
||||
$result = parent::delete( $key );
|
||||
|
|
@ -160,33 +143,17 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @param int $exptime
|
||||
* @return mixed
|
||||
*/
|
||||
public function add( $key, $value, $exptime = 0 ) {
|
||||
$this->debugLog( "add($key)" );
|
||||
return $this->checkResult( $key, parent::add( $key, $value, $exptime ) );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @return mixed
|
||||
*/
|
||||
public function incr( $key, $value = 1 ) {
|
||||
$this->debugLog( "incr($key)" );
|
||||
$result = $this->client->increment( $key, $value );
|
||||
return $this->checkResult( $key, $result );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @return mixed
|
||||
*/
|
||||
public function decr( $key, $value = 1 ) {
|
||||
$this->debugLog( "decr($key)" );
|
||||
$result = $this->client->decrement( $key, $value );
|
||||
|
|
|
|||
|
|
@ -98,11 +98,10 @@ class MultiWriteBagOStuff extends BagOStuff {
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $debug
|
||||
*/
|
||||
public function setDebug( $debug ) {
|
||||
$this->doWrite( self::ALL, 'setDebug', $debug );
|
||||
foreach ( $this->caches as $cache ) {
|
||||
$cache->setDebug( $debug );
|
||||
}
|
||||
}
|
||||
|
||||
protected function doGet( $key, $flags = 0 ) {
|
||||
|
|
@ -127,75 +126,43 @@ class MultiWriteBagOStuff extends BagOStuff {
|
|||
&& $misses > 0
|
||||
&& ( $flags & self::READ_VERIFIED ) == self::READ_VERIFIED
|
||||
) {
|
||||
$this->doWrite( $misses, 'set', $key, $value, self::UPGRADE_TTL );
|
||||
$this->doWrite( $misses, $this->asyncWrites, 'set', $key, $value, self::UPGRADE_TTL );
|
||||
}
|
||||
|
||||
return $value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
return $this->doWrite( self::ALL, 'set', $key, $value, $exptime );
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
$asyncWrites = ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC )
|
||||
? false
|
||||
: $this->asyncWrites;
|
||||
|
||||
return $this->doWrite( self::ALL, $asyncWrites, 'set', $key, $value, $exptime );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return bool
|
||||
*/
|
||||
public function delete( $key ) {
|
||||
return $this->doWrite( self::ALL, 'delete', $key );
|
||||
return $this->doWrite( self::ALL, $this->asyncWrites, 'delete', $key );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
public function add( $key, $value, $exptime = 0 ) {
|
||||
return $this->doWrite( self::ALL, 'add', $key, $value, $exptime );
|
||||
return $this->doWrite( self::ALL, $this->asyncWrites, 'add', $key, $value, $exptime );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @return bool|null
|
||||
*/
|
||||
public function incr( $key, $value = 1 ) {
|
||||
return $this->doWrite( self::ALL, 'incr', $key, $value );
|
||||
return $this->doWrite( self::ALL, $this->asyncWrites, 'incr', $key, $value );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $value
|
||||
* @return bool
|
||||
*/
|
||||
public function decr( $key, $value = 1 ) {
|
||||
return $this->doWrite( self::ALL, 'decr', $key, $value );
|
||||
return $this->doWrite( self::ALL, $this->asyncWrites, 'decr', $key, $value );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $timeout
|
||||
* @param int $expiry
|
||||
* @param string $rclass
|
||||
* @return bool
|
||||
*/
|
||||
public function lock( $key, $timeout = 6, $expiry = 6, $rclass = '' ) {
|
||||
// Lock only the first cache, to avoid deadlocks
|
||||
// Only need to lock the first cache; also avoids deadlocks
|
||||
return $this->caches[0]->lock( $key, $timeout, $expiry, $rclass );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return bool
|
||||
*/
|
||||
public function unlock( $key ) {
|
||||
// Only the first cache is locked
|
||||
return $this->caches[0]->unlock( $key );
|
||||
}
|
||||
|
||||
|
|
@ -211,20 +178,21 @@ class MultiWriteBagOStuff extends BagOStuff {
|
|||
* Apply a write method to the first $count backing caches
|
||||
*
|
||||
* @param integer $count
|
||||
* @param bool $asyncWrites
|
||||
* @param string $method
|
||||
* @param mixed ...
|
||||
* @return bool
|
||||
*/
|
||||
protected function doWrite( $count, $method /*, ... */ ) {
|
||||
protected function doWrite( $count, $asyncWrites, $method /*, ... */ ) {
|
||||
$ret = true;
|
||||
$args = array_slice( func_get_args(), 2 );
|
||||
$args = array_slice( func_get_args(), 3 );
|
||||
|
||||
foreach ( $this->caches as $i => $cache ) {
|
||||
if ( $i >= $count ) {
|
||||
break; // ignore the lower tiers
|
||||
}
|
||||
|
||||
if ( $i == 0 || !$this->asyncWrites ) {
|
||||
if ( $i == 0 || !$asyncWrites ) {
|
||||
// First store or in sync mode: write now and get result
|
||||
if ( !call_user_func_array( array( $cache, $method ), $args ) ) {
|
||||
$ret = false;
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class RedisBagOStuff extends BagOStuff {
|
|||
return $result;
|
||||
}
|
||||
|
||||
public function set( $key, $value, $expiry = 0 ) {
|
||||
public function set( $key, $value, $expiry = 0, $flags = 0 ) {
|
||||
list( $server, $conn ) = $this->getConnection( $key );
|
||||
if ( !$conn ) {
|
||||
return false;
|
||||
|
|
|
|||
|
|
@ -289,11 +289,6 @@ class SqlBagOStuff extends BagOStuff {
|
|||
return $values;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $data
|
||||
* @param int $expiry
|
||||
* @return bool
|
||||
*/
|
||||
public function setMulti( array $data, $expiry = 0 ) {
|
||||
$keysByTable = array();
|
||||
foreach ( $data as $key => $value ) {
|
||||
|
|
@ -353,23 +348,10 @@ class SqlBagOStuff extends BagOStuff {
|
|||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
public function set( $key, $value, $exptime = 0 ) {
|
||||
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
|
||||
return $this->setMulti( array( $key => $value ), $exptime );
|
||||
}
|
||||
|
||||
/**
|
||||
* @param mixed $casToken
|
||||
* @param string $key
|
||||
* @param mixed $value
|
||||
* @param int $exptime
|
||||
* @return bool
|
||||
*/
|
||||
protected function cas( $casToken, $key, $value, $exptime = 0 ) {
|
||||
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
|
||||
try {
|
||||
|
|
@ -410,10 +392,6 @@ class SqlBagOStuff extends BagOStuff {
|
|||
return (bool)$db->affectedRows();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @return bool
|
||||
*/
|
||||
public function delete( $key ) {
|
||||
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
|
||||
try {
|
||||
|
|
@ -430,11 +408,6 @@ class SqlBagOStuff extends BagOStuff {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
* @param int $step
|
||||
* @return int|null
|
||||
*/
|
||||
public function incr( $key, $step = 1 ) {
|
||||
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
|
||||
try {
|
||||
|
|
@ -479,7 +452,7 @@ class SqlBagOStuff extends BagOStuff {
|
|||
return $newValue;
|
||||
}
|
||||
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10 ) {
|
||||
public function merge( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
|
||||
if ( !is_callable( $callback ) ) {
|
||||
throw new Exception( "Got invalid callback." );
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,41 @@ class MultiWriteBagOStuffTest extends MediaWikiTestCase {
|
|||
$this->assertEquals( $value, $this->cache2->get( $key ), 'Written to tier 2' );
|
||||
}
|
||||
|
||||
public function testSyncMerge() {
|
||||
$key = wfRandomString();
|
||||
$value = wfRandomString();
|
||||
$func = function () use ( $value ) {
|
||||
return $value;
|
||||
};
|
||||
|
||||
// XXX: DeferredUpdates bound to transactions in CLI mode
|
||||
$dbw = wfGetDB( DB_MASTER );
|
||||
$dbw->begin();
|
||||
$this->cache->merge( $key, $func );
|
||||
|
||||
// Set in tier 1
|
||||
$this->assertEquals( $value, $this->cache1->get( $key ), 'Written to tier 1' );
|
||||
// Not yet set in tier 2
|
||||
$this->assertEquals( false, $this->cache2->get( $key ), 'Not written to tier 2' );
|
||||
|
||||
$dbw->commit();
|
||||
|
||||
// Set in tier 2
|
||||
$this->assertEquals( $value, $this->cache2->get( $key ), 'Written to tier 2' );
|
||||
|
||||
$key = wfRandomString();
|
||||
|
||||
$dbw->begin();
|
||||
$this->cache->merge( $key, $func, 0, 1, BagOStuff::WRITE_SYNC );
|
||||
|
||||
// Set in tier 1
|
||||
$this->assertEquals( $value, $this->cache1->get( $key ), 'Written to tier 1' );
|
||||
// Also set in tier 2
|
||||
$this->assertEquals( $value, $this->cache2->get( $key ), 'Written to tier 2' );
|
||||
|
||||
$dbw->commit();
|
||||
}
|
||||
|
||||
public function testSetDelayed() {
|
||||
$key = wfRandomString();
|
||||
$value = wfRandomString();
|
||||
|
|
|
|||
Loading…
Reference in a new issue