swift-nio-extras/Sources/NIOExtras/WritePCAPHandler.swift

816 lines
33 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import CNIOLinux
import Dispatch
import NIOCore
#if canImport(Darwin)
import Darwin
#elseif canImport(Musl)
import Musl
#elseif canImport(Android)
import Android
#else
import Glibc
#endif
let sysWrite = write
struct TCPHeader {
struct Flags: OptionSet {
var rawValue: UInt8
init(rawValue: UInt8) {
self.rawValue = rawValue
}
static let fin = Flags(rawValue: 1 << 0)
static let syn = Flags(rawValue: 1 << 1)
static let rst = Flags(rawValue: 1 << 2)
static let psh = Flags(rawValue: 1 << 3)
static let ack = Flags(rawValue: 1 << 4)
static let urg = Flags(rawValue: 1 << 5)
static let ece = Flags(rawValue: 1 << 6)
static let cwr = Flags(rawValue: 1 << 7)
}
var flags: Flags
var ackNumber: UInt32?
var sequenceNumber: UInt32
var srcPort: UInt16
var dstPort: UInt16
}
struct PCAPRecordHeader {
enum Error: Swift.Error {
case incompatibleAddressPair(SocketAddress, SocketAddress)
}
enum AddressTuple {
case v4(src: SocketAddress.IPv4Address, dst: SocketAddress.IPv4Address)
case v6(src: SocketAddress.IPv6Address, dst: SocketAddress.IPv6Address)
var srcPort: UInt16 {
switch self {
case .v4(let src, dst: _):
return UInt16(bigEndian: src.address.sin_port)
case .v6(let src, dst: _):
return UInt16(bigEndian: src.address.sin6_port)
}
}
var dstPort: UInt16 {
switch self {
case .v4(src: _, let dst):
return UInt16(bigEndian: dst.address.sin_port)
case .v6(src: _, let dst):
return UInt16(bigEndian: dst.address.sin6_port)
}
}
}
var payloadLength: Int
var addresses: AddressTuple
var time: timeval
var tcp: TCPHeader
init(payloadLength: Int, addresses: AddressTuple, time: timeval, tcp: TCPHeader) {
self.payloadLength = payloadLength
self.addresses = addresses
self.time = time
self.tcp = tcp
assert(addresses.srcPort == Int(tcp.srcPort))
assert(addresses.dstPort == Int(tcp.dstPort))
assert(tcp.ackNumber == nil ? !tcp.flags.contains([.ack]) : tcp.flags.contains([.ack]))
}
init(payloadLength: Int, src: SocketAddress, dst: SocketAddress, tcp: TCPHeader) throws {
let addressTuple: AddressTuple
switch (src, dst) {
case (.v4(let src), .v4(let dst)):
addressTuple = .v4(src: src, dst: dst)
case (.v6(let src), .v6(let dst)):
addressTuple = .v6(src: src, dst: dst)
default:
throw Error.incompatibleAddressPair(src, dst)
}
self = .init(payloadLength: payloadLength, addresses: addressTuple, tcp: tcp)
}
init(payloadLength: Int, addresses: AddressTuple, tcp: TCPHeader) {
var tv = timeval()
gettimeofday(&tv, nil)
self = .init(payloadLength: payloadLength, addresses: addresses, time: tv, tcp: tcp)
}
}
/// A `ChannelHandler` that can write a [`.pcap` file](https://en.wikipedia.org/wiki/Pcap) containing the send/received
/// data as synthesized TCP packet captures.
///
/// You will be able to open the `.pcap` file in for example [Wireshark](https://www.wireshark.org) or
/// [`tcpdump`](http://www.tcpdump.org). Using `NIOWritePCAPHandler` to write your `.pcap` files can be useful for
/// example when your real network traffic is TLS protected (so `tcpdump`/Wireshark can't read it directly), or if you
/// don't have enough privileges on the running host to dump the network traffic.
///
/// ``NIOWritePCAPHandler`` will also work with Unix Domain Sockets in which case it will still synthesize a TCP packet
/// capture with local address `111.111.111.111` (port `1111`) and remote address `222.222.222.222` (port `2222`).
public class NIOWritePCAPHandler: RemovableChannelHandler {
public enum Mode: Sendable {
case client
case server
}
/// Settings for ``NIOWritePCAPHandler``.
public struct Settings: Sendable {
/// When to issue data into the `.pcap` file.
public enum EmitPCAP: Sendable {
/// Write the data immediately when ``NIOWritePCAPHandler`` saw the event on the `ChannelPipeline`.
///
/// For writes this means when the `write` event is triggered. Please note that this will write potentially
/// unflushed data into the `.pcap` file.
///
/// If in doubt, prefer ``whenCompleted``.
case whenIssued
/// Write the data when the event completed.
///
/// For writes this means when the `write` promise is succeeded. The ``whenCompleted`` mode mirrors most
/// closely what's actually sent over the wire.
case whenCompleted
}
/// When to emit the data from the `write` event into the `.pcap` file.
public var emitPCAPWrites: EmitPCAP
/// Default settings for the ``NIOWritePCAPHandler``.
public init() {
self = .init(emitPCAPWrites: .whenCompleted)
}
/// Settings with customization.
///
/// - parameters:
/// - emitPCAPWrites: When to issue the writes into the `.pcap` file, see ``EmitPCAP``.
public init(emitPCAPWrites: EmitPCAP) {
self.emitPCAPWrites = emitPCAPWrites
}
}
private enum CloseState {
case notClosing
case closedInitiatorLocal
case closedInitiatorRemote
}
private let fileSink: (ByteBuffer) -> Void
private let mode: Mode
private let maxPayloadSize = Int(UInt16.max - 40) // needs to fit into the IPv4 header which adds 40
private let settings: Settings
private var buffer: ByteBuffer!
private var readInboundBytes: UInt64 = 0
private var writtenOutboundBytes: UInt64 = 0
private var closeState = CloseState.notClosing
private static let fakeLocalAddress = try! SocketAddress(ipAddress: "111.111.111.111", port: 1111)
private static let fakeRemoteAddress = try! SocketAddress(ipAddress: "222.222.222.222", port: 2222)
private var localAddress: SocketAddress?
private var remoteAddress: SocketAddress?
/// Reusable header for `.pcap` file.
public static var pcapFileHeader: ByteBuffer {
var buffer = ByteBufferAllocator().buffer(capacity: 24)
buffer.writePCAPHeader()
return buffer
}
/// Initialize a ``NIOWritePCAPHandler``.
///
/// - parameters:
/// - mode: Whether the handler should behave in the client or server mode.
/// - fakeLocalAddress: Allows you to optionally override the local address to be different from the real one.
/// - fakeRemoteAddress: Allows you to optionally override the remote address to be different from the real one.
/// - settings: The settings for the ``NIOWritePCAPHandler``.
/// - fileSink: The `fileSink` closure is called every time a new chunk of the `.pcap` file is ready to be
/// written to disk or elsewhere. See ``SynchronizedFileSink`` for a convenient way to write to
/// disk.
public init(
mode: Mode,
fakeLocalAddress: SocketAddress? = nil,
fakeRemoteAddress: SocketAddress? = nil,
settings: Settings,
fileSink: @escaping (ByteBuffer) -> Void
) {
self.settings = settings
self.fileSink = fileSink
self.mode = mode
if let fakeLocalAddress = fakeLocalAddress {
self.localAddress = fakeLocalAddress
}
if let fakeRemoteAddress = fakeRemoteAddress {
self.remoteAddress = fakeRemoteAddress
}
}
/// Initialize a ``NIOWritePCAPHandler`` with default settings.
///
/// - parameters:
/// - mode: Whether the handler should behave in the client or server mode.
/// - fakeLocalAddress: Allows you to optionally override the local address to be different from the real one.
/// - fakeRemoteAddress: Allows you to optionally override the remote address to be different from the real one.
/// - fileSink: The `fileSink` closure is called every time a new chunk of the `.pcap` file is ready to be
/// written to disk or elsewhere. See `NIOSynchronizedFileSink` for a convenient way to write to
/// disk.
public convenience init(
mode: Mode,
fakeLocalAddress: SocketAddress? = nil,
fakeRemoteAddress: SocketAddress? = nil,
fileSink: @escaping (ByteBuffer) -> Void
) {
self.init(
mode: mode,
fakeLocalAddress: fakeLocalAddress,
fakeRemoteAddress: fakeRemoteAddress,
settings: Settings(),
fileSink: fileSink
)
}
private func writeBuffer(_ buffer: ByteBuffer) {
self.fileSink(buffer)
}
private func localAddress(context: ChannelHandlerContext) -> SocketAddress {
if let localAddress = self.localAddress {
return localAddress
} else {
let localAddress = context.channel.localAddress ?? NIOWritePCAPHandler.fakeLocalAddress
self.localAddress = localAddress
return localAddress
}
}
private func remoteAddress(context: ChannelHandlerContext) -> SocketAddress {
if let remoteAddress = self.remoteAddress {
return remoteAddress
} else {
let remoteAddress = context.channel.remoteAddress ?? NIOWritePCAPHandler.fakeRemoteAddress
self.remoteAddress = remoteAddress
return remoteAddress
}
}
private func clientAddress(context: ChannelHandlerContext) -> SocketAddress {
switch self.mode {
case .client:
return self.localAddress(context: context)
case .server:
return self.remoteAddress(context: context)
}
}
private func serverAddress(context: ChannelHandlerContext) -> SocketAddress {
switch self.mode {
case .client:
return self.remoteAddress(context: context)
case .server:
return self.localAddress(context: context)
}
}
private func takeSensiblySizedPayload(buffer: inout ByteBuffer) -> ByteBuffer? {
guard buffer.readableBytes > 0 else {
return nil
}
return buffer.readSlice(length: min(buffer.readableBytes, self.maxPayloadSize))
}
private func sequenceNumber(byteCount: UInt64) -> UInt32 {
UInt32(byteCount % (UInt64(UInt32.max) + 1))
}
}
@available(*, unavailable)
extension NIOWritePCAPHandler: Sendable {}
extension NIOWritePCAPHandler: ChannelDuplexHandler {
public typealias InboundIn = ByteBuffer
public typealias InboundOut = ByteBuffer
public typealias OutboundIn = IOData
public typealias OutboundOut = IOData
public func handlerAdded(context: ChannelHandlerContext) {
self.buffer = context.channel.allocator.buffer(capacity: 256)
}
public func channelActive(context: ChannelHandlerContext) {
self.buffer.clear()
self.readInboundBytes = 1
self.writtenOutboundBytes = 1
do {
let clientAddress = self.clientAddress(context: context)
let serverAddress = self.serverAddress(context: context)
try self.buffer.writePCAPRecord(
.init(
payloadLength: 0,
src: clientAddress,
dst: serverAddress,
tcp: TCPHeader(
flags: [.syn],
ackNumber: nil,
sequenceNumber: 0,
srcPort: .init(clientAddress.port!),
dstPort: .init(serverAddress.port!)
)
)
)
try self.buffer.writePCAPRecord(
.init(
payloadLength: 0,
src: serverAddress,
dst: clientAddress,
tcp: TCPHeader(
flags: [.syn, .ack],
ackNumber: 1,
sequenceNumber: 0,
srcPort: .init(serverAddress.port!),
dstPort: .init(clientAddress.port!)
)
)
)
try self.buffer.writePCAPRecord(
.init(
payloadLength: 0,
src: clientAddress,
dst: serverAddress,
tcp: TCPHeader(
flags: [.ack],
ackNumber: 1,
sequenceNumber: 1,
srcPort: .init(clientAddress.port!),
dstPort: .init(serverAddress.port!)
)
)
)
self.writeBuffer(self.buffer)
} catch {
context.fireErrorCaught(error)
}
context.fireChannelActive()
}
public func channelInactive(context: ChannelHandlerContext) {
let didLocalInitiateTheClose: Bool
switch self.closeState {
case .closedInitiatorLocal:
didLocalInitiateTheClose = true
case .closedInitiatorRemote:
didLocalInitiateTheClose = false
case .notClosing:
self.closeState = .closedInitiatorRemote
didLocalInitiateTheClose = false
}
self.buffer.clear()
do {
let closeInitiatorAddress =
didLocalInitiateTheClose ? self.localAddress(context: context) : self.remoteAddress(context: context)
let closeRecipientAddress =
didLocalInitiateTheClose ? self.remoteAddress(context: context) : self.localAddress(context: context)
let initiatorSeq = self.sequenceNumber(
byteCount: didLocalInitiateTheClose ? self.writtenOutboundBytes : self.readInboundBytes
)
let recipientSeq = self.sequenceNumber(
byteCount: didLocalInitiateTheClose ? self.readInboundBytes : self.writtenOutboundBytes
)
// terminate the connection cleanly
try self.buffer.writePCAPRecord(
.init(
payloadLength: 0,
src: closeInitiatorAddress,
dst: closeRecipientAddress,
tcp: TCPHeader(
flags: [.fin],
ackNumber: nil,
sequenceNumber: initiatorSeq,
srcPort: .init(closeInitiatorAddress.port!),
dstPort: .init(closeRecipientAddress.port!)
)
)
)
try self.buffer.writePCAPRecord(
.init(
payloadLength: 0,
src: closeRecipientAddress,
dst: closeInitiatorAddress,
tcp: TCPHeader(
flags: [.ack, .fin],
ackNumber: initiatorSeq + 1,
sequenceNumber: recipientSeq,
srcPort: .init(closeRecipientAddress.port!),
dstPort: .init(closeInitiatorAddress.port!)
)
)
)
try self.buffer.writePCAPRecord(
.init(
payloadLength: 0,
src: closeInitiatorAddress,
dst: closeRecipientAddress,
tcp: TCPHeader(
flags: [.ack],
ackNumber: recipientSeq + 1,
sequenceNumber: initiatorSeq + 1,
srcPort: .init(closeInitiatorAddress.port!),
dstPort: .init(closeRecipientAddress.port!)
)
)
)
self.writeBuffer(self.buffer)
} catch {
context.fireErrorCaught(error)
}
context.fireChannelInactive()
}
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
defer {
context.fireChannelRead(data)
}
guard self.closeState == .notClosing else {
return
}
let data = self.unwrapInboundIn(data)
guard data.readableBytes > 0 else {
return
}
self.buffer.clear()
do {
var data = data
while var payloadToSend = self.takeSensiblySizedPayload(buffer: &data) {
try self.buffer.writePCAPRecord(
.init(
payloadLength: payloadToSend.readableBytes,
src: self.remoteAddress(context: context),
dst: self.localAddress(context: context),
tcp: TCPHeader(
flags: [],
ackNumber: nil,
sequenceNumber: self.sequenceNumber(byteCount: self.readInboundBytes),
srcPort: .init(self.remoteAddress(context: context).port!),
dstPort: .init(self.localAddress(context: context).port!)
)
)
)
self.readInboundBytes += UInt64(payloadToSend.readableBytes)
self.buffer.writeBuffer(&payloadToSend)
}
assert(data.readableBytes == 0)
self.writeBuffer(self.buffer)
} catch {
context.fireErrorCaught(error)
}
}
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
var buffer = self.unwrapInboundIn(data)
func emitWrites() {
do {
self.buffer.clear()
while var payloadToSend = self.takeSensiblySizedPayload(buffer: &buffer) {
try self.buffer.writePCAPRecord(
.init(
payloadLength: payloadToSend.readableBytes,
src: self.localAddress(context: context),
dst: self.remoteAddress(context: context),
tcp: TCPHeader(
flags: [],
ackNumber: nil,
sequenceNumber: self.sequenceNumber(byteCount: self.writtenOutboundBytes),
srcPort: .init(self.localAddress(context: context).port!),
dstPort: .init(self.remoteAddress(context: context).port!)
)
)
)
self.writtenOutboundBytes += UInt64(payloadToSend.readableBytes)
self.buffer.writeBuffer(&payloadToSend)
}
self.writeBuffer(self.buffer)
} catch {
context.fireErrorCaught(error)
}
}
switch self.settings.emitPCAPWrites {
case .whenCompleted:
let promise = promise ?? context.eventLoop.makePromise()
// A user-provided promise might be on a different event-loop, hop back to the context's
// event loop.
promise.futureResult.hop(to: context.eventLoop).assumeIsolatedUnsafeUnchecked().whenSuccess {
emitWrites()
}
context.write(data, promise: promise)
case .whenIssued:
emitWrites()
context.write(data, promise: promise)
}
}
public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if let event = event as? ChannelEvent {
if event == .inputClosed {
switch self.closeState {
case .closedInitiatorLocal:
() // fair enough, we already closed locally
case .closedInitiatorRemote:
() // that's odd but okay
case .notClosing:
self.closeState = .closedInitiatorRemote
}
}
}
context.fireUserInboundEventTriggered(event)
}
public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
switch self.closeState {
case .closedInitiatorLocal:
() // weird, this looks like a double-close
case .closedInitiatorRemote:
() // fair enough, already closed I guess
case .notClosing:
self.closeState = .closedInitiatorLocal
}
context.close(mode: mode, promise: promise)
}
}
extension ByteBuffer {
mutating func writePCAPHeader() {
// guint32 magic_number; // magic number
self.writeInteger(0xa1b2_c3d4, endianness: .host, as: UInt32.self)
// guint16 version_major; // major version number
self.writeInteger(2, endianness: .host, as: UInt16.self)
// guint16 version_minor; // minor version number
self.writeInteger(4, endianness: .host, as: UInt16.self)
// gint32 thiszone; // GMT to local correction
self.writeInteger(0, endianness: .host, as: UInt32.self)
// guint32 sigfigs; // accuracy of timestamps
self.writeInteger(0, endianness: .host, as: UInt32.self)
// guint32 snaplen; // max length of captured packets, in octets
self.writeInteger(.max, endianness: .host, as: UInt32.self)
// guint32 network; // data link type
self.writeInteger(0, endianness: .host, as: UInt32.self)
}
mutating func writePCAPRecord(_ record: PCAPRecordHeader) throws {
let rawDataLength = record.payloadLength
let tcpLength = rawDataLength + 20 // TCP header length
// record
// guint32 ts_sec; // timestamp seconds
self.writeInteger(.init(record.time.tv_sec), endianness: .host, as: UInt32.self)
// guint32 ts_usec; // timestamp microseconds
self.writeInteger(.init(record.time.tv_usec), endianness: .host, as: UInt32.self)
// continued below ...
switch record.addresses {
case .v4(let la, let ra):
let ipv4WholeLength = tcpLength + 20 // IPv4 header length, included in IPv4
let recordLength = ipv4WholeLength + 4 // 32 bits for protocol id
// record, continued
// guint32 incl_len; // number of octets of packet saved in file
self.writeInteger(.init(recordLength), endianness: .host, as: UInt32.self)
// guint32 orig_len; // actual length of packet
self.writeInteger(.init(recordLength), endianness: .host, as: UInt32.self)
self.writeInteger(2, endianness: .host, as: UInt32.self) // IPv4
// IPv4 packet
self.writeInteger(0x45, as: UInt8.self) // IP version (4) & IHL (5)
self.writeInteger(0, as: UInt8.self) // DSCP
self.writeInteger(.init(ipv4WholeLength), as: UInt16.self)
self.writeInteger(0, as: UInt16.self) // identification
self.writeInteger(0x4000, as: UInt16.self) // flags & fragment offset, 0x4000 sets "don't fragment"
self.writeInteger(.max, as: UInt8.self) // TTL, `.max` as we don't care about the TTL
self.writeInteger(6, as: UInt8.self) // TCP
self.writeInteger(0, as: UInt16.self) // checksum
self.writeInteger(la.address.sin_addr.s_addr, endianness: .host, as: UInt32.self)
self.writeInteger(ra.address.sin_addr.s_addr, endianness: .host, as: UInt32.self)
case .v6(let la, let ra):
let ipv6PayloadLength = tcpLength
let recordLength = ipv6PayloadLength + 4 + 40 // IPv6 header length (+4 gives 32 bits for protocol id)
// record, continued
// guint32 incl_len; // number of octets of packet saved in file
self.writeInteger(.init(recordLength), endianness: .host, as: UInt32.self)
// guint32 orig_len; // actual length of packet
self.writeInteger(.init(recordLength), endianness: .host, as: UInt32.self)
self.writeInteger(24, endianness: .host, as: UInt32.self) // IPv6
// IPv6 packet
self.writeInteger((6 << 28), as: UInt32.self) // IP version (6) & fancy stuff
self.writeInteger(.init(ipv6PayloadLength), as: UInt16.self)
self.writeInteger(6, as: UInt8.self) // TCP
self.writeInteger(.max, as: UInt8.self) // hop limit (like TTL & we don't care about TTL)
var laAddress = la.address
withUnsafeBytes(of: &laAddress.sin6_addr) { ptr in
assert(ptr.count == 16)
self.writeBytes(ptr)
}
var raAddress = ra.address
withUnsafeBytes(of: &raAddress.sin6_addr) { ptr in
assert(ptr.count == 16)
self.writeBytes(ptr)
}
}
// TCP
self.writeInteger(record.tcp.srcPort, as: UInt16.self)
self.writeInteger(record.tcp.dstPort, as: UInt16.self)
self.writeInteger(record.tcp.sequenceNumber, as: UInt32.self) // seq no
self.writeInteger(record.tcp.ackNumber ?? 0, as: UInt32.self) // ack no
// data offset + reserved bits + fancy stuff
self.writeInteger(5 << 12 | UInt16(record.tcp.flags.rawValue), as: UInt16.self)
self.writeInteger(.max, as: UInt16.self) // window size, `.max` as we don't do actual window sizes
self.writeInteger(0xbad, as: UInt16.self) // checksum (a fake one)
self.writeInteger(0, as: UInt16.self) // urgent pointer
}
}
extension NIOWritePCAPHandler {
/// A synchronised file sink that uses a `DispatchQueue` to do all the necessary write synchronously.
///
/// A `SynchronizedFileSink` is thread-safe so can be used from any thread/`EventLoop`. After use, you
/// _must_ call `syncClose` or `close` on the `SynchronizedFileSink` to shut it and all the associated resources down. Failing
/// to do so triggers undefined behaviour.
public final class SynchronizedFileSink {
private let fileHandle: NIOFileHandle
private let workQueue: DispatchQueue
private let writesGroup = DispatchGroup()
private let errorHandler: (Swift.Error) -> Void
private var state: State = .running // protected by `workQueue`
public enum FileWritingMode: Sendable {
case appendToExistingPCAPFile
case createNewPCAPFile
}
public struct Error: Swift.Error {
public var errorCode: Int
internal enum ErrorCode: Int {
case cannotOpenFileError = 1
case cannotWriteToFileError
}
}
private enum State {
case running
case error(Swift.Error)
}
/// Creates a `SynchronizedFileSink` for writing to a `.pcap` file at `path`.
///
/// Typically, after you created a `SynchronizedFileSink`, you will hand `myFileSink.write` to
/// `NIOWritePCAPHandler`'s constructor so `NIOPCAPHandler` can write `.pcap` files. Example:
///
/// ```swift
/// let fileSink = try NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: "test.pcap",
/// errorHandler: { error in
/// print("ERROR: \(error)")
/// })
/// defer {
/// try fileSink.syncClose()
/// }
/// // [...]
/// channel.pipeline.addHandler(NIOWritePCAPHandler(mode: .server, fileSink: fileSink.write))
/// ```
///
/// - parameters:
/// - path: The path of the `.pcap` file to write.
/// - fileWritingMode: Whether to append to an existing `.pcap` file or to create a new `.pcap` file. If you
/// choose to append to an existing `.pcap` file, the file header does not get written.
/// - errorHandler: Invoked when an unrecoverable error has occured. In this event you may log the error and
/// you must then `syncClose` the `SynchronizedFileSink`. When `errorHandler` has been
/// called, no further writes will be attempted and `errorHandler` will also not be called
/// again.
public static func fileSinkWritingToFile(
path: String,
fileWritingMode: FileWritingMode = .createNewPCAPFile,
errorHandler: @escaping (Swift.Error) -> Void
) throws -> SynchronizedFileSink {
let oflag: CInt = fileWritingMode == FileWritingMode.createNewPCAPFile ? (O_TRUNC | O_CREAT) : O_APPEND
let fd = try path.withCString { pathPtr -> CInt in
let fd = open(pathPtr, O_WRONLY | oflag, 0o600)
guard fd >= 0 else {
throw SynchronizedFileSink.Error(errorCode: Error.ErrorCode.cannotOpenFileError.rawValue)
}
return fd
}
if fileWritingMode == .createNewPCAPFile {
let writeOk = NIOWritePCAPHandler.pcapFileHeader.withUnsafeReadableBytes { ptr in
sysWrite(fd, ptr.baseAddress, ptr.count) == ptr.count
}
guard writeOk else {
throw SynchronizedFileSink.Error(errorCode: Error.ErrorCode.cannotWriteToFileError.rawValue)
}
}
return SynchronizedFileSink(
fileHandle: NIOFileHandle(_deprecatedTakingOwnershipOfDescriptor: fd),
errorHandler: errorHandler
)
}
private init(
fileHandle: NIOFileHandle,
errorHandler: @escaping (Swift.Error) -> Void
) {
self.fileHandle = fileHandle
self.workQueue = DispatchQueue(label: "io.swiftnio.extras.WritePCAPHandler.SynchronizedFileSink.workQueue")
self.errorHandler = errorHandler
}
/// Synchronously close this `SynchronizedFileSink` and any associated resources.
///
/// After use, it is mandatory to close a `SynchronizedFileSink` exactly once. `syncClose` may be called from
/// any thread but not from an `EventLoop` as it will block, and may not be called from an async context.
@available(*, noasync, message: "syncClose() can block indefinitely, prefer close()", renamed: "close()")
public func syncClose() throws {
try self._syncClose()
}
private func _syncClose() throws {
self.writesGroup.wait()
try self.workQueue.sync {
try self.fileHandle.close()
}
}
/// Close this `SynchronizedFileSink` and any associated resources.
///
/// After use, it is mandatory to close a `SynchronizedFileSink` exactly once.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func close() async throws {
try await withCheckedThrowingContinuation { continuation in
self.workQueue.async {
continuation.resume(with: Result { try self.fileHandle.close() })
}
}
}
public func write(buffer: ByteBuffer) {
self.workQueue.async(group: self.writesGroup) {
guard case .running = self.state else {
return
}
do {
try self.fileHandle.withUnsafeFileDescriptor { fd in
var buffer = buffer
while buffer.readableBytes > 0 {
try buffer.readWithUnsafeReadableBytes { dataPtr in
let r = sysWrite(fd, dataPtr.baseAddress, dataPtr.count)
assert(r != 0, "write returned 0 but we tried to write \(dataPtr.count) bytes")
guard r > 0 else {
throw Error.init(errorCode: Error.ErrorCode.cannotWriteToFileError.rawValue)
}
return r
}
}
}
} catch {
self.state = .error(error)
self.errorHandler(error)
}
}
}
}
}
extension NIOWritePCAPHandler.SynchronizedFileSink: @unchecked Sendable {}