mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-25 17:00:05 +08:00
Fixed a few bugs and added a backup popping mechanism
This commit is contained in:
parent
27151b0831
commit
4bbae59bb0
@ -1983,7 +1983,7 @@ ACTOR Future<Void> changeFeedList(Database db) {
|
||||
ASSERT(!result.more);
|
||||
printf("Found %d range feeds%s\n", result.size(), result.size() == 0 ? "." : ":");
|
||||
for (auto& it : result) {
|
||||
auto range = decodeChangeFeedValue(it.value);
|
||||
auto range = std::get<0>(decodeChangeFeedValue(it.value));
|
||||
printf(" %s: %s - %s\n",
|
||||
it.key.removePrefix(changeFeedPrefix).toString().c_str(),
|
||||
range.begin.toString().c_str(),
|
||||
|
@ -4319,8 +4319,8 @@ ACTOR Future<Void> registerChangeFeedActor(Transaction* tr, Key rangeID, KeyRang
|
||||
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
|
||||
Optional<Value> val = wait(tr->get(rangeIDKey));
|
||||
if (!val.present()) {
|
||||
tr->set(rangeIDKey, changeFeedValue(range));
|
||||
} else if (decodeChangeFeedValue(val.get()) != range) {
|
||||
tr->set(rangeIDKey, changeFeedValue(range, invalidVersion, false));
|
||||
} else if (std::get<0>(decodeChangeFeedValue(val.get())) != range) {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
return Void();
|
||||
@ -6544,7 +6544,7 @@ ACTOR Future<Standalone<VectorRef<MutationsAndVersionRef>>> getChangeFeedMutatio
|
||||
if (!val.present()) {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
KeyRange keys = decodeChangeFeedValue(val.get());
|
||||
KeyRange keys = std::get<0>(decodeChangeFeedValue(val.get()));
|
||||
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
|
||||
wait(getKeyRangeLocations(cx,
|
||||
keys,
|
||||
@ -6706,7 +6706,7 @@ ACTOR Future<Void> getChangeFeedStreamActor(Reference<DatabaseContext> db,
|
||||
results.sendError(unsupported_operation());
|
||||
return Void();
|
||||
}
|
||||
keys = decodeChangeFeedValue(val.get());
|
||||
keys = std::get<0>(decodeChangeFeedValue(val.get()));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
@ -6853,31 +6853,44 @@ ACTOR Future<std::vector<std::pair<Key, KeyRange>>> getOverlappingChangeFeedsAct
|
||||
state Database cx(db);
|
||||
state Transaction tr(cx);
|
||||
state Span span("NAPI:GetOverlappingChangeFeeds"_loc);
|
||||
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
|
||||
wait(getKeyRangeLocations(cx,
|
||||
range,
|
||||
1000,
|
||||
Reverse::False,
|
||||
&StorageServerInterface::changeFeed,
|
||||
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
|
||||
|
||||
if (locations.size() >= 1000) {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
loop {
|
||||
try {
|
||||
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
|
||||
wait(getKeyRangeLocations(cx,
|
||||
range,
|
||||
1000,
|
||||
Reverse::False,
|
||||
&StorageServerInterface::overlappingChangeFeeds,
|
||||
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
|
||||
|
||||
state std::vector<Future<std::vector<std::pair<Key, KeyRange>>>> allOverlappingRequests;
|
||||
for (auto& it : locations) {
|
||||
allOverlappingRequests.push_back(singleLocationOverlappingChangeFeeds(cx, it.second, range, minVersion));
|
||||
}
|
||||
wait(waitForAll(allOverlappingRequests));
|
||||
if (locations.size() >= 1000) {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
|
||||
std::vector<std::pair<Key, KeyRange>> result;
|
||||
for (auto& it : allOverlappingRequests) {
|
||||
result.insert(result.end(), it.get().begin(), it.get().end());
|
||||
state std::vector<Future<std::vector<std::pair<Key, KeyRange>>>> allOverlappingRequests;
|
||||
for (auto& it : locations) {
|
||||
allOverlappingRequests.push_back(
|
||||
singleLocationOverlappingChangeFeeds(cx, it.second, it.first & range, minVersion));
|
||||
}
|
||||
wait(waitForAll(allOverlappingRequests));
|
||||
|
||||
std::vector<std::pair<Key, KeyRange>> result;
|
||||
for (auto& it : allOverlappingRequests) {
|
||||
result.insert(result.end(), it.get().begin(), it.get().end());
|
||||
}
|
||||
std::sort(result.begin(), result.end(), compareChangeFeedResult);
|
||||
result.resize(std::unique(result.begin(), result.end()) - result.begin());
|
||||
return result;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed) {
|
||||
cx->invalidateCache(range);
|
||||
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY));
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
std::sort(result.begin(), result.end(), compareChangeFeedResult);
|
||||
result.resize(std::unique(result.begin(), result.end()) - result.begin());
|
||||
return result;
|
||||
}
|
||||
|
||||
Future<std::vector<std::pair<Key, KeyRange>>> DatabaseContext::getOverlappingChangeFeeds(KeyRangeRef range,
|
||||
@ -6885,6 +6898,31 @@ Future<std::vector<std::pair<Key, KeyRange>>> DatabaseContext::getOverlappingCha
|
||||
return getOverlappingChangeFeedsActor(Reference<DatabaseContext>::addRef(this), range, minVersion);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> popChangeFeedBackup(Database cx, StringRef rangeID, Version version) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
state Key rangeIDKey = rangeID.withPrefix(changeFeedPrefix);
|
||||
Optional<Value> val = wait(tr.get(rangeIDKey));
|
||||
if (val.present()) {
|
||||
KeyRange range;
|
||||
Version popVersion;
|
||||
bool stopped;
|
||||
std::tie(range, popVersion, stopped) = decodeChangeFeedValue(val.get());
|
||||
if (version > popVersion) {
|
||||
tr.set(rangeIDKey, changeFeedValue(range, invalidVersion, stopped));
|
||||
}
|
||||
} else {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, StringRef rangeID, Version version) {
|
||||
state Database cx(db);
|
||||
state Transaction tr(cx);
|
||||
@ -6894,33 +6932,33 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, St
|
||||
if (!val.present()) {
|
||||
throw unsupported_operation();
|
||||
}
|
||||
KeyRange keys = decodeChangeFeedValue(val.get());
|
||||
KeyRange keys = std::get<0>(decodeChangeFeedValue(val.get()));
|
||||
state vector<pair<KeyRange, Reference<LocationInfo>>> locations =
|
||||
wait(getKeyRangeLocations(cx,
|
||||
keys,
|
||||
1000,
|
||||
3,
|
||||
Reverse::False,
|
||||
&StorageServerInterface::changeFeed,
|
||||
TransactionInfo(TaskPriority::DefaultEndpoint, span.context)));
|
||||
|
||||
if (locations.size() >= 1000) {
|
||||
throw unsupported_operation();
|
||||
if (locations.size() > 2) {
|
||||
wait(popChangeFeedBackup(cx, rangeID, version));
|
||||
return Void();
|
||||
}
|
||||
|
||||
state std::vector<StorageServerInterface> allInterfs;
|
||||
for (auto& it : locations) {
|
||||
for (int i = 0; i < it.second->size(); i++) {
|
||||
allInterfs.push_back(it.second->getInterface(i));
|
||||
}
|
||||
}
|
||||
uniquify(allInterfs);
|
||||
|
||||
// FIXME: lookup both the src and dest shards as of the pop version to ensure all locations are popped
|
||||
state std::vector<Future<Void>> popRequests;
|
||||
for (int i = 0; i < allInterfs.size(); i++) {
|
||||
popRequests.push_back(allInterfs[i].changeFeedPop.getReply(ChangeFeedPopRequest(rangeID, version)));
|
||||
for (int i = 0; i < locations.size(); i++) {
|
||||
for (int j = 0; j < locations[i].second->size(); j++) {
|
||||
popRequests.push_back(locations[i].second->getInterface(j).changeFeedPop.getReply(
|
||||
ChangeFeedPopRequest(rangeID, version, locations[i].first)));
|
||||
}
|
||||
}
|
||||
|
||||
choose {
|
||||
when(wait(waitForAll(popRequests))) {}
|
||||
when(wait(delay(5.0))) { wait(popChangeFeedBackup(cx, rangeID, version)); }
|
||||
}
|
||||
wait(waitForAll(popRequests));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -723,14 +723,16 @@ struct ChangeFeedPopRequest {
|
||||
constexpr static FileIdentifier file_identifier = 10726174;
|
||||
Key rangeID;
|
||||
Version version;
|
||||
KeyRange range;
|
||||
ReplyPromise<Void> reply;
|
||||
|
||||
ChangeFeedPopRequest() {}
|
||||
ChangeFeedPopRequest(Key const& rangeID, Version version) : rangeID(rangeID), version(version) {}
|
||||
ChangeFeedPopRequest(Key const& rangeID, Version version, KeyRange const& range)
|
||||
: rangeID(rangeID), version(version), range(range) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, rangeID, version, reply);
|
||||
serializer(ar, rangeID, version, range, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1034,19 +1034,26 @@ const KeyRangeRef changeFeedKeys(LiteralStringRef("\xff\x02/feed/"), LiteralStri
|
||||
const KeyRef changeFeedPrefix = changeFeedKeys.begin;
|
||||
const KeyRef changeFeedPrivatePrefix = LiteralStringRef("\xff\xff\x02/feed/");
|
||||
|
||||
const Value changeFeedValue(KeyRangeRef const& range) {
|
||||
const Value changeFeedValue(KeyRangeRef const& range, Version popVersion, bool stopped) {
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withChangeFeed()));
|
||||
wr << range;
|
||||
wr << popVersion;
|
||||
wr << stopped;
|
||||
return wr.toValue();
|
||||
}
|
||||
KeyRange decodeChangeFeedValue(ValueRef const& value) {
|
||||
|
||||
std::tuple<KeyRange, Version, bool> decodeChangeFeedValue(ValueRef const& value) {
|
||||
KeyRange range;
|
||||
Version version;
|
||||
bool stopped;
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
reader >> range;
|
||||
return range;
|
||||
reader >> version;
|
||||
reader >> stopped;
|
||||
return std::make_tuple(range, version, stopped);
|
||||
}
|
||||
|
||||
const KeyRangeRef changeFeedDurableKeys(LiteralStringRef("\xff\xff/rf/"), LiteralStringRef("\xff\xff/rf0"));
|
||||
const KeyRangeRef changeFeedDurableKeys(LiteralStringRef("\xff\xff/cf/"), LiteralStringRef("\xff\xff/cf0"));
|
||||
const KeyRef changeFeedDurablePrefix = changeFeedDurableKeys.begin;
|
||||
|
||||
const Value changeFeedDurableKey(Key const& feed, Version const& version) {
|
||||
|
@ -496,8 +496,8 @@ extern const ValueRef writeRecoveryKeyTrue;
|
||||
extern const KeyRef snapshotEndVersionKey;
|
||||
|
||||
extern const KeyRangeRef changeFeedKeys;
|
||||
const Value changeFeedValue(KeyRangeRef const& range);
|
||||
KeyRange decodeChangeFeedValue(ValueRef const& value);
|
||||
const Value changeFeedValue(KeyRangeRef const& range, Version popVersion, bool stopped);
|
||||
std::tuple<KeyRange, Version, bool> decodeChangeFeedValue(ValueRef const& value);
|
||||
extern const KeyRef changeFeedPrefix;
|
||||
extern const KeyRef changeFeedPrivatePrefix;
|
||||
|
||||
|
@ -410,7 +410,7 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore
|
||||
} else if (m.param1.startsWith(changeFeedPrefix)) {
|
||||
if (toCommit && keyInfo) {
|
||||
KeyRange r = decodeChangeFeedValue(m.param2);
|
||||
KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2));
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
auto ranges = keyInfo->intersectingRanges(r);
|
||||
|
@ -1513,11 +1513,16 @@ ACTOR Future<Void> watchValueSendReply(StorageServer* data,
|
||||
}
|
||||
|
||||
ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req) {
|
||||
if (!self->isReadable(req.range)) {
|
||||
req.reply.sendError(wrong_shard_server());
|
||||
return Void();
|
||||
}
|
||||
|
||||
auto& feed = self->uidChangeFeed[req.rangeID];
|
||||
if (req.version - 1 > feed->emptyVersion) {
|
||||
feed->emptyVersion = req.version - 1;
|
||||
while (!feed->mutations.empty() && feed->mutations.front().version < req.version) {
|
||||
self->uidChangeFeed[req.rangeID]->mutations.pop_front();
|
||||
feed->mutations.pop_front();
|
||||
}
|
||||
if (feed->storageVersion != invalidVersion) {
|
||||
self->storage.clearRange(
|
||||
@ -1539,6 +1544,12 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
|
||||
ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChangeFeedsRequest req) {
|
||||
wait(delay(0));
|
||||
wait(data->version.whenAtLeast(req.minVersion));
|
||||
|
||||
if (!data->isReadable(req.range)) {
|
||||
req.reply.sendError(wrong_shard_server());
|
||||
return Void();
|
||||
}
|
||||
|
||||
auto ranges = data->keyChangeFeed.intersectingRanges(req.range);
|
||||
std::map<Key, KeyRange> rangeIds;
|
||||
for (auto r : ranges) {
|
||||
@ -1680,11 +1691,12 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
|
||||
wait(delay(0, TaskPriority::DefaultEndpoint));
|
||||
|
||||
state uint64_t changeCounter = data->shardChangeCounter;
|
||||
if (!data->isReadable(req.range)) {
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
|
||||
try {
|
||||
if (!data->isReadable(req.range)) {
|
||||
throw wrong_shard_server();
|
||||
}
|
||||
|
||||
loop {
|
||||
wait(req.reply.onReady());
|
||||
state ChangeFeedRequest feedRequest;
|
||||
@ -3120,7 +3132,7 @@ ACTOR Future<Void> fetchChangeFeed(StorageServer* data, Key rangeId, KeyRange ra
|
||||
data->addMutationToMutationLog(mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + rangeId.toString(),
|
||||
changeFeedValue(range)));
|
||||
changeFeedValue(range, invalidVersion, false)));
|
||||
} else {
|
||||
changeFeedInfo = data->uidChangeFeed[rangeId];
|
||||
}
|
||||
@ -3946,26 +3958,49 @@ private:
|
||||
m.param1.startsWith(changeFeedPrivatePrefix)) {
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
Key changeFeedId = m.param1.removePrefix(changeFeedPrivatePrefix);
|
||||
KeyRange changeFeedRange = decodeChangeFeedValue(m.param2);
|
||||
TraceEvent("AddingChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId.printable())
|
||||
.detail("Range", changeFeedRange.toString());
|
||||
Reference<ChangeFeedInfo> changeFeedInfo(new ChangeFeedInfo());
|
||||
changeFeedInfo->range = changeFeedRange;
|
||||
changeFeedInfo->id = changeFeedId;
|
||||
changeFeedInfo->emptyVersion = currentVersion - 1;
|
||||
data->uidChangeFeed[changeFeedId] = changeFeedInfo;
|
||||
auto rs = data->keyChangeFeed.modify(changeFeedRange);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
r->value().push_back(changeFeedInfo);
|
||||
KeyRange changeFeedRange;
|
||||
Version popVersion;
|
||||
bool stopped;
|
||||
std::tie(changeFeedRange, popVersion, stopped) = decodeChangeFeedValue(m.param2);
|
||||
auto feed = data->uidChangeFeed.find(changeFeedId);
|
||||
if (feed == data->uidChangeFeed.end()) {
|
||||
TraceEvent("AddingChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId.printable())
|
||||
.detail("Range", changeFeedRange.toString());
|
||||
Reference<ChangeFeedInfo> changeFeedInfo(new ChangeFeedInfo());
|
||||
changeFeedInfo->range = changeFeedRange;
|
||||
changeFeedInfo->id = changeFeedId;
|
||||
changeFeedInfo->emptyVersion = currentVersion - 1;
|
||||
data->uidChangeFeed[changeFeedId] = changeFeedInfo;
|
||||
|
||||
auto rs = data->keyChangeFeed.modify(changeFeedRange);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r) {
|
||||
r->value().push_back(changeFeedInfo);
|
||||
}
|
||||
data->keyChangeFeed.coalesce(changeFeedRange.contents());
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + changeFeedId.toString(),
|
||||
m.param2));
|
||||
} else {
|
||||
if (popVersion != invalidVersion && popVersion - 1 > feed->second->emptyVersion) {
|
||||
feed->second->emptyVersion = popVersion - 1;
|
||||
while (!feed->second->mutations.empty() &&
|
||||
feed->second->mutations.front().version < popVersion) {
|
||||
feed->second->mutations.pop_front();
|
||||
}
|
||||
if (feed->second->storageVersion != invalidVersion) {
|
||||
data->storage.clearRange(KeyRangeRef(changeFeedDurableKey(feed->second->id, 0),
|
||||
changeFeedDurableKey(feed->second->id, popVersion)));
|
||||
if (popVersion > feed->second->storageVersion) {
|
||||
feed->second->storageVersion = invalidVersion;
|
||||
feed->second->durableVersion = invalidVersion;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
data->keyChangeFeed.coalesce(changeFeedRange.contents());
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
data->addMutationToMutationLog(
|
||||
mLV,
|
||||
MutationRef(MutationRef::SetValue,
|
||||
persistChangeFeedKeys.begin.toString() + changeFeedId.toString(),
|
||||
m.param2));
|
||||
} else {
|
||||
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
|
||||
auto beginFeed = m.param1.removePrefix(changeFeedPrivatePrefix);
|
||||
@ -4916,7 +4951,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
|
||||
state int feedLoc;
|
||||
for (feedLoc = 0; feedLoc < changeFeeds.size(); feedLoc++) {
|
||||
Key changeFeedId = changeFeeds[feedLoc].key.removePrefix(persistChangeFeedKeys.begin);
|
||||
KeyRange changeFeedRange = decodeChangeFeedValue(changeFeeds[feedLoc].value);
|
||||
KeyRange changeFeedRange = std::get<0>(decodeChangeFeedValue(changeFeeds[feedLoc].value));
|
||||
TraceEvent("RestoringChangeFeed", data->thisServerID)
|
||||
.detail("RangeID", changeFeedId.printable())
|
||||
.detail("Range", changeFeedRange.toString());
|
||||
|
Loading…
x
Reference in New Issue
Block a user