From 6688e3a3f4277cad6f066becbd19ddaa410e07be Mon Sep 17 00:00:00 2001
From: George Barnett <gbarnett@apple.com>
Date: Mon, 24 Mar 2025 16:14:13 +0000
Subject: [PATCH] Strict concurrency for NIOExtrasPerformanceTester

---
 Package.swift                                 |  3 +-
 .../HTTP1PCAPPerformanceTests.swift           | 63 +++++++++++--------
 .../HTTP1PerformanceTestFramework.swift       | 48 ++++++++------
 .../HTTP1RollingPCAPPerformanceTests.swift    |  1 +
 4 files changed, 70 insertions(+), 45 deletions(-)

diff --git a/Package.swift b/Package.swift
index 9fff710..e9f2073 100644
--- a/Package.swift
+++ b/Package.swift
@@ -90,7 +90,8 @@ var targets: [PackageDescription.Target] = [
             .product(name: "NIOPosix", package: "swift-nio"),
             .product(name: "NIOEmbedded", package: "swift-nio"),
             .product(name: "NIOHTTP1", package: "swift-nio"),
-        ]
+        ],
+        swiftSettings: strictConcurrencySettings
     ),
     .target(
         name: "NIOSOCKS",
diff --git a/Sources/NIOExtrasPerformanceTester/HTTP1PCAPPerformanceTests.swift b/Sources/NIOExtrasPerformanceTester/HTTP1PCAPPerformanceTests.swift
index 2a35372..310c770 100644
--- a/Sources/NIOExtrasPerformanceTester/HTTP1PCAPPerformanceTests.swift
+++ b/Sources/NIOExtrasPerformanceTester/HTTP1PCAPPerformanceTests.swift
@@ -14,55 +14,68 @@
 
 import Foundation
 import NIOCore
+import NIOConcurrencyHelpers
 import NIOExtras
 
 class HTTP1ThreadedPCapPerformanceTest: HTTP1ThreadedPerformanceTest {
-    private class SinkHolder {
-        var fileSink: NIOWritePCAPHandler.SynchronizedFileSink!
+    private final class SinkHolder: Sendable {
+        let fileSink: NIOLoopBound<NIOWritePCAPHandler.SynchronizedFileSink>
+        let eventLoop: any EventLoop
+
+        init(eventLoop: any EventLoop) {
+            self.eventLoop = eventLoop
 
-        func setUp() throws {
             let outputFile = NSTemporaryDirectory() + "/" + UUID().uuidString
-            self.fileSink = try NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: outputFile) {
+            let fileSink = try! NIOWritePCAPHandler.SynchronizedFileSink.fileSinkWritingToFile(path: outputFile) {
                 error in
                 print("ERROR: \(error)")
                 exit(1)
             }
+
+            self.fileSink = NIOLoopBound(fileSink, eventLoop: eventLoop)
         }
 
-        func tearDown() {
-            try! self.fileSink.syncClose()
+        func tearDown() -> EventLoopFuture<Void> {
+            self.eventLoop.submit {
+                try self.fileSink.value.syncClose()
+            }
         }
     }
 
     init() {
-        let sinkHolder = SinkHolder()
-        func addPCap(channel: Channel) -> EventLoopFuture<Void> {
-            channel.eventLoop.submit {
-                let pcapHandler = NIOWritePCAPHandler(
-                    mode: .client,
-                    fileSink: sinkHolder.fileSink.write
-                )
-                return try channel.pipeline.syncOperations.addHandler(pcapHandler, position: .first)
-            }
-        }
-
-        self.sinkHolder = sinkHolder
+        let sinkHolders = NIOLockedValueBox<[SinkHolder]>([])
+        self.sinkHolders = sinkHolders
         super.init(
             numberOfRepeats: 50,
             numberOfClients: System.coreCount,
             requestsPerClient: 500,
-            extraInitialiser: { channel in addPCap(channel: channel) }
+            extraInitialiser: { channel in
+                channel.eventLoop.makeCompletedFuture {
+                    let sinkHolder = SinkHolder(eventLoop: channel.eventLoop)
+                    sinkHolders.withLockedValue { $0.append(sinkHolder) }
+
+                    let pcapHandler = NIOWritePCAPHandler(
+                        mode: .client,
+                        fileSink: sinkHolder.fileSink.value.write(buffer:)
+                    )
+                    return try channel.pipeline.syncOperations.addHandler(pcapHandler, position: .first)
+                }
+            }
         )
     }
 
-    private let sinkHolder: SinkHolder
+    private let sinkHolders: NIOLockedValueBox<[SinkHolder]>
 
     override func run() throws -> Int {
-        // Opening and closing the file included here as flushing data to disk is not known to complete until closed.
-        try sinkHolder.setUp()
-        defer {
-            sinkHolder.tearDown()
+        let result = Result {
+            try super.run()
         }
-        return try super.run()
+
+        let holders = self.sinkHolders.withLockedValue { $0 }
+        for holder in holders {
+            try holder.tearDown().wait()
+        }
+
+        return try result.get()
     }
 }
diff --git a/Sources/NIOExtrasPerformanceTester/HTTP1PerformanceTestFramework.swift b/Sources/NIOExtrasPerformanceTester/HTTP1PerformanceTestFramework.swift
index 0e9e3ac..7e00f8b 100644
--- a/Sources/NIOExtrasPerformanceTester/HTTP1PerformanceTestFramework.swift
+++ b/Sources/NIOExtrasPerformanceTester/HTTP1PerformanceTestFramework.swift
@@ -13,6 +13,7 @@
 //===----------------------------------------------------------------------===//
 
 import NIOCore
+import NIOConcurrencyHelpers
 import NIOHTTP1
 import NIOPosix
 
@@ -107,7 +108,7 @@ final class RepeatedRequests: ChannelInboundHandler {
         let reqPart = self.unwrapInboundIn(data)
         if case .end(nil) = reqPart {
             if self.remainingNumberOfRequests <= 0 {
-                context.channel.close().map { self.doneRequests }.cascade(to: self.isDonePromise)
+                context.channel.close().assumeIsolated().map { self.doneRequests }.nonisolated().cascade(to: self.isDonePromise)
             } else {
                 self.doneRequests += 1
                 self.remainingNumberOfRequests -= 1
@@ -124,7 +125,7 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
     let numberOfRepeats: Int
     let numberOfClients: Int
     let requestsPerClient: Int
-    let extraInitialiser: (Channel) -> EventLoopFuture<Void>
+    let extraInitialiser: @Sendable (Channel) -> EventLoopFuture<Void>
 
     let head: HTTPRequestHead
 
@@ -135,7 +136,7 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
         numberOfRepeats: Int,
         numberOfClients: Int,
         requestsPerClient: Int,
-        extraInitialiser: @escaping (Channel) -> EventLoopFuture<Void>
+        extraInitialiser: @escaping @Sendable (Channel) -> EventLoopFuture<Void>
     ) {
         self.numberOfRepeats = numberOfRepeats
         self.numberOfClients = numberOfClients
@@ -152,8 +153,10 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
         self.serverChannel = try ServerBootstrap(group: self.group)
             .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
             .childChannelInitializer { channel in
-                channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).flatMap {
-                    channel.pipeline.addHandler(SimpleHTTPServer())
+                channel.eventLoop.makeCompletedFuture {
+                    let sync = channel.pipeline.syncOperations
+                    try sync.configureHTTPServerPipeline(withPipeliningAssistance: true)
+                    try sync.addHandler(SimpleHTTPServer())
                 }
             }.bind(host: "127.0.0.1", port: 0).wait()
     }
@@ -167,23 +170,31 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
         var reqs: [Int] = []
         reqs.reserveCapacity(self.numberOfRepeats)
         for _ in 0..<self.numberOfRepeats {
-            var requestHandlers: [RepeatedRequests] = []
-            requestHandlers.reserveCapacity(self.numberOfClients)
+            let requestsCompletedFutures = NIOLockedValueBox<[EventLoopFuture<Int>]>([])
+            requestsCompletedFutures.withLockedValue({ $0.reserveCapacity(self.numberOfClients) })
+
             var clientChannels: [Channel] = []
             clientChannels.reserveCapacity(self.numberOfClients)
             for _ in 0..<self.numberOfClients {
                 let clientChannel = try! ClientBootstrap(group: self.group)
-                    .channelInitializer { channel in
-                        channel.pipeline.addHTTPClientHandlers().flatMap {
+                    .channelInitializer { [head = self.head, requestsPerClient = self.requestsPerClient, extraInitialiser = self.extraInitialiser] channel in
+                        channel.eventLoop.makeCompletedFuture {
+                            let sync = channel.pipeline.syncOperations
+                            try sync.addHTTPClientHandlers()
+
                             let repeatedRequestsHandler = RepeatedRequests(
-                                numberOfRequests: self.requestsPerClient,
+                                numberOfRequests: requestsPerClient,
                                 eventLoop: channel.eventLoop,
-                                head: self.head
+                                head: head
                             )
-                            requestHandlers.append(repeatedRequestsHandler)
-                            return channel.pipeline.addHandler(repeatedRequestsHandler)
+
+                            requestsCompletedFutures.withLockedValue {
+                                $0.append(repeatedRequestsHandler.completedFuture)
+                            }
+
+                            try sync.addHandler(repeatedRequestsHandler)
                         }.flatMap {
-                            self.extraInitialiser(channel)
+                            extraInitialiser(channel)
                         }
                     }
                     .connect(to: self.serverChannel.localAddress!)
@@ -199,13 +210,12 @@ class HTTP1ThreadedPerformanceTest: Benchmark {
             let allWrites = EventLoopFuture<Void>.andAllComplete(writeFutures, on: writeFutures.first!.eventLoop)
             try! allWrites.wait()
 
-            let streamCompletedFutures = requestHandlers.map { rh in rh.completedFuture }
+            let futures = requestsCompletedFutures.withLockedValue { $0 }
             let requestsServed = EventLoopFuture<Int>.reduce(
                 0,
-                streamCompletedFutures,
-                on: streamCompletedFutures.first!.eventLoop,
-                +
-            )
+                futures,
+                on: futures.first!.eventLoop
+            ) { $0 + $1 }
             reqs.append(try! requestsServed.wait())
         }
         return reqs.reduce(0, +) / self.numberOfRepeats
diff --git a/Sources/NIOExtrasPerformanceTester/HTTP1RollingPCAPPerformanceTests.swift b/Sources/NIOExtrasPerformanceTester/HTTP1RollingPCAPPerformanceTests.swift
index 392c0c9..f1a6140 100644
--- a/Sources/NIOExtrasPerformanceTester/HTTP1RollingPCAPPerformanceTests.swift
+++ b/Sources/NIOExtrasPerformanceTester/HTTP1RollingPCAPPerformanceTests.swift
@@ -17,6 +17,7 @@ import NIOExtras
 
 class HTTP1ThreadedRollingPCapPerformanceTest: HTTP1ThreadedPerformanceTest {
     init() {
+        @Sendable
         func addRollingPCap(channel: Channel) -> EventLoopFuture<Void> {
             channel.eventLoop.submit {
                 let pcapRingBuffer = NIOPCAPRingBuffer(