diff --git a/fdbclient/BlobGranuleCommon.h b/fdbclient/BlobGranuleCommon.h index 76bde814c9..fd7ef1d788 100644 --- a/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/BlobGranuleCommon.h @@ -27,35 +27,6 @@ #include "fdbclient/CommitTransaction.h" #include "fdbclient/FDBTypes.h" -// TODO better place for this? It's used in change feeds too -struct MutationsAndVersionRef { - VectorRef<MutationRef> mutations; - Version version; - Version knownCommittedVersion; - - MutationsAndVersionRef() {} - explicit MutationsAndVersionRef(Version version, Version knownCommittedVersion) - : version(version), knownCommittedVersion(knownCommittedVersion) {} - MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion) - : mutations(mutations), version(version), knownCommittedVersion(knownCommittedVersion) {} - MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion) - : mutations(to, mutations), version(version), knownCommittedVersion(knownCommittedVersion) {} - MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from) - : mutations(to, from.mutations), version(from.version), knownCommittedVersion(from.knownCommittedVersion) {} - int expectedSize() const { return mutations.expectedSize(); } - - struct OrderByVersion { - bool operator()(MutationsAndVersionRef const& a, MutationsAndVersionRef const& b) const { - return a.version < b.version; - } - }; - - template <class Ar> - void serialize(Ar& ar) { - serializer(ar, mutations, version, knownCommittedVersion); - } -}; - // TODO should GranuleSnapshot and GranuleDeltas just be typedefs instead of subclasses? // file format of actual blob files struct GranuleSnapshot : VectorRef<KeyValueRef> { @@ -78,14 +49,14 @@ struct GranuleDeltas : VectorRef<MutationsAndVersionRef> { }; // TODO better name? -struct BlobFilenameRef { +struct BlobFilePointerRef { constexpr static FileIdentifier file_identifier = 5253554; StringRef filename; int64_t offset; int64_t length; - BlobFilenameRef() {} - BlobFilenameRef(Arena& to, std::string filename, int64_t offset, int64_t length) + BlobFilePointerRef() {} + BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length) : filename(to, filename), offset(offset), length(length) {} template <class Ar> @@ -108,8 +79,8 @@ struct BlobGranuleChunkRef { constexpr static FileIdentifier file_identifier = 865198; KeyRangeRef keyRange; Version includedVersion; - Optional<BlobFilenameRef> snapshotFile; // not set if it's an incremental read - VectorRef<BlobFilenameRef> deltaFiles; + Optional<BlobFilePointerRef> snapshotFile; // not set if it's an incremental read + VectorRef<BlobFilePointerRef> deltaFiles; GranuleDeltas newDeltas; template <class Ar> diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp index 696824084e..9211124ad5 100644 --- a/fdbclient/BlobGranuleReader.actor.cpp +++ b/fdbclient/BlobGranuleReader.actor.cpp @@ -41,7 +41,7 @@ #define BG_READ_DEBUG false ACTOR Future<Arena> readSnapshotFile(Reference<BackupContainerFileSystem> bstore, - BlobFilenameRef f, + BlobFilePointerRef f, KeyRangeRef keyRange, std::map<KeyRef, ValueRef>* dataMap) { try { @@ -112,7 +112,7 @@ ACTOR Future<Arena> readSnapshotFile(Reference<BackupContainerFileSystem> bstore } ACTOR Future<Standalone<GranuleDeltas>> readDeltaFile(Reference<BackupContainerFileSystem> bstore, - BlobFilenameRef f, + BlobFilePointerRef f, KeyRangeRef keyRange, Version readVersion) { try { @@ -302,7 +302,7 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk, state std::vector<Future<Standalone<GranuleDeltas>>> readDeltaFutures; readDeltaFutures.reserve(chunk.deltaFiles.size()); - for (BlobFilenameRef deltaFile : chunk.deltaFiles) { + for (BlobFilePointerRef deltaFile : chunk.deltaFiles) { readDeltaFutures.push_back(readDeltaFile(bstore, deltaFile, keyRange, readVersion)); if (stats.present()) { ++stats.get()->s3GetReqs; diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 880d67b664..b914c8a506 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -73,8 +73,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( KEY_SIZE_LIMIT, 1e4 ); init( SYSTEM_KEY_SIZE_LIMIT, 3e4 ); init( VALUE_SIZE_LIMIT, 1e5 ); - // TODO find better solution? - init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 );// if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1; + init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1; init( METADATA_VERSION_CACHE_SIZE, 1000 ); init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; @@ -253,7 +252,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( BUSYNESS_SPIKE_SATURATED_THRESHOLD, 0.500 ); // blob granules - init( ENABLE_BLOB_GRANULES, true ); + init( ENABLE_BLOB_GRANULES, false ); // clang-format on } diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 34eb1a916d..2ef30ce4f9 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -217,4 +217,32 @@ struct CommitTransactionRef { } }; +struct MutationsAndVersionRef { + VectorRef<MutationRef> mutations; + Version version; + Version knownCommittedVersion; + + MutationsAndVersionRef() {} + explicit MutationsAndVersionRef(Version version, Version knownCommittedVersion) + : version(version), knownCommittedVersion(knownCommittedVersion) {} + MutationsAndVersionRef(VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion) + : mutations(mutations), version(version), knownCommittedVersion(knownCommittedVersion) {} + MutationsAndVersionRef(Arena& to, VectorRef<MutationRef> mutations, Version version, Version knownCommittedVersion) + : mutations(to, mutations), version(version), knownCommittedVersion(knownCommittedVersion) {} + MutationsAndVersionRef(Arena& to, const MutationsAndVersionRef& from) + : mutations(to, from.mutations), version(from.version), knownCommittedVersion(from.knownCommittedVersion) {} + int expectedSize() const { return mutations.expectedSize(); } + + struct OrderByVersion { + bool operator()(MutationsAndVersionRef const& a, MutationsAndVersionRef const& b) const { + return a.version < b.version; + } + }; + + template <class Ar> + void serialize(Ar& ar) { + serializer(ar, mutations, version, knownCommittedVersion); + } +}; + #endif diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index ad2bd97bf5..6fcc0299d5 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -268,11 +268,10 @@ public: Future<Void> popChangeFeedMutations(Key rangeID, Version version); Future<Void> getBlobGranuleRangesStream(const PromiseStream<KeyRange>& results, KeyRange range); - // TODO add optional for end version so it can do a GRV in the transaction it already has to do Future<Void> readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results, KeyRange range, Version begin, - Version end); + Optional<Version> end); // private: explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index ddbf0838b0..b46b8ddc46 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6744,16 +6744,8 @@ ACTOR Future<Void> singleChangeFeedStream(StorageServerInterface interf, state ChangeFeedStreamReply rep = waitNext(replyStream.getFuture()); begin = rep.mutations.back().version + 1; - /*if (rangeID.printable() == "8696f5d434d2247e73307a158f9f2a6b" && begin >= 234494075) { - printf(" NA CF %s [%s - %s) got %d at %lld\n", - rangeID.printable().c_str(), - range.begin.printable().c_str(), - range.end.printable().c_str(), - rep.mutations.size(), - begin); - }*/ state int resultLoc = 0; - // FIXME: fix better + // FIXME: handle empty versions properly while (resultLoc < rep.mutations.size()) { if (rep.mutations[resultLoc].mutations.size() || rep.mutations[resultLoc].version + 1 == end || (rep.mutations[resultLoc].mutations.empty() && @@ -7146,7 +7138,6 @@ Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version versio #define BG_REQUEST_DEBUG false -// FIXME: code for discovering blob granules is similar enough that it could be refactored? It's pretty simple though ACTOR Future<Void> getBlobGranuleRangesStreamActor(Reference<DatabaseContext> db, PromiseStream<KeyRange> results, KeyRange keyRange) { @@ -7211,7 +7202,6 @@ ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db, state UID workerId; loop { try { - // FIXME NOW: handle errors, handle mapping changes // FIXME: Use streaming parallelism? // Read mapping and worker interfaces from DB loopCounter++; @@ -7237,7 +7227,6 @@ ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db, tr, blobGranuleMappingKeys.begin, keyRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); blobGranuleMapping = _bgMapping; if (blobGranuleMapping.more) { - // TODO REMOVE if (BG_REQUEST_DEBUG) { printf("BG Mapping for [%s - %s) too large!\n"); } @@ -7365,8 +7354,6 @@ ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db, results.sendError(end_of_stream()); return Void(); } catch (Error& e) { - // only print this error with exponential backoff - if (e.code() == error_code_actor_cancelled) { throw; } @@ -7381,7 +7368,6 @@ ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db, results.sendError(e); return Void(); } - // TODO add a wait here! } } } @@ -7389,7 +7375,7 @@ ACTOR Future<Void> readBlobGranulesStreamActor(Reference<DatabaseContext> db, Future<Void> DatabaseContext::readBlobGranulesStream(const PromiseStream<Standalone<BlobGranuleChunkRef>>& results, KeyRange range, Version begin, - Version end) { + Optional<Version> end) { if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { throw client_invalid_operation(); } diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index e04bfa9903..9b2a051c25 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -716,9 +716,9 @@ public: double LATENCY_METRICS_LOGGING_INTERVAL; // blob granule stuff - // TODO better place to put this or wire up blob config? - + // FIXME: configure url with database configuration instead of knob eventually std::string BG_URL; + int BG_SNAPSHOT_FILE_TARGET_BYTES; int BG_DELTA_FILE_TARGET_BYTES; int BG_DELTA_BYTES_BEFORE_COMPACT; diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index f47e9b7aa7..46dcd1406e 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -30,7 +30,6 @@ #include "fdbrpc/Stats.h" #include "fdbrpc/TimedRequest.h" #include "fdbrpc/TSSComparison.h" -#include "fdbclient/BlobGranuleCommon.h" #include "fdbclient/CommitTransaction.h" #include "fdbclient/TagThrottle.actor.h" #include "flow/UnitTest.h" diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 5ab492842e..bb99dbf140 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1122,66 +1122,34 @@ const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), Lite const uint8_t BG_FILE_TYPE_DELTA = 'D'; const uint8_t BG_FILE_TYPE_SNAPSHOT = 'S'; -// uids in blob granule file/split keys are serialized big endian so that incrementUID can create a prefix range for -// just that UID. - -// TODO: could move this to UID or sometwhere else? -UID incrementUID(UID uid) { - uint64_t first = uid.first(); - uint64_t second = uid.second() + 1; - // handle overflow increment of second - if (second == 0) { - first++; - // FIXME: assume we never generate max uid, for now - ASSERT(first != 0); - } - - return UID(first, second); -} - -void serializeUIDBigEndian(BinaryWriter& wr, UID uid) { - wr << bigEndian64(uid.first()); - wr << bigEndian64(uid.second()); -} - -UID deserializeUIDBigEndian(BinaryReader& reader) { - uint64_t first; - uint64_t second; - reader >> first; - reader >> second; - return UID(bigEndian64(first), bigEndian64(second)); -} - const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion) { ASSERT(fileType == 'D' || fileType == 'S'); BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); wr.serializeBytes(blobGranuleFileKeys.begin); - serializeUIDBigEndian(wr, granuleID); + wr << granuleID; wr << fileType; wr << bigEndian64(fileVersion); return wr.toValue(); } std::tuple<UID, uint8_t, Version> decodeBlobGranuleFileKey(KeyRef const& key) { + UID granuleID; uint8_t fileType; Version fileVersion; BinaryReader reader(key.removePrefix(blobGranuleFileKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule())); - UID granuleID = deserializeUIDBigEndian(reader); + reader >> granuleID; reader >> fileType; reader >> fileVersion; ASSERT(fileType == 'D' || fileType == 'S'); return std::tuple(granuleID, fileType, bigEndian64(fileVersion)); } -Key bgFilePrefixKey(UID gid) { +const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) { BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); wr.serializeBytes(blobGranuleFileKeys.begin); - serializeUIDBigEndian(wr, gid); - return wr.toValue(); -} - -const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) { - return KeyRangeRef(bgFilePrefixKey(granuleID), bgFilePrefixKey(incrementUID(granuleID))); + wr << granuleID; + Key startKey = wr.toValue(); + return KeyRangeRef(startKey, strinc(startKey)); } const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) { @@ -1244,30 +1212,29 @@ std::tuple<int64_t, int64_t, UID> decodeBlobGranuleLockValue(const ValueRef& val const Key blobGranuleSplitKeyFor(UID const& parentGranuleID, UID const& granuleID) { BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); wr.serializeBytes(blobGranuleSplitKeys.begin); - serializeUIDBigEndian(wr, parentGranuleID); - serializeUIDBigEndian(wr, granuleID); + wr << parentGranuleID; + wr << granuleID; return wr.toValue(); } std::pair<UID, UID> decodeBlobGranuleSplitKey(KeyRef const& key) { - + UID parentGranuleID; + UID granuleID; BinaryReader reader(key.removePrefix(blobGranuleSplitKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule())); - UID parentGranuleID = deserializeUIDBigEndian(reader); - UID currentGranuleID = deserializeUIDBigEndian(reader); - return std::pair(parentGranuleID, currentGranuleID); -} - -Key bgSplitPrefixKeyFor(UID gid) { - BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); - wr.serializeBytes(blobGranuleSplitKeys.begin); - serializeUIDBigEndian(wr, gid); - return wr.toValue(); + reader >> parentGranuleID; + reader >> granuleID; + return std::pair(parentGranuleID, granuleID); } const KeyRange blobGranuleSplitKeyRangeFor(UID const& parentGranuleID) { - return KeyRangeRef(bgSplitPrefixKeyFor(parentGranuleID), bgSplitPrefixKeyFor(incrementUID(parentGranuleID))); + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobGranuleSplitKeys.begin); + wr << parentGranuleID; + + Key startKey = wr.toValue(); + return KeyRangeRef(startKey, strinc(startKey)); } const Value blobGranuleSplitValueFor(BlobGranuleSplitState st) { diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 9aab3ceffb..e6fb4f2e6e 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -533,9 +533,6 @@ extern const uint8_t BG_FILE_TYPE_SNAPSHOT; // \xff\x02/bgf/(granuleID, {snapshot|delta}, version) = [[filename]] extern const KeyRangeRef blobGranuleFileKeys; -// TODO could shrink the size of the mapping keyspace by using something similar to tags instead of UIDs. We'd probably -// want to do that in V1 or it'd be a big migration. - // \xff\x02/bgm/[[begin]] = [[BlobWorkerUID]] extern const KeyRangeRef blobGranuleMappingKeys; @@ -548,7 +545,6 @@ extern const KeyRangeRef blobGranuleSplitKeys; // \xff\x02/bgh/(start,end,version) = { granuleID, [parentGranuleHistoryKeys] } extern const KeyRangeRef blobGranuleHistoryKeys; -// FIXME: better way to represent file type than a string ref? const Key blobGranuleFileKeyFor(UID granuleID, uint8_t fileType, Version fileVersion); std::tuple<UID, uint8_t, Version> decodeBlobGranuleFileKey(ValueRef const& value); const KeyRange blobGranuleFileKeyRangeFor(UID granuleID); diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 87fc76701a..a74638c585 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -305,19 +305,6 @@ public: } } - std::set<std::string> getRolesSet(NetworkAddress const& address, bool skipWorkers = true) const { - std::set<std::string> roles; - auto addressIt = roleAddresses.find(address); - if (addressIt != roleAddresses.end()) { - for (auto& roleIt : addressIt->second) { - if ((!skipWorkers) || (roleIt.first != "Worker")) { - roles.insert(roleIt.first); - } - } - } - return roles; - } - std::string getRoles(NetworkAddress const& address, bool skipWorkers = true) const { auto addressIt = roleAddresses.find(address); std::string roleText; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index adaf94589c..4db38e23c3 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -27,7 +27,6 @@ #include "fdbclient/ReadYourWrites.h" #include "fdbclient/SystemData.h" #include "fdbserver/BlobManagerInterface.h" -#include "fdbserver/BlobWorker.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index af8c3ab5f3..07925b35ef 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -32,10 +32,10 @@ #include "fdbclient/DatabaseContext.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/Notified.h" -#include "fdbserver/BlobWorker.actor.h" #include "fdbserver/Knobs.h" #include "fdbserver/MutationTracking.h" #include "fdbserver/WaitFailure.h" +#include "fdbserver/ServerDBInfo.h" #include "flow/Arena.h" #include "flow/Error.h" #include "flow/IRandom.h" @@ -793,7 +793,7 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData, state int64_t compactBytesRead = 0; state Version snapshotVersion = files.snapshotFiles.back().version; BlobFileIndex snapshotF = files.snapshotFiles.back(); - chunk.snapshotFile = BlobFilenameRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length); + chunk.snapshotFile = BlobFilePointerRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length); compactBytesRead += snapshotF.length; int deltaIdx = files.deltaFiles.size() - 1; while (deltaIdx >= 0 && files.deltaFiles[deltaIdx].version > snapshotVersion) { @@ -1920,7 +1920,7 @@ ACTOR Future<Void> handleBlobGranuleFileRequest(Reference<BlobWorkerData> bwData ASSERT(i >= 0); BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i]; - chunk.snapshotFile = BlobFilenameRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length); + chunk.snapshotFile = BlobFilePointerRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length); Version snapshotVersion = chunkFiles.snapshotFiles[i].version; // handle delta files diff --git a/fdbserver/BlobWorker.actor.h b/fdbserver/BlobWorker.actor.h deleted file mode 100644 index 42d10a2de3..0000000000 --- a/fdbserver/BlobWorker.actor.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * BlobWorker.actor.h - * - * This source file is part of the FoundationDB open source project - * - * Copyright 2013-2019 Apple Inc. and the FoundationDB project authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_BLOBWORKER_ACTOR_G_H) -#define FDBSERVER_BLOBWORKER_ACTOR_G_H -#include "fdbserver/BlobWorker.actor.g.h" -#elif !defined(FDBSERVER_BLOBWORKER_ACTOR_H) -#define FDBSERVER_BLOBWORKER_ACTOR_H - -#include "fdbclient/BlobWorkerInterface.h" -#include "fdbserver/ServerDBInfo.h" -#include "flow/actorcompiler.h" // This must be the last #include. - -// TODO this whole file should go away once blob worker isn't embedded in other roles - -ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo); - -#include "flow/unactorcompiler.h" -#endif \ No newline at end of file diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 78fa6e8df9..8f664c0877 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -8,7 +8,6 @@ set(FDBSERVER_SRCS BlobManager.actor.cpp BlobManagerInterface.h BlobWorker.actor.cpp - BlobWorker.actor.h ClusterController.actor.cpp ConfigBroadcaster.actor.cpp ConfigBroadcaster.h diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 09d567fa1e..abb8ccc614 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -3706,8 +3706,6 @@ void checkBetterSingletons(ClusterControllerData* self) { // if we reach here, we know that the singletons are healthy so let's // check if we can colocate the singletons in a more optimal way - - // TODO: verify that we don't need to get the pid from the worker like we were doing before Optional<Standalone<StringRef>> currRKProcessId = rkSingleton.interface.get().locality.processId(); Optional<Standalone<StringRef>> currDDProcessId = ddSingleton.interface.get().locality.processId(); Optional<Standalone<StringRef>> newRKProcessId = newRKWorker.interf.locality.processId(); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index f7fbc28d6a..94fcce6ed2 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -32,7 +32,6 @@ #include "fdbserver/ServerDBInfo.h" #include "fdbserver/WaitFailure.h" #include "flow/actorcompiler.h" // This must be the last #include. -#include <limits> enum limitReason_t { unlimited, // TODO: rename to workload? diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index efd67d5830..8fcd33bb2d 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -68,10 +68,6 @@ namespace { const int MACHINE_REBOOT_TIME = 10; -// The max number of extra blob worker machines we might (i.e. randomly) add to the simulated cluster. -// Note that this is in addition to the two we always have. -const int NUM_EXTRA_BW_MACHINES = 5; - bool destructed = false; // Configuration details specified in workload test files that change the simulation @@ -288,7 +284,7 @@ public: // Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version. int maxTLogVersion = TLogVersion::MAX_SUPPORTED; // Set true to simplify simulation configs for easier debugging - bool simpleConfig = true; + bool simpleConfig = false; int extraMachineCountDC = 0; Optional<bool> generateFearless, buggify; Optional<int> datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType, @@ -1635,14 +1631,7 @@ void SimulationConfig::setRegions(const TestConfig& testConfig) { void SimulationConfig::setMachineCount(const TestConfig& testConfig) { if (testConfig.machineCount.present()) { machine_count = testConfig.machineCount.get(); - } - /// TODO REMOVE! - else if (testConfig.simpleConfig) { - printf("Setting machine count to 1\n"); - machine_count = 1; - } - // - else if (generateFearless && testConfig.minimumReplication > 1) { + } else if (generateFearless && testConfig.minimumReplication > 1) { // low latency tests in fearless configurations need 4 machines per datacenter (3 for triple replication, 1 that // is down during failures). machine_count = 16; @@ -1689,7 +1678,7 @@ void SimulationConfig::setCoordinators(const TestConfig& testConfig) { void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) { if (testConfig.processesPerMachine.present()) { processes_per_machine = testConfig.processesPerMachine.get(); - } else if (generateFearless || testConfig.simpleConfig) { // TODO CHANGE BACK + } else if (generateFearless) { processes_per_machine = 1; } else { processes_per_machine = deterministicRandom()->randomInt(1, (extraDB ? 14 : 28) / machine_count + 2); @@ -2020,19 +2009,16 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors, coordinatorCount); ASSERT_LE(dcCoordinators, machines); + // FIXME: temporarily code to test storage cache // TODO: caching disabled for this merge - // FIXME: we hardcode some machines to specifically test storage cache and blob workers - int storageCacheMachines = dc == 0 ? 1 : 0; - int blobWorkerMachines = 0; - if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { - blobWorkerMachines = 2 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1); + if (dc == 0) { + machines++; } - int totalMachines = machines + storageCacheMachines + blobWorkerMachines; - int useSeedForMachine = deterministicRandom()->randomInt(0, totalMachines); + int useSeedForMachine = deterministicRandom()->randomInt(0, machines); Standalone<StringRef> zoneId; Standalone<StringRef> newZoneId; - for (int machine = 0; machine < totalMachines; machine++) { + for (int machine = 0; machine < machines; machine++) { Standalone<StringRef> machineId(deterministicRandom()->randomUniqueID().toString()); if (machine == 0 || machineCount - dataCenters <= 4 || assignedMachines != 4 || simconfig.db.regions.size() || deterministicRandom()->random01() < 0.5) { @@ -2062,20 +2048,11 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors, } } - // FIXME: hack to add machines specifically to test storage cache and blob workers + // FIXME: temporarily code to test storage cache // TODO: caching disabled for this merge - - // `machines` here is the normal (non-temporary) machines that totalMachines comprises of - if (machine >= machines) { - if (storageCacheMachines > 0 && dc == 0) { - processClass = ProcessClass(ProcessClass::StorageCacheClass, ProcessClass::CommandLineSource); - nonVersatileMachines++; - storageCacheMachines--; - } else if (blobWorkerMachines > 0) { // add blob workers to every DC - processClass = ProcessClass(ProcessClass::BlobWorkerClass, ProcessClass::CommandLineSource); - nonVersatileMachines++; - blobWorkerMachines--; - } + if (machine == machines - 1 && dc == 0) { + processClass = ProcessClass(ProcessClass::StorageCacheClass, ProcessClass::CommandLineSource); + nonVersatileMachines++; } std::vector<IPAddress> ips; @@ -2322,4 +2299,4 @@ ACTOR void setupAndRun(std::string dataFolder, destructed = true; wait(Never()); ASSERT(false); -} +} \ No newline at end of file diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index a441bb65fa..d677ce0203 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -1657,8 +1657,6 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change auto feed = data->uidChangeFeed.find(req.rangeID); if (feed == data->uidChangeFeed.end()) { - // TODO should unknown_change_feed be replyable? - printf("Unknown change feed %s\n", req.rangeID.printable().c_str()); throw unknown_change_feed(); } state Version dequeVersion = data->version.get(); @@ -1756,13 +1754,15 @@ ACTOR Future<ChangeFeedReply> getChangeFeedMutations(StorageServer* data, Change } // TODO REMOVE or only do if mutation tracking is enabled - for (auto& mutations : reply.mutations) { - for (auto& m : mutations.mutations) { - DEBUG_MUTATION("ChangeFeedRead", mutations.version, m, data->thisServerID) - .detail("ChangeFeedID", req.rangeID) - .detail("ReqBegin", req.begin) - .detail("ReqEnd", req.end) - .detail("ReqRange", req.range); + if (MUTATION_TRACKING_ENABLED) { + for (auto& mutations : reply.mutations) { + for (auto& m : mutations.mutations) { + DEBUG_MUTATION("ChangeFeedRead", mutations.version, m, data->thisServerID) + .detail("ChangeFeedID", req.rangeID) + .detail("ReqBegin", req.begin) + .detail("ReqEnd", req.end) + .detail("ReqRange", req.range); + } } } @@ -1780,7 +1780,6 @@ ACTOR Future<Void> localChangeFeedStream(StorageServer* data, feedRequest.rangeID = rangeID; feedRequest.begin = begin; feedRequest.end = end; - // FIXME: this should set request range, otherwise filterMutations won't work?.. state ChangeFeedReply feedReply = wait(getChangeFeedMutations(data, feedRequest)); begin = feedReply.mutations.back().version + 1; state int resultLoc = 0; @@ -1828,8 +1827,7 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques state ChangeFeedRequest feedRequest; feedRequest.rangeID = req.rangeID; feedRequest.begin = begin; - // Set to min of request end and buffered version to skip any potentially partially buffered mutations - feedRequest.end = std::min(req.end, data->version.get() + 1); + feedRequest.end = req.end; feedRequest.range = req.range; ChangeFeedReply feedReply = wait(getChangeFeedMutations(data, feedRequest)); @@ -1851,8 +1849,6 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques } } } catch (Error& e) { - // TODO REMOVE - printf("CF Stream %s got error %s\n", req.rangeID.printable().c_str(), e.name()); if (e.code() != error_code_operation_obsolete) { if (!canReplyWith(e)) throw; @@ -5969,7 +5965,6 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, throw internal_error(); } catch (Error& e) { - printf("SS %s crashed with error %s\n", self.thisServerID.toString().c_str(), e.name()); // If we die with an error before replying to the recruitment request, send the error to the recruiter // (ClusterController, and from there to the DataDistributionTeamCollection) if (!recruitReply.isSet()) @@ -6181,7 +6176,6 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData, throw internal_error(); } catch (Error& e) { - printf("SS %s crashed with error %s\n", self.thisServerID.toString().c_str(), e.name()); if (recovered.canBeSet()) recovered.send(Void()); if (storageServerTerminated(self, persistentData, e)) diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp index fb1e9c6505..5b451f3e3e 100644 --- a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -43,8 +43,6 @@ * To catch availability issues with the blob worker, it does a request to each granule at the end of the test. */ struct BlobGranuleVerifierWorkload : TestWorkload { - // TODO add delay on start so it can start with data - bool doSetup; double minDelay; double maxDelay; @@ -237,7 +235,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload { .detail("RequestType", initialRequest ? "RealTime" : "TimeTravel") .detail("FDBSize", fdb.size()) .detail("BlobSize", blob.first.size()); - // TODO debugging details! if (BGV_DEBUG) { printf("\nMismatch for [%s - %s) @ %lld (%s). F(%d) B(%d):\n", @@ -313,7 +310,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload { OldRead() {} OldRead(KeyRange range, Version v, RangeResult oldResult) : range(range), v(v), oldResult(oldResult) {} - // OldRead(const OldRead& other) : range(other.range), v(other.v), oldResult(other.oldResult) {} }; ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) { diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index 098ca011be..4d03564c1c 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -138,9 +138,9 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B070010000LL, StableInterfaces); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, SpanContext); - PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, ChangeFeed); PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TSS); - PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, BlobGranule); // TODO make this 7.1 or 7.2? + PROTOCOL_VERSION_FEATURE(0x0FDB00B071010001LL, ChangeFeed); + PROTOCOL_VERSION_FEATURE(0x0FDB00B071010001LL, BlobGranule); }; template <> @@ -157,7 +157,7 @@ struct Traceable<ProtocolVersion> : std::true_type { // // xyzdev // vvvv -constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B070010001LL); +constexpr ProtocolVersion currentProtocolVersion(0x0FDB00B071010001LL); // This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to // change when we reach version 10. static_assert(currentProtocolVersion.version() < 0x0FDB00B100000000LL, "Unexpected protocol version");