Skip to content

Commit

Permalink
[java] First naive reactor-netty based implementation of websockets c…
Browse files Browse the repository at this point in the history
…onnection
  • Loading branch information
barancev committed Apr 28, 2020
1 parent 8a7e58d commit 62ab34a
Showing 1 changed file with 45 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@
import org.openqa.selenium.remote.http.HttpMethod;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.http.Message;
import org.openqa.selenium.remote.http.TextMessage;
import org.openqa.selenium.remote.http.WebSocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.util.function.Tuple2;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Objects;

Expand All @@ -52,13 +57,14 @@ public class ReactorClient implements HttpClient {

private ReactorClient(ClientConfig config) {
this.config = config;
httpClient = reactor.netty.http.client.HttpClient.create();
httpClient = reactor.netty.http.client.HttpClient.create()
.baseUrl(config.baseUrl().toString())
.keepAlive(true);
}

@Override
public HttpResponse execute(HttpRequest request) {
Tuple2<InputStream, HttpResponse> result = httpClient
.baseUrl(config.baseUrl().toString())
.headers(h -> request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value))))
.request(methodMap.get(request.getMethod()))
Expand Down Expand Up @@ -110,7 +116,18 @@ public WebSocket openSocket(HttpRequest request, WebSocket.Listener listener) {
Objects.requireNonNull(request, "Request to send must be set.");
Objects.requireNonNull(listener, "WebSocket listener must be set.");

return null;
try {
URI origUri = new URI(request.getUri());
URI wsUri = new URI("ws", null, origUri.getHost(), origUri.getPort(), origUri.getPath(), null, null);

return new ReactorWebSocket(httpClient
.headers(h -> request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value))))
.websocket().uri(wsUri.toString()), listener);
} catch (URISyntaxException e) {
e.printStackTrace();
return null;
}
}

@Override
Expand All @@ -129,4 +146,29 @@ public HttpClient createClient(ClientConfig config) {
return new ReactorClient(config);
}
}

private static class ReactorWebSocket implements WebSocket {

private WebsocketOutbound out;

ReactorWebSocket(reactor.netty.http.client.HttpClient.WebsocketSender websocket, WebSocket.Listener listener) {
Flux<String> response = websocket.handle((in, out) -> {
this.out = out;
return in.receive().asString();
});
response.subscribe(listener::onText);
}

@Override
public WebSocket send(Message message) {
TextMessage txt = (TextMessage) message;
out.sendString(Flux.just(txt.text())).then().subscribe();
return this;
}

@Override
public void close() {
out.sendClose().then().subscribe();
}
}
}

0 comments on commit 62ab34a

Please sign in to comment.