mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 02:37:02 +08:00
Support TLog encryption in commit proxy (#6942)
This PR add support for TLog encryption through commit proxy. The encryption is done on per-mutation basis. As CP writes mutations to TLog, it inserts encryption header alongside encrypted mutations. Storage server (and other consumers of TLog such as storage cache and backup worker) decrypts the mutations as they peek TLog.
This commit is contained in:
parent
c9366f8b01
commit
364644673f
fdbclient/include/fdbclient
fdbserver
ApplyMetadataMutation.cppBackupWorker.actor.cppCommitProxyServer.actor.cppEncryptKeyProxy.actor.cppGetEncryptCipherKeys.actor.cppMutationTracking.cppRestoreLoader.actor.cppSimKmsConnector.actor.cppStorageCache.actor.cppTLogServer.actor.cpp
include/fdbserver
ApplyMetadataMutation.hEncryptedMutationMessage.hGetEncryptCipherKeys.hTagPartitionedLogSystem.actor.h
storageserver.actor.cppflow
@ -79,6 +79,7 @@ struct MutationRef {
|
||||
CompareAndClear,
|
||||
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
|
||||
Reserved_For_OTELSpanContextMessage,
|
||||
Reserved_For_EncryptedMutationMessage /* See fdbserver/EncryptedMutationMessage.actor.h */,
|
||||
MAX_ATOMIC_OP
|
||||
};
|
||||
// This is stored this way for serialization purposes.
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
@ -67,13 +68,14 @@ public:
|
||||
ProxyCommitData& proxyCommitData_,
|
||||
Reference<ILogSystem> logSystem_,
|
||||
LogPushData* toCommit_,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys_,
|
||||
bool& confChange_,
|
||||
Version version,
|
||||
Version popVersion_,
|
||||
bool initialCommit_)
|
||||
: spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_),
|
||||
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), confChange(confChange_),
|
||||
logSystem(logSystem_), version(version), popVersion(popVersion_),
|
||||
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), cipherKeys(cipherKeys_),
|
||||
confChange(confChange_), logSystem(logSystem_), version(version), popVersion(popVersion_),
|
||||
vecBackupKeys(&proxyCommitData_.vecBackupKeys), keyInfo(&proxyCommitData_.keyInfo),
|
||||
cacheInfo(&proxyCommitData_.cacheInfo),
|
||||
uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr),
|
||||
@ -108,6 +110,9 @@ private:
|
||||
// non-null if these mutations were part of a new commit handled by this commit proxy
|
||||
LogPushData* toCommit = nullptr;
|
||||
|
||||
// Cipher keys used to encrypt to be committed mutations
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* cipherKeys = nullptr;
|
||||
|
||||
// Flag indicates if the configure is changed
|
||||
bool& confChange;
|
||||
|
||||
@ -152,6 +157,16 @@ private:
|
||||
bool dummyConfChange = false;
|
||||
|
||||
private:
|
||||
void writeMutation(const MutationRef& m) {
|
||||
if (forResolver || !SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
|
||||
toCommit->writeTypedMessage(m);
|
||||
} else {
|
||||
ASSERT(cipherKeys != nullptr);
|
||||
Arena arena;
|
||||
toCommit->writeTypedMessage(EncryptedMutationMessage::encryptMetadata(arena, *cipherKeys, m));
|
||||
}
|
||||
}
|
||||
|
||||
void checkSetKeyServersPrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(keyServersPrefix)) {
|
||||
return;
|
||||
@ -221,7 +236,7 @@ private:
|
||||
.detail("Tag", tag.toString());
|
||||
|
||||
toCommit->addTag(tag);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
@ -243,7 +258,7 @@ private:
|
||||
toCommit->writeTypedMessage(LogProtocolMessage(), true);
|
||||
TraceEvent(SevDebug, "SendingPrivatized_ServerTag", dbgid).detail("M", privatized);
|
||||
toCommit->addTag(tag);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
if (!initialCommit) {
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
@ -303,7 +318,7 @@ private:
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
TraceEvent(SevDebug, "SendingPrivatized_CacheTag", dbgid).detail("M", privatized);
|
||||
toCommit->addTag(cacheTag);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
|
||||
void checkSetConfigKeys(MutationRef m) {
|
||||
@ -354,7 +369,7 @@ private:
|
||||
toCommit->addTags(allSources);
|
||||
}
|
||||
TraceEvent(SevDebug, "SendingPrivatized_ChangeFeed", dbgid).detail("M", privatized);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
@ -408,7 +423,7 @@ private:
|
||||
if (tagV.present()) {
|
||||
TraceEvent(SevDebug, "SendingPrivatized_TSSID", dbgid).detail("M", privatized);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -437,7 +452,7 @@ private:
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
TraceEvent(SevDebug, "SendingPrivatized_TSSQuarantine", dbgid).detail("M", privatized);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
@ -560,7 +575,7 @@ private:
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
TraceEvent(SevDebug, "SendingPrivatized_GlobalKeys", dbgid).detail("M", privatized);
|
||||
toCommit->addTags(allTags);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
|
||||
// Generates private mutations for the target storage server, instructing it to create a checkpoint.
|
||||
@ -582,7 +597,7 @@ private:
|
||||
.detail("Checkpoint", checkpoint.toString());
|
||||
|
||||
toCommit->addTag(tag);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
@ -662,7 +677,7 @@ private:
|
||||
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
|
||||
TEST(true); // Tenant added to map
|
||||
@ -760,7 +775,7 @@ private:
|
||||
TraceEvent(SevDebug, "SendingPrivatized_ClearServerTag", dbgid).detail("M", privatized);
|
||||
|
||||
toCommit->addTag(decodeServerTagValue(kv.value));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
// Might be a tss removal, which doesn't store a tag there.
|
||||
@ -784,7 +799,7 @@ private:
|
||||
TraceEvent(SevDebug, "SendingPrivatized_TSSClearServerTag", dbgid)
|
||||
.detail("M", privatized);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -969,7 +984,7 @@ private:
|
||||
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
|
||||
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSMapping", dbgid).detail("M", privatized);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
@ -996,7 +1011,7 @@ private:
|
||||
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
|
||||
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSQuarantine", dbgid).detail("M", privatized);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1050,7 +1065,7 @@ private:
|
||||
privatized.type = MutationRef::ClearRange;
|
||||
privatized.param1 = range.begin.withPrefix(systemKeys.begin, arena);
|
||||
privatized.param2 = range.end.withPrefix(systemKeys.begin, arena);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
writeMutation(privatized);
|
||||
}
|
||||
|
||||
TEST(true); // Tenant cleared from map
|
||||
@ -1146,9 +1161,9 @@ private:
|
||||
.detail("MBegin", mutationBegin)
|
||||
.detail("MEnd", mutationEnd);
|
||||
toCommit->addTags(allTags);
|
||||
toCommit->writeTypedMessage(mutationBegin);
|
||||
writeMutation(mutationBegin);
|
||||
toCommit->addTags(allTags);
|
||||
toCommit->writeTypedMessage(mutationEnd);
|
||||
writeMutation(mutationEnd);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1223,6 +1238,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
||||
Reference<ILogSystem> logSystem,
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
LogPushData* toCommit,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys,
|
||||
bool& confChange,
|
||||
Version version,
|
||||
Version popVersion,
|
||||
@ -1234,6 +1250,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
||||
proxyCommitData,
|
||||
logSystem,
|
||||
toCommit,
|
||||
pCipherKeys,
|
||||
confChange,
|
||||
version,
|
||||
popVersion,
|
||||
|
@ -25,6 +25,8 @@
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/BackupInterface.h"
|
||||
#include "fdbserver/BackupProgress.actor.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/GetEncryptCipherKeys.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
@ -44,6 +46,7 @@ struct VersionedMessage {
|
||||
StringRef message;
|
||||
VectorRef<Tag> tags;
|
||||
Arena arena; // Keep a reference to the memory containing the message
|
||||
Arena decryptArena; // Arena used for decrypt buffer.
|
||||
size_t bytes; // arena's size when inserted, which can grow afterwards
|
||||
|
||||
VersionedMessage(LogMessageVersion v, StringRef m, const VectorRef<Tag>& t, const Arena& a)
|
||||
@ -53,7 +56,8 @@ struct VersionedMessage {
|
||||
|
||||
// Returns true if the message is a mutation that should be backuped, i.e.,
|
||||
// either key is not in system key space or is not a metadataVersionKey.
|
||||
bool isBackupMessage(MutationRef* m) const {
|
||||
bool isBackupMessage(MutationRef* m,
|
||||
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys) {
|
||||
for (Tag tag : tags) {
|
||||
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
|
||||
return false; // skip Txs mutations
|
||||
@ -71,10 +75,26 @@ struct VersionedMessage {
|
||||
TEST(true); // Returning false for OTELSpanContextMessage
|
||||
return false;
|
||||
}
|
||||
|
||||
reader >> *m;
|
||||
if (EncryptedMutationMessage::isNextIn(reader)) {
|
||||
// In case the mutation is encrypted, get the decrypted mutation and also update message to point to
|
||||
// the decrypted mutation.
|
||||
// We use dedicated arena for decrypt buffer, as the other arena is used to count towards backup lock bytes.
|
||||
*m = EncryptedMutationMessage::decrypt(reader, decryptArena, cipherKeys, &message);
|
||||
} else {
|
||||
reader >> *m;
|
||||
}
|
||||
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;
|
||||
}
|
||||
|
||||
void collectCipherDetailIfEncrypted(std::unordered_set<BlobCipherDetails>& cipherDetails) {
|
||||
ArenaReader reader(arena, message, AssumeVersion(g_network->protocolVersion()));
|
||||
if (EncryptedMutationMessage::isNextIn(reader)) {
|
||||
EncryptedMutationMessage emm;
|
||||
reader >> emm;
|
||||
cipherDetails.insert(emm.header.cipherTextDetails);
|
||||
cipherDetails.insert(emm.header.cipherHeaderDetails);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct BackupData {
|
||||
@ -89,6 +109,7 @@ struct BackupData {
|
||||
Version minKnownCommittedVersion;
|
||||
Version savedVersion; // Largest version saved to blob storage
|
||||
Version popVersion; // Largest version popped in NOOP mode, can be larger than savedVersion.
|
||||
Reference<AsyncVar<ServerDBInfo> const> db;
|
||||
AsyncVar<Reference<ILogSystem>> logSystem;
|
||||
Database cx;
|
||||
std::vector<VersionedMessage> messages;
|
||||
@ -245,7 +266,7 @@ struct BackupData {
|
||||
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
||||
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
||||
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
|
||||
pulledVersion(0), paused(false), lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)),
|
||||
db(db), pulledVersion(0), paused(false), lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)),
|
||||
cc("BackupWorker", myId.toString()) {
|
||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True);
|
||||
|
||||
@ -682,7 +703,10 @@ ACTOR static Future<Void> updateLogBytesWritten(BackupData* self,
|
||||
// Saves messages in the range of [0, numMsg) to a file and then remove these
|
||||
// messages. The file content format is a sequence of (Version, sub#, msgSize, message).
|
||||
// Note only ready backups are saved.
|
||||
ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int numMsg) {
|
||||
ACTOR Future<Void> saveMutationsToFile(BackupData* self,
|
||||
Version popVersion,
|
||||
int numMsg,
|
||||
std::unordered_set<BlobCipherDetails> cipherDetails) {
|
||||
state int blockSize = SERVER_KNOBS->BACKUP_FILE_BLOCK_BYTES;
|
||||
state std::vector<Future<Reference<IBackupFile>>> logFileFutures;
|
||||
state std::vector<Reference<IBackupFile>> logFiles;
|
||||
@ -691,6 +715,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
|
||||
state std::vector<Version> beginVersions; // logFiles' begin versions
|
||||
state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
|
||||
state std::vector<Standalone<StringRef>> mutations;
|
||||
state std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> cipherKeys;
|
||||
state int idx;
|
||||
|
||||
// Make sure all backups are ready, otherwise mutations will be lost.
|
||||
@ -742,11 +767,18 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
|
||||
.detail("File", logFiles[i]->getFileName());
|
||||
}
|
||||
|
||||
// Fetch cipher keys if any of the messages are encrypted.
|
||||
if (!cipherDetails.empty()) {
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
|
||||
wait(getEncryptCipherKeys(self->db, cipherDetails));
|
||||
cipherKeys = getCipherKeysResult;
|
||||
}
|
||||
|
||||
blockEnds = std::vector<int64_t>(logFiles.size(), 0);
|
||||
for (idx = 0; idx < numMsg; idx++) {
|
||||
const auto& message = self->messages[idx];
|
||||
auto& message = self->messages[idx];
|
||||
MutationRef m;
|
||||
if (!message.isBackupMessage(&m))
|
||||
if (!message.isBackupMessage(&m, cipherKeys))
|
||||
continue;
|
||||
|
||||
DEBUG_MUTATION("addMutation", message.version.version, m)
|
||||
@ -815,6 +847,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||
state Future<Void> uploadDelay = delay(SERVER_KNOBS->BACKUP_UPLOAD_DELAY);
|
||||
|
||||
state int numMsg = 0;
|
||||
state std::unordered_set<BlobCipherDetails> cipherDetails;
|
||||
Version lastPopVersion = popVersion;
|
||||
// index of last version's end position in self->messages
|
||||
int lastVersionIndex = 0;
|
||||
@ -826,7 +859,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||
popVersion = std::max(popVersion, self->minKnownCommittedVersion);
|
||||
}
|
||||
} else {
|
||||
for (const auto& message : self->messages) {
|
||||
for (auto& message : self->messages) {
|
||||
// message may be prefetched in peek; uncommitted message should not be uploaded.
|
||||
const Version version = message.getVersion();
|
||||
if (version > self->maxPopVersion())
|
||||
@ -836,6 +869,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||
lastVersion = popVersion;
|
||||
popVersion = version;
|
||||
}
|
||||
message.collectCipherDetailIfEncrypted(cipherDetails);
|
||||
numMsg++;
|
||||
}
|
||||
}
|
||||
@ -859,7 +893,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||
.detail("NumMsg", numMsg)
|
||||
.detail("MsgQ", self->messages.size());
|
||||
// save an empty file for old epochs so that log file versions are continuous
|
||||
wait(saveMutationsToFile(self, popVersion, numMsg));
|
||||
wait(saveMutationsToFile(self, popVersion, numMsg, cipherDetails));
|
||||
self->eraseMessages(numMsg);
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,9 @@
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/DataDistributorInterface.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/FDBExecHelper.actor.h"
|
||||
#include "fdbserver/GetEncryptCipherKeys.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
@ -48,6 +50,7 @@
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/Knobs.h"
|
||||
@ -641,6 +644,9 @@ struct CommitBatchContext {
|
||||
std::set<Tag> writtenTags; // final set tags written to in the batch
|
||||
std::set<Tag> writtenTagsPreResolution; // tags written to in the batch not including any changes from the resolver.
|
||||
|
||||
// Cipher keys to be used to encrypt mutations
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys;
|
||||
|
||||
CommitBatchContext(ProxyCommitData*, const std::vector<CommitTransactionRequest>*, const int);
|
||||
|
||||
void setupTraceBatch();
|
||||
@ -897,6 +903,27 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
||||
self->transactionResolverMap.swap(requests.transactionResolverMap);
|
||||
// Used to report conflicting keys
|
||||
self->txReadConflictRangeIndexMap.swap(requests.txReadConflictRangeIndexMap);
|
||||
|
||||
// Fetch cipher keys if needed.
|
||||
state Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>> getCipherKeys;
|
||||
if (SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
|
||||
static std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> defaultDomains = {
|
||||
{ SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME },
|
||||
{ ENCRYPT_HEADER_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME }
|
||||
};
|
||||
std::unordered_map<EncryptCipherDomainId, EncryptCipherDomainName> encryptDomains = defaultDomains;
|
||||
for (int t = 0; t < trs.size(); t++) {
|
||||
int64_t tenantId = trs[t].tenantInfo.tenantId;
|
||||
Optional<TenantName> tenantName = trs[t].tenantInfo.name;
|
||||
// TODO(yiwu): In raw access mode, use tenant prefix to figure out tenant id for user data
|
||||
if (tenantId != TenantInfo::INVALID_TENANT) {
|
||||
ASSERT(tenantName.present());
|
||||
encryptDomains[tenantId] = tenantName.get();
|
||||
}
|
||||
}
|
||||
getCipherKeys = getLatestEncryptCipherKeys(pProxyCommitData->db, encryptDomains);
|
||||
}
|
||||
|
||||
self->releaseFuture = releaseResolvingAfter(pProxyCommitData, self->releaseDelay, self->localBatchNumber);
|
||||
|
||||
if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchLogging.get() >
|
||||
@ -922,6 +949,11 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
||||
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
|
||||
}
|
||||
|
||||
if (SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION) {
|
||||
std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>> cipherKeys = wait(getCipherKeys);
|
||||
self->cipherKeys = cipherKeys;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
@ -961,6 +993,7 @@ void applyMetadataEffect(CommitBatchContext* self) {
|
||||
self->pProxyCommitData->logSystem,
|
||||
self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations,
|
||||
/* pToCommit= */ nullptr,
|
||||
/* pCipherKeys= */ nullptr,
|
||||
self->forceRecovery,
|
||||
/* version= */ self->commitVersion,
|
||||
/* popVersion= */ 0,
|
||||
@ -1060,6 +1093,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
||||
pProxyCommitData->logSystem,
|
||||
trs[t].transaction.mutations,
|
||||
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? nullptr : &self->toCommit,
|
||||
SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION ? &self->cipherKeys : nullptr,
|
||||
self->forceRecovery,
|
||||
self->commitVersion,
|
||||
self->commitVersion + 1,
|
||||
@ -1111,6 +1145,22 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
||||
return Void();
|
||||
}
|
||||
|
||||
void writeMutation(CommitBatchContext* self, int64_t tenantId, const MutationRef& mutation) {
|
||||
static_assert(TenantInfo::INVALID_TENANT == ENCRYPT_INVALID_DOMAIN_ID);
|
||||
if (!SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION || tenantId == TenantInfo::INVALID_TENANT) {
|
||||
// TODO(yiwu): In raw access mode, use tenant prefix to figure out tenant id for user data
|
||||
bool isRawAccess = tenantId == TenantInfo::INVALID_TENANT && !isSystemKey(mutation.param1) &&
|
||||
!(mutation.type == MutationRef::ClearRange && isSystemKey(mutation.param2)) &&
|
||||
self->pProxyCommitData->db->get().client.tenantMode == TenantMode::REQUIRED;
|
||||
TEST(isRawAccess); // Raw access to tenant key space
|
||||
self->toCommit.writeTypedMessage(mutation);
|
||||
} else {
|
||||
Arena arena;
|
||||
self->toCommit.writeTypedMessage(
|
||||
EncryptedMutationMessage::encrypt(arena, self->cipherKeys, tenantId /*domainId*/, mutation));
|
||||
}
|
||||
}
|
||||
|
||||
/// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers'
|
||||
/// tags
|
||||
ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||
@ -1127,6 +1177,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||
state Optional<ClientTrCommitCostEstimation>* trCost = &trs[self->transactionNum].commitCostEstimation;
|
||||
state int mutationNum = 0;
|
||||
state VectorRef<MutationRef>* pMutations = &trs[self->transactionNum].transaction.mutations;
|
||||
state int64_t tenantId = trs[self->transactionNum].tenantInfo.tenantId;
|
||||
|
||||
self->toCommit.addTransactionInfo(trs[self->transactionNum].spanContext);
|
||||
|
||||
@ -1184,7 +1235,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||
if (pProxyCommitData->cacheInfo[m.param1]) {
|
||||
self->toCommit.addTag(cacheTag);
|
||||
}
|
||||
self->toCommit.writeTypedMessage(m);
|
||||
writeMutation(self, tenantId, m);
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
|
||||
auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
|
||||
@ -1237,7 +1288,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
||||
if (pProxyCommitData->needsCacheTag(clearRange)) {
|
||||
self->toCommit.addTag(cacheTag);
|
||||
}
|
||||
self->toCommit.writeTypedMessage(m);
|
||||
writeMutation(self, tenantId, m);
|
||||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
@ -2308,6 +2359,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
|
||||
Reference<ILogSystem>(),
|
||||
mutations,
|
||||
/* pToCommit= */ nullptr,
|
||||
/* pCipherKeys= */ nullptr,
|
||||
confChanges,
|
||||
/* version= */ 0,
|
||||
/* popVersion= */ 0,
|
||||
@ -2399,7 +2451,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
||||
|
||||
// Wait until we can load the "real" logsystem, since we don't support switching them currently
|
||||
while (!(masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
|
||||
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
|
||||
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION &&
|
||||
(!SERVER_KNOBS->ENABLE_TLOG_ENCRYPTION || commitData.db->get().encryptKeyProxy.present()))) {
|
||||
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
|
||||
wait(commitData.db->onChange());
|
||||
}
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/network.h"
|
||||
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <boost/mpl/not.hpp>
|
||||
#include <limits>
|
||||
#include <string>
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* GetCipherKeys.actor.cpp
|
||||
* GetEncryptCipherKeys.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
@ -38,7 +38,7 @@ ACTOR Future<Void> onEncryptKeyProxyChange(Reference<AsyncVar<ServerDBInfo> cons
|
||||
break;
|
||||
}
|
||||
}
|
||||
TraceEvent("GetCipherKeys_EncryptKeyProxyChanged")
|
||||
TraceEvent("GetEncryptCipherKeys_EncryptKeyProxyChanged")
|
||||
.detail("PreviousProxyId", previousProxyId.orDefault(UID()))
|
||||
.detail("CurrentProxyId", currentProxyId.orDefault(UID()));
|
||||
return Void();
|
||||
@ -50,19 +50,19 @@ ACTOR Future<EKPGetLatestBaseCipherKeysReply> getUncachedLatestEncryptCipherKeys
|
||||
Optional<EncryptKeyProxyInterface> proxy = db->get().encryptKeyProxy;
|
||||
if (!proxy.present()) {
|
||||
// Wait for onEncryptKeyProxyChange.
|
||||
TraceEvent("GetLatestCipherKeys_EncryptKeyProxyNotPresent");
|
||||
TraceEvent("GetLatestEncryptCipherKeys_EncryptKeyProxyNotPresent");
|
||||
return Never();
|
||||
}
|
||||
request.reply.reset();
|
||||
try {
|
||||
EKPGetLatestBaseCipherKeysReply reply = wait(proxy.get().getLatestBaseCipherKeys.getReply(request));
|
||||
if (reply.error.present()) {
|
||||
TraceEvent(SevWarn, "GetLatestCipherKeys_RequestFailed").error(reply.error.get());
|
||||
TraceEvent(SevWarn, "GetLatestEncryptCipherKeys_RequestFailed").error(reply.error.get());
|
||||
throw encrypt_keys_fetch_failed();
|
||||
}
|
||||
return reply;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("GetLatestCipherKeys_CaughtError").error(e);
|
||||
TraceEvent("GetLatestEncryptCipherKeys_CaughtError").error(e);
|
||||
if (e.code() == error_code_broken_promise) {
|
||||
// Wait for onEncryptKeyProxyChange.
|
||||
return Never();
|
||||
@ -81,7 +81,7 @@ ACTOR Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>
|
||||
state EKPGetLatestBaseCipherKeysRequest request;
|
||||
|
||||
if (!db.isValid()) {
|
||||
TraceEvent(SevError, "GetLatestCipherKeys_ServerDBInfoNotAvailable");
|
||||
TraceEvent(SevError, "GetLatestEncryptCipherKeys_ServerDBInfoNotAvailable");
|
||||
throw encrypt_ops_error();
|
||||
}
|
||||
|
||||
@ -114,7 +114,7 @@ ACTOR Future<std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>
|
||||
// Check for any missing cipher keys.
|
||||
for (auto& domain : request.encryptDomainInfos) {
|
||||
if (cipherKeys.count(domain.domainId) == 0) {
|
||||
TraceEvent(SevWarn, "GetLatestCipherKeys_KeyMissing").detail("DomainId", domain.domainId);
|
||||
TraceEvent(SevWarn, "GetLatestEncryptCipherKeys_KeyMissing").detail("DomainId", domain.domainId);
|
||||
throw encrypt_key_not_found();
|
||||
}
|
||||
}
|
||||
@ -133,19 +133,19 @@ ACTOR Future<EKPGetBaseCipherKeysByIdsReply> getUncachedEncryptCipherKeys(Refere
|
||||
Optional<EncryptKeyProxyInterface> proxy = db->get().encryptKeyProxy;
|
||||
if (!proxy.present()) {
|
||||
// Wait for onEncryptKeyProxyChange.
|
||||
TraceEvent("GetCipherKeys_EncryptKeyProxyNotPresent");
|
||||
TraceEvent("GetEncryptCipherKeys_EncryptKeyProxyNotPresent");
|
||||
return Never();
|
||||
}
|
||||
request.reply.reset();
|
||||
try {
|
||||
EKPGetBaseCipherKeysByIdsReply reply = wait(proxy.get().getBaseCipherKeysByIds.getReply(request));
|
||||
if (reply.error.present()) {
|
||||
TraceEvent(SevWarn, "GetCipherKeys_RequestFailed").error(reply.error.get());
|
||||
TraceEvent(SevWarn, "GetEncryptCipherKeys_RequestFailed").error(reply.error.get());
|
||||
throw encrypt_keys_fetch_failed();
|
||||
}
|
||||
return reply;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("GetCipherKeys_CaughtError").error(e);
|
||||
TraceEvent("GetEncryptCipherKeys_CaughtError").error(e);
|
||||
if (e.code() == error_code_broken_promise) {
|
||||
// Wait for onEncryptKeyProxyChange.
|
||||
return Never();
|
||||
@ -167,7 +167,7 @@ ACTOR Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> ge
|
||||
state EKPGetBaseCipherKeysByIdsRequest request;
|
||||
|
||||
if (!db.isValid()) {
|
||||
TraceEvent(SevError, "GetCipherKeys_ServerDBInfoNotAvailable");
|
||||
TraceEvent(SevError, "GetEncryptCipherKeys_ServerDBInfoNotAvailable");
|
||||
throw encrypt_ops_error();
|
||||
}
|
||||
|
||||
@ -204,7 +204,7 @@ ACTOR Future<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> ge
|
||||
BaseCipherIndex baseIdx = std::make_pair(details.encryptDomainId, details.baseCipherId);
|
||||
const auto& itr = baseCipherKeys.find(baseIdx);
|
||||
if (itr == baseCipherKeys.end()) {
|
||||
TraceEvent(SevError, "GetCipherKeys_KeyMissing")
|
||||
TraceEvent(SevError, "GetEncryptCipherKeys_KeyMissing")
|
||||
.detail("DomainId", details.encryptDomainId)
|
||||
.detail("BaseCipherId", details.baseCipherId);
|
||||
throw encrypt_key_not_found();
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/MutationTracking.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/SpanContextMessage.h"
|
||||
@ -102,6 +103,8 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
|
||||
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
|
||||
OTELSpanContextMessage scm;
|
||||
br >> scm;
|
||||
} else if (EncryptedMutationMessage::startsEncryptedMutationMessage(mutationType)) {
|
||||
throw encrypt_unsupported();
|
||||
} else {
|
||||
MutationRef m;
|
||||
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "flow/UnitTest.h"
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/RestoreLoader.actor.h"
|
||||
#include "fdbserver/RestoreRoleCommon.actor.h"
|
||||
#include "fdbserver/MutationTracking.h"
|
||||
@ -422,6 +423,9 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
|
||||
ASSERT(inserted);
|
||||
|
||||
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(g_network->protocolVersion()));
|
||||
if (EncryptedMutationMessage::isNextIn(rd)) {
|
||||
throw encrypt_unsupported();
|
||||
}
|
||||
MutationRef mutation;
|
||||
rd >> mutation;
|
||||
|
||||
|
@ -139,7 +139,7 @@ ACTOR Future<Void> ekLookupByDomainIds(Reference<SimKmsConnectorContext> ctx,
|
||||
req.debugId.present() ? TraceEvent("SimKmsGetsByDomIds", interf.id()) : Optional<TraceEvent>();
|
||||
|
||||
if (dbgDIdTrace.present()) {
|
||||
dbgDIdTrace.get().detail("DbgId", req.debugId.get());
|
||||
dbgDIdTrace.get().setMaxEventLength(16384).detail("DbgId", req.debugId.get());
|
||||
}
|
||||
|
||||
// Map encryptionDomainId to corresponding EncryptKeyCtx element using a modulo operation. This
|
||||
|
@ -23,6 +23,8 @@
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/GetEncryptCipherKeys.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
@ -1874,6 +1876,9 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
||||
|
||||
state FetchInjectionInfo fii;
|
||||
state Reference<ILogSystem::IPeekCursor> cloneCursor2;
|
||||
state Optional<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> cipherKeys;
|
||||
state bool collectingCipherKeys = false;
|
||||
// If encrypted mutation is encountered, we collect cipher details and fetch cipher keys, then start over.
|
||||
loop {
|
||||
state uint64_t changeCounter = data->cacheRangeChangeCounter;
|
||||
bool epochEnd = false;
|
||||
@ -1881,6 +1886,8 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
||||
bool firstMutation = true;
|
||||
bool dbgLastMessageWasProtocol = false;
|
||||
|
||||
std::unordered_set<BlobCipherDetails> cipherDetails;
|
||||
|
||||
Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
|
||||
cloneCursor2 = cursor->cloneNoMore();
|
||||
|
||||
@ -1904,36 +1911,60 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
||||
OTELSpanContextMessage::isNextIn(cloneReader)) {
|
||||
OTELSpanContextMessage scm;
|
||||
cloneReader >> scm;
|
||||
} else if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
|
||||
EncryptedMutationMessage::isNextIn(cloneReader) && !cipherKeys.present()) {
|
||||
// Encrypted mutation found, but cipher keys haven't been fetch.
|
||||
// Collect cipher details to fetch cipher keys in one batch.
|
||||
EncryptedMutationMessage emm;
|
||||
cloneReader >> emm;
|
||||
cipherDetails.insert(emm.header.cipherTextDetails);
|
||||
cipherDetails.insert(emm.header.cipherHeaderDetails);
|
||||
collectingCipherKeys = true;
|
||||
} else {
|
||||
MutationRef msg;
|
||||
cloneReader >> msg;
|
||||
|
||||
if (firstMutation && msg.param1.startsWith(systemKeys.end))
|
||||
hasPrivateData = true;
|
||||
firstMutation = false;
|
||||
|
||||
if (msg.param1 == lastEpochEndPrivateKey) {
|
||||
epochEnd = true;
|
||||
// ASSERT(firstMutation);
|
||||
ASSERT(dbgLastMessageWasProtocol);
|
||||
if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
|
||||
EncryptedMutationMessage::isNextIn(cloneReader)) {
|
||||
assert(cipherKeys.present());
|
||||
msg = EncryptedMutationMessage::decrypt(cloneReader, cloneReader.arena(), cipherKeys.get());
|
||||
} else {
|
||||
cloneReader >> msg;
|
||||
}
|
||||
|
||||
dbgLastMessageWasProtocol = false;
|
||||
if (!collectingCipherKeys) {
|
||||
if (firstMutation && msg.param1.startsWith(systemKeys.end))
|
||||
hasPrivateData = true;
|
||||
firstMutation = false;
|
||||
|
||||
if (msg.param1 == lastEpochEndPrivateKey) {
|
||||
epochEnd = true;
|
||||
// ASSERT(firstMutation);
|
||||
ASSERT(dbgLastMessageWasProtocol);
|
||||
}
|
||||
|
||||
dbgLastMessageWasProtocol = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Any fetchKeys which are ready to transition their cacheRanges to the adding,transferred state do so
|
||||
// now. If there is an epoch end we skip this step, to increase testability and to prevent inserting a
|
||||
// version in the middle of a rolled back version range.
|
||||
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
|
||||
auto fk = data->readyFetchKeys.back();
|
||||
data->readyFetchKeys.pop_back();
|
||||
fk.send(&fii);
|
||||
if (collectingCipherKeys) {
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> result =
|
||||
wait(getEncryptCipherKeys(data->db, cipherDetails));
|
||||
cipherKeys = result;
|
||||
collectingCipherKeys = false;
|
||||
} else {
|
||||
// Any fetchKeys which are ready to transition their cacheRanges to the adding,transferred state do
|
||||
// so now. If there is an epoch end we skip this step, to increase testability and to prevent
|
||||
// inserting a version in the middle of a rolled back version range.
|
||||
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
|
||||
auto fk = data->readyFetchKeys.back();
|
||||
data->readyFetchKeys.pop_back();
|
||||
fk.send(&fii);
|
||||
}
|
||||
if (data->cacheRangeChangeCounter == changeCounter)
|
||||
break;
|
||||
// TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read
|
||||
// it again.
|
||||
}
|
||||
if (data->cacheRangeChangeCounter == changeCounter)
|
||||
break;
|
||||
// TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
|
||||
// again.
|
||||
}
|
||||
|
||||
data->debug_inApplyUpdate = true;
|
||||
@ -1988,7 +2019,11 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
|
||||
reader >> oscm;
|
||||
} else {
|
||||
MutationRef msg;
|
||||
reader >> msg;
|
||||
if (reader.protocolVersion().hasEncryptionAtRest() && EncryptedMutationMessage::isNextIn(reader)) {
|
||||
msg = EncryptedMutationMessage::decrypt(reader, reader.arena(), cipherKeys.get());
|
||||
} else {
|
||||
reader >> msg;
|
||||
}
|
||||
|
||||
if (ver != invalidVersion) // This change belongs to a version < minVersion
|
||||
{
|
||||
|
@ -28,7 +28,6 @@
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/SpanContextMessage.h"
|
||||
#include "fdbserver/TLogInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "fdbserver/ProxyCommitData.actor.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
#include "flow/FastRef.h"
|
||||
|
||||
// Resolver's data for applyMetadataMutations() calls.
|
||||
@ -93,6 +94,7 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
||||
Reference<ILogSystem> logSystem,
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
LogPushData* pToCommit,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>* pCipherKeys,
|
||||
bool& confChange,
|
||||
Version version,
|
||||
Version popVersion,
|
||||
|
117
fdbserver/include/fdbserver/EncryptedMutationMessage.h
Normal file
117
fdbserver/include/fdbserver/EncryptedMutationMessage.h
Normal file
@ -0,0 +1,117 @@
|
||||
/*
|
||||
* EncryptedMutationMessage.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBSERVER_ENCRYPTEDMUTATIONMESSAGE_H
|
||||
#define FDBSERVER_ENCRYPTEDMUTATIONMESSAGE_H
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "flow/BlobCipher.h"
|
||||
|
||||
struct EncryptedMutationMessage {
|
||||
|
||||
BlobCipherEncryptHeader header;
|
||||
StringRef encrypted;
|
||||
|
||||
EncryptedMutationMessage() {}
|
||||
|
||||
std::string toString() const {
|
||||
return format("code: %d, encryption info: %s",
|
||||
MutationRef::Reserved_For_EncryptedMutationMessage,
|
||||
header.toString().c_str());
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
uint8_t poly = MutationRef::Reserved_For_EncryptedMutationMessage;
|
||||
serializer(ar, poly, header, encrypted);
|
||||
}
|
||||
|
||||
static bool startsEncryptedMutationMessage(uint8_t byte) {
|
||||
return byte == MutationRef::Reserved_For_EncryptedMutationMessage;
|
||||
}
|
||||
template <class Ar>
|
||||
static bool isNextIn(Ar& ar) {
|
||||
return startsEncryptedMutationMessage(*(const uint8_t*)ar.peekBytes(1));
|
||||
}
|
||||
|
||||
// Encrypt given mutation and return an EncryptedMutationMessage.
|
||||
static EncryptedMutationMessage encrypt(
|
||||
Arena& arena,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>& cipherKeys,
|
||||
const EncryptCipherDomainId& domainId,
|
||||
const MutationRef& mutation) {
|
||||
ASSERT_NE(domainId, ENCRYPT_INVALID_DOMAIN_ID);
|
||||
auto textCipherItr = cipherKeys.find(domainId);
|
||||
auto headerCipherItr = cipherKeys.find(ENCRYPT_HEADER_DOMAIN_ID);
|
||||
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
|
||||
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
|
||||
uint8_t iv[AES_256_IV_LENGTH];
|
||||
generateRandomData(iv, AES_256_IV_LENGTH);
|
||||
BinaryWriter bw(AssumeVersion(g_network->protocolVersion()));
|
||||
bw << mutation;
|
||||
EncryptedMutationMessage encrypted_mutation;
|
||||
EncryptBlobCipherAes265Ctr cipher(textCipherItr->second,
|
||||
headerCipherItr->second,
|
||||
iv,
|
||||
AES_256_IV_LENGTH,
|
||||
ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE);
|
||||
encrypted_mutation.encrypted =
|
||||
cipher
|
||||
.encrypt(static_cast<const uint8_t*>(bw.getData()), bw.getLength(), &encrypted_mutation.header, arena)
|
||||
->toStringRef();
|
||||
return encrypted_mutation;
|
||||
}
|
||||
|
||||
// Encrypt system key space mutation and return an EncryptedMutationMessage.
|
||||
static EncryptedMutationMessage encryptMetadata(
|
||||
Arena& arena,
|
||||
const std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKey>>& cipherKeys,
|
||||
const MutationRef& mutation) {
|
||||
return encrypt(arena, cipherKeys, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, mutation);
|
||||
}
|
||||
|
||||
// Read an EncryptedMutationMessage from given reader, decrypt and return the encrypted mutation.
|
||||
// Also return decrypt buffer through buf, if it is specified.
|
||||
template <class Ar>
|
||||
static MutationRef decrypt(Ar& ar,
|
||||
Arena& arena,
|
||||
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys,
|
||||
StringRef* buf = nullptr) {
|
||||
EncryptedMutationMessage msg;
|
||||
ar >> msg;
|
||||
auto textCipherItr = cipherKeys.find(msg.header.cipherTextDetails);
|
||||
auto headerCipherItr = cipherKeys.find(msg.header.cipherHeaderDetails);
|
||||
ASSERT(textCipherItr != cipherKeys.end() && textCipherItr->second.isValid());
|
||||
ASSERT(headerCipherItr != cipherKeys.end() && headerCipherItr->second.isValid());
|
||||
DecryptBlobCipherAes256Ctr cipher(textCipherItr->second, headerCipherItr->second, msg.header.iv);
|
||||
StringRef plaintext =
|
||||
cipher.decrypt(msg.encrypted.begin(), msg.encrypted.size(), msg.header, arena)->toStringRef();
|
||||
if (buf != nullptr) {
|
||||
*buf = plaintext;
|
||||
}
|
||||
ArenaReader reader(arena, plaintext, AssumeVersion(g_network->protocolVersion()));
|
||||
MutationRef mutation;
|
||||
reader >> mutation;
|
||||
return mutation;
|
||||
}
|
||||
};
|
||||
#endif
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* GetCipherKeys.h
|
||||
* GetEncryptCipherKeys.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -32,7 +32,6 @@
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/DBCoreState.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "fdbserver/RecoveryState.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
|
@ -52,7 +52,9 @@
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TransactionLineage.h"
|
||||
#include "fdbclient/VersionedMap.h"
|
||||
#include "fdbserver/EncryptedMutationMessage.h"
|
||||
#include "fdbserver/FDBExecHelper.actor.h"
|
||||
#include "fdbserver/GetEncryptCipherKeys.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/LatencyBandConfig.h"
|
||||
@ -7092,7 +7094,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
state UpdateEagerReadInfo eager;
|
||||
state FetchInjectionInfo fii;
|
||||
state Reference<ILogSystem::IPeekCursor> cloneCursor2;
|
||||
state Optional<std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>> cipherKeys;
|
||||
state bool collectingCipherKeys = false;
|
||||
|
||||
// Collect eager read keys.
|
||||
// If encrypted mutation is encountered, we collect cipher details and fetch cipher keys, then start over.
|
||||
loop {
|
||||
state uint64_t changeCounter = data->shardChangeCounter;
|
||||
bool epochEnd = false;
|
||||
@ -7100,6 +7106,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
bool firstMutation = true;
|
||||
bool dbgLastMessageWasProtocol = false;
|
||||
|
||||
std::unordered_set<BlobCipherDetails> cipherDetails;
|
||||
|
||||
Reference<ILogSystem::IPeekCursor> cloneCursor1 = cursor->cloneNoMore();
|
||||
cloneCursor2 = cursor->cloneNoMore();
|
||||
|
||||
@ -7122,47 +7130,72 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
OTELSpanContextMessage::isNextIn(cloneReader)) {
|
||||
OTELSpanContextMessage scm;
|
||||
cloneReader >> scm;
|
||||
} else if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
|
||||
EncryptedMutationMessage::isNextIn(cloneReader) && !cipherKeys.present()) {
|
||||
// Encrypted mutation found, but cipher keys haven't been fetch.
|
||||
// Collect cipher details to fetch cipher keys in one batch.
|
||||
EncryptedMutationMessage emm;
|
||||
cloneReader >> emm;
|
||||
cipherDetails.insert(emm.header.cipherTextDetails);
|
||||
cipherDetails.insert(emm.header.cipherHeaderDetails);
|
||||
collectingCipherKeys = true;
|
||||
} else {
|
||||
MutationRef msg;
|
||||
cloneReader >> msg;
|
||||
if (cloneReader.protocolVersion().hasEncryptionAtRest() &&
|
||||
EncryptedMutationMessage::isNextIn(cloneReader)) {
|
||||
assert(cipherKeys.present());
|
||||
msg = EncryptedMutationMessage::decrypt(cloneReader, eager.arena, cipherKeys.get());
|
||||
} else {
|
||||
cloneReader >> msg;
|
||||
}
|
||||
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
|
||||
|
||||
if (firstMutation && msg.param1.startsWith(systemKeys.end))
|
||||
hasPrivateData = true;
|
||||
firstMutation = false;
|
||||
if (!collectingCipherKeys) {
|
||||
if (firstMutation && msg.param1.startsWith(systemKeys.end))
|
||||
hasPrivateData = true;
|
||||
firstMutation = false;
|
||||
|
||||
if (msg.param1 == lastEpochEndPrivateKey) {
|
||||
epochEnd = true;
|
||||
ASSERT(dbgLastMessageWasProtocol);
|
||||
if (msg.param1 == lastEpochEndPrivateKey) {
|
||||
epochEnd = true;
|
||||
ASSERT(dbgLastMessageWasProtocol);
|
||||
}
|
||||
|
||||
eager.addMutation(msg);
|
||||
dbgLastMessageWasProtocol = false;
|
||||
}
|
||||
|
||||
eager.addMutation(msg);
|
||||
dbgLastMessageWasProtocol = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
|
||||
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
|
||||
// version in the middle of a rolled back version range.
|
||||
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
|
||||
auto fk = data->readyFetchKeys.back();
|
||||
data->readyFetchKeys.pop_back();
|
||||
fk.send(&fii);
|
||||
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
|
||||
// actor until it was completed.
|
||||
if (collectingCipherKeys) {
|
||||
std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>> getCipherKeysResult =
|
||||
wait(getEncryptCipherKeys(data->db, cipherDetails));
|
||||
cipherKeys = getCipherKeysResult;
|
||||
collectingCipherKeys = false;
|
||||
eager = UpdateEagerReadInfo();
|
||||
} else {
|
||||
// Any fetchKeys which are ready to transition their shards to the adding,transferred state do so now.
|
||||
// If there is an epoch end we skip this step, to increase testability and to prevent inserting a
|
||||
// version in the middle of a rolled back version range.
|
||||
while (!hasPrivateData && !epochEnd && !data->readyFetchKeys.empty()) {
|
||||
auto fk = data->readyFetchKeys.back();
|
||||
data->readyFetchKeys.pop_back();
|
||||
fk.send(&fii);
|
||||
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this
|
||||
// actor until it was completed.
|
||||
}
|
||||
|
||||
for (auto& c : fii.changes)
|
||||
eager.addMutations(c.mutations);
|
||||
|
||||
wait(doEagerReads(data, &eager));
|
||||
if (data->shardChangeCounter == changeCounter)
|
||||
break;
|
||||
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
|
||||
// again.
|
||||
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
|
||||
// only selectively
|
||||
eager = UpdateEagerReadInfo();
|
||||
}
|
||||
|
||||
for (auto& c : fii.changes)
|
||||
eager.addMutations(c.mutations);
|
||||
|
||||
wait(doEagerReads(data, &eager));
|
||||
if (data->shardChangeCounter == changeCounter)
|
||||
break;
|
||||
TEST(true); // A fetchKeys completed while we were doing this, so eager might be outdated. Read it
|
||||
// again.
|
||||
// SOMEDAY: Theoretically we could check the change counters of individual shards and retry the reads
|
||||
// only selectively
|
||||
eager = UpdateEagerReadInfo();
|
||||
}
|
||||
data->eagerReadsLatencyHistogram->sampleSeconds(now() - start);
|
||||
|
||||
@ -7255,7 +7288,12 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
spanContext = scm.spanContext;
|
||||
} else {
|
||||
MutationRef msg;
|
||||
rd >> msg;
|
||||
if (rd.protocolVersion().hasEncryptionAtRest() && EncryptedMutationMessage::isNextIn(rd)) {
|
||||
ASSERT(cipherKeys.present());
|
||||
msg = EncryptedMutationMessage::decrypt(rd, rd.arena(), cipherKeys.get());
|
||||
} else {
|
||||
rd >> msg;
|
||||
}
|
||||
|
||||
Span span("SS:update"_loc, spanContext);
|
||||
span.addAttribute("key"_sr, msg.param1);
|
||||
@ -7435,7 +7473,9 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
||||
return Void(); // update will get called again ASAP
|
||||
} catch (Error& err) {
|
||||
state Error e = err;
|
||||
if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
|
||||
if (e.code() == error_code_encrypt_keys_fetch_failed) {
|
||||
TraceEvent(SevWarn, "SSUpdateError", data->thisServerID).error(e).backtrace();
|
||||
} else if (e.code() != error_code_worker_removed && e.code() != error_code_please_reboot) {
|
||||
TraceEvent(SevError, "SSUpdateError", data->thisServerID).error(e).backtrace();
|
||||
} else if (e.code() == error_code_please_reboot) {
|
||||
wait(data->durableInProgress);
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "flow/EncryptUtils.h"
|
||||
#include "flow/Trace.h"
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/format.hpp>
|
||||
|
||||
std::string getEncryptDbgTraceKey(std::string_view prefix,
|
||||
@ -29,12 +30,15 @@ std::string getEncryptDbgTraceKey(std::string_view prefix,
|
||||
Optional<EncryptCipherBaseKeyId> baseCipherId) {
|
||||
// Construct the TraceEvent field key ensuring its uniqueness and compliance to TraceEvent field validator and log
|
||||
// parsing tools
|
||||
std::string dName = domainName.toString();
|
||||
// Underscores are invalid in trace event detail name.
|
||||
boost::replace_all(dName, "_", "-");
|
||||
if (baseCipherId.present()) {
|
||||
boost::format fmter("%s.%lld.%s.%llu");
|
||||
return boost::str(boost::format(fmter % prefix % domainId % domainName.toString() % baseCipherId.get()));
|
||||
return boost::str(boost::format(fmter % prefix % domainId % dName % baseCipherId.get()));
|
||||
} else {
|
||||
boost::format fmter("%s.%lld.%s");
|
||||
return boost::str(boost::format(fmter % prefix % domainId % domainName.toString()));
|
||||
return boost::str(boost::format(fmter % prefix % domainId % dName));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,14 +29,14 @@
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
#define ENCRYPT_INVALID_DOMAIN_ID 0
|
||||
#define ENCRYPT_INVALID_DOMAIN_ID -1
|
||||
#define ENCRYPT_INVALID_CIPHER_KEY_ID 0
|
||||
#define ENCRYPT_INVALID_RANDOM_SALT 0
|
||||
|
||||
#define AUTH_TOKEN_SIZE 16
|
||||
|
||||
#define SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID -1
|
||||
#define ENCRYPT_HEADER_DOMAIN_ID -2
|
||||
#define SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID -2
|
||||
#define ENCRYPT_HEADER_DOMAIN_ID -3
|
||||
|
||||
const std::string FDB_DEFAULT_ENCRYPT_DOMAIN_NAME = "FdbDefaultEncryptDomain";
|
||||
|
||||
|
@ -172,6 +172,7 @@ public: // introduced features
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, EncryptionAtRest);
|
||||
};
|
||||
|
||||
template <>
|
||||
|
@ -315,6 +315,7 @@ ERROR( encrypt_update_cipher, 2705, "Attempt to update encryption cipher key")
|
||||
ERROR( encrypt_invalid_id, 2706, "Invalid encryption cipher details")
|
||||
ERROR( encrypt_keys_fetch_failed, 2707, "Encryption keys fetch from external KMS failed")
|
||||
ERROR( encrypt_invalid_kms_config, 2708, "Invalid encryption/kms configuration: discovery-url, validation-token, endpoint etc.")
|
||||
ERROR( encrypt_unsupported, 2709, "Encryption not supported")
|
||||
|
||||
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
|
||||
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error
|
||||
|
Loading…
x
Reference in New Issue
Block a user