-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathDefaultAirbyteDestination.java
187 lines (156 loc) · 7.18 KB
/
DefaultAirbyteDestination.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.workers.internal;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.protocol.DefaultProtocolSerializer;
import io.airbyte.commons.protocol.ProtocolSerializer;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultAirbyteDestination implements AirbyteDestination {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteDestination.class);
public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("destination")
.setPrefixColor(Color.YELLOW_BACKGROUND);
static final Set<Integer> IGNORED_EXIT_CODES = Set.of(
0, // Normal exit
143 // SIGTERM
);
private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
private final AirbyteMessageBufferedWriterFactory messageWriterFactory;
private final ProtocolSerializer protocolSerializer;
private final AtomicBoolean inputHasEnded = new AtomicBoolean(false);
private Process destinationProcess = null;
private AirbyteMessageBufferedWriter writer = null;
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), new DefaultAirbyteMessageBufferedWriterFactory(),
new DefaultProtocolSerializer());
}
public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final AirbyteMessageBufferedWriterFactory messageWriterFactory,
final ProtocolSerializer protocolSerializer) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.messageWriterFactory = messageWriterFactory;
this.protocolSerializer = protocolSerializer;
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void start(final WorkerDestinationConfig destinationConfig, final Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(destinationProcess == null);
LOGGER.info("Running destination...");
destinationProcess = integrationLauncher.write(
jobRoot,
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME,
protocolSerializer.serialize(destinationConfig.getCatalog()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination", CONTAINER_LOG_MDC_BUILDER);
writer = messageWriterFactory.createWriter(new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8)));
final List<Type> acceptedMessageTypes = List.of(Type.STATE, Type.TRACE, Type.CONTROL);
messageIterator = streamFactory.create(IOs.newBufferedReader(destinationProcess.getInputStream()))
.filter(message -> acceptedMessageTypes.contains(message.getType()))
.iterator();
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void accept(final AirbyteMessage message) throws IOException {
Preconditions.checkState(destinationProcess != null && !inputHasEnded.get());
writer.write(message);
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void notifyEndOfInput() throws IOException {
Preconditions.checkState(destinationProcess != null && !inputHasEnded.get());
writer.flush();
writer.close();
inputHasEnded.set(true);
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void close() throws Exception {
if (destinationProcess == null) {
LOGGER.debug("Destination process already exited");
return;
}
if (!inputHasEnded.get()) {
notifyEndOfInput();
}
LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 1, TimeUnit.MINUTES);
if (destinationProcess.isAlive() || !IGNORED_EXIT_CODES.contains(getExitValue())) {
final String message =
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + getExitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
}
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel destination process...");
if (destinationProcess == null) {
LOGGER.info("Destination process no longer exists, cancellation is a no-op.");
} else {
LOGGER.info("Destination process exists, cancelling...");
WorkerUtils.cancelProcess(destinationProcess);
LOGGER.info("Cancelled destination process!");
}
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public boolean isFinished() {
Preconditions.checkState(destinationProcess != null);
/*
* As this check is done on every message read, it is important for this operation to be efficient.
* Short circuit early to avoid checking the underlying process. Note: hasNext is blocking.
*/
return !messageIterator.hasNext() && !destinationProcess.isAlive();
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public int getExitValue() {
Preconditions.checkState(destinationProcess != null, "Destination process is null, cannot retrieve exit value.");
Preconditions.checkState(!destinationProcess.isAlive(), "Destination process is still alive, cannot retrieve exit value.");
if (exitValue == null) {
exitValue = destinationProcess.exitValue();
}
return exitValue;
}
@Trace(operationName = WORKER_OPERATION_NAME)
@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(destinationProcess != null);
return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
}
}