Skip to content

Commit ba7e7f4

Browse files
authored
Merge 8049e3e into 5050a8e
2 parents 5050a8e + 8049e3e commit ba7e7f4

File tree

3 files changed

+89
-17
lines changed
  • dolphinscheduler-task-plugin
    • dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils
    • dolphinscheduler-task-remoteshell/src

3 files changed

+89
-17
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

+28-15
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.commons.lang3.StringUtils;
4040
import org.apache.commons.lang3.SystemUtils;
4141

42+
import java.util.ArrayList;
4243
import java.util.Arrays;
4344
import java.util.HashMap;
4445
import java.util.List;
@@ -117,33 +118,45 @@ public static boolean kill(@NonNull TaskExecutionContext request) {
117118
* @throws Exception exception
118119
*/
119120
public static String getPidsStr(int processId) throws Exception {
120-
StringBuilder sb = new StringBuilder();
121-
Matcher mat = null;
121+
122+
String rawPidStr;
123+
122124
// pstree pid get sub pids
123125
if (SystemUtils.IS_OS_MAC) {
124-
String pids = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId));
125-
if (StringUtils.isNotEmpty(pids)) {
126-
mat = MACPATTERN.matcher(pids);
126+
rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId));
127+
} else if (SystemUtils.IS_OS_LINUX) {
128+
rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
129+
} else {
130+
rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
131+
}
132+
133+
return parsePidStr(rawPidStr);
134+
}
135+
136+
public static String parsePidStr(String rawPidStr) {
137+
138+
log.info("prepare to parse pid, raw pid string: {}", rawPidStr);
139+
ArrayList<String> allPidList = new ArrayList<>();
140+
Matcher mat = null;
141+
if (SystemUtils.IS_OS_MAC) {
142+
if (StringUtils.isNotEmpty(rawPidStr)) {
143+
mat = MACPATTERN.matcher(rawPidStr);
127144
}
128145
} else if (SystemUtils.IS_OS_LINUX) {
129-
String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
130-
if (StringUtils.isNotEmpty(pids)) {
131-
mat = LINUXPATTERN.matcher(pids);
146+
if (StringUtils.isNotEmpty(rawPidStr)) {
147+
mat = LINUXPATTERN.matcher(rawPidStr);
132148
}
133149
} else {
134-
String pids = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
135-
if (StringUtils.isNotEmpty(pids)) {
136-
mat = WINDOWSPATTERN.matcher(pids);
150+
if (StringUtils.isNotEmpty(rawPidStr)) {
151+
mat = WINDOWSPATTERN.matcher(rawPidStr);
137152
}
138153
}
139-
140154
if (null != mat) {
141155
while (mat.find()) {
142-
sb.append(mat.group(1)).append(" ");
156+
allPidList.add(mat.group(1));
143157
}
144158
}
145-
146-
return sb.toString().trim();
159+
return String.join(" ", allPidList).trim();
147160
}
148161

149162
/**

dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.remoteshell;
1919

20+
import static org.apache.dolphinscheduler.plugin.task.remoteshell.RemoteExecutor.COMMAND.PSTREE_COMMAND;
21+
2022
import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils;
2123
import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
2224
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
2325
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
26+
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
2427

2528
import org.apache.commons.lang3.StringUtils;
29+
import org.apache.commons.lang3.math.NumberUtils;
2630
import org.apache.sshd.client.SshClient;
2731
import org.apache.sshd.client.channel.ChannelExec;
2832
import org.apache.sshd.client.channel.ClientChannelEvent;
@@ -50,7 +54,6 @@ public class RemoteExecutor implements AutoCloseable {
5054
static final int TRACK_INTERVAL = 5000;
5155

5256
protected Map<String, String> taskOutputParams = new HashMap<>();
53-
5457
private SshClient sshClient;
5558
private ClientSession session;
5659
private SSHConnectionParam sshConnectionParam;
@@ -154,11 +157,45 @@ public void cleanData(String taskId) {
154157

155158
public void kill(String taskId) throws IOException {
156159
String pid = getTaskPid(taskId);
157-
String killCommand = String.format(COMMAND.KILL_COMMAND, pid);
160+
161+
if (StringUtils.isEmpty(pid)) {
162+
log.warn("query remote-shell task remote process id with empty");
163+
return;
164+
}
165+
if (!NumberUtils.isParsable(pid)) {
166+
log.error("query remote-shell task remote process id error, pid {} can not parse to number", pid);
167+
return;
168+
}
169+
170+
// query all pid
171+
String remotePidStr = getAllRemotePidStr(pid);
172+
String killCommand = String.format(COMMAND.KILL_COMMAND, remotePidStr);
173+
log.info("prepare to execute kill command in host: {}, kill cmd: {}", sshConnectionParam.getHost(),
174+
killCommand);
158175
runRemote(killCommand);
159176
cleanData(taskId);
160177
}
161178

179+
protected String getAllRemotePidStr(String pid) {
180+
181+
String remoteProcessIdStr = "";
182+
String cmd = String.format(PSTREE_COMMAND, pid);
183+
log.info("query all process id cmd: {}", cmd);
184+
185+
try {
186+
String rawPidStr = runRemote(cmd);
187+
remoteProcessIdStr = ProcessUtils.parsePidStr(rawPidStr);
188+
if (!remoteProcessIdStr.startsWith(pid)) {
189+
log.error("query remote process id error, [{}] first pid not equal [{}]", remoteProcessIdStr, pid);
190+
remoteProcessIdStr = pid;
191+
}
192+
} catch (Exception e) {
193+
log.error("query remote all process id error", e);
194+
remoteProcessIdStr = pid;
195+
}
196+
return remoteProcessIdStr;
197+
}
198+
162199
public String getTaskPid(String taskId) throws IOException {
163200
String pidCommand = String.format(COMMAND.GET_PID_COMMAND, taskId);
164201
return runRemote(pidCommand).trim();
@@ -238,6 +275,9 @@ private COMMAND() {
238275
static final String ADD_STATUS_COMMAND = "\necho %s$?";
239276

240277
static final String CAT_FINAL_SCRIPT = "cat %s%s.sh";
278+
279+
static final String PSTREE_COMMAND = "pstree -p %s";
280+
241281
}
242282

243283
}

dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.remoteshell;
1919

20+
import static org.mockito.ArgumentMatchers.anyString;
2021
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
2122
import static org.mockito.Mockito.doNothing;
2223
import static org.mockito.Mockito.doReturn;
@@ -135,4 +136,22 @@ void testGetTaskExitCode() throws IOException {
135136
doReturn("DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-1").when(remoteExecutor).runRemote(trackCommand);
136137
Assertions.assertEquals(1, remoteExecutor.getTaskExitCode(taskId));
137138
}
139+
140+
@Test
141+
void getAllRemotePidStr() throws IOException {
142+
143+
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
144+
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
145+
String allPidStr = remoteExecutor.getAllRemotePidStr("9527");
146+
Assertions.assertEquals("9527 9528", allPidStr);
147+
148+
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
149+
allPidStr = remoteExecutor.getAllRemotePidStr("9527");
150+
Assertions.assertEquals("9527", allPidStr);
151+
152+
doThrow(new TaskException()).when(remoteExecutor).runRemote(anyString());
153+
allPidStr = remoteExecutor.getAllRemotePidStr("9527");
154+
Assertions.assertEquals("9527", allPidStr);
155+
156+
}
138157
}

0 commit comments

Comments
 (0)