1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-28 02:48:09 +08:00

Support PacketBuffer's of arbitrary size

This commit is contained in:
Andrew Noyes 2019-07-01 09:57:50 -07:00 committed by mpilman
parent 70eac949e6
commit 70f0726185
5 changed files with 68 additions and 37 deletions

@ -334,7 +334,7 @@ namespace HTTP {
}
// Write headers to a packet buffer chain
PacketBuffer *pFirst = new PacketBuffer();
PacketBuffer* pFirst = PacketBuffer::create();
PacketBuffer *pLast = writeRequestHeader(verb, resource, headers, pFirst);
// Prepend headers to content packer buffer chain
pContent->prependWriteBuffer(pFirst, pLast);

@ -342,7 +342,7 @@ struct Peer : NonCopyable {
}
pkt.connectionId = transport->transportId;
PacketBuffer* pb_first = new PacketBuffer;
PacketBuffer* pb_first = PacketBuffer::create();
PacketWriter wr( pb_first, nullptr, Unversioned() );
pkt.serialize(wr);
unsent.prependWriteBuffer(pb_first, wr.finish());
@ -1206,15 +1206,16 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
// Find the correct place to start calculating checksum
uint32_t checksumUnprocessedLength = len;
prevBytesWritten += packetInfoSize;
if (prevBytesWritten >= PacketBuffer::DATA_SIZE) {
prevBytesWritten -= PacketBuffer::DATA_SIZE;
if (prevBytesWritten >= checksumPb->bytes_written) {
prevBytesWritten -= checksumPb->bytes_written;
checksumPb = checksumPb->nextPacketBuffer();
}
// Checksum calculation
while (checksumUnprocessedLength > 0) {
uint32_t processLength = std::min(checksumUnprocessedLength, (uint32_t)(PacketBuffer::DATA_SIZE - prevBytesWritten));
checksum = crc32c_append(checksum, checksumPb->data + prevBytesWritten, processLength);
uint32_t processLength =
std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten));
checksum = crc32c_append(checksum, checksumPb->data() + prevBytesWritten, processLength);
checksumUnprocessedLength -= processLength;
checksumPb = checksumPb->nextPacketBuffer();
prevBytesWritten = 0;

@ -42,8 +42,8 @@ PacketBuffer* PacketWriter::finish() {
void PacketWriter::serializeBytesAcrossBoundary(const void* data, int bytes) {
while(true) {
int b = std::min( bytes, PacketBuffer::DATA_SIZE - buffer->bytes_written );
memcpy( buffer->data + buffer->bytes_written, data, b );
int b = std::min(bytes, buffer->bytes_unwritten());
memcpy(buffer->data() + buffer->bytes_written, data, b);
buffer->bytes_written += b;
bytes -= b;
if (!bytes) break;
@ -54,13 +54,14 @@ void PacketWriter::serializeBytesAcrossBoundary(const void* data, int bytes) {
}
void PacketWriter::nextBuffer() {
ASSERT( buffer->bytes_written == PacketBuffer::DATA_SIZE );
length += PacketBuffer::DATA_SIZE;
buffer->next = new PacketBuffer;
auto last_buffer_bytes_written = buffer->bytes_written;
length += last_buffer_bytes_written;
buffer->next = PacketBuffer::create();
buffer = buffer->nextPacketBuffer();
if (reliable) {
reliable->end = PacketBuffer::DATA_SIZE;
reliable->end = last_buffer_bytes_written;
reliable->cont = new ReliablePacket;
reliable = reliable->cont;
reliable->buffer = buffer; buffer->addref();
@ -69,20 +70,19 @@ void PacketWriter::nextBuffer() {
}
void PacketWriter::writeAhead( int bytes, struct SplitBuffer* buf ) {
if (bytes <= PacketBuffer::DATA_SIZE - buffer->bytes_written) {
buf->begin = buffer->data + buffer->bytes_written;
if (bytes <= buffer->bytes_unwritten()) {
buf->begin = buffer->data() + buffer->bytes_written;
buf->first_length = bytes;
buffer->bytes_written += bytes;
buf->next = 0;
} else {
buf->begin = buffer->data + buffer->bytes_written;
buf->first_length = PacketBuffer::DATA_SIZE - buffer->bytes_written;
buffer->bytes_written = PacketBuffer::DATA_SIZE;
buf->begin = buffer->data() + buffer->bytes_written;
buf->first_length = buffer->bytes_unwritten();
buffer->bytes_written = buffer->size();
nextBuffer();
buf->next = buffer->data;
buf->next = buffer->data();
buffer->bytes_written = bytes - buf->first_length;
}
}
void SplitBuffer::write( const void* data, int len ) {
@ -142,14 +142,14 @@ void UnsentPacketQueue::sent(int bytes) {
if (b->bytes_sent + bytes <= b->bytes_written && (b->bytes_sent + bytes != b->bytes_written || (!b->next && b->bytes_unwritten()))) {
b->bytes_sent += bytes;
ASSERT( b->bytes_sent <= PacketBuffer::DATA_SIZE );
ASSERT(b->bytes_sent <= b->size());
break;
}
// We've sent an entire buffer
bytes -= b->bytes_written - b->bytes_sent;
b->bytes_sent = b->bytes_written;
ASSERT( b->bytes_written <= PacketBuffer::DATA_SIZE );
ASSERT(b->bytes_written <= b->size());
unsent_first = b->nextPacketBuffer();
if (!unsent_first) unsent_last = NULL;
b->delref();
@ -171,12 +171,12 @@ PacketBuffer* ReliablePacketList::compact(PacketBuffer* into, PacketBuffer* end)
for(ReliablePacket* c = r; c; c = c->cont) {
if (c->buffer == end /*&& c->begin>=c->buffer->bytes_written*/) // quit when we hit the unsent range
return into;
if (into->bytes_written == PacketBuffer::DATA_SIZE) {
into->next = new PacketBuffer;
if (into->bytes_written == into->size()) {
into->next = PacketBuffer::create();
into = into->nextPacketBuffer();
}
uint8_t* data = &c->buffer->data[c->begin];
uint8_t* data = &c->buffer->data()[c->begin];
int len = c->end-c->begin;
if (len > into->bytes_unwritten()) {
@ -190,7 +190,7 @@ PacketBuffer* ReliablePacketList::compact(PacketBuffer* into, PacketBuffer* end)
c->cont = e;
}
memcpy( into->data + into->bytes_written, data, len );
memcpy(into->data() + into->bytes_written, data, len);
c->buffer->delref(); c->buffer = into; c->buffer->addref();
c->begin = into->bytes_written;
into->bytes_written += len;

@ -44,7 +44,13 @@ public:
~UnsentPacketQueue() { discardAll(); }
// Get a PacketBuffer to write new packets into
PacketBuffer* getWriteBuffer() { if (!unsent_last) { ASSERT( !unsent_first ); unsent_first = unsent_last = new PacketBuffer; }; return unsent_last; }
PacketBuffer* getWriteBuffer() {
if (!unsent_last) {
ASSERT(!unsent_first);
unsent_first = unsent_last = PacketBuffer::create();
};
return unsent_last;
}
// Call after potentially adding to the chain returned by getWriteBuffer()
void setWriteBuffer(PacketBuffer* pb) { unsent_last = pb; }

@ -661,26 +661,50 @@ private:
};
struct SendBuffer {
int bytes_written, bytes_sent;
uint8_t const* data;
SendBuffer* next;
int bytes_written, bytes_sent;
};
struct PacketBuffer : SendBuffer, FastAllocated<PacketBuffer> {
struct PacketBuffer : SendBuffer {
private:
int reference_count;
enum { DATA_SIZE = 4096 - 28 }; //28 is the size of the PacketBuffer fields
uint8_t data[ DATA_SIZE ];
uint32_t size_;
static constexpr size_t PACKET_BUFFER_OVERHEAD = 32;
PacketBuffer() : reference_count(1) {
public:
uint8_t* data() { return const_cast<uint8_t*>(static_cast<SendBuffer*>(this)->data); }
size_t size() { return size_; }
private:
explicit PacketBuffer(size_t size) : reference_count(1), size_(size) {
next = 0;
bytes_written = bytes_sent = 0;
((SendBuffer*)this)->data = data;
static_assert( sizeof(PacketBuffer) == 4096, "PacketBuffer size mismatch" );
((SendBuffer*)this)->data = reinterpret_cast<uint8_t*>(this + 1);
static_assert(sizeof(PacketBuffer) == PACKET_BUFFER_OVERHEAD);
}
public:
static PacketBuffer* create(size_t size = 0) {
size = std::max(size, 4096 - PACKET_BUFFER_OVERHEAD);
if (size == 4096 - PACKET_BUFFER_OVERHEAD) {
return new (FastAllocator<4096>::allocate()) PacketBuffer{ size };
}
uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD];
return new (mem) PacketBuffer{ size };
}
PacketBuffer* nextPacketBuffer() { return (PacketBuffer*)next; }
void addref() { ++reference_count; }
void delref() { if (!--reference_count) delete this; }
int bytes_unwritten() const { return DATA_SIZE-bytes_written; }
void delref() {
if (!--reference_count) {
if (size_ == 4096 - PACKET_BUFFER_OVERHEAD) {
FastAllocator<4096>::release(this);
} else {
delete[] this;
}
}
}
int bytes_unwritten() const { return size_ - bytes_written; }
};
struct PacketWriter {
@ -700,7 +724,7 @@ struct PacketWriter {
void serializeBytes(const void* data, int bytes) {
if (bytes <= buffer->bytes_unwritten()) {
memcpy(buffer->data + buffer->bytes_written, data, bytes);
memcpy(buffer->data() + buffer->bytes_written, data, bytes);
buffer->bytes_written += bytes;
} else {
serializeBytesAcrossBoundary(data, bytes);
@ -718,7 +742,7 @@ struct PacketWriter {
template <class T>
void serializeBinaryItem( const T& t ) {
if (sizeof(T) <= buffer->bytes_unwritten()) {
*(T*)(buffer->data + buffer->bytes_written) = t;
*(T*)(buffer->data() + buffer->bytes_written) = t;
buffer->bytes_written += sizeof(T);
} else {
serializeBytesAcrossBoundary(&t, sizeof(T));