diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 70c0bfd37a..8f77027581 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -25,6 +25,7 @@ #include "flow/Net2Packet.h" #include "flow/ActorCollection.h" #include "flow/TDMetric.actor.h" +#include "flow/ObjectSerializer.h" #include "fdbrpc/FailureMonitor.h" #include "fdbrpc/crc32c.h" #include "fdbrpc/simulator.h" @@ -371,7 +372,6 @@ struct Peer : NonCopyable { // Send an (ignored) packet to make sure that, if our outgoing connection died before the peer made this connection attempt, // we eventually find out that our connection is dead, close it, and then respond to the next connection reattempt from peer. - //sendPacket( self, SerializeSourceRaw(StringRef()), Endpoint(peer->address(), TOKEN_IGNORE_PACKET), false ); } } @@ -529,7 +529,8 @@ TransportData::~TransportData() { } } -ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket ) { +ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader reader, bool inReadSocket, + bool useFlatbuffers) { int priority = self->endpoints.getPriority(destination.token); if (priority < TaskReadSocket || !inReadSocket) { wait( delay(0, priority) ); @@ -540,8 +541,13 @@ ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReade auto receiver = self->endpoints.get(destination.token); if (receiver) { try { - g_currentDeliveryPeerAddress = destination.addresses; - receiver->receive( reader ); + g_currentDeliveryPeerAddress = destination.address; + if (useFlatbuffers) { + ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll()); + receiver->receive(objReader); + } else { + receiver->receive( reader ); + } g_currentDeliveryPeerAddress = {NetworkAddress()}; } catch (Error& e) { g_currentDeliveryPeerAddress = {NetworkAddress()}; @@ -561,7 +567,8 @@ ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReade g_network->setCurrentTask( TaskReadSocket ); } -static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin, uint8_t* e, Arena& arena, NetworkAddress const& peerAddress, uint64_t peerProtocolVersion ) { +static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, uint8_t* e, Arena& arena, + NetworkAddress const& peerAddress, uint64_t peerProtocolVersion) { // Find each complete packet in the given byte range and queue a ready task to deliver it. // Remove the complete packets from the range by increasing unprocessed_begin. // There won't be more than 64K of data plus one packet, so this shouldn't take a long time. @@ -633,8 +640,9 @@ static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin, #if VALGRIND VALGRIND_CHECK_MEM_IS_DEFINED(p, packetLen); #endif - ArenaReader reader( arena, StringRef(p, packetLen), AssumeVersion(peerProtocolVersion) ); - UID token; reader >> token; + ArenaReader reader(arena, StringRef(p, packetLen), AssumeVersion(removeFlags(peerProtocolVersion))); + UID token; + reader >> token; ++transport->countPacketsReceived; @@ -649,7 +657,7 @@ static void scanPackets( TransportData* transport, uint8_t*& unprocessed_begin, transport->warnAlwaysForLargePacket = false; } - deliver( transport, Endpoint( {peerAddress}, token ), std::move(reader), true ); + deliver(transport, Endpoint({ peerAddress }, token), std::move(reader), true, hasObjectSerializerFlag(peerProtocolVersion)); unprocessed_begin = p = p + packetLen; } @@ -748,7 +756,8 @@ ACTOR static Future connectionReader( TraceEvent("ConnectionEstablished", conn->getDebugID()) .suppressFor(1.0) .detail("Peer", conn->getPeerAddress()) - .detail("ConnectionId", connectionId); + .detail("ConnectionId", connectionId) + .detail("UseObjectSerializer", false); } if(connectionId > 1) { @@ -994,13 +1003,18 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c // SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow? BinaryWriter wr( AssumeVersion(currentProtocolVersion) ); + // we don't need to send using an object writer here. This is a loopback delivery + // and therefore it is guaranteed that both versions will have exactly the + // same structures - so the backwards compatability capabilities are never needed + // here. what.serializeBinaryWriter(wr); Standalone copy = wr.toValue(); #if VALGRIND VALGRIND_CHECK_MEM_IS_DEFINED(copy.begin(), copy.size()); #endif - deliver( self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false ); + deliver(self, destination, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false, + false); return (PacketID)NULL; } else { @@ -1039,7 +1053,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c wr.writeAhead(packetInfoSize , &packetInfoBuffer); wr << destination.token; - what.serializePacketWriter(wr); + what.serializePacketWriter(wr, g_network->useObjectSerializer()); pb = wr.finish(); len = wr.size() - packetInfoSize; diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 8be6fa6837..bcf9c334aa 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -85,9 +85,13 @@ public: }; #pragma pack(pop) + + +class ArenaObjectReader; class NetworkMessageReceiver { public: virtual void receive( ArenaReader& ) = 0; + virtual void receive(ArenaObjectReader&) = 0; virtual bool isStream() const { return false; } }; diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 986fe6525d..4a94af1543 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -92,6 +92,17 @@ struct NetSAV : SAV, FlowReceiver, FastAllocated> { SAV::sendErrorAndDelPromiseRef(error); } } + virtual void receive(ArenaObjectReader& reader) { + if (!SAV::canBeSet()) return; + this->addPromiseRef(); + ErrorOr message; + reader.deserialize(message); + if (message.isError()) { + SAV::sendErrorAndDelPromiseRef(message.getError()); + } else { + SAV::sendAndDelPromiseRef(message.get()); + } + } }; diff --git a/fdbrpc/genericactors.actor.cpp b/fdbrpc/genericactors.actor.cpp index 9b727021db..d0065668e4 100644 --- a/fdbrpc/genericactors.actor.cpp +++ b/fdbrpc/genericactors.actor.cpp @@ -24,11 +24,6 @@ #include "fdbrpc/simulator.h" #include "flow/actorcompiler.h" -ACTOR void simDeliverDuplicate( Standalone data, Endpoint destination ) { - wait( delay( g_random->random01() * FLOW_KNOBS->MAX_DELIVER_DUPLICATE_DELAY ) ); - FlowTransport::transport().sendUnreliable( SerializeSourceRaw(data), destination ); -} - ACTOR Future disableConnectionFailuresAfter( double time, std::string context ) { wait( delay(time) ); diff --git a/fdbrpc/networksender.actor.h b/fdbrpc/networksender.actor.h index bc108c01d6..629360d188 100644 --- a/fdbrpc/networksender.actor.h +++ b/fdbrpc/networksender.actor.h @@ -31,14 +31,22 @@ #include "flow/actorcompiler.h" // This must be the last #include. ACTOR template -void networkSender( Future input, Endpoint endpoint ) { +void networkSender(Future input, Endpoint endpoint) { try { - T value = wait( input ); - FlowTransport::transport().sendUnreliable( SerializeBoolAnd(true, value), endpoint, false ); + T value = wait(input); + if (g_network->useObjectSerializer()) { + FlowTransport::transport().sendUnreliable(SerializeSource>(ErrorOr(value)), endpoint); + } else { + FlowTransport::transport().sendUnreliable(SerializeBoolAnd(true, value), endpoint, false); + } } catch (Error& err) { - //if (err.code() == error_code_broken_promise) return; - ASSERT( err.code() != error_code_actor_cancelled ); - FlowTransport::transport().sendUnreliable( SerializeBoolAnd(false, err), endpoint, false ); + // if (err.code() == error_code_broken_promise) return; + ASSERT(err.code() != error_code_actor_cancelled); + if (g_network->useObjectSerializer()) { + FlowTransport::transport().sendUnreliable(SerializeSource>(ErrorOr(err)), endpoint); + } else { + FlowTransport::transport().sendUnreliable(SerializeBoolAnd(false, err), endpoint, false); + } } } #include "flow/unactorcompiler.h" diff --git a/flow/Arena.h b/flow/Arena.h index d93f09dafa..4c1e60c6a0 100644 --- a/flow/Arena.h +++ b/flow/Arena.h @@ -26,6 +26,7 @@ #include "flow/FastRef.h" #include "flow/Error.h" #include "flow/Trace.h" +#include "flow/ObjectSerializerTraits.h" #include #include #include @@ -108,6 +109,18 @@ public: Reference impl; }; +template<> +struct scalar_traits : std::true_type { + constexpr static size_t size = 0; + static void save(uint8_t*, const Arena&) {} + // Context is an arbitrary type that is plumbed by reference throughout + // the load call tree. + template + static void load(const uint8_t*, Arena& arena, Context& context) { + context.addArena(arena); + } +}; + struct ArenaBlockRef { ArenaBlock* next; uint32_t nextBlockOffset; @@ -723,6 +736,17 @@ inline void save( Archive& ar, const StringRef& value ) { ar << (uint32_t)value.size(); ar.serializeBytes( value.begin(), value.size() ); } + +template<> +struct dynamic_size_traits : std::true_type { + static WriteRawMemory save(const StringRef& str) { return { { unownedPtr(str.begin()), str.size() } }; } + + template + static void load(const uint8_t* ptr, size_t sz, StringRef& str, Context& context) { + str = StringRef(context.tryReadZeroCopy(ptr, sz), sz); + } +}; + inline bool operator == (const StringRef& lhs, const StringRef& rhs ) { return lhs.size() == rhs.size() && !memcmp(lhs.begin(), rhs.begin(), lhs.size()); } @@ -753,6 +777,8 @@ struct memcpy_able : std::integral_constant {}; template class VectorRef { public: + using value_type = T; + // T must be trivially destructible (and copyable)! VectorRef() : data(0), m_size(0), m_capacity(0) {} @@ -928,6 +954,23 @@ inline void save( Archive& ar, const VectorRef& value ) { ar << value[i]; } +template +struct vector_like_traits> : std::true_type { + using Vec = VectorRef; + using value_type = typename Vec::value_type; + using iterator = const T*; + using insert_iterator = T*; + + static size_t num_entries(const VectorRef& v) { return v.size(); } + template + static void reserve(VectorRef& v, size_t s, Context& context) { + v.resize(context.arena(), s); + } + + static insert_iterator insert(Vec& v) { return v.begin(); } + static iterator begin(const Vec& v) { return v.begin(); } +}; + void ArenaBlock::destroy() { // If the stack never contains more than one item, nothing will be allocated from stackArena. // If stackArena is used, it will always be a linked list, so destroying *it* will not create another arena diff --git a/flow/FileIdentifier.h b/flow/FileIdentifier.h new file mode 100644 index 0000000000..4bbb1d9d14 --- /dev/null +++ b/flow/FileIdentifier.h @@ -0,0 +1,97 @@ +/* + * FileIdentifier.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include + +using FileIdentifier = uint32_t; + +template +struct FileIdentifierFor { + //constexpr static FileIdentifier value = T::file_identifier; + // TODO: use file identifiers for different types + constexpr static FileIdentifier value = 0xffffff; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 1; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 2; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 3; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 4; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 5; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 6; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 7; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 8; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 9; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 10; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 11; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 7266212; +}; + +template <> +struct FileIdentifierFor { + constexpr static FileIdentifier value = 9348150; +}; + diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index fe081e9131..5ee248a33e 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -58,6 +58,18 @@ using namespace boost::asio::ip; const uint64_t currentProtocolVersion = 0x0FDB00B061070001LL; const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL; const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL; +const uint64_t objectSerializerFlag = 0x1000000000000000LL; +const uint64_t versionFlagMask = 0x0FFFFFFFFFFFFFFFLL; + +uint64_t removeFlags(uint64_t version) { + return version & versionFlagMask; +} +uint64_t addObjectSerializerFlag(uint64_t version) { + return version | versionFlagMask; +} +bool hasObjectSerializerFlag(uint64_t version) { + return (version & objectSerializerFlag) > 0; +} // This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to change when we reach version 10. static_assert(currentProtocolVersion < 0x0FDB00B100000000LL, "Unexpected protocol version"); diff --git a/flow/ObjectSerializer.h b/flow/ObjectSerializer.h index 03e68905a2..b10be8e835 100644 --- a/flow/ObjectSerializer.h +++ b/flow/ObjectSerializer.h @@ -18,6 +18,7 @@ * limitations under the License. */ +#pragma once #include "flow/Error.h" #include "flow/Arena.h" #include "flow/flat_buffers.h" @@ -52,21 +53,21 @@ template class _ObjectReader { public: template - void deserialize(flat_buffers::FileIdentifier file_identifier, Items&... items) { + void deserialize(FileIdentifier file_identifier, Items&... items) { const uint8_t* data = static_cast(this)->data(); LoadContext context(*static_cast(this)); - ASSERT(flat_buffers::read_file_identifier(data) == file_identifier); - flat_buffers::load_members(data, context, items...); + ASSERT(read_file_identifier(data) == file_identifier); + load_members(data, context, items...); context.done(); } template void deserialize(Item& item) { - deserialize(flat_buffers::FileIdentifierFor::value, item); + deserialize(FileIdentifierFor::value, item); } }; -class ObjectReader : _ObjectReader { +class ObjectReader : public _ObjectReader { public: static constexpr bool ownsUnderlyingMemory = false; @@ -81,7 +82,7 @@ private: Arena _arena; }; -class ArenaObjectReader : _ObjectReader { +class ArenaObjectReader : public _ObjectReader { public: static constexpr bool ownsUnderlyingMemory = true; @@ -99,7 +100,7 @@ private: class ObjectWriter { public: template - void serialize(flat_buffers::FileIdentifier file_identifier, Items const&... items) { + void serialize(FileIdentifier file_identifier, Items const&... items) { ASSERT(data = nullptr); // object serializer can only serialize one object int allocations = 0; auto allocator = [this, &allocations](size_t size_) { @@ -108,21 +109,20 @@ public: data = new uint8_t[size]; return data; }; - auto res = flat_buffers::save_members(allocator, file_identifier, items...); + auto res = save_members(allocator, file_identifier, items...); ASSERT(allocations == 1); } template void serialize(Item const& item) { - serialize(flat_buffers::FileIdentifierFor::value, item); + serialize(FileIdentifierFor::value, item); + } + + StringRef toStringRef() const { + return StringRef(data, size); } private: uint8_t* data = nullptr; int size = 0; }; - -template -std::enable_if serializer(Visitor& visitor, Items&... items) { - visitor(items...); -} diff --git a/flow/ObjectSerializerTraits.h b/flow/ObjectSerializerTraits.h new file mode 100644 index 0000000000..9d1476e8e2 --- /dev/null +++ b/flow/ObjectSerializerTraits.h @@ -0,0 +1,164 @@ +/* + * ObjectSerializerTraits.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +template +struct pack {}; + +template +struct index_impl; + +template +struct index_impl> { + using type = typename index_impl>::type; +}; + +template +struct index_impl<0, pack> { + using type = T; +}; + +template +using index_t = typename index_impl::type; + +// A smart pointer that knows whether or not to delete itself. +template +using OwnershipErasedPtr = std::unique_ptr>; + +// Creates an OwnershipErasedPtr that will delete itself. +template > +OwnershipErasedPtr ownedPtr(T* t, Deleter&& d = Deleter{}) { + return OwnershipErasedPtr{ t, std::forward(d) }; +} + +// Creates an OwnershipErasedPtr that will not delete itself. +template +OwnershipErasedPtr unownedPtr(T* t) { + return OwnershipErasedPtr{ t, [](T*) {} }; +} + +struct WriteRawMemory { + using Block = std::pair, size_t>; + std::vector blocks; + + WriteRawMemory() {} + WriteRawMemory(Block&& b) { blocks.emplace_back(std::move(b.first), b.second); } + WriteRawMemory(std::vector&& v) : blocks(std::move(v)) {} + + WriteRawMemory(WriteRawMemory&&) = default; + WriteRawMemory& operator=(WriteRawMemory&&) = default; + + size_t size() const { + size_t result = 0; + for (const auto& b : blocks) { + result += b.second; + } + return result; + } +}; + + +template +struct scalar_traits : std::false_type { + constexpr static size_t size = 0; + static void save(uint8_t*, const T&); + + // Context is an arbitrary type that is plumbed by reference throughout the + // load call tree. + template + static void load(const uint8_t*, T&, Context&); +}; + + +template +struct dynamic_size_traits : std::false_type { + static WriteRawMemory save(const T&); + + // Context is an arbitrary type that is plumbed by reference throughout the + // load call tree. + template + static void load(const uint8_t*, size_t, T&, Context&); +}; + +template +struct serializable_traits : std::false_type { + template + static void serialize(Archiver& ar, T& v); +}; + +template +struct vector_like_traits : std::false_type { + // Write this at the beginning of the buffer + using value_type = uint8_t; + using iterator = void; + using insert_iterator = void; + + static size_t num_entries(VectorLike&); + template + static void reserve(VectorLike&, size_t, Context&); + + static insert_iterator insert(VectorLike&); + static iterator begin(const VectorLike&); + static void deserialization_done(VectorLike&); // Optional +}; + +template +struct union_like_traits : std::false_type { + using Member = UnionLike; + using alternatives = pack<>; + static uint8_t index(const Member&); + static bool empty(const Member& variant); + + template + static const index_t& get(const Member&); + + template + static const void assign(Member&, const Alternative&); + + template + static void done(Member&, Context&); +}; + +// TODO(anoyes): Implement things that are currently using scalar traits with +// struct-like traits. +template +struct struct_like_traits : std::false_type { + using Member = StructLike; + using types = pack<>; + + template + static const index_t& get(const Member&); + + template + static const void assign(Member&, const index_t&); + + template + static void done(Member&, Context&); +}; + + diff --git a/flow/flat_buffers.cpp b/flow/flat_buffers.cpp index 29e2b6dd84..53739e4b7b 100644 --- a/flow/flat_buffers.cpp +++ b/flow/flat_buffers.cpp @@ -18,17 +18,16 @@ * limitations under the License. */ -#include "flat_buffers.h" -#include "UnitTest.h" -#include "Arena.h" -#include "serialize.h" +#include "flow/flat_buffers.h" +#include "flow/UnitTest.h" +#include "flow/Arena.h" +#include "flow/serialize.h" +#include "flow/ObjectSerializer.h" #include #include #include -namespace flat_buffers { - namespace detail { bool TraverseMessageTypes::vtableGeneratedBefore(const std::type_index& idx) { @@ -106,7 +105,7 @@ struct Table2 { int16_t m_ed = {}; template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, m_p, m_ujrnpumbfvc, m_iwgxxt, m_tjkuqo, m_ed); + serializer(ar, m_p, m_ujrnpumbfvc, m_iwgxxt, m_tjkuqo, m_ed); } }; @@ -117,7 +116,7 @@ struct Table3 { int64_t m_n = {}; template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, m_asbehdlquj, m_k, m_jib, m_n); + serializer(ar, m_asbehdlquj, m_k, m_jib, m_n); } }; @@ -134,7 +133,7 @@ struct Nested2 { int c; template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, a, b, c); + serializer(ar, a, b, c); } friend bool operator==(const Nested2& lhs, const Nested2& rhs) { @@ -149,7 +148,7 @@ struct Nested { std::vector c; template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, a, b, nested, c); + serializer(ar, a, b, nested, c); } }; @@ -159,7 +158,7 @@ struct Root { Nested c; template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, a, b, c); + serializer(ar, a, b, c); } }; @@ -224,7 +223,7 @@ TEST_CASE("flow/FlatBuffers/serializeDeserializeRoot") { { 3, "hello", { 6, { "abc", "def" }, 8 }, { 10, 11, 12 } } }; Root root2 = root; Arena arena; - auto out = flat_buffers::detail::save(arena, root, flat_buffers::FileIdentifier{}); + auto out = detail::save(arena, root, FileIdentifier{}); ASSERT(root.a == root2.a); ASSERT(root.b == root2.b); @@ -237,7 +236,7 @@ TEST_CASE("flow/FlatBuffers/serializeDeserializeRoot") { root2 = {}; DummyContext context; - flat_buffers::detail::load(root2, out, context); + detail::load(root2, out, context); ASSERT(root.a == root2.a); ASSERT(root.b == root2.b); @@ -399,7 +398,7 @@ struct Y1 { template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, a); + serializer(ar, a); } }; @@ -409,7 +408,7 @@ struct Y2 { template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, a, b); + serializer(ar, a, b); } }; @@ -421,7 +420,7 @@ struct X { template void serialize(Archiver& ar) { - return flat_buffers::serializer(ar, a, b, c); + serializer(ar, a, b, c); } }; @@ -493,12 +492,12 @@ TEST_CASE("flow/FlatBuffers/VectorRef") { for (const auto& str : src) { vec.push_back(arena, str); } - BinaryWriter writer(IncludeVersion()); - ::serialize_fake_root(writer, FileIdentifierFor::value, arena, vec); + ObjectWriter writer; + writer.serialize(FileIdentifierFor::value, arena, vec); serializedVector = StringRef(readerArena, writer.toStringRef()); } - ArenaReader reader(readerArena, serializedVector, IncludeVersion()); - ::serialize_fake_root(reader, FileIdentifierFor::value, vecArena, outVec); + ArenaObjectReader reader(readerArena, serializedVector); + reader.deserialize(FileIdentifierFor::value, vecArena, outVec); } ASSERT(src.size() == outVec.size()); for (int i = 0; i < src.size(); ++i) { @@ -509,5 +508,3 @@ TEST_CASE("flow/FlatBuffers/VectorRef") { } } // namespace unit_tests - -} // namespace flat_buffers diff --git a/flow/flat_buffers.h b/flow/flat_buffers.h index adcbc6b8a9..13637b704b 100644 --- a/flow/flat_buffers.h +++ b/flow/flat_buffers.h @@ -35,11 +35,8 @@ #include #include #include - -namespace flat_buffers { - -template -struct pack {}; +#include "flow/FileIdentifier.h" +#include "flow/ObjectSerializerTraits.h" template , class...> struct concat { @@ -55,117 +52,23 @@ constexpr auto pack_size(pack) { return sizeof...(Ts); } -template -struct index; - -template -struct index> { - using type = typename index>::type; -}; - -template -struct index<0, pack> { - using type = T; -}; - -template -using index_t = typename index::type; - constexpr int RightAlign(int offset, int alignment) { return offset % alignment == 0 ? offset : ((offset / alignment) + 1) * alignment; } -using FileIdentifier = uint32_t; +template +struct is_fb_function_t : std::false_type {}; + +template +struct is_fb_function_t::type> : std::true_type {}; template -struct FileIdentifierFor { - constexpr static FileIdentifier value = T::file_identifier; -}; +constexpr bool is_fb_function = is_fb_function_t::value; -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 1; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 2; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 3; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 4; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 5; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 6; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 7; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 8; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 9; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 10; -}; - -template <> -struct FileIdentifierFor { - constexpr static FileIdentifier value = 11; -}; - -template <> -struct FileIdentifierFor { - constexpr static flat_buffers::FileIdentifier value = 7266212; -}; - -template <> -struct FileIdentifierFor { - constexpr static flat_buffers::FileIdentifier value = 9348150; -}; - -template -struct FileIdentifierFor> { - constexpr static FileIdentifier value = FileIdentifierFor>::value; -}; - -template -struct FileIdentifierFor> { - constexpr static FileIdentifier value = FileIdentifierFor::value ^ FileIdentifierFor::value; -}; - -template -struct FileIdentifierFor> { - constexpr static FileIdentifier value = (0x10 << 24) | FileIdentifierFor::value; -}; - -template -struct FileIdentifierFor> { - constexpr static FileIdentifier value = 15694229; -}; +template +typename std::enable_if, void>::type serializer(Visitor& visitor, Items&... items) { + visitor(items...); +} template struct object_construction { @@ -180,119 +83,6 @@ struct object_construction { T move() { return std::move(obj); } }; -// A smart pointer that knows whether or not to delete itself. -template -using OwnershipErasedPtr = std::unique_ptr>; - -// Creates an OwnershipErasedPtr that will delete itself. -template > -OwnershipErasedPtr ownedPtr(T* t, Deleter&& d = Deleter{}) { - return OwnershipErasedPtr{ t, std::forward(d) }; -} - -// Creates an OwnershipErasedPtr that will not delete itself. -template -OwnershipErasedPtr unownedPtr(T* t) { - return OwnershipErasedPtr{ t, [](T*) {} }; -} - -template -struct scalar_traits : std::false_type { - constexpr static size_t size = 0; - static void save(uint8_t*, const T&); - - // Context is an arbitrary type that is plumbed by reference throughout the - // load call tree. - template - static void load(const uint8_t*, T&, Context&); -}; - -struct WriteRawMemory { - using Block = std::pair, size_t>; - std::vector blocks; - - WriteRawMemory() {} - WriteRawMemory(Block&& b) { blocks.emplace_back(std::move(b.first), b.second); } - WriteRawMemory(std::vector&& v) : blocks(std::move(v)) {} - - WriteRawMemory(WriteRawMemory&&) = default; - WriteRawMemory& operator=(WriteRawMemory&&) = default; - - size_t size() const { - size_t result = 0; - for (const auto& b : blocks) { - result += b.second; - } - return result; - } -}; - -template -struct dynamic_size_traits : std::false_type { - static WriteRawMemory save(const T&); - - // Context is an arbitrary type that is plumbed by reference throughout the - // load call tree. - template - static void load(const uint8_t*, size_t, T&, Context&); -}; - -template -struct serializable_traits : std::false_type { - template - static void serialize(Archiver& ar, T& v); -}; - -template -struct vector_like_traits : std::false_type { - // Write this at the beginning of the buffer - using value_type = uint8_t; - using iterator = void; - using insert_iterator = void; - - static size_t num_entries(VectorLike&); - template - static void reserve(VectorLike&, size_t, Context&); - - static insert_iterator insert(VectorLike&); - static iterator begin(const VectorLike&); - static void deserialization_done(VectorLike&); // Optional -}; - -template -struct union_like_traits : std::false_type { - using Member = UnionLike; - using alternatives = pack<>; - static uint8_t index(const Member&); - static bool empty(const Member& variant); - - template - static const index_t& get(const Member&); - - template - static const void assign(Member&, const Alternative&); - - template - static void done(Member&, Context&); -}; - -// TODO(anoyes): Implement things that are currently using scalar traits with -// struct-like traits. -template -struct struct_like_traits : std::false_type { - using Member = StructLike; - using types = pack<>; - - template - static const index_t& get(const Member&); - - template - static const void assign(Member&, const index_t&); - - template - static void done(Member&, Context&); -}; - template struct struct_like_traits> : std::true_type { using Member = std::tuple; @@ -320,14 +110,11 @@ struct scalar_traits::value || std::is_f } }; -template -void serializer(F& fun, Items&... items); - template struct serializable_traits> : std::true_type { template static void serialize(Archiver& ar, std::pair& p) { - flat_buffers::serializer(ar, p.first, p.second); + serializer(ar, p.first, p.second); } }; @@ -734,6 +521,7 @@ private: struct InsertVTableLambda { static constexpr bool isDeserializing = true; + static constexpr bool is_fb_visitor = true; std::set& vtables; std::set& known_types; @@ -856,6 +644,7 @@ private: template struct SaveVisitorLambda { static constexpr bool isDeserializing = false; + static constexpr bool is_fb_visitor = true; const VTableSet* vtableset; Writer& writer; @@ -982,6 +771,7 @@ struct LoadSaveHelper { template struct SerializeFun { static constexpr bool isDeserializing = true; + static constexpr bool is_fb_visitor = true; const uint16_t* vtable; const uint8_t* current; @@ -1143,11 +933,6 @@ auto save_helper(const Member& member, Writer& writer, const VTableSet* vtables) } // namespace detail -template -void serializer(F& fun, Items&... items) { - fun(items...); -} - namespace detail { template @@ -1164,7 +949,7 @@ struct FakeRoot { private: template void serialize_impl(Archive& archive, std::index_sequence) { - flat_buffers::serializer(archive, std::get(members)...); + serializer(archive, std::get(members)...); } }; @@ -1189,20 +974,20 @@ uint8_t* save(Allocator& allocator, const Root& root, FileIdentifier file_identi template void load(Root& root, const uint8_t* in, Context& context) { - flat_buffers::detail::load_helper(root, in, context); + detail::load_helper(root, in, context); } } // namespace detail template uint8_t* save_members(Allocator& allocator, FileIdentifier file_identifier, Members&... members) { - const auto& root = flat_buffers::detail::fake_root(members...); + const auto& root = detail::fake_root(members...); return detail::save(allocator, root, file_identifier); } template void load_members(const uint8_t* in, Context& context, Members&... members) { - auto root = flat_buffers::detail::fake_root(members...); + auto root = detail::fake_root(members...); detail::load(root, in, context); } @@ -1216,7 +1001,7 @@ inline FileIdentifier read_file_identifier(const uint8_t* in) { // introduce the indirection only when necessary. template struct EnsureTable { - constexpr static flat_buffers::FileIdentifier file_identifier = FileIdentifierFor::value; + constexpr static FileIdentifier file_identifier = FileIdentifierFor::value; EnsureTable() = default; EnsureTable(const object_construction& t) : t(t) {} EnsureTable(const T& t) : t(t) {} @@ -1229,7 +1014,7 @@ struct EnsureTable { t.get().serialize(ar); } } else { - flat_buffers::serializer(ar, t.get()); + serializer(ar, t.get()); } } T& asUnderlyingType() { return t.get(); } @@ -1238,4 +1023,3 @@ private: object_construction t; }; -} // namespace flat_buffers diff --git a/flow/network.h b/flow/network.h index 2756ac963c..20124dcf66 100644 --- a/flow/network.h +++ b/flow/network.h @@ -399,6 +399,9 @@ public: virtual bool isAddressOnThisHost( NetworkAddress const& addr ) = 0; // Returns true if it is reasonably certain that a connection to the given address would be a fast loopback connection + virtual bool useObjectSerializer() { return false; } + // Whether or not the object serializer should be used when sending packets + // Shorthand for transport().getLocalAddress() static NetworkAddress getLocalAddress() { diff --git a/flow/serialize.h b/flow/serialize.h index 5e9c604e93..e9bf3ff84f 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -27,6 +27,8 @@ #include #include "flow/Error.h" #include "flow/Arena.h" +#include "flow/FileIdentifier.h" +#include "flow/ObjectSerializer.h" #include // Though similar, is_binary_serializable cannot be replaced by std::is_pod, as doing so would prefer @@ -99,6 +101,11 @@ inline void load( Ar& ar, T& value ) { Serializer::serialize(ar, value); } +template +struct FileIdentifierFor> { + constexpr static FileIdentifier value = 15694229; +}; + template inline void load( Archive& ar, std::string& value ) { int32_t length; @@ -123,6 +130,12 @@ public: } }; +template +struct FileIdentifierFor> { + constexpr static FileIdentifier value = FileIdentifierFor::value ^ FileIdentifierFor::value; +}; + + template class Serializer< Archive, std::pair, void > { public: @@ -131,6 +144,11 @@ public: } }; +template +struct FileIdentifierFor> { + constexpr static FileIdentifier value = (0x10 << 24) | FileIdentifierFor::value; +}; + template inline void save( Archive& ar, const std::vector& value ) { ar << (int)value.size(); @@ -229,6 +247,11 @@ static inline bool valgrindCheck( const void* data, int bytes, const char* conte extern const uint64_t currentProtocolVersion; extern const uint64_t minValidProtocolVersion; extern const uint64_t compatibleProtocolVersionMask; +extern const uint64_t objectSerializerFlag; + +extern uint64_t removeFlags(uint64_t version); +extern uint64_t addObjectSerializerFlag(uint64_t version); +extern bool hasObjectSerializerFlag(uint64_t version); struct _IncludeVersion { uint64_t v; @@ -504,6 +527,10 @@ public: return (const uint8_t*)readBytes(bytes); } + StringRef arenaReadAll() const { + return StringRef(reinterpret_cast(begin), end - begin); + } + template void serializeBinaryItem( T& t ) { t = *(T*)readBytes(sizeof(T)); @@ -686,14 +713,22 @@ private: }; struct ISerializeSource { - virtual void serializePacketWriter( PacketWriter& ) const = 0; - virtual void serializeBinaryWriter( BinaryWriter& ) const = 0; + virtual void serializePacketWriter(PacketWriter&, bool useObjectSerializer) const = 0; + virtual void serializeBinaryWriter(BinaryWriter&) const = 0; }; template struct MakeSerializeSource : ISerializeSource { - virtual void serializePacketWriter( PacketWriter& w ) const { ((T const*)this)->serialize(w); } - virtual void serializeBinaryWriter( BinaryWriter& w ) const { ((T const*)this)->serialize(w); } + virtual void serializePacketWriter(PacketWriter& w, bool useObjectSerializer) const { + if (useObjectSerializer) { + ObjectWriter writer; + writer.serialize(get()); + } else { + ((T const*)this)->serialize(w); + } + } + virtual void serializeBinaryWriter(BinaryWriter& w) const { ((T const*)this)->serialize(w); } + virtual T const& get() const = 0; }; template @@ -701,6 +736,7 @@ struct SerializeSource : MakeSerializeSource> { T const& value; SerializeSource(T const& value) : value(value) {} template void serialize(Ar& ar) const { ar << value; } + virtual T const& get() const { return value; } }; template @@ -709,12 +745,11 @@ struct SerializeBoolAnd : MakeSerializeSource> { T const& value; SerializeBoolAnd( bool b, T const& value ) : b(b), value(value) {} template void serialize(Ar& ar) const { ar << b << value; } -}; - -struct SerializeSourceRaw : MakeSerializeSource { - StringRef data; - SerializeSourceRaw(StringRef data) : data(data) {} - template void serialize(Ar& ar) const { ar.serializeBytes(data); } + virtual T const& get() const { + // This is only used for the streaming serializer + ASSERT(false); + return value; + } }; #endif