@@ -1678,4 +1678,129 @@ class HTTPClientTests: XCTestCase {
16781678 }
16791679 XCTAssertNoThrow ( try promise. futureResult. wait ( ) )
16801680 }
1681+
1682+ func testUploadsReallyStream( ) {
1683+ final class HTTPServer : ChannelInboundHandler {
1684+ typealias InboundIn = HTTPServerRequestPart
1685+ typealias OutboundOut = HTTPServerResponsePart
1686+
1687+ private let headPromise : EventLoopPromise < HTTPRequestHead >
1688+ private let bodyPromises : [ EventLoopPromise < ByteBuffer > ]
1689+ private let endPromise : EventLoopPromise < Void >
1690+ private var bodyPartsSeenSoFar = 0
1691+
1692+ init ( headPromise: EventLoopPromise < HTTPRequestHead > ,
1693+ bodyPromises: [ EventLoopPromise < ByteBuffer > ] ,
1694+ endPromise: EventLoopPromise < Void > ) {
1695+ self . headPromise = headPromise
1696+ self . bodyPromises = bodyPromises
1697+ self . endPromise = endPromise
1698+ }
1699+
1700+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
1701+ switch self . unwrapInboundIn ( data) {
1702+ case . head( let head) :
1703+ XCTAssert ( self . bodyPartsSeenSoFar == 0 )
1704+ self . headPromise. succeed ( head)
1705+ case . body( let bytes) :
1706+ let myNumber = self . bodyPartsSeenSoFar
1707+ self . bodyPartsSeenSoFar += 1
1708+ self . bodyPromises. dropFirst ( myNumber) . first? . succeed ( bytes) ?? XCTFail ( " ouch, too many chunks " )
1709+ case . end:
1710+ context. write ( self . wrapOutboundOut ( . head( . init( version: . init( major: 1 , minor: 1 ) , status: . ok) ) ) ,
1711+ promise: nil )
1712+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: self . endPromise)
1713+ }
1714+ }
1715+
1716+ func handlerRemoved( context: ChannelHandlerContext ) {
1717+ struct NotFulfilledError : Error { }
1718+
1719+ self . headPromise. fail ( NotFulfilledError ( ) )
1720+ self . bodyPromises. forEach {
1721+ $0. fail ( NotFulfilledError ( ) )
1722+ }
1723+ self . endPromise. fail ( NotFulfilledError ( ) )
1724+ }
1725+ }
1726+
1727+ let group = MultiThreadedEventLoopGroup ( numberOfThreads: 2 )
1728+ defer {
1729+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
1730+ }
1731+ let client = HTTPClient ( eventLoopGroupProvider: . shared( group) )
1732+ defer {
1733+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
1734+ }
1735+ let headPromise = group. next ( ) . makePromise ( of: HTTPRequestHead . self)
1736+ let bodyPromises = ( 0 ..< 16 ) . map { _ in group. next ( ) . makePromise ( of: ByteBuffer . self) }
1737+ let endPromise = group. next ( ) . makePromise ( of: Void . self)
1738+ let sentOffAllBodyPartsPromise = group. next ( ) . makePromise ( of: Void . self)
1739+ // Because of https://github.com/swift-server/async-http-client/issues/200 we also need to pull off a terrible
1740+ // hack and get the internal EventLoop out :(. Once the bug is fixed, this promise should only get the
1741+ // StreamWriter.
1742+ let streamWriterPromise = group. next ( ) . makePromise ( of: ( EventLoop, HTTPClient . Body. StreamWriter) . self)
1743+
1744+ func makeServer( ) -> Channel ? {
1745+ return try ? ServerBootstrap ( group: group)
1746+ . childChannelInitializer { channel in
1747+ channel. pipeline. configureHTTPServerPipeline ( ) . flatMap {
1748+ channel. pipeline. addHandler ( HTTPServer ( headPromise: headPromise,
1749+ bodyPromises: bodyPromises,
1750+ endPromise: endPromise) )
1751+ }
1752+ }
1753+ . serverChannelOption ( ChannelOptions . socket ( . init( SOL_SOCKET) , . init( SO_REUSEADDR) ) , value: 1 )
1754+ . bind ( host: " 127.0.0.1 " , port: 0 )
1755+ . wait ( )
1756+ }
1757+
1758+ func makeRequest( server: Channel ) -> Request ? {
1759+ guard let localAddress = server. localAddress else {
1760+ return nil
1761+ }
1762+
1763+ return try ? HTTPClient . Request ( url: " http:// \( localAddress. ipAddress!) : \( localAddress. port!) " ,
1764+ method: . POST,
1765+ headers: [ " transfer-encoding " : " chunked " ] ,
1766+ body: . stream { streamWriter in
1767+ // Due to https://github.com/swift-server/async-http-client/issues/200
1768+ // we also need to pull off a terrible hack and get the internal
1769+ // EventLoop out :(. Once the bug is fixed, this promise should only get
1770+ // the StreamWriter.
1771+ let currentEL = MultiThreadedEventLoopGroup . currentEventLoop! // HACK!!
1772+ streamWriterPromise. succeed ( ( currentEL, streamWriter) )
1773+ return sentOffAllBodyPartsPromise. futureResult
1774+ } )
1775+ }
1776+
1777+ guard let server = makeServer ( ) , let request = makeRequest ( server: server) else {
1778+ XCTFail ( " couldn't make a server Channel and a matching Request... " )
1779+ return
1780+ }
1781+ defer {
1782+ XCTAssertNoThrow ( try server. close ( ) . wait ( ) )
1783+ }
1784+
1785+ var buffer = ByteBufferAllocator ( ) . buffer ( capacity: 1 )
1786+ let runningRequest = client. execute ( request: request)
1787+ guard let streamWriter = try ? streamWriterPromise. futureResult. wait ( ) else {
1788+ XCTFail ( " didn't get StreamWriter " )
1789+ return
1790+ }
1791+
1792+ XCTAssertNoThrow ( XCTAssertEqual ( . POST, try headPromise. futureResult. wait ( ) . method) )
1793+ for bodyChunkNumber in 0 ..< 16 {
1794+ buffer. clear ( )
1795+ buffer. writeString ( String ( bodyChunkNumber, radix: 16 ) )
1796+ XCTAssertEqual ( 1 , buffer. readableBytes)
1797+ XCTAssertNoThrow ( try streamWriter. 0 . flatSubmit {
1798+ streamWriter. 1 . write ( . byteBuffer( buffer) )
1799+ } . wait ( ) )
1800+ XCTAssertNoThrow ( XCTAssertEqual ( buffer, try bodyPromises [ bodyChunkNumber] . futureResult. wait ( ) ) )
1801+ }
1802+ sentOffAllBodyPartsPromise. succeed ( ( ) )
1803+ XCTAssertNoThrow ( try endPromise. futureResult. wait ( ) )
1804+ XCTAssertNoThrow ( try runningRequest. wait ( ) )
1805+ }
16811806}
0 commit comments