[Upload] Moved async upload stuff to the job queue.

* (bug 44080) Also carry over the IP and HTTP header info.
* This adds a RequestContext::importScopedSession() function.

Change-Id: Ie9c0a4d78fb719569c8149b9cc8a5430f0ac5673
This commit is contained in:
Aaron Schulz 2013-02-13 13:25:37 -08:00
parent 3e06fef308
commit fbf34d84ab
7 changed files with 197 additions and 114 deletions

View file

@ -675,6 +675,8 @@ $wgAutoloadLocalClasses = array(
'RefreshLinksJob' => 'includes/job/jobs/RefreshLinksJob.php',
'RefreshLinksJob2' => 'includes/job/jobs/RefreshLinksJob.php',
'UploadFromUrlJob' => 'includes/job/jobs/UploadFromUrlJob.php',
'AssembleUploadChunksJob' => 'includes/job/jobs/AssembleUploadChunksJob.php',
'PublishStashedFileJob' => 'includes/job/jobs/PublishStashedFileJob.php',
# includes/json
'FormatJson' => 'includes/json/FormatJson.php',

View file

@ -311,6 +311,13 @@ $wgUploadStashMaxAge = 6 * 3600; // 6 hours
/** Allows moving images and other media files */
$wgAllowImageMoving = true;
/**
* Enable deferred upload tasks that use the job queue.
* Only enable this if job runners are set up for both the
* 'AssembleUploadChunks' and 'PublishStashedFile' job types.
*/
$wgEnableAsyncUploads = false;
/**
* These are additional characters that should be replaced with '-' in filenames
*/
@ -5513,6 +5520,8 @@ $wgJobClasses = array(
'enotifNotify' => 'EnotifNotifyJob',
'fixDoubleRedirect' => 'DoubleRedirectJob',
'uploadFromUrl' => 'UploadFromUrlJob',
'AssembleUploadChunks' => 'AssembleUploadChunksJob',
'PublishStashedFile' => 'PublishStashedFileJob',
'null' => 'NullJob'
);
@ -5526,7 +5535,7 @@ $wgJobClasses = array(
* - Jobs that you want to run on specialized machines ( like transcoding, or a particular
* machine on your cluster has 'outside' web access you could restrict uploadFromUrl )
*/
$wgJobTypesExcludedFromDefaultQueue = array();
$wgJobTypesExcludedFromDefaultQueue = array( 'AssembleUploadChunks', 'PublishStashedFile' );
/**
* Map of job types to configuration arrays.
@ -6200,7 +6209,7 @@ $wgMaxShellTime = 180;
$wgMaxShellWallClockTime = 180;
/**
* Under Linux: a cgroup directory used to constrain memory usage of shell
* Under Linux: a cgroup directory used to constrain memory usage of shell
* commands. The directory must be writable by the user which runs MediaWiki.
*
* If specified, this is used instead of ulimit, which is inaccurate, and
@ -6208,7 +6217,7 @@ $wgMaxShellWallClockTime = 180;
* them segfault or deadlock.
*
* A wrapper script will create a cgroup for each shell command that runs, as
* a subgroup of the specified cgroup. If the memory limit is exceeded, the
* a subgroup of the specified cgroup. If the memory limit is exceeded, the
* kernel will send a SIGKILL signal to a process in the subgroup.
*
* @par Example:
@ -6218,7 +6227,7 @@ $wgMaxShellWallClockTime = 180;
* echo '$wgShellCgroup = "/sys/fs/cgroup/memory/mediawiki/job";' >> LocalSettings.php
* @endcode
*
* The reliability of cgroup cleanup can be improved by installing a
* The reliability of cgroup cleanup can be improved by installing a
* notify_on_release script in the root cgroup, see e.g.
* https://gerrit.wikimedia.org/r/#/c/40784
*/

View file

@ -1124,6 +1124,30 @@ HTML;
$this->ip = $ip;
return $ip;
}
/**
* Override the IP address stored on this request object.
*
* This only assigns $this->ip; the value is not validated or
* normalised here.
*
* @param string $ip IP address to store
* @return void
* @since 1.21
*/
public function setIP( $ip ) {
$this->ip = $ip;
}
/**
* Snapshot the parts of this request that identify the user's session:
* the IP from getIP(), every header from getAllHeaders(), and the
* current PHP session ID. The result is intended to stay small enough
* to be serialized.
*
* @return Array Map with the keys 'ip', 'headers' and 'sessionId'
* @since 1.21
*/
public function exportUserSession() {
// Gather the values in the same order the array keys are declared
$ip = $this->getIP();
$headers = $this->getAllHeaders();
$sessionId = session_id();
return array(
'ip' => $ip,
'headers' => $headers,
'sessionId' => $sessionId
);
}
}
/**
@ -1263,8 +1287,9 @@ class FauxRequest extends WebRequest {
throw new MWException( "FauxRequest() got bogus data" );
}
$this->wasPosted = $wasPosted;
if( $session )
if( $session ) {
$this->session = $session;
}
}
/**

View file

@ -37,6 +37,8 @@ class ApiUpload extends ApiBase {
protected $mParams;
public function execute() {
global $wgEnableAsyncUploads;
// Check whether upload is enabled
if ( !UploadBase::isEnabled() ) {
$this->dieUsageMsg( 'uploaddisabled' );
@ -47,9 +49,8 @@ class ApiUpload extends ApiBase {
// Parameter handling
$this->mParams = $this->extractRequestParams();
$request = $this->getMain()->getRequest();
// Check if async mode is actually supported
$this->mParams['async'] = ( $this->mParams['async'] && !wfIsWindows() );
$this->mParams['async'] = false; // XXX: disabled per bug 44080
// Check if async mode is actually supported (jobs done in cli mode)
$this->mParams['async'] = ( $this->mParams['async'] && $wgEnableAsyncUploads );
// Add the uploaded file to the params array
$this->mParams['file'] = $request->getFileName( 'file' );
$this->mParams['chunk'] = $request->getFileName( 'chunk' );
@ -205,8 +206,8 @@ class ApiUpload extends ApiBase {
}
// Check we added the last chunk:
if( $this->mParams['offset'] + $chunkSize == $this->mParams['filesize'] ) {
if ( $this->mParams['async'] && !wfIsWindows() ) {
if ( $this->mParams['offset'] + $chunkSize == $this->mParams['filesize'] ) {
if ( $this->mParams['async'] ) {
$progress = UploadBase::getSessionStatus( $this->mParams['filekey'] );
if ( $progress && $progress['result'] === 'Poll' ) {
$this->dieUsage( "Chunk assembly already in progress.", 'stashfailed' );
@ -216,22 +217,16 @@ class ApiUpload extends ApiBase {
array( 'result' => 'Poll',
'stage' => 'queued', 'status' => Status::newGood() )
);
$retVal = 1;
$cmd = wfShellWikiCmd(
"$IP/includes/upload/AssembleUploadChunks.php",
$ok = JobQueueGroup::singleton()->push( new AssembleUploadChunksJob(
Title::makeTitle( NS_FILE, $this->mParams['filekey'] ),
array(
'--wiki', wfWikiID(),
'--filename', $this->mParams['filename'],
'--filekey', $this->mParams['filekey'],
'--userid', $this->getUser()->getId(),
'--sessionid', session_id(),
'--quiet'
'filename' => $this->mParams['filename'],
'filekey' => $this->mParams['filekey'],
'session' => $this->getRequest()->exportUserSession(),
'userid' => $this->getUser()->getId()
)
) . " < " . wfGetNull() . " > " . wfGetNull() . " 2>&1 &";
// Start a process in the background. Enforce the time limits via PHP
// since ulimit4.sh seems to often not work for this particular usage.
wfShellExec( $cmd, $retVal, array(), array( 'time' => 0, 'memory' => 0 ) );
if ( $retVal == 0 ) {
) );
if ( $ok ) {
$result['result'] = 'Poll';
} else {
UploadBase::setSessionStatus( $this->mParams['filekey'], false );
@ -596,25 +591,19 @@ class ApiUpload extends ApiBase {
$this->mParams['filekey'],
array( 'result' => 'Poll', 'stage' => 'queued', 'status' => Status::newGood() )
);
$retVal = 1;
$cmd = wfShellWikiCmd(
"$IP/includes/upload/PublishStashedFile.php",
$ok = JobQueueGroup::singleton()->push( new PublishStashedFileJob(
Title::makeTitle( NS_FILE, $this->mParams['filename'] ),
array(
'--wiki', wfWikiID(),
'--filename', $this->mParams['filename'],
'--filekey', $this->mParams['filekey'],
'--userid', $this->getUser()->getId(),
'--comment', $this->mParams['comment'],
'--text', $this->mParams['text'],
'--watch', $watch,
'--sessionid', session_id(),
'--quiet'
'filename' => $this->mParams['filename'],
'filekey' => $this->mParams['filekey'],
'comment' => $this->mParams['comment'],
'text' => $this->mParams['text'],
'watch' => $watch,
'session' => $this->getRequest()->exportUserSession(),
'userid' => $this->getUser()->getId()
)
) . " < " . wfGetNull() . " > " . wfGetNull() . " 2>&1 &";
// Start a process in the background. Enforce the time limits via PHP
// since ulimit4.sh seems to often not work for this particular usage.
wfShellExec( $cmd, $retVal, array(), array( 'time' => 0, 'memory' => 0 ) );
if ( $retVal == 0 ) {
) );
if ( $ok ) {
$result['result'] = 'Poll';
} else {
UploadBase::setSessionStatus( $this->mParams['filekey'], false );

View file

@ -392,6 +392,63 @@ class RequestContext implements IContextSource {
return $instance;
}
/**
* Import the resolved user IP, HTTP headers, and session ID.
* This sets the current session and sets $wgUser and $wgRequest.
* Once the return value falls out of scope, the old context is restored.
* This function can only be called within CLI mode scripts.
*
* This will set up the session from the given ID. This is useful when
* background scripts inherit some context when acting on behalf of a user.
*
* The keys read from $params are 'sessionId', 'ip' and 'headers'.
*
* @param array $params Result of WebRequest::exportUserSession()
* @return ScopedCallback Callback object that re-imports the previous session
* @throws MWException If PHP_SAPI is anything other than 'cli'
* @since 1.21
*/
public static function importScopedSession( array $params ) {
if ( PHP_SAPI !== 'cli' ) {
// Don't send random private cookie headers to other random users
throw new MWException( "Sessions can only be imported in cli mode." );
}
// Shared routine: used once to load the new session and again, from the
// returned callback, to put the previous one back.
$importSessionFunction = function( array $params ) {
global $wgRequest, $wgUser;
// Write and close any current session
session_write_close(); // persist
session_id( '' ); // detach
$_SESSION = array(); // clear in-memory array
// Load the new session from the session ID
// (an empty ID is skipped, leaving $_SESSION as the empty array set above)
if ( strlen( $params['sessionId'] ) ) {
wfSetupSession( $params['sessionId'] ); // sets $_SESSION
}
// Build the new WebRequest object
$request = new FauxRequest( array(), false, $_SESSION );
$request->setIP( $params['ip'] );
foreach ( $params['headers'] as $name => $value ) {
$request->setHeader( $name, $value );
}
$context = RequestContext::getMain();
// Set the current context to use the new WebRequest
$context->setRequest( $request );
$wgRequest = $context->getRequest(); // b/c
// Set the current user based on the new session and WebRequest
$context->setUser( User::newFromSession( $request ) ); // uses $_SESSION
$wgUser = $context->getUser(); // b/c
};
// Stash the old session and load in the new one
// (the old parameters must be exported before the import replaces the request)
$oldParams = self::getMain()->getRequest()->exportUserSession();
$importSessionFunction( $params );
// Set callback to save and close the new session and reload the old one
return new ScopedCallback( function() use ( $importSessionFunction, $oldParams ) {
$importSessionFunction( $oldParams );
} );
}
/**
* Create a new extraneous context. The context is filled with information
* external to the current session.

View file

@ -18,65 +18,58 @@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
* @ingroup Maintenance
* @ingroup Upload
*/
require_once( __DIR__ . '/../../maintenance/Maintenance.php' );
set_time_limit( 3600 ); // 1 hour
/**
* Assemble the segments of a chunked upload.
*
* @ingroup Maintenance
* @ingroup Upload
*/
class AssembleUploadChunks extends Maintenance {
public function __construct() {
parent::__construct();
$this->mDescription = "Re-assemble the segments of a chunked upload into a single file";
$this->addOption( 'filename', "Desired file name", true, true );
$this->addOption( 'filekey', "Upload stash file key", true, true );
$this->addOption( 'userid', "Upload owner user ID", true, true );
$this->addOption( 'sessionid', "Upload owner session ID", true, true );
class AssembleUploadChunksJob extends Job {
/**
* Create an 'AssembleUploadChunks' job.
*
* @param Title $title Title handed to the parent Job constructor
* @param array $params Job parameters; run() reads 'session', 'userid',
*   'filename' and 'filekey'
* @param int $id Passed through to the parent Job constructor
*/
public function __construct( $title, $params, $id = 0 ) {
parent::__construct( 'AssembleUploadChunks', $title, $params, $id );
// NOTE(review): presumably lets the queue drop duplicate jobs, matched
// via getDeduplicationInfo() - confirm against the Job base class.
$this->removeDuplicates = true;
}
public function execute() {
$e = null;
wfDebug( "Started assembly for file {$this->getOption( 'filename' )}\n" );
wfSetupSession( $this->getOption( 'sessionid' ) );
public function run() {
$scope = RequestContext::importScopedSession( $this->params['session'] );
$context = RequestContext::getMain();
try {
$user = User::newFromId( $this->getOption( 'userid' ) );
if ( !$user ) {
throw new MWException( "No user with ID " . $this->getOption( 'userid' ) . "." );
$user = $context->getUser();
if ( !$user->isLoggedIn() || $user->getId() != $this->params['userid'] ) {
$this->setLastError( "Could not load the author user from session." );
return true; // no retries
}
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() )
);
$upload = new UploadFromChunks( $user );
$upload->continueChunks(
$this->getOption( 'filename' ),
$this->getOption( 'filekey' ),
// @TODO: set User?
RequestContext::getMain()->getRequest() // dummy request
$this->params['filename'],
$this->params['filekey'],
$context->getRequest()
);
// Combine all of the chunks into a local file and upload that to a new stash file
$status = $upload->concatenateChunks();
if ( !$status->isGood() ) {
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status )
);
session_write_close();
$this->error( $status->getWikiText() . "\n", 1 ); // die
$this->setLastError( $status->getWikiText() );
return true; // no retries
}
// We have a new filekey for the fully concatenated file
$newFileKey = $upload->getLocalFile()->getFileKey();
// Remove the old stash file row and first chunk file
$upload->stash->removeFileNoAuth( $this->getOption( 'filekey' ) );
$upload->stash->removeFileNoAuth( $this->params['filekey'] );
// Build the image info array while we have the local reference handy
$apiMain = new ApiMain(); // dummy object (XXX)
@ -87,7 +80,7 @@ class AssembleUploadChunks extends Maintenance {
// Cache the info so the user doesn't have to wait forever to get the final info
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array(
'result' => 'Success',
'stage' => 'assembling',
@ -98,21 +91,26 @@ class AssembleUploadChunks extends Maintenance {
);
} catch ( MWException $e ) {
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array(
'result' => 'Failure',
'stage' => 'assembling',
'status' => Status::newFatal( 'api-error-stashfailed' )
)
);
$this->setLastError( get_class( $e ) . ": " . $e->getText() );
}
session_write_close();
if ( $e ) {
throw $e;
return true; // returns true on success and error (no retries)
}
/**
* @return Array
*/
public function getDeduplicationInfo() {
$info = parent::getDeduplicationInfo();
if ( is_array( $info['params'] ) ) {
$info['params'] = array( 'filekey' => $info['params']['filekey'] );
}
wfDebug( "Finished assembly for file {$this->getOption( 'filename' )}\n" );
return $info;
}
}
$maintClass = "AssembleUploadChunks";
require_once( RUN_MAINTENANCE_IF_MAIN );

View file

@ -18,39 +18,32 @@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
* @ingroup Maintenance
* @ingroup Upload
*/
require_once( __DIR__ . '/../../maintenance/Maintenance.php' );
set_time_limit( 3600 ); // 1 hour
/**
* Upload a file from the upload stash into the local file repo.
*
* @ingroup Maintenance
* @ingroup Upload
*/
class PublishStashedFile extends Maintenance {
public function __construct() {
parent::__construct();
$this->mDescription = "Upload stashed file into the local file repo";
$this->addOption( 'filename', "Desired file name", true, true );
$this->addOption( 'filekey', "Upload stash file key", true, true );
$this->addOption( 'userid', "Upload owner user ID", true, true );
$this->addOption( 'comment', "Upload comment", true, true );
$this->addOption( 'text', "Upload description", true, true );
$this->addOption( 'watch', "Whether the uploader should watch the page", true, true );
$this->addOption( 'sessionid', "Upload owner session ID", true, true );
class PublishStashedFileJob extends Job {
/**
* Create a 'PublishStashedFile' job.
*
* @param Title $title Title handed to the parent Job constructor
* @param array $params Job parameters; run() reads 'session', 'userid',
*   'filename', 'filekey', 'comment', 'text' and 'watch'
* @param int $id Passed through to the parent Job constructor
*/
public function __construct( $title, $params, $id = 0 ) {
parent::__construct( 'PublishStashedFile', $title, $params, $id );
// NOTE(review): presumably lets the queue drop duplicate jobs, matched
// via getDeduplicationInfo() - confirm against the Job base class.
$this->removeDuplicates = true;
}
public function execute() {
wfSetupSession( $this->getOption( 'sessionid' ) );
public function run() {
$scope = RequestContext::importScopedSession( $this->params['session'] );
$context = RequestContext::getMain();
try {
$user = User::newFromId( $this->getOption( 'userid' ) );
if ( !$user ) {
throw new MWException( "No user with ID " . $this->getOption( 'userid' ) . "." );
$user = $context->getUser();
if ( !$user->isLoggedIn() || $user->getId() != $this->params['userid'] ) {
$this->setLastError( "Could not load the author user from session." );
return true; // no retries
}
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() )
);
@ -59,7 +52,7 @@ class PublishStashedFile extends Maintenance {
// checks and anything else to the stash stage (which includes concatenation and
// the local file is thus already there). That way, instead of GET+PUT, there could
// just be a COPY operation from the stash to the public zone.
$upload->initialize( $this->getOption( 'filekey' ), $this->getOption( 'filename' ) );
$upload->initialize( $this->params['filekey'], $this->params['filename'] );
// Check if the local file checks out (this is generally a no-op)
$verification = $upload->verifyUpload();
@ -67,25 +60,27 @@ class PublishStashedFile extends Maintenance {
$status = Status::newFatal( 'verification-error' );
$status->value = array( 'verification' => $verification );
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
);
$this->error( "Could not verify upload.\n", 1 ); // die
$this->setLastError( "Could not verify upload." );
return true; // no retries
}
// Upload the stashed file to a permanent location
$status = $upload->performUpload(
$this->getOption( 'comment' ),
$this->getOption( 'text' ),
$this->getOption( 'watch' ),
$this->params['comment'],
$this->params['text'],
$this->params['watch'],
$user
);
if ( !$status->isGood() ) {
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
);
$this->error( $status->getWikiText() . "\n", 1 ); // die
$this->setLastError( $status->getWikiText() );
return true; // no retries
}
// Build the image info array while we have the local reference handy
@ -97,7 +92,7 @@ class PublishStashedFile extends Maintenance {
// Cache the info so the user doesn't have to wait forever to get the final info
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array(
'result' => 'Success',
'stage' => 'publish',
@ -108,18 +103,26 @@ class PublishStashedFile extends Maintenance {
);
} catch ( MWException $e ) {
UploadBase::setSessionStatus(
$this->getOption( 'filekey' ),
$this->params['filekey'],
array(
'result' => 'Failure',
'stage' => 'publish',
'status' => Status::newFatal( 'api-error-publishfailed' )
)
);
throw $e;
$this->setLastError( get_class( $e ) . ": " . $e->getText() );
}
session_write_close();
return true; // returns true on success and error (no retries)
}
/**
* Build the de-duplication info for this job, starting from the parent's
* info and, when its 'params' entry is an array, reducing that entry to
* the 'filekey' parameter alone.
*
* @return Array
*/
public function getDeduplicationInfo() {
$dedupInfo = parent::getDeduplicationInfo();
// Nothing to narrow down unless the parent supplied a params array
if ( !is_array( $dedupInfo['params'] ) ) {
return $dedupInfo;
}
$fileKey = $dedupInfo['params']['filekey'];
$dedupInfo['params'] = array( 'filekey' => $fileKey );
return $dedupInfo;
}
}
$maintClass = "PublishStashedFile";
require_once( RUN_MAINTENANCE_IF_MAIN );