Update format of serialized spans

Write span context as a string instead of as two 64-bit integers. Also
removes some debug code.
This commit is contained in:
Lukas Joswiak 2020-11-10 15:33:19 -08:00
parent abedd7a147
commit 069d75b6f2

View File

@ -165,7 +165,6 @@ ACTOR Future<Void> traceLog(int* pendingMessages, bool* sendError) {
struct FluentDTracer : ITracer { struct FluentDTracer : ITracer {
public: public:
~FluentDTracer() override { ~FluentDTracer() override {
// TODO: Handle case where socket->send returns after FluentDTracer instance is destructed?
while (!buffers_.empty()) { while (!buffers_.empty()) {
auto& request = buffers_.front(); auto& request = buffers_.front();
buffers_.pop(); buffers_.pop();
@ -186,12 +185,14 @@ public:
} }
}); });
if (span.location.name.size() == 0) {
return;
}
// ASSERT(!send_actor_.isReady()); // ASSERT(!send_actor_.isReady());
// ASSERT(!log_actor_.isReady()); // ASSERT(!log_actor_.isReady());
if (buffers_.empty()) { if (buffers_.empty()) {
++total_buffers_;
TraceEvent(SevInfo, "TracingSpanCreateBuffer").detail("Buffers", total_buffers_);
buffers_.push(TraceRequest{ buffers_.push(TraceRequest{
.buffer = new uint8_t[kTraceBufferSize], .buffer = new uint8_t[kTraceBufferSize],
.data_size = 0, .data_size = 0,
@ -202,9 +203,14 @@ public:
auto request = buffers_.front(); auto request = buffers_.front();
buffers_.pop(); buffers_.pop();
request.write_byte(6 | 0b10010000); // write as array // Serialize span fields as an array. If you change the serialization
// format here, make sure to update the fluentd filter to be able to
// correctly parse the updated format!
uint8_t size = 5;
if (span.parents.size() == 0) --size;
request.write_byte(size | 0b10010000); // write as array
serialize_uid(span.context, request); serialize_string(span.context.toString(), request);
serialize_value(span.begin, request, 0xcb); serialize_value(span.begin, request, 0xcb);
serialize_value(span.end, request, 0xcb); serialize_value(span.end, request, 0xcb);
@ -214,12 +220,7 @@ public:
serialize_vector(span.parents, request); serialize_vector(span.parents, request);
++pending_messages_; ++pending_messages_;
++total_messages_;
stream_.send(request); stream_.send(request);
if (total_messages_ % 50000 == 0) {
TraceEvent("TracingSpanTotalMessages").detail("Messages", total_messages_);
}
} }
private: private:
@ -242,9 +243,7 @@ private:
// specified by the msgpack specification. // specified by the msgpack specification.
inline void serialize_string(const std::string& str, TraceRequest& request) { inline void serialize_string(const std::string& str, TraceRequest& request) {
int size = str.size(); int size = str.size();
if (size == 0) { ASSERT(size > 0);
return;
}
if (size <= 31) { if (size <= 31) {
request.write_byte((uint8_t) size | 0b10100000); request.write_byte((uint8_t) size | 0b10100000);
@ -259,16 +258,14 @@ private:
request.write_bytes((uint8_t*) str.data(), size); request.write_bytes((uint8_t*) str.data(), size);
} }
// Writes the given UID to the request.
inline void serialize_uid(const UID& uid, TraceRequest& request) {
serialize_value(uid.first(), request, 0xcf);
serialize_value(uid.second(), request, 0xcf);
}
// Writes the given vector to the request. Assumes each element in the // Writes the given vector to the request. Assumes each element in the
// vector is a SpanID, and serializes as two big-endian 64-bit integers. // vector is a SpanID, and serializes as two big-endian 64-bit integers.
inline void serialize_vector(const SmallVectorRef<SpanID>& vec, TraceRequest& request) { inline void serialize_vector(const SmallVectorRef<SpanID>& vec, TraceRequest& request) {
int size = vec.size() * 2; int size = vec.size();
if (size == 0) {
return;
}
if (size <= 15) { if (size <= 15) {
request.write_byte((uint8_t) size | 0b10010000); request.write_byte((uint8_t) size | 0b10010000);
} else if (size <= 65535) { } else if (size <= 65535) {
@ -281,7 +278,7 @@ private:
} }
for (const auto& parentContext : vec) { for (const auto& parentContext : vec) {
serialize_uid(parentContext, request); serialize_string(parentContext.toString(), request);
} }
} }
@ -291,8 +288,6 @@ private:
std::queue<TraceRequest> buffers_; std::queue<TraceRequest> buffers_;
int pending_messages_; int pending_messages_;
bool send_error_; bool send_error_;
int total_buffers_; // TODO: This should be removed after performance testing is done
int total_messages_; // TODO: This should be removed after performance testing is done
PromiseStream<TraceRequest> stream_; PromiseStream<TraceRequest> stream_;
Future<Void> send_actor_; Future<Void> send_actor_;