Skip to content

Commit

Permalink
Fix compute nodes stuck at bidding (#921)
Browse files Browse the repository at this point in the history
Co-authored-by: Kai Davenport <kaiyadavenport@gmail.com>
  • Loading branch information
wdbaruni and binocarlos authored Oct 21, 2022
1 parent 8c5e798 commit 6518704
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 37 deletions.
2 changes: 2 additions & 0 deletions pkg/computenode/computenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ func (n *ComputeNode) triggerStateTransition(ctx context.Context, event model.Jo
shardState.Publish(ctx)
case model.JobEventResultsRejected:
shardState.ResultsRejected(ctx)
case model.JobEventInvalidRequest:
shardState.FailSilently(ctx, "Request rejected due to: "+event.Status)
}
} else {
log.Ctx(ctx).Debug().Msgf("Received %s for unknown shard %s", event.EventName, shard)
Expand Down
43 changes: 31 additions & 12 deletions pkg/computenode/shard_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func (a shardStateAction) String() string {

// request to change the state of the fsm
type shardStateRequest struct {
action shardStateAction
reason string
action shardStateAction
reason string
skipNotifyOnFailure bool
}

// types of shard state machines
Expand Down Expand Up @@ -220,11 +221,11 @@ type shardStateMachine struct {

currentState shardStateType
previousState shardStateType
latestRequest *shardStateRequest

runOutput *model.RunCommandResult
resultProposal []byte
cancellationMsg string
errorMsg string
runOutput *model.RunCommandResult
resultProposal []byte
errorMsg string

notifyOnFailure bool
}
Expand Down Expand Up @@ -284,10 +285,17 @@ func (m *shardStateMachine) Cancel(ctx context.Context, reason string) {
m.sendRequest(ctx, shardStateRequest{action: actionCancel, reason: reason})
}

// Move to an error state, and notify requester node if a bid was already published.
func (m *shardStateMachine) Fail(ctx context.Context, reason string) {
m.sendRequest(ctx, shardStateRequest{action: actionFail, reason: reason})
}

// Move to an error state without publishing an error to the requester node. This is used when the requester node
// rejects an invalid request from this compute node, and we don't want to publish an error to the requester node.
func (m *shardStateMachine) FailSilently(ctx context.Context, reason string) {
m.sendRequest(ctx, shardStateRequest{action: actionFail, reason: reason, skipNotifyOnFailure: true})
}

// send a request to the state machine by enquing it in the request channel.
// it is possible due to race condition or duplicate network events that a
// request is sent after the fsm is completed and no longer a goroutin is
Expand All @@ -304,6 +312,18 @@ func (m *shardStateMachine) sendRequest(ctx context.Context, request shardStateR
m.req <- request
}

// Read request from the channel and return it.
// The method also caches the latest request to enable the state machine to use the additional information it holds.
func (m *shardStateMachine) readRequest(ctx context.Context) *shardStateRequest {
select {
case request := <-m.req:
m.latestRequest = &request
case <-ctx.Done():
m.latestRequest = &shardStateRequest{action: actionFail, reason: "context canceled"}
}
return m.latestRequest
}

type StateFn func(context.Context, *shardStateMachine) StateFn

func (m *shardStateMachine) transitionedTo(ctx context.Context, newState shardStateType, reasons ...string) {
Expand All @@ -326,7 +346,7 @@ func enqueuedState(ctx context.Context, m *shardStateMachine) StateFn {
m.transitionedTo(ctx, shardEnqueued)

for {
req := <-m.req
req := m.readRequest(ctx)
switch req.action {
case actionBid:
err := m.node.notifyBidJob(ctx, m.Shard)
Expand All @@ -341,7 +361,6 @@ func enqueuedState(ctx context.Context, m *shardStateMachine) StateFn {

return biddingState
case actionCancel:
m.cancellationMsg = req.reason
return cancelledState
case actionFail:
m.errorMsg = req.reason
Expand All @@ -357,7 +376,7 @@ func biddingState(ctx context.Context, m *shardStateMachine) StateFn {
m.transitionedTo(ctx, shardBidding)

for {
req := <-m.req
req := m.readRequest(ctx)
switch req.action {
case actionRun:
return runningState
Expand Down Expand Up @@ -432,7 +451,7 @@ func verifyingResultsState(ctx context.Context, m *shardStateMachine) StateFn {
system.AddJobIDFromBaggageToSpan(ctx, span)

for {
req := <-m.req
req := m.readRequest(ctx)
switch req.action {
case actionPublish:
return publishingToRequesterState
Expand Down Expand Up @@ -481,7 +500,7 @@ func errorState(ctx context.Context, m *shardStateMachine) StateFn {
ctx = system.AddJobIDToBaggage(ctx, m.Shard.Job.ID)
system.AddJobIDFromBaggageToSpan(ctx, span)

if m.notifyOnFailure {
if m.notifyOnFailure && !m.latestRequest.skipNotifyOnFailure {
// we sent a bid, so we need to publish our failure to the network
err := m.node.notifyShardError(
ctx,
Expand All @@ -498,7 +517,7 @@ func errorState(ctx context.Context, m *shardStateMachine) StateFn {
}

func cancelledState(ctx context.Context, m *shardStateMachine) StateFn {
m.transitionedTo(ctx, shardCancelled, m.cancellationMsg)
m.transitionedTo(ctx, shardCancelled, m.latestRequest.reason)
// no notifications need to be sent here as you can only cancel a shard before a bid is sent.
return completedState
}
Expand Down
15 changes: 2 additions & 13 deletions pkg/job/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,26 +392,15 @@ func HasShardReachedCapacity(ctx context.Context, j *model.Job, jobState model.J
return false
}

bidsSeen := 0
acceptedBidsSeen := 0

for _, shardState := range shardStates { //nolint:gocritic
if shardState.State == model.JobStateBidding {
bidsSeen++
} else if shardState.State == model.JobStateWaiting {
if shardState.State.HasPassedBidAcceptedStage() {
acceptedBidsSeen++
}
}

if acceptedBidsSeen >= j.Deal.Concurrency {
return true
}

if bidsSeen >= j.Deal.Concurrency*2 {
return true
}

return false
return acceptedBidsSeen >= j.Deal.Concurrency
}

// group states by shard index so we can easily iterate over a whole set of them
Expand Down
6 changes: 5 additions & 1 deletion pkg/model/job_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (s JobStateType) IsComplete() bool {
return s == JobStateCompleted || s == JobStateError
}

func (s JobStateType) HasPassedBidAcceptedStage() bool {
return s == JobStateWaiting || s == JobStateRunning || s == JobStateVerifying || s == JobStateError || s == JobStateCompleted
}

func (s JobStateType) IsError() bool {
return s == JobStateError
}
Expand Down Expand Up @@ -128,7 +132,7 @@ func GetStateFromEvent(eventType JobEventType) JobStateType {
return JobStateRunning

// yikes
case JobEventError, JobEventComputeError:
case JobEventError, JobEventComputeError, JobEventInvalidRequest:
return JobStateError

// we are complete
Expand Down
9 changes: 8 additions & 1 deletion pkg/model/jobeventtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ const (
// a requester node declared an error running a job
JobEventError

// the requester node gives a compute node permission
// to forget about the job and free any resources it might
// currently be reserving - this can happen if a compute node
// bids when a job has completed - if the compute node does
// not hear back it will be stuck in reserving the resources for the job
JobEventInvalidRequest

jobEventDone // must be last
)

Expand All @@ -69,7 +76,7 @@ func (je JobEventType) IsTerminal() bool {
// ignore the rest of the job's lifecycle. This is the case for events caused
// by a node's bid being rejected.
func (je JobEventType) IsIgnorable() bool {
return je.IsTerminal() || je == JobEventComputeError || je == JobEventBidRejected
return je.IsTerminal() || je == JobEventComputeError || je == JobEventBidRejected || je == JobEventInvalidRequest
}

func ParseJobEventType(str string) (JobEventType, error) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/model/jobeventtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions pkg/requesternode/requesternode.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func (node *RequesterNode) triggerStateTransition(ctx context.Context, event mod
}
} else {
log.Ctx(ctx).Debug().Msgf("Received %s for unknown shard %s", event.EventName, shard)
if err := node.notifyShardInvalidRequest(ctx, shard, event.SourceNodeID, "shard state not found"); err != nil {
log.Ctx(ctx).Warn().Msgf(
"Received %s for unknown shard %s, and failed to notify the source node %s",
event.EventName, shard, event.SourceNodeID)
}
}
return nil
}
Expand Down Expand Up @@ -266,6 +271,18 @@ func (node *RequesterNode) notifyShardError(
return node.jobEventPublisher.HandleJobEvent(ctx, ev)
}

func (node *RequesterNode) notifyShardInvalidRequest(
ctx context.Context,
shard model.JobShard,
targetNodeID string,
status string,
) error {
ev := node.constructShardEvent(shard, model.JobEventInvalidRequest)
ev.Status = status
ev.TargetNodeID = targetNodeID
return node.jobEventPublisher.HandleJobEvent(ctx, ev)
}

func (node *RequesterNode) constructJobEvent(jobID string, eventName model.JobEventType) model.JobEvent {
return model.JobEvent{
SourceNodeID: node.ID,
Expand Down
23 changes: 16 additions & 7 deletions pkg/requesternode/shard_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,21 @@ func (m *shardStateMachine) resultsPublished(ctx context.Context, sourceNodeID s
func (m *shardStateMachine) sendRequest(ctx context.Context, request shardStateRequest) {
defer func() {
if r := recover(); r != nil {
log.Ctx(ctx).Warn().Msgf("%s ignoring action after channel closed: %s", m, request.action)
go m.notifyInvalidRequest(ctx, request, "shard fsm is completed")
}
}()
m.req <- request
}

// Notify the compute node that the request is invalid.
func (m *shardStateMachine) notifyInvalidRequest(ctx context.Context, request shardStateRequest, reason string) {
log.Ctx(ctx).Warn().Msgf("%s ignoring request due to `%s`: %+v", m, reason, request)

if err := m.node.notifyShardInvalidRequest(ctx, m.shard, request.sourceNodeID, reason); err != nil {
log.Ctx(ctx).Warn().Msgf("%s failed to notify invalid request `%s`: %+v due to %s", m, reason, request, err)
}
}

// ------------------------------------
// Shard State Machine Functions
// ------------------------------------
Expand Down Expand Up @@ -224,7 +233,7 @@ func enqueuedState(ctx context.Context, m *shardStateMachine) stateFn {
log.Ctx(ctx).Warn().Msgf("%s ignoring duplicate bid from %s", m, req.sourceNodeID)
}
default:
log.Ctx(ctx).Warn().Msgf("%s ignoring unknown action: %s", m, req.action)
m.notifyInvalidRequest(ctx, req, fmt.Sprintf("invalid action %s in state %s", req.action, m.currentState))
}
}
}
Expand Down Expand Up @@ -297,10 +306,10 @@ func acceptingBidsState(ctx context.Context, m *shardStateMachine) stateFn {
if _, ok := m.biddingNodes[req.sourceNodeID]; ok {
m.completedNodes[req.sourceNodeID] = struct{}{}
} else {
log.Ctx(ctx).Warn().Msgf("%s ignoring result from %s", m, req.sourceNodeID)
m.notifyInvalidRequest(ctx, req, "results received from a non-bidding node")
}
default:
log.Ctx(ctx).Warn().Msgf("%s ignoring unknown action: %s", m, req.action)
m.notifyInvalidRequest(ctx, req, fmt.Sprintf("invalid action %s in state %s", req.action, m.currentState))
}
}
}
Expand Down Expand Up @@ -329,10 +338,10 @@ func waitingForResultsState(ctx context.Context, m *shardStateMachine) stateFn {
return verifyingResultsState
}
} else {
log.Ctx(ctx).Warn().Msgf("%s ignoring result from %s", m, req.sourceNodeID)
m.notifyInvalidRequest(ctx, req, "results received from a non-bidding node")
}
default:
log.Ctx(ctx).Warn().Msgf("%s ignoring unknown action: %s", m, req.action)
m.notifyInvalidRequest(ctx, req, fmt.Sprintf("invalid action %s in state %s", req.action, m.currentState))
}
}
}
Expand Down Expand Up @@ -365,7 +374,7 @@ func waitingToPublishResultsState(ctx context.Context, m *shardStateMachine) sta
// publish the result and not all the compute nodes.
return completedState
default:
log.Ctx(ctx).Warn().Msgf("%s ignoring unknown action: %s", m, req.action)
m.notifyInvalidRequest(ctx, req, fmt.Sprintf("invalid action %s in state %s", req.action, m.currentState))
}
}
}
Expand Down

0 comments on commit 6518704

Please sign in to comment.