mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-28 10:52:03 +08:00
Extract methods in LogSystem.h to corresponding cpp file
This commit is contained in:
parent
ecca4edeb4
commit
067c1cc55b
@ -49,6 +49,7 @@ set(FDBSERVER_SRCS
|
|||||||
LocalConfiguration.h
|
LocalConfiguration.h
|
||||||
LogProtocolMessage.h
|
LogProtocolMessage.h
|
||||||
LogRouter.actor.cpp
|
LogRouter.actor.cpp
|
||||||
|
LogSystem.cpp
|
||||||
LogSystem.h
|
LogSystem.h
|
||||||
LogSystemConfig.h
|
LogSystemConfig.h
|
||||||
LogSystemDiskQueueAdapter.actor.cpp
|
LogSystemDiskQueueAdapter.actor.cpp
|
||||||
|
@ -415,7 +415,7 @@ void peekMessagesFromMemory(LogRouterData* self, Tag tag, Version begin, BinaryW
|
|||||||
auto it = std::lower_bound(deque.begin(),
|
auto it = std::lower_bound(deque.begin(),
|
||||||
deque.end(),
|
deque.end(),
|
||||||
std::make_pair(begin, LengthPrefixedStringRef()),
|
std::make_pair(begin, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) -> bool { return l.first < r.first; });
|
||||||
|
|
||||||
Version currentVersion = -1;
|
Version currentVersion = -1;
|
||||||
for (; it != deque.end(); ++it) {
|
for (; it != deque.end(); ++it) {
|
||||||
|
346
fdbserver/LogSystem.cpp
Normal file
346
fdbserver/LogSystem.cpp
Normal file
@ -0,0 +1,346 @@
|
|||||||
|
/*
|
||||||
|
* LogSystem.cpp
|
||||||
|
*
|
||||||
|
* This source file is part of the FoundationDB open source project
|
||||||
|
*
|
||||||
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "fdbserver/LogSystem.h"
|
||||||
|
|
||||||
|
std::string LogSet::logRouterString() {
|
||||||
|
std::string result;
|
||||||
|
for (int i = 0; i < logRouters.size(); i++) {
|
||||||
|
if (i > 0) {
|
||||||
|
result += ", ";
|
||||||
|
}
|
||||||
|
result += logRouters[i]->get().id().toString();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LogSet::hasLogRouter(UID id) const {
|
||||||
|
for (const auto& router : logRouters) {
|
||||||
|
if (router->get().id() == id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LogSet::hasBackupWorker(UID id) const {
|
||||||
|
for (const auto& worker : backupWorkers) {
|
||||||
|
if (worker->get().id() == id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string LogSet::logServerString() {
|
||||||
|
std::string result;
|
||||||
|
for (int i = 0; i < logServers.size(); i++) {
|
||||||
|
if (i > 0) {
|
||||||
|
result += ", ";
|
||||||
|
}
|
||||||
|
result += logServers[i]->get().id().toString();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogSet::populateSatelliteTagLocations(int logRouterTags, int oldLogRouterTags, int txsTags, int oldTxsTags) {
|
||||||
|
satelliteTagLocations.clear();
|
||||||
|
satelliteTagLocations.resize(std::max({ logRouterTags, oldLogRouterTags, txsTags, oldTxsTags }) + 1);
|
||||||
|
|
||||||
|
std::map<int, int> server_usedBest;
|
||||||
|
std::set<std::pair<int, int>> used_servers;
|
||||||
|
for (int i = 0; i < tLogLocalities.size(); i++) {
|
||||||
|
used_servers.insert(std::make_pair(0, i));
|
||||||
|
}
|
||||||
|
|
||||||
|
Reference<LocalitySet> serverSet = Reference<LocalitySet>(new LocalityMap<std::pair<int, int>>());
|
||||||
|
LocalityMap<std::pair<int, int>>* serverMap = (LocalityMap<std::pair<int, int>>*)serverSet.getPtr();
|
||||||
|
std::vector<std::pair<int, int>> resultPairs;
|
||||||
|
for (int loc = 0; loc < satelliteTagLocations.size(); loc++) {
|
||||||
|
int team = loc;
|
||||||
|
if (loc < logRouterTags) {
|
||||||
|
team = loc + 1;
|
||||||
|
} else if (loc == logRouterTags) {
|
||||||
|
team = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool teamComplete = false;
|
||||||
|
alsoServers.resize(1);
|
||||||
|
serverMap->clear();
|
||||||
|
resultPairs.clear();
|
||||||
|
for (auto& used_idx : used_servers) {
|
||||||
|
auto entry = serverMap->add(tLogLocalities[used_idx.second], &used_idx);
|
||||||
|
if (!resultPairs.size()) {
|
||||||
|
resultPairs.push_back(used_idx);
|
||||||
|
alsoServers[0] = entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
resultEntries.clear();
|
||||||
|
if (serverSet->selectReplicas(tLogPolicy, alsoServers, resultEntries)) {
|
||||||
|
for (auto& entry : resultEntries) {
|
||||||
|
resultPairs.push_back(*serverMap->getObject(entry));
|
||||||
|
}
|
||||||
|
int firstBestUsed = server_usedBest[resultPairs[0].second];
|
||||||
|
for (int i = 1; i < resultPairs.size(); i++) {
|
||||||
|
int thisBestUsed = server_usedBest[resultPairs[i].second];
|
||||||
|
if (thisBestUsed < firstBestUsed) {
|
||||||
|
std::swap(resultPairs[0], resultPairs[i]);
|
||||||
|
firstBestUsed = thisBestUsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
server_usedBest[resultPairs[0].second]++;
|
||||||
|
|
||||||
|
for (auto& res : resultPairs) {
|
||||||
|
satelliteTagLocations[team].push_back(res.second);
|
||||||
|
used_servers.erase(res);
|
||||||
|
res.first++;
|
||||||
|
used_servers.insert(res);
|
||||||
|
}
|
||||||
|
teamComplete = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(teamComplete);
|
||||||
|
}
|
||||||
|
|
||||||
|
checkSatelliteTagLocations();
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogSet::checkSatelliteTagLocations() {
|
||||||
|
std::vector<int> usedBest;
|
||||||
|
std::vector<int> used;
|
||||||
|
usedBest.resize(tLogLocalities.size());
|
||||||
|
used.resize(tLogLocalities.size());
|
||||||
|
for (auto team : satelliteTagLocations) {
|
||||||
|
usedBest[team[0]]++;
|
||||||
|
for (auto loc : team) {
|
||||||
|
used[loc]++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int minUsedBest = satelliteTagLocations.size();
|
||||||
|
int maxUsedBest = 0;
|
||||||
|
for (auto i : usedBest) {
|
||||||
|
minUsedBest = std::min(minUsedBest, i);
|
||||||
|
maxUsedBest = std::max(maxUsedBest, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
int minUsed = satelliteTagLocations.size();
|
||||||
|
int maxUsed = 0;
|
||||||
|
for (auto i : used) {
|
||||||
|
minUsed = std::min(minUsed, i);
|
||||||
|
maxUsed = std::max(maxUsed, i);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool foundDuplicate = false;
|
||||||
|
std::set<Optional<Key>> zones;
|
||||||
|
std::set<Optional<Key>> dcs;
|
||||||
|
for (auto& loc : tLogLocalities) {
|
||||||
|
if (zones.count(loc.zoneId())) {
|
||||||
|
foundDuplicate = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
zones.insert(loc.zoneId());
|
||||||
|
dcs.insert(loc.dcId());
|
||||||
|
}
|
||||||
|
bool moreThanOneDC = dcs.size() > 1 ? true : false;
|
||||||
|
|
||||||
|
TraceEvent(((maxUsed - minUsed > 1) || (maxUsedBest - minUsedBest > 1))
|
||||||
|
? (g_network->isSimulated() && !foundDuplicate && !moreThanOneDC ? SevError : SevWarnAlways)
|
||||||
|
: SevInfo,
|
||||||
|
"CheckSatelliteTagLocations")
|
||||||
|
.detail("MinUsed", minUsed)
|
||||||
|
.detail("MaxUsed", maxUsed)
|
||||||
|
.detail("MinUsedBest", minUsedBest)
|
||||||
|
.detail("MaxUsedBest", maxUsedBest)
|
||||||
|
.detail("DuplicateZones", foundDuplicate)
|
||||||
|
.detail("NumOfDCs", dcs.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
int LogSet::bestLocationFor(Tag tag) {
|
||||||
|
if (locality == tagLocalitySatellite) {
|
||||||
|
return satelliteTagLocations[tag == txsTag ? 0 : tag.id + 1][0];
|
||||||
|
}
|
||||||
|
|
||||||
|
// the following logic supports upgrades from 5.X
|
||||||
|
if (tag == txsTag)
|
||||||
|
return txsTagOld % logServers.size();
|
||||||
|
return tag.id % logServers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogSet::updateLocalitySet(std::vector<LocalityData> const& localities) {
|
||||||
|
LocalityMap<int>* logServerMap;
|
||||||
|
|
||||||
|
logServerSet = Reference<LocalitySet>(new LocalityMap<int>());
|
||||||
|
logServerMap = (LocalityMap<int>*)logServerSet.getPtr();
|
||||||
|
|
||||||
|
logEntryArray.clear();
|
||||||
|
logEntryArray.reserve(localities.size());
|
||||||
|
logIndexArray.clear();
|
||||||
|
logIndexArray.reserve(localities.size());
|
||||||
|
|
||||||
|
for (int i = 0; i < localities.size(); i++) {
|
||||||
|
logIndexArray.push_back(i);
|
||||||
|
logEntryArray.push_back(logServerMap->add(localities[i], &logIndexArray.back()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LogSet::satisfiesPolicy(const std::vector<LocalityEntry>& locations) {
|
||||||
|
resultEntries.clear();
|
||||||
|
|
||||||
|
// Run the policy, assert if unable to satify
|
||||||
|
bool result = logServerSet->selectReplicas(tLogPolicy, locations, resultEntries);
|
||||||
|
ASSERT(result);
|
||||||
|
|
||||||
|
return resultEntries.size() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogSet::getPushLocations(VectorRef<Tag> tags, std::vector<int>& locations, int locationOffset, bool allLocations) {
|
||||||
|
if (locality == tagLocalitySatellite) {
|
||||||
|
for (auto& t : tags) {
|
||||||
|
if (t == txsTag || t.locality == tagLocalityTxs || t.locality == tagLocalityLogRouter) {
|
||||||
|
for (int loc : satelliteTagLocations[t == txsTag ? 0 : t.id + 1]) {
|
||||||
|
locations.push_back(locationOffset + loc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
uniquify(locations);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
newLocations.clear();
|
||||||
|
alsoServers.clear();
|
||||||
|
resultEntries.clear();
|
||||||
|
|
||||||
|
if (allLocations) {
|
||||||
|
// special handling for allLocations
|
||||||
|
TraceEvent("AllLocationsSet").log();
|
||||||
|
for (int i = 0; i < logServers.size(); i++) {
|
||||||
|
newLocations.push_back(i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (auto& t : tags) {
|
||||||
|
if (locality == tagLocalitySpecial || t.locality == locality || t.locality < 0) {
|
||||||
|
newLocations.push_back(bestLocationFor(t));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uniquify(newLocations);
|
||||||
|
|
||||||
|
if (newLocations.size())
|
||||||
|
alsoServers.reserve(newLocations.size());
|
||||||
|
|
||||||
|
// Convert locations to the also servers
|
||||||
|
for (auto location : newLocations) {
|
||||||
|
locations.push_back(locationOffset + location);
|
||||||
|
alsoServers.push_back(logEntryArray[location]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the policy, assert if unable to satify
|
||||||
|
bool result = logServerSet->selectReplicas(tLogPolicy, alsoServers, resultEntries);
|
||||||
|
ASSERT(result);
|
||||||
|
|
||||||
|
// Add the new servers to the location array
|
||||||
|
LocalityMap<int>* logServerMap = (LocalityMap<int>*)logServerSet.getPtr();
|
||||||
|
for (auto entry : resultEntries) {
|
||||||
|
locations.push_back(locationOffset + *logServerMap->getObject(entry));
|
||||||
|
}
|
||||||
|
//TraceEvent("GetPushLocations").detail("Policy", tLogPolicy->info())
|
||||||
|
// .detail("Results", locations.size()).detail("Selection", logServerSet->size())
|
||||||
|
// .detail("Included", alsoServers.size()).detail("Duration", timer() - t);
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogPushData::addTxsTag() {
|
||||||
|
if (logSystem->getTLogVersion() >= TLogVersion::V4) {
|
||||||
|
next_message_tags.push_back(logSystem->getRandomTxsTag());
|
||||||
|
} else {
|
||||||
|
next_message_tags.push_back(txsTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogPushData::addTransactionInfo(SpanID const& context) {
|
||||||
|
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID
|
||||||
|
spanContext = context;
|
||||||
|
writtenLocations.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogPushData::writeMessage(StringRef rawMessageWithoutLength, bool usePreviousLocations) {
|
||||||
|
if (!usePreviousLocations) {
|
||||||
|
prev_tags.clear();
|
||||||
|
if (logSystem->hasRemoteLogs()) {
|
||||||
|
prev_tags.push_back(logSystem->getRandomRouterTag());
|
||||||
|
}
|
||||||
|
for (auto& tag : next_message_tags) {
|
||||||
|
prev_tags.push_back(tag);
|
||||||
|
}
|
||||||
|
msg_locations.clear();
|
||||||
|
logSystem->getPushLocations(prev_tags, msg_locations);
|
||||||
|
next_message_tags.clear();
|
||||||
|
}
|
||||||
|
uint32_t subseq = this->subsequence++;
|
||||||
|
uint32_t msgsize =
|
||||||
|
rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag) * prev_tags.size();
|
||||||
|
for (int loc : msg_locations) {
|
||||||
|
BinaryWriter& wr = messagesWriter[loc];
|
||||||
|
wr << msgsize << subseq << uint16_t(prev_tags.size());
|
||||||
|
for (auto& tag : prev_tags)
|
||||||
|
wr << tag;
|
||||||
|
wr.serializeBytes(rawMessageWithoutLength);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void LogPushData::recordEmptyMessage(int loc, const Standalone<StringRef>& value) {
|
||||||
|
if (!isEmptyMessage[loc]) {
|
||||||
|
BinaryWriter w(AssumeVersion(g_network->protocolVersion()));
|
||||||
|
Standalone<StringRef> v = w.toValue();
|
||||||
|
if (value.size() > v.size()) {
|
||||||
|
isEmptyMessage[loc] = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
float LogPushData::getEmptyMessageRatio() const {
|
||||||
|
auto count = std::count(isEmptyMessage.begin(), isEmptyMessage.end(), false);
|
||||||
|
ASSERT_WE_THINK(isEmptyMessage.size() > 0);
|
||||||
|
return 1.0 * count / isEmptyMessage.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) {
|
||||||
|
if (!FLOW_KNOBS->WRITE_TRACING_ENABLED || logSystem->getTLogVersion() < TLogVersion::V6 ||
|
||||||
|
writtenLocations.count(location) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(true); // Wrote SpanContextMessage to a transaction log
|
||||||
|
writtenLocations.insert(location);
|
||||||
|
|
||||||
|
BinaryWriter& wr = messagesWriter[location];
|
||||||
|
SpanContextMessage contextMessage(spanContext);
|
||||||
|
|
||||||
|
int offset = wr.getLength();
|
||||||
|
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
|
||||||
|
for (auto& tag : prev_tags)
|
||||||
|
wr << tag;
|
||||||
|
wr << contextMessage;
|
||||||
|
int length = wr.getLength() - offset;
|
||||||
|
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
|
||||||
|
return true;
|
||||||
|
}
|
@ -41,6 +41,8 @@
|
|||||||
struct DBCoreState;
|
struct DBCoreState;
|
||||||
struct TLogSet;
|
struct TLogSet;
|
||||||
struct CoreTLogSet;
|
struct CoreTLogSet;
|
||||||
|
struct LogPushData;
|
||||||
|
struct LocalityData;
|
||||||
|
|
||||||
struct ConnectionResetInfo : public ReferenceCounted<ConnectionResetInfo> {
|
struct ConnectionResetInfo : public ReferenceCounted<ConnectionResetInfo> {
|
||||||
double lastReset;
|
double lastReset;
|
||||||
@ -79,256 +81,28 @@ public:
|
|||||||
LogSet(const TLogSet& tlogSet);
|
LogSet(const TLogSet& tlogSet);
|
||||||
LogSet(const CoreTLogSet& coreSet);
|
LogSet(const CoreTLogSet& coreSet);
|
||||||
|
|
||||||
std::string logRouterString() {
|
std::string logRouterString();
|
||||||
std::string result;
|
|
||||||
for (int i = 0; i < logRouters.size(); i++) {
|
|
||||||
if (i > 0) {
|
|
||||||
result += ", ";
|
|
||||||
}
|
|
||||||
result += logRouters[i]->get().id().toString();
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool hasLogRouter(UID id) const {
|
bool hasLogRouter(UID id) const;
|
||||||
for (const auto& router : logRouters) {
|
|
||||||
if (router->get().id() == id) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool hasBackupWorker(UID id) const {
|
bool hasBackupWorker(UID id) const;
|
||||||
for (const auto& worker : backupWorkers) {
|
|
||||||
if (worker->get().id() == id) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string logServerString() {
|
std::string logServerString();
|
||||||
std::string result;
|
|
||||||
for (int i = 0; i < logServers.size(); i++) {
|
|
||||||
if (i > 0) {
|
|
||||||
result += ", ";
|
|
||||||
}
|
|
||||||
result += logServers[i]->get().id().toString();
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
void populateSatelliteTagLocations(int logRouterTags, int oldLogRouterTags, int txsTags, int oldTxsTags) {
|
void populateSatelliteTagLocations(int logRouterTags, int oldLogRouterTags, int txsTags, int oldTxsTags);
|
||||||
satelliteTagLocations.clear();
|
|
||||||
satelliteTagLocations.resize(std::max({ logRouterTags, oldLogRouterTags, txsTags, oldTxsTags }) + 1);
|
|
||||||
|
|
||||||
std::map<int, int> server_usedBest;
|
void checkSatelliteTagLocations();
|
||||||
std::set<std::pair<int, int>> used_servers;
|
|
||||||
for (int i = 0; i < tLogLocalities.size(); i++) {
|
|
||||||
used_servers.insert(std::make_pair(0, i));
|
|
||||||
}
|
|
||||||
|
|
||||||
Reference<LocalitySet> serverSet = Reference<LocalitySet>(new LocalityMap<std::pair<int, int>>());
|
int bestLocationFor(Tag tag);
|
||||||
LocalityMap<std::pair<int, int>>* serverMap = (LocalityMap<std::pair<int, int>>*)serverSet.getPtr();
|
|
||||||
std::vector<std::pair<int, int>> resultPairs;
|
|
||||||
for (int loc = 0; loc < satelliteTagLocations.size(); loc++) {
|
|
||||||
int team = loc;
|
|
||||||
if (loc < logRouterTags) {
|
|
||||||
team = loc + 1;
|
|
||||||
} else if (loc == logRouterTags) {
|
|
||||||
team = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool teamComplete = false;
|
void updateLocalitySet(std::vector<LocalityData> const& localities);
|
||||||
alsoServers.resize(1);
|
|
||||||
serverMap->clear();
|
|
||||||
resultPairs.clear();
|
|
||||||
for (auto& used_idx : used_servers) {
|
|
||||||
auto entry = serverMap->add(tLogLocalities[used_idx.second], &used_idx);
|
|
||||||
if (!resultPairs.size()) {
|
|
||||||
resultPairs.push_back(used_idx);
|
|
||||||
alsoServers[0] = entry;
|
|
||||||
}
|
|
||||||
|
|
||||||
resultEntries.clear();
|
bool satisfiesPolicy(const std::vector<LocalityEntry>& locations);
|
||||||
if (serverSet->selectReplicas(tLogPolicy, alsoServers, resultEntries)) {
|
|
||||||
for (auto& entry : resultEntries) {
|
|
||||||
resultPairs.push_back(*serverMap->getObject(entry));
|
|
||||||
}
|
|
||||||
int firstBestUsed = server_usedBest[resultPairs[0].second];
|
|
||||||
for (int i = 1; i < resultPairs.size(); i++) {
|
|
||||||
int thisBestUsed = server_usedBest[resultPairs[i].second];
|
|
||||||
if (thisBestUsed < firstBestUsed) {
|
|
||||||
std::swap(resultPairs[0], resultPairs[i]);
|
|
||||||
firstBestUsed = thisBestUsed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
server_usedBest[resultPairs[0].second]++;
|
|
||||||
|
|
||||||
for (auto& res : resultPairs) {
|
|
||||||
satelliteTagLocations[team].push_back(res.second);
|
|
||||||
used_servers.erase(res);
|
|
||||||
res.first++;
|
|
||||||
used_servers.insert(res);
|
|
||||||
}
|
|
||||||
teamComplete = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ASSERT(teamComplete);
|
|
||||||
}
|
|
||||||
|
|
||||||
checkSatelliteTagLocations();
|
|
||||||
}
|
|
||||||
|
|
||||||
void checkSatelliteTagLocations() {
|
|
||||||
std::vector<int> usedBest;
|
|
||||||
std::vector<int> used;
|
|
||||||
usedBest.resize(tLogLocalities.size());
|
|
||||||
used.resize(tLogLocalities.size());
|
|
||||||
for (auto team : satelliteTagLocations) {
|
|
||||||
usedBest[team[0]]++;
|
|
||||||
for (auto loc : team) {
|
|
||||||
used[loc]++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int minUsedBest = satelliteTagLocations.size();
|
|
||||||
int maxUsedBest = 0;
|
|
||||||
for (auto i : usedBest) {
|
|
||||||
minUsedBest = std::min(minUsedBest, i);
|
|
||||||
maxUsedBest = std::max(maxUsedBest, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
int minUsed = satelliteTagLocations.size();
|
|
||||||
int maxUsed = 0;
|
|
||||||
for (auto i : used) {
|
|
||||||
minUsed = std::min(minUsed, i);
|
|
||||||
maxUsed = std::max(maxUsed, i);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool foundDuplicate = false;
|
|
||||||
std::set<Optional<Key>> zones;
|
|
||||||
std::set<Optional<Key>> dcs;
|
|
||||||
for (auto& loc : tLogLocalities) {
|
|
||||||
if (zones.count(loc.zoneId())) {
|
|
||||||
foundDuplicate = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
zones.insert(loc.zoneId());
|
|
||||||
dcs.insert(loc.dcId());
|
|
||||||
}
|
|
||||||
bool moreThanOneDC = dcs.size() > 1 ? true : false;
|
|
||||||
|
|
||||||
TraceEvent(((maxUsed - minUsed > 1) || (maxUsedBest - minUsedBest > 1))
|
|
||||||
? (g_network->isSimulated() && !foundDuplicate && !moreThanOneDC ? SevError : SevWarnAlways)
|
|
||||||
: SevInfo,
|
|
||||||
"CheckSatelliteTagLocations")
|
|
||||||
.detail("MinUsed", minUsed)
|
|
||||||
.detail("MaxUsed", maxUsed)
|
|
||||||
.detail("MinUsedBest", minUsedBest)
|
|
||||||
.detail("MaxUsedBest", maxUsedBest)
|
|
||||||
.detail("DuplicateZones", foundDuplicate)
|
|
||||||
.detail("NumOfDCs", dcs.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
int bestLocationFor(Tag tag) {
|
|
||||||
if (locality == tagLocalitySatellite) {
|
|
||||||
return satelliteTagLocations[tag == txsTag ? 0 : tag.id + 1][0];
|
|
||||||
}
|
|
||||||
|
|
||||||
// the following logic supports upgrades from 5.X
|
|
||||||
if (tag == txsTag)
|
|
||||||
return txsTagOld % logServers.size();
|
|
||||||
return tag.id % logServers.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
void updateLocalitySet(std::vector<LocalityData> const& localities) {
|
|
||||||
LocalityMap<int>* logServerMap;
|
|
||||||
|
|
||||||
logServerSet = Reference<LocalitySet>(new LocalityMap<int>());
|
|
||||||
logServerMap = (LocalityMap<int>*)logServerSet.getPtr();
|
|
||||||
|
|
||||||
logEntryArray.clear();
|
|
||||||
logEntryArray.reserve(localities.size());
|
|
||||||
logIndexArray.clear();
|
|
||||||
logIndexArray.reserve(localities.size());
|
|
||||||
|
|
||||||
for (int i = 0; i < localities.size(); i++) {
|
|
||||||
logIndexArray.push_back(i);
|
|
||||||
logEntryArray.push_back(logServerMap->add(localities[i], &logIndexArray.back()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool satisfiesPolicy(const std::vector<LocalityEntry>& locations) {
|
|
||||||
resultEntries.clear();
|
|
||||||
|
|
||||||
// Run the policy, assert if unable to satify
|
|
||||||
bool result = logServerSet->selectReplicas(tLogPolicy, locations, resultEntries);
|
|
||||||
ASSERT(result);
|
|
||||||
|
|
||||||
return resultEntries.size() == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void getPushLocations(VectorRef<Tag> tags,
|
void getPushLocations(VectorRef<Tag> tags,
|
||||||
std::vector<int>& locations,
|
std::vector<int>& locations,
|
||||||
int locationOffset,
|
int locationOffset,
|
||||||
bool allLocations = false) {
|
bool allLocations = false);
|
||||||
if (locality == tagLocalitySatellite) {
|
|
||||||
for (auto& t : tags) {
|
|
||||||
if (t == txsTag || t.locality == tagLocalityTxs || t.locality == tagLocalityLogRouter) {
|
|
||||||
for (int loc : satelliteTagLocations[t == txsTag ? 0 : t.id + 1]) {
|
|
||||||
locations.push_back(locationOffset + loc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
uniquify(locations);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
newLocations.clear();
|
|
||||||
alsoServers.clear();
|
|
||||||
resultEntries.clear();
|
|
||||||
|
|
||||||
if (allLocations) {
|
|
||||||
// special handling for allLocations
|
|
||||||
TraceEvent("AllLocationsSet").log();
|
|
||||||
for (int i = 0; i < logServers.size(); i++) {
|
|
||||||
newLocations.push_back(i);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (auto& t : tags) {
|
|
||||||
if (locality == tagLocalitySpecial || t.locality == locality || t.locality < 0) {
|
|
||||||
newLocations.push_back(bestLocationFor(t));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
uniquify(newLocations);
|
|
||||||
|
|
||||||
if (newLocations.size())
|
|
||||||
alsoServers.reserve(newLocations.size());
|
|
||||||
|
|
||||||
// Convert locations to the also servers
|
|
||||||
for (auto location : newLocations) {
|
|
||||||
locations.push_back(locationOffset + location);
|
|
||||||
alsoServers.push_back(logEntryArray[location]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run the policy, assert if unable to satify
|
|
||||||
bool result = logServerSet->selectReplicas(tLogPolicy, alsoServers, resultEntries);
|
|
||||||
ASSERT(result);
|
|
||||||
|
|
||||||
// Add the new servers to the location array
|
|
||||||
LocalityMap<int>* logServerMap = (LocalityMap<int>*)logServerSet.getPtr();
|
|
||||||
for (auto entry : resultEntries) {
|
|
||||||
locations.push_back(locationOffset + *logServerMap->getObject(entry));
|
|
||||||
}
|
|
||||||
//TraceEvent("GetPushLocations").detail("Policy", tLogPolicy->info())
|
|
||||||
// .detail("Results", locations.size()).detail("Selection", logServerSet->size())
|
|
||||||
// .detail("Included", alsoServers.size()).detail("Duration", timer() - t);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<LocalityEntry> alsoServers, resultEntries;
|
std::vector<LocalityEntry> alsoServers, resultEntries;
|
||||||
@ -743,7 +517,7 @@ struct ILogSystem {
|
|||||||
Version version,
|
Version version,
|
||||||
Version knownCommittedVersion,
|
Version knownCommittedVersion,
|
||||||
Version minKnownCommittedVersion,
|
Version minKnownCommittedVersion,
|
||||||
struct LogPushData& data,
|
LogPushData& data,
|
||||||
SpanID const& spanContext,
|
SpanID const& spanContext,
|
||||||
Optional<UID> debugID = Optional<UID>()) = 0;
|
Optional<UID> debugID = Optional<UID>()) = 0;
|
||||||
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered
|
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered
|
||||||
@ -812,13 +586,13 @@ struct ILogSystem {
|
|||||||
|
|
||||||
static Reference<ILogSystem> fromServerDBInfo(
|
static Reference<ILogSystem> fromServerDBInfo(
|
||||||
UID const& dbgid,
|
UID const& dbgid,
|
||||||
struct ServerDBInfo const& db,
|
ServerDBInfo const& db,
|
||||||
bool useRecoveredAt = false,
|
bool useRecoveredAt = false,
|
||||||
Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>());
|
Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>());
|
||||||
static Reference<ILogSystem> fromLogSystemConfig(
|
static Reference<ILogSystem> fromLogSystemConfig(
|
||||||
UID const& dbgid,
|
UID const& dbgid,
|
||||||
struct LocalityData const&,
|
LocalityData const&,
|
||||||
struct LogSystemConfig const&,
|
LogSystemConfig const&,
|
||||||
bool excludeRemote = false,
|
bool excludeRemote = false,
|
||||||
bool useRecoveredAt = false,
|
bool useRecoveredAt = false,
|
||||||
Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>());
|
Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>());
|
||||||
@ -826,9 +600,7 @@ struct ILogSystem {
|
|||||||
// reference if there isn't a fully recovered log system available. The caller can peek() the returned log system
|
// reference if there isn't a fully recovered log system available. The caller can peek() the returned log system
|
||||||
// and can push() if it has version numbers reserved for it and prevVersions
|
// and can push() if it has version numbers reserved for it and prevVersions
|
||||||
|
|
||||||
static Reference<ILogSystem> fromOldLogSystemConfig(UID const& dbgid,
|
static Reference<ILogSystem> fromOldLogSystemConfig(UID const& dbgid, LocalityData const&, LogSystemConfig const&);
|
||||||
struct LocalityData const&,
|
|
||||||
struct LogSystemConfig const&);
|
|
||||||
// Constructs a new ILogSystem implementation from the old log data within a ServerDBInfo/LogSystemConfig. Might
|
// Constructs a new ILogSystem implementation from the old log data within a ServerDBInfo/LogSystemConfig. Might
|
||||||
// return a null reference if there isn't a fully recovered log system available.
|
// return a null reference if there isn't a fully recovered log system available.
|
||||||
|
|
||||||
@ -863,7 +635,7 @@ struct ILogSystem {
|
|||||||
virtual std::map<LogEpoch, EpochTagsVersionsInfo> getOldEpochTagsVersionsInfo() const = 0;
|
virtual std::map<LogEpoch, EpochTagsVersionsInfo> getOldEpochTagsVersionsInfo() const = 0;
|
||||||
|
|
||||||
virtual Future<Reference<ILogSystem>> newEpoch(
|
virtual Future<Reference<ILogSystem>> newEpoch(
|
||||||
struct RecruitFromConfigurationReply const& recr,
|
RecruitFromConfigurationReply const& recr,
|
||||||
Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
|
Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
|
||||||
DatabaseConfiguration const& config,
|
DatabaseConfiguration const& config,
|
||||||
LogEpoch recoveryCount,
|
LogEpoch recoveryCount,
|
||||||
@ -948,11 +720,6 @@ struct LengthPrefixedStringRef {
|
|||||||
LengthPrefixedStringRef(uint32_t* length) : length(length) {}
|
LengthPrefixedStringRef(uint32_t* length) : length(length) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
template <class T>
|
|
||||||
struct CompareFirst {
|
|
||||||
bool operator()(T const& lhs, T const& rhs) const { return lhs.first < rhs.first; }
|
|
||||||
};
|
|
||||||
|
|
||||||
// Structure to store serialized mutations sent from the proxy to the
|
// Structure to store serialized mutations sent from the proxy to the
|
||||||
// transaction logs. The serialization repeats with the following format:
|
// transaction logs. The serialization repeats with the following format:
|
||||||
//
|
//
|
||||||
@ -980,13 +747,7 @@ struct LogPushData : NonCopyable {
|
|||||||
isEmptyMessage = std::vector<bool>(messagesWriter.size(), false);
|
isEmptyMessage = std::vector<bool>(messagesWriter.size(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addTxsTag() {
|
void addTxsTag();
|
||||||
if (logSystem->getTLogVersion() >= TLogVersion::V4) {
|
|
||||||
next_message_tags.push_back(logSystem->getRandomTxsTag());
|
|
||||||
} else {
|
|
||||||
next_message_tags.push_back(txsTag);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// addTag() adds a tag for the *next* message to be added
|
// addTag() adds a tag for the *next* message to be added
|
||||||
void addTag(Tag tag) { next_message_tags.push_back(tag); }
|
void addTag(Tag tag) { next_message_tags.push_back(tag); }
|
||||||
@ -997,125 +758,22 @@ struct LogPushData : NonCopyable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add transaction info to be written before the first mutation in the transaction.
|
// Add transaction info to be written before the first mutation in the transaction.
|
||||||
void addTransactionInfo(SpanID const& context) {
|
void addTransactionInfo(SpanID const& context);
|
||||||
TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID
|
|
||||||
spanContext = context;
|
|
||||||
writtenLocations.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
void writeMessage(StringRef rawMessageWithoutLength, bool usePreviousLocations) {
|
void writeMessage(StringRef rawMessageWithoutLength, bool usePreviousLocations);
|
||||||
if (!usePreviousLocations) {
|
|
||||||
prev_tags.clear();
|
|
||||||
if (logSystem->hasRemoteLogs()) {
|
|
||||||
prev_tags.push_back(logSystem->getRandomRouterTag());
|
|
||||||
}
|
|
||||||
for (auto& tag : next_message_tags) {
|
|
||||||
prev_tags.push_back(tag);
|
|
||||||
}
|
|
||||||
msg_locations.clear();
|
|
||||||
logSystem->getPushLocations(prev_tags, msg_locations);
|
|
||||||
next_message_tags.clear();
|
|
||||||
}
|
|
||||||
uint32_t subseq = this->subsequence++;
|
|
||||||
uint32_t msgsize =
|
|
||||||
rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag) * prev_tags.size();
|
|
||||||
for (int loc : msg_locations) {
|
|
||||||
BinaryWriter& wr = messagesWriter[loc];
|
|
||||||
wr << msgsize << subseq << uint16_t(prev_tags.size());
|
|
||||||
for (auto& tag : prev_tags)
|
|
||||||
wr << tag;
|
|
||||||
wr.serializeBytes(rawMessageWithoutLength);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
void writeTypedMessage(T const& item, bool metadataMessage = false, bool allLocations = false) {
|
void writeTypedMessage(T const& item, bool metadataMessage = false, bool allLocations = false);
|
||||||
prev_tags.clear();
|
|
||||||
if (logSystem->hasRemoteLogs()) {
|
|
||||||
prev_tags.push_back(logSystem->getRandomRouterTag());
|
|
||||||
}
|
|
||||||
for (auto& tag : next_message_tags) {
|
|
||||||
prev_tags.push_back(tag);
|
|
||||||
}
|
|
||||||
msg_locations.clear();
|
|
||||||
logSystem->getPushLocations(prev_tags, msg_locations, allLocations);
|
|
||||||
|
|
||||||
BinaryWriter bw(AssumeVersion(g_network->protocolVersion()));
|
|
||||||
|
|
||||||
// Metadata messages (currently LogProtocolMessage is the only metadata
|
|
||||||
// message) should be written before span information. If this isn't a
|
|
||||||
// metadata message, make sure all locations have had transaction info
|
|
||||||
// written to them. Mutations may have different sets of tags, so it
|
|
||||||
// is necessary to check all tag locations each time a mutation is
|
|
||||||
// written.
|
|
||||||
if (!metadataMessage) {
|
|
||||||
uint32_t subseq = this->subsequence++;
|
|
||||||
bool updatedLocation = false;
|
|
||||||
for (int loc : msg_locations) {
|
|
||||||
updatedLocation = writeTransactionInfo(loc, subseq) || updatedLocation;
|
|
||||||
}
|
|
||||||
// If this message doesn't write to any new locations, the
|
|
||||||
// subsequence wasn't actually used and can be decremented.
|
|
||||||
if (!updatedLocation) {
|
|
||||||
this->subsequence--;
|
|
||||||
TEST(true); // No new SpanContextMessage written to transaction logs
|
|
||||||
ASSERT(this->subsequence > 0);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// When writing a metadata message, make sure transaction state has
|
|
||||||
// been reset. If you are running into this assertion, make sure
|
|
||||||
// you are calling addTransactionInfo before each transaction.
|
|
||||||
ASSERT(writtenLocations.size() == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t subseq = this->subsequence++;
|
|
||||||
bool first = true;
|
|
||||||
int firstOffset = -1, firstLength = -1;
|
|
||||||
for (int loc : msg_locations) {
|
|
||||||
BinaryWriter& wr = messagesWriter[loc];
|
|
||||||
|
|
||||||
if (first) {
|
|
||||||
firstOffset = wr.getLength();
|
|
||||||
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
|
|
||||||
for (auto& tag : prev_tags)
|
|
||||||
wr << tag;
|
|
||||||
wr << item;
|
|
||||||
firstLength = wr.getLength() - firstOffset;
|
|
||||||
*(uint32_t*)((uint8_t*)wr.getData() + firstOffset) = firstLength - sizeof(uint32_t);
|
|
||||||
DEBUG_TAGS_AND_MESSAGE("ProxyPushLocations",
|
|
||||||
invalidVersion,
|
|
||||||
StringRef(((uint8_t*)wr.getData() + firstOffset), firstLength))
|
|
||||||
.detail("PushLocations", msg_locations);
|
|
||||||
first = false;
|
|
||||||
} else {
|
|
||||||
BinaryWriter& from = messagesWriter[msg_locations[0]];
|
|
||||||
wr.serializeBytes((uint8_t*)from.getData() + firstOffset, firstLength);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
next_message_tags.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
Standalone<StringRef> getMessages(int loc) { return messagesWriter[loc].toValue(); }
|
Standalone<StringRef> getMessages(int loc) { return messagesWriter[loc].toValue(); }
|
||||||
|
|
||||||
// Records if a tlog (specified by "loc") will receive an empty version batch message.
|
// Records if a tlog (specified by "loc") will receive an empty version batch message.
|
||||||
// "value" is the message returned by getMessages() call.
|
// "value" is the message returned by getMessages() call.
|
||||||
void recordEmptyMessage(int loc, const Standalone<StringRef>& value) {
|
void recordEmptyMessage(int loc, const Standalone<StringRef>& value);
|
||||||
if (!isEmptyMessage[loc]) {
|
|
||||||
BinaryWriter w(AssumeVersion(g_network->protocolVersion()));
|
|
||||||
Standalone<StringRef> v = w.toValue();
|
|
||||||
if (value.size() > v.size()) {
|
|
||||||
isEmptyMessage[loc] = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns the ratio of empty messages in this version batch.
|
// Returns the ratio of empty messages in this version batch.
|
||||||
// MUST be called after getMessages() and recordEmptyMessage().
|
// MUST be called after getMessages() and recordEmptyMessage().
|
||||||
float getEmptyMessageRatio() const {
|
float getEmptyMessageRatio() const;
|
||||||
auto count = std::count(isEmptyMessage.begin(), isEmptyMessage.end(), false);
|
|
||||||
ASSERT_WE_THINK(isEmptyMessage.size() > 0);
|
|
||||||
return 1.0 * count / isEmptyMessage.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Reference<ILogSystem> logSystem;
|
Reference<ILogSystem> logSystem;
|
||||||
@ -1135,27 +793,73 @@ private:
|
|||||||
// it has not already been written (for the current transaction). Returns
|
// it has not already been written (for the current transaction). Returns
|
||||||
// true on a successful write, and false if the location has already been
|
// true on a successful write, and false if the location has already been
|
||||||
// written.
|
// written.
|
||||||
bool writeTransactionInfo(int location, uint32_t subseq) {
|
bool writeTransactionInfo(int location, uint32_t subseq);
|
||||||
if (!FLOW_KNOBS->WRITE_TRACING_ENABLED || logSystem->getTLogVersion() < TLogVersion::V6 ||
|
|
||||||
writtenLocations.count(location) != 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(true); // Wrote SpanContextMessage to a transaction log
|
|
||||||
writtenLocations.insert(location);
|
|
||||||
|
|
||||||
BinaryWriter& wr = messagesWriter[location];
|
|
||||||
SpanContextMessage contextMessage(spanContext);
|
|
||||||
|
|
||||||
int offset = wr.getLength();
|
|
||||||
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
|
|
||||||
for (auto& tag : prev_tags)
|
|
||||||
wr << tag;
|
|
||||||
wr << contextMessage;
|
|
||||||
int length = wr.getLength() - offset;
|
|
||||||
*(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
template <class T>
|
||||||
|
void LogPushData::writeTypedMessage(T const& item, bool metadataMessage, bool allLocations) {
|
||||||
|
prev_tags.clear();
|
||||||
|
if (logSystem->hasRemoteLogs()) {
|
||||||
|
prev_tags.push_back(logSystem->getRandomRouterTag());
|
||||||
|
}
|
||||||
|
for (auto& tag : next_message_tags) {
|
||||||
|
prev_tags.push_back(tag);
|
||||||
|
}
|
||||||
|
msg_locations.clear();
|
||||||
|
logSystem->getPushLocations(prev_tags, msg_locations, allLocations);
|
||||||
|
|
||||||
|
BinaryWriter bw(AssumeVersion(g_network->protocolVersion()));
|
||||||
|
|
||||||
|
// Metadata messages (currently LogProtocolMessage is the only metadata
|
||||||
|
// message) should be written before span information. If this isn't a
|
||||||
|
// metadata message, make sure all locations have had transaction info
|
||||||
|
// written to them. Mutations may have different sets of tags, so it
|
||||||
|
// is necessary to check all tag locations each time a mutation is
|
||||||
|
// written.
|
||||||
|
if (!metadataMessage) {
|
||||||
|
uint32_t subseq = this->subsequence++;
|
||||||
|
bool updatedLocation = false;
|
||||||
|
for (int loc : msg_locations) {
|
||||||
|
updatedLocation = writeTransactionInfo(loc, subseq) || updatedLocation;
|
||||||
|
}
|
||||||
|
// If this message doesn't write to any new locations, the
|
||||||
|
// subsequence wasn't actually used and can be decremented.
|
||||||
|
if (!updatedLocation) {
|
||||||
|
this->subsequence--;
|
||||||
|
TEST(true); // No new SpanContextMessage written to transaction logs
|
||||||
|
ASSERT(this->subsequence > 0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// When writing a metadata message, make sure transaction state has
|
||||||
|
// been reset. If you are running into this assertion, make sure
|
||||||
|
// you are calling addTransactionInfo before each transaction.
|
||||||
|
ASSERT(writtenLocations.size() == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t subseq = this->subsequence++;
|
||||||
|
bool first = true;
|
||||||
|
int firstOffset = -1, firstLength = -1;
|
||||||
|
for (int loc : msg_locations) {
|
||||||
|
BinaryWriter& wr = messagesWriter[loc];
|
||||||
|
|
||||||
|
if (first) {
|
||||||
|
firstOffset = wr.getLength();
|
||||||
|
wr << uint32_t(0) << subseq << uint16_t(prev_tags.size());
|
||||||
|
for (auto& tag : prev_tags)
|
||||||
|
wr << tag;
|
||||||
|
wr << item;
|
||||||
|
firstLength = wr.getLength() - firstOffset;
|
||||||
|
*(uint32_t*)((uint8_t*)wr.getData() + firstOffset) = firstLength - sizeof(uint32_t);
|
||||||
|
DEBUG_TAGS_AND_MESSAGE(
|
||||||
|
"ProxyPushLocations", invalidVersion, StringRef(((uint8_t*)wr.getData() + firstOffset), firstLength))
|
||||||
|
.detail("PushLocations", msg_locations);
|
||||||
|
first = false;
|
||||||
|
} else {
|
||||||
|
BinaryWriter& from = messagesWriter[msg_locations[0]];
|
||||||
|
wr.serializeBytes((uint8_t*)from.getData() + firstOffset, firstLength);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
next_message_tags.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // FDBSERVER_LOGSYSTEM_H
|
||||||
|
@ -595,7 +595,7 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
|
|||||||
msg = std::upper_bound(tag->value.version_messages.begin(),
|
msg = std::upper_bound(tag->value.version_messages.begin(),
|
||||||
tag->value.version_messages.end(),
|
tag->value.version_messages.end(),
|
||||||
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -748,7 +748,7 @@ ACTOR Future<Void> updateStorage(TLogData* self) {
|
|||||||
auto it = std::lower_bound(tag->value.version_messages.begin(),
|
auto it = std::lower_bound(tag->value.version_messages.begin(),
|
||||||
tag->value.version_messages.end(),
|
tag->value.version_messages.end(),
|
||||||
std::make_pair(prevVersion, LengthPrefixedStringRef()),
|
std::make_pair(prevVersion, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
for (; it != tag->value.version_messages.end() && it->first < nextVersion; ++it) {
|
for (; it != tag->value.version_messages.end() && it->first < nextVersion; ++it) {
|
||||||
totalSize += it->second.expectedSize();
|
totalSize += it->second.expectedSize();
|
||||||
}
|
}
|
||||||
@ -945,7 +945,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
|||||||
auto it = std::lower_bound(deque.begin(),
|
auto it = std::lower_bound(deque.begin(),
|
||||||
deque.end(),
|
deque.end(),
|
||||||
std::make_pair(begin, LengthPrefixedStringRef()),
|
std::make_pair(begin, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
|
|
||||||
Version currentVersion = -1;
|
Version currentVersion = -1;
|
||||||
for (; it != deque.end(); ++it) {
|
for (; it != deque.end(); ++it) {
|
||||||
|
@ -747,7 +747,7 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
|
|||||||
msg = std::upper_bound(tagData->versionMessages.begin(),
|
msg = std::upper_bound(tagData->versionMessages.begin(),
|
||||||
tagData->versionMessages.end(),
|
tagData->versionMessages.end(),
|
||||||
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1187,7 +1187,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
|||||||
auto it = std::lower_bound(deque.begin(),
|
auto it = std::lower_bound(deque.begin(),
|
||||||
deque.end(),
|
deque.end(),
|
||||||
std::make_pair(begin, LengthPrefixedStringRef()),
|
std::make_pair(begin, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
|
|
||||||
Version currentVersion = -1;
|
Version currentVersion = -1;
|
||||||
for (; it != deque.end(); ++it) {
|
for (; it != deque.end(); ++it) {
|
||||||
|
@ -1007,7 +1007,7 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
|
|||||||
msg = std::upper_bound(tagData->versionMessages.begin(),
|
msg = std::upper_bound(tagData->versionMessages.begin(),
|
||||||
tagData->versionMessages.end(),
|
tagData->versionMessages.end(),
|
||||||
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1504,7 +1504,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
|||||||
auto it = std::lower_bound(deque.begin(),
|
auto it = std::lower_bound(deque.begin(),
|
||||||
deque.end(),
|
deque.end(),
|
||||||
std::make_pair(begin, LengthPrefixedStringRef()),
|
std::make_pair(begin, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) { return l.first < r.first; });
|
||||||
|
|
||||||
Version currentVersion = -1;
|
Version currentVersion = -1;
|
||||||
for (; it != deque.end(); ++it) {
|
for (; it != deque.end(); ++it) {
|
||||||
|
@ -1027,10 +1027,11 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
|
|||||||
Future<Void> f = yield(TaskPriority::UpdateStorage);
|
Future<Void> f = yield(TaskPriority::UpdateStorage);
|
||||||
if (!f.isReady()) {
|
if (!f.isReady()) {
|
||||||
wait(f);
|
wait(f);
|
||||||
msg = std::upper_bound(tagData->versionMessages.begin(),
|
msg = std::upper_bound(
|
||||||
tagData->versionMessages.end(),
|
tagData->versionMessages.begin(),
|
||||||
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
tagData->versionMessages.end(),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
std::make_pair(currentVersion, LengthPrefixedStringRef()),
|
||||||
|
[](const auto& l, const auto& r) -> bool { return l.first < r.first; });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1535,7 +1536,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
|||||||
auto it = std::lower_bound(deque.begin(),
|
auto it = std::lower_bound(deque.begin(),
|
||||||
deque.end(),
|
deque.end(),
|
||||||
std::make_pair(begin, LengthPrefixedStringRef()),
|
std::make_pair(begin, LengthPrefixedStringRef()),
|
||||||
CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());
|
[](const auto& l, const auto& r) -> bool { return l.first < r.first; });
|
||||||
|
|
||||||
Version currentVersion = -1;
|
Version currentVersion = -1;
|
||||||
for (; it != deque.end(); ++it) {
|
for (; it != deque.end(); ++it) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user