在关闭客户端连接的情况下,Vert.x:正确地流到http响应

我有以下用于vert.x应用程序的用例:

  • 为GET请求编写一个REST处理程序
  • 在这个处理程序中将数据从ReadStream复制到响应中
  • 这看起来很简单,在处理程序中,我将创建ReadStream,然后使用Pump将流传输到响应(WriteStream)上。

    然而,我注意到,在泵仍处于活动状态时,客户端可能会关闭与我的处理程序的HTTP连接。 在这种情况下,我期待WriteStream实例发生异常。 但事实并非如此,WriteStream#writeQueueFull方法返回“true”,将ReadStream设置为暂停模式。 它现在等待排水事件,但是这个事件永远不会被发送,因为写连接已经关闭。 结果是随着时间的推移,打开(暂停)ReadStreams的数量不断增长,最终导致泄漏。

    处理这种情况的正确方法是什么? 看起来很奇怪的第一个方面是写入流没有异常。 但即使我能够弄清楚错误情况(例如,通过监听响应中的关闭事件),我应该如何处理ReadStream? 它不能被取消,但它也不能保持开放。 我能想到的方法是将其内容抽成零流(即消耗它但忽略其内容)。 总体而言,这使得完整的泵送过程非常复杂。

    下面的例子显示了一个简单的测试用例。 主要方法对Verticle发出请求并立即关闭连接。 在服务器上(即在Verticle中)不会触发异常,而是将ReadStream锁定在暂停状态。

    示例代码的输出是:

    request
    false
    starting to pipe ... false
    response closed: false
    writeQueueFull false
    closed ...
    writeQueueFull true
    pause: 1
    response is closed
    

    任何建议,高度赞赏。

    package com.ibm.wps.test.vertx;
    
    import java.io.File;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.concurrent.CompletableFuture;
    
    import io.vertx.core.AbstractVerticle;
    import io.vertx.core.Future;
    import io.vertx.core.Handler;
    import io.vertx.core.Vertx;
    import io.vertx.core.buffer.Buffer;
    import io.vertx.core.file.AsyncFile;
    import io.vertx.core.file.OpenOptions;
    import io.vertx.core.http.HttpServer;
    import io.vertx.core.http.HttpServerResponse;
    import io.vertx.core.streams.Pump;
    import io.vertx.core.streams.ReadStream;
    import io.vertx.core.streams.WriteStream;
    
    public class CancellationTest {
    
    private static final class ReadStreamProxy implements ReadStream<Buffer> {
    
        private int countPause;
    
        private final ReadStream<Buffer> delegate;
    
        private ReadStreamProxy(final ReadStream<Buffer> aDelegate) {
            delegate = aDelegate;
        }
    
        @Override
        public ReadStream<Buffer> endHandler(final Handler<Void> endHandler) {
            delegate.endHandler(endHandler);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
            delegate.exceptionHandler(handler);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> handler(final Handler<Buffer> handler) {
            delegate.handler(handler);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> pause() {
            countPause++;
            delegate.pause();
            System.out.println("pause: " + countPause);
            return this;
        }
    
        @Override
        public ReadStream<Buffer> resume() {
            countPause--;
            delegate.resume();
            System.out.println("resume: " + countPause);
            return this;
        }
    
    }
    
    private static final class TestVerticle extends AbstractVerticle {
    
        private HttpServer server;
    
        @Override
        public void start(final Future<Void> startFuture) throws Exception {
    
            final String data = new File(CancellationTest.class.getResource("data.txt").toURI()).getCanonicalPath();
            System.out.println("data " + data);
    
            server = vertx.createHttpServer();
            server.requestHandler(req -> {
                System.out.println("request");
    
                final HttpServerResponse resp = req.response();
                System.out.println(resp.closed());
                resp.exceptionHandler(th -> {
                    System.out.println("exception from response " + th);
                });
                resp.closeHandler(v -> {
                    System.out.println("response is closed");
                });
                resp.setChunked(true);
    
                vertx.setTimer(100, l -> {
                    System.out.println("starting to pipe ... " + resp.closed());
    
                    final OpenOptions opts = new OpenOptions();
                    opts.setWrite(false);
                    opts.setRead(true);
                    vertx.fileSystem().open(data.toString(), opts, fileRes -> {
                        final AsyncFile file = fileRes.result();
                        file.exceptionHandler(ex -> {
                            System.out.println("file exception " + ex);
                        });
                        file.endHandler(v -> {
                            System.out.println("file ended");
                        });
    
                        System.out.println("response closed: " + resp.closed());
    
                        pipe(file, resp);
                    });
                });
            });
    
            server.listen(8080, result -> {
                if (result.failed()) {
                    startFuture.fail(result.cause());
                } else {
                    startFuture.complete();
                }
            });
        }
    }
    
    private static final class WriteStreamProxy implements WriteStream<Buffer> {
    
        private final WriteStream<Buffer> delegate;
    
        private WriteStreamProxy(final WriteStream<Buffer> aDelegate) {
            delegate = aDelegate;
        }
    
        @Override
        public WriteStream<Buffer> drainHandler(final Handler<Void> handler) {
            delegate.drainHandler(handler);
            return this;
        }
    
        @Override
        public void end() {
            delegate.end();
        }
    
        @Override
        public WriteStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
            delegate.exceptionHandler(handler);
            return this;
        }
    
        @Override
        public WriteStream<Buffer> setWriteQueueMaxSize(final int maxSize) {
            delegate.setWriteQueueMaxSize(maxSize);
            return this;
        }
    
        @Override
        public WriteStream<Buffer> write(final Buffer data) {
            delegate.write(data);
            return this;
        }
    
        @Override
        public boolean writeQueueFull() {
            final boolean result = delegate.writeQueueFull();
            System.out.println("writeQueueFull " + result);
            return result;
        }
    
    }
    
    public static void main(final String[] args) throws Exception {
    
        System.out.println(System.getProperties());
    
    
        final CompletableFuture<Void> sync = new CompletableFuture<Void>();
    
        final Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new TestVerticle(), result -> {
            try {
                final URL url = new URL("http://localhost:8080/");
                final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.connect();
                final InputStream is = conn.getInputStream();
                is.close();
                conn.disconnect();
                System.out.println("closed ...");
                sync.complete(null);
            } catch (final Throwable th) {
                sync.completeExceptionally(th);
            }
        });
    
        sync.get();
        vertx.close();
    }
    
    private static final void pipe(final ReadStream<Buffer> aRead, final WriteStream<Buffer> aWrite) {
    
        aWrite.exceptionHandler(ex -> {
            new Exception().printStackTrace();
            System.out.println("write stream exception " + ex);
        });
    
    
        Pump.pump(new ReadStreamProxy(aRead), new WriteStreamProxy(aWrite)).start();
    }
    

    }``

    链接地址: http://www.djcxy.com/p/96725.html

    上一篇: Vert.x: correctly stream to http response in case of closed client connection

    下一篇: stop superagent after receiving first byte (to only get redirect url)