//===----------------------------------------------------------------------===// // // This source file is part of the SwiftNIO open source project // // Copyright (c) 2020-2021 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information // See CONTRIBUTORS.txt for the list of SwiftNIO project authors // // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import NIOPosix // MARK: Handlers final class SimpleHTTPServer: ChannelInboundHandler { typealias InboundIn = HTTPServerRequestPart typealias OutboundOut = HTTPServerResponsePart private var files: [String] = [] private var seenEnd = false private var sentEnd = false private var isOpen = true private let cachedHead: HTTPResponseHead private let cachedBody: [UInt8] private let bodyLength = 1024 private let numberOfAdditionalHeaders = 10 init() { var head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok) head.headers.add(name: "Content-Length", value: "\(self.bodyLength)") for i in 0.. 
// NOTE(review): the members below belong to a channel handler class whose header and
// earlier stored properties are not visible in this view of the file — the text before
// this point appears to be truncated. Restore it from version control before relying on
// this section.
    /// Request head that is re-sent for every follow-up request.
    private let head: HTTPRequestHead

    /// Creates the handler.
    ///
    /// - Parameters:
    ///   - numberOfRequests: Stored both as the expected total and as the initial
    ///     remaining-request counter.
    ///   - eventLoop: Event loop used to create the completion promise.
    ///   - head: Request head written for each follow-up request.
    init(numberOfRequests: Int, eventLoop: EventLoop, head: HTTPRequestHead) {
        self.remainingNumberOfRequests = numberOfRequests
        self.numberOfRequests = numberOfRequests
        self.isDonePromise = eventLoop.makePromise()
        self.head = head
    }

    /// Waits for the completion promise and returns the value it was fulfilled with.
    ///
    /// - Returns: The number of requests reported through `isDonePromise`.
    /// - Throws: Whatever error the completion future fails with.
    func wait() throws -> Int {
        let reqs = try self.isDonePromise.futureResult.wait()
        // The reported count must match the count requested at initialisation.
        precondition(reqs == self.numberOfRequests)
        return reqs
    }

    /// Future of the completion promise.
    // NOTE(review): the generic argument of `EventLoopFuture` appears to have been stripped
    // from this declaration; `wait()` above reads an `Int` from the same future — confirm.
    var completedFuture: EventLoopFuture {
        self.isDonePromise.futureResult
    }

    /// Closes the channel and fails the completion promise with `error`.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
        context.channel.close(promise: nil)
        self.isDonePromise.fail(error)
    }

    /// Reacts to the end of a response: either sends the next request or finishes.
    ///
    /// Only `.end(nil)` parts are acted upon; every other inbound part is ignored.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let reqPart = self.unwrapInboundIn(data)
        if case .end(nil) = reqPart {
            if self.remainingNumberOfRequests <= 0 {
                // No requests left: close the channel and, once the close completes,
                // deliver the number of completed requests to the completion promise.
                context.channel.close()
                    .assumeIsolated()
                    .map { self.doneRequests }
                    .nonisolated()
                    .cascade(
                        to: self.isDonePromise
                    )
            } else {
                // Account for the follow-up request, then write its head and end parts.
                self.doneRequests += 1
                self.remainingNumberOfRequests -= 1
                context.write(self.wrapOutboundOut(.head(self.head)), promise: nil)
                context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
            }
        }
    }
}

// MARK: ThreadedPerfTest
/// Benchmark that starts a local HTTP/1.1 server in `setUp()` and, in `run()`, writes a
/// request on each client channel and sums the per-client request counts.
class HTTP1ThreadedPerformanceTest: Benchmark {
    /// Number of times `run()` repeats the measurement; also the divisor of its result.
    let numberOfRepeats: Int
    /// Used as the reserved capacity for the per-repeat client collections in `run()`.
    let numberOfClients: Int
    /// Stored by `init`; not read in the visible part of this class.
    let requestsPerClient: Int
    /// Extra per-channel setup hook supplied by the caller.
    // NOTE(review): the generic argument of `EventLoopFuture` appears to have been stripped
    // here and in `init` below; its use is not visible in this view — confirm.
    let extraInitialiser: @Sendable (Channel) -> EventLoopFuture
    /// Request head built once in `init` (GET "/perf-test-1" with a Host header).
    let head: HTTPRequestHead

    /// Event loop group created in `setUp()` and shut down in `tearDown()`.
    var group: MultiThreadedEventLoopGroup!
    /// Listening server channel created in `setUp()` and closed in `tearDown()`.
    var serverChannel: Channel!

    /// Stores the benchmark parameters and builds the shared request head.
    ///
    /// - Parameters:
    ///   - numberOfRepeats: Number of measurement repeats.
    ///   - numberOfClients: Number of client channels per repeat.
    ///   - requestsPerClient: Number of requests per client.
    ///   - extraInitialiser: Additional per-channel initialisation closure.
    init(
        numberOfRepeats: Int,
        numberOfClients: Int,
        requestsPerClient: Int,
        extraInitialiser: @escaping @Sendable (Channel) -> EventLoopFuture
    ) {
        self.numberOfRepeats = numberOfRepeats
        self.numberOfClients = numberOfClients
        self.requestsPerClient = requestsPerClient
        self.extraInitialiser = extraInitialiser
        var head = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1), method: .GET, uri: "/perf-test-1")
        head.headers.add(name: "Host", value: "localhost")
        self.head = head
    }

    /// Creates the event loop group and binds the HTTP server.
    ///
    /// The group gets `System.coreCount` threads. Each accepted child channel is set up
    /// with the HTTP server pipeline (pipelining assistance enabled) followed by a
    /// `SimpleHTTPServer` handler. The server binds to 127.0.0.1 on port 0.
    ///
    /// - Throws: Any error from configuring or binding the server.
    func setUp() throws {
        self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
        self.serverChannel = try ServerBootstrap(group: self.group)
            .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
            .childChannelInitializer { channel in
                channel.eventLoop.makeCompletedFuture {
                    let sync = channel.pipeline.syncOperations
                    try sync.configureHTTPServerPipeline(withPipeliningAssistance: true)
                    try sync.addHandler(SimpleHTTPServer())
                }
            }.bind(host: "127.0.0.1", port: 0).wait()
    }

    /// Closes the server channel and shuts the event loop group down.
    ///
    /// Both steps use `try!`, so a failure in either one traps.
    func tearDown() {
        try! self.serverChannel.close().wait()
        try! self.group.syncShutdownGracefully()
    }

    /// Runs the measurement and returns the integer average of the per-repeat totals.
    ///
    /// - Returns: Sum of the per-repeat request totals divided by `numberOfRepeats`.
    func run() throws -> Int {
        var reqs: [Int] = []
        reqs.reserveCapacity(self.numberOfRepeats)
        // NOTE(review): the next line is truncated in this view. It presumably opened the
        // per-repeat loop and declared `requestsCompletedFutures` (used below through
        // `withLockedValue`) — restore from version control.
        for _ in 0..]>([])
            requestsCompletedFutures.withLockedValue({ $0.reserveCapacity(self.numberOfClients) })
            var clientChannels: [Channel] = []
            clientChannels.reserveCapacity(self.numberOfClients)
            // NOTE(review): the next line is truncated in this view. The code that filled
            // `clientChannels` and declared `writeFutures` is missing — restore from
            // version control.
            for _ in 0..] = []
            // Write one request (head, then end + flush) on every client channel and keep
            // the future of each flush.
            for clientChannel in clientChannels {
                clientChannel.write(HTTPClientRequestPart.head(self.head), promise: nil)
                writeFutures.append(clientChannel.writeAndFlush(HTTPClientRequestPart.end(nil)))
            }
            // Wait until every write has completed; `first!` traps if there are no write
            // futures.
            let allWrites = EventLoopFuture.andAllComplete(writeFutures, on: writeFutures.first!.eventLoop)
            try! allWrites.wait()
            // Take a snapshot of the completion futures and sum their results; `first!`
            // traps if the snapshot is empty.
            let futures = requestsCompletedFutures.withLockedValue { $0 }
            let requestsServed = EventLoopFuture.reduce(
                0,
                futures,
                on: futures.first!.eventLoop
            ) { $0 + $1 }
            reqs.append(try! requestsServed.wait())
        }
        // Integer division: any remainder is discarded.
        return reqs.reduce(0, +) / self.numberOfRepeats
    }
}