mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-03 03:41:53 +08:00
Merge pull request #431 from ajbeamon/tlog-rename-variables
Rename several variables in TLogServer.actor.cpp to follow our normal camel case conventions.
This commit is contained in:
commit
953c27e570
@ -283,43 +283,43 @@ struct TLogData : NonCopyable {
|
||||
|
||||
struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
struct TagData : NonCopyable, public ReferenceCounted<TagData> {
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> version_messages;
|
||||
bool nothing_persistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
|
||||
bool popped_recently; // `popped` has changed since last updatePersistentData
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> versionMessages;
|
||||
bool nothingPersistent; // true means tag is *known* to have no messages in persistentData. false means nothing.
|
||||
bool poppedRecently; // `popped` has changed since last updatePersistentData
|
||||
Version popped; // see popped version tracking contract below
|
||||
bool update_version_sizes;
|
||||
bool updateVersionSizes;
|
||||
bool unpoppedRecovered;
|
||||
Tag tag;
|
||||
|
||||
TagData( Tag tag, Version popped, bool nothing_persistent, bool popped_recently, bool unpoppedRecovered ) : tag(tag), nothing_persistent(nothing_persistent), popped(popped), popped_recently(popped_recently), unpoppedRecovered(unpoppedRecovered), update_version_sizes(tag != txsTag) {}
|
||||
TagData( Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered ) : tag(tag), nothingPersistent(nothingPersistent), popped(popped), poppedRecently(poppedRecently), unpoppedRecovered(unpoppedRecovered), updateVersionSizes(tag != txsTag) {}
|
||||
|
||||
TagData(TagData&& r) noexcept(true) : version_messages(std::move(r.version_messages)), nothing_persistent(r.nothing_persistent), popped_recently(r.popped_recently), popped(r.popped), update_version_sizes(r.update_version_sizes), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
|
||||
TagData(TagData&& r) noexcept(true) : versionMessages(std::move(r.versionMessages)), nothingPersistent(r.nothingPersistent), poppedRecently(r.poppedRecently), popped(r.popped), updateVersionSizes(r.updateVersionSizes), tag(r.tag), unpoppedRecovered(r.unpoppedRecovered) {}
|
||||
void operator= (TagData&& r) noexcept(true) {
|
||||
version_messages = std::move(r.version_messages);
|
||||
nothing_persistent = r.nothing_persistent;
|
||||
popped_recently = r.popped_recently;
|
||||
versionMessages = std::move(r.versionMessages);
|
||||
nothingPersistent = r.nothingPersistent;
|
||||
poppedRecently = r.poppedRecently;
|
||||
popped = r.popped;
|
||||
update_version_sizes = r.update_version_sizes;
|
||||
updateVersionSizes = r.updateVersionSizes;
|
||||
tag = r.tag;
|
||||
unpoppedRecovered = r.unpoppedRecovered;
|
||||
}
|
||||
|
||||
// Erase messages not needed to update *from* versions >= before (thus, messages with toversion <= before)
|
||||
ACTOR Future<Void> eraseMessagesBefore( TagData *self, Version before, int64_t* gBytesErased, Reference<LogData> tlogData, int taskID ) {
|
||||
while(!self->version_messages.empty() && self->version_messages.front().first < before) {
|
||||
Version version = self->version_messages.front().first;
|
||||
while(!self->versionMessages.empty() && self->versionMessages.front().first < before) {
|
||||
Version version = self->versionMessages.front().first;
|
||||
std::pair<int, int> &sizes = tlogData->version_sizes[version];
|
||||
int64_t messagesErased = 0;
|
||||
|
||||
while(!self->version_messages.empty() && self->version_messages.front().first == version) {
|
||||
auto const& m = self->version_messages.front();
|
||||
while(!self->versionMessages.empty() && self->versionMessages.front().first == version) {
|
||||
auto const& m = self->versionMessages.front();
|
||||
++messagesErased;
|
||||
|
||||
if(self->update_version_sizes) {
|
||||
if(self->updateVersionSizes) {
|
||||
sizes.first -= m.second.expectedSize();
|
||||
}
|
||||
|
||||
self->version_messages.pop_front();
|
||||
self->versionMessages.pop_front();
|
||||
}
|
||||
|
||||
int64_t bytesErased = messagesErased * SERVER_KNOBS->VERSION_MESSAGES_ENTRY_BYTES_WITH_OVERHEAD;
|
||||
@ -336,7 +336,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
}
|
||||
};
|
||||
|
||||
Map<Version, IDiskQueue::location> version_location; // For the version of each entry that was push()ed, the end location of the serialized bytes
|
||||
Map<Version, IDiskQueue::location> versionLocation; // For the version of each entry that was push()ed, the end location of the serialized bytes
|
||||
|
||||
/*
|
||||
Popped version tracking contract needed by log system to implement ILogCursor::popped():
|
||||
@ -378,8 +378,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
}
|
||||
|
||||
//only callable after getTagData returns a null reference
|
||||
Reference<TagData> createTagData(Tag tag, Version popped, bool nothing_persistent, bool popped_recently, bool unpoppedRecovered) {
|
||||
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, nothing_persistent, popped_recently, unpoppedRecovered) );
|
||||
Reference<TagData> createTagData(Tag tag, Version popped, bool nothingPersistent, bool poppedRecently, bool unpoppedRecovered) {
|
||||
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, nothingPersistent, poppedRecently, unpoppedRecovered) );
|
||||
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
|
||||
tag_data[idx][tag.id] = newTagData;
|
||||
return newTagData;
|
||||
@ -468,28 +468,28 @@ void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
|
||||
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);
|
||||
auto loc = queue->push( wr.toStringRef() );
|
||||
//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
|
||||
logData->version_location[qe.version] = loc;
|
||||
logData->versionLocation[qe.version] = loc;
|
||||
}
|
||||
void TLogQueue::pop( Version upTo, Reference<LogData> logData ) {
|
||||
// Keep only the given and all subsequent version numbers
|
||||
// Find the first version >= upTo
|
||||
auto v = logData->version_location.lower_bound(upTo);
|
||||
if (v == logData->version_location.begin()) return;
|
||||
auto v = logData->versionLocation.lower_bound(upTo);
|
||||
if (v == logData->versionLocation.begin()) return;
|
||||
|
||||
if(v == logData->version_location.end()) {
|
||||
v = logData->version_location.lastItem();
|
||||
if(v == logData->versionLocation.end()) {
|
||||
v = logData->versionLocation.lastItem();
|
||||
}
|
||||
else {
|
||||
v.decrementNonEnd();
|
||||
}
|
||||
|
||||
queue->pop( v->value );
|
||||
logData->version_location.erase( logData->version_location.begin(), v ); // ... and then we erase that previous version and all prior versions
|
||||
logData->versionLocation.erase( logData->versionLocation.begin(), v ); // ... and then we erase that previous version and all prior versions
|
||||
}
|
||||
void TLogQueue::updateVersionSizes( const TLogQueueEntry& result, TLogData* tLog ) {
|
||||
auto it = tLog->id_data.find(result.id);
|
||||
if(it != tLog->id_data.end()) {
|
||||
it->second->version_location[result.version] = queue->getNextReadLocation();
|
||||
it->second->versionLocation[result.version] = queue->getNextReadLocation();
|
||||
}
|
||||
}
|
||||
|
||||
@ -523,17 +523,17 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
|
||||
}
|
||||
|
||||
void updatePersistentPopped( TLogData* self, Reference<LogData> logData, Reference<LogData::TagData> data ) {
|
||||
if (!data->popped_recently) return;
|
||||
if (!data->poppedRecently) return;
|
||||
self->persistentData->set(KeyValueRef( persistTagPoppedKey(logData->logId, data->tag), persistTagPoppedValue(data->popped) ));
|
||||
data->popped_recently = false;
|
||||
data->poppedRecently = false;
|
||||
|
||||
if (data->nothing_persistent) return;
|
||||
if (data->nothingPersistent) return;
|
||||
|
||||
self->persistentData->clear( KeyRangeRef(
|
||||
persistTagMessagesKey( logData->logId, data->tag, Version(0) ),
|
||||
persistTagMessagesKey( logData->logId, data->tag, data->popped ) ) );
|
||||
if (data->popped > logData->persistentDataVersion)
|
||||
data->nothing_persistent = true;
|
||||
data->nothingPersistent = true;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logData, Version newPersistentDataVersion ) {
|
||||
@ -548,25 +548,25 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
||||
state bool anyData = false;
|
||||
|
||||
// For all existing tags
|
||||
state int tag_locality = 0;
|
||||
state int tag_id = 0;
|
||||
state int tagLocality = 0;
|
||||
state int tagId = 0;
|
||||
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
state Reference<LogData::TagData> tagData = logData->tag_data[tag_locality][tag_id];
|
||||
for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
state Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
|
||||
if(tagData) {
|
||||
state Version currentVersion = 0;
|
||||
// Clear recently popped versions from persistentData if necessary
|
||||
updatePersistentPopped( self, logData, tagData );
|
||||
// Transfer unpopped messages with version numbers less than newPersistentDataVersion to persistentData
|
||||
state std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator msg = tagData->version_messages.begin();
|
||||
while(msg != tagData->version_messages.end() && msg->first <= newPersistentDataVersion) {
|
||||
state std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator msg = tagData->versionMessages.begin();
|
||||
while(msg != tagData->versionMessages.end() && msg->first <= newPersistentDataVersion) {
|
||||
currentVersion = msg->first;
|
||||
anyData = true;
|
||||
tagData->nothing_persistent = false;
|
||||
tagData->nothingPersistent = false;
|
||||
BinaryWriter wr( Unversioned() );
|
||||
|
||||
for(; msg != tagData->version_messages.end() && msg->first == currentVersion; ++msg)
|
||||
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg)
|
||||
wr << msg->second.toStringRef();
|
||||
|
||||
self->persistentData->set( KeyValueRef( persistTagMessagesKey( logData->logId, tagData->tag, currentVersion ), wr.toStringRef() ) );
|
||||
@ -574,7 +574,7 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
||||
Future<Void> f = yield(TaskUpdateStorage);
|
||||
if(!f.isReady()) {
|
||||
Void _ = wait(f);
|
||||
msg = std::upper_bound(tagData->version_messages.begin(), tagData->version_messages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
msg = std::upper_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(currentVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
}
|
||||
}
|
||||
|
||||
@ -595,10 +595,10 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
||||
TEST(anyData); // TLog moved data to persistentData
|
||||
logData->persistentDataDurableVersion = newPersistentDataVersion;
|
||||
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
if(logData->tag_data[tag_locality][tag_id]) {
|
||||
Void _ = wait(logData->tag_data[tag_locality][tag_id]->eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage ));
|
||||
for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
if(logData->tag_data[tagLocality][tagId]) {
|
||||
Void _ = wait(logData->tag_data[tagLocality][tagId]->eraseMessagesBefore( newPersistentDataVersion+1, &self->bytesDurable, logData, TaskUpdateStorage ));
|
||||
Void _ = wait(yield(TaskUpdateStorage));
|
||||
}
|
||||
}
|
||||
@ -647,8 +647,8 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
||||
state Version nextVersion = 0;
|
||||
state int totalSize = 0;
|
||||
|
||||
state int tag_locality = 0;
|
||||
state int tag_id = 0;
|
||||
state int tagLocality = 0;
|
||||
state int tagId = 0;
|
||||
state Reference<LogData::TagData> tagData;
|
||||
|
||||
if(logData->stopped) {
|
||||
@ -656,11 +656,11 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
||||
while(logData->persistentDataDurableVersion != logData->version.get()) {
|
||||
std::vector<std::pair<std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator, std::deque<std::pair<Version, LengthPrefixedStringRef>>::iterator>> iters;
|
||||
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_locality][tag_id];
|
||||
for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
tagData = logData->tag_data[tagLocality][tagId];
|
||||
if(tagData) {
|
||||
iters.push_back(std::make_pair(tagData->version_messages.begin(), tagData->version_messages.end()));
|
||||
iters.push_back(std::make_pair(tagData->versionMessages.begin(), tagData->versionMessages.end()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -718,12 +718,12 @@ ACTOR Future<Void> updateStorage( TLogData* self ) {
|
||||
++sizeItr;
|
||||
nextVersion = sizeItr == logData->version_sizes.end() ? logData->version.get() : sizeItr->key;
|
||||
|
||||
for(tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
tagData = logData->tag_data[tag_locality][tag_id];
|
||||
for(tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
tagData = logData->tag_data[tagLocality][tagId];
|
||||
if(tagData) {
|
||||
auto it = std::lower_bound(tagData->version_messages.begin(), tagData->version_messages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
for(; it != tagData->version_messages.end() && it->first < nextVersion; ++it) {
|
||||
auto it = std::lower_bound(tagData->versionMessages.begin(), tagData->versionMessages.end(), std::make_pair(prevVersion, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
||||
for(; it != tagData->versionMessages.end() && it->first < nextVersion; ++it) {
|
||||
totalSize += it->second.expectedSize();
|
||||
}
|
||||
|
||||
@ -820,12 +820,12 @@ void commitMessages( Reference<LogData> self, Version version, const std::vector
|
||||
}
|
||||
|
||||
if (version >= tagData->popped) {
|
||||
tagData->version_messages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))));
|
||||
if(tagData->version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
|
||||
TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tagData->version_messages.back().second.expectedSize());
|
||||
tagData->versionMessages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))));
|
||||
if(tagData->versionMessages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
|
||||
TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tagData->versionMessages.back().second.expectedSize());
|
||||
}
|
||||
if (tag != txsTag) {
|
||||
expectedBytes += tagData->version_messages.back().second.expectedSize();
|
||||
expectedBytes += tagData->versionMessages.back().second.expectedSize();
|
||||
}
|
||||
|
||||
// The factor of VERSION_MESSAGES_OVERHEAD is intended to be an overestimate of the actual memory used to store this data in a std::deque.
|
||||
@ -878,13 +878,13 @@ Version poppedVersion( Reference<LogData> self, Tag tag) {
|
||||
return tagData->popped;
|
||||
}
|
||||
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> & get_version_messages( Reference<LogData> self, Tag tag ) {
|
||||
std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Reference<LogData> self, Tag tag ) {
|
||||
auto tagData = self->getTagData(tag);
|
||||
if (!tagData) {
|
||||
static std::deque<std::pair<Version, LengthPrefixedStringRef>> empty;
|
||||
return empty;
|
||||
}
|
||||
return tagData->version_messages;
|
||||
return tagData->versionMessages;
|
||||
};
|
||||
|
||||
ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
|
||||
@ -893,7 +893,7 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
|
||||
tagData = logData->createTagData(req.tag, req.to, true, true, false);
|
||||
} else if (req.to > tagData->popped) {
|
||||
tagData->popped = req.to;
|
||||
tagData->popped_recently = true;
|
||||
tagData->poppedRecently = true;
|
||||
|
||||
if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) {
|
||||
tagData->unpoppedRecovered = false;
|
||||
@ -916,7 +916,7 @@ ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogDat
|
||||
void peekMessagesFromMemory( Reference<LogData> self, TLogPeekRequest const& req, BinaryWriter& messages, Version& endVersion ) {
|
||||
ASSERT( !messages.getLength() );
|
||||
|
||||
auto& deque = get_version_messages(self, req.tag);
|
||||
auto& deque = getVersionMessages(self, req.tag);
|
||||
//TraceEvent("tLogPeekMem", self->dbgid).detail("Tag", printable(req.tag1)).detail("pDS", self->persistentDataSequence).detail("pDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());
|
||||
|
||||
Version begin = std::max( req.begin, self->persistentDataDurableVersion+1 );
|
||||
@ -1948,9 +1948,9 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
||||
//so we need to pop all tags that did not have data at the recovery version.
|
||||
std::vector<Future<Void>> popFutures;
|
||||
std::set<Tag> allTags(req.allTags.begin(), req.allTags.end());
|
||||
for(int tag_locality = 0; tag_locality < logData->tag_data.size(); tag_locality++) {
|
||||
for(int tag_id = 0; tag_id < logData->tag_data[tag_locality].size(); tag_id++) {
|
||||
auto data = logData->tag_data[tag_locality][tag_id];
|
||||
for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
auto data = logData->tag_data[tagLocality][tagId];
|
||||
if(data && !allTags.count(data->tag) && data->tag.locality != tagLocalityLogRouter) {
|
||||
TraceEvent("TLogPopOnRecover", self->dbgid).detail("logId", logData->logId).detail("tag", data->tag.toString()).detail("ver", req.recoverAt);
|
||||
popFutures.push_back(tLogPop(self, TLogPopRequest(req.recoverAt, 0, data->tag), logData));
|
||||
|
Loading…
x
Reference in New Issue
Block a user