Skip to content

Commit

Permalink
[cdp] Ensure that each devtools session is closed once
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed May 19, 2020
1 parent 948c69e commit 3fa8447
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 76 deletions.
164 changes: 90 additions & 74 deletions java/client/src/org/openqa/selenium/devtools/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@

package org.openqa.selenium.devtools;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.openqa.selenium.json.Json.MAP_TYPE;
import static org.openqa.selenium.remote.http.HttpMethod.GET;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;

import org.openqa.selenium.devtools.target.model.SessionID;
import org.openqa.selenium.json.Json;
import org.openqa.selenium.json.JsonInput;
Expand All @@ -40,15 +35,27 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.openqa.selenium.json.Json.MAP_TYPE;
import static org.openqa.selenium.remote.http.HttpMethod.GET;

public class Connection implements Closeable {

private static final Logger LOG = Logger.getLogger(Connection.class.getName());
private static final Json JSON = new Json();
private static final Executor EXECUTOR = Executors.newCachedThreadPool(r -> {
Thread thread = new Thread(r, "CDP Connection");
thread.setDaemon(true);
return thread;
});
private static final AtomicLong NEXT_ID = new AtomicLong(1L);
private final WebSocket socket;
private final Map<Long, Consumer<JsonInput>> methodCallbacks = new LinkedHashMap<>();
Expand Down Expand Up @@ -96,6 +103,7 @@ public <X> CompletableFuture<X> send(SessionID sessionId, Command<X> command) {
X value = command.getMapper().apply(input);
result.complete(value);
} catch (Throwable e) {
LOG.log(Level.WARNING, String.format("Unable to map result for %s", command.getMethod()), e);
result.completeExceptionally(e);
}
}));
Expand All @@ -109,7 +117,7 @@ public <X> CompletableFuture<X> send(SessionID sessionId, Command<X> command) {
serialized.put("sessionId", sessionId);
}

LOG.fine(JSON.toJson(serialized.build()));
LOG.fine(() -> String.format("-> %s", JSON.toJson(serialized.build())));
socket.sendText(JSON.toJson(serialized.build()));

if (!command.getSendsResponse() ) {
Expand All @@ -121,7 +129,8 @@ public <X> CompletableFuture<X> send(SessionID sessionId, Command<X> command) {

public <X> X sendAndWait(SessionID sessionId, Command<X> command, Duration timeout) {
try {
return send(sessionId, command).get(timeout.toMillis(), MILLISECONDS);
CompletableFuture<X> future = send(sessionId, command);
return future.get(timeout.toMillis(), MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread has been interrupted", e);
Expand Down Expand Up @@ -160,79 +169,86 @@ private class Listener implements WebSocket.Listener {

@Override
public void onText(CharSequence data) {
// It's kind of gross to decode the data twice, but this lets us get started on something
// that feels nice to users.
// TODO: decode once, and once only

String asString = String.valueOf(data);
LOG.fine(asString);

Map<String, Object> raw = JSON.toType(asString, MAP_TYPE);
if (raw.get("id") instanceof Number && raw.get("result") != null) {
Consumer<JsonInput> consumer = methodCallbacks.remove(((Number) raw.get("id")).longValue());
if (consumer == null) {
return;
EXECUTOR.execute(() -> {
try {
handle(data);
} catch (Throwable t) {
LOG.log(Level.WARNING, "Unable to process: " + data, t);
}
});
}
}

try (StringReader reader = new StringReader(asString);
JsonInput input = JSON.newInput(reader)) {
input.beginObject();
while (input.hasNext()) {
switch (input.nextName()) {
case "result":
LOG.fine("Sending result to consumer: " + consumer);
consumer.accept(input);
break;

default:
input.skipValue();
}
private void handle(CharSequence data) {
// It's kind of gross to decode the data twice, but this lets us get started on something
// that feels nice to users.
// TODO: decode once, and once only

String asString = String.valueOf(data);
LOG.fine(() -> String.format("<- %s", asString));

Map<String, Object> raw = JSON.toType(asString, MAP_TYPE);
if (raw.get("id") instanceof Number && raw.get("result") != null) {
Consumer<JsonInput> consumer = methodCallbacks.remove(((Number) raw.get("id")).longValue());
if (consumer == null) {
return;
}

try (StringReader reader = new StringReader(asString);
JsonInput input = JSON.newInput(reader)) {
input.beginObject();
while (input.hasNext()) {
switch (input.nextName()) {
case "result":
consumer.accept(input);
break;

default:
input.skipValue();
}
input.endObject();
}
} else if (raw.get("method") instanceof String && raw.get("params") instanceof Map) {
LOG.fine("Seen: " + raw);

synchronized (eventCallbacks) {
// TODO: Also only decode once.
eventCallbacks.keySet().stream()
.filter(event -> raw.get("method").equals(event.getMethod()))
.forEach(event -> {
// TODO: This is grossly inefficient. I apologise, and we should fix this.
try (StringReader reader = new StringReader(asString);
JsonInput input = JSON.newInput(reader)) {
Object value = null;
input.beginObject();
while (input.hasNext()) {
switch (input.nextName()) {
case "params":
value = event.getMapper().apply(input);
break;

default:
input.skipValue();
break;
}
}
input.endObject();

if (value == null) {
// Do nothing.
return;
}

final Object finalValue = value;

for (Consumer<?> action : eventCallbacks.get(event)) {
@SuppressWarnings("unchecked") Consumer<Object> obj = (Consumer<Object>) action;
obj.accept(finalValue);
}
input.endObject();
}
} else if (raw.get("method") instanceof String && raw.get("params") instanceof Map) {
synchronized (eventCallbacks) {
// TODO: Also only decode once.
eventCallbacks.keySet().stream()
.filter(event -> raw.get("method").equals(event.getMethod()))
.forEach(event -> {
// TODO: This is grossly inefficient. I apologise, and we should fix this.
try (StringReader reader = new StringReader(asString);
JsonInput input = JSON.newInput(reader)) {
Object value = null;
input.beginObject();
while (input.hasNext()) {
switch (input.nextName()) {
case "params":
value = event.getMapper().apply(input);
break;

default:
input.skipValue();
break;
}
});
}
} else {
LOG.warning("Unhandled type: " + data);
}
input.endObject();

if (value == null) {
// Do nothing.
return;
}

final Object finalValue = value;

for (Consumer<?> action : eventCallbacks.get(event)) {
@SuppressWarnings("unchecked") Consumer<Object> obj = (Consumer<Object>) action;
obj.accept(finalValue);
}
}
});
}
} else {
LOG.warning("Unhandled type: " + data);
}
}
}
8 changes: 6 additions & 2 deletions java/client/src/org/openqa/selenium/devtools/DevTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public DevTools(Connection connection) {

@Override
public void close() {
connection.sendAndWait(
cdpSession, Target.detachFromTarget(Optional.of(cdpSession), Optional.empty()), timeout);
if (cdpSession != null) {
SessionID id = cdpSession;
cdpSession = null;
connection.sendAndWait(
cdpSession, Target.detachFromTarget(Optional.of(id), Optional.empty()), timeout);
}
}

public <X> X send(Command<X> command) {
Expand Down

0 comments on commit 3fa8447

Please sign in to comment.