LocalFileMoveBatch lock improvements

* Lock the file earlier: before the DB locking selects.
* Unlock the file later: after the transaction is committed.

Per CR comments on Iaf9c9eef617044656b5b0d91a9632eacd84a5a78.

Bug: T283045
Change-Id: I075af66aad9221e0c5803dfc13b023fb20e70f83
This commit is contained in:
Tim Starling 2021-11-22 15:27:49 +11:00
parent 6e29611642
commit b2aa100f20
2 changed files with 95 additions and 13 deletions

View file

@ -2326,7 +2326,10 @@ class LocalFile extends File {
wfDebugLog( 'imagemove', "Got request to move {$this->name} to " . $target->getText() );
$batch = new LocalFileMoveBatch( $this, $target );
$batch->addCurrent();
$status = $batch->addCurrent();
if ( !$status->isOK() ) {
return $status;
}
$archiveNames = $batch->addOlds();
$status = $batch->execute();

View file

@ -70,6 +70,15 @@ class LocalFileMoveBatch {
/** @var LoggerInterface */
private $logger;
/** @var bool */
private $haveSourceLock = false;
/** @var bool */
private $haveTargetLock = false;
/** @var LocalFile|null */
private $targetFile;
/**
* @param LocalFile $file
* @param Title $target
@ -90,9 +99,15 @@ class LocalFileMoveBatch {
/**
* Add the current image to the batch
*
* @return Status
*/
public function addCurrent() {
$this->cur = [ $this->oldRel, $this->newRel ];
$status = $this->acquireSourceLock();
if ( $status->isOK() ) {
$this->cur = [ $this->oldRel, $this->newRel ];
}
return $status;
}
/**
@ -109,7 +124,7 @@ class LocalFileMoveBatch {
[ 'oi_archive_name', 'oi_deleted' ],
[ 'oi_name' => $this->oldName ],
__METHOD__,
[ 'LOCK IN SHARE MODE' ] // ignore snapshot
[ 'FOR UPDATE' ] // ignore snapshot
);
foreach ( $result as $row ) {
@ -151,6 +166,65 @@ class LocalFileMoveBatch {
return $archiveNames;
}
/**
* Acquire the source file lock, if it has not been acquired already
*
* @return Status
*/
protected function acquireSourceLock() {
if ( $this->haveSourceLock ) {
return Status::newGood();
}
$status = $this->file->acquireFileLock();
if ( $status->isOK() ) {
$this->haveSourceLock = true;
}
return $status;
}
/**
* Acquire the target file lock, if it has not been acquired already
*
* @return Status
*/
protected function acquireTargetLock() {
if ( $this->haveTargetLock ) {
return Status::newGood();
}
$status = $this->getTargetFile()->acquireFileLock();
if ( $status->isOK() ) {
$this->haveTargetLock = true;
}
return $status;
}
/**
* Release both file locks
*/
protected function releaseLocks() {
if ( $this->haveSourceLock ) {
$this->file->releaseFileLock();
$this->haveSourceLock = false;
}
if ( $this->haveTargetLock ) {
$this->getTargetFile()->releaseFileLock();
$this->haveTargetLock = false;
}
}
/**
* Get the target file
*
* @return LocalFile
*/
protected function getTargetFile() {
if ( $this->targetFile === null ) {
$this->targetFile = MediaWikiServices::getInstance()->getRepoGroup()->getLocalRepo()
->newFile( $this->target );
}
return $this->targetFile;
}
/**
* Perform the move.
* @return Status
@ -158,21 +232,18 @@ class LocalFileMoveBatch {
public function execute() {
$repo = $this->file->repo;
$status = $repo->newGood();
$destFile = MediaWikiServices::getInstance()->getRepoGroup()->getLocalRepo()
->newFile( $this->target );
$status->merge( $this->file->acquireFileLock() );
$status->merge( $this->acquireSourceLock() );
if ( !$status->isOK() ) {
return $status;
}
$status->merge( $destFile->acquireFileLock() );
$status->merge( $this->acquireTargetLock() );
if ( !$status->isOK() ) {
$this->file->releaseFileLock();
$this->releaseLocks();
return $status;
}
$unlockScope = new ScopedCallback( function () use ( $destFile ) {
$this->file->releaseFileLock();
$destFile->releaseFileLock();
$unlockScope = new ScopedCallback( function () {
$this->releaseLocks();
} );
$triplets = $this->getMoveTriplets();
@ -234,11 +305,19 @@ class LocalFileMoveBatch {
]
);
ScopedCallback::consume( $unlockScope );
// Everything went ok, remove the source files
$this->cleanupSource( $triplets );
// Defer lock release until the transaction is committed.
if ( $this->db->trxLevel() ) {
$unlockScope->cancel();
$this->db->onTransactionResolution( function () {
$this->releaseLocks();
} );
} else {
ScopedCallback::consume( $unlockScope );
}
$status->merge( $statusDb );
return $status;