Encrypt BlobGranule delta files (#7735)

* Encrypt  BlobGranule delta files

Description

 diff-1: Address review comments

Major changes proposed by the patch are:
1. Refactor code to allow caching of 'encryption key ctx' as part of
BlobFilePointerRef. The refactoring allows snapshot and/or delta files
to store their own file encryption context.
2. Enable BlobGranule delta file encryption/decryption semantics.

Testing

BlobGranuleCorrrectness  
BlobGranuleCorrectnessClean
BlobGranuleFileUnitTestToml

Description

Testing
This commit is contained in:
Ata E Husain Bohra 2022-08-01 16:34:44 -07:00 committed by GitHub
parent 51b92d59b9
commit ef6012c1d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 307 additions and 114 deletions

View File

@ -26,6 +26,7 @@
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h" // for allKeys unit test - could remove
#include "flow/Arena.h"
#include "flow/BlobCipher.h"
#include "flow/CompressionUtils.h"
#include "flow/DeterministicRandom.h"
@ -293,6 +294,7 @@ struct IndexBlockRef {
if (encryptHeaderRef.present()) {
CODE_PROBE(true, "reading encrypted chunked file");
ASSERT(cipherKeysCtx.present());
decrypt(cipherKeysCtx.get(), *this, arena);
} else {
TraceEvent("IndexBlockSize").detail("Sz", buffer.size());
@ -656,10 +658,19 @@ Value serializeFileFromChunks(Standalone<IndexedBlobGranuleFile>& file,
// TODO: this should probably be in actor file with yields? - move writing logic to separate actor file in server?
// TODO: optimize memory copying
// TODO: sanity check no oversized files
Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
Value serializeChunkedSnapshot(const Standalone<StringRef>& fileNameRef,
Standalone<GranuleSnapshot> snapshot,
int targetChunkBytes,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "SerializeChunkedSnapshot")
.detail("FileName", fileNameRef.toString())
.detail("Encrypted", cipherKeysCtx.present())
.detail("Compressed", compressFilter.present());
}
CODE_PROBE(compressFilter.present(), "serializing compressed snapshot file");
CODE_PROBE(cipherKeysCtx.present(), "serializing encrypted snapshot file");
Standalone<IndexedBlobGranuleFile> file;
@ -717,12 +728,21 @@ Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
}
// TODO: use redwood prefix trick to optimize cpu comparison
static Arena loadSnapshotFile(const StringRef& snapshotData,
static Arena loadSnapshotFile(const Standalone<StringRef>& fileName,
const StringRef& snapshotData,
const KeyRangeRef& keyRange,
std::map<KeyRef, ValueRef>& dataMap,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
Arena rootArena;
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "LoadChunkedSnapshot")
.detail("FileName", fileName.toString())
.detail("RangeBegin", keyRange.begin.printable())
.detail("RangeEnd", keyRange.end.printable())
.detail("Encrypted", cipherKeysCtx.present());
}
Standalone<IndexedBlobGranuleFile> file = IndexedBlobGranuleFile::fromFileBytes(snapshotData, cipherKeysCtx);
ASSERT(file.fileType == SNAPSHOT_FILE_TYPE);
@ -879,11 +899,21 @@ void sortDeltasByKey(const Standalone<GranuleDeltas>& deltasByVersion,
}
// FIXME: Could maybe reduce duplicated code between this and chunkedSnapshot for chunking
Value serializeChunkedDeltaFile(Standalone<GranuleDeltas> deltas,
Value serializeChunkedDeltaFile(const Standalone<StringRef>& fileNameRef,
Standalone<GranuleDeltas> deltas,
const KeyRangeRef& fileRange,
int chunkSize,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "SerializeChunkedDelta")
.detail("Filename", fileNameRef.toString())
.detail("RangeBegin", fileRange.begin.printable())
.detail("RangeEnd", fileRange.end.printable())
.detail("Encrypted", cipherKeysCtx.present())
.detail("Compressed", compressFilter.present());
}
CODE_PROBE(compressFilter.present(), "serializing compressed delta file");
CODE_PROBE(cipherKeysCtx.present(), "serializing encrypted delta file");
Standalone<IndexedBlobGranuleFile> file;
@ -1066,12 +1096,22 @@ void applyDeltasSorted(const Standalone<VectorRef<ParsedDeltaBoundaryRef>>& sort
// The arena owns the BoundaryDeltaRef struct data but the StringRef pointers point to data in deltaData, to avoid extra
// copying
Arena loadChunkedDeltaFile(const StringRef& deltaData,
Arena loadChunkedDeltaFile(const Standalone<StringRef>& fileNameRef,
const StringRef& deltaData,
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
std::map<KeyRef, ValueRef>& dataMap,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent(SevDebug, "LoadChunkedDelta")
.detail("FileName", fileNameRef.toString())
.detail("RangeBegin", keyRange.begin.printable())
.detail("RangeEnd", keyRange.end.printable())
.detail("Encrypted", cipherKeysCtx.present());
}
Standalone<VectorRef<ParsedDeltaBoundaryRef>> deltas;
Standalone<IndexedBlobGranuleFile> file = IndexedBlobGranuleFile::fromFileBytes(deltaData, cipherKeysCtx);
@ -1237,7 +1277,13 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
}
if (snapshotData.present()) {
Arena snapshotArena = loadSnapshotFile(snapshotData.get(), requestRange, dataMap, chunk.cipherKeysCtx);
ASSERT(chunk.snapshotFile.present());
Arena snapshotArena = loadSnapshotFile(chunk.snapshotFile.get().filename,
snapshotData.get(),
requestRange,
dataMap,
chunk.snapshotFile.get().cipherKeysCtx);
arena.dependsOn(snapshotArena);
}
@ -1245,8 +1291,13 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
}
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
Arena deltaArena = loadChunkedDeltaFile(
deltaFileData[deltaIdx], requestRange, beginVersion, readVersion, dataMap, chunk.cipherKeysCtx);
Arena deltaArena = loadChunkedDeltaFile(chunk.deltaFiles[deltaIdx].filename,
deltaFileData[deltaIdx],
requestRange,
beginVersion,
readVersion,
dataMap,
chunk.deltaFiles[deltaIdx].cipherKeysCtx);
arena.dependsOn(deltaArena);
}
if (BG_READ_DEBUG) {
@ -1644,12 +1695,14 @@ TEST_CASE("/blobgranule/files/deltaAtVersion") {
void checkSnapshotEmpty(const Value& serialized, Key begin, Key end, Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx) {
std::map<KeyRef, ValueRef> result;
Arena ar = loadSnapshotFile(serialized, KeyRangeRef(begin, end), result, cipherKeysCtx);
Standalone<StringRef> fileNameRef = StringRef();
Arena ar = loadSnapshotFile(fileNameRef, serialized, KeyRangeRef(begin, end), result, cipherKeysCtx);
ASSERT(result.empty());
}
// endIdx is exclusive
void checkSnapshotRead(const Standalone<GranuleSnapshot>& snapshot,
void checkSnapshotRead(const Standalone<StringRef>& fileNameRef,
const Standalone<GranuleSnapshot>& snapshot,
const Value& serialized,
int beginIdx,
int endIdx,
@ -1661,7 +1714,7 @@ void checkSnapshotRead(const Standalone<GranuleSnapshot>& snapshot,
Key endKey = endIdx == snapshot.size() ? keyAfter(snapshot.back().key) : snapshot[endIdx].key;
KeyRangeRef range(beginKey, endKey);
Arena ar = loadSnapshotFile(serialized, range, result, cipherKeysCtx);
Arena ar = loadSnapshotFile(fileNameRef, serialized, range, result, cipherKeysCtx);
if (result.size() != endIdx - beginIdx) {
fmt::print("Read {0} rows != {1}\n", result.size(), endIdx - beginIdx);
@ -1911,6 +1964,7 @@ TEST_CASE("/blobgranule/files/snapshotFormatUnitTest") {
int targetChunks = deterministicRandom()->randomExp(0, 9);
int targetDataBytes = deterministicRandom()->randomExp(0, 25);
int targetChunkSize = targetDataBytes / targetChunks;
Standalone<StringRef> fnameRef = StringRef(std::string("test"));
Standalone<GranuleSnapshot> data = genSnapshot(kvGen, targetDataBytes);
@ -1927,7 +1981,8 @@ TEST_CASE("/blobgranule/files/snapshotFormatUnitTest") {
fmt::print("Constructing snapshot with {0} rows, {1} chunks\n", data.size(), targetChunks);
Value serialized = serializeChunkedSnapshot(data, targetChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
Value serialized =
serializeChunkedSnapshot(fnameRef, data, targetChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
fmt::print("Snapshot serialized! {0} bytes\n", serialized.size());
@ -1938,7 +1993,7 @@ TEST_CASE("/blobgranule/files/snapshotFormatUnitTest") {
fmt::print("Initial read starting\n");
checkSnapshotRead(data, serialized, 0, data.size(), kvGen.cipherKeys);
checkSnapshotRead(fnameRef, data, serialized, 0, data.size(), kvGen.cipherKeys);
fmt::print("Initial read complete\n");
@ -1947,7 +2002,7 @@ TEST_CASE("/blobgranule/files/snapshotFormatUnitTest") {
int width = deterministicRandom()->randomExp(0, maxExp);
ASSERT(width <= data.size());
int start = deterministicRandom()->randomInt(0, data.size() - width);
checkSnapshotRead(data, serialized, start, start + width, kvGen.cipherKeys);
checkSnapshotRead(fnameRef, data, serialized, start, start + width, kvGen.cipherKeys);
}
fmt::print("Doing empty checks\n");
@ -1984,8 +2039,8 @@ void checkDeltaRead(const KeyValueGen& kvGen,
deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUniqueID(), readVersion, ".delta");
Standalone<BlobGranuleChunkRef> chunk;
// TODO need to add cipher keys meta
chunk.deltaFiles.emplace_back_deep(chunk.arena(), filename, 0, serialized->size(), serialized->size());
chunk.cipherKeysCtx = kvGen.cipherKeys;
chunk.deltaFiles.emplace_back_deep(
chunk.arena(), filename, 0, serialized->size(), serialized->size(), kvGen.cipherKeys);
chunk.keyRange = kvGen.allRange;
chunk.includedVersion = readVersion;
chunk.snapshotVersion = invalidVersion;
@ -2045,6 +2100,7 @@ Standalone<GranuleDeltas> genDeltas(KeyValueGen& kvGen, int targetBytes) {
TEST_CASE("/blobgranule/files/deltaFormatUnitTest") {
KeyValueGen kvGen;
Standalone<StringRef> fileNameRef = StringRef(std::string("test"));
int targetChunks = deterministicRandom()->randomExp(0, 8);
int targetDataBytes = deterministicRandom()->randomExp(0, 21);
@ -2054,8 +2110,8 @@ TEST_CASE("/blobgranule/files/deltaFormatUnitTest") {
Standalone<GranuleDeltas> data = genDeltas(kvGen, targetDataBytes);
fmt::print("Deltas ({0})\n", data.size());
Value serialized =
serializeChunkedDeltaFile(data, kvGen.allRange, targetChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
Value serialized = serializeChunkedDeltaFile(
fileNameRef, data, kvGen.allRange, targetChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
// check whole file
checkDeltaRead(kvGen, kvGen.allRange, 0, data.back().version, data, &serialized);
@ -2095,7 +2151,7 @@ void checkGranuleRead(const KeyValueGen& kvGen,
std::string snapshotFilename = randomBGFilename(
deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUniqueID(), 0, ".snapshot");
chunk.snapshotFile = BlobFilePointerRef(
chunk.arena(), snapshotFilename, 0, serializedSnapshot.size(), serializedSnapshot.size());
chunk.arena(), snapshotFilename, 0, serializedSnapshot.size(), serializedSnapshot.size(), kvGen.cipherKeys);
}
int deltaIdx = 0;
while (deltaIdx < serializedDeltas.size() && serializedDeltas[deltaIdx].first < beginVersion) {
@ -2106,7 +2162,7 @@ void checkGranuleRead(const KeyValueGen& kvGen,
std::string deltaFilename = randomBGFilename(
deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUniqueID(), readVersion, ".delta");
size_t fsize = serializedDeltas[deltaIdx].second.size();
chunk.deltaFiles.emplace_back_deep(chunk.arena(), deltaFilename, 0, fsize, fsize);
chunk.deltaFiles.emplace_back_deep(chunk.arena(), deltaFilename, 0, fsize, fsize, kvGen.cipherKeys);
deltaPtrsVector.push_back(serializedDeltas[deltaIdx].second);
if (serializedDeltas[deltaIdx].first >= readVersion) {
@ -2127,8 +2183,6 @@ void checkGranuleRead(const KeyValueGen& kvGen,
}
}
// TODO need to add cipher keys meta
chunk.cipherKeysCtx = kvGen.cipherKeys;
chunk.keyRange = kvGen.allRange;
chunk.includedVersion = readVersion;
chunk.snapshotVersion = (beginVersion == 0) ? 0 : invalidVersion;
@ -2150,6 +2204,7 @@ void checkGranuleRead(const KeyValueGen& kvGen,
TEST_CASE("/blobgranule/files/granuleReadUnitTest") {
KeyValueGen kvGen;
Standalone<StringRef> fileNameRef = StringRef(std::string("testSnap"));
int targetSnapshotChunks = deterministicRandom()->randomExp(0, 9);
int targetDeltaChunks = deterministicRandom()->randomExp(0, 8);
@ -2164,8 +2219,8 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") {
Standalone<GranuleDeltas> deltaData = genDeltas(kvGen, targetDeltaBytes);
fmt::print("{0} snapshot rows and {1} deltas\n", snapshotData.size(), deltaData.size());
Value serializedSnapshot =
serializeChunkedSnapshot(snapshotData, targetSnapshotChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
Value serializedSnapshot = serializeChunkedSnapshot(
fileNameRef, snapshotData, targetSnapshotChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
// split deltas up across multiple files
int deltaFiles = std::min(deltaData.size(), deterministicRandom()->randomInt(1, 21));
@ -2184,8 +2239,13 @@ TEST_CASE("/blobgranule/files/granuleReadUnitTest") {
// if it's the last set of deltas, sometimes make them the memory deltas instead
inMemoryDeltas = fileData;
} else {
Value serializedDelta = serializeChunkedDeltaFile(
fileData, kvGen.allRange, targetDeltaChunkSize, kvGen.compressFilter, kvGen.cipherKeys);
Standalone<StringRef> fileNameRef = StringRef("delta" + std::to_string(i));
Value serializedDelta = serializeChunkedDeltaFile(fileNameRef,
fileData,
kvGen.allRange,
targetDeltaChunkSize,
kvGen.compressFilter,
kvGen.cipherKeys);
serializedDeltaFiles.emplace_back(fileData.back().version, serializedDelta);
}
}

View File

@ -143,16 +143,11 @@ struct BlobGranuleCipherKeysMetaRef {
StringRef ivRef;
BlobGranuleCipherKeysMetaRef() {}
BlobGranuleCipherKeysMetaRef(Arena& to,
const EncryptCipherDomainId tDomainId,
const EncryptCipherBaseKeyId tBaseCipherId,
const EncryptCipherRandomSalt tSalt,
const EncryptCipherDomainId hDomainId,
const EncryptCipherBaseKeyId hBaseCipherId,
const EncryptCipherRandomSalt hSalt,
const std::string& ivStr)
: textDomainId(tDomainId), textBaseCipherId(tBaseCipherId), textSalt(tSalt), headerDomainId(hDomainId),
headerBaseCipherId(hBaseCipherId), headerSalt(hSalt), ivRef(StringRef(to, ivStr)) {}
BlobGranuleCipherKeysMetaRef(Arena& to, BlobGranuleCipherKeysMeta cipherKeysMeta)
: textDomainId(cipherKeysMeta.textDomainId), textBaseCipherId(cipherKeysMeta.textBaseCipherId),
textSalt(cipherKeysMeta.textSalt), headerDomainId(cipherKeysMeta.headerDomainId),
headerBaseCipherId(cipherKeysMeta.headerBaseCipherId), headerSalt(cipherKeysMeta.headerSalt),
ivRef(StringRef(to, cipherKeysMeta.ivStr)) {}
template <class Ar>
void serialize(Ar& ar) {
@ -162,16 +157,31 @@ struct BlobGranuleCipherKeysMetaRef {
struct BlobFilePointerRef {
constexpr static FileIdentifier file_identifier = 5253554;
// Serializable fields
StringRef filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
Optional<BlobGranuleCipherKeysMetaRef> cipherKeysMetaRef;
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx;
// Non-serializable fields
Optional<BlobGranuleCipherKeysMetaRef>
cipherKeysMetaRef; // Placeholder to cache information sufficient to lookup encryption ciphers
BlobFilePointerRef() {}
BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length, int64_t fullFileLength)
: filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
BlobFilePointerRef(Arena& to,
const std::string& filename,
int64_t offset,
int64_t length,
int64_t fullFileLength,
Optional<BlobGranuleCipherKeysCtx> ciphKeysCtx)
: filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength),
cipherKeysCtx(ciphKeysCtx) {}
BlobFilePointerRef(Arena& to,
const std::string& filename,
int64_t offset,
@ -180,30 +190,23 @@ struct BlobFilePointerRef {
Optional<BlobGranuleCipherKeysMeta> ciphKeysMeta)
: filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength) {
if (ciphKeysMeta.present()) {
cipherKeysMetaRef = BlobGranuleCipherKeysMetaRef(to,
ciphKeysMeta.get().textDomainId,
ciphKeysMeta.get().textBaseCipherId,
ciphKeysMeta.get().textSalt,
ciphKeysMeta.get().headerDomainId,
ciphKeysMeta.get().headerBaseCipherId,
ciphKeysMeta.get().headerSalt,
ciphKeysMeta.get().ivStr);
cipherKeysMetaRef = BlobGranuleCipherKeysMetaRef(to, ciphKeysMeta.get());
}
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, filename, offset, length, fullFileLength, cipherKeysMetaRef);
serializer(ar, filename, offset, length, fullFileLength, cipherKeysCtx);
}
std::string toString() const {
std::stringstream ss;
ss << filename.toString() << ":" << offset << ":" << length << ":" << fullFileLength;
if (cipherKeysMetaRef.present()) {
ss << ":CipherKeysMeta:TextCipher:" << cipherKeysMetaRef.get().textDomainId << ":"
<< cipherKeysMetaRef.get().textBaseCipherId << ":" << cipherKeysMetaRef.get().textSalt
<< ":HeaderCipher:" << cipherKeysMetaRef.get().headerDomainId << ":"
<< cipherKeysMetaRef.get().headerBaseCipherId << ":" << cipherKeysMetaRef.get().headerSalt;
if (cipherKeysCtx.present()) {
ss << ":CipherKeysCtx:TextCipher:" << cipherKeysCtx.get().textCipherKey.encryptDomainId << ":"
<< cipherKeysCtx.get().textCipherKey.baseCipherId << ":" << cipherKeysCtx.get().textCipherKey.salt
<< ":HeaderCipher:" << cipherKeysCtx.get().headerCipherKey.encryptDomainId << ":"
<< cipherKeysCtx.get().headerCipherKey.baseCipherId << ":" << cipherKeysCtx.get().headerCipherKey.salt;
}
return std::move(ss).str();
}
@ -224,19 +227,10 @@ struct BlobGranuleChunkRef {
VectorRef<BlobFilePointerRef> deltaFiles;
GranuleDeltas newDeltas;
Optional<KeyRef> tenantPrefix;
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar,
keyRange,
includedVersion,
snapshotVersion,
snapshotFile,
deltaFiles,
newDeltas,
tenantPrefix,
cipherKeysCtx);
serializer(ar, keyRange, includedVersion, snapshotVersion, snapshotFile, deltaFiles, newDeltas, tenantPrefix);
}
};

View File

@ -26,12 +26,14 @@
#include "fdbclient/BlobGranuleCommon.h"
#include "flow/CompressionUtils.h"
Value serializeChunkedSnapshot(Standalone<GranuleSnapshot> snapshot,
Value serializeChunkedSnapshot(const Standalone<StringRef>& fileNameRef,
Standalone<GranuleSnapshot> snapshot,
int chunkSize,
Optional<CompressionFilter> compressFilter,
Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx = {});
Value serializeChunkedDeltaFile(Standalone<GranuleDeltas> deltas,
Value serializeChunkedDeltaFile(const Standalone<StringRef>& fileNameRef,
Standalone<GranuleDeltas> deltas,
const KeyRangeRef& fileRange,
int chunkSize,
Optional<CompressionFilter> compressFilter,

View File

@ -128,6 +128,10 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID) {
// key range, the granule may have a snapshot file at version X, where beginVersion < X <= readVersion. In this case, if
// the number of bytes in delta files between beginVersion and X is larger than the snapshot file at version X, it is
// strictly more efficient (in terms of files and bytes read) to just use the snapshot file at version X instead.
//
// To assist BlobGranule file (snapshot and/or delta) file encryption, the routine while populating snapshot and/or
// delta files, constructs BlobFilePointerRef->cipherKeysMeta field. Approach avoids this method to be defined as an
// ACTOR, as fetching desired EncryptionKey may potentially involve reaching out to EncryptKeyProxy or external KMS.
void GranuleFiles::getFiles(Version beginVersion,
Version readVersion,
bool canCollapse,
@ -195,8 +199,12 @@ void GranuleFiles::getFiles(Version beginVersion,
}
while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
chunk.deltaFiles.emplace_back_deep(
replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength);
chunk.deltaFiles.emplace_back_deep(replyArena,
deltaF->filename,
deltaF->offset,
deltaF->length,
deltaF->fullFileLength,
deltaF->cipherKeysMeta);
deltaBytesCounter += deltaF->length;
ASSERT(lastIncluded < deltaF->version);
lastIncluded = deltaF->version;
@ -204,8 +212,12 @@ void GranuleFiles::getFiles(Version beginVersion,
}
// include last delta file that passes readVersion, if it exists
if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
chunk.deltaFiles.emplace_back_deep(
replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength);
chunk.deltaFiles.emplace_back_deep(replyArena,
deltaF->filename,
deltaF->offset,
deltaF->length,
deltaF->fullFileLength,
deltaF->cipherKeysMeta);
deltaBytesCounter += deltaF->length;
lastIncluded = deltaF->version;
}

View File

@ -352,29 +352,27 @@ ACTOR Future<BlobGranuleCipherKey> lookupCipherKey(Reference<BlobWorkerData> bwD
return BlobGranuleCipherKey::fromBlobCipherKey(cipherKeyMapItr->second, *arena);
}
ACTOR Future<BlobGranuleCipherKeysCtx> getGranuleCipherKeys(Reference<BlobWorkerData> bwData,
BlobGranuleCipherKeysMetaRef cipherKeysMetaRef,
Arena* arena) {
ACTOR Future<BlobGranuleCipherKeysCtx> getGranuleCipherKeysImpl(Reference<BlobWorkerData> bwData,
BlobCipherDetails textCipherDetails,
BlobCipherDetails headerCipherDetails,
StringRef ivRef,
Arena* arena) {
state BlobGranuleCipherKeysCtx cipherKeysCtx;
// Fetch 'textCipher' key
state BlobCipherDetails textCipherDetails(
cipherKeysMetaRef.textDomainId, cipherKeysMetaRef.textBaseCipherId, cipherKeysMetaRef.textSalt);
BlobGranuleCipherKey textCipherKey = wait(lookupCipherKey(bwData, textCipherDetails, arena));
cipherKeysCtx.textCipherKey = textCipherKey;
// Fetch 'headerCipher' key
state BlobCipherDetails headerCipherDetails(
cipherKeysMetaRef.headerDomainId, cipherKeysMetaRef.headerBaseCipherId, cipherKeysMetaRef.headerSalt);
BlobGranuleCipherKey headerCipherKey = wait(lookupCipherKey(bwData, headerCipherDetails, arena));
cipherKeysCtx.headerCipherKey = headerCipherKey;
// Populate 'Intialization Vector'
ASSERT_EQ(cipherKeysMetaRef.ivRef.size(), AES_256_IV_LENGTH);
cipherKeysCtx.ivRef = StringRef(*arena, cipherKeysMetaRef.ivRef);
ASSERT_EQ(ivRef.size(), AES_256_IV_LENGTH);
cipherKeysCtx.ivRef = StringRef(*arena, ivRef);
if (BG_ENCRYPT_COMPRESS_DEBUG) {
TraceEvent("GetGranuleCipherKey")
TraceEvent(SevDebug, "GetGranuleCipherKey")
.detail("TextDomainId", cipherKeysCtx.textCipherKey.encryptDomainId)
.detail("TextBaseCipherId", cipherKeysCtx.textCipherKey.baseCipherId)
.detail("TextSalt", cipherKeysCtx.textCipherKey.salt)
@ -387,6 +385,32 @@ ACTOR Future<BlobGranuleCipherKeysCtx> getGranuleCipherKeys(Reference<BlobWorker
return cipherKeysCtx;
}
Future<BlobGranuleCipherKeysCtx> getGranuleCipherKeysFromKeysMeta(Reference<BlobWorkerData> bwData,
BlobGranuleCipherKeysMeta cipherKeysMeta,
Arena* arena) {
BlobCipherDetails textCipherDetails(
cipherKeysMeta.textDomainId, cipherKeysMeta.textBaseCipherId, cipherKeysMeta.textSalt);
BlobCipherDetails headerCipherDetails(
cipherKeysMeta.headerDomainId, cipherKeysMeta.headerBaseCipherId, cipherKeysMeta.headerSalt);
StringRef ivRef = StringRef(*arena, cipherKeysMeta.ivStr);
return getGranuleCipherKeysImpl(bwData, textCipherDetails, headerCipherDetails, ivRef, arena);
}
Future<BlobGranuleCipherKeysCtx> getGranuleCipherKeysFromKeysMetaRef(Reference<BlobWorkerData> bwData,
BlobGranuleCipherKeysMetaRef cipherKeysMetaRef,
Arena* arena) {
BlobCipherDetails textCipherDetails(
cipherKeysMetaRef.textDomainId, cipherKeysMetaRef.textBaseCipherId, cipherKeysMetaRef.textSalt);
BlobCipherDetails headerCipherDetails(
cipherKeysMetaRef.headerDomainId, cipherKeysMetaRef.headerBaseCipherId, cipherKeysMetaRef.headerSalt);
return getGranuleCipherKeysImpl(bwData, textCipherDetails, headerCipherDetails, cipherKeysMetaRef.ivRef, arena);
}
ACTOR Future<Void> readAndCheckGranuleLock(Reference<ReadYourWritesTransaction> tr,
KeyRange granuleRange,
int64_t epoch,
@ -605,17 +629,20 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
state Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx;
state Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
state Arena arena;
// TODO support encryption, figure out proper state stuff
/*if (isBlobFileEncryptionSupported()) {
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
cipherKeysCtx = ciphKeysCtx;
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
}*/
Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
if (isBlobFileEncryptionSupported()) {
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
cipherKeysCtx = std::move(ciphKeysCtx);
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
}
state Value serialized = serializeChunkedDeltaFile(
deltasToWrite, keyRange, SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES, compressFilter, cipherKeysCtx);
state Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
state Value serialized = serializeChunkedDeltaFile(StringRef(fileName),
deltasToWrite,
keyRange,
SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES,
compressFilter,
cipherKeysCtx);
state size_t serializedSize = serialized.size();
// Free up deltasToWrite here to reduce memory
@ -680,6 +707,14 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
if (BUGGIFY_WITH_PROB(0.01)) {
wait(delay(deterministicRandom()->random01()));
}
if (BW_DEBUG) {
TraceEvent(SevDebug, "DeltaFileWritten")
.detail("FileName", fname)
.detail("Encrypted", cipherKeysCtx.present())
.detail("Compressed", compressFilter.present());
}
// FIXME: change when we implement multiplexing
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize, cipherKeysMeta);
} catch (Error& e) {
@ -759,15 +794,19 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
state Optional<BlobGranuleCipherKeysCtx> cipherKeysCtx;
state Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
state Arena arena;
if (isBlobFileEncryptionSupported()) {
BlobGranuleCipherKeysCtx ciphKeysCtx = wait(getLatestGranuleCipherKeys(bwData, keyRange, &arena));
cipherKeysCtx = ciphKeysCtx;
cipherKeysCtx = std::move(ciphKeysCtx);
cipherKeysMeta = BlobGranuleCipherKeysCtx::toCipherKeysMeta(cipherKeysCtx.get());
}
Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
state Value serialized = serializeChunkedSnapshot(
snapshot, SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES, compressFilter, cipherKeysCtx);
state Optional<CompressionFilter> compressFilter = getBlobFileCompressFilter();
state Value serialized = serializeChunkedSnapshot(StringRef(fileName),
snapshot,
SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES,
compressFilter,
cipherKeysCtx);
state size_t serializedSize = serialized.size();
// free snapshot to reduce memory
@ -848,6 +887,13 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
wait(delay(deterministicRandom()->random01()));
}
if (BW_DEBUG) {
TraceEvent(SevDebug, "SnapshotFileWritten")
.detail("FileName", fileName)
.detail("Encrypted", cipherKeysCtx.present())
.detail("Compressed", compressFilter.present());
}
// FIXME: change when we implement multiplexing
return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, cipherKeysMeta);
}
@ -975,33 +1021,49 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
}
ASSERT(snapshotVersion < version);
state Optional<BlobGranuleCipherKeysCtx> snapCipherKeysCtx;
if (snapshotF.cipherKeysMeta.present()) {
ASSERT(isBlobFileEncryptionSupported());
BlobGranuleCipherKeysCtx keysCtx =
wait(getGranuleCipherKeysFromKeysMeta(bwData, snapshotF.cipherKeysMeta.get(), &filenameArena));
snapCipherKeysCtx = std::move(keysCtx);
}
chunk.snapshotFile = BlobFilePointerRef(filenameArena,
snapshotF.filename,
snapshotF.offset,
snapshotF.length,
snapshotF.fullFileLength,
snapshotF.cipherKeysMeta);
// TODO: optimization - batch 'encryption-key' lookup given the GranuleFile set is known
// FIXME: get cipher keys for delta as well!
if (chunk.snapshotFile.get().cipherKeysMetaRef.present()) {
ASSERT(isBlobFileEncryptionSupported());
BlobGranuleCipherKeysCtx cipherKeysCtx =
wait(getGranuleCipherKeys(bwData, chunk.snapshotFile.get().cipherKeysMetaRef.get(), &filenameArena));
chunk.cipherKeysCtx = cipherKeysCtx;
}
snapCipherKeysCtx);
compactBytesRead += snapshotF.length;
int deltaIdx = files.deltaFiles.size() - 1;
state int deltaIdx = files.deltaFiles.size() - 1;
while (deltaIdx >= 0 && files.deltaFiles[deltaIdx].version > snapshotVersion) {
deltaIdx--;
}
deltaIdx++;
Version lastDeltaVersion = snapshotVersion;
state Version lastDeltaVersion = snapshotVersion;
state BlobFileIndex deltaF;
while (deltaIdx < files.deltaFiles.size() && lastDeltaVersion < version) {
BlobFileIndex deltaF = files.deltaFiles[deltaIdx];
chunk.deltaFiles.emplace_back_deep(
filenameArena, deltaF.filename, deltaF.offset, deltaF.length, deltaF.fullFileLength);
state Optional<BlobGranuleCipherKeysCtx> deltaCipherKeysCtx;
deltaF = files.deltaFiles[deltaIdx];
if (deltaF.cipherKeysMeta.present()) {
ASSERT(isBlobFileEncryptionSupported());
BlobGranuleCipherKeysCtx keysCtx =
wait(getGranuleCipherKeysFromKeysMeta(bwData, deltaF.cipherKeysMeta.get(), &filenameArena));
deltaCipherKeysCtx = std::move(keysCtx);
}
chunk.deltaFiles.emplace_back_deep(filenameArena,
deltaF.filename,
deltaF.offset,
deltaF.length,
deltaF.fullFileLength,
deltaCipherKeysCtx);
compactBytesRead += deltaF.length;
lastDeltaVersion = files.deltaFiles[deltaIdx].version;
deltaIdx++;
@ -3193,12 +3255,44 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
didCollapse = true;
}
// TODO: optimization - batch 'encryption-key' lookup given the GranuleFile set is known
state Future<BlobGranuleCipherKeysCtx> cipherKeysCtx;
if (chunk.snapshotFile.present() && chunk.snapshotFile.get().cipherKeysMetaRef.present()) {
ASSERT(isBlobFileEncryptionSupported());
cipherKeysCtx =
getGranuleCipherKeys(bwData, chunk.snapshotFile.get().cipherKeysMetaRef.get(), &rep.arena);
// Invoke calls to populate 'EncryptionKeysCtx' for snapshot and/or deltaFiles asynchronously
state Optional<Future<BlobGranuleCipherKeysCtx>> snapCipherKeysCtx;
if (chunk.snapshotFile.present()) {
const bool encrypted = chunk.snapshotFile.get().cipherKeysMetaRef.present();
if (BW_DEBUG) {
TraceEvent("DoBlobGranuleFileRequestDelta_KeysCtxPrepare")
.detail("FileName", chunk.snapshotFile.get().filename.toString())
.detail("Encrypted", encrypted);
}
if (encrypted) {
ASSERT(isBlobFileEncryptionSupported());
ASSERT(!chunk.snapshotFile.get().cipherKeysCtx.present());
snapCipherKeysCtx = getGranuleCipherKeysFromKeysMetaRef(
bwData, chunk.snapshotFile.get().cipherKeysMetaRef.get(), &rep.arena);
}
}
state std::unordered_map<int, Future<BlobGranuleCipherKeysCtx>> deltaCipherKeysCtxs;
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
const bool encrypted = chunk.deltaFiles[deltaIdx].cipherKeysMetaRef.present();
if (BW_DEBUG) {
TraceEvent("DoBlobGranuleFileRequestDelta_KeysCtxPrepare")
.detail("FileName", chunk.deltaFiles[deltaIdx].filename.toString())
.detail("Encrypted", encrypted);
}
if (encrypted) {
ASSERT(isBlobFileEncryptionSupported());
ASSERT(!chunk.deltaFiles[deltaIdx].cipherKeysCtx.present());
deltaCipherKeysCtxs.emplace(
deltaIdx,
getGranuleCipherKeysFromKeysMetaRef(
bwData, chunk.deltaFiles[deltaIdx].cipherKeysMetaRef.get(), &rep.arena));
}
}
// FIXME: get cipher keys for delta files too!
@ -3245,9 +3339,37 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
}
}
if (chunk.snapshotFile.present() && chunk.snapshotFile.get().cipherKeysMetaRef.present()) {
BlobGranuleCipherKeysCtx ctx = wait(cipherKeysCtx);
chunk.cipherKeysCtx = std::move(ctx);
// Update EncryptionKeysCtx information for the chunk->snapshotFile
if (chunk.snapshotFile.present() && snapCipherKeysCtx.present()) {
ASSERT(chunk.snapshotFile.get().cipherKeysMetaRef.present());
BlobGranuleCipherKeysCtx keysCtx = wait(snapCipherKeysCtx.get());
chunk.snapshotFile.get().cipherKeysCtx = std::move(keysCtx);
// reclaim memory from non-serializable field
chunk.snapshotFile.get().cipherKeysMetaRef.reset();
if (BW_DEBUG) {
TraceEvent("DoBlobGranuleFileRequestSnap_KeysCtxDone")
.detail("FileName", chunk.snapshotFile.get().filename.toString());
}
}
// Update EncryptionKeysCtx information for the chunk->deltaFiles
if (!deltaCipherKeysCtxs.empty()) {
ASSERT(!chunk.deltaFiles.empty());
state std::unordered_map<int, Future<BlobGranuleCipherKeysCtx>>::const_iterator itr;
for (itr = deltaCipherKeysCtxs.begin(); itr != deltaCipherKeysCtxs.end(); itr++) {
BlobGranuleCipherKeysCtx keysCtx = wait(itr->second);
chunk.deltaFiles[itr->first].cipherKeysCtx = std::move(keysCtx);
// reclaim memory from non-serializable field
chunk.deltaFiles[itr->first].cipherKeysMetaRef.reset();
if (BW_DEBUG) {
TraceEvent("DoBlobGranuleFileRequestDelta_KeysCtxDone")
.detail("FileName", chunk.deltaFiles[itr->first].filename.toString());
}
}
}
rep.chunks.push_back(rep.arena, chunk);

View File

@ -31,10 +31,13 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/ServerDBInfo.h"
#include "flow/actorcompiler.h" // has to be last include
#include "flow/flow.h"
#include "flow/actorcompiler.h" // has to be last include
struct GranuleHistory {
KeyRange range;
Version version;