Strict concurrency for NIOExtrasPerformanceTester

This commit is contained in:
George Barnett 2025-03-24 16:14:13 +00:00
parent c92af9d4ef
commit 6688e3a3f4
4 changed files with 70 additions and 45 deletions

View File

@ -90,7 +90,8 @@ var targets: [PackageDescription.Target] = [
.product(name: "NIOPosix", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"), .product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio"), .product(name: "NIOHTTP1", package: "swift-nio"),
] ],
swiftSettings: strictConcurrencySettings
), ),
.target( .target(
name: "NIOSOCKS", name: "NIOSOCKS",

View File

@ -14,55 +14,68 @@
import Foundation import Foundation
import NIOCore import NIOCore
import NIOConcurrencyHelpers
import NIOExtras import NIOExtras
class HTTP1ThreadedPCapPerformanceTest: HTTP1ThreadedPerformanceTest { class HTTP1ThreadedPCapPerformanceTest: HTTP1ThreadedPerformanceTest {
private class SinkHolder { private final class SinkHolder: Sendable {
var fileSink: NIOWritePCAPHandler.SynchronizedFileSink! let fileSink: NIOLoopBound<NIOWritePCAPHandler.SynchronizedFileSink>
let eventLoop: any EventLoop
init(eventLoop: any EventLoop) {
self.eventLoop = eventLoop
func setUp() throws {
let outputFile = NSTemporaryDirectory() + "/" + UUID().uuidString let outputFile = NSTemporaryDirectory() + "/" + UUID().uuidString
self.fileSink = try NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: outputFile) { let fileSink = try! NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: outputFile) {
error in error in
print("ERROR: \(error)") print("ERROR: \(error)")
exit(1) exit(1)
} }
self.fileSink = NIOLoopBound(fileSink, eventLoop: eventLoop)
} }
func tearDown() { func tearDown() -> EventLoopFuture<Void> {
try! self.fileSink.syncClose() self.eventLoop.submit {
try self.fileSink.value.syncClose()
}
} }
} }
init() { init() {
let sinkHolder = SinkHolder() let sinkHolders = NIOLockedValueBox<[SinkHolder]>([])
func addPCap(channel: Channel) -> EventLoopFuture<Void> { self.sinkHolders = sinkHolders
channel.eventLoop.submit {
let pcapHandler = NIOWritePCAPHandler(
mode: .client,
fileSink: sinkHolder.fileSink.write
)
return try channel.pipeline.syncOperations.addHandler(pcapHandler, position: .first)
}
}
self.sinkHolder = sinkHolder
super.init( super.init(
numberOfRepeats: 50, numberOfRepeats: 50,
numberOfClients: System.coreCount, numberOfClients: System.coreCount,
requestsPerClient: 500, requestsPerClient: 500,
extraInitialiser: { channel in addPCap(channel: channel) } extraInitialiser: { channel in
channel.eventLoop.makeCompletedFuture {
let sinkHolder = SinkHolder(eventLoop: channel.eventLoop)
sinkHolders.withLockedValue { $0.append(sinkHolder) }
let pcapHandler = NIOWritePCAPHandler(
mode: .client,
fileSink: sinkHolder.fileSink.value.write(buffer:)
)
return try channel.pipeline.syncOperations.addHandler(pcapHandler, position: .first)
}
}
) )
} }
private let sinkHolder: SinkHolder private let sinkHolders: NIOLockedValueBox<[SinkHolder]>
override func run() throws -> Int { override func run() throws -> Int {
// Opening and closing the file included here as flushing data to disk is not known to complete until closed. let result = Result {
try sinkHolder.setUp() try super.run()
defer {
sinkHolder.tearDown()
} }
return try super.run()
let holders = self.sinkHolders.withLockedValue { $0 }
for holder in holders {
try holder.tearDown().wait()
}
return try result.get()
} }
} }

View File

@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===// //===----------------------------------------------------------------------===//
import NIOCore import NIOCore
import NIOConcurrencyHelpers
import NIOHTTP1 import NIOHTTP1
import NIOPosix import NIOPosix
@ -107,7 +108,7 @@ final class RepeatedRequests: ChannelInboundHandler {
let reqPart = self.unwrapInboundIn(data) let reqPart = self.unwrapInboundIn(data)
if case .end(nil) = reqPart { if case .end(nil) = reqPart {
if self.remainingNumberOfRequests <= 0 { if self.remainingNumberOfRequests <= 0 {
context.channel.close().map { self.doneRequests }.cascade(to: self.isDonePromise) context.channel.close().assumeIsolated().map { self.doneRequests }.nonisolated().cascade(to: self.isDonePromise)
} else { } else {
self.doneRequests += 1 self.doneRequests += 1
self.remainingNumberOfRequests -= 1 self.remainingNumberOfRequests -= 1
@ -124,7 +125,7 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
let numberOfRepeats: Int let numberOfRepeats: Int
let numberOfClients: Int let numberOfClients: Int
let requestsPerClient: Int let requestsPerClient: Int
let extraInitialiser: (Channel) -> EventLoopFuture<Void> let extraInitialiser: @Sendable (Channel) -> EventLoopFuture<Void>
let head: HTTPRequestHead let head: HTTPRequestHead
@ -135,7 +136,7 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
numberOfRepeats: Int, numberOfRepeats: Int,
numberOfClients: Int, numberOfClients: Int,
requestsPerClient: Int, requestsPerClient: Int,
extraInitialiser: @escaping (Channel) -> EventLoopFuture<Void> extraInitialiser: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
) { ) {
self.numberOfRepeats = numberOfRepeats self.numberOfRepeats = numberOfRepeats
self.numberOfClients = numberOfClients self.numberOfClients = numberOfClients
@ -152,8 +153,10 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
self.serverChannel = try ServerBootstrap(group: self.group) self.serverChannel = try ServerBootstrap(group: self.group)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in .childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).flatMap { channel.eventLoop.makeCompletedFuture {
channel.pipeline.addHandler(SimpleHTTPServer()) let sync = channel.pipeline.syncOperations
try sync.configureHTTPServerPipeline(withPipeliningAssistance: true)
try sync.addHandler(SimpleHTTPServer())
} }
}.bind(host: "127.0.0.1", port: 0).wait() }.bind(host: "127.0.0.1", port: 0).wait()
} }
@ -167,23 +170,31 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
var reqs: [Int] = [] var reqs: [Int] = []
reqs.reserveCapacity(self.numberOfRepeats) reqs.reserveCapacity(self.numberOfRepeats)
for _ in 0..<self.numberOfRepeats { for _ in 0..<self.numberOfRepeats {
var requestHandlers: [RepeatedRequests] = [] let requestsCompletedFutures = NIOLockedValueBox<[EventLoopFuture<Int>]>([])
requestHandlers.reserveCapacity(self.numberOfClients) requestsCompletedFutures.withLockedValue({ $0.reserveCapacity(self.numberOfClients) })
var clientChannels: [Channel] = [] var clientChannels: [Channel] = []
clientChannels.reserveCapacity(self.numberOfClients) clientChannels.reserveCapacity(self.numberOfClients)
for _ in 0..<self.numberOfClients { for _ in 0..<self.numberOfClients {
let clientChannel = try! ClientBootstrap(group: self.group) let clientChannel = try! ClientBootstrap(group: self.group)
.channelInitializer { channel in .channelInitializer { [head = self.head, requestsPerClient = self.requestsPerClient, extraInitialiser = self.extraInitialiser] channel in
channel.pipeline.addHTTPClientHandlers().flatMap { channel.eventLoop.makeCompletedFuture {
let sync = channel.pipeline.syncOperations
try sync.addHTTPClientHandlers()
let repeatedRequestsHandler = RepeatedRequests( let repeatedRequestsHandler = RepeatedRequests(
numberOfRequests: self.requestsPerClient, numberOfRequests: requestsPerClient,
eventLoop: channel.eventLoop, eventLoop: channel.eventLoop,
head: self.head head: head
) )
requestHandlers.append(repeatedRequestsHandler)
return channel.pipeline.addHandler(repeatedRequestsHandler) requestsCompletedFutures.withLockedValue {
$0.append(repeatedRequestsHandler.completedFuture)
}
try sync.addHandler(repeatedRequestsHandler)
}.flatMap { }.flatMap {
self.extraInitialiser(channel) extraInitialiser(channel)
} }
} }
.connect(to: self.serverChannel.localAddress!) .connect(to: self.serverChannel.localAddress!)
@ -199,13 +210,12 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
let allWrites = EventLoopFuture<Void>.andAllComplete(writeFutures, on: writeFutures.first!.eventLoop) let allWrites = EventLoopFuture<Void>.andAllComplete(writeFutures, on: writeFutures.first!.eventLoop)
try! allWrites.wait() try! allWrites.wait()
let streamCompletedFutures = requestHandlers.map { rh in rh.completedFuture } let futures = requestsCompletedFutures.withLockedValue { $0 }
let requestsServed = EventLoopFuture<Int>.reduce( let requestsServed = EventLoopFuture<Int>.reduce(
0, 0,
streamCompletedFutures, futures,
on: streamCompletedFutures.first!.eventLoop, on: futures.first!.eventLoop
+ ) { $0 + $1 }
)
reqs.append(try! requestsServed.wait()) reqs.append(try! requestsServed.wait())
} }
return reqs.reduce(0, +) / self.numberOfRepeats return reqs.reduce(0, +) / self.numberOfRepeats

View File

@ -17,6 +17,7 @@ import NIOExtras
class HTTP1ThreadedRollingPCapPerformanceTest: HTTP1ThreadedPerformanceTest { class HTTP1ThreadedRollingPCapPerformanceTest: HTTP1ThreadedPerformanceTest {
init() { init() {
@Sendable
func addRollingPCap(channel: Channel) -> EventLoopFuture<Void> { func addRollingPCap(channel: Channel) -> EventLoopFuture<Void> {
channel.eventLoop.submit { channel.eventLoop.submit {
let pcapRingBuffer = NIOPCAPRingBuffer( let pcapRingBuffer = NIOPCAPRingBuffer(