objectcache: add WANObjectCacheReaper for assuring purges

* This fixes keys based on some sort of change log.
  Updates are wrapped in a mutex and keep track of the
  last known good position.
* Make WANObjectReapUpdate class that cleans up title
  related keys using the recentchanges table. This triggers
  as a deferred updates on RC view.

Change-Id: I7f14b9ca2533032147e62b1a3cc004a23da86579
This commit is contained in:
Aaron Schulz 2016-09-02 21:43:16 -07:00 committed by Tim Starling
parent e191d4e913
commit 2e5eb693de
11 changed files with 516 additions and 0 deletions

View file

@ -1541,7 +1541,9 @@ $wgAutoloadLocalClasses = [
'ViewCLI' => __DIR__ . '/maintenance/view.php',
'VirtualRESTService' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTService.php',
'VirtualRESTServiceClient' => __DIR__ . '/includes/libs/virtualrest/VirtualRESTServiceClient.php',
'WANCacheReapUpdate' => __DIR__ . '/includes/deferred/WANCacheReapUpdate.php',
'WANObjectCache' => __DIR__ . '/includes/libs/objectcache/WANObjectCache.php',
'WANObjectCacheReaper' => __DIR__ . '/includes/libs/objectcache/WANObjectCacheReaper.php',
'WantedCategoriesPage' => __DIR__ . '/includes/specials/SpecialWantedcategories.php',
'WantedFilesPage' => __DIR__ . '/includes/specials/SpecialWantedfiles.php',
'WantedPagesPage' => __DIR__ . '/includes/specials/SpecialWantedpages.php',

View file

@ -2342,6 +2342,19 @@ $wgWANObjectCaches = [
*/
];
/**
* Verify and enforce WAN cache purges using reliable DB sources as streams.
*
* These secondary cache purges are de-duplicated via simple cache mutexes.
* This improves consistency when cache purges are lost, which becomes more likely
* as more cache servers are added or if there are multiple datacenters. Only keys
* related to important mutable content will be checked.
*
* @var bool
* @since 1.29
*/
$wgEnableWANCacheReaper = false;
/**
* Main object stash type. This should be a fast storage system for storing
* lightweight data like hit counters and user activity. Sites with multiple

View file

@ -278,6 +278,20 @@ class LinkCache {
return $id;
}
/**
* @param WANObjectCache $cache
* @param TitleValue $t
* @return string[]
* @since 1.28
*/
public function getMutableCacheKeys( WANObjectCache $cache, TitleValue $t ) {
if ( $this->isCacheable( $t ) ) {
return [ $cache->makeKey( 'page', $t->getNamespace(), sha1( $t->getDBkey() ) ) ];
}
return [];
}
private function isCacheable( LinkTarget $title ) {
return ( $title->inNamespace( NS_TEMPLATE ) || $title->inNamespace( NS_FILE ) );
}

View file

@ -0,0 +1,126 @@
<?php
use Psr\Log\LoggerInterface;
/**
* Class for fixing stale WANObjectCache keys using a purge event source
*
* This is useful for expiring keys that missed fire-and-forget purges. This uses the
* recentchanges table as a reliable stream to make certain keys reach consistency
* as soon as the underlying replica database catches up. These means that critical
* keys will not escape getting purged simply due to brief hiccups in the network,
* which are more prone to happen accross datacenters.
*
* ----
* "I was trying to cheat death. I was only trying to surmount for a little while the
* darkness that all my life I surely knew was going to come rolling in on me some day
* and obliterate me. I was only to stay alive a little brief while longer, after I was
* already gone. To stay in the light, to be with the living, a little while past my time."
* -- Notes for "Blues of a Lifetime", by [[Cornell Woolrich]]
*
* @since 1.28
*/
class WANCacheReapUpdate implements DeferrableUpdate {
/** @var IDatabase */
private $db;
/** @var LoggerInterface */
private $logger;
/**
* @param IDatabase $db
* @param LoggerInterface $logger
*/
public function __construct( IDatabase $db, LoggerInterface $logger ) {
$this->db = $db;
$this->logger = $logger;
}
function doUpdate() {
$reaper = new WANObjectCacheReaper(
ObjectCache::getMainWANInstance(),
ObjectCache::getLocalClusterInstance(),
[ $this, 'getTitleChangeEvents' ],
[ $this, 'getEventAffectedKeys' ],
[
'channel' => 'table:recentchanges:' . $this->db->getWikiID(),
'logger' => $this->logger
]
);
$reaper->invoke( 100 );
}
/**
* @see WANObjectCacheRepear
*
* @param int $start
* @param int $id
* @param int $end
* @param int $limit
* @return TitleValue[]
*/
public function getTitleChangeEvents( $start, $id, $end, $limit ) {
$db = $this->db;
$encStart = $db->addQuotes( $db->timestamp( $start ) );
$encEnd = $db->addQuotes( $db->timestamp( $end ) );
$id = (int)$id; // cast NULL => 0 since rc_id is an integer
$res = $db->select(
'recentchanges',
[ 'rc_namespace', 'rc_title', 'rc_timestamp', 'rc_id' ],
[
$db->makeList( [
"rc_timestamp > $encStart",
"rc_timestamp = $encStart AND rc_id > " . $db->addQuotes( $id )
], LIST_OR ),
"rc_timestamp < $encEnd"
],
__METHOD__,
[ 'ORDER BY' => 'rc_timestamp ASC, rc_id ASC', 'LIMIT' => $limit ]
);
$events = [];
foreach ( $res as $row ) {
$events[] = [
'id' => (int)$row->rc_id,
'pos' => (int)wfTimestamp( TS_UNIX, $row->rc_timestamp ),
'item' => new TitleValue( (int)$row->rc_namespace, $row->rc_title )
];
}
return $events;
}
/**
* Gets a list of important cache keys associated with a title
*
* @see WANObjectCacheRepear
* @param WANObjectCache $cache
* @param TitleValue $t
* @returns string[]
*/
public function getEventAffectedKeys( WANObjectCache $cache, TitleValue $t ) {
/** @var WikiPage[]|LocalFile[]|User[] $entities */
$entities = [];
$entities[] = WikiPage::factory( Title::newFromTitleValue( $t ) );
if ( $t->inNamespace( NS_FILE ) ) {
$entities[] = wfLocalFile( $t->getText() );
}
if ( $t->inNamespace( NS_USER ) ) {
$entities[] = User::newFromName( $t->getText(), false );
}
$keys = [];
foreach ( $entities as $entity ) {
if ( $entity ) {
$keys = array_merge( $keys, $entity->getMutableCacheKeys( $cache ) );
}
}
if ( $keys ) {
$this->logger->debug( __CLASS__ . ': got key(s) ' . implode( ', ', $keys ) );
}
return $keys;
}
}

View file

@ -240,6 +240,15 @@ class LocalFile extends File {
return $this->repo->getSharedCacheKey( 'file', sha1( $this->getName() ) );
}
/**
* @param WANObjectCache $cache
* @return string[]
* @since 1.28
*/
public function getMutableCacheKeys( WANObjectCache $cache ) {
return [ $this->getCacheKey() ];
}
/**
* Try to load file metadata from memcached, falling back to the database
*/

View file

@ -1127,6 +1127,65 @@ class WANObjectCache implements IExpiringStore, LoggerAwareInterface {
return $values;
}
/**
* Locally set a key to expire soon if it is stale based on $purgeTimestamp
*
* This sets stale keys' time-to-live at HOLDOFF_TTL seconds, which both avoids
* broadcasting in mcrouter setups and also avoids races with new tombstones.
*
* @param string $key Cache key
* @param int $purgeTimestamp UNIX timestamp of purge
* @param bool &$isStale Whether the key is stale
* @return bool Success
* @since 1.28
*/
public function reap( $key, $purgeTimestamp, &$isStale = false ) {
$minAsOf = $purgeTimestamp + self::HOLDOFF_TTL;
$wrapped = $this->cache->get( self::VALUE_KEY_PREFIX . $key );
if ( is_array( $wrapped ) && $wrapped[self::FLD_TIME] < $minAsOf ) {
$isStale = true;
$this->logger->warning( "Reaping stale value key '$key'." );
$ttlReap = self::HOLDOFF_TTL; // avoids races with tombstone creation
$ok = $this->cache->changeTTL( self::VALUE_KEY_PREFIX . $key, $ttlReap );
if ( !$ok ) {
$this->logger->error( "Could not complete reap of key '$key'." );
}
return $ok;
}
$isStale = false;
return true;
}
/**
* Locally set a "check" key to expire soon if it is stale based on $purgeTimestamp
*
* @param string $key Cache key
* @param int $purgeTimestamp UNIX timestamp of purge
* @param bool &$isStale Whether the key is stale
* @return bool Success
* @since 1.28
*/
public function reapCheckKey( $key, $purgeTimestamp, &$isStale = false ) {
$purge = $this->parsePurgeValue( $this->cache->get( self::TIME_KEY_PREFIX . $key ) );
if ( $purge && $purge[self::FLD_TIME] < $purgeTimestamp ) {
$isStale = true;
$this->logger->warning( "Reaping stale check key '$key'." );
$ok = $this->cache->changeTTL( self::TIME_KEY_PREFIX . $key, 1 );
if ( !$ok ) {
$this->logger->error( "Could not complete reap of check key '$key'." );
}
return $ok;
}
$isStale = false;
return false;
}
/**
* @see BagOStuff::makeKey()
* @param string ... Key component

View file

@ -0,0 +1,204 @@
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
* @ingroup Cache
* @author Aaron Schulz
*/
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
/**
* Class for scanning through chronological, log-structured data or change logs
* and locally purging cache keys related to entities that appear in this data.
*
* This is useful for repairing cache when purges are missed by using a reliable
* stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
* is expected to be more common than within them.
*
* @since 1.28
*/
class WANObjectCacheReaper implements LoggerAwareInterface {
/** @var WANObjectCache */
protected $cache;
/** @var BagOStuff */
protected $store;
/** @var callable */
protected $logChunkCallback;
/** @var callable */
protected $keyListCallback;
/** @var LoggerInterface */
protected $logger;
/** @var string */
protected $channel;
/** @var integer */
protected $initialStartWindow;
/**
* @param WANObjectCache $cache Cache to reap bad keys from
* @param BagOStuff $store Cache to store positions use for locking
* @param callable $logCallback Callback taking arguments:
* - The starting position as a UNIX timestamp
* - The starting unique ID used for breaking timestamp collisions or null
* - The ending position as a UNIX timestamp
* - The maximum number of results to return
* It returns a list of maps of (key: cache key, pos: UNIX timestamp, id: unique ID)
* for each key affected, with the corrosponding event timestamp/ID information.
* The events should be in ascending order, by (timestamp,id).
* @param callable $keyCallback Callback taking arguments:
* - The WANObjectCache instance
* - An object from the event log
* It should return a list of WAN cache keys.
* The callback must fully duck-type test the object, since can be any model class.
* @param array $params Additional options:
* - channel: the name of the update event stream.
* Default: WANObjectCache::DEFAULT_PURGE_CHANNEL.
* - initialStartWindow: seconds back in time to start if the position is lost.
* Default: 1 hour.
* - logger: an SPL monolog instance [optional]
*/
public function __construct(
WANObjectCache $cache,
BagOStuff $store,
callable $logCallback,
callable $keyCallback,
array $params
) {
$this->cache = $cache;
$this->store = $store;
$this->logChunkCallback = $logCallback;
$this->keyListCallback = $keyCallback;
if ( isset( $params['channel'] ) ) {
$this->channel = $params['channel'];
} else {
throw new UnexpectedValueException( "No channel specified." );
}
$this->initialStartWindow = isset( $params['initialStartWindow'] )
? $params['initialStartWindow']
: 3600;
$this->logger = isset( $params['logger'] )
? $params['logger']
: new NullLogger();
}
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
}
/**
* Check and reap stale keys based on a chunk of events
*
* @param int $n Number of events
* @return int Number of keys checked
*/
final public function invoke( $n = 100 ) {
$posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
$scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
if ( !$scopeLock ) {
return 0;
}
$now = time();
$status = $this->store->get( $posKey );
if ( !$status ) {
$status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
}
// Get events for entities who's keys tombstones/hold-off should have expired by now
$events = call_user_func_array(
$this->logChunkCallback,
[ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
);
$event = null;
$keyEvents = [];
foreach ( $events as $event ) {
$keys = call_user_func_array(
$this->keyListCallback,
[ $this->cache, $event['item'] ]
);
foreach ( $keys as $key ) {
unset( $keyEvents[$key] ); // use only the latest per key
$keyEvents[$key] = [
'pos' => $event['pos'],
'id' => $event['id']
];
}
}
$purgeCount = 0;
$lastOkEvent = null;
foreach ( $keyEvents as $key => $keyEvent ) {
if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
break;
}
++$purgeCount;
$lastOkEvent = $event;
}
if ( $lastOkEvent ) {
$ok = $this->store->merge(
$posKey,
function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
if ( !$curValue ) {
// Use new position
} else {
$curCoord = [ $curValue['pos'], $curValue['id'] ];
$newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
if ( $newCoord < $curCoord ) {
// Keep prior position instead of rolling it back
return $curValue;
}
}
return [
'pos' => $lastOkEvent['pos'],
'id' => $lastOkEvent['id'],
'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
];
},
IExpiringStore::TTL_INDEFINITE
);
$pos = $lastOkEvent['pos'];
$id = $lastOkEvent['id'];
if ( $ok ) {
$this->logger->info( "Updated cache reap position ($pos, $id)." );
} else {
$this->logger->error( "Could not update cache reap position ($pos, $id)." );
}
}
ScopedCallback::consume( $scopeLock );
return $purgeCount;
}
/**
* @return array|bool Returns (pos, id) map or false if not set
*/
public function getState() {
$posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
return $this->store->get( $posKey );
}
}

View file

@ -3652,4 +3652,15 @@ class WikiPage implements Page, IDBAccessObject {
public function getSourceURL() {
return $this->getTitle()->getCanonicalURL();
}
/*
* @param WANObjectCache $cache
* @return string[]
* @since 1.28
*/
public function getMutableCacheKeys( WANObjectCache $cache ) {
$linkCache = MediaWikiServices::getInstance()->getLinkCache();
return $linkCache->getMutableCacheKeys( $cache, $this->getTitle()->getTitleValue() );
}
}

View file

@ -20,6 +20,7 @@
* @file
* @ingroup SpecialPage
*/
use MediaWiki\Logger\LoggerFactory;
/**
* Special page which uses a ChangesList to show query results.
@ -77,6 +78,14 @@ abstract class ChangesListSpecialPage extends SpecialPage {
$this->webOutput( $rows, $opts );
$rows->free();
if ( $this->getConfig()->get( 'EnableWANCacheReaper' ) ) {
// Clean up any bad page entries for titles showing up in RC
DeferredUpdates::addUpdate( new WANCacheReapUpdate(
$this->getDB(),
LoggerFactory::getInstance( 'objectcache' )
) );
}
}
/**

View file

@ -468,6 +468,17 @@ class User implements IDBAccessObject {
return $cache->makeGlobalKey( 'user', 'id', wfWikiID(), $this->mId );
}
/**
* @param WANObjectCache $cache
* @return string[]
* @since 1.28
*/
public function getMutableCacheKeys( WANObjectCache $cache ) {
$id = $this->getId();
return $id ? [ $this->getCacheKey( $cache ) ] : [];
}
/**
* Load user data from shared cache, given mId has already been set.
*

View file

@ -872,6 +872,62 @@ class WANObjectCacheTest extends PHPUnit_Framework_TestCase {
$this->assertGreaterThan( -5.1, $curTTL, "Correct CTL" );
}
/**
* @covers WANObjectCache::reap()
* @covers WANObjectCache::reapCheckKey()
*/
public function testReap() {
$vKey1 = wfRandomString();
$vKey2 = wfRandomString();
$tKey1 = wfRandomString();
$tKey2 = wfRandomString();
$value = 'moo';
$knownPurge = time() - 60;
$goodTime = microtime( true ) - 5;
$badTime = microtime( true ) - 300;
$this->internalCache->set(
WANObjectCache::VALUE_KEY_PREFIX . $vKey1,
[
WANObjectCache::FLD_VERSION => WANObjectCache::VERSION,
WANObjectCache::FLD_VALUE => $value,
WANObjectCache::FLD_TTL => 3600,
WANObjectCache::FLD_TIME => $goodTime
]
);
$this->internalCache->set(
WANObjectCache::VALUE_KEY_PREFIX . $vKey2,
[
WANObjectCache::FLD_VERSION => WANObjectCache::VERSION,
WANObjectCache::FLD_VALUE => $value,
WANObjectCache::FLD_TTL => 3600,
WANObjectCache::FLD_TIME => $badTime
]
);
$this->internalCache->set(
WANObjectCache::TIME_KEY_PREFIX . $tKey1,
WANObjectCache::PURGE_VAL_PREFIX . $goodTime
);
$this->internalCache->set(
WANObjectCache::TIME_KEY_PREFIX . $tKey2,
WANObjectCache::PURGE_VAL_PREFIX . $badTime
);
$this->assertEquals( $value, $this->cache->get( $vKey1 ) );
$this->assertEquals( $value, $this->cache->get( $vKey2 ) );
$this->cache->reap( $vKey1, $knownPurge, $bad1 );
$this->cache->reap( $vKey2, $knownPurge, $bad2 );
$this->assertFalse( $bad1 );
$this->assertTrue( $bad2 );
$this->cache->reapCheckKey( $tKey1, $knownPurge, $tBad1 );
$this->cache->reapCheckKey( $tKey2, $knownPurge, $tBad2 );
$this->assertFalse( $tBad1 );
$this->assertTrue( $tBad2 );
}
/**
* @covers WANObjectCache::set()
*/
@ -926,6 +982,8 @@ class WANObjectCacheTest extends PHPUnit_Framework_TestCase {
$wanCache->getMulti( [ 'x', 'y' ], $ctls, [ 'check2' ] );
$wanCache->getWithSetCallback( 'p', 30, $valFunc );
$wanCache->getCheckKeyTime( 'zzz' );
$wanCache->reap( 'x', time() - 300 );
$wanCache->reap( 'zzz', time() - 300 );
}
/**