From 12ba21ddab9d596c78d3bddcea03b2f76c567fa9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 14 Aug 2024 14:53:31 -0700 Subject: [PATCH 1/2] Add reproing test --- .../workflow/machines/workflow_machines.rs | 1 + sdk/src/workflow_context.rs | 11 +++- sdk/src/workflow_future.rs | 18 +++++- .../workflow_tests/upsert_search_attrs.rs | 55 ++++++++++++++----- 4 files changed, 68 insertions(+), 17 deletions(-) diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 85e055f7d..e6df93bea 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -1152,6 +1152,7 @@ impl WorkflowMachines { ); } ProtoCmdAttrs::UpsertWorkflowSearchAttributesCommandAttributes(attrs) => { + // TODO: Update here self.add_cmd_to_wf_task( upsert_search_attrs_internal(attrs), CommandIdKind::NeverResolves, diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 9736c7a32..11d4b30c6 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -13,11 +13,12 @@ use crate::{ }; use crossbeam_channel::{Receiver, Sender}; use futures::{task::Context, FutureExt, Stream, StreamExt}; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockReadGuard}; use std::{ collections::HashMap, future::Future, marker::PhantomData, + ops::Deref, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -40,7 +41,7 @@ use temporal_sdk_core_protos::{ SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes, }, }, - temporal::api::common::v1::{Memo, Payload}, + temporal::api::common::v1::{Memo, Payload, SearchAttributes}, }; use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -123,6 +124,11 @@ impl WfContext { self.shared.read().current_build_id.clone() } + /// Return current values for workflow search attributes + pub fn search_attributes(&self) -> impl Deref + '_ { + RwLockReadGuard::map(self.shared.read(), |s| &s.search_attributes) + } + /// A future that resolves if/when the workflow is cancelled pub async fn cancelled(&self) { if *self.am_cancelled.borrow() { @@ -412,6 +418,7 @@ pub(crate) struct WfContextSharedData { pub(crate) wf_time: Option, pub(crate) history_length: u32, pub(crate) current_build_id: Option, + pub(crate) search_attributes: SearchAttributes, } /// Helper Wrapper that can drain the channel into a Vec in a blocking way. Useful diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index d6f4f61b1..3add43f4b 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -176,7 +176,8 @@ impl WorkflowFuture { if let Some(v) = variant { match v { Variant::StartWorkflow(_) => { - // TODO: Can assign randomness seed whenever needed + // Don't do anything in here. Start workflow is looked at earlier, before + // jobs are handled, and may have information taken out of it to avoid clones. } Variant::FireTimer(FireTimer { seq }) => { self.unblock(UnblockEvent::Timer(seq, TimerResult::Fired))? @@ -311,7 +312,7 @@ impl Future for WorkflowFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { 'activations: loop { // WF must always receive an activation first before responding with commands - let activation = match self.incoming_activations.poll_recv(cx) { + let mut activation = match self.incoming_activations.poll_recv(cx) { Poll::Ready(a) => match a { Some(act) => act, None => { @@ -339,6 +340,18 @@ impl Future for WorkflowFuture { let mut die_of_eviction_when_done = false; let mut activation_cmds = vec![]; + // Assign initial state from start workflow job + if let Some(start_info) = activation.jobs.iter_mut().find_map(|j| { + if let Some(Variant::StartWorkflow(s)) = j.variant.as_mut() { + Some(s) + } else { + None + } + }) { + // TODO: Can assign randomness seed whenever needed + self.wf_ctx.shared.write().search_attributes = + dbg!(start_info.search_attributes.take().unwrap_or_default()); + }; // Lame hack to avoid hitting "unregistered" update handlers in a situation where // the history has no commands until an update is accepted. Will go away w/ SDK redesign if activation @@ -352,6 +365,7 @@ impl Future for WorkflowFuture { ) }) { + dbg!("Polling first time"); // Poll the workflow future once to get things registered if self.poll_wf_future(cx, &run_id, &mut activation_cmds)? { continue; diff --git a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs index 18980a444..d273d567e 100644 --- a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs +++ b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs @@ -1,7 +1,13 @@ -use std::{collections::HashMap, env}; -use temporal_client::{WorkflowClientTrait, WorkflowOptions}; -use temporal_sdk::{WfContext, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt}; +use assert_matches::assert_matches; +use std::{collections::HashMap, env, time::Duration}; +use temporal_client::{ + GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, + WorkflowOptions, +}; +use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; +use temporal_sdk_core_protos::coresdk::{ + workflow_commands::ContinueAsNewWorkflowExecution, AsJsonPayloadExt, FromJsonPayloadExt, +}; use temporal_sdk_core_test_utils::{CoreWfStarter, INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR}; use tracing::warn; use uuid::Uuid; @@ -12,11 +18,28 @@ static TXT_ATTR: &str = "CustomTextField"; static INT_ATTR: &str = "CustomIntField"; async fn search_attr_updater(ctx: WfContext) -> WorkflowResult<()> { + let mut int_val = ctx + .search_attributes() + .indexed_fields + .get(INT_ATTR) + .cloned() + .unwrap_or_default(); + dbg!(&int_val); + let orig_val = int_val.data[0]; + int_val.data[0] += 1; ctx.upsert_search_attributes([ - (TXT_ATTR.to_string(), "goodbye".as_json_payload().unwrap()), - (INT_ATTR.to_string(), 98.as_json_payload().unwrap()), + (TXT_ATTR.to_string(), "goodbye".as_json_payload()?), + (INT_ATTR.to_string(), int_val), ]); - Ok(().into()) + dbg!(orig_val); + // 49 is ascii 1 + if orig_val == 49 { + Ok(WfExitValue::ContinueAsNew(Box::new( + ContinueAsNewWorkflowExecution::default(), + ))) + } else { + Ok(().into()) + } } #[tokio::test] @@ -33,7 +56,7 @@ async fn sends_upsert() { } worker.register_wf(wf_name, search_attr_updater); - let run_id = worker + worker .submit_wf( wf_id.to_string(), wf_name, @@ -43,6 +66,7 @@ async fn sends_upsert() { (TXT_ATTR.to_string(), "hello".as_json_payload().unwrap()), (INT_ATTR.to_string(), 1.as_json_payload().unwrap()), ])), + execution_timeout: Some(Duration::from_secs(4)), ..Default::default() }, ) @@ -50,10 +74,9 @@ async fn sends_upsert() { .unwrap(); worker.run_until_done().await.unwrap(); - let search_attrs = starter - .get_client() - .await - .describe_workflow_execution(wf_id.to_string(), Some(run_id)) + let client = starter.get_client().await; + let search_attrs = client + .describe_workflow_execution(wf_id.to_string(), None) .await .unwrap() .workflow_execution_info @@ -70,5 +93,11 @@ async fn sends_upsert() { "goodbye", String::from_json_payload(txt_attr_payload).unwrap() ); - assert_eq!(98, usize::from_json_payload(int_attr_payload).unwrap()); + assert_eq!(2, usize::from_json_payload(int_attr_payload).unwrap()); + let handle = client.get_untyped_workflow_handle(wf_id.to_string(), ""); + let res = handle + .get_workflow_result(GetWorkflowResultOpts::default()) + .await + .unwrap(); + assert_matches!(res, WorkflowExecutionResult::Succeeded(_)); } From d611940ab16a3bd987146cc3018522dfffeb454d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 14 Aug 2024 17:03:20 -0700 Subject: [PATCH 2/2] Fix --- core/src/worker/workflow/driven_workflow.rs | 30 +++++++++++++++++-- .../workflow/machines/workflow_machines.rs | 16 +++++----- sdk/src/workflow_future.rs | 3 +- .../workflow_tests/upsert_search_attrs.rs | 12 ++------ 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/core/src/worker/workflow/driven_workflow.rs b/core/src/worker/workflow/driven_workflow.rs index 6042b0787..7c9966b25 100644 --- a/core/src/worker/workflow/driven_workflow.rs +++ b/core/src/worker/workflow/driven_workflow.rs @@ -3,10 +3,13 @@ use crate::{ worker::workflow::{OutgoingJob, WFCommand, WorkflowStartedInfo}, }; use prost_types::Timestamp; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::{ + collections::HashMap, + sync::mpsc::{self, Receiver, Sender}, +}; use temporal_sdk_core_protos::{ coresdk::workflow_activation::{start_workflow_from_attribs, WorkflowActivationJob}, - temporal::api::history::v1::WorkflowExecutionStartedEventAttributes, + temporal::api::{common::v1::Payload, history::v1::WorkflowExecutionStartedEventAttributes}, utilities::TryIntoOrNone, }; @@ -14,6 +17,7 @@ use temporal_sdk_core_protos::{ /// command responses pulled out. pub(crate) struct DrivenWorkflow { started_attrs: Option, + search_attribute_modifications: HashMap, incoming_commands: Receiver>, /// Outgoing activation jobs that need to be sent to the lang sdk outgoing_wf_activation_jobs: Vec, @@ -25,6 +29,7 @@ impl DrivenWorkflow { ( Self { started_attrs: None, + search_attribute_modifications: Default::default(), incoming_commands: rx, outgoing_wf_activation_jobs: vec![], }, @@ -85,4 +90,25 @@ impl DrivenWorkflow { debug!(in_cmds = %in_cmds.display(), "wf bridge iteration fetch"); in_cmds } + + /// Lang sent us an SA upsert command - use it to update our current view of search attributes. + pub(crate) fn search_attributes_update(&mut self, update: HashMap) { + self.search_attribute_modifications.extend(update); + } + + /// Return a view of the "current" state of search attributes. IE: The initial attributes + /// plus any changes during the lifetime of the workflow. + pub(crate) fn get_current_search_attributes(&self) -> HashMap { + let mut retme = self + .started_attrs + .as_ref() + .and_then(|si| si.search_attrs.as_ref().map(|sa| sa.indexed_fields.clone())) + .unwrap_or_default(); + retme.extend( + self.search_attribute_modifications + .iter() + .map(|(a, b)| (a.clone(), b.clone())), + ); + retme + } } diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index e6df93bea..5fc26c7b6 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -1152,7 +1152,9 @@ impl WorkflowMachines { ); } ProtoCmdAttrs::UpsertWorkflowSearchAttributesCommandAttributes(attrs) => { - // TODO: Update here + // We explicitly do not update the workflows current SAs here since + // core-generated upserts aren't meant to be modified or used within + // workflows by users (but rather, just for them to search with). self.add_cmd_to_wf_task( upsert_search_attrs_internal(attrs), CommandIdKind::NeverResolves, @@ -1263,6 +1265,8 @@ impl WorkflowMachines { self.add_cmd_to_wf_task(new_timer(attrs), CommandID::Timer(seq).into()); } WFCommand::UpsertSearchAttributes(attrs) => { + self.drive_me + .search_attributes_update(attrs.search_attributes.clone()); self.add_cmd_to_wf_task( upsert_search_attrs( attrs, @@ -1539,17 +1543,13 @@ impl WorkflowMachines { .map(Into::into) .unwrap_or_default(); } - if attrs.search_attributes.is_empty() { - attrs.search_attributes = started_info - .search_attrs - .clone() - .map(Into::into) - .unwrap_or_default(); - } if attrs.retry_policy.is_none() { attrs.retry_policy.clone_from(&started_info.retry_policy); } } + if attrs.search_attributes.is_empty() { + attrs.search_attributes = self.drive_me.get_current_search_attributes(); + } attrs } diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index 3add43f4b..deb977d9c 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -350,7 +350,7 @@ impl Future for WorkflowFuture { }) { // TODO: Can assign randomness seed whenever needed self.wf_ctx.shared.write().search_attributes = - dbg!(start_info.search_attributes.take().unwrap_or_default()); + start_info.search_attributes.take().unwrap_or_default(); }; // Lame hack to avoid hitting "unregistered" update handlers in a situation where // the history has no commands until an update is accepted. Will go away w/ SDK redesign @@ -365,7 +365,6 @@ impl Future for WorkflowFuture { ) }) { - dbg!("Polling first time"); // Poll the workflow future once to get things registered if self.poll_wf_future(cx, &run_id, &mut activation_cmds)? { continue; diff --git a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs index d273d567e..4bc797b50 100644 --- a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs +++ b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs @@ -5,9 +5,7 @@ use temporal_client::{ WorkflowOptions, }; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::{ - workflow_commands::ContinueAsNewWorkflowExecution, AsJsonPayloadExt, FromJsonPayloadExt, -}; +use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt}; use temporal_sdk_core_test_utils::{CoreWfStarter, INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR}; use tracing::warn; use uuid::Uuid; @@ -24,19 +22,15 @@ async fn search_attr_updater(ctx: WfContext) -> WorkflowResult<()> { .get(INT_ATTR) .cloned() .unwrap_or_default(); - dbg!(&int_val); let orig_val = int_val.data[0]; int_val.data[0] += 1; ctx.upsert_search_attributes([ (TXT_ATTR.to_string(), "goodbye".as_json_payload()?), (INT_ATTR.to_string(), int_val), ]); - dbg!(orig_val); // 49 is ascii 1 if orig_val == 49 { - Ok(WfExitValue::ContinueAsNew(Box::new( - ContinueAsNewWorkflowExecution::default(), - ))) + Ok(WfExitValue::ContinueAsNew(Box::default())) } else { Ok(().into()) } @@ -93,7 +87,7 @@ async fn sends_upsert() { "goodbye", String::from_json_payload(txt_attr_payload).unwrap() ); - assert_eq!(2, usize::from_json_payload(int_attr_payload).unwrap()); + assert_eq!(3, usize::from_json_payload(int_attr_payload).unwrap()); let handle = client.get_untyped_workflow_handle(wf_id.to_string(), ""); let res = handle .get_workflow_result(GetWorkflowResultOpts::default())