Skip to content

Commit 27b1082

Browse files
committed
Support silently capturing output from Python if necessary.
1 parent 808fd7e commit 27b1082

File tree

2 files changed

+170
-17
lines changed

2 files changed

+170
-17
lines changed

dqops/src/main/java/com/dqops/utils/python/StreamingPythonProcess.java

+24-17
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class StreamingPythonProcess implements Closeable, ExecuteResultHandler {
4343
private static final int PYTHON_BUFFER_SIZE = 1024; // buffer size used in the python streaming process in a call to TextIO.read(buffer_size)
4444
private static final int PYTHON_RECEIVE_RESPONSE_BUFFER_SIZE = 1024;
4545
private static final byte[] PYTHON_BUFFER_SPACE = StringUtils.repeat(' ', PYTHON_BUFFER_SIZE + 10).getBytes(StandardCharsets.UTF_8);
46+
private static final boolean STOP_ON_STDERR_ERROR = true; // TODO: We can support using a parameter to turn it on/off
4647

4748
private PipedOutputStream writeToProcessStream;
4849
private PipedInputStream writeToProcessStreamProcessSide;
@@ -257,23 +258,29 @@ public void startProcessCore(PythonVirtualEnv pythonVirtualEnv) {
257258
this.jsonFactory = new JsonFactory();
258259
this.jsonParser = jsonFactory.createParser(this.readFromProcessStreamReader);
259260

260-
ActivityDetectionOutputStream errorOutputStream = new ActivityDetectionOutputStream(new FlushingOutputStream(this.errorStream));
261-
this.outputDetectedOnStderrFuture = errorOutputStream.getOutputDetectedFuture();
262-
this.outputDetectedOnStderrFuture
263-
.thenRun(() -> {
264-
try {
265-
// we detected that some output was written to the stderr of the python process, it is an error and we will terminate...
266-
Thread.sleep(100); // we need to wait for the remaining output
267-
this.waitForClose.countDown();
268-
this.close();
269-
270-
String errStreamText = this.errorStream.toString(StandardCharsets.UTF_8);
271-
log.error("Python process failed with an error, the error captured from the stderr: " + errStreamText);
272-
}
273-
catch (Exception ioe) {
274-
log.error("Python process failed with an error and we cannot close the stream: " + ioe.getMessage(), ioe);
275-
}
276-
});
261+
OutputStream errorOutputStream = null;
262+
if (STOP_ON_STDERR_ERROR) {
263+
ActivityDetectionOutputStream activityDetectionOutputStream = new ActivityDetectionOutputStream(new FlushingOutputStream(this.errorStream));
264+
errorOutputStream = activityDetectionOutputStream;
265+
this.outputDetectedOnStderrFuture = activityDetectionOutputStream.getOutputDetectedFuture();
266+
this.outputDetectedOnStderrFuture
267+
.thenRun(() -> {
268+
try {
269+
// we detected that some output was written to the stderr of the python process, it is an error and we will terminate...
270+
Thread.sleep(100); // we need to wait for the remaining output
271+
this.waitForClose.countDown();
272+
this.close();
273+
274+
String errStreamText = this.errorStream.toString(StandardCharsets.UTF_8);
275+
log.error("Python process failed with an error, the error captured from the stderr: " + errStreamText);
276+
} catch (Exception ioe) {
277+
log.error("Python process failed with an error and we cannot close the stream: " + ioe.getMessage(), ioe);
278+
}
279+
});
280+
} else {
281+
errorOutputStream = new TailOutputStream(this.errorStream);
282+
this.outputDetectedOnStderrFuture = new CompletableFuture<>();
283+
}
277284

278285
this.streamHandler = new FlushingPumpStreamHandler(
279286
new FlushingOutputStream(this.readFromProcessStreamProcessSide),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright © 2021 DQOps (support@dqops.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.dqops.utils.python;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
20+
import java.io.OutputStream;
21+
import java.util.concurrent.CompletableFuture;
22+
23+
/**
24+
* Output stream is simply captures the most recently received content and constantly rewinds the output stream.
25+
*/
26+
public class TailOutputStream extends OutputStream {
27+
private final OutputStream nestedStream;
28+
29+
/**
30+
* Creates an output stream wrapper that will detect any data that was written.
31+
* @param nestedStream Nested stream.
32+
*/
33+
public TailOutputStream(OutputStream nestedStream) {
34+
this.nestedStream = nestedStream;
35+
}
36+
37+
/**
38+
* Writes {@code b.length} bytes from the specified byte array
39+
* to this output stream. The general contract for {@code write(b)}
40+
* is that it should have exactly the same effect as the call
41+
* {@code write(b, 0, b.length)}.
42+
*
43+
* @param b the data.
44+
* @throws IOException if an I/O error occurs.
45+
* @see OutputStream#write(byte[], int, int)
46+
*/
47+
@Override
48+
public void write(byte[] b) throws IOException {
49+
this.nestedStream.write(b);
50+
}
51+
52+
/**
53+
* Writes {@code len} bytes from the specified byte array
54+
* starting at offset {@code off} to this output stream.
55+
* The general contract for {@code write(b, off, len)} is that
56+
* some of the bytes in the array {@code b} are written to the
57+
* output stream in order; element {@code b[off]} is the first
58+
* byte written and {@code b[off+len-1]} is the last byte written
59+
* by this operation.
60+
* <p>
61+
* The {@code write} method of {@code OutputStream} calls
62+
* the write method of one argument on each of the bytes to be
63+
* written out. Subclasses are encouraged to override this method and
64+
* provide a more efficient implementation.
65+
* <p>
66+
* If {@code b} is {@code null}, a
67+
* {@code NullPointerException} is thrown.
68+
* <p>
69+
* If {@code off} is negative, or {@code len} is negative, or
70+
* {@code off+len} is greater than the length of the array
71+
* {@code b}, then an {@code IndexOutOfBoundsException} is thrown.
72+
*
73+
* @param b the data.
74+
* @param off the start offset in the data.
75+
* @param len the number of bytes to write.
76+
* @throws IOException if an I/O error occurs. In particular,
77+
* an {@code IOException} is thrown if the output
78+
* stream is closed.
79+
*/
80+
@Override
81+
public void write(byte[] b, int off, int len) throws IOException {
82+
if (this.nestedStream instanceof ByteArrayOutputStream) {
83+
ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream)this.nestedStream;
84+
byteArrayOutputStream.reset();
85+
}
86+
this.nestedStream.write(b, off, len);
87+
}
88+
89+
/**
90+
* Writes the specified byte to this output stream. The general
91+
* contract for {@code write} is that one byte is written
92+
* to the output stream. The byte to be written is the eight
93+
* low-order bits of the argument {@code b}. The 24
94+
* high-order bits of {@code b} are ignored.
95+
* <p>
96+
* Subclasses of {@code OutputStream} must provide an
97+
* implementation for this method.
98+
*
99+
* @param b the {@code byte}.
100+
* @throws IOException if an I/O error occurs. In particular,
101+
* an {@code IOException} may be thrown if the
102+
* output stream has been closed.
103+
*/
104+
@Override
105+
public void write(int b) throws IOException {
106+
this.nestedStream.write(b);
107+
}
108+
109+
/**
110+
* Flushes this output stream and forces any buffered output bytes
111+
* to be written out. The general contract of {@code flush} is
112+
* that calling it is an indication that, if any bytes previously
113+
* written have been buffered by the implementation of the output
114+
* stream, such bytes should immediately be written to their
115+
* intended destination.
116+
* <p>
117+
* If the intended destination of this stream is an abstraction provided by
118+
* the underlying operating system, for example a file, then flushing the
119+
* stream guarantees only that bytes previously written to the stream are
120+
* passed to the operating system for writing; it does not guarantee that
121+
* they are actually written to a physical device such as a disk drive.
122+
* <p>
123+
* The {@code flush} method of {@code OutputStream} does nothing.
124+
*
125+
* @throws IOException if an I/O error occurs.
126+
*/
127+
@Override
128+
public void flush() throws IOException {
129+
nestedStream.flush();
130+
}
131+
132+
/**
133+
* Closes this output stream and releases any system resources
134+
* associated with this stream. The general contract of {@code close}
135+
* is that it closes the output stream. A closed stream cannot perform
136+
* output operations and cannot be reopened.
137+
* <p>
138+
* The {@code close} method of {@code OutputStream} does nothing.
139+
*
140+
* @throws IOException if an I/O error occurs.
141+
*/
142+
@Override
143+
public void close() throws IOException {
144+
nestedStream.close();
145+
}
146+
}

0 commit comments

Comments
 (0)