Vert.x: correctly stream to http response in case of closed client connection
I have the following usecase for a vert.x aplication:
This looked straight forward, in the handler I would create the ReadStream and then use a Pump to pipe the stream onto the response (WriteStream).
I noticed however that it can happen that the client closes the HTTP connection to my handler while the pump is still active. In this situation I had expected an exception on the WriteStream instance to occurs. However this is not the case, instead the WriteStream#writeQueueFull method returns "true" which sets the ReadStream into paused mode. It now waits for a drain event, but this event is never sent, because the write connection has been closed. The result is that over time the number of open (paused) ReadStreams grows, eventually generating a leak.
What is the correct way to handle this situation? The first aspect that looks strange is that there is no exception on the write stream. But even if I were able to figure out the error situation (eg by listening on the close event on the response), what am I supposed to do with the ReadStream? It cannot be canceled but it also cannot stay open. The approach I can think of is to pump its content into a nil stream (ie consume it but ignore its content). Overall this makes the complete pumping process pretty complicated.
The example below shows a simple testcase. The main method makes a request against a verticle and closed the connection immediately. On the server (ie in the verticle) no exception is triggered, instead the ReadStream is locked in paused state.
The output of the sample code is:
request
false
starting to pipe ... false
response closed: false
writeQueueFull false
closed ...
writeQueueFull true
pause: 1
response is closed
Any suggestions are highly appreciated.
package com.ibm.wps.test.vertx;
import java.io.File;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
public class CancellationTest {
private static final class ReadStreamProxy implements ReadStream<Buffer> {
private int countPause;
private final ReadStream<Buffer> delegate;
private ReadStreamProxy(final ReadStream<Buffer> aDelegate) {
delegate = aDelegate;
}
@Override
public ReadStream<Buffer> endHandler(final Handler<Void> endHandler) {
delegate.endHandler(endHandler);
return this;
}
@Override
public ReadStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
delegate.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(final Handler<Buffer> handler) {
delegate.handler(handler);
return this;
}
@Override
public ReadStream<Buffer> pause() {
countPause++;
delegate.pause();
System.out.println("pause: " + countPause);
return this;
}
@Override
public ReadStream<Buffer> resume() {
countPause--;
delegate.resume();
System.out.println("resume: " + countPause);
return this;
}
}
private static final class TestVerticle extends AbstractVerticle {
private HttpServer server;
@Override
public void start(final Future<Void> startFuture) throws Exception {
final String data = new File(CancellationTest.class.getResource("data.txt").toURI()).getCanonicalPath();
System.out.println("data " + data);
server = vertx.createHttpServer();
server.requestHandler(req -> {
System.out.println("request");
final HttpServerResponse resp = req.response();
System.out.println(resp.closed());
resp.exceptionHandler(th -> {
System.out.println("exception from response " + th);
});
resp.closeHandler(v -> {
System.out.println("response is closed");
});
resp.setChunked(true);
vertx.setTimer(100, l -> {
System.out.println("starting to pipe ... " + resp.closed());
final OpenOptions opts = new OpenOptions();
opts.setWrite(false);
opts.setRead(true);
vertx.fileSystem().open(data.toString(), opts, fileRes -> {
final AsyncFile file = fileRes.result();
file.exceptionHandler(ex -> {
System.out.println("file exception " + ex);
});
file.endHandler(v -> {
System.out.println("file ended");
});
System.out.println("response closed: " + resp.closed());
pipe(file, resp);
});
});
});
server.listen(8080, result -> {
if (result.failed()) {
startFuture.fail(result.cause());
} else {
startFuture.complete();
}
});
}
}
private static final class WriteStreamProxy implements WriteStream<Buffer> {
private final WriteStream<Buffer> delegate;
private WriteStreamProxy(final WriteStream<Buffer> aDelegate) {
delegate = aDelegate;
}
@Override
public WriteStream<Buffer> drainHandler(final Handler<Void> handler) {
delegate.drainHandler(handler);
return this;
}
@Override
public void end() {
delegate.end();
}
@Override
public WriteStream<Buffer> exceptionHandler(final Handler<Throwable> handler) {
delegate.exceptionHandler(handler);
return this;
}
@Override
public WriteStream<Buffer> setWriteQueueMaxSize(final int maxSize) {
delegate.setWriteQueueMaxSize(maxSize);
return this;
}
@Override
public WriteStream<Buffer> write(final Buffer data) {
delegate.write(data);
return this;
}
@Override
public boolean writeQueueFull() {
final boolean result = delegate.writeQueueFull();
System.out.println("writeQueueFull " + result);
return result;
}
}
public static void main(final String[] args) throws Exception {
System.out.println(System.getProperties());
final CompletableFuture<Void> sync = new CompletableFuture<Void>();
final Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new TestVerticle(), result -> {
try {
final URL url = new URL("http://localhost:8080/");
final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
final InputStream is = conn.getInputStream();
is.close();
conn.disconnect();
System.out.println("closed ...");
sync.complete(null);
} catch (final Throwable th) {
sync.completeExceptionally(th);
}
});
sync.get();
vertx.close();
}
private static final void pipe(final ReadStream<Buffer> aRead, final WriteStream<Buffer> aWrite) {
aWrite.exceptionHandler(ex -> {
new Exception().printStackTrace();
System.out.println("write stream exception " + ex);
});
Pump.pump(new ReadStreamProxy(aRead), new WriteStreamProxy(aWrite)).start();
}
} ```
链接地址: http://www.djcxy.com/p/96726.html上一篇: C ++随机浮点数生成