Avoid large truncates in the DiskQueue.

And instead create a new file while incrementally truncating the old one
down.  This avoids queueing up a massive number of filesystem metadata
operations in one call, thus flooding the disk with requests and
stalling out all other filesystem operations.

This sets the knobs so that a truncate of >10GB causes us to create a
new file rather than trying to truncate the old one.
This commit is contained in:
Alex Miller 2019-05-07 21:01:29 -10:00
parent 36dfbf4fb3
commit 0685e6c1c7
3 changed files with 35 additions and 4 deletions

View File

@ -270,6 +270,27 @@ public:
Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
// FIXME: Merge this function with IAsyncFileSystem::incrementalDeleteFile().
ACTOR static void incrementalTruncate(Reference<IAsyncFile> file) {
state int64_t remainingFileSize = wait( file->size() );
for( ; remainingFileSize > 0; remainingFileSize -= FLOW_KNOBS->INCREMENTAL_DELETE_TRUNCATE_AMOUNT ){
wait(file->truncate(remainingFileSize));
wait(file->sync());
wait(delay(FLOW_KNOBS->INCREMENTAL_DELETE_INTERVAL));
}
}
ACTOR static Future<Reference<IAsyncFile>> replaceFile(Reference<IAsyncFile> toReplace) {
incrementalTruncate( toReplace );
Reference<IAsyncFile> _replacement = wait( IAsyncFileSystem::filesystem()->open( toReplace->getFilename(), IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_UNBUFFERED | IAsyncFile::OPEN_LOCK, 0 ) );
state Reference<IAsyncFile> replacement = _replacement;
wait( replacement->sync() );
return replacement;
}
Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
return push( this, pageData, toSync );
}
@ -283,7 +304,7 @@ public:
ASSERT( self->writingPos % _PAGE_SIZE == 0 );
ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
vector<Future<Void>> waitfor;
state vector<Future<Void>> waitfor;
if (pageData.size() + self->writingPos > self->files[1].size) {
if ( self->files[0].popped == self->files[0].size ) {
@ -308,8 +329,16 @@ public:
if (self->files[1].size > desiredMaxFileSize) {
// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(self->files[1].size - desiredMaxFileSize), self->fileShrinkBytes );
self->files[1].size -= maxShrink;
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
if (maxShrink / SERVER_KNOBS->DISK_QUEUE_FILE_EXTENSION_BYTES >
SERVER_KNOBS->DISK_QUEUE_MAX_TRUNCATE_EXTENTS) {
TEST(true); // Replacing DiskQueue file
Reference<IAsyncFile> newFile = wait( replaceFile(self->files[1].f) );
self->files[1].setFile(newFile);
self->files[1].size = 0;
} else {
self->files[1].size -= maxShrink;
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
}
}
} else {
// Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
@ -366,9 +395,9 @@ public:
Future<Void> pushed = self->push( pageData, &syncFiles );
pushing.send(Void());
wait( pushed );
ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
TEST(2==syncFiles.size()); // push spans both files
wait( pushed );
delete pageMem;
pageMem = 0;

View File

@ -75,6 +75,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH, 16<<10 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH = 500;
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_MAX_TRUNCATE_EXTENTS, 1<<10 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_EXTENTS = 0;
init( TLOG_DEGRADED_DELAY_COUNT, 5 );
init( TLOG_DEGRADED_DURATION, 5.0 );

View File

@ -79,6 +79,7 @@ public:
int64_t TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH;
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
int DISK_QUEUE_MAX_TRUNCATE_EXTENTS;
int TLOG_DEGRADED_DELAY_COUNT;
double TLOG_DEGRADED_DURATION;