From e7fa8e9f6fa862747b163359f0f994f603c88a65 Mon Sep 17 00:00:00 2001
From: Hao Fu <77984096+hfu94@users.noreply.github.com>
Date: Thu, 2 Jun 2022 17:44:10 -0700
Subject: [PATCH] Add versionstamp support in tuple (#7293)

Tuple in C++ needs to support Versionstamp.
---
 bindings/c/test/unit/unit_tests.cpp   | 71 ++++++++++++++++++++++++++-
 bindings/flow/Tuple.cpp               | 32 ++++++++++++
 bindings/flow/Tuple.h                 |  6 ++-
 bindings/flow/tester/Tester.actor.cpp | 10 ++++
 fdbclient/CMakeLists.txt              |  2 +
 fdbclient/GlobalConfig.actor.cpp      |  2 +
 fdbclient/Tuple.cpp                   | 27 ++++++++++
 fdbclient/Tuple.h                     |  5 +-
 fdbclient/Versionstamp.cpp            | 44 +++++++++++++++++
 fdbclient/Versionstamp.h              | 44 +++++++++++++++++
 flow/error_definitions.h              |  1 +
 11 files changed, 241 insertions(+), 3 deletions(-)
 create mode 100644 fdbclient/Versionstamp.cpp
 create mode 100644 fdbclient/Versionstamp.h

diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp
index 5ee8d9e9b7..113759d23a 100644
--- a/bindings/c/test/unit/unit_tests.cpp
+++ b/bindings/c/test/unit/unit_tests.cpp
@@ -992,6 +992,73 @@ GetMappedRangeResult getMappedIndexEntries(int beginId,
 	return getMappedIndexEntries(beginId, endId, tr, mapper, matchIndex);
 }
 
+TEST_CASE("versionstamp_unit_test") {
+	// a random 12 bytes long StringRef as a versionstamp
+	StringRef str = "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12"_sr;
+	Versionstamp vs(str), vs2(str);
+	ASSERT(vs == vs2);
+	ASSERT(vs.begin() != vs2.begin());
+
+	int64_t version = vs.getVersion();
+	int64_t version2 = vs2.getVersion();
+	int64_t versionExpected = ((int64_t)0x01 << 56) + ((int64_t)0x02 << 48) + ((int64_t)0x03 << 40) +
+	                          ((int64_t)0x04 << 32) + (0x05 << 24) + (0x06 << 16) + (0x07 << 8) + 0x08;
+	ASSERT(version == versionExpected);
+	ASSERT(version2 == versionExpected);
+
+	int16_t batch = vs.getBatchNumber();
+	int16_t batch2 = vs2.getBatchNumber();
+	int16_t batchExpected = (0x09 << 8) + 0x10;
+	ASSERT(batch == batchExpected);
+	ASSERT(batch2 == batchExpected);
+
+	int16_t user = vs.getUserVersion();
+	int16_t user2 = vs2.getUserVersion();
+	int16_t userExpected = (0x11 << 8) + 0x12;
+	ASSERT(user == userExpected);
+	ASSERT(user2 == userExpected);
+
+	ASSERT(vs.size() == VERSIONSTAMP_TUPLE_SIZE);
+	ASSERT(vs2.size() == VERSIONSTAMP_TUPLE_SIZE);
+}
+
+TEST_CASE("tuple_support_versionstamp") {
+	// a random 12 bytes long StringRef as a versionstamp
+	StringRef str = "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12"_sr;
+	Versionstamp vs(str);
+	const Tuple t = Tuple().append(prefix).append(RECORD).appendVersionstamp(vs).append("{K[3]}"_sr).append("{...}"_sr);
+	ASSERT(t.getVersionstamp(2) == vs);
+
+	// verify the round-way pack-unpack path for a Tuple containing a versionstamp
+	StringRef result1 = t.pack();
+	Tuple t2 = Tuple::unpack(result1);
+	StringRef result2 = t2.pack();
+	ASSERT(t2.getVersionstamp(2) == vs);
+	ASSERT(result1.toString() == result2.toString());
+}
+
+TEST_CASE("tuple_fail_to_append_truncated_versionstamp") {
+	// a truncated 11 bytes long StringRef as a versionstamp
+	StringRef str = "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11"_sr;
+	try {
+		Versionstamp truncatedVersionstamp(str);
+	} catch (Error& e) {
+		return;
+	}
+	UNREACHABLE();
+}
+
+TEST_CASE("tuple_fail_to_append_longer_versionstamp") {
+	// a longer than expected 13 bytes long StringRef as a versionstamp
+	StringRef str = "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11"_sr;
+	try {
+		Versionstamp longerVersionstamp(str);
+	} catch (Error& e) {
+		return;
+	}
+	UNREACHABLE();
+}
+
 TEST_CASE("fdb_transaction_get_mapped_range") {
 	const int TOTAL_RECORDS = 20;
 	fillInRecords(TOTAL_RECORDS);
@@ -1153,11 +1220,13 @@ void assertNotTuple(std::string str) {
 TEST_CASE("fdb_transaction_get_mapped_range_fail_on_mapper_not_tuple") {
 	// A string that cannot be parsed as tuple.
 	// "\x15:\x152\x15E\x15\x09\x15\x02\x02MySimpleRecord$repeater-version\x00\x15\x013\x00\x00\x00\x00\x1aU\x90\xba\x00\x00\x00\x02\x15\x04"
+	// should fail at \x35
+
 	std::string mapper = {
 		'\x15', ':',    '\x15', '2', '\x15', 'E',    '\x15', '\t',   '\x15', '\x02', '\x02', 'M',
 		'y',    'S',    'i',    'm', 'p',    'l',    'e',    'R',    'e',    'c',    'o',    'r',
 		'd',    '$',    'r',    'e', 'p',    'e',    'a',    't',    'e',    'r',    '-',    'v',
-		'e',    'r',    's',    'i', 'o',    'n',    '\x00', '\x15', '\x01', '3',    '\x00', '\x00',
+		'e',    'r',    's',    'i', 'o',    'n',    '\x00', '\x15', '\x01', '\x35', '\x00', '\x00',
 		'\x00', '\x00', '\x1a', 'U', '\x90', '\xba', '\x00', '\x00', '\x00', '\x02', '\x15', '\x04'
 	};
 	assertNotTuple(mapper);
diff --git a/bindings/flow/Tuple.cpp b/bindings/flow/Tuple.cpp
index fde784f418..3e4e4a4a98 100644
--- a/bindings/flow/Tuple.cpp
+++ b/bindings/flow/Tuple.cpp
@@ -26,6 +26,7 @@ static_assert(std::numeric_limits<float>::is_iec559);
 static_assert(std::numeric_limits<double>::is_iec559);
 
 const size_t Uuid::SIZE = 16;
+const size_t VERSIONSTAMP_TUPLE_SIZE = 12;
 
 const uint8_t Tuple::NULL_CODE = 0x00;
 const uint8_t Tuple::BYTES_CODE = 0x01;
@@ -39,6 +40,7 @@ const uint8_t Tuple::DOUBLE_CODE = 0x21;
 const uint8_t Tuple::FALSE_CODE = 0x26;
 const uint8_t Tuple::TRUE_CODE = 0x27;
 const uint8_t Tuple::UUID_CODE = 0x30;
+const uint8_t Tuple::VERSIONSTAMP_96_CODE = 0x33;
 
 static float bigEndianFloat(float orig) {
 	int32_t big = *(int32_t*)&orig;
@@ -117,6 +119,8 @@ Tuple::Tuple(StringRef const& str) {
 		} else if (data[i] == NESTED_CODE) {
 			i += 1;
 			depth += 1;
+		} else if (data[i] == VERSIONSTAMP_96_CODE) {
+			i += 1 + VERSIONSTAMP_TUPLE_SIZE;
 		} else {
 			throw invalid_tuple_data_type();
 		}
@@ -170,6 +174,15 @@ Tuple& Tuple::append(StringRef const& str, bool utf8) {
 	return *this;
 }
 
+Tuple& Tuple::appendVersionstamp(Versionstamp const& vs) {
+	offsets.push_back(data.size());
+
+	data.push_back(data.arena(), VERSIONSTAMP_96_CODE);
+	data.append(data.arena(), vs.begin(), vs.size());
+
+	return *this;
+}
+
 Tuple& Tuple::append(int32_t value) {
 	return append((int64_t)value);
 }
@@ -293,6 +306,8 @@ Tuple::ElementType Tuple::getType(size_t index) const {
 		return ElementType::BOOL;
 	} else if (code == UUID_CODE) {
 		return ElementType::UUID;
+	} else if (code == VERSIONSTAMP_96_CODE) {
+		return ElementType::VERSIONSTAMP;
 	} else {
 		throw invalid_tuple_data_type();
 	}
@@ -425,6 +440,19 @@ double Tuple::getDouble(size_t index) const {
 	return bigEndianDouble(swap);
 }
 
+Versionstamp Tuple::getVersionstamp(size_t index) const {
+	if (index >= offsets.size()) {
+		throw invalid_tuple_index();
+	}
+	ASSERT_LT(offsets[index], data.size());
+	uint8_t code = data[offsets[index]];
+	if (code != VERSIONSTAMP_96_CODE) {
+		throw invalid_tuple_data_type();
+	}
+	size_t versionstampLength = VERSIONSTAMP_TUPLE_SIZE;
+	return Versionstamp(StringRef(data.begin() + offsets[index] + 1, versionstampLength));
+}
+
 Uuid Tuple::getUuid(size_t index) const {
 	if (index >= offsets.size()) {
 		throw invalid_tuple_index();
@@ -507,6 +535,10 @@ Tuple Tuple::getNested(size_t index) const {
 			ASSERT_LE(i + 1 + sizeof(double), next_offset - 1);
 			dest.append(dest.arena(), data.begin() + i + 1, sizeof(double));
 			i += sizeof(double) + 1;
+		} else if (code == VERSIONSTAMP_96_CODE) {
+			ASSERT_LE(i + 1 + VERSIONSTAMP_TUPLE_SIZE, next_offset - 1);
+			dest.append(dest.arena(), data.begin() + i + 1, VERSIONSTAMP_TUPLE_SIZE);
+			i += VERSIONSTAMP_TUPLE_SIZE + 1;
 		} else if (code == NESTED_CODE) {
 			i += 1;
 			depth += 1;
diff --git a/bindings/flow/Tuple.h b/bindings/flow/Tuple.h
index 03c08a6a02..4d903e5bb8 100644
--- a/bindings/flow/Tuple.h
+++ b/bindings/flow/Tuple.h
@@ -24,6 +24,7 @@
 #pragma once
 
 #include "bindings/flow/fdb_flow.h"
+#include "fdbclient/Versionstamp.h"
 
 namespace FDB {
 struct Uuid {
@@ -60,6 +61,7 @@ struct Tuple {
 	Tuple& append(Uuid);
 	Tuple& appendNested(Tuple const&);
 	Tuple& appendNull();
+	Tuple& appendVersionstamp(Versionstamp const&);
 
 	StringRef pack() const { return StringRef(data.begin(), data.size()); }
 
@@ -68,13 +70,14 @@ struct Tuple {
 		return append(t);
 	}
 
-	enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE, UUID, NESTED };
+	enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE, UUID, NESTED, VERSIONSTAMP };
 
 	// this is number of elements, not length of data
 	size_t size() const { return offsets.size(); }
 
 	ElementType getType(size_t index) const;
 	Standalone<StringRef> getString(size_t index) const;
+	Versionstamp getVersionstamp(size_t index) const;
 	int64_t getInt(size_t index) const;
 	bool getBool(size_t index) const;
 	float getFloat(size_t index) const;
@@ -107,6 +110,7 @@ private:
 	static const uint8_t FALSE_CODE;
 	static const uint8_t TRUE_CODE;
 	static const uint8_t UUID_CODE;
+	static const uint8_t VERSIONSTAMP_96_CODE;
 
 	Tuple(const StringRef& data);
 	Tuple(Standalone<VectorRef<uint8_t>> data, std::vector<size_t> offsets);
diff --git a/bindings/flow/tester/Tester.actor.cpp b/bindings/flow/tester/Tester.actor.cpp
index 066ae9125c..6214b6e508 100644
--- a/bindings/flow/tester/Tester.actor.cpp
+++ b/bindings/flow/tester/Tester.actor.cpp
@@ -112,6 +112,12 @@ std::string tupleToString(Tuple const& tuple) {
 			str += format("%016llx%016llx", *(uint64_t*)u.getData().begin(), *(uint64_t*)(u.getData().begin() + 8));
 		} else if (type == Tuple::NESTED) {
 			str += tupleToString(tuple.getNested(i));
+		} else if (type == Tuple::VERSIONSTAMP) {
+			Versionstamp versionstamp = tuple.getVersionstamp(i);
+			str += format("Transaction Version: '%ld', BatchNumber: '%hd', UserVersion : '%hd'",
+			              versionstamp.getVersion(),
+			              versionstamp.getBatchNumber(),
+			              versionstamp.getUserVersion());
 		} else {
 			ASSERT(false);
 		}
@@ -1147,6 +1153,8 @@ struct TuplePackFunc : InstructionFunc {
 					tuple << itemTuple.getUuid(0);
 				} else if (type == Tuple::NESTED) {
 					tuple.appendNested(itemTuple.getNested(0));
+				} else if (type == Tuple::VERSIONSTAMP) {
+					tuple.appendVersionstamp(Versionstamp(itemTuple.getString(0)));
 				} else {
 					ASSERT(false);
 				}
@@ -1226,6 +1234,8 @@ struct TupleRangeFunc : InstructionFunc {
 					tuple << itemTuple.getUuid(0);
 				} else if (type == Tuple::NESTED) {
 					tuple.appendNested(itemTuple.getNested(0));
+				} else if (type == Tuple::VERSIONSTAMP) {
+					tuple.appendVersionstamp(Versionstamp(itemTuple.getString(0)));
 				} else {
 					ASSERT(false);
 				}
diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt
index fc2840caa5..19109b8dc9 100644
--- a/fdbclient/CMakeLists.txt
+++ b/fdbclient/CMakeLists.txt
@@ -149,6 +149,8 @@ set(FDBCLIENT_SRCS
   VersionedMap.actor.h
   VersionedMap.h
   VersionedMap.cpp
+  Versionstamp.cpp
+  Versionstamp.h
   VersionVector.h
   VersionVector.cpp
   WellKnownEndpoints.h
diff --git a/fdbclient/GlobalConfig.actor.cpp b/fdbclient/GlobalConfig.actor.cpp
index a40073a87f..c35919c386 100644
--- a/fdbclient/GlobalConfig.actor.cpp
+++ b/fdbclient/GlobalConfig.actor.cpp
@@ -119,6 +119,8 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
 			any = t.getFloat(0);
 		} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
 			any = t.getDouble(0);
+		} else if (t.getType(0) == Tuple::ElementType::VERSIONSTAMP) {
+			any = t.getVersionstamp(0);
 		} else {
 			ASSERT(false);
 		}
diff --git a/fdbclient/Tuple.cpp b/fdbclient/Tuple.cpp
index 77716b94ee..729c910252 100644
--- a/fdbclient/Tuple.cpp
+++ b/fdbclient/Tuple.cpp
@@ -20,6 +20,8 @@
 
 #include "fdbclient/Tuple.h"
 
+const uint8_t VERSIONSTAMP_96_CODE = 0x33;
+
 // TODO: Many functions copied from bindings/flow/Tuple.cpp. Merge at some point.
 static float bigEndianFloat(float orig) {
 	int32_t big = *(int32_t*)&orig;
@@ -75,6 +77,8 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
 			i += 1;
 		} else if (data[i] == '\x00') {
 			i += 1;
+		} else if (data[i] == VERSIONSTAMP_96_CODE) {
+			i += VERSIONSTAMP_TUPLE_SIZE + 1;
 		} else {
 			throw invalid_tuple_data_type();
 		}
@@ -99,6 +103,15 @@ Tuple& Tuple::append(Tuple const& tuple) {
 	return *this;
 }
 
+Tuple& Tuple::appendVersionstamp(Versionstamp const& vs) {
+	offsets.push_back(data.size());
+
+	data.push_back(data.arena(), VERSIONSTAMP_96_CODE);
+	data.append(data.arena(), vs.begin(), vs.size());
+
+	return *this;
+}
+
 Tuple& Tuple::append(StringRef const& str, bool utf8) {
 	offsets.push_back(data.size());
 
@@ -213,6 +226,8 @@ Tuple::ElementType Tuple::getType(size_t index) const {
 		return ElementType::DOUBLE;
 	} else if (code == 0x26 || code == 0x27) {
 		return ElementType::BOOL;
+	} else if (code == VERSIONSTAMP_96_CODE) {
+		return ElementType::VERSIONSTAMP;
 	} else {
 		throw invalid_tuple_data_type();
 	}
@@ -361,6 +376,18 @@ double Tuple::getDouble(size_t index) const {
 	return bigEndianDouble(swap);
 }
 
+Versionstamp Tuple::getVersionstamp(size_t index) const {
+	if (index >= offsets.size()) {
+		throw invalid_tuple_index();
+	}
+	ASSERT_LT(offsets[index], data.size());
+	uint8_t code = data[offsets[index]];
+	if (code != VERSIONSTAMP_96_CODE) {
+		throw invalid_tuple_data_type();
+	}
+	return Versionstamp(StringRef(data.begin() + offsets[index] + 1, VERSIONSTAMP_TUPLE_SIZE));
+}
+
 KeyRange Tuple::range(Tuple const& tuple) const {
 	VectorRef<uint8_t> begin;
 	VectorRef<uint8_t> end;
diff --git a/fdbclient/Tuple.h b/fdbclient/Tuple.h
index 9db4d7ba4c..4c52c1ebc6 100644
--- a/fdbclient/Tuple.h
+++ b/fdbclient/Tuple.h
@@ -25,6 +25,7 @@
 
 #include "flow/flow.h"
 #include "fdbclient/FDBTypes.h"
+#include "fdbclient/Versionstamp.h"
 
 struct Tuple {
 	Tuple() {}
@@ -47,6 +48,7 @@ struct Tuple {
 	Tuple& appendFloat(float);
 	Tuple& appendDouble(double);
 	Tuple& appendNull();
+	Tuple& appendVersionstamp(Versionstamp const&);
 
 	StringRef pack() const { return StringRef(data.begin(), data.size()); }
 
@@ -55,7 +57,7 @@ struct Tuple {
 		return append(t);
 	}
 
-	enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE };
+	enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE, VERSIONSTAMP };
 
 	// this is number of elements, not length of data
 	size_t size() const { return offsets.size(); }
@@ -68,6 +70,7 @@ struct Tuple {
 	StringRef subTupleRawString(size_t index) const;
 	ElementType getType(size_t index) const;
 	Standalone<StringRef> getString(size_t index) const;
+	Versionstamp getVersionstamp(size_t index) const;
 	int64_t getInt(size_t index, bool allow_incomplete = false) const;
 	bool getBool(size_t index) const;
 	float getFloat(size_t index) const;
diff --git a/fdbclient/Versionstamp.cpp b/fdbclient/Versionstamp.cpp
new file mode 100644
index 0000000000..7a61936040
--- /dev/null
+++ b/fdbclient/Versionstamp.cpp
@@ -0,0 +1,44 @@
+#include "fdbclient/Versionstamp.h"
+
+Versionstamp::Versionstamp(StringRef str) {
+	if (str.size() != VERSIONSTAMP_TUPLE_SIZE) {
+		throw invalid_versionstamp_size();
+	}
+	data = str;
+}
+
+int16_t Versionstamp::getBatchNumber() const {
+	const uint8_t* begin = data.begin();
+	begin += 8;
+	int16_t batchNumber = *(int16_t*)(begin);
+	batchNumber = bigEndian16(batchNumber);
+	return batchNumber;
+}
+
+int16_t Versionstamp::getUserVersion() const {
+	const uint8_t* begin = data.begin();
+	begin += 10;
+	int16_t userVersion = *(int16_t*)(begin);
+	userVersion = bigEndian16(userVersion);
+	return userVersion;
+}
+
+const uint8_t* Versionstamp::begin() const {
+	return data.begin();
+}
+
+int64_t Versionstamp::getVersion() const {
+	const uint8_t* begin = data.begin();
+	int64_t version = *(int64_t*)begin;
+	version = bigEndian64(version);
+	return version;
+}
+
+size_t Versionstamp::size() const {
+	return VERSIONSTAMP_TUPLE_SIZE;
+}
+
+bool Versionstamp::operator==(const Versionstamp& other) const {
+	return getVersion() == other.getVersion() && getBatchNumber() == other.getBatchNumber() &&
+	       getUserVersion() == other.getUserVersion();
+}
\ No newline at end of file
diff --git a/fdbclient/Versionstamp.h b/fdbclient/Versionstamp.h
new file mode 100644
index 0000000000..b3fd6770c7
--- /dev/null
+++ b/fdbclient/Versionstamp.h
@@ -0,0 +1,44 @@
+/*
+ * Versionstamp.h
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FDBCLIENT_VERSIONSTAMP_H
+#define FDBCLIENT_VERSIONSTAMP_H
+
+#pragma once
+
+#include "flow/Arena.h"
+
+const size_t VERSIONSTAMP_TUPLE_SIZE = 12;
+
+struct Versionstamp {
+	Versionstamp(StringRef);
+
+	int64_t getVersion() const;
+	int16_t getBatchNumber() const;
+	int16_t getUserVersion() const;
+	size_t size() const;
+	const uint8_t* begin() const;
+	bool operator==(const Versionstamp&) const;
+
+private:
+	Standalone<StringRef> data;
+};
+
+#endif
\ No newline at end of file
diff --git a/flow/error_definitions.h b/flow/error_definitions.h
index f83e665d37..f296bc943a 100755
--- a/flow/error_definitions.h
+++ b/flow/error_definitions.h
@@ -258,6 +258,7 @@ ERROR( directory_prefix_in_use, 2265, "Directory layer already has a conflicting
 ERROR( invalid_destination_directory, 2266, "Target directory is invalid" )
 ERROR( cannot_modify_root_directory, 2267, "Root directory cannot be modified" )
 ERROR( invalid_uuid_size, 2268, "UUID is not sixteen bytes");
+ERROR( invalid_versionstamp_size, 2269, "Versionstamp is not exactly twelve bytes");
 
 // 2300 - backup and restore errors
 ERROR( backup_error, 2300, "Backup error")