diff --git a/includes/export/WikiExporter.php b/includes/export/WikiExporter.php
index b0185843e28..1f2b81d3a8c 100644
--- a/includes/export/WikiExporter.php
+++ b/includes/export/WikiExporter.php
@@ -52,14 +52,10 @@ class WikiExporter {
 	const LOGS = 8;
 	const RANGE = 16;
 
-	const BUFFER = 0;
-	const STREAM = 1;
-
 	const TEXT = 0;
 	const STUB = 1;
 
-	/** @var int */
-	public $buffer;
+	const BATCH_SIZE = 1000;
 
 	/** @var int */
 	public $text;
@@ -76,26 +72,17 @@ class WikiExporter {
 	}
 
 	/**
-	 * If using WikiExporter::STREAM to stream a large amount of data,
-	 * provide a database connection which is not managed by
-	 * LoadBalancer to read from: some history blob types will
-	 * make additional queries to pull source data while the
-	 * main query is still running.
-	 *
 	 * @param IDatabase $db
 	 * @param int|array $history One of WikiExporter::FULL, WikiExporter::CURRENT,
 	 *   WikiExporter::RANGE or WikiExporter::STABLE, or an associative array:
 	 *   - offset: non-inclusive offset at which to start the query
 	 *   - limit: maximum number of rows to return
 	 *   - dir: "asc" or "desc" timestamp order
-	 * @param int $buffer One of WikiExporter::BUFFER or WikiExporter::STREAM
 	 * @param int $text One of WikiExporter::TEXT or WikiExporter::STUB
 	 */
-	function __construct( $db, $history = self::CURRENT,
-		$buffer = self::BUFFER, $text = self::TEXT ) {
+	function __construct( $db, $history = self::CURRENT, $text = self::TEXT ) {
 		$this->db = $db;
 		$this->history = $history;
-		$this->buffer = $buffer;
 		$this->writer = new XmlDumpWriter();
 		$this->sink = new DumpOutput();
 		$this->text = $text;
@@ -263,206 +250,191 @@ class WikiExporter {
 	 * @throws Exception
 	 */
 	protected function dumpFrom( $cond = '', $orderRevs = false ) {
-		global $wgMultiContentRevisionSchemaMigrationStage;
-
-		# For logging dumps...
 		if ( $this->history & self::LOGS ) {
-			$where = [];
-			# Hide private logs
-			$hideLogs = LogEventsList::getExcludeClause( $this->db );
-			if ( $hideLogs ) {
-				$where[] = $hideLogs;
-			}
-			# Add on any caller specified conditions
-			if ( $cond ) {
-				$where[] = $cond;
-			}
-			# Get logging table name for logging.* clause
-			$logging = $this->db->tableName( 'logging' );
-
-			if ( $this->buffer == self::STREAM ) {
-				$prev = $this->db->bufferResults( false );
-			}
-			$result = null; // Assuring $result is not undefined, if exception occurs early
-
-			$commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
-			$actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
-
-			try {
-				$result = $this->db->select(
-					array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
-					[ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
-					$where,
-					__METHOD__,
-					[ 'ORDER BY' => 'log_id', 'USE INDEX' => [ 'logging' => 'PRIMARY' ] ],
-					[
-						'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
-					] + $commentQuery['joins'] + $actorQuery['joins']
-				);
-				$this->outputLogStream( $result );
-				if ( $this->buffer == self::STREAM ) {
-					$this->db->bufferResults( $prev );
-				}
-			} catch ( Exception $e ) {
-				// Throwing the exception does not reliably free the resultset, and
-				// would also leave the connection in unbuffered mode.
-
-				// Freeing result
-				try {
-					if ( $result ) {
-						$result->free();
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
-
-				// Putting database back in previous buffer mode
-				try {
-					if ( $this->buffer == self::STREAM ) {
-						$this->db->bufferResults( $prev );
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
-
-				// Inform caller about problem
-				throw $e;
-			}
-		# For page dumps...
+			$this->dumpLogs( $cond );
 		} else {
-			if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
-				// TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
-				throw new MWException(
-					'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
-					. ' Support for dumping from the new schema is not implemented yet!'
-				);
+			$this->dumpPages( $cond, $orderRevs );
+		}
+	}
+
+	/**
+	 * @param string $cond
+	 * @throws Exception
+	 */
+	protected function dumpLogs( $cond ) {
+		$where = [];
+		# Hide private logs
+		$hideLogs = LogEventsList::getExcludeClause( $this->db );
+		if ( $hideLogs ) {
+			$where[] = $hideLogs;
+		}
+		# Add on any caller specified conditions
+		if ( $cond ) {
+			$where[] = $cond;
+		}
+		# Get logging table name for logging.* clause
+		$logging = $this->db->tableName( 'logging' );
+
+		$result = null; // Assuring $result is not undefined, if exception occurs early
+
+		$commentQuery = CommentStore::getStore()->getJoin( 'log_comment' );
+		$actorQuery = ActorMigration::newMigration()->getJoin( 'log_user' );
+
+		$lastLogId = 0;
+		while ( true ) {
+			$result = $this->db->select(
+				array_merge( [ 'logging' ], $commentQuery['tables'], $actorQuery['tables'], [ 'user' ] ),
+				[ "{$logging}.*", 'user_name' ] + $commentQuery['fields'] + $actorQuery['fields'],
+				array_merge( $where, [ 'log_id > ' . intval( $lastLogId ) ] ),
+				__METHOD__,
+				[
+					'ORDER BY' => 'log_id',
+					'USE INDEX' => [ 'logging' => 'PRIMARY' ],
+					'LIMIT' => self::BATCH_SIZE,
+				],
+				[
+					'user' => [ 'JOIN', 'user_id = ' . $actorQuery['fields']['log_user'] ]
+				] + $commentQuery['joins'] + $actorQuery['joins']
+			);
+
+			if ( !$result->numRows() ) {
+				break;
 			}
-			$revOpts = [ 'page' ];
-			if ( $this->text != self::STUB ) {
-				// TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
-				$revOpts[] = 'text';
-			}
-			$revQuery = Revision::getQueryInfo( $revOpts );
+			$lastLogId = $this->outputLogStream( $result );
+		};
 	}
 
-			// We want page primary rather than revision
-			$tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
-			$join = $revQuery['joins'] + [
 	/**
+	 * @param string $cond
+	 * @param bool $orderRevs
+	 * @throws MWException
+	 * @throws Exception
+	 */
+	protected function dumpPages( $cond, $orderRevs ) {
+		global $wgMultiContentRevisionSchemaMigrationStage;
+		if ( !( $wgMultiContentRevisionSchemaMigrationStage & SCHEMA_COMPAT_WRITE_OLD ) ) {
+			// TODO: Make XmlDumpWriter use a RevisionStore! (see T198706 and T174031)
+			throw new MWException(
+				'Cannot use WikiExporter with SCHEMA_COMPAT_WRITE_OLD mode disabled!'
+				. ' Support for dumping from the new schema is not implemented yet!'
+			);
+		}
+
+		$revOpts = [ 'page' ];
+		if ( $this->text != self::STUB ) {
+			// TODO: remove the text and make XmlDumpWriter use a RevisionStore instead! (T198706)
+			$revOpts[] = 'text';
+		}
+		$revQuery = Revision::getQueryInfo( $revOpts );
+
+		// We want page primary rather than revision
+		$tables = array_merge( [ 'page' ], array_diff( $revQuery['tables'], [ 'page' ] ) );
+		$join = $revQuery['joins'] + [
 			'revision' => $revQuery['joins']['page']
 		];
-			unset( $join['page'] );
+		unset( $join['page'] );
 
-			// TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
-			$fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
+		// TODO: remove rev_text_id and make XmlDumpWriter use a RevisionStore instead! (T198706)
+		$fields = array_merge( $revQuery['fields'], [ 'page_restrictions, rev_text_id' ] );
 
-			$conds = [];
-			if ( $cond !== '' ) {
-				$conds[] = $cond;
-			}
-			$opts = [ 'ORDER BY' => 'page_id ASC' ];
-			$opts['USE INDEX'] = [];
-			if ( is_array( $this->history ) ) {
-				# Time offset/limit for all pages/history...
-				# Set time order
-				if ( $this->history['dir'] == 'asc' ) {
-					$op = '>';
-					$opts['ORDER BY'] = 'rev_timestamp ASC';
-				} else {
-					$op = '<';
-					$opts['ORDER BY'] = 'rev_timestamp DESC';
-				}
-				# Set offset
-				if ( !empty( $this->history['offset'] ) ) {
-					$conds[] = "rev_timestamp $op " .
-						$this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
-				}
-				# Set query limit
-				if ( !empty( $this->history['limit'] ) ) {
-					$opts['LIMIT'] = intval( $this->history['limit'] );
-				}
-			} elseif ( $this->history & self::FULL ) {
-				# Full history dumps...
-				# query optimization for history stub dumps
-				if ( $this->text == self::STUB && $orderRevs ) {
-					$tables = $revQuery['tables'];
-					$opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
-					$opts['USE INDEX']['revision'] = 'rev_page_id';
-					unset( $join['revision'] );
-					$join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
-				}
-			} elseif ( $this->history & self::CURRENT ) {
-				# Latest revision dumps...
-				if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
-					$this->do_list_authors( $cond );
-				}
-				$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
-			} elseif ( $this->history & self::STABLE ) {
-				# "Stable" revision dumps...
-				# Default JOIN, to be overridden...
-				$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
-				# One, and only one hook should set this, and return false
-				if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
-					throw new MWException( __METHOD__ . " given invalid history dump type." );
-				}
-			} elseif ( $this->history & self::RANGE ) {
-				# Dump of revisions within a specified range
-				$opts['ORDER BY'] = [ 'rev_page ASC', 'rev_id ASC' ];
+		$conds = [];
+		if ( $cond !== '' ) {
+			$conds[] = $cond;
+		}
+		$opts = [ 'ORDER BY' => [ 'rev_page ASC', 'rev_id ASC' ] ];
+		$opts['USE INDEX'] = [];
+
+		$op = '>';
+		if ( is_array( $this->history ) ) {
+			# Time offset/limit for all pages/history...
+			# Set time order
+			if ( $this->history['dir'] == 'asc' ) {
+				$opts['ORDER BY'] = 'rev_timestamp ASC';
 			} else {
-				# Unknown history specification parameter?
+				$op = '<';
+				$opts['ORDER BY'] = 'rev_timestamp DESC';
+			}
+			# Set offset
+			if ( !empty( $this->history['offset'] ) ) {
+				$conds[] = "rev_timestamp $op " .
+					$this->db->addQuotes( $this->db->timestamp( $this->history['offset'] ) );
+			}
+			# Set query limit
+			if ( !empty( $this->history['limit'] ) ) {
+				$maxRowCount = intval( $this->history['limit'] );
+			}
+		} elseif ( $this->history & self::FULL ) {
+			# Full history dumps...
+			# query optimization for history stub dumps
+			if ( $this->text == self::STUB && $orderRevs ) {
+				$tables = $revQuery['tables'];
+				$opts['USE INDEX']['revision'] = 'rev_page_id';
+				unset( $join['revision'] );
+				$join['page'] = [ 'INNER JOIN', 'rev_page=page_id' ];
+			}
+		} elseif ( $this->history & self::CURRENT ) {
+			# Latest revision dumps...
+			if ( $this->list_authors && $cond != '' ) { // List authors, if so desired
+				$this->do_list_authors( $cond );
+			}
+			$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+		} elseif ( $this->history & self::STABLE ) {
+			# "Stable" revision dumps...
+			# Default JOIN, to be overridden...
+			$join['revision'] = [ 'INNER JOIN', 'page_id=rev_page AND page_latest=rev_id' ];
+			# One, and only one hook should set this, and return false
+			if ( Hooks::run( 'WikiExporter::dumpStableQuery', [ &$tables, &$opts, &$join ] ) ) {
 				throw new MWException( __METHOD__ . " given invalid history dump type." );
 			}
+		} elseif ( $this->history & self::RANGE ) {
+			# Dump of revisions within a specified range. Condition already set in revsByRange().
+		} else {
+			# Unknown history specification parameter?
+			throw new MWException( __METHOD__ . " given invalid history dump type." );
+		}
 
-			if ( $this->buffer == self::STREAM ) {
-				$prev = $this->db->bufferResults( false );
+		$result = null; // Assuring $result is not undefined, if exception occurs early
+		$done = false;
+		$lastRow = null;
+		$revPage = 0;
+		$revId = 0;
+		$rowCount = 0;
+
+		$opts['LIMIT'] = self::BATCH_SIZE;
+
+		Hooks::run( 'ModifyExportQuery',
+			[ $this->db, &$tables, &$cond, &$opts, &$join ] );
+
+		while ( !$done ) {
+			// If necessary, impose the overall maximum and stop looping after this iteration.
+			if ( !empty( $maxRowCount ) && $rowCount + self::BATCH_SIZE > $maxRowCount ) {
+				$opts['LIMIT'] = $maxRowCount - $rowCount;
+				$done = true;
			}
-			$result = null; // Assuring $result is not undefined, if exception occurs early
-			try {
-				Hooks::run( 'ModifyExportQuery',
-					[ $this->db, &$tables, &$cond, &$opts, &$join ] );
-				# Do the query!
-				$result = $this->db->select(
-					$tables,
-					$fields,
-					$conds,
-					__METHOD__,
-					$opts,
-					$join
-				);
-				# Output dump results
-				$this->outputPageStream( $result );
+			$queryConds = $conds;
+			$queryConds[] = 'rev_page>' . intval( $revPage ) . ' OR (rev_page=' .
+				intval( $revPage ) . ' AND rev_id' . $op . intval( $revId ) . ')';

-			if ( $this->buffer == self::STREAM ) {
-				$this->db->bufferResults( $prev );
-			}
-			} catch ( Exception $e ) {
-				// Throwing the exception does not reliably free the resultset, and
-				// would also leave the connection in unbuffered mode.
+			# Do the query!
+			$result = $this->db->select(
+				$tables,
+				$fields,
+				$queryConds,
+				__METHOD__,
+				$opts,
+				$join
+			);
+			# Output dump results, get new max ids.
+			$lastRow = $this->outputPageStream( $result, $lastRow );

-
-				// Freeing result
-				try {
-					if ( $result ) {
-						$result->free();
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
-
-				// Putting database back in previous buffer mode
-				try {
-					if ( $this->buffer == self::STREAM ) {
-						$this->db->bufferResults( $prev );
-					}
-				} catch ( Exception $e2 ) {
-					// Already in panic mode -> ignoring $e2 as $e has
-					// higher priority
-				}
-
-				// Inform caller about problem
-				throw $e;
+			if ( !$result->numRows() || !$lastRow ) {
+				$done = true;
+			} else {
+				$rowCount += $result->numRows();
+				$revPage = $lastRow->rev_page;
+				$revId = $lastRow->rev_id;
			}
 		}
 	}
@@ -472,52 +444,55 @@ class WikiExporter {
 	 * The result set should be sorted/grouped by page to avoid duplicate
 	 * page records in the output.
 	 *
-	 * Should be safe for
-	 * streaming (non-buffered) queries, as long as it was made on a
-	 * separate database connection not managed by LoadBalancer; some
-	 * blob storage types will make queries to pull source data.
-	 *
 	 * @param ResultWrapper $resultset
+	 * @param object $lastRow the last row output from the previous call (or null if none)
+	 * @return object the last row processed
 	 */
-	protected function outputPageStream( $resultset ) {
-		$last = null;
-		foreach ( $resultset as $row ) {
-			if ( $last === null ||
-				$last->page_namespace != $row->page_namespace ||
-				$last->page_title != $row->page_title ) {
-				if ( $last !== null ) {
-					$output = '';
-					if ( $this->dumpUploads ) {
-						$output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+	protected function outputPageStream( $resultset, $lastRow ) {
+		if ( $resultset->numRows() ) {
+			foreach ( $resultset as $row ) {
+				if ( $lastRow === null ||
+					$lastRow->page_namespace != $row->page_namespace ||
+					$lastRow->page_title != $row->page_title ) {
+					if ( $lastRow !== null ) {
+						$output = '';
+						if ( $this->dumpUploads ) {
+							$output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
+						}
+						$output .= $this->writer->closePage();
+						$this->sink->writeClosePage( $output );
 					}
-					$output .= $this->writer->closePage();
-					$this->sink->writeClosePage( $output );
+					$output = $this->writer->openPage( $row );
+					$this->sink->writeOpenPage( $row, $output );
 				}
-				$output = $this->writer->openPage( $row );
-				$this->sink->writeOpenPage( $row, $output );
-				$last = $row;
+				$output = $this->writer->writeRevision( $row );
+				$this->sink->writeRevision( $row, $output );
+				$lastRow = $row;
 			}
-			$output = $this->writer->writeRevision( $row );
-			$this->sink->writeRevision( $row, $output );
-		}
-		if ( $last !== null ) {
+		} elseif ( $lastRow !== null ) {
+			// Empty resultset means done with all batches Close off final page element (if any).
 			$output = '';
 			if ( $this->dumpUploads ) {
-				$output .= $this->writer->writeUploads( $last, $this->dumpUploadFileContents );
+				$output .= $this->writer->writeUploads( $lastRow, $this->dumpUploadFileContents );
 			}
 			$output .= $this->author_list;
 			$output .= $this->writer->closePage();
 			$this->sink->writeClosePage( $output );
+			$lastRow = null;
 		}
+
+		return $lastRow;
 	}
 
 	/**
 	 * @param ResultWrapper $resultset
+	 * @return int the log_id value of the last item output, or null if none
 	 */
 	protected function outputLogStream( $resultset ) {
 		foreach ( $resultset as $row ) {
 			$output = $this->writer->writeLogItem( $row );
 			$this->sink->writeLogItem( $row, $output );
 		}
+		return isset( $row ) ? $row->log_id : null;
 	}
 }
diff --git a/includes/specials/SpecialExport.php b/includes/specials/SpecialExport.php
index 3a7e9cdf9de..513e7a96b85 100644
--- a/includes/specials/SpecialExport.php
+++ b/includes/specials/SpecialExport.php
@@ -23,7 +23,6 @@
  * @ingroup SpecialPage
  */
 
-use MediaWiki\MediaWikiServices;
 use MediaWiki\Logger\LoggerFactory;
 
 /**
@@ -379,23 +378,10 @@ class SpecialExport extends SpecialPage {
 		}
 
 		/* Ok, let's get to it... */
-		if ( $history == WikiExporter::CURRENT ) {
-			$lb = false;
-			$db = wfGetDB( DB_REPLICA );
-			$buffer = WikiExporter::BUFFER;
-		} else {
-			// Use an unbuffered query; histories may be very long!
-			$lb = MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->newMainLB();
-			$db = $lb->getConnection( DB_REPLICA );
-			$buffer = WikiExporter::STREAM;
+		$lb = false;
+		$db = wfGetDB( DB_REPLICA );
 
-			// This might take a while... :D
-			Wikimedia\suppressWarnings();
-			set_time_limit( 0 );
-			Wikimedia\restoreWarnings();
-		}
-
-		$exporter = new WikiExporter( $db, $history, $buffer );
+		$exporter = new WikiExporter( $db, $history );
 		$exporter->list_authors = $list_authors;
 		$exporter->openStream();
 
diff --git a/maintenance/includes/BackupDumper.php b/maintenance/includes/BackupDumper.php
index e8993e42007..4c2b64c9035 100644
--- a/maintenance/includes/BackupDumper.php
+++ b/maintenance/includes/BackupDumper.php
@@ -257,7 +257,7 @@ abstract class BackupDumper extends Maintenance {
 		$this->initProgress( $history );
 
 		$db = $this->backupDb();
-		$exporter = new WikiExporter( $db, $history, WikiExporter::STREAM, $text );
+		$exporter = new WikiExporter( $db, $history, $text );
 		$exporter->dumpUploads = $this->dumpUploads;
 		$exporter->dumpUploadFileContents = $this->dumpUploadFileContents;
 
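
Reviewer note (not part of the patch): the core idea above is to replace one long unbuffered query with repeated bounded queries that seek past the last row seen. Below is a minimal standalone sketch of that keyset-pagination pattern in plain PHP/PDO, under the assumption of a PDO connection and an illustrative "revision" table; the function name, table, and columns here are examples, not MediaWiki APIs.

<?php
// Illustrative only -- NOT part of the patch. Plain-PHP/PDO sketch of the same
// compound-key batching used by dumpPages(): seek strictly past the last seen
// (rev_page, rev_id) pair with a fixed LIMIT, so no server-side cursor or
// unbuffered connection is needed and memory use stays flat per batch.

const BATCH_SIZE = 1000;

function dumpRevisionsInBatches( PDO $pdo, callable $handleRow ): void {
	$lastPage = 0;
	$lastId = 0;

	while ( true ) {
		// Compound-key seek condition instead of OFFSET: each batch is an
		// index range scan starting right after the last emitted row.
		$stmt = $pdo->prepare(
			'SELECT rev_page, rev_id, rev_timestamp FROM revision' .
			' WHERE rev_page > :pageGt OR ( rev_page = :pageEq AND rev_id > :idGt )' .
			' ORDER BY rev_page ASC, rev_id ASC' .
			' LIMIT ' . BATCH_SIZE
		);
		$stmt->execute( [ ':pageGt' => $lastPage, ':pageEq' => $lastPage, ':idGt' => $lastId ] );
		$rows = $stmt->fetchAll( PDO::FETCH_OBJ );

		if ( !$rows ) {
			break; // empty batch: every revision has been emitted
		}
		foreach ( $rows as $row ) {
			$handleRow( $row );
		}
		// Remember the compound key of the last row for the next seek.
		$last = end( $rows );
		$lastPage = (int)$last->rev_page;
		$lastId = (int)$last->rev_id;
	}
}

Called as dumpRevisionsInBatches( $pdo, function ( $row ) { /* emit a <revision> element */ } ), this walks the table in 1000-row windows. The patch does the equivalent through IDatabase::select() with 'LIMIT' => self::BATCH_SIZE, and outputPageStream() returns the last row it handled so that an open <page> element can carry over from one batch to the next; dumpLogs() uses the simpler single-column form of the same seek on log_id.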