Strict concurrency for NIOExtras and NIOExtrasTests

This commit is contained in:
George Barnett 2025-03-24 11:13:17 +00:00
parent ae4d6b4b9a
commit d3f624aefd
20 changed files with 147 additions and 56 deletions

View File

@ -15,6 +15,24 @@
import PackageDescription
let strictConcurrencyDevelopment = false
let strictConcurrencySettings: [SwiftSetting] = {
var initialSettings: [SwiftSetting] = []
initialSettings.append(contentsOf: [
.enableUpcomingFeature("StrictConcurrency"),
.enableUpcomingFeature("InferSendableFromCaptures"),
])
if strictConcurrencyDevelopment {
// -warnings-as-errors here is a workaround so that IDE-based development can
// get tripped up on -require-explicit-sendable.
initialSettings.append(.unsafeFlags(["-require-explicit-sendable", "-warnings-as-errors"]))
}
return initialSettings
}()
var targets: [PackageDescription.Target] = [
.target(
name: "NIOExtras",
@ -22,7 +40,8 @@ var targets: [PackageDescription.Target] = [
.product(name: "NIO", package: "swift-nio"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio"),
]
],
swiftSettings: strictConcurrencySettings
),
.target(
name: "NIOHTTPCompression",
@ -101,7 +120,8 @@ var targets: [PackageDescription.Target] = [
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOTestUtils", package: "swift-nio"),
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
]
],
swiftSettings: strictConcurrencySettings
),
.testTarget(
name: "NIOHTTPCompressionTests",
@ -237,10 +257,10 @@ let package = Package(
.library(name: "NIOHTTPResponsiveness", targets: ["NIOHTTPResponsiveness"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.77.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
.package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.27.0"),
.package(url: "https://github.com/apple/swift-http-types.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-http-structured-headers.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-algorithms.git", from: "1.2.0"),

View File

@ -181,3 +181,6 @@ public class DebugInboundEventsHandler: ChannelInboundHandler {
@available(*, unavailable)
extension DebugInboundEventsHandler: Sendable {}
@available(*, unavailable)
extension DebugInboundEventsHandler.Event: Sendable {}

View File

@ -177,3 +177,6 @@ public class DebugOutboundEventsHandler: ChannelOutboundHandler {
@available(*, unavailable)
extension DebugOutboundEventsHandler: Sendable {}
@available(*, unavailable)
extension DebugOutboundEventsHandler.Event: Sendable {}

View File

@ -162,8 +162,8 @@ public final class NIOHTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableC
return
}
let loopBoundContext = NIOLoopBound.init(context, eventLoop: context.eventLoop)
let timeout = context.eventLoop.scheduleTask(deadline: self.deadline) {
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)
let timeout = context.eventLoop.assumeIsolated().scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized:
preconditionFailure("How can we have a scheduled timeout, if the connection is not even up?")
@ -361,6 +361,9 @@ public final class NIOHTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableC
}
@available(*, unavailable)
extension NIOHTTP1ProxyConnectHandler: Sendable {}
extension NIOHTTP1ProxyConnectHandler.Error: Hashable {
// compare only the kind of error, not the associated response head
public static func == (lhs: Self, rhs: Self) -> Bool {

View File

@ -13,6 +13,6 @@
//===----------------------------------------------------------------------===//
/// Namespace to contain JSON framing implementation.
public enum NIOJSONRPCFraming {
public enum NIOJSONRPCFraming: Sendable {
// just a name-space
}

View File

@ -80,7 +80,7 @@ public final class LengthFieldBasedFrameDecoder: ByteToMessageDecoder {
public static let maxSupportedLengthFieldSize: Int = Int(Int32.max)
/// An enumeration to describe the length of a piece of data in bytes.
public enum ByteLength {
public enum ByteLength: Sendable {
/// One byte
case one
/// Two bytes

View File

@ -54,7 +54,7 @@ public enum LengthFieldPrependerError: Error {
///
public final class LengthFieldPrepender: ChannelOutboundHandler {
/// An enumeration to describe the length of a piece of data in bytes.
public enum ByteLength {
public enum ByteLength: Sendable {
/// One byte
case one
/// Two bytes

View File

@ -17,7 +17,7 @@ import NIOCore
public protocol NIOExtrasError: Equatable, Error {}
/// Errors that are raised in NIOExtras.
public enum NIOExtrasErrors {
public enum NIOExtrasErrors: Sendable {
/// Error indicating that after an operation some unused bytes are left.
public struct LeftOverBytesError: NIOExtrasError {

View File

@ -12,8 +12,9 @@
//
//===----------------------------------------------------------------------===//
@preconcurrency
public protocol NIORequestIdentifiable {
associatedtype RequestID: Hashable
associatedtype RequestID: Hashable & Sendable
var requestID: RequestID { get }
}

View File

@ -206,8 +206,8 @@ private final class CollectAcceptedChannelsHandler: ChannelInboundHandler {
do {
try self.channelCollector.channelAdded(channel)
let closeFuture = channel.closeFuture
closeFuture.whenComplete { (_: Result<Void, Error>) in
self.channelCollector.channelRemoved(channel)
closeFuture.whenComplete { [ channelCollector = self.channelCollector] _ in
channelCollector.channelRemoved(channel)
}
context.fireChannelRead(data)
} catch ShutdownError.alreadyShutdown {

View File

@ -15,12 +15,12 @@
import NIOCore
/// ``RequestResponseHandler`` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
/// outbound side. It will fulfil the promise with the `Response` once it's received from the `Channel`'s inbound
/// side.
///
/// ``RequestResponseHandler`` does support pipelining `Request`s and it will send them pipelined further down the
/// `Channel`. Should ``RequestResponseHandler`` receive an error from the `Channel`, it will fail all promises meant for
/// the outstanding `Reponse`s and close the `Channel`. All requests enqueued after an error occured will be immediately
/// the outstanding `Response`s and close the `Channel`. All requests enqueued after an error occurred will be immediately
/// failed with the first error the channel received.
///
/// ``RequestResponseHandler`` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
@ -88,7 +88,18 @@ public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandl
let response = self.unwrapInboundIn(data)
let promise = self.promiseBuffer.removeFirst()
promise.succeed(response)
// If the event loop of the promise is the same as the context then there's no
// change in isolation. Otherwise transfer the response onto the correct event-loop
// before succeeding the promise.
if promise.futureResult.eventLoop === context.eventLoop {
promise.assumeIsolatedUnsafeUnchecked().succeed(response)
} else {
let unsafeTransfer = UnsafeTransfer(response)
promise.futureResult.eventLoop.execute {
let response = unsafeTransfer.wrappedValue
promise.assumeIsolatedUnsafeUnchecked().succeed(response)
}
}
}
public func errorCaught(context: ChannelHandlerContext, error: Error) {

View File

@ -96,7 +96,18 @@ where Request.RequestID == Response.RequestID {
let response = self.unwrapInboundIn(data)
if let promise = self.promiseBuffer.removeValue(forKey: response.requestID) {
promise.succeed(response)
// If the event loop of the promise is the same as the context then there's no
// change in isolation. Otherwise transfer the response onto the correct event-loop
// before succeeding the promise.
if promise.futureResult.eventLoop === context.eventLoop {
promise.assumeIsolatedUnsafeUnchecked().succeed(response)
} else {
let unsafeTransfer = UnsafeTransfer(response)
promise.futureResult.eventLoop.execute {
let response = unsafeTransfer.wrappedValue
promise.assumeIsolatedUnsafeUnchecked().succeed(response)
}
}
} else {
context.fireErrorCaught(NIOExtrasErrors.ResponseForInvalidRequest<Response>(requestID: response.requestID))
}
@ -134,6 +145,9 @@ where Request.RequestID == Response.RequestID {
}
}
@available(*, unavailable)
extension NIORequestResponseWithIDHandler: Sendable {}
extension NIOExtrasErrors {
public struct ResponseForInvalidRequest<Response: NIORequestIdentifiable>: NIOExtrasError, Equatable {
public var requestID: Response.RequestID

View File

@ -0,0 +1,32 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
/// ``UnsafeTransfer`` can be used to make non-`Sendable` values `Sendable`.
/// As the name implies, the usage of this is unsafe because it disables the sendable checking of the compiler.
/// It can be used similar to `@unsafe Sendable` but for values instead of types.
@usableFromInline
struct UnsafeTransfer<Wrapped> {
@usableFromInline
var wrappedValue: Wrapped
@inlinable
init(_ wrappedValue: Wrapped) {
self.wrappedValue = wrappedValue
}
}
extension UnsafeTransfer: @unchecked Sendable {}
extension UnsafeTransfer: Equatable where Wrapped: Equatable {}
extension UnsafeTransfer: Hashable where Wrapped: Hashable {}

View File

@ -127,15 +127,15 @@ struct PCAPRecordHeader {
/// ``NIOWritePCAPHandler`` will also work with Unix Domain Sockets in which case it will still synthesize a TCP packet
/// capture with local address `111.111.111.111` (port `1111`) and remote address `222.222.222.222` (port `2222`).
public class NIOWritePCAPHandler: RemovableChannelHandler {
public enum Mode {
public enum Mode: Sendable {
case client
case server
}
/// Settings for ``NIOWritePCAPHandler``.
public struct Settings {
public struct Settings: Sendable {
/// When to issue data into the `.pcap` file.
public enum EmitPCAP {
public enum EmitPCAP: Sendable {
/// Write the data immediately when ``NIOWritePCAPHandler`` saw the event on the `ChannelPipeline`.
///
/// For writes this means when the `write` event is triggered. Please note that this will write potentially
@ -521,7 +521,9 @@ extension NIOWritePCAPHandler: ChannelDuplexHandler {
switch self.settings.emitPCAPWrites {
case .whenCompleted:
let promise = promise ?? context.eventLoop.makePromise()
promise.futureResult.whenSuccess {
// A user-provided promise might be on a different event-loop, hop back to the context's
// event loop.
promise.futureResult.hop(to: context.eventLoop).assumeIsolatedUnsafeUnchecked().whenSuccess {
emitWrites()
}
context.write(data, promise: promise)
@ -673,7 +675,7 @@ extension NIOWritePCAPHandler {
private let errorHandler: (Swift.Error) -> Void
private var state: State = .running // protected by `workQueue`
public enum FileWritingMode {
public enum FileWritingMode: Sendable {
case appendToExistingPCAPFile
case createNewPCAPFile
}

View File

@ -559,7 +559,7 @@ class LengthFieldBasedFrameDecoderTest: XCTestCase {
context.fireChannelRead(data)
}
}
XCTAssertNoThrow(try channel.pipeline.addHandler(CloseInReadHandler()).wait())
XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(CloseInReadHandler()))
var buf = channel.allocator.buffer(capacity: 1024)
buf.writeBytes([UInt8(0), 0, 0, 1, 100])

View File

@ -165,7 +165,7 @@ class LineBasedFrameDecoderTest: XCTestCase {
}
let receivedLeftOversPromise: EventLoopPromise<ByteBuffer> = self.channel.eventLoop.makePromise()
let handler = CloseWhenMyFavouriteMessageArrives(receivedLeftOversPromise: receivedLeftOversPromise)
XCTAssertNoThrow(try self.channel.pipeline.addHandler(handler).wait())
XCTAssertNoThrow(try self.channel.pipeline.syncOperations.addHandler(handler))
var buffer = self.channel.allocator.buffer(capacity: 16)
buffer.writeString("a\nbb\nccc\ndddd\neeeee\nffffff\nXXX")
XCTAssertNoThrow(try self.channel.writeInbound(buffer))

View File

@ -161,7 +161,7 @@ class PCAPRingBufferTest: XCTestCase {
private var bytesUntilTrigger: Int
private var pcapRingBuffer: NIOPCAPRingBuffer
private var sink: (ByteBuffer) -> Void
private let sink: (ByteBuffer) -> Void
init(triggerBytes: Int, pcapRingBuffer: NIOPCAPRingBuffer, sink: @escaping (ByteBuffer) -> Void) {
self.bytesUntilTrigger = triggerBytes
@ -173,9 +173,9 @@ class PCAPRingBufferTest: XCTestCase {
if bytesUntilTrigger > 0 {
self.bytesUntilTrigger -= self.unwrapOutboundIn(data).readableBytes
if self.bytesUntilTrigger <= 0 {
context.write(data).map {
context.write(data).assumeIsolated().map {
self.sink(captureBytes(ringBuffer: self.pcapRingBuffer))
}.cascade(to: promise)
}.nonisolated().cascade(to: promise)
return
}
}

View File

@ -35,7 +35,7 @@ private final class WaitForQuiesceUserEvent: ChannelInboundHandler {
}
}
public class QuiescingHelperTest: XCTestCase {
final class QuiescingHelperTest: XCTestCase {
func testShutdownIsImmediateWhenNoChannelsCollected() throws {
let el = EmbeddedEventLoop()
let channel = EmbeddedChannel(handler: nil, loop: el)
@ -58,7 +58,7 @@ public class QuiescingHelperTest: XCTestCase {
XCTAssertNoThrow(try serverChannel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 1)).wait())
let quiesce = ServerQuiescingHelper(group: el)
let collectionHandler = quiesce.makeServerChannelHandler(channel: serverChannel)
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(collectionHandler).wait())
XCTAssertNoThrow(try serverChannel.pipeline.syncOperations.addHandler(collectionHandler))
var waitForFutures: [EventLoopFuture<Void>] = []
var childChannels: [Channel] = []
@ -123,8 +123,10 @@ public class QuiescingHelperTest: XCTestCase {
}
let channel = try! ServerBootstrap(group: group).serverChannelInitializer { channel in
channel.pipeline.addHandler(MakeFirstCloseFailAndDontActuallyCloseHandler(), position: .first).flatMap {
channel.pipeline.addHandler(quiesce.makeServerChannelHandler(channel: channel))
channel.eventLoop.makeCompletedFuture {
let sync = channel.pipeline.syncOperations
try sync.addHandler(MakeFirstCloseFailAndDontActuallyCloseHandler(), position: .first)
try sync.addHandler(quiesce.makeServerChannelHandler(channel: channel))
}
}.bind(host: "localhost", port: 0).wait()
defer {
@ -194,7 +196,7 @@ public class QuiescingHelperTest: XCTestCase {
XCTAssertNoThrow(try serverChannel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 1)).wait())
let quiesce = ServerQuiescingHelper(group: el)
let collectionHandler = quiesce.makeServerChannelHandler(channel: serverChannel)
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(collectionHandler).wait())
XCTAssertNoThrow(try serverChannel.pipeline.syncOperations.addHandler(collectionHandler))
// let's one channels
let eventCounterHandler = EventCounterHandler()
@ -237,7 +239,7 @@ public class QuiescingHelperTest: XCTestCase {
XCTAssertNoThrow(try serverChannel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 1)).wait())
let quiesce = ServerQuiescingHelper(group: el)
let collectionHandler = quiesce.makeServerChannelHandler(channel: serverChannel)
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(collectionHandler).wait())
XCTAssertNoThrow(try serverChannel.pipeline.syncOperations.addHandler(collectionHandler))
// let's add one channel
let waitForPromise1: EventLoopPromise<Void> = el.makePromise()
@ -293,7 +295,7 @@ public class QuiescingHelperTest: XCTestCase {
XCTAssertNoThrow(try serverChannel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 1)).wait())
let quiesce = ServerQuiescingHelper(group: el)
let collectionHandler = quiesce.makeServerChannelHandler(channel: serverChannel)
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(collectionHandler).wait())
XCTAssertNoThrow(try serverChannel.pipeline.syncOperations.addHandler(collectionHandler))
// check that the server is running
XCTAssertTrue(serverChannel.isActive)

View File

@ -42,9 +42,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
func testSimpleRequestWorks() {
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<IOData>, ValueWithRequestID<String>>()
).wait()
)
)
self.buffer.writeString("hello")
@ -77,9 +77,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
func testEnqueingMultipleRequestsWorks() throws {
struct DummyError: Error {}
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<IOData>, ValueWithRequestID<Int>>()
).wait()
)
)
var futures: [EventLoopFuture<ValueWithRequestID<Int>>] = []
@ -151,9 +151,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
func testRequestsEnqueuedAfterErrorAreFailed() {
struct DummyError: Error {}
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<IOData>, ValueWithRequestID<Void>>()
).wait()
)
)
self.channel.pipeline.fireErrorCaught(DummyError())
@ -181,9 +181,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
struct DummyError2: Error {}
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<IOData>, ValueWithRequestID<Void>>()
).wait()
)
)
let p: EventLoopPromise<ValueWithRequestID<Void>> = self.eventLoop.makePromise()
@ -213,9 +213,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
func testClosedConnectionFailsOutstandingPromises() {
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<String>, ValueWithRequestID<Void>>()
).wait()
)
)
let promise = self.eventLoop.makePromise(of: ValueWithRequestID<Void>.self)
@ -229,9 +229,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
func testOutOfOrderResponsesWork() {
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<String>, ValueWithRequestID<String>>()
).wait()
)
)
self.buffer.writeString("hello")
@ -259,9 +259,9 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
func testErrorOnResponseForNonExistantRequest() {
XCTAssertNoThrow(
try self.channel.pipeline.addHandler(
try self.channel.pipeline.syncOperations.addHandler(
NIORequestResponseWithIDHandler<ValueWithRequestID<String>, ValueWithRequestID<String>>()
).wait()
)
)
self.buffer.writeString("hello")
@ -315,7 +315,7 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
defer {
XCTAssertTrue(responsePromiseCompleted)
}
writePromise.futureResult.whenComplete { result in
writePromise.futureResult.assumeIsolated().whenComplete { result in
writePromiseCompleted = true
switch result {
case .success:
@ -324,7 +324,7 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError)
}
}
responsePromise.futureResult.whenComplete { result in
responsePromise.futureResult.assumeIsolated().whenComplete { result in
responsePromiseCompleted = true
switch result {
case .success:
@ -337,10 +337,10 @@ class RequestResponseWithIDHandlerTest: XCTestCase {
}
XCTAssertNoThrow(
try self.channel.pipeline.addHandlers(
try self.channel.pipeline.syncOperations.addHandlers(
NIORequestResponseWithIDHandler<ValueWithRequestID<IOData>, ValueWithRequestID<String>>(),
EmitRequestOnInactiveHandler()
).wait()
)
)
self.buffer.writeString("hello")
@ -378,5 +378,5 @@ struct ValueWithRequestID<T>: NIORequestIdentifiable {
var value: T
}
extension ValueWithRequestID: Equatable where T: Equatable {
}
extension ValueWithRequestID: Equatable where T: Equatable {}
extension ValueWithRequestID: Sendable where T: Sendable {}

View File

@ -782,7 +782,7 @@ class WritePCAPHandlerTest: XCTestCase {
}
// Let's drop all writes/flushes so EmbeddedChannel won't accumulate them.
XCTAssertNoThrow(try channel.pipeline.addHandler(DropAllWritesAndFlushes()).wait())
XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(DropAllWritesAndFlushes()))
XCTAssertNoThrow(
try channel.pipeline.syncOperations.addHandler(
NIOWritePCAPHandler(
@ -796,7 +796,7 @@ class WritePCAPHandlerTest: XCTestCase {
)
)
// Let's also drop all channelReads to prevent accumulation of all the data.
XCTAssertNoThrow(try channel.pipeline.addHandler(DropAllChannelReads()).wait())
XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(DropAllChannelReads()))
let chunkSize = Int(UInt16.max - 40) // needs to fit into the IPv4 header which adds 40
self.scratchBuffer = channel.allocator.buffer(capacity: chunkSize)