From cd37e25bde6300f71e0f0d8f2d9598dba7584786 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Tue, 16 Apr 2024 14:23:25 +0200 Subject: [PATCH] sdk: allow types to abstract over encoding --- Cargo.toml | 1 + core/src/core_tests/local_activities.rs | 30 ++- .../machines/local_activity_state_machine.rs | 18 +- .../workflow/machines/patch_state_machine.rs | 10 +- .../upsert_search_attributes_state_machine.rs | 4 +- sdk-core-protos/src/history_builder.rs | 11 +- sdk-core-protos/src/lib.rs | 190 ++++++++++++++---- sdk/Cargo.toml | 1 + sdk/src/lib.rs | 64 ++---- test-utils/src/lib.rs | 4 +- test-utils/src/workflows.rs | 6 +- tests/fuzzy_workflow.rs | 16 +- tests/heavy_tests.rs | 8 +- tests/integ_tests/heartbeat_tests.rs | 4 +- tests/integ_tests/update_tests.rs | 20 +- tests/integ_tests/workflow_tests.rs | 4 +- .../integ_tests/workflow_tests/activities.rs | 21 +- .../workflow_tests/appdata_propagation.rs | 4 +- .../workflow_tests/local_activities.rs | 28 ++- .../workflow_tests/modify_wf_properties.rs | 10 +- .../workflow_tests/upsert_search_attrs.rs | 17 +- 21 files changed, 277 insertions(+), 194 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9da16ffda..4ed53a5d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,4 @@ tonic-build = "0.11" opentelemetry = "0.22" prost = "0.12" prost-types = "0.12" +serde_json = "1.0" diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index b61fc8dec..2f97eddd8 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -34,7 +34,7 @@ use temporal_sdk_core_protos::{ workflow_activation::{workflow_activation_job, WorkflowActivationJob}, workflow_commands::{ActivityCancellationType, ScheduleLocalActivity}, workflow_completion::WorkflowActivationCompletion, - ActivityTaskCompletion, AsJsonPayloadExt, + ActivityTaskCompletion, ToPayload, }, temporal::api::{ common::v1::RetryPolicy, @@ -91,7 +91,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached: |ctx: WfContext| async move { let la = ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), ..Default::default() }); ctx.timer(Duration::from_secs(1)).await; @@ -117,9 +117,7 @@ pub async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> { .map(|i| { ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: format!("Hi {i}") - .as_json_payload() - .expect("serializes fine"), + input: format!("Hi {i}").to_payload().expect("serializes fine"), ..Default::default() }) }) @@ -198,7 +196,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) { |ctx: WfContext| async move { ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -254,7 +252,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) { let la_res = ctx .local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(50))), backoff_coefficient: 1.2, @@ -335,7 +333,7 @@ async fn local_act_retry_long_backoff_uses_timer() { let la_res = ctx .local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(65))), // This will make the second backoff 65 seconds, plenty to use timer @@ -389,7 +387,7 @@ async fn local_act_null_result() { |ctx: WfContext| async move { ctx.local_activity(LocalActivityOptions { activity_type: "nullres".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -432,7 +430,7 @@ async fn local_act_command_immediately_follows_la_marker() { |ctx: WfContext| async move { ctx.local_activity(LocalActivityOptions { activity_type: "nullres".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -736,7 +734,7 @@ async fn test_schedule_to_start_timeout() { let la_res = ctx .local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), // Impossibly small timeout so we timeout in the queue schedule_to_start_timeout: prost_dur!(from_nanos(1)), ..Default::default() @@ -824,7 +822,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time( let la_res = ctx .local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(50))), backoff_coefficient: 1.2, @@ -897,7 +895,7 @@ async fn start_to_close_timeout_allows_retries(#[values(true, false)] la_complet let la_res = ctx .local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(20))), backoff_coefficient: 1.0, @@ -971,7 +969,7 @@ async fn wft_failure_cancels_running_las() { |ctx: WfContext| async move { let la_handle = ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), ..Default::default() }); tokio::join!( @@ -1038,7 +1036,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() { WorkflowFunction::new::<_, _, ()>(|ctx: WfContext| async move { ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -1092,7 +1090,7 @@ async fn local_act_records_nonfirst_attempts_ok() { |ctx: WfContext| async move { ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(10))), backoff_coefficient: 1.0, diff --git a/core/src/worker/workflow/machines/local_activity_state_machine.rs b/core/src/worker/workflow/machines/local_activity_state_machine.rs index 0d696f77c..a887c5bbe 100644 --- a/core/src/worker/workflow/machines/local_activity_state_machine.rs +++ b/core/src/worker/workflow/machines/local_activity_state_machine.rs @@ -893,7 +893,7 @@ mod tests { use temporal_sdk_core_protos::{ coresdk::{ workflow_activation::{workflow_activation_job, WorkflowActivationJob}, - AsJsonPayloadExt, + ToPayload, }, temporal::api::{ common::v1::RetryPolicy, enums::v1::WorkflowTaskFailedCause, failure::v1::Failure, @@ -906,7 +906,7 @@ mod tests { async fn la_wf(ctx: WfContext) -> WorkflowResult<()> { ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), retry_policy: RetryPolicy { maximum_attempts: 1, ..Default::default() @@ -1003,13 +1003,13 @@ mod tests { async fn two_la_wf(ctx: WfContext) -> WorkflowResult<()> { ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), ..Default::default() }) .await; ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), ..Default::default() }) .await; @@ -1020,12 +1020,12 @@ mod tests { tokio::join!( ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), ..Default::default() }), ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), ..Default::default() }) ); @@ -1131,14 +1131,14 @@ mod tests { async fn la_timer_la(ctx: WfContext) -> WorkflowResult<()> { ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), ..Default::default() }) .await; ctx.timer(Duration::from_secs(5)).await; ctx.local_activity(LocalActivityOptions { activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), ..Default::default() }) .await; @@ -1426,7 +1426,7 @@ mod tests { worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move { let la = ctx.local_activity(LocalActivityOptions { cancel_type, - input: ().as_json_payload().unwrap(), + input: ().to_payload().unwrap(), activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), ..Default::default() }); diff --git a/core/src/worker/workflow/machines/patch_state_machine.rs b/core/src/worker/workflow/machines/patch_state_machine.rs index 4adf28f55..e5d9d28cc 100644 --- a/core/src/worker/workflow/machines/patch_state_machine.rs +++ b/core/src/worker/workflow/machines/patch_state_machine.rs @@ -39,7 +39,7 @@ use std::{ }; use temporal_sdk_core_protos::{ constants::PATCH_MARKER_NAME, - coresdk::{common::build_has_change_marker_details, AsJsonPayloadExt}, + coresdk::{common::build_has_change_marker_details, ToPayload}, temporal::api::{ command::v1::{ Command, RecordMarkerCommandAttributes, UpsertWorkflowSearchAttributesCommandAttributes, @@ -135,7 +135,7 @@ pub(super) fn has_change<'a>( let mut all_ids = BTreeSet::from_iter(existing_patch_ids); all_ids.insert(machine.shared_state.patch_id.as_str()); let serialized = all_ids - .as_json_payload() + .to_payload() .context("Could not serialize search attribute value for patch machine") .map_err(|e| WFMachinesError::Fatal(e.to_string()))?; @@ -296,7 +296,7 @@ mod tests { coresdk::{ common::decode_change_marker_details, workflow_activation::{workflow_activation_job, NotifyHasPatch, WorkflowActivationJob}, - AsJsonPayloadExt, FromJsonPayloadExt, + FromPayload, ToPayload, }, temporal::api::{ command::v1::{ @@ -606,7 +606,7 @@ mod tests { { search_attributes: Some(attrs) } ) if attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap() - == &[MY_PATCH_ID].as_json_payload().unwrap() + == &[MY_PATCH_ID].to_payload().unwrap() ); } // The only time the "old" timer should fire is in v2, replaying, without a marker. @@ -790,7 +790,7 @@ mod tests { ); let expected_patches: HashSet = (1..i).map(|i| format!("patch-{i}")).collect(); - let deserialized = HashSet::::from_json_payload( + let deserialized = HashSet::::from_payload( attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap(), ) .unwrap(); diff --git a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs index 0d12c509d..ed4871b5a 100644 --- a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs +++ b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs @@ -211,7 +211,7 @@ mod tests { workflow_activation::{workflow_activation_job, WorkflowActivationJob}, workflow_commands::SetPatchMarker, workflow_completion::WorkflowActivationCompletion, - AsJsonPayloadExt, + ToPayload, }, temporal::api::{ command::v1::command::Attributes, common::v1::Payload, @@ -370,7 +370,7 @@ mod tests { let mut ver_upsert = HashMap::new(); ver_upsert.insert( VERSION_SEARCH_ATTR_KEY.to_string(), - "hi".as_json_payload().unwrap(), + "hi".to_payload().unwrap(), ); let act = core.poll_workflow_activation().await.unwrap(); let mut cmds = if with_patched_cmd { diff --git a/sdk-core-protos/src/history_builder.rs b/sdk-core-protos/src/history_builder.rs index 9d4edc195..fabd7eb6a 100644 --- a/sdk-core-protos/src/history_builder.rs +++ b/sdk-core-protos/src/history_builder.rs @@ -7,7 +7,7 @@ use crate::{ }, external_data::LocalActivityMarkerData, workflow_commands::ScheduleActivity, - AsJsonPayloadExt, IntoPayloadsExt, + IntoPayloadsExt, Json, ToPayload, }, temporal::api::{ common::v1::{ @@ -17,8 +17,7 @@ use crate::{ failure::v1::{failure, CanceledFailureInfo, Failure}, history::v1::{history_event::Attributes, *}, taskqueue::v1::TaskQueue, - update, - update::v1::outcome, + update::{self, v1::outcome}, }, HistoryInfo, }; @@ -429,7 +428,7 @@ impl TestHistoryBuilder { let mut indexed_fields = HashMap::new(); indexed_fields.insert( "TemporalChangeVersion".to_string(), - attribs.as_json_payload().unwrap(), + attribs.to_payload().unwrap(), ); let attrs = UpsertWorkflowSearchAttributesEventAttributes { workflow_task_completed_event_id: self.previous_task_completed_id, @@ -605,6 +604,10 @@ impl TestHistoryBuilder { } } +impl ToPayload for &[String] { + type Encoder = Json; +} + fn default_attribs(et: EventType) -> Result { Ok(match et { EventType::WorkflowExecutionStarted => default_wes_attribs().into(), diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index c284f48dc..bf0cab296 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -21,7 +21,7 @@ pub use history_info::HistoryInfo; pub use task_token::TaskToken; pub static ENCODING_PAYLOAD_KEY: &str = "encoding"; -pub static JSON_ENCODING_VAL: &str = "json/plain"; +pub static JSON_ENCODING_VAL: &[u8] = b"json/plain"; pub static PATCHED_MARKER_DETAILS_KEY: &str = "patch-data"; #[allow( @@ -46,9 +46,9 @@ pub mod coresdk { ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL, }; use activity_task::ActivityTask; - use serde::{Deserialize, Serialize}; + use serde::{de::DeserializeOwned, Serialize}; use std::{ - collections::HashMap, + collections::{hash_map::RandomState, BTreeSet, HashMap, HashSet}, convert::TryFrom, fmt::{Display, Formatter}, iter::FromIterator, @@ -273,10 +273,7 @@ pub mod coresdk { tonic::include_proto!("coresdk.common"); use super::external_data::LocalActivityMarkerData; use crate::{ - coresdk::{ - external_data::PatchedMarkerData, AsJsonPayloadExt, FromJsonPayloadExt, - IntoPayloadsExt, - }, + coresdk::{external_data::PatchedMarkerData, FromPayload, IntoPayloadsExt, ToPayload}, temporal::api::common::v1::{Payload, Payloads}, PATCHED_MARKER_DETAILS_KEY, }; @@ -291,7 +288,7 @@ pub mod coresdk { id: patch_id.into(), deprecated, } - .as_json_payload()?; + .to_payload()?; hm.insert(PATCHED_MARKER_DETAILS_KEY.to_string(), encoded.into()); Ok(hm) } @@ -302,7 +299,7 @@ pub mod coresdk { // We used to write change markers with plain bytes, so try to decode if they are // json first, then fall back to that. if let Some(cd) = details.get(PATCHED_MARKER_DETAILS_KEY) { - let decoded = PatchedMarkerData::from_json_payload(cd.payloads.first()?).ok()?; + let decoded = PatchedMarkerData::from_payload(cd.payloads.first()?).ok()?; return Some((decoded.id, decoded.deprecated)); } @@ -320,7 +317,7 @@ pub mod coresdk { let mut hm = HashMap::new(); // It would be more efficient for this to be proto binary, but then it shows up as // meaningless in the Temporal UI... - if let Some(jsonified) = metadata.as_json_payload().into_payloads() { + if let Some(jsonified) = metadata.to_payload().into_payloads() { hm.insert("data".to_string(), jsonified); } if let Some(res) = result { @@ -354,6 +351,7 @@ pub mod coresdk { } pub mod external_data { + use crate::coresdk::{Json, PayloadExt}; use prost_wkt_types::{Duration, Timestamp}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; tonic::include_proto!("coresdk.external_data"); @@ -392,6 +390,14 @@ pub mod coresdk { } } + impl PayloadExt for PatchedMarkerData { + type Encoding = Json; + } + + impl PayloadExt for LocalActivityMarkerData { + type Encoding = Json; + } + // Luckily Duration is also stored the exact same way #[derive(Serialize, Deserialize)] #[serde(remote = "Duration")] @@ -1325,42 +1331,142 @@ pub mod coresdk { DeserializeErr(#[from] anyhow::Error), } - // TODO: Once the prototype SDK is un-prototyped this serialization will need to be compat with - // other SDKs (given they might execute an activity). - pub trait AsJsonPayloadExt { - fn as_json_payload(&self) -> anyhow::Result; + /// JSON encoding for payload types + pub struct Json(()); + + impl Decoder for Json { + type Error = serde_json::Error; + + fn decode(payload: &Payload) -> Result { + serde_json::from_slice(&payload.data) + } } - impl AsJsonPayloadExt for T - where - T: Serialize, - { - fn as_json_payload(&self) -> anyhow::Result { - let as_json = serde_json::to_string(self)?; - let mut metadata = HashMap::new(); - metadata.insert( - ENCODING_PAYLOAD_KEY.to_string(), - JSON_ENCODING_VAL.as_bytes().to_vec(), - ); - Ok(Payload { - metadata, - data: as_json.into_bytes(), - }) + + impl Encoder for Json { + type Error = serde_json::Error; + + fn encode(value: &T) -> Result { + let mut metadata = HashMap::default(); + metadata.insert(ENCODING_PAYLOAD_KEY.to_owned(), JSON_ENCODING_VAL.to_vec()); + serde_json::to_vec(value).map(|data| Payload { metadata, data }) } } - pub trait FromJsonPayloadExt: Sized { - fn from_json_payload(payload: &Payload) -> Result; + pub trait Encoder { + /// Error type for encoding + type Error: std::error::Error + Send + Sync + 'static; + + /// Encode a value of the given type into a [`Payload`] + fn encode(value: &T) -> Result; } - impl FromJsonPayloadExt for T - where - T: for<'de> Deserialize<'de>, - { - fn from_json_payload(payload: &Payload) -> Result { - if !payload.is_json_payload() { - return Err(PayloadDeserializeErr::DeserializerDoesNotHandle); - } - let payload_str = std::str::from_utf8(&payload.data).map_err(anyhow::Error::from)?; - Ok(serde_json::from_str(payload_str).map_err(anyhow::Error::from)?) + + pub trait Decoder { + /// Error type for decoding + type Error: std::error::Error + Send + Sync + 'static; + + /// Decode a [`Payload`] into a value of the desired type + fn decode(payload: &Payload) -> Result; + } + + /// Trait for types that are used as workflow or activity output + pub trait ToPayload { + /// The encoding with which the payload is serialized + type Encoder: Encoder; + + /// Encode a payload of type `Self` into a [`Payload`] + fn to_payload(&self) -> Result>::Error> { + ::encode(self) + } + } + + pub trait FromPayload: Sized { + /// The decoding with which the payload is deserialized + type Decoder: Decoder; + + /// Decode a [`Payload`] into a value of type `Self` + fn from_payload( + payload: &Payload, + ) -> Result>::Error> { + ::decode(payload) + } + } + + /// Trait for types that are used as workflow or activity input or output + pub trait PayloadExt: Sized { + /// The encoding with which the payload is (de)serialized + type Encoding: Encoder + Decoder; + } + + impl FromPayload for T { + type Decoder = T::Encoding; + } + + impl ToPayload for T { + type Encoder = T::Encoding; + } + + impl PayloadExt for String { + type Encoding = Json; + } + + impl ToPayload for &str { + type Encoder = Json; + } + + impl PayloadExt for () { + type Encoding = Json; + } + + impl ToPayload for BTreeSet<&str> { + type Encoder = Json; + } + + impl ToPayload for [&str] { + type Encoder = Json; + } + + impl PayloadExt for i32 { + type Encoding = Json; + } + + impl PayloadExt for usize { + type Encoding = Json; + } + + impl FromPayload for HashSet { + type Decoder = Json; + } + + /// Activity functions may return these values when exiting + pub enum ActExitValue { + /// Completion requires an asynchronous callback + WillCompleteAsync, + /// Finish with a result + Normal(T), + } + + pub trait ActivityOutput { + fn encode(self) -> Result, anyhow::Error>; + } + + impl ActivityOutput for ActExitValue { + fn encode(self) -> Result, anyhow::Error> { + Ok(self) + } + } + + impl ActivityOutput for ActExitValue { + fn encode(self) -> Result, anyhow::Error> { + Ok(match self { + ActExitValue::WillCompleteAsync => ActExitValue::WillCompleteAsync, + ActExitValue::Normal(x) => ActExitValue::Normal(x.to_payload()?), + }) + } + } + + impl ActivityOutput for T { + fn encode(self) -> Result, anyhow::Error> { + Ok(ActExitValue::Normal(self.to_payload()?)) } } @@ -1663,7 +1769,7 @@ pub mod temporal { pub fn is_json_payload(&self) -> bool { self.metadata .get(ENCODING_PAYLOAD_KEY) - .map(|v| v.as_slice() == JSON_ENCODING_VAL.as_bytes()) + .map(|v| v.as_slice() == JSON_ENCODING_VAL) .unwrap_or_default() } } diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 033868906..3bfb8e4ab 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -23,6 +23,7 @@ parking_lot = { version = "0.12", features = ["send_guard"] } prost-types = { version = "0.5", package = "prost-wkt-types" } sha2 = "0.10" serde = "1.0" +serde_json = "1" tokio = { version = "1.26", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] } tokio-util = { version = "0.7" } tokio-stream = "0.1" diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 03ec8c903..fb3d506f4 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -64,7 +64,6 @@ use crate::{interceptors::WorkerInterceptor, workflow_context::ChildWfCommon}; use anyhow::{anyhow, bail, Context}; use app_data::AppData; use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use serde::Serialize; use std::{ any::{Any, TypeId}, cell::RefCell, @@ -80,6 +79,7 @@ use temporal_sdk_core_api::{ errors::{PollActivityError, PollWfError}, Worker as CoreWorker, }; +pub use temporal_sdk_core_protos::coresdk::ActExitValue; use temporal_sdk_core_protos::{ coresdk::{ activity_result::{ActivityExecutionResult, ActivityResolution}, @@ -92,7 +92,7 @@ use temporal_sdk_core_protos::{ }, workflow_commands::{workflow_command, ContinueAsNewWorkflowExecution}, workflow_completion::WorkflowActivationCompletion, - ActivityTaskCompletion, AsJsonPayloadExt, FromJsonPayloadExt, + ActivityOutput, ActivityTaskCompletion, Encoder, FromPayload, ToPayload, }, temporal::api::{common::v1::Payload, failure::v1::Failure}, TaskToken, @@ -206,10 +206,10 @@ impl Worker { /// Register an Activity function to invoke when the Worker is asked to run an activity of /// `activity_type` - pub fn register_activity( + pub fn register_activity( &mut self, activity_type: impl Into, - act_function: impl IntoActivityFunc, + act_function: impl IntoActivityFunc, ) { self.activity_half.activity_fns.insert( activity_type.into(), @@ -714,7 +714,7 @@ impl From for WorkflowFunction where F: Fn(WfContext) -> Fut + Send + Sync + 'static, Fut: Future, anyhow::Error>> + Send + 'static, - O: Serialize, + O: ToPayload, { fn from(wf_func: F) -> Self { Self::new(wf_func) @@ -727,7 +727,7 @@ impl WorkflowFunction { where F: Fn(WfContext) -> Fut + Send + Sync + 'static, Fut: Future, anyhow::Error>> + Send + 'static, - O: Serialize, + O: ToPayload, { Self { wf_func: Box::new(move |ctx: WfContext| { @@ -738,7 +738,7 @@ impl WorkflowFunction { WfExitValue::ContinueAsNew(b) => WfExitValue::ContinueAsNew(b), WfExitValue::Cancelled => WfExitValue::Cancelled, WfExitValue::Evicted => WfExitValue::Evicted, - WfExitValue::Normal(o) => WfExitValue::Normal(o.as_json_payload()?), + WfExitValue::Normal(o) => WfExitValue::Normal(o.to_payload()?), }) }) }) @@ -774,20 +774,6 @@ impl WfExitValue { } } -/// Activity functions may return these values when exiting -pub enum ActExitValue { - /// Completion requires an asynchronous callback - WillCompleteAsync, - /// Finish with a result - Normal(T), -} - -impl From for ActExitValue { - fn from(t: T) -> Self { - Self::Normal(t) - } -} - type BoxActFn = Arc< dyn Fn(ActContext, Payload) -> BoxFuture<'static, Result, anyhow::Error>> + Send @@ -831,35 +817,24 @@ impl Display for ActivityCancelledError { pub struct NonRetryableActivityError(pub anyhow::Error); /// Closures / functions which can be turned into activity functions implement this trait -pub trait IntoActivityFunc { +pub trait IntoActivityFunc { /// Consume the closure or fn pointer and turned it into a boxed activity function fn into_activity_fn(self) -> BoxActFn; } -impl IntoActivityFunc for F +impl IntoActivityFunc for F where F: (Fn(ActContext, A) -> Rf) + Sync + Send + 'static, - A: FromJsonPayloadExt + Send, + A: FromPayload, Rf: Future> + Send + 'static, - R: Into>, - O: AsJsonPayloadExt, + R: ActivityOutput, { fn into_activity_fn(self) -> BoxActFn { let wrapper = move |ctx: ActContext, input: Payload| { // Some minor gymnastics are required to avoid needing to clone the function - match A::from_json_payload(&input) { + match A::from_payload(&input) { Ok(deser) => (self)(ctx, deser) - .map(|r| { - r.and_then(|r| { - let exit_val: ActExitValue = r.into(); - Ok(match exit_val { - ActExitValue::WillCompleteAsync => ActExitValue::WillCompleteAsync, - ActExitValue::Normal(x) => { - ActExitValue::Normal(x.as_json_payload()?) - } - }) - }) - }) + .map(|r| r.and_then(|r| r.encode())) .boxed(), Err(e) => async move { Err(e.into()) }.boxed(), } @@ -910,11 +885,11 @@ pub trait IntoUpdateValidatorFunc { } impl IntoUpdateValidatorFunc for F where - A: FromJsonPayloadExt + Send, + A: FromPayload + Send, F: (for<'a> Fn(&'a UpdateInfo, A) -> Result<(), anyhow::Error>) + Send + 'static, { fn into_update_validator_fn(self) -> BoxUpdateValidatorFn { - let wrapper = move |ctx: &UpdateInfo, input: &Payload| match A::from_json_payload(input) { + let wrapper = move |ctx: &UpdateInfo, input: &Payload| match A::from_payload(input) { Ok(deser) => (self)(ctx, deser), Err(e) => Err(e.into()), }; @@ -931,15 +906,16 @@ pub trait IntoUpdateHandlerFunc { } impl IntoUpdateHandlerFunc for F where - A: FromJsonPayloadExt + Send, + A: FromPayload + Send, F: (FnMut(UpdateContext, A) -> Rf) + Send + 'static, Rf: Future> + Send + 'static, - R: AsJsonPayloadExt, + R: ToPayload, + <::Encoder as Encoder>::Error: Into, { fn into_update_handler_fn(mut self) -> BoxUpdateHandlerFn { - let wrapper = move |ctx: UpdateContext, input: &Payload| match A::from_json_payload(input) { + let wrapper = move |ctx: UpdateContext, input: &Payload| match A::from_payload(input) { Ok(deser) => (self)(ctx, deser) - .map(|r| r.and_then(|r| r.as_json_payload())) + .map(|r| r.and_then(|r| Ok(r.to_payload()?))) .boxed(), Err(e) => async move { Err(e.into()) }.boxed(), }; diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index a11432ee2..0a373645d 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -421,10 +421,10 @@ impl TestWorker { self.inner.register_wf(workflow_type, wf_function) } - pub fn register_activity( + pub fn register_activity( &mut self, activity_type: impl Into, - act_function: impl IntoActivityFunc, + act_function: impl IntoActivityFunc, ) { self.inner.register_activity(activity_type, act_function) } diff --git a/test-utils/src/workflows.rs b/test-utils/src/workflows.rs index e85e7f8d1..a83218490 100644 --- a/test-utils/src/workflows.rs +++ b/test-utils/src/workflows.rs @@ -1,12 +1,12 @@ use crate::prost_dur; use std::time::Duration; use temporal_sdk::{ActivityOptions, LocalActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core_protos::{coresdk::AsJsonPayloadExt, temporal::api::common::v1::RetryPolicy}; +use temporal_sdk_core_protos::{coresdk::ToPayload, temporal::api::common::v1::RetryPolicy}; pub async fn la_problem_workflow(ctx: WfContext) -> WorkflowResult<()> { ctx.local_activity(LocalActivityOptions { activity_type: "delay".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_micros(15))), backoff_coefficient: 1_000., @@ -21,7 +21,7 @@ pub async fn la_problem_workflow(ctx: WfContext) -> WorkflowResult<()> { ctx.activity(ActivityOptions { activity_type: "delay".to_string(), start_to_close_timeout: Some(Duration::from_secs(20)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; diff --git a/tests/fuzzy_workflow.rs b/tests/fuzzy_workflow.rs index 8c7c10660..4c1b7453e 100644 --- a/tests/fuzzy_workflow.rs +++ b/tests/fuzzy_workflow.rs @@ -3,7 +3,9 @@ use rand::{prelude::Distribution, rngs::SmallRng, Rng, SeedableRng}; use std::{future, time::Duration}; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, LocalActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt}; +use temporal_sdk_core_protos::coresdk::{ + FromPayload, IntoPayloadsExt, Json, PayloadExt, ToPayload, +}; use temporal_sdk_core_test_utils::CoreWfStarter; use tokio_util::sync::CancellationToken; @@ -16,6 +18,10 @@ enum FuzzyWfAction { DoLocalAct, } +impl PayloadExt for FuzzyWfAction { + type Encoding = Json; +} + struct FuzzyWfActionSampler; impl Distribution for FuzzyWfActionSampler { fn sample(&self, rng: &mut R) -> FuzzyWfAction { @@ -35,7 +41,7 @@ async fn echo(_ctx: ActContext, echo_me: String) -> Result WorkflowResult<()> { let sigchan = ctx .make_signal_channel(FUZZY_SIG) - .map(|sd| FuzzyWfAction::from_json_payload(&sd.input[0]).expect("Can deserialize signal")); + .map(|sd| FuzzyWfAction::from_payload(&sd.input[0]).expect("Can deserialize signal")); let done = CancellationToken::new(); let done_setter = done.clone(); @@ -47,7 +53,7 @@ async fn fuzzy_wf_def(ctx: WfContext) -> WorkflowResult<()> { .activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .map(|_| ()) @@ -56,7 +62,7 @@ async fn fuzzy_wf_def(ctx: WfContext) -> WorkflowResult<()> { .local_activity(LocalActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .map(|_| ()) @@ -112,7 +118,7 @@ async fn fuzzy_workflow() { format!("{wf_name}_{i}"), "".to_string(), FUZZY_SIG.to_string(), - [action.as_json_payload().expect("Serializes ok")].into_payloads(), + [action.to_payload().expect("Serializes ok")].into_payloads(), None, ) }) diff --git a/tests/heavy_tests.rs b/tests/heavy_tests.rs index 68b1f7774..ca39d91b8 100644 --- a/tests/heavy_tests.rs +++ b/tests/heavy_tests.rs @@ -2,9 +2,7 @@ use futures::{future::join_all, sink, stream::FuturesUnordered, StreamExt}; use std::time::{Duration, Instant}; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::{ - workflow_commands::ActivityCancellationType, AsJsonPayloadExt, -}; +use temporal_sdk_core_protos::coresdk::{workflow_commands::ActivityCancellationType, ToPayload}; use temporal_sdk_core_test_utils::{workflows::la_problem_workflow, CoreWfStarter}; mod fuzzy_workflow; @@ -27,7 +25,7 @@ async fn activity_load() { let wf_fn = move |ctx: WfContext| { let task_queue = task_queue.clone(); - let payload = "yo".as_json_payload().unwrap(); + let payload = "yo".to_payload().unwrap(); async move { let activity = ActivityOptions { activity_id: Some(activity_id.to_string()), @@ -99,7 +97,7 @@ async fn workflow_load() { ctx.activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; diff --git a/tests/integ_tests/heartbeat_tests.rs b/tests/integ_tests/heartbeat_tests.rs index 35d00609b..9be817794 100644 --- a/tests/integ_tests/heartbeat_tests.rs +++ b/tests/integ_tests/heartbeat_tests.rs @@ -11,7 +11,7 @@ use temporal_sdk_core_protos::{ workflow_activation::{workflow_activation_job, ResolveActivity, WorkflowActivationJob}, workflow_commands::{ActivityCancellationType, ScheduleActivity}, workflow_completion::WorkflowActivationCompletion, - ActivityHeartbeat, ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion, + ActivityHeartbeat, ActivityTaskCompletion, IntoCompletion, ToPayload, }, temporal::api::{ common::v1::{Payload, RetryPolicy}, @@ -191,7 +191,7 @@ async fn activity_doesnt_heartbeat_hits_timeout_then_completes() { let res = ctx .activity(ActivityOptions { activity_type: "echo_activity".to_string(), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), start_to_close_timeout: Some(Duration::from_secs(10)), heartbeat_timeout: Some(Duration::from_secs(2)), retry_policy: Some(RetryPolicy { diff --git a/tests/integ_tests/update_tests.rs b/tests/integ_tests/update_tests.rs index c9d3c97f7..a0185bae2 100644 --- a/tests/integ_tests/update_tests.rs +++ b/tests/integ_tests/update_tests.rs @@ -20,7 +20,7 @@ use temporal_sdk_core_protos::{ update_response, CompleteWorkflowExecution, ScheduleLocalActivity, UpdateResponse, }, workflow_completion::WorkflowActivationCompletion, - ActivityTaskCompletion, AsJsonPayloadExt, IntoPayloadsExt, + ActivityTaskCompletion, IntoPayloadsExt, ToPayload, }, temporal::api::{ enums::v1::{EventType, UpdateWorkflowExecutionLifecycleStage}, @@ -498,7 +498,7 @@ async fn update_with_local_acts() { ctx.wf_ctx .local_activity(LocalActivityOptions { activity_type: "echo_activity".to_string(), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -532,7 +532,7 @@ async fn update_with_local_acts() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) }); for res in join_all(updates).await { @@ -587,7 +587,7 @@ async fn update_rejection_sdk() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) .await .unwrap(); @@ -631,7 +631,7 @@ async fn update_fail_sdk() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) .await .unwrap(); @@ -679,7 +679,7 @@ async fn update_timer_sequence() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) .await .unwrap(); @@ -730,7 +730,7 @@ async fn task_failure_during_validation() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) .await .unwrap(); @@ -794,7 +794,7 @@ async fn task_failure_after_update() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) .await .unwrap(); @@ -827,7 +827,7 @@ async fn worker_restarted_in_middle_of_update() { ctx.wf_ctx .activity(ActivityOptions { activity_type: "blocks".to_string(), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), start_to_close_timeout: Some(Duration::from_secs(2)), ..Default::default() }) @@ -859,7 +859,7 @@ async fn worker_restarted_in_middle_of_update() { WaitPolicy { lifecycle_stage: UpdateWorkflowExecutionLifecycleStage::Completed as i32, }, - [().as_json_payload().unwrap()].into_payloads(), + [().to_payload().unwrap()].into_payloads(), ) .await .unwrap(); diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index f9d67a324..526c98c39 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -42,7 +42,7 @@ use temporal_sdk_core_protos::{ ActivityCancellationType, FailWorkflowExecution, QueryResult, QuerySuccess, StartTimer, }, workflow_completion::WorkflowActivationCompletion, - ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion, + ActivityTaskCompletion, IntoCompletion, ToPayload, }, temporal::api::{ enums::v1::EventType, failure::v1::Failure, history::v1::history_event, @@ -565,7 +565,7 @@ async fn slow_completes_with_small_cache() { ctx.activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; diff --git a/tests/integ_tests/workflow_tests/activities.rs b/tests/integ_tests/workflow_tests/activities.rs index 86ca0010a..7f03ad84c 100644 --- a/tests/integ_tests/workflow_tests/activities.rs +++ b/tests/integ_tests/workflow_tests/activities.rs @@ -24,8 +24,7 @@ use temporal_sdk_core_protos::{ ActivityCancellationType, RequestCancelActivity, ScheduleActivity, StartTimer, }, workflow_completion::WorkflowActivationCompletion, - ActivityHeartbeat, ActivityTaskCompletion, AsJsonPayloadExt, FromJsonPayloadExt, - IntoCompletion, + ActivityHeartbeat, ActivityTaskCompletion, FromPayload, IntoCompletion, ToPayload, }, temporal::api::{ common::v1::{ActivityType, Payload, Payloads, RetryPolicy}, @@ -44,7 +43,7 @@ pub async fn one_activity_wf(ctx: WfContext) -> WorkflowResult<()> { ctx.activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -797,7 +796,7 @@ async fn one_activity_abandon_cancelled_after_complete() { let act_fut = ctx.activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), cancellation_type: ActivityCancellationType::Abandon, ..Default::default() }); @@ -848,16 +847,16 @@ async fn it_can_complete_async() { let activity_resolution = ctx .activity(ActivityOptions { activity_type: "complete_async_activity".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), start_to_close_timeout: Some(Duration::from_secs(30)), ..Default::default() }) .await; let res = match activity_resolution.status { - Some(act_res::Status::Completed(activity_result::Success { result })) => result - .map(|p| String::from_json_payload(&p).unwrap()) - .unwrap(), + Some(act_res::Status::Completed(activity_result::Success { result })) => { + result.map(|p| String::from_payload(&p).unwrap()).unwrap() + } _ => panic!("activity task failed {activity_resolution:?}"), }; @@ -891,7 +890,7 @@ async fn it_can_complete_async() { client .complete_activity_task( TaskToken(task_token), - Some(async_response.as_json_payload().unwrap().into()), + Some(async_response.to_payload().unwrap().into()), ) .await .unwrap(); @@ -932,7 +931,7 @@ async fn graceful_shutdown() { ..Default::default() }), cancellation_type: ActivityCancellationType::WaitCancellationCompleted, - input: "hi".as_json_payload().unwrap(), + input: "hi".to_payload().unwrap(), ..Default::default() }) }); @@ -991,7 +990,7 @@ async fn activity_can_be_cancelled_by_local_timeout() { .activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(1)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), retry_policy: Some(RetryPolicy { maximum_attempts: 1, ..Default::default() diff --git a/tests/integ_tests/workflow_tests/appdata_propagation.rs b/tests/integ_tests/workflow_tests/appdata_propagation.rs index df7972bec..5d1196928 100644 --- a/tests/integ_tests/workflow_tests/appdata_propagation.rs +++ b/tests/integ_tests/workflow_tests/appdata_propagation.rs @@ -2,7 +2,7 @@ use assert_matches::assert_matches; use std::time::Duration; use temporal_client::{WfClientExt, WorkflowExecutionResult, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::AsJsonPayloadExt; +use temporal_sdk_core_protos::coresdk::ToPayload; use temporal_sdk_core_test_utils::CoreWfStarter; const TEST_APPDATA_MESSAGE: &str = "custom app data, yay"; @@ -15,7 +15,7 @@ pub async fn appdata_activity_wf(ctx: WfContext) -> WorkflowResult<()> { ctx.activity(ActivityOptions { activity_type: "echo_activity".to_string(), start_to_close_timeout: Some(Duration::from_secs(5)), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index e6e03297f..3367cc685 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -16,7 +16,7 @@ use temporal_sdk_core_protos::{ workflow_commands::{workflow_command::Variant, ActivityCancellationType}, workflow_completion, workflow_completion::{workflow_activation_completion, WorkflowActivationCompletion}, - AsJsonPayloadExt, + ToPayload, }, temporal::api::{common::v1::RetryPolicy, enums::v1::TimeoutType}, TestHistoryBuilder, @@ -30,7 +30,7 @@ pub async fn one_local_activity_wf(ctx: WfContext) -> WorkflowResult<()> { let initial_workflow_time = ctx.workflow_time().expect("Workflow time should be set"); ctx.local_activity(LocalActivityOptions { activity_type: "echo_activity".to_string(), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }) .await; @@ -54,7 +54,7 @@ async fn one_local_activity() { pub async fn local_act_concurrent_with_timer_wf(ctx: WfContext) -> WorkflowResult<()> { let la = ctx.local_activity(LocalActivityOptions { activity_type: "echo_activity".to_string(), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }); let timer = ctx.timer(Duration::from_secs(1)); @@ -77,7 +77,7 @@ async fn local_act_concurrent_with_timer() { pub async fn local_act_then_timer_then_wait(ctx: WfContext) -> WorkflowResult<()> { let la = ctx.local_activity(LocalActivityOptions { activity_type: "echo_activity".to_string(), - input: "hi!".as_json_payload().expect("serializes fine"), + input: "hi!".to_payload().expect("serializes fine"), ..Default::default() }); ctx.timer(Duration::from_secs(1)).await; @@ -119,9 +119,7 @@ pub async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> { .map(|i| { ctx.local_activity(LocalActivityOptions { activity_type: "echo_activity".to_string(), - input: format!("Hi {i}") - .as_json_payload() - .expect("serializes fine"), + input: format!("Hi {i}").to_payload().expect("serializes fine"), ..Default::default() }) }) @@ -153,7 +151,7 @@ async fn local_act_retry_timer_backoff() { let res = ctx .local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_micros(15))), // We want two local backoffs that are short. Third backoff will use timer @@ -201,7 +199,7 @@ async fn cancel_immediate(#[case] cancel_type: ActivityCancellationType) { worker.register_wf(&wf_name, move |ctx: WfContext| async move { let la = ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), cancel_type, ..Default::default() }); @@ -289,7 +287,7 @@ async fn cancel_after_act_starts( worker.register_wf(&wf_name, move |ctx: WfContext| async move { let la = ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(bo_dur.try_into().unwrap()), backoff_coefficient: 1., @@ -382,7 +380,7 @@ async fn x_to_close_timeout(#[case] is_schedule: bool) { let res = ctx .local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_micros(15))), backoff_coefficient: 1_000., @@ -431,7 +429,7 @@ async fn schedule_to_close_timeout_across_timer_backoff(#[case] cached: bool) { let res = ctx .local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(15))), backoff_coefficient: 1_000., @@ -496,7 +494,7 @@ async fn timer_backoff_concurrent_with_non_timer_backoff() { worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { let r1 = ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_micros(15))), backoff_coefficient: 1_000., @@ -509,7 +507,7 @@ async fn timer_backoff_concurrent_with_non_timer_backoff() { }); let r2 = ctx.local_activity(LocalActivityOptions { activity_type: "echo".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_millis(15))), backoff_coefficient: 10., @@ -543,7 +541,7 @@ async fn repro_nondeterminism_with_timer_bug() { let t1 = ctx.timer(Duration::from_secs(30)); let r1 = ctx.local_activity(LocalActivityOptions { activity_type: "delay".to_string(), - input: "hi".as_json_payload().expect("serializes fine"), + input: "hi".to_payload().expect("serializes fine"), retry_policy: RetryPolicy { initial_interval: Some(prost_dur!(from_micros(15))), backoff_coefficient: 1_000., diff --git a/tests/integ_tests/workflow_tests/modify_wf_properties.rs b/tests/integ_tests/workflow_tests/modify_wf_properties.rs index 17d9cadeb..8062aa1e5 100644 --- a/tests/integ_tests/workflow_tests/modify_wf_properties.rs +++ b/tests/integ_tests/workflow_tests/modify_wf_properties.rs @@ -1,6 +1,6 @@ use temporal_client::WorkflowClientTrait; use temporal_sdk::{WfContext, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt}; +use temporal_sdk_core_protos::coresdk::{FromPayload, ToPayload}; use temporal_sdk_core_test_utils::CoreWfStarter; use uuid::Uuid; @@ -9,8 +9,8 @@ static FIELD_B: &str = "cute_level"; async fn memo_upserter(ctx: WfContext) -> WorkflowResult<()> { ctx.upsert_memo([ - (FIELD_A.to_string(), "enchi".as_json_payload().unwrap()), - (FIELD_B.to_string(), 9001.as_json_payload().unwrap()), + (FIELD_A.to_string(), "enchi".to_payload().unwrap()), + (FIELD_B.to_string(), 9001.to_payload().unwrap()), ]); Ok(().into()) } @@ -46,6 +46,6 @@ async fn sends_modify_wf_props() { for payload in [catname, cuteness] { assert!(payload.is_json_payload()); } - assert_eq!("enchi", String::from_json_payload(catname).unwrap()); - assert_eq!(9001, usize::from_json_payload(cuteness).unwrap()); + assert_eq!("enchi", String::from_payload(catname).unwrap()); + assert_eq!(9001, usize::from_payload(cuteness).unwrap()); } diff --git a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs index bfbe515a9..aef244ade 100644 --- a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs +++ b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, env}; use temporal_client::{WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{WfContext, WorkflowResult}; -use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt}; +use temporal_sdk_core_protos::coresdk::{FromPayload, ToPayload}; use temporal_sdk_core_test_utils::{CoreWfStarter, INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR}; use tracing::warn; use uuid::Uuid; @@ -13,8 +13,8 @@ static INT_ATTR: &str = "CustomIntField"; async fn search_attr_updater(ctx: WfContext) -> WorkflowResult<()> { ctx.upsert_search_attributes([ - (TXT_ATTR.to_string(), "goodbye".as_json_payload().unwrap()), - (INT_ATTR.to_string(), 98.as_json_payload().unwrap()), + (TXT_ATTR.to_string(), "goodbye".to_payload().unwrap()), + (INT_ATTR.to_string(), 98.to_payload().unwrap()), ]); Ok(().into()) } @@ -40,8 +40,8 @@ async fn sends_upsert() { vec![], WorkflowOptions { search_attributes: Some(HashMap::from([ - (TXT_ATTR.to_string(), "hello".as_json_payload().unwrap()), - (INT_ATTR.to_string(), 1.as_json_payload().unwrap()), + (TXT_ATTR.to_string(), "hello".to_payload().unwrap()), + (INT_ATTR.to_string(), 1.to_payload().unwrap()), ])), ..Default::default() }, @@ -66,9 +66,6 @@ async fn sends_upsert() { for payload in [txt_attr_payload, int_attr_payload] { assert!(payload.is_json_payload()); } - assert_eq!( - "goodbye", - String::from_json_payload(txt_attr_payload).unwrap() - ); - assert_eq!(98, usize::from_json_payload(int_attr_payload).unwrap()); + assert_eq!("goodbye", String::from_payload(txt_attr_payload).unwrap()); + assert_eq!(98, usize::from_payload(int_attr_payload).unwrap()); }