Skip to content

Commit

Permalink
[grid] Ensure binary websocket messages are forwarded
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed May 19, 2020
1 parent 653237e commit 3e21429
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ public void onError(Throwable t) {
listener.onError(t);
}

@Override
public void onBinaryFrame(byte[] payload, boolean finalFragment, int rsv) {
if (payload != null) {
listener.onBinary(payload);
}
}

@Override
public void onTextFrame(String payload, boolean finalFragment, int rsv) {
if (payload != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ java_library(
srcs = glob(["*.java"]),
visibility = [
"//java/server/src/org/openqa/selenium/grid:__pkg__",
"//java/server/test/org/openqa/selenium/grid/router/httpd:__pkg__",
],
deps = [
"//java:auto-service",
"//java/client/src/org/openqa/selenium/remote",
"//java/server/src/org/openqa/selenium/cli",
"//java/server/src/org/openqa/selenium/grid:base-command",
"//java/server/src/org/openqa/selenium/grid/config",
"//java/server/src/org/openqa/selenium/grid/data",
"//java/server/src/org/openqa/selenium/grid/distributor",
"//java/server/src/org/openqa/selenium/grid/distributor/config",
"//java/server/src/org/openqa/selenium/grid/log",
Expand Down
105 changes: 105 additions & 0 deletions java/server/src/org/openqa/selenium/grid/router/httpd/ProxyCdp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.grid.router.httpd;

import org.openqa.selenium.NoSuchSessionException;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.remote.HttpSessionId;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.BinaryMessage;
import org.openqa.selenium.remote.http.ClientConfig;
import org.openqa.selenium.remote.http.CloseMessage;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.Message;
import org.openqa.selenium.remote.http.TextMessage;
import org.openqa.selenium.remote.http.WebSocket;

import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.openqa.selenium.remote.http.HttpMethod.GET;

class ProxyCdp implements BiFunction<String, Consumer<Message>, Optional<Consumer<Message>>> {

private static final Logger LOG = Logger.getLogger(ProxyCdp.class.getName());
private final HttpClient.Factory clientFactory;
private final SessionMap sessions;

public ProxyCdp(HttpClient.Factory clientFactory, SessionMap sessions) {
this.clientFactory = Objects.requireNonNull(clientFactory);
this.sessions = Objects.requireNonNull(sessions);
}

@Override
public Optional<Consumer<Message>> apply(String uri, Consumer<Message> downstream) {
Objects.requireNonNull(uri);
Objects.requireNonNull(downstream);

Optional<SessionId> sessionId = HttpSessionId.getSessionId(uri).map(SessionId::new);
if (!sessionId.isPresent()) {
return Optional.empty();
}

try {
Session session = sessions.get(sessionId.get());

HttpClient client = clientFactory.createClient(ClientConfig.defaultConfig().baseUri(session.getUri()));
WebSocket upstream = client.openSocket(new HttpRequest(GET, uri), new ForwardingListener(downstream));

return Optional.of(upstream::send);

} catch (NoSuchSessionException e) {
LOG.info("Attempt to connect to non-existant session: " + uri);
return Optional.empty();
}
}

private static class ForwardingListener implements WebSocket.Listener {
private final Consumer<Message> downstream;

public ForwardingListener(Consumer<Message> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}

@Override
public void onBinary(byte[] data) {
downstream.accept(new BinaryMessage(data));
}

@Override
public void onClose(int code, String reason) {
downstream.accept(new CloseMessage(code, reason));
}

@Override
public void onText(CharSequence data) {
downstream.accept(new TextMessage(data));
}

@Override
public void onError(Throwable cause) {
LOG.log(Level.WARNING, "Error proxying CDP command", cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected void execute(Config config) {

Router router = new Router(tracer, clientFactory, sessions, distributor);

Server<?> server = new NettyServer(serverOptions, router);
Server<?> server = new NettyServer(serverOptions, router, new ProxyCdp(clientFactory, sessions));
server.start();

BuildInfo info = new BuildInfo();
Expand Down
22 changes: 22 additions & 0 deletions java/server/test/org/openqa/selenium/grid/router/httpd/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@rules_jvm_external//:defs.bzl", "artifact")
load("//java:defs.bzl", "java_test_suite")

java_test_suite(
name = "medium-tests",
size = "medium",
srcs = glob(["*.java"]),
deps = [
"//java/client/src/org/openqa/selenium/remote",
"//java/client/test/org/openqa/selenium/remote/tracing:tracing-support",
"//java/server/src/org/openqa/selenium/events",
"//java/server/src/org/openqa/selenium/events/local",
"//java/server/src/org/openqa/selenium/grid/config",
"//java/server/src/org/openqa/selenium/grid/data",
"//java/server/src/org/openqa/selenium/grid/router/httpd",
"//java/server/src/org/openqa/selenium/grid/sessionmap",
"//java/server/src/org/openqa/selenium/grid/sessionmap/local",
"//java/server/src/org/openqa/selenium/netty/server",
artifact("junit:junit"),
artifact("org.assertj:assertj-core"),
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.grid.router.httpd;

import org.junit.Before;
import org.junit.Test;
import org.openqa.selenium.ImmutableCapabilities;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.local.GuavaEventBus;
import org.openqa.selenium.grid.config.Config;
import org.openqa.selenium.grid.config.MapConfig;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.server.BaseServerOptions;
import org.openqa.selenium.grid.server.Server;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap;
import org.openqa.selenium.netty.server.NettyServer;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.http.TextMessage;
import org.openqa.selenium.remote.http.WebSocket;
import org.openqa.selenium.remote.tracing.DefaultTestTracer;
import org.openqa.selenium.remote.tracing.Tracer;

import java.net.URISyntaxException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.openqa.selenium.remote.http.HttpMethod.GET;

public class ProxyCdpTest {

private final HttpHandler nullHandler = req -> new HttpResponse();
private final Config emptyConfig = new MapConfig(Map.of());
private Server<?> proxyServer;
private SessionMap sessions;

@Before
public void setUp() {
Tracer tracer = DefaultTestTracer.createTracer();
EventBus events = new GuavaEventBus();

sessions = new LocalSessionMap(tracer, events);

// Set up the proxy we'll be using
HttpClient.Factory clientFactory = HttpClient.Factory.createDefault();
ProxyCdp proxy = new ProxyCdp(clientFactory, sessions);
proxyServer = new NettyServer(new BaseServerOptions(emptyConfig), nullHandler, proxy).start();
}

@Test
public void shouldForwardTextMessageToServer() throws URISyntaxException, InterruptedException {
HttpClient.Factory clientFactory = HttpClient.Factory.createDefault();

// Create a backend server which will capture any incoming text message
AtomicReference<String> text = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Server<?> backend = createBackendServer(latch, text, "");

// Push a session that resolves to the backend server into the session map
SessionId id = new SessionId(UUID.randomUUID());
sessions.add(new Session(id, backend.getUrl().toURI(), new ImmutableCapabilities()));

// Now! Send a message. We expect it to eventually show up in the backend
WebSocket socket = clientFactory.createClient(proxyServer.getUrl())
.openSocket(new HttpRequest(GET, String.format("/session/%s/cdp", id)), new WebSocket.Listener(){});

socket.sendText("Cheese!");

assertThat(latch.await(5, SECONDS)).isTrue();
assertThat(text.get()).isEqualTo("Cheese!");

socket.close();
}

@Test
public void shouldForwardTextMessageFromServerToLocalEnd() throws URISyntaxException, InterruptedException {
HttpClient.Factory clientFactory = HttpClient.Factory.createDefault();

Server<?> backend = createBackendServer(new CountDownLatch(1), new AtomicReference<>(), "Asiago");

// Push a session that resolves to the backend server into the session map
SessionId id = new SessionId(UUID.randomUUID());
sessions.add(new Session(id, backend.getUrl().toURI(), new ImmutableCapabilities()));

// Now! Send a message. We expect it to eventually show up in the backend
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> text = new AtomicReference<>();
WebSocket socket = clientFactory.createClient(proxyServer.getUrl())
.openSocket(new HttpRequest(GET, String.format("/session/%s/cdp", id)), new WebSocket.Listener() {
@Override
public void onText(CharSequence data) {
text.set(data.toString());
latch.countDown();
}
});

socket.sendText("Cheese!");

assertThat(latch.await(5, SECONDS)).isTrue();
assertThat(text.get()).isEqualTo("Asiago");

socket.close();
}

private Server<?> createBackendServer(CountDownLatch latch, AtomicReference<String> incomingRef, String response) {
return new NettyServer(
new BaseServerOptions(emptyConfig),
nullHandler,
(uri, sink) -> Optional.of(msg -> {
if (msg instanceof TextMessage) {
incomingRef.set(((TextMessage) msg).text());
sink.accept(new TextMessage(response));
latch.countDown();
}
}))
.start();
}
}

0 comments on commit 3e21429

Please sign in to comment.