From e5194a7b79a286b5b1ba27be8d77d50f517acbc8 Mon Sep 17 00:00:00 2001 From: Alexei Barantsev Date: Wed, 17 Feb 2021 13:58:48 +0300 Subject: [PATCH] [grid] Implementing node heartbeating that makes the distributor aware about node availability. Fixes #9182 --- .../grid/data/NodeHeartBeatEvent.java | 40 ++++++++++++ .../openqa/selenium/grid/data/NodeStatus.java | 24 +++++++ .../grid/distributor/local/GridModel.java | 65 +++++++++++++++++++ .../distributor/local/LocalDistributor.java | 5 ++ .../grid/node/config/NodeOptions.java | 6 ++ .../selenium/grid/node/httpd/NodeFlags.java | 7 ++ .../selenium/grid/node/k8s/OneShotNode.java | 6 ++ .../selenium/grid/node/local/LocalNode.java | 13 ++++ .../grid/node/local/LocalNodeFactory.java | 3 +- .../selenium/grid/data/NodeStatusTest.java | 2 + .../grid/distributor/AddingNodesTest.java | 2 + .../selector/DefaultSlotSelectorTest.java | 2 + 12 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 java/server/src/org/openqa/selenium/grid/data/NodeHeartBeatEvent.java diff --git a/java/server/src/org/openqa/selenium/grid/data/NodeHeartBeatEvent.java b/java/server/src/org/openqa/selenium/grid/data/NodeHeartBeatEvent.java new file mode 100644 index 0000000000000..d2efebc1b6271 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NodeHeartBeatEvent.java @@ -0,0 +1,40 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import org.openqa.selenium.events.Event; +import org.openqa.selenium.events.EventListener; +import org.openqa.selenium.events.EventName; +import org.openqa.selenium.internal.Require; + +import java.util.function.Consumer; + +public class NodeHeartBeatEvent extends Event { + + private static final EventName NODE_HEARTBEAT = new EventName("node-heartbeat"); + + public NodeHeartBeatEvent(NodeId nodeId) { + super(NODE_HEARTBEAT, Require.nonNull("Node id", nodeId)); + } + + public static EventListener listener(Consumer handler) { + Require.nonNull("Handler", handler); + + return new EventListener(NODE_HEARTBEAT, NodeId.class, handler); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/data/NodeStatus.java b/java/server/src/org/openqa/selenium/grid/data/NodeStatus.java index b5a1fe1e36ebd..ef7ef474988cb 100644 --- a/java/server/src/org/openqa/selenium/grid/data/NodeStatus.java +++ b/java/server/src/org/openqa/selenium/grid/data/NodeStatus.java @@ -23,6 +23,7 @@ import org.openqa.selenium.json.TypeToken; import java.net.URI; +import java.time.Duration; import java.time.Instant; import java.util.HashSet; import java.util.Map; @@ -40,8 +41,10 @@ public class NodeStatus { private final int maxSessionCount; private final Set slots; private final Availability availability; + private final Duration heartbeatPeriod; private final String version; private final Map osInfo; + private long touched = System.currentTimeMillis(); public NodeStatus( NodeId nodeId, @@ -49,6 +52,7 @@ public NodeStatus( int maxSessionCount, 
Set slots, Availability availability, + Duration heartbeatPeriod, String version, Map osInfo) { this.nodeId = Require.nonNull("Node id", nodeId); @@ -58,6 +62,7 @@ public NodeStatus( "Make sure that a driver is available on $PATH"); this.slots = unmodifiableSet(new HashSet<>(Require.nonNull("Slots", slots))); this.availability = Require.nonNull("Availability", availability); + this.heartbeatPeriod = Require.nonNull("Heartbeat period", heartbeatPeriod); // toJson() and the distributor dereference it this.version = Require.nonNull("Grid Node version", version); this.osInfo = Require.nonNull("Node host OS info", osInfo); } @@ -68,6 +73,7 @@ public static NodeStatus fromJson(JsonInput input) { int maxSessions = 0; Set slots = null; Availability availability = null; + Duration heartbeatPeriod = null; String version = null; Map osInfo = null; @@ -78,6 +84,10 @@ public static NodeStatus fromJson(JsonInput input) { availability = input.read(Availability.class); break; + case "heartbeatPeriod": + heartbeatPeriod = Duration.ofMillis(input.read(Long.class)); + break; + case "id": nodeId = input.read(NodeId.class); break; @@ -116,6 +126,7 @@ public static NodeStatus fromJson(JsonInput input) { maxSessions, slots, availability, + heartbeatPeriod, version, osInfo); } @@ -179,6 +190,18 @@ public long getLastSessionCreated() { .orElse(0); } + public Duration heartbeatPeriod() { + return heartbeatPeriod; + } + + public void touch() { + touched = System.currentTimeMillis(); + } + + public long touched() { + return touched; + } + @Override public boolean equals(Object o) { if (!(o instanceof NodeStatus)) { @@ -206,6 +229,7 @@ private Map toJson() { toReturn.put("maxSessions", maxSessionCount); toReturn.put("slots", slots); toReturn.put("availability", availability); + toReturn.put("heartbeatPeriod", heartbeatPeriod.toMillis()); toReturn.put("version", version); toReturn.put("osInfo", osInfo); diff --git a/java/server/src/org/openqa/selenium/grid/distributor/local/GridModel.java b/java/server/src/org/openqa/selenium/grid/distributor/local/GridModel.java index 
24b8e362a22be..1485f2a63b296 100644 --- a/java/server/src/org/openqa/selenium/grid/distributor/local/GridModel.java +++ b/java/server/src/org/openqa/selenium/grid/distributor/local/GridModel.java @@ -45,6 +45,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Logger; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toSet; import static org.openqa.selenium.grid.data.Availability.DOWN; import static org.openqa.selenium.grid.data.Availability.DRAINING; import static org.openqa.selenium.grid.data.Availability.UP; @@ -126,6 +128,22 @@ public GridModel refresh(NodeStatus status) { } } + public GridModel touch(NodeId id) { + Require.nonNull("Node ID", id); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + AvailabilityAndNode availabilityAndNode = findNode(id); + if (availabilityAndNode != null) { + availabilityAndNode.status.touch(); + } + return this; + } finally { + writeLock.unlock(); + } + } + public GridModel remove(NodeId id) { Require.nonNull("Node ID", id); @@ -144,6 +162,51 @@ public GridModel remove(NodeId id) { } } + public void purgeDeadNodes() { + long now = System.currentTimeMillis(); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + Set lost = nodes(UP).stream() + .filter(status -> now - status.touched() > status.heartbeatPeriod().toMillis()) + .collect(toSet()); + Set resurrected = nodes(DOWN).stream() + .filter(status -> now - status.touched() <= status.heartbeatPeriod().toMillis()) + .collect(toSet()); + Set dead = nodes(DOWN).stream() + .filter(status -> now - status.touched() > status.heartbeatPeriod().toMillis() * 4) + .collect(toSet()); + if (lost.size() > 0) { + LOG.info(String.format( + "Switching nodes %s from UP to DOWN", + lost.stream() + .map(node -> String.format("%s (uri: %s)", node.getId(), node.getUri())) + .collect(joining(", ")))); + nodes(UP).removeAll(lost); + nodes(DOWN).addAll(lost); + } + if 
(resurrected.size() > 0) { + LOG.info(String.format( + "Switching nodes %s from DOWN to UP", + resurrected.stream() + .map(node -> String.format("%s (uri: %s)", node.getId(), node.getUri())) + .collect(joining(", ")))); + nodes(DOWN).removeAll(resurrected); // otherwise the node is listed as both DOWN and UP + nodes(UP).addAll(resurrected); + } + if (dead.size() > 0) { + LOG.info(String.format( + "Removing nodes %s that are DOWN for too long", + dead.stream() + .map(node -> String.format("%s (uri: %s)", node.getId(), node.getUri())) + .collect(joining(", ")))); + nodes(DOWN).removeAll(dead); + } + } finally { + writeLock.unlock(); + } + } + public Availability setAvailability(NodeId id, Availability availability) { Require.nonNull("Node ID", id); Require.nonNull("Availability", availability); @@ -251,6 +314,7 @@ private NodeStatus rewrite(NodeStatus status, Availability availability) { status.getMaxSessionCount(), status.getSlots(), availability, + status.heartbeatPeriod(), status.getVersion(), status.getOsInfo()); } @@ -362,6 +426,7 @@ private void amend(Availability availability, NodeStatus status, Slot slot) { status.getMaxSessionCount(), newSlots, status.getAvailability(), + status.heartbeatPeriod(), status.getVersion(), status.getOsInfo())); } diff --git a/java/server/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/server/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index 227334a70251b..2c018b74b2311 100644 --- a/java/server/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/server/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java @@ -41,6 +41,7 @@ import org.openqa.selenium.grid.data.NewSessionResponseEvent; import org.openqa.selenium.grid.data.NodeAddedEvent; import org.openqa.selenium.grid.data.NodeDrainComplete; +import org.openqa.selenium.grid.data.NodeHeartBeatEvent; import org.openqa.selenium.grid.data.NodeId; import org.openqa.selenium.grid.data.NodeRemovedEvent; import org.openqa.selenium.grid.data.NodeStatus; @@ -132,9 +133,13 @@ public 
LocalDistributor( bus.addListener(NodeStatusEvent.listener(this::register)); bus.addListener(NodeStatusEvent.listener(model::refresh)); + bus.addListener(NodeHeartBeatEvent.listener(model::touch)); bus.addListener(NodeDrainComplete.listener(this::remove)); bus.addListener(NewSessionRequestEvent.listener(requestIds::offer)); + Regularly regularly = new Regularly("Local Distributor"); + regularly.submit(model::purgeDeadNodes, Duration.ofSeconds(30), Duration.ofSeconds(30)); + Thread shutdownHook = new Thread(this::callExecutorShutdown); Runtime.getRuntime().addShutdownHook(shutdownHook); NewSessionRunnable runnable = new NewSessionRunnable(); diff --git a/java/server/src/org/openqa/selenium/grid/node/config/NodeOptions.java b/java/server/src/org/openqa/selenium/grid/node/config/NodeOptions.java index 70021572e2e4f..7d5d8a8f5f90c 100644 --- a/java/server/src/org/openqa/selenium/grid/node/config/NodeOptions.java +++ b/java/server/src/org/openqa/selenium/grid/node/config/NodeOptions.java @@ -92,6 +92,12 @@ public Duration getRegisterPeriod() { return Duration.ofSeconds(seconds); } + public Duration getHeartbeatPeriod() { + // If the user sets 0 or less, we default to 1s. + int seconds = Math.max(config.getInt(NODE_SECTION, "heartbeat-period").orElse(10), 1); + return Duration.ofSeconds(seconds); + } + public Map> getSessionFactories( /* Danger! Java stereotype ahead! 
*/ Function> factoryFactory) { diff --git a/java/server/src/org/openqa/selenium/grid/node/httpd/NodeFlags.java b/java/server/src/org/openqa/selenium/grid/node/httpd/NodeFlags.java index 760a158765b59..46b3fd91adb46 100644 --- a/java/server/src/org/openqa/selenium/grid/node/httpd/NodeFlags.java +++ b/java/server/src/org/openqa/selenium/grid/node/httpd/NodeFlags.java @@ -114,6 +114,13 @@ public class NodeFlags implements HasRoles { @ConfigValue(section = "node", name = "register-cycle", example = "120") public int registerPeriod; + @Parameter( + names = "--heartbeat-period", + description = "How often, in seconds, will the Node send heartbeat events to the Distributor " + + "to inform it that the Node is up.") + @ConfigValue(section = "node", name = "heartbeat-period", example = "10") + public int heartbeatPeriod; + @Override public Set getRoles() { return Collections.singleton(NODE_ROLE); diff --git a/java/server/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java b/java/server/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java index 7e669710ac322..608b5e536ca34 100644 --- a/java/server/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java +++ b/java/server/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java @@ -61,6 +61,7 @@ import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -91,6 +92,7 @@ public class OneShotNode extends Node { private final EventBus events; private final WebDriverInfo driverInfo; private final Capabilities stereotype; + private final Duration heartbeatPeriod; private final URI gridUri; private final UUID slotId = UUID.randomUUID(); private RemoteWebDriver driver; @@ -103,6 +105,7 @@ private OneShotNode( Tracer tracer, EventBus events, Secret registrationSecret, + Duration heartbeatPeriod, NodeId id, URI uri, URI gridUri, @@ -110,6 +113,7 @@ private OneShotNode( WebDriverInfo driverInfo) { 
super(tracer, id, uri, registrationSecret); + this.heartbeatPeriod = heartbeatPeriod; this.events = Require.nonNull("Event bus", events); this.gridUri = Require.nonNull("Public Grid URI", gridUri); this.stereotype = ImmutableCapabilities.copyOf(Require.nonNull("Stereotype", stereotype)); @@ -147,6 +151,7 @@ public static Node create(Config config) { loggingOptions.getTracer(), eventOptions.getEventBus(), secretOptions.getRegistrationSecret(), + nodeOptions.getHeartbeatPeriod(), new NodeId(UUID.randomUUID()), serverOptions.getExternalUri(), nodeOptions.getPublicGridUri().orElseThrow(() -> new ConfigException("Unable to determine public grid address")), @@ -360,6 +365,7 @@ public NodeStatus getStatus() { Optional.empty() : Optional.of(new Session(sessionId, getUri(), stereotype, capabilities, Instant.now())))), isDraining() ? DRAINING : UP, + heartbeatPeriod, getNodeVersion(), getOsInfo()); } diff --git a/java/server/src/org/openqa/selenium/grid/node/local/LocalNode.java b/java/server/src/org/openqa/selenium/grid/node/local/LocalNode.java index 57b7155144471..36475142e0a80 100644 --- a/java/server/src/org/openqa/selenium/grid/node/local/LocalNode.java +++ b/java/server/src/org/openqa/selenium/grid/node/local/LocalNode.java @@ -38,6 +38,7 @@ import org.openqa.selenium.grid.data.CreateSessionResponse; import org.openqa.selenium.grid.data.NodeDrainComplete; import org.openqa.selenium.grid.data.NodeDrainStarted; +import org.openqa.selenium.grid.data.NodeHeartBeatEvent; import org.openqa.selenium.grid.data.NodeId; import org.openqa.selenium.grid.data.NodeStatus; import org.openqa.selenium.grid.data.Session; @@ -107,6 +108,7 @@ public class LocalNode extends Node { private final EventBus bus; private final URI externalUri; private final URI gridUri; + private final Duration heartbeatPeriod; private final HealthCheck healthCheck; private final int maxSessionCount; private final List factories; @@ -124,6 +126,7 @@ private LocalNode( int maxSessionCount, Ticker ticker, 
Duration sessionTimeout, + Duration heartbeatPeriod, List factories, Secret registrationSecret) { super(tracer, new NodeId(UUID.randomUUID()), uri, registrationSecret); @@ -133,6 +136,7 @@ private LocalNode( this.externalUri = Require.nonNull("Remote node URI", uri); this.gridUri = Require.nonNull("Grid URI", gridUri); this.maxSessionCount = Math.min(Require.positive("Max session count", maxSessionCount), factories.size()); + this.heartbeatPeriod = heartbeatPeriod; this.factories = ImmutableList.copyOf(factories); Require.nonNull("Registration secret", registrationSecret); @@ -168,6 +172,7 @@ private LocalNode( this.regularly = new Regularly("Local Node: " + externalUri); regularly.submit(currentSessions::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30)); regularly.submit(tempFileSystems::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30)); + regularly.submit(() -> bus.fire(new NodeHeartBeatEvent(getId())), heartbeatPeriod, heartbeatPeriod); bus.addListener(SessionClosedEvent.listener(id -> { try { @@ -512,6 +517,7 @@ public NodeStatus getStatus() { maxSessionCount, slots, isDraining() ? 
DRAINING : UP, + heartbeatPeriod, getNodeVersion(), getOsInfo()); } @@ -557,6 +563,7 @@ public static class Builder { private Ticker ticker = Ticker.systemTicker(); private Duration sessionTimeout = Duration.ofMinutes(5); private HealthCheck healthCheck; + private Duration heartbeatPeriod = Duration.ofSeconds(10); // same default as NodeOptions.getHeartbeatPeriod() private Builder( Tracer tracer, @@ -591,6 +598,11 @@ public Builder sessionTimeout(Duration timeout) { return this; } + public Builder heartbeatPeriod(Duration heartbeatPeriod) { + this.heartbeatPeriod = heartbeatPeriod; + return this; + } + public LocalNode build() { return new LocalNode( tracer, @@ -601,6 +613,7 @@ public LocalNode build() { maxCount, ticker, sessionTimeout, + heartbeatPeriod, factories.build(), registrationSecret); } diff --git a/java/server/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java b/java/server/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java index e0bcb92a83e92..d95630d57d24c 100644 --- a/java/server/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java +++ b/java/server/src/org/openqa/selenium/grid/node/local/LocalNodeFactory.java @@ -60,7 +60,8 @@ public static Node create(Config config) { serverOptions.getExternalUri(), nodeOptions.getPublicGridUri().orElseGet(serverOptions::getExternalUri), secretOptions.getRegistrationSecret()) - .maximumConcurrentSessions(nodeOptions.getMaxSessions()); + .maximumConcurrentSessions(nodeOptions.getMaxSessions()) + .heartbeatPeriod(nodeOptions.getHeartbeatPeriod()); List> builders = new ArrayList<>(); diff --git a/java/server/test/org/openqa/selenium/grid/data/NodeStatusTest.java b/java/server/test/org/openqa/selenium/grid/data/NodeStatusTest.java index ece3868883b60..98a4b89ba73aa 100644 --- a/java/server/test/org/openqa/selenium/grid/data/NodeStatusTest.java +++ b/java/server/test/org/openqa/selenium/grid/data/NodeStatusTest.java @@ -26,6 +26,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.time.Instant; import 
java.util.Optional; import java.util.UUID; @@ -55,6 +56,7 @@ public void ensureRoundTripWorks() throws URISyntaxException { new ImmutableCapabilities("peas", "sausages"), Instant.now())))), UP, + Duration.ofSeconds(10), "4.0.0", ImmutableMap.of( "name", "Max OS X", diff --git a/java/server/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java b/java/server/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java index 250558fd007ea..49cc1b52ee4f1 100644 --- a/java/server/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java +++ b/java/server/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java @@ -254,6 +254,7 @@ public void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChan Optional.of(new Session( new SessionId(UUID.randomUUID()), sessionUri, CAPS, CAPS, Instant.now())))), UP, + Duration.ofSeconds(10), status.getVersion(), status.getOsInfo()); @@ -376,6 +377,7 @@ public NodeStatus getStatus() { Instant.now(), Optional.ofNullable(sess))), UP, + Duration.ofSeconds(10), getNodeVersion(), getOsInfo()); } diff --git a/java/server/test/org/openqa/selenium/grid/distributor/selector/DefaultSlotSelectorTest.java b/java/server/test/org/openqa/selenium/grid/distributor/selector/DefaultSlotSelectorTest.java index b1bb789232a58..e435175a0896b 100644 --- a/java/server/test/org/openqa/selenium/grid/distributor/selector/DefaultSlotSelectorTest.java +++ b/java/server/test/org/openqa/selenium/grid/distributor/selector/DefaultSlotSelectorTest.java @@ -45,6 +45,7 @@ import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -209,6 +210,7 @@ private NodeStatus createNode(List stereotypes, int count, int cur count, ImmutableSet.copyOf(slots), UP, + Duration.ofSeconds(10), "4.0.0", ImmutableMap.of( "name", "Max OS X",