Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop using gentle close with heartbeat #8036

Merged
merged 3 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.airbyte.integrations.destination.e2e_test;

import static java.lang.Thread.sleep;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailAfterNDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(FailAfterNDestination.class);

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return new FailAfterNConsumer(config.get("num_messages").asLong(), outputRecordCollector);
}

public static class FailAfterNConsumer implements AirbyteMessageConsumer {

private final Consumer<AirbyteMessage> outputRecordCollector;
private final long numMessagesAfterWhichToFail;
private long numMessagesSoFar;

public FailAfterNConsumer(final long numMessagesAfterWhichToFail, final Consumer<AirbyteMessage> outputRecordCollector) {
this.numMessagesAfterWhichToFail = numMessagesAfterWhichToFail;
this.outputRecordCollector = outputRecordCollector;
this.numMessagesSoFar = 0;
}

@Override
public void start() {}

@Override
public void accept(final AirbyteMessage message) throws Exception {
LOGGER.info("received record: {}", message);
numMessagesSoFar += 1;
LOGGER.info("received {} messages so far", numMessagesSoFar);

if (numMessagesSoFar > numMessagesAfterWhichToFail) {
throw new IllegalStateException("Forcing a fail after processing " + numMessagesAfterWhichToFail + " messages.");
}

if (message.getType() == Type.STATE) {
LOGGER.info("emitting state: {}", message);
outputRecordCollector.accept(message);
}
}

@Override
public void close() {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ public class TestingDestinations extends BaseConnector implements Destination {
public enum TestDestinationType {
LOGGING,
THROTTLED,
SILENT
SILENT,
FAILING
}

public TestingDestinations() {
this(ImmutableMap.<TestDestinationType, Destination>builder()
.put(TestDestinationType.LOGGING, new LoggingDestination())
.put(TestDestinationType.THROTTLED, new ThrottledDestination())
.put(TestDestinationType.SILENT, new SilentDestination())
.put(TestDestinationType.FAILING, new FailAfterNDestination())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@
"type": "integer"
}
}
},
{
"title": "Failing",
"required": ["type", "num_messages"],
"additionalProperties": false,
"properties": {
"type": {
"type": "string",
"const": "FAILING",
"default": "FAILING"
},
"num_messages": {
"description": "Number of messages after which to fail.",
"type": "integer"
}
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.e2e_test;

import static java.lang.Thread.sleep;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -51,13 +53,23 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
final Predicate<Long> anotherRecordPredicate =
config.has("max_records") ? recordNumber -> recordNumber < config.get("max_records").asLong() : recordNumber -> true;

final long sleepTime = config.has("message_interval") ? config.get("message_interval").asLong() : 3000L;

final AtomicLong i = new AtomicLong();

return AutoCloseableIterators.fromIterator(new AbstractIterator<>() {

@Override
protected AirbyteMessage computeNext() {
if (anotherRecordPredicate.test(i.get())) {
if (i.get() != 0) {
try {
LOGGER.info("sleeping for {} ms", sleepTime);
sleep(sleepTime);
} catch (final InterruptedException e) {
throw new RuntimeException();
}
}
i.incrementAndGet();
LOGGER.info("source emitting record {}:", i.get());
return new AirbyteMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
},
{
"title": "Infinite Feed",
"required": ["type", "max_records"],
"required": ["type", "max_records", "message_interval"],
"additionalProperties": false,
"properties": {
"type": {
Expand All @@ -36,6 +36,11 @@
"title": "Max Records",
"description": "Number of records to emit. If not set, defaults to infinity.",
"type": "integer"
},
"message_interval": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw this field could probably be optional.

"title": "Message Interval",
"description": "Interval between messages in ms.",
"type": "integer"
}
}
}
Expand Down
25 changes: 6 additions & 19 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void gentleClose(final Process process, final long timeout, final
}

if (process.isAlive()) {
forceShutdown(process, Duration.of(1, ChronoUnit.MINUTES));
closeProcess(process, Duration.of(1, ChronoUnit.MINUTES));
}
}

Expand Down Expand Up @@ -90,7 +90,7 @@ public static void gentleCloseWithHeartbeat(final Process process,
gracefulShutdownDuration,
checkHeartbeatDuration,
forcedShutdownDuration,
WorkerUtils::forceShutdown);
WorkerUtils::closeProcess);
}

@VisibleForTesting
Expand Down Expand Up @@ -134,28 +134,15 @@ static void gentleCloseWithHeartbeat(final Process process,
}
}

@VisibleForTesting
static void forceShutdown(final Process process, final Duration lastChanceDuration) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was basically a copy of closeProcess(), but without the logic to actually forcibly shutdown the process (despite the name). So I opted to just remove it and use closeProcess in its place instead.

LOGGER.warn("Process is taking too long to finish. Killing it");
process.destroy();
try {
process.waitFor(lastChanceDuration.toMillis(), TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
LOGGER.error("Exception while while killing the process", e);
}
if (process.isAlive()) {
LOGGER.error("Couldn't kill the process. You might have a zombie process.");
}
}

public static void closeProcess(final Process process, final int duration, final TimeUnit timeUnit) {
public static void closeProcess(final Process process, final Duration lastChanceDuration) {
if (process == null) {
return;
}
try {
process.destroy();
process.waitFor(duration, timeUnit);
process.waitFor(lastChanceDuration.toMillis(), TimeUnit.MILLISECONDS);
if (process.isAlive()) {
LOGGER.warn("Process is still alive after calling destroy. Attempting to destroy forcibly...");
process.destroyForcibly();
}
} catch (final InterruptedException e) {
Expand All @@ -172,7 +159,7 @@ public static void wait(final Process process) {
}

public static void cancelProcess(final Process process) {
closeProcess(process, 10, TimeUnit.SECONDS);
closeProcess(process, Duration.of(10, ChronoUnit.SECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void close() throws Exception {
}

LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
WorkerUtils.gentleClose(destinationProcess, 1, TimeUnit.MINUTES);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
final String message =
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,11 +33,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteSource.class);

private static final Duration HEARTBEAT_FRESH_DURATION = Duration.of(5, ChronoUnit.MINUTES);
private static final Duration CHECK_HEARTBEAT_DURATION = Duration.of(10, ChronoUnit.SECONDS);
// todo (cgardens) - keep the graceful shutdown consistent with current behavior for release. make
// sure everything is working well before we reduce this to something more reasonable.
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(10, ChronoUnit.HOURS);
private static final Duration FORCED_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);

private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("source")
Expand Down Expand Up @@ -110,12 +107,10 @@ public void close() throws Exception {
}

LOGGER.debug("Closing source process");
WorkerUtils.gentleCloseWithHeartbeat(
WorkerUtils.gentleClose(
sourceProcess,
heartbeatMonitor,
GRACEFUL_SHUTDOWN_DURATION,
CHECK_HEARTBEAT_DURATION,
FORCED_SHUTDOWN_DURATION);
GRACEFUL_SHUTDOWN_DURATION.toMillis(),
TimeUnit.MILLISECONDS);

if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
final String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue();
Expand Down