在关闭客户端连接的情况下,Vert.x:正确地流到http响应
我有以下用于vert.x应用程序的用例:
这看起来很简单,在处理程序中,我将创建ReadStream,然后使用Pump将流传输到响应(WriteStream)上。
然而,我注意到,在泵仍处于活动状态时,客户端可能会关闭与我的处理程序的HTTP连接。 在这种情况下,我期待WriteStream实例发生异常。 但事实并非如此,WriteStream#writeQueueFull方法返回“true”,将ReadStream设置为暂停模式。 它现在等待排水事件,但是这个事件永远不会被发送,因为写连接已经关闭。 结果是随着时间的推移,打开(暂停)ReadStreams的数量不断增长,最终导致泄漏。
处理这种情况的正确方法是什么? 看起来很奇怪的第一个方面是写入流没有异常。 但即使我能够弄清楚错误情况(例如,通过监听响应中的关闭事件),我应该如何处理ReadStream? 它不能被取消,但它也不能保持开放。 我能想到的方法是将其内容抽成零流(即消耗它但忽略其内容)。 总体而言,这使得完整的泵送过程非常复杂。
下面的例子显示了一个简单的测试用例。 主要方法对Verticle发出请求并立即关闭连接。 在服务器上(即在Verticle中)不会触发异常,而是将ReadStream锁定在暂停状态。
示例代码的输出是:
request
false
starting to pipe ... false
response closed: false
writeQueueFull false
closed ...
writeQueueFull true
pause: 1
response is closed
任何建议,高度赞赏。
package com.ibm.wps.test.vertx;
import java.io.File;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
public class CancellationTest {
private static final class ReadStreamProxy implements ReadStream<Buffer> {
private int countPause;
private final ReadStream<Buffer> delegate;
private ReadStreamProxy(final ReadStream<Buffer> aDelegate) {
delegate = aDelegate;
}
@Override
public ReadStream<Buffer> endHandler(final Handler<Void> endHandler) {
delegate.endHandler(endHandler);
return this;
}
@Override
public ReadStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
delegate.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(final Handler<Buffer> handler) {
delegate.handler(handler);
return this;
}
@Override
public ReadStream<Buffer> pause() {
countPause++;
delegate.pause();
System.out.println("pause: " + countPause);
return this;
}
@Override
public ReadStream<Buffer> resume() {
countPause--;
delegate.resume();
System.out.println("resume: " + countPause);
return this;
}
}
private static final class TestVerticle extends AbstractVerticle {
private HttpServer server;
@Override
public void start(final Future<Void> startFuture) throws Exception {
final String data = new File(CancellationTest.class.getResource("data.txt").toURI()).getCanonicalPath();
System.out.println("data " + data);
server = vertx.createHttpServer();
server.requestHandler(req -> {
System.out.println("request");
final HttpServerResponse resp = req.response();
System.out.println(resp.closed());
resp.exceptionHandler(th -> {
System.out.println("exception from response " + th);
});
resp.closeHandler(v -> {
System.out.println("response is closed");
});
resp.setChunked(true);
vertx.setTimer(100, l -> {
System.out.println("starting to pipe ... " + resp.closed());
final OpenOptions opts = new OpenOptions();
opts.setWrite(false);
opts.setRead(true);
vertx.fileSystem().open(data.toString(), opts, fileRes -> {
final AsyncFile file = fileRes.result();
file.exceptionHandler(ex -> {
System.out.println("file exception " + ex);
});
file.endHandler(v -> {
System.out.println("file ended");
});
System.out.println("response closed: " + resp.closed());
pipe(file, resp);
});
});
});
server.listen(8080, result -> {
if (result.failed()) {
startFuture.fail(result.cause());
} else {
startFuture.complete();
}
});
}
}
private static final class WriteStreamProxy implements WriteStream<Buffer> {
private final WriteStream<Buffer> delegate;
private WriteStreamProxy(final WriteStream<Buffer> aDelegate) {
delegate = aDelegate;
}
@Override
public WriteStream<Buffer> drainHandler(final Handler<Void> handler) {
delegate.drainHandler(handler);
return this;
}
@Override
public void end() {
delegate.end();
}
@Override
public WriteStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
delegate.exceptionHandler(handler);
return this;
}
@Override
public WriteStream<Buffer> setWriteQueueMaxSize(final int maxSize) {
delegate.setWriteQueueMaxSize(maxSize);
return this;
}
@Override
public WriteStream<Buffer> write(final Buffer data) {
delegate.write(data);
return this;
}
@Override
public boolean writeQueueFull() {
final boolean result = delegate.writeQueueFull();
System.out.println("writeQueueFull " + result);
return result;
}
}
public static void main(final String[] args) throws Exception {
System.out.println(System.getProperties());
final CompletableFuture<Void> sync = new CompletableFuture<Void>();
final Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new TestVerticle(), result -> {
try {
final URL url = new URL("http://localhost:8080/");
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
final InputStream is = conn.getInputStream();
is.close();
conn.disconnect();
System.out.println("closed ...");
sync.complete(null);
} catch (final Throwable th) {
sync.completeExceptionally(th);
}
});
sync.get();
vertx.close();
}
private static final void pipe(final ReadStream<Buffer> aRead, final WriteStream<Buffer> aWrite) {
aWrite.exceptionHandler(ex -> {
new Exception().printStackTrace();
System.out.println("write stream exception " + ex);
});
Pump.pump(new ReadStreamProxy(aRead), new WriteStreamProxy(aWrite)).start();
}
}``
链接地址: http://www.djcxy.com/p/96725.html上一篇: Vert.x: correctly stream to http response in case of closed client connection
下一篇: stop superagent after receiving first byte (to only get redirect url)