Johannes Weiß 4b14b7f2b1 quiescing helper & demo (#2)
Motivation:

In a few cases quiescing a server application is useful but it's harder
than necessary with core-NIO. Therefore this adds a helper & a
demonstration.

Modifications:

- add `QuiescingHelper`, which helps users quiesce a channel by
  collecting all accepted channels and, when needed, sending them the
  quiescing user event. When all collected channels have closed, the
  user is notified and can just shut down the ELG.
- add a demo implementation with a simple HTTP server that quiesces
  when it receives a signal

Result:

Make it quite easy to quiesce a server and show users how to do it.
2018-05-17 20:27:15 +02:00

97 lines
4.2 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIO
import NIOHTTP1
import NIOExtras
import Dispatch
/// Request handler for the quiescing demo.
///
/// Every HTTP/1.1 request is answered with a short explanatory body as soon as the request has been
/// received completely; the response is then kept open for 30 seconds before it is finished, which
/// leaves time to trigger the server shutdown while a request is still in flight.
/// Requests using any other HTTP version are answered with `400 Bad Request` and the connection is
/// closed.
private final class HTTPHandler: ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart
    typealias OutboundOut = HTTPServerResponsePart

    /// Becomes `true` once a request on this connection has been answered with `400 Bad Request`.
    /// The remaining parts of that request must not produce a second response, so they are dropped.
    /// The flag is never reset because the connection is closed right after the rejection.
    private var hasRejectedRequest = false

    /// Handles one part (head, body chunk or end) of an incoming HTTP request.
    ///
    /// - Parameters:
    ///   - ctx: The context of this handler, used for writing the response and scheduling the delay.
    ///   - data: The wrapped `HTTPServerRequestPart`.
    func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
        let req = self.unwrapInboundIn(data)
        switch req {
        case .head(let head):
            guard head.version == HTTPVersion(major: 1, minor: 1) else {
                // Remember the rejection so the `.end` part of this request is not answered again.
                self.hasRejectedRequest = true
                ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: head.version, status: .badRequest))), promise: nil)
                // Close the connection once the error response has been written (or has failed).
                ctx.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete {
                    ctx.close(promise: nil)
                }
                return
            }
        case .body:
            () // ignore
        case .end:
            guard !self.hasRejectedRequest else {
                // This request already received its 400 response; neither write a second response
                // nor schedule the delayed completion for it.
                return
            }

            var buffer = ctx.channel.allocator.buffer(capacity: 128)
            buffer.write(staticString: "received request; waiting 30s then finishing up request\n")
            buffer.write(staticString: "press Ctrl+C in the server's terminal or run the following command to initiate server shutdown\n")
            buffer.write(string: " kill -INT \(getpid())\n")
            ctx.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1),
                                                                   status: .ok))), promise: nil)
            // Send the first part of the body right away so the client sees the instructions.
            ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)

            // Reuse the buffer for the final chunk; `[buffer]` captures its current value for the task.
            buffer.clear()
            buffer.write(staticString: "done with the request now\n")
            _ = ctx.eventLoop.scheduleTask(in: .seconds(30)) { [buffer] in
                ctx.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
                ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
            }
        }
    }
}
// MARK: - Demo server entry point

// Event loop group that runs the listening socket and all accepted connections.
let group = MultiThreadedEventLoopGroup(numThreads: System.coreCount)

// Helper that is asked to shut the server down once SIGINT has been received.
let quiesce = ServerQuiescingHelper(group: group)

// Handed to `quiesce.initiateShutdown(promise:)`; the main thread blocks on its future further down.
let fullyShutdownPromise: EventLoopPromise<Void> = group.next().newPromise()

// SIGINT is observed through a Dispatch signal source running on its own queue.
let signalQueue = DispatchQueue(label: "io.swift-nio.NIOExtrasDemo.SignalHandlingQueue")
let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: signalQueue)
signalSource.setEventHandler {
    // Stop observing further signals before starting the shutdown.
    signalSource.cancel()
    print("\nreceived signal, initiating shutdown which should complete after the last request finished.")
    quiesce.initiateShutdown(promise: fullyShutdownPromise)
}

// Set the process-wide SIGINT disposition to "ignore"; the signal is handled via `signalSource`.
signal(SIGINT, SIG_IGN)
signalSource.resume()

do {
    let reuseAddress = ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR)
    let noDelay = ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY)

    let bootstrap = ServerBootstrap(group: group)
        // Options and handlers for the listening channel.
        .serverChannelOption(ChannelOptions.backlog, value: 256)
        .serverChannelOption(reuseAddress, value: 1)
        .serverChannelInitializer { listener in
            listener.pipeline.add(handler: quiesce.makeServerChannelHandler(channel: listener))
        }
        // Options and handlers for every accepted connection.
        .childChannelOption(noDelay, value: 1)
        .childChannelOption(reuseAddress, value: 1)
        .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
        .childChannelInitializer { child in
            child.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true, withErrorHandling: true).then {
                child.pipeline.add(handler: HTTPHandler())
            }
        }

    // Bind to port 0 and report the address the server actually ended up on.
    let serverChannel = try bootstrap.bind(host: "localhost", port: 0).wait()
    print("HTTP server up and running on \(serverChannel.localAddress!)")
    print("to connect to this server, run")
    print(" curl http://localhost:\(serverChannel.localAddress!.port!)")
} catch {
    // Startup failed: shut the event loop group down, then propagate the original error.
    try group.syncShutdownGracefully()
    throw error
}

// Block until the shutdown started by the signal handler has completed, then stop the event loops.
try fullyShutdownPromise.futureResult.wait()
try group.syncShutdownGracefully()