Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce persistence barrier for self-messages #164

Merged
merged 1 commit (source and target branch names were not captured)
on May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurab
public string WorkItemId;
public double? WaitingSince; // measures time waiting to execute

readonly PartitionUpdateEvent waitForDequeueCountPersistence;
readonly PartitionUpdateEvent waitForPersistence; // if nonnull, must wait for this event
OrchestrationWorkItem workItem;

public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId);

public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent)
public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent, bool waitForPersistence)
{
this.InstanceId = instanceId;
this.SessionId = session.SessionId;
Expand All @@ -46,12 +46,12 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio

session.CurrentBatch = this;

if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem || filingEvent is RecoveryCompleted)
if (waitForPersistence || partition.Settings.PersistDequeueCountBeforeStartingWorkItem)
{
this.waitForDequeueCountPersistence = filingEvent;
this.waitForPersistence = filingEvent;
}

partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}");
partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId} waitForPersistence={this.waitForPersistence}");

// continue when we have the history state loaded, which gives us the latest state and/or cursor
partition.SubmitEvent(this);
Expand Down Expand Up @@ -144,10 +144,10 @@ public override void OnReadComplete(TrackedObject s, Partition partition)
}
else
{
if (this.waitForDequeueCountPersistence != null)
if (this.waitForPersistence != null)
{
// process the work item once the filing event is persisted
DurabilityListeners.Register(this.waitForDequeueCountPersistence, this);
DurabilityListeners.Register(this.waitForPersistence, this);
}
else
{
Expand Down
44 changes: 27 additions & 17 deletions src/DurableTask.Netherite/PartitionState/SessionsState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public override void Process(RecoveryCompleted evt, EffectTracker effects)

if (!effects.IsReplaying) // during replay, we don't start work items until end of recovery
{
new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt);
// submit a message batch for processing
new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt, waitForPersistence: true);
}
}

Expand Down Expand Up @@ -144,12 +145,13 @@ void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isRe

if (!isReplaying) // during replay, we don't start work items until end of recovery
{
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent);
// submit a message batch for processing
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent, waitForPersistence: false);
}
}
}

void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable<TaskMessage> messages, bool isReplaying, PartitionUpdateEvent filingEvent)
Session AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable<TaskMessage> messages, bool isReplaying, PartitionUpdateEvent filingEvent)
{
this.Partition.Assert(!string.IsNullOrEmpty(originWorkItemId), "null originWorkItem");
int? forceNewExecution = FindLastExecutionStartedEvent(messages);
Expand All @@ -159,7 +161,7 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl
// A session for this instance already exists, so a work item is in progress already.
// We don't need to schedule a work item because we'll notice the new messages
// when the previous work item completes.
foreach(var message in messages)
foreach (var message in messages)
{
if (!isReplaying)
{
Expand All @@ -185,12 +187,12 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl
}
messages = messages.Skip(forceNewExecution.Value);
}

// Create a new session
this.Sessions[instanceId] = session = new Session()
{
SessionId = this.SequenceNumber++,
Batch = new List<(TaskMessage,string)>(),
Batch = new List<(TaskMessage, string)>(),
BatchStartPosition = 0,
DequeueCount = 1,
ForceNewExecution = forceNewExecution.HasValue,
Expand All @@ -208,9 +210,11 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl

if (!isReplaying) // we don't start work items until end of recovery
{
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent);
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent, waitForPersistence: false);
}
}

return session;
}

static int? FindLastExecutionStartedEvent(IEnumerable<TaskMessage> messages)
Expand Down Expand Up @@ -331,6 +335,9 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
return;
};

// detect loopback messages, to guarantee that they act as a persistence barrier
bool containsLoopbackMessages = false;

if (!evt.NotExecutable)
{

Expand All @@ -354,7 +361,12 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
{
foreach (var group in evt.LocalMessages.GroupBy(tm => tm.OrchestrationInstance.InstanceId))
{
this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt);
var targetSession = this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt);

if (targetSession == session)
{
containsLoopbackMessages = true;
}
}
}

Expand All @@ -370,22 +382,20 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
session.BatchStartPosition += evt.BatchLength;
session.DequeueCount = 1;

this.StartNewBatchIfNeeded(session, effects, evt.InstanceId, effects.IsReplaying, evt);
}

void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool isReplaying, PartitionUpdateEvent filingEvent)
{
// start a new batch if needed
if (session.Batch.Count == 0)
{
// no more pending messages for this instance, so we delete the session.
this.Sessions.Remove(instanceId);
this.Sessions.Remove(evt.InstanceId);
}
else
{
if (!isReplaying) // we don't start work items until end of recovery
// there are more messages to process

if (!effects.IsReplaying) // we don't start work items until end of recovery
{
// there are more messages. Start another work item.
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent);
// submit a message batch for processing
new OrchestrationMessageBatch(evt.InstanceId, session, this.Partition, evt, waitForPersistence: containsLoopbackMessages);
}
}
}
Expand Down