Added checksum in MutationRef (#11181)

* Append checksum to param2.

* Pass sim tests w/o validating checksums.

* Code cleanup.

* Renew checksum.

* Remove checksum for all private mutations.

* Added checksum validation at SS.

* Fixed VERSION_TIMESTAMP.

* Disable Mutation Checksum by default.

* Cleanup.

* cleanup.
This commit is contained in:
He Liu 2024-02-09 13:36:41 -08:00 committed by GitHub
parent 13efe686f1
commit 9d8d52cbb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 136 additions and 11 deletions

View File

@ -326,6 +326,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( REST_KMS_ALLOW_NOT_SECURE_CONNECTION, false ); if ( randomize && BUGGIFY ) REST_KMS_ALLOW_NOT_SECURE_CONNECTION = !REST_KMS_ALLOW_NOT_SECURE_CONNECTION;
init( SIM_KMS_VAULT_MAX_KEYS, 4096 );
init( ENABLE_MUTATION_CHECKSUM, false ); // if ( randomize && BUGGIFY ) ENABLE_MUTATION_CHECKSUM = true; Enable this after deserialiser is ported to 7.3.
// clang-format on
}

View File

@ -301,6 +301,7 @@ inline void transformVersionstampMutation(MutationRef& mutation,
StringRef MutationRef::*param,
Version version,
uint16_t transactionNumber) {
mutation.removeChecksum();
if ((mutation.*param).size() >= 4) {
int32_t pos = parseVersionstampOffset(mutation.*param);
mutation.*param = (mutation.*param).substr(0, (mutation.*param).size() - 4);

View File

@ -325,6 +325,8 @@ public:
bool REST_KMS_ALLOW_NOT_SECURE_CONNECTION;
int SIM_KMS_VAULT_MAX_KEYS;
bool ENABLE_MUTATION_CHECKSUM;
ClientKnobs(Randomize randomize);
void initialize(Randomize randomize);
};

View File

@ -29,7 +29,9 @@
#include "fdbclient/Tracing.h"
#include "flow/EncryptUtils.h"
#include "flow/Knobs.h"
#include "flow/UnitTest.h"
#include "crc32/crc32c.h"
#include <unordered_set>
// The versioned message has wire format : -1, version, messages
@ -57,10 +59,13 @@ static const char* typeString[] = { "SetValue",
"AndV2",
"CompareAndClear",
"Reserved_For_SpanContextMessage",
"Reserved_For_OTELSpanContextMessage",
"Encrypted",
"MAX_ATOMIC_OP" };
struct MutationRef {
static const int OVERHEAD_BYTES = 12; // 12 is the size of Header in MutationList entries
static const uint8_t CHECKSUM_FLAG_MASK = 128U;
enum Type : uint8_t {
SetValue = 0,
ClearRange,
@ -88,16 +93,22 @@ struct MutationRef {
Encrypted, /* Represents an encrypted mutation and cannot be used directly before decrypting */
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes.
// This is stored this way for serialization purposes. Note: the first bit of `type` is used to indicate whether a
// checksum is appended to `param2`, when checksum is enabled, the bit is set during serialization, and reset during
// deserialization.
uint8_t type;
StringRef param1, param2;
Optional<uint32_t> checksum;
MutationRef() : type(MAX_ATOMIC_OP) {}
MutationRef(Type t, StringRef a, StringRef b) : type(t), param1(a), param2(b) {}
MutationRef(Arena& to, Type t, StringRef a, StringRef b) : type(t), param1(to, a), param2(to, b) {}
MutationRef(Arena& to, const MutationRef& from)
: type(from.type), param1(to, from.param1), param2(to, from.param2) {}
int totalSize() const { return OVERHEAD_BYTES + param1.size() + param2.size(); }
int totalSize() const {
return OVERHEAD_BYTES + param1.size() + param2.size() + (checksum.present() ? sizeof(uint32_t) + 1 : 1);
}
int expectedSize() const { return param1.size() + param2.size(); }
int weightedTotalSize() const {
// AtomicOp can cause more workload to FDB cluster than the same-size set mutation;
@ -111,27 +122,97 @@ struct MutationRef {
}
std::string toString() const {
return format("code: %s param1: %s param2: %s",
type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "Unset",
std::string checksumStr;
if (checksum.present()) {
checksumStr = format("checksum: %s ", std::to_string(checksum.get()).c_str());
}
uint8_t cType = type & ~CHECKSUM_FLAG_MASK;
return format("%scode: %s param1: %s param2: %s",
checksumStr.c_str(),
cType < MutationRef::MAX_ATOMIC_OP ? typeString[(int)cType] : "Unset",
printable(param1).c_str(),
printable(param2).c_str());
}
uint8_t typeWithChecksum() const { return this->type | CHECKSUM_FLAG_MASK; }
Optional<uint32_t> removeChecksum() {
this->checksum.reset();
if (!withChecksum()) {
return Optional<uint32_t>();
}
ASSERT(this->param2.size() >= 4);
const int idx = this->param2.size() - 4;
const uint32_t pc = *(const uint32_t*)(this->param2.substr(idx).begin());
this->type &= ~CHECKSUM_FLAG_MASK;
this->param2 = this->param2.substr(0, idx);
return pc;
}
bool withChecksum() const { return this->type & CHECKSUM_FLAG_MASK; }
bool isAtomicOp() const { return (ATOMIC_MASK & (1 << type)) != 0; }
bool isValid() const { return type < MAX_ATOMIC_OP; }
uint32_t populateChecksum() {
ASSERT(!this->withChecksum());
uint32_t c = crc32c_append(static_cast<uint32_t>(this->type), param1.begin(), param1.size());
crc32c_append(c, param2.begin(), param2.size());
if (this->checksum.present()) {
if (this->checksum.get() != c) {
TraceEvent(SevError, "MutationRefChecksumMismatch")
.detail("CalculatedChecksum", std::to_string(c))
.detail("Mutatino", toString());
}
} else {
this->checksum = c;
}
return c;
}
bool validateChecksum() const {
if (!checksum.present()) {
return true;
}
uint32_t c = crc32c_append(static_cast<uint32_t>(this->type), param1.begin(), param1.size());
crc32c_append(c, param2.begin(), param2.size());
return c == checksum.get();
}
template <class Ar>
void serialize(Ar& ar) {
if (ar.isSerializing && type == ClearRange && equalsKeyAfter(param1, param2)) {
StringRef empty;
serializer(ar, type, param2, empty);
if (!isEncrypted() && ar.protocolVersion().hasMutationChecksum() &&
CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM) {
uint32_t c = populateChecksum();
StringRef cs = StringRef((uint8_t*)&c, 4);
uint8_t cType = this->typeWithChecksum();
serializer(ar, cType, param2, cs);
} else {
serializer(ar, type, param2, empty);
}
} else if (!isEncrypted() && ar.isSerializing && ar.protocolVersion().hasMutationChecksum() &&
CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM) {
uint32_t c = populateChecksum();
StringRef cs = StringRef((uint8_t*)&c, 4);
uint8_t cType = this->typeWithChecksum();
Standalone<StringRef> param2WithChecksum = param2.withSuffix(cs);
StringRef p2 = param2WithChecksum;
serializer(ar, cType, param1, p2);
} else {
serializer(ar, type, param1, param2);
}
if (ar.isDeserializing && type == ClearRange && param2 == StringRef() && param1 != StringRef()) {
ASSERT(param1[param1.size() - 1] == '\x00');
param2 = param1;
param1 = param2.substr(0, param2.size() - 1);
if (ar.isDeserializing) {
if (withChecksum()) {
checksum = removeChecksum();
}
if (type == ClearRange && param2 == StringRef() && param1 != StringRef()) {
ASSERT(param1[param1.size() - 1] == '\x00');
param2 = param1;
param1 = param2.substr(0, param2.size() - 1);
}
validateChecksum();
}
}
@ -498,4 +579,24 @@ struct EncryptedMutationsAndVersionRef {
};
};
TEST_CASE("noSim/CommitTransaction/MutationRef") {
printf("testing MutationRef encoding/decoding\n");
MutationRef m(MutationRef::SetValue, "TestKey"_sr, "TestValue"_sr);
BinaryWriter wr(AssumeVersion(ProtocolVersion::withMutationChecksum()));
wr << m;
Standalone<StringRef> value = wr.toValue();
TraceEvent("EncodedMutation").detail("RawBytes", value);
BinaryReader rd(value, AssumeVersion(ProtocolVersion::withBlobGranule()));
Standalone<MutationRef> de;
rd >> de;
printf("Deserialized mutation: %s\n", de.toString().c_str());
return Void();
}
#endif

View File

@ -258,6 +258,7 @@ private:
Tag tag = decodeServerTagValue(
txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get());
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivateMutation", dbgid)
.detail("Original", m)
@ -281,6 +282,7 @@ private:
if (toCommit) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString());
@ -324,6 +326,7 @@ private:
// This is done to make the storage servers aware of the cached key-ranges
if (toCommit) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
cachedRangeInfo[k] = privatized;
@ -346,6 +349,7 @@ private:
// Create a private mutation for cache servers
// This is done to make the cache servers aware of the cached key-ranges
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_CacheTag", dbgid).detail("M", privatized);
toCommit->addTag(cacheTag);
@ -386,6 +390,7 @@ private:
if (toCommit && keyInfo) {
KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2));
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
auto ranges = keyInfo->intersectingRanges(r);
auto firstRange = ranges.begin();
@ -450,6 +455,7 @@ private:
if (toCommit) {
// send private mutation to SS that it now has a TSS pair
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
@ -482,6 +488,7 @@ private:
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_TSSQuarantine", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
@ -647,6 +654,7 @@ private:
}
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_GlobalKeys", dbgid).detail("M", privatized);
toCommit->addTags(allTags);
@ -670,6 +678,7 @@ private:
}
const Tag tag = decodeServerTagValue(tagValue.get());
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent("SendingPrivateMutationCheckpoint", dbgid)
.detail("Original", m)
@ -786,6 +795,7 @@ private:
toCommit->addTags(allTags);
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
writeMutation(privatized);
}
@ -915,6 +925,7 @@ private:
if (toCommit) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena);
privatized.param2 = keyAfter(privatized.param1, arena);
@ -938,6 +949,7 @@ private:
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena);
privatized.param2 =
keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena);
@ -1135,6 +1147,7 @@ private:
// send private mutation to SS to notify that it no longer has a tss pair
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSMapping", dbgid).detail("M", privatized);
@ -1162,6 +1175,7 @@ private:
tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSQuarantine", dbgid).detail("M", privatized);
@ -1242,6 +1256,7 @@ private:
toCommit->addTags(allTags);
MutationRef privatized;
privatized.removeChecksum();
privatized.type = MutationRef::ClearRange;
privatized.param1 = systemKeys.begin.withSuffix(std::max(range.begin, subspace.begin), arena);
if (range.end < subspace.end) {

View File

@ -551,12 +551,12 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self,
wr << (uint8_t)hashlittle(&version, sizeof(version), 0);
wr << bigEndian64(commitVersion);
MutationRef backupMutation;
backupMutation.type = MutationRef::SetValue;
uint32_t* partBuffer = nullptr;
for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < val.size(); part++) {
MutationRef backupMutation;
backupMutation.type = MutationRef::SetValue;
// Assign the second parameter as the part
backupMutation.param2 = val.substr(
part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE,

View File

@ -11204,6 +11204,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
cipherKeys.get(), eager.arena, BlobCipherMetrics::TLOG, nullptr, &decryptionTimeV);
decryptionTime += decryptionTimeV;
}
} else {
ASSERT(msg.validateChecksum());
}
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
@ -12447,6 +12449,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
const char* debugContext) {
for (const auto& m : mutations) {
DEBUG_MUTATION(debugContext, debugVersion, m, data->thisServerID);
ASSERT(m.validateChecksum());
if (m.type == MutationRef::SetValue) {
storage->set(KeyValueRef(m.param1, m.param2));
*kvCommitLogicalBytes += m.expectedSize();

View File

@ -177,6 +177,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE@, BlobGranuleFileLogicalSize);
PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_RANGE_CHANGE_LOG@, BlobRangeChangeLog);
PROTOCOL_VERSION_FEATURE(@FDB_PV_GC_TXN_GENERATIONS@, GcTxnGenerations);
PROTOCOL_VERSION_FEATURE(@FDB_PV_MUTATION_CHECKSUM@, MutationChecksum);
};
template <>

View File

@ -93,3 +93,4 @@ set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL")
set(FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE "0x0FDB00B072000000LL")
set(FDB_PV_BLOB_RANGE_CHANGE_LOG "0x0FDB00B072000000LL")
set(FDB_PV_GC_TXN_GENERATIONS "0x0FDB00B073000000LL")
set(FDB_PV_MUTATION_CHECKSUM "0x0FDB00B074000000LL")