Skip to content

Commit

Permalink
Temporal client method to restart failed workflow (#15051)
Browse files Browse the repository at this point in the history
* Implementation and test

* TMP save cahnge

* move to singleton

* Update test with the connectionUtils being injected

* Rm unwanted change

* Use UUID instead of string

* Minor typo

* use right method name

* Fix build
  • Loading branch information
benmoriceau authored Aug 11, 2022
1 parent f4cbfa0 commit 246df1f
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
@Slf4j
public class ConnectionManagerUtils {

private static final ConnectionManagerUtils instance = new ConnectionManagerUtils();

private ConnectionManagerUtils() {}

public static ConnectionManagerUtils getInstance() {
return instance;
}

/**
* Attempts to send a signal to the existing ConnectionManagerWorkflow for the provided connection.
*
Expand All @@ -44,9 +52,9 @@ public class ConnectionManagerUtils {
* @return the healthy connection manager workflow that was signaled
* @throws DeletedWorkflowException if the connection manager workflow was deleted
*/
static ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, Proc> signalMethod)
ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, Proc> signalMethod)
throws DeletedWorkflowException {
return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.empty());
}
Expand All @@ -66,10 +74,10 @@ static ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final Workfl
* @return the healthy connection manager workflow that was signaled
* @throws DeletedWorkflowException if the connection manager workflow was deleted
*/
static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, Proc1<T>> signalMethod,
final T signalArgument)
<T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, Proc1<T>> signalMethod,
final T signalArgument)
throws DeletedWorkflowException {
return signalWorkflowAndRepairIfNecessary(client, connectionId, signalMethod, Optional.of(signalArgument));
}
Expand All @@ -79,10 +87,10 @@ static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final Wo
// Keeping this private and only exposing the above methods outside this class provides a strict
// type enforcement for external calls, and means this method can assume consistent type
// implementations for both cases.
private static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, ? extends TemporalFunctionalInterfaceMarker> signalMethod,
final Optional<T> signalArgument)
private <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(final WorkflowClient client,
final UUID connectionId,
final Function<ConnectionManagerWorkflow, ? extends TemporalFunctionalInterfaceMarker> signalMethod,
final Optional<T> signalArgument)
throws DeletedWorkflowException {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionManagerWorkflow(client, connectionId);
Expand Down Expand Up @@ -129,10 +137,10 @@ private static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(
}
}

static void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
log.info("Attempting to terminate existing workflow for connection {}.", connectionId);
void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) {
log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId);
try {
client.newUntypedWorkflowStub(getConnectionManagerName(connectionId)).terminate(reason);
client.newUntypedWorkflowStub(workflowId).terminate(reason);
} catch (final Exception e) {
log.warn(
"Could not terminate temporal workflow due to the following error; "
Expand All @@ -141,7 +149,11 @@ static void safeTerminateWorkflow(final WorkflowClient client, final UUID connec
}
}

static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = buildStartWorkflowInput(connectionId);
WorkflowClient.start(connectionManagerWorkflow::run, input);
Expand All @@ -157,7 +169,7 @@ static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowCl
* @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state
* @throws UnreachableWorkflowException if the workflow is in an unreachable state
*/
static ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId)
ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId)
throws DeletedWorkflowException, UnreachableWorkflowException {

final ConnectionManagerWorkflow connectionManagerWorkflow;
Expand Down Expand Up @@ -193,7 +205,7 @@ static ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClie
return connectionManagerWorkflow;
}

static boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class,
getConnectionManagerName(connectionId));
Expand All @@ -203,7 +215,7 @@ static boolean isWorkflowStateRunning(final WorkflowClient client, final UUID co
}
}

static WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
final DescribeWorkflowExecutionRequest describeWorkflowExecutionRequest = DescribeWorkflowExecutionRequest.newBuilder()
.setExecution(WorkflowExecution.newBuilder()
.setWorkflowId(getConnectionManagerName(connectionId))
Expand All @@ -216,25 +228,25 @@ static WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final Workflow
return describeWorkflowExecutionResponse.getWorkflowExecutionInfo().getStatus();
}

static long getCurrentJobId(final WorkflowClient client, final UUID connectionId) {
long getCurrentJobId(final WorkflowClient client, final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId);
final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionManagerWorkflow(client, connectionId);
return connectionManagerWorkflow.getJobInformation().getJobId();
} catch (final Exception e) {
return ConnectionManagerWorkflowImpl.NON_RUNNING_JOB_ID;
}
}

static ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
return client.newWorkflowStub(ConnectionManagerWorkflow.class,
TemporalUtils.getWorkflowOptionsWithWorkflowId(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)));
}

static String getConnectionManagerName(final UUID connectionId) {
String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}

public static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) {
public ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) {
return ConnectionUpdaterInput.builder()
.connectionId(connectionId)
.jobId(null)
Expand Down
Loading

0 comments on commit 246df1f

Please sign in to comment.