Convert push() into an actor.

This commit is contained in:
Alex Miller 2019-05-07 13:48:41 -10:00
parent 68c773987c
commit a269a784cc

View File

@ -270,68 +270,73 @@ public:
Future<Void> truncateFile(int file, int64_t pos) { return truncateFile(this, file, pos); }
Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>& toSync) {
Future<Void> push(StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
return push( this, pageData, toSync );
}
ACTOR static Future<Void> push(RawDiskQueue_TwoFiles* self, StringRef pageData, vector<Reference<SyncQueue>>* toSync) {
// Write the given data to the queue files, swapping or extending them if necessary.
// Don't do any syncs, but push the modified file(s) onto toSync.
ASSERT( readingFile == 2 );
ASSERT( self->readingFile == 2 );
ASSERT( pageData.size() % _PAGE_SIZE == 0 );
ASSERT( int64_t(pageData.begin()) % _PAGE_SIZE == 0 );
ASSERT( writingPos % _PAGE_SIZE == 0 );
ASSERT( files[0].size % _PAGE_SIZE == 0 && files[1].size % _PAGE_SIZE == 0 );
ASSERT( self->writingPos % _PAGE_SIZE == 0 );
ASSERT( self->files[0].size % _PAGE_SIZE == 0 && self->files[1].size % _PAGE_SIZE == 0 );
vector<Future<Void>> waitfor;
if (pageData.size() + writingPos > files[1].size) {
if ( files[0].popped == files[0].size ) {
// Finish files[1] and swap
int p = files[1].size - writingPos;
if (pageData.size() + self->writingPos > self->files[1].size) {
if ( self->files[0].popped == self->files[0].size ) {
// Finish self->files[1] and swap
int p = self->files[1].size - self->writingPos;
if(p > 0) {
toSync.push_back( files[1].syncQueue );
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
.detail("WritingPos", writingPos).detail("WritingBytes", p);*/
waitfor.push_back( files[1].f->write( pageData.begin(), p, writingPos ) );
toSync->push_back( self->files[1].syncQueue );
/*TraceEvent("RDQWriteAndSwap", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("WritingPos", self->writingPos).detail("WritingBytes", p);*/
waitfor.push_back( self->files[1].f->write( pageData.begin(), p, self->writingPos ) );
pageData = pageData.substr( p );
}
dbg_file0BeginSeq += files[0].size;
std::swap(files[0], files[1]);
std::swap(firstPages[0], firstPages[1]);
files[1].popped = 0;
writingPos = 0;
self->dbg_file0BeginSeq += self->files[0].size;
std::swap(self->files[0], self->files[1]);
std::swap(self->firstPages[0], self->firstPages[1]);
self->files[1].popped = 0;
self->writingPos = 0;
const int64_t activeDataVolume = pageCeiling(files[0].size - files[0].popped + fileExtensionBytes + fileShrinkBytes);
if (files[1].size > activeDataVolume) {
// Either shrink files[1] to the size of files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(files[1].size - activeDataVolume), fileShrinkBytes );
files[1].size -= maxShrink;
waitfor.push_back( files[1].f->truncate( files[1].size ) );
const int64_t activeDataVolume = pageCeiling(self->files[0].size - self->files[0].popped + self->fileExtensionBytes + self->fileShrinkBytes);
if (self->files[1].size > activeDataVolume) {
// Either shrink self->files[1] to the size of self->files[0], or chop off fileShrinkBytes
int64_t maxShrink = std::max( pageFloor(self->files[1].size - activeDataVolume), self->fileShrinkBytes );
self->files[1].size -= maxShrink;
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
}
} else {
// Extend files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
// Extend self->files[1] to accomodate the new write and about 10MB or 2x current size for future writes.
/*TraceEvent("RDQExtend", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("ExtensionBytes", fileExtensionBytes);*/
int64_t minExtension = pageData.size() + writingPos - files[1].size;
files[1].size += std::min(std::max(fileExtensionBytes, minExtension), files[0].size+files[1].size+minExtension);
waitfor.push_back( files[1].f->truncate( files[1].size ) );
int64_t minExtension = pageData.size() + self->writingPos - self->files[1].size;
self->files[1].size += std::min(std::max(self->fileExtensionBytes, minExtension), self->files[0].size+self->files[1].size+minExtension);
waitfor.push_back( self->files[1].f->truncate( self->files[1].size ) );
if(fileSizeWarningLimit > 0 && files[1].size > fileSizeWarningLimit) {
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", dbgid).suppressFor(1.0).detail("Filename", filename(1)).detail("Size", files[1].size);
if(self->fileSizeWarningLimit > 0 && self->files[1].size > self->fileSizeWarningLimit) {
TraceEvent(SevWarnAlways, "DiskQueueFileTooLarge", self->dbgid).suppressFor(1.0).detail("Filename", self->filename(1)).detail("Size", self->files[1].size);
}
}
}
if (writingPos == 0) {
*firstPages[1] = *(const Page*)pageData.begin();
if (self->writingPos == 0) {
*self->firstPages[1] = *(const Page*)pageData.begin();
}
/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", files[1].dbgFilename).detail("File1size", files[1].size)
.detail("WritingPos", writingPos).detail("WritingBytes", pageData.size());*/
files[1].size = std::max( files[1].size, writingPos + pageData.size() );
toSync.push_back( files[1].syncQueue );
waitfor.push_back( files[1].f->write( pageData.begin(), pageData.size(), writingPos ) );
writingPos += pageData.size();
/*TraceEvent("RDQWrite", this->dbgid).detail("File1name", self->files[1].dbgFilename).detail("File1size", self->files[1].size)
.detail("WritingPos", self->writingPos).detail("WritingBytes", pageData.size());*/
self->files[1].size = std::max( self->files[1].size, self->writingPos + pageData.size() );
toSync->push_back( self->files[1].syncQueue );
waitfor.push_back( self->files[1].f->write( pageData.begin(), pageData.size(), self->writingPos ) );
self->writingPos += pageData.size();
return waitForAll(waitfor);
wait( waitForAll(waitfor) );
return Void();
}
ACTOR static UNCANCELLABLE Future<Void> pushAndCommit(RawDiskQueue_TwoFiles* self, StringRef pageData, StringBuffer* pageMem, uint64_t poppedPages) {
@ -358,7 +363,7 @@ public:
TEST( pageData.size() > sizeof(Page) ); // push more than one page of data
Future<Void> pushed = self->push( pageData, syncFiles );
Future<Void> pushed = self->push( pageData, &syncFiles );
pushing.send(Void());
ASSERT( syncFiles.size() >= 1 && syncFiles.size() <= 2 );
TEST(2==syncFiles.size()); // push spans both files