Skip to content

Commit 2d870d5

Browse files
author
healzhou
committed
[INLONG-738][Manager] Supports paging query for workflow execution log
1 parent 61b2bb1 commit 2d870d5

File tree

9 files changed

+121
-102
lines changed

9 files changed

+121
-102
lines changed

inlong-manager/manager-dao/src/main/resources/mappers/ProcessInstanceEntityMapper.xml

+44-40
Original file line numberDiff line numberDiff line change
@@ -51,46 +51,50 @@
5151
select
5252
<include refid="Base_Column_List"/>
5353
from wf_process_instance
54-
where 1=1
55-
<if test="id != null">
56-
and id = #{id,jdbcType=INTEGER}
57-
</if>
58-
<if test="ids != null and ids.size()>0">
59-
AND id IN
60-
<foreach item="item" index="index" collection="ids" open="(" close=")" separator=",">
61-
#{item}
62-
</foreach>
63-
</if>
64-
<if test="name != null and name !=''">
65-
and name = #{name,jdbcType=VARCHAR}
66-
</if>
67-
<if test="displayName != null and displayName !=''">
68-
and display_name = #{displayName,jdbcType=VARCHAR}
69-
</if>
70-
<if test="businessId != null and businessId !=''">
71-
and business_id = #{businessId,jdbcType=VARCHAR}
72-
</if>
73-
<if test="applicant != null and applicant !=''">
74-
and applicant = #{applicant,jdbcType=VARCHAR}
75-
</if>
76-
<if test="state != null">
77-
and state = #{state,jdbcType=VARCHAR}
78-
</if>
79-
<if test="startTimeBegin != null">
80-
and start_time >= #{startTimeBegin,jdbcType=TIMESTAMP}
81-
</if>
82-
<if test="startTimeEnd != null">
83-
and start_time &lt;= #{startTimeEnd,jdbcType=TIMESTAMP}
84-
</if>
85-
<if test="endTimeBegin != null">
86-
and end_time >= #{endTimeBegin,jdbcType=TIMESTAMP}
87-
</if>
88-
<if test="endTimeEnd != null">
89-
and end_time &lt;= #{endTimeEnd,jdbcType=TIMESTAMP}
90-
</if>
91-
<if test="hidden != null">
92-
and hidden = #{hidden,jdbcType=BIT}
93-
</if>
54+
<where>
55+
<if test="id != null">
56+
and id = #{id,jdbcType=INTEGER}
57+
</if>
58+
<if test="idList != null and idList.size()>0">
59+
and id in
60+
<foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
61+
#{item}
62+
</foreach>
63+
</if>
64+
<if test="nameList != null and nameList.size() > 0">
65+
and name in
66+
<foreach item="item" index="index" collection="nameList" open="(" close=")" separator=",">
67+
#{item}
68+
</foreach>
69+
</if>
70+
<if test="displayName != null and displayName !=''">
71+
and display_name = #{displayName,jdbcType=VARCHAR}
72+
</if>
73+
<if test="businessId != null and businessId !=''">
74+
and business_id = #{businessId,jdbcType=VARCHAR}
75+
</if>
76+
<if test="applicant != null and applicant !=''">
77+
and applicant = #{applicant,jdbcType=VARCHAR}
78+
</if>
79+
<if test="state != null">
80+
and state = #{state,jdbcType=VARCHAR}
81+
</if>
82+
<if test="startTimeBegin != null">
83+
and start_time >= #{startTimeBegin,jdbcType=TIMESTAMP}
84+
</if>
85+
<if test="startTimeEnd != null">
86+
and start_time &lt;= #{startTimeEnd,jdbcType=TIMESTAMP}
87+
</if>
88+
<if test="endTimeBegin != null">
89+
and end_time >= #{endTimeBegin,jdbcType=TIMESTAMP}
90+
</if>
91+
<if test="endTimeEnd != null">
92+
and end_time &lt;= #{endTimeEnd,jdbcType=TIMESTAMP}
93+
</if>
94+
<if test="hidden != null">
95+
and hidden = #{hidden,jdbcType=BIT}
96+
</if>
97+
</where>
9498
order by id desc
9599
</select>
96100

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,17 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
7979

8080
Integer tryNumber = tryBean.getMaxAttempts();
8181
Long delay = tryBean.getDelay();
82-
while (--tryNumber > 0) {
83-
if (topicExist) {
84-
break;
85-
}
86-
82+
while (!topicExist && --tryNumber > 0) {
83+
log.info("check whether the tube topic exists, try count={}", tryNumber);
8784
try {
8885
Thread.sleep(delay);
8986
delay *= tryBean.getMultiplier();
9087
topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
9188
} catch (InterruptedException e) {
92-
log.error("Try to determine whether the tube topic exists {}", e.getMessage());
89+
log.error("check the tube topic exists error", e);
9390
}
94-
9591
}
96-
log.info("Try to determine whether the tube topic exists ,try number is {}", tryNumber);
92+
9793
AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
9894
addTubeConsumeGroupRequest.setClusterId(clusterId);
9995
addTubeConsumeGroupRequest.setCreateUser(businessInfo.getCreator());

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ public interface WorkflowService {
3939
/**
4040
* Initiation process
4141
*
42-
* @param name Process name
42+
* @param process Process name
4343
* @param applicant Applicant
4444
* @param form Process form
4545
* @return result
4646
*/
47-
WorkflowResult start(ProcessName name, String applicant, ProcessForm form);
47+
WorkflowResult start(ProcessName process, String applicant, ProcessForm form);
4848

4949
/**
5050
* Cancellation process application
@@ -144,6 +144,6 @@ public interface WorkflowService {
144144
* @param query Query conditions
145145
* @return Execution log
146146
*/
147-
List<WorkflowTaskExecuteLog> listTaskExecuteLogs(WorkflowTaskExecuteLogQuery query);
147+
PageInfo<WorkflowTaskExecuteLog> listTaskExecuteLogs(WorkflowTaskExecuteLogQuery query);
148148

149149
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java

+53-22
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.inlong.manager.common.util.Preconditions;
3232
import org.apache.inlong.manager.service.workflow.WorkflowTaskExecuteLog.ListenerExecutorLog;
3333
import org.apache.inlong.manager.service.workflow.WorkflowTaskExecuteLog.TaskExecutorLog;
34+
import org.apache.inlong.manager.workflow.core.QueryService;
3435
import org.apache.inlong.manager.workflow.core.WorkflowEngine;
3536
import org.apache.inlong.manager.workflow.exception.WorkflowNoRollbackException;
3637
import org.apache.inlong.manager.workflow.model.TaskState;
@@ -85,8 +86,8 @@ private void init() {
8586

8687
@Override
8788
@Transactional(noRollbackFor = WorkflowNoRollbackException.class, rollbackFor = Exception.class)
88-
public WorkflowResult start(ProcessName name, String applicant, ProcessForm form) {
89-
return WorkflowResult.of(workflowEngine.processService().start(name.name(), applicant, form));
89+
public WorkflowResult start(ProcessName process, String applicant, ProcessForm form) {
90+
return WorkflowResult.of(workflowEngine.processService().start(process.name(), applicant, form));
9091
}
9192

9293
@Override
@@ -171,27 +172,57 @@ public TaskSummaryView taskSummary(TaskSummaryQuery query) {
171172
}
172173

173174
@Override
174-
public List<WorkflowTaskExecuteLog> listTaskExecuteLogs(WorkflowTaskExecuteLogQuery query) {
175-
Preconditions.checkNotNull(query, "query params can't be null");
176-
Preconditions.checkNotEmpty(query.getBusinessId(), "business id can't be null");
177-
Preconditions.checkNotEmpty(query.getProcessNames(), "process names can't be null");
178-
179-
List<WorkflowTaskExecuteLog> workflowTaskExecuteLogs = query.getProcessNames().stream()
180-
.map(processName -> ProcessQuery.builder().businessId(query.getBusinessId()).name(processName).build())
181-
.map(workflowEngine.queryService()::listProcess)
182-
.flatMap(List::stream)
183-
.map(WorkflowTaskExecuteLog::buildBaseInfoFromProcessInst)
184-
.collect(Collectors.toList());
175+
public PageInfo<WorkflowTaskExecuteLog> listTaskExecuteLogs(WorkflowTaskExecuteLogQuery query) {
176+
Preconditions.checkNotNull(query, "workflow task execute log query params cannot be null");
177+
Preconditions.checkNotEmpty(query.getBusinessId(), "business id cannot be null");
178+
Preconditions.checkNotEmpty(query.getProcessNames(), "process name list cannot be null");
179+
180+
ProcessQuery processQuery = new ProcessQuery();
181+
processQuery.setBusinessId(query.getBusinessId());
182+
processQuery.setNameList(query.getProcessNames());
183+
processQuery.setHidden(true);
184+
185+
// 分页查询流程实例,构造流程执行日志
186+
QueryService queryService = workflowEngine.queryService();
187+
PageHelper.startPage(query.getPageNum(), query.getPageSize());
188+
Page<ProcessInstance> instanceList = (Page<ProcessInstance>) queryService.listProcess(processQuery);
185189

186-
workflowTaskExecuteLogs.forEach(executeLog -> {
187-
List<TaskExecutorLog> taskExecutorLogs = getTaskExecutorLogs(executeLog.getProcessInstId(),
188-
query.getTaskType());
189-
taskExecutorLogs.forEach(taskExecutorLog -> taskExecutorLog
190-
.setListenerExecutorLogs(getListenerExecutorLogs(taskExecutorLog)));
191-
executeLog.setTaskExecutorLogs(taskExecutorLogs);
192-
}
190+
PageInfo<WorkflowTaskExecuteLog> pageInfo = instanceList.toPageInfo(inst -> WorkflowTaskExecuteLog.builder()
191+
.processInstId(inst.getId())
192+
.processDisplayName(inst.getDisplayName())
193+
.state(inst.getState())
194+
.startTime(inst.getStartTime())
195+
.endTime(inst.getEndTime())
196+
.build()
193197
);
194-
return workflowTaskExecuteLogs;
198+
199+
// 根据流程执行日志,查询流程中各个任务的执行日志
200+
for (WorkflowTaskExecuteLog executeLog : pageInfo.getList()) {
201+
TaskQuery taskQuery = new TaskQuery();
202+
taskQuery.setProcessInstId(executeLog.getProcessInstId());
203+
taskQuery.setType(query.getTaskType());
204+
List<TaskExecutorLog> taskExecutorLogs = queryService.listTask(taskQuery)
205+
.stream()
206+
.map(TaskExecutorLog::buildFromTaskInst)
207+
.collect(Collectors.toList());
208+
209+
// 设置任务的监听器的执行日志
210+
for (TaskExecutorLog taskExecutorLog : taskExecutorLogs) {
211+
EventLogQuery eventLogQuery = new EventLogQuery();
212+
eventLogQuery.setTaskInstId(taskExecutorLog.getTaskInstId());
213+
List<ListenerExecutorLog> logs = queryService.listEventLog(eventLogQuery)
214+
.stream()
215+
.map(ListenerExecutorLog::fromEventLog)
216+
.collect(Collectors.toList());
217+
taskExecutorLog.setListenerExecutorLogs(logs);
218+
}
219+
220+
executeLog.setTaskExecutorLogs(taskExecutorLogs);
221+
}
222+
223+
log.info("success to page list task execute logs for " + query);
224+
pageInfo.setTotal(instanceList.getTotal());
225+
return pageInfo;
195226
}
196227

197228
private List<TaskExecutorLog> getTaskExecutorLogs(Integer processInstId, String taskType) {
@@ -241,7 +272,7 @@ private void addShowInListForEachTask(List<TaskListView> taskList) {
241272
List<Integer> processInstIds = taskList.stream().map(TaskListView::getProcessInstId)
242273
.distinct().collect(Collectors.toList());
243274
List<ProcessInstance> processInstances = this.workflowEngine.queryService().listProcess(
244-
ProcessQuery.builder().ids(processInstIds).build());
275+
ProcessQuery.builder().idList(processInstIds).build());
245276
Map<Integer, Map<String, Object>> process2ShowInListMap = Maps.newHashMap();
246277
processInstances.forEach(p -> process2ShowInListMap.put(p.getId(), getShowInList(p)));
247278
taskList.forEach(task -> task.setShowInList(process2ShowInListMap.get(task.getProcessInstId())));

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowTaskExecuteLog.java

-11
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import lombok.Data;
2727
import lombok.NoArgsConstructor;
2828
import org.apache.inlong.manager.workflow.model.instance.EventLog;
29-
import org.apache.inlong.manager.workflow.model.instance.ProcessInstance;
3029
import org.apache.inlong.manager.workflow.model.instance.TaskInstance;
3130

3231
/**
@@ -57,16 +56,6 @@ public class WorkflowTaskExecuteLog {
5756
@ApiModelProperty("Task execution log")
5857
private List<TaskExecutorLog> taskExecutorLogs;
5958

60-
public static WorkflowTaskExecuteLog buildBaseInfoFromProcessInst(ProcessInstance processInstance) {
61-
return WorkflowTaskExecuteLog.builder()
62-
.processInstId(processInstance.getId())
63-
.processDisplayName(processInstance.getDisplayName())
64-
.state(processInstance.getState())
65-
.startTime(processInstance.getStartTime())
66-
.endTime(processInstance.getEndTime())
67-
.build();
68-
}
69-
7059
@Data
7160
@Builder
7261
@NoArgsConstructor

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowTaskExecuteLogQuery.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,26 @@
1919

2020
import io.swagger.annotations.ApiModel;
2121
import io.swagger.annotations.ApiModelProperty;
22-
23-
import java.util.Set;
24-
22+
import java.util.List;
2523
import lombok.Data;
24+
import lombok.EqualsAndHashCode;
25+
import org.apache.inlong.manager.workflow.model.view.PageQuery;
2626

2727
/**
2828
* Task execution log query
29-
*
3029
*/
3130
@Data
31+
@EqualsAndHashCode(callSuper = false)
3232
@ApiModel("Task execution log query conditions")
33-
public class WorkflowTaskExecuteLogQuery {
33+
public class WorkflowTaskExecuteLogQuery extends PageQuery {
3434

3535
@ApiModelProperty("Business ID")
3636
private String businessId;
3737

3838
@ApiModelProperty("Process name")
39-
private Set<String> processNames;
39+
private List<String> processNames;
4040

4141
@ApiModelProperty("Task type: system task: ServiceTask; user task: UserTask")
4242
private String taskType;
43+
4344
}

inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/WorkflowController.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.swagger.annotations.ApiImplicitParam;
2323
import io.swagger.annotations.ApiImplicitParams;
2424
import io.swagger.annotations.ApiOperation;
25-
import java.util.List;
2625
import lombok.extern.slf4j.Slf4j;
2726
import org.apache.inlong.manager.common.beans.Response;
2827
import org.apache.inlong.manager.common.enums.OperationType;
@@ -162,7 +161,7 @@ public Response<TaskSummaryView> taskSummary(TaskSummaryQuery query) {
162161

163162
@GetMapping("/listTaskExecuteLogs")
164163
@ApiOperation(value = "Get task execution log")
165-
public Response<List<WorkflowTaskExecuteLog>> listTaskExecuteLogs(WorkflowTaskExecuteLogQuery query) {
164+
public Response<PageInfo<WorkflowTaskExecuteLog>> listTaskExecuteLogs(WorkflowTaskExecuteLogQuery query) {
166165
return Response.success(workflowService.listTaskExecuteLogs(query));
167166
}
168167

inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowProcessorExecutorImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private boolean isSkipCurrentElement(Element element, WorkflowContext context) {
123123

124124
private void executeSkipAndNext(Element element, WorkflowContext context) {
125125
if (!(element instanceof SkippableElement)) {
126-
throw new WorkflowException("element not instance of skipable element " + element.getDisplayName());
126+
throw new WorkflowException("element not instance of skip element " + element.getDisplayName());
127127
}
128128

129129
if (!(element instanceof NextableElement)) {
@@ -134,7 +134,7 @@ private void executeSkipAndNext(Element element, WorkflowContext context) {
134134

135135
if (!(processor instanceof SkipAbleElementProcessor)) {
136136
throw new WorkflowException(
137-
"element processor not instance of skipable processor " + element.getDisplayName());
137+
"element processor not instance of skip processor " + element.getDisplayName());
138138
}
139139

140140
// Execute skip logic

inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/view/ProcessQuery.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,23 @@
1717

1818
package org.apache.inlong.manager.workflow.model.view;
1919

20-
import org.apache.inlong.manager.workflow.model.ProcessState;
21-
2220
import io.swagger.annotations.ApiModel;
2321
import io.swagger.annotations.ApiModelProperty;
24-
2522
import java.util.Date;
2623
import java.util.List;
27-
2824
import lombok.AllArgsConstructor;
2925
import lombok.Builder;
3026
import lombok.Data;
27+
import lombok.EqualsAndHashCode;
3128
import lombok.NoArgsConstructor;
32-
29+
import org.apache.inlong.manager.workflow.model.ProcessState;
3330
import org.springframework.format.annotation.DateTimeFormat;
3431

3532
/**
3633
* Process query conditions
3734
*/
3835
@Data
36+
@EqualsAndHashCode(callSuper = false)
3937
@Builder
4038
@NoArgsConstructor
4139
@AllArgsConstructor
@@ -46,10 +44,10 @@ public class ProcessQuery extends PageQuery {
4644
private Integer id;
4745

4846
@ApiModelProperty("process form list of ID")
49-
private List<Integer> ids;
47+
private List<Integer> idList;
5048

51-
@ApiModelProperty("process name")
52-
private String name;
49+
@ApiModelProperty("process form list of name")
50+
private List<String> nameList;
5351

5452
@ApiModelProperty("process display name")
5553
private String displayName;
@@ -87,4 +85,5 @@ public class ProcessQuery extends PageQuery {
8785

8886
@ApiModelProperty("whether to include the form information displayed in the list")
8987
private boolean includeShowInList = true;
88+
9089
}

0 commit comments

Comments
 (0)