Skip to content

Commit

Permalink
sdk: allow types to abstract over encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Apr 16, 2024
1 parent 409e74e commit cd37e25
Show file tree
Hide file tree
Showing 21 changed files with 277 additions and 194 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ tonic-build = "0.11"
opentelemetry = "0.22"
prost = "0.12"
prost-types = "0.12"
serde_json = "1.0"
30 changes: 14 additions & 16 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use temporal_sdk_core_protos::{
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{ActivityCancellationType, ScheduleLocalActivity},
workflow_completion::WorkflowActivationCompletion,
ActivityTaskCompletion, AsJsonPayloadExt,
ActivityTaskCompletion, ToPayload,
},
temporal::api::{
common::v1::RetryPolicy,
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
|ctx: WfContext| async move {
let la = ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
});
ctx.timer(Duration::from_secs(1)).await;
Expand All @@ -117,9 +117,7 @@ pub async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> {
.map(|i| {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: format!("Hi {i}")
.as_json_payload()
.expect("serializes fine"),
input: format!("Hi {i}").to_payload().expect("serializes fine"),
..Default::default()
})
})
Expand Down Expand Up @@ -198,7 +196,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -254,7 +252,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(50))),
backoff_coefficient: 1.2,
Expand Down Expand Up @@ -335,7 +333,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(65))),
// This will make the second backoff 65 seconds, plenty to use timer
Expand Down Expand Up @@ -389,7 +387,7 @@ async fn local_act_null_result() {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "nullres".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -432,7 +430,7 @@ async fn local_act_command_immediately_follows_la_marker() {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "nullres".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -736,7 +734,7 @@ async fn test_schedule_to_start_timeout() {
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
// Impossibly small timeout so we timeout in the queue
schedule_to_start_timeout: prost_dur!(from_nanos(1)),
..Default::default()
Expand Down Expand Up @@ -824,7 +822,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time(
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(50))),
backoff_coefficient: 1.2,
Expand Down Expand Up @@ -897,7 +895,7 @@ async fn start_to_close_timeout_allows_retries(#[values(true, false)] la_complet
let la_res = ctx
.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(20))),
backoff_coefficient: 1.0,
Expand Down Expand Up @@ -971,7 +969,7 @@ async fn wft_failure_cancels_running_las() {
|ctx: WfContext| async move {
let la_handle = ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
});
tokio::join!(
Expand Down Expand Up @@ -1038,7 +1036,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
WorkflowFunction::new::<_, _, ()>(|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
..Default::default()
})
.await;
Expand Down Expand Up @@ -1092,7 +1090,7 @@ async fn local_act_records_nonfirst_attempts_ok() {
|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
input: "hi".to_payload().expect("serializes fine"),
retry_policy: RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(10))),
backoff_coefficient: 1.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ mod tests {
use temporal_sdk_core_protos::{
coresdk::{
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
AsJsonPayloadExt,
ToPayload,
},
temporal::api::{
common::v1::RetryPolicy, enums::v1::WorkflowTaskFailedCause, failure::v1::Failure,
Expand All @@ -906,7 +906,7 @@ mod tests {
async fn la_wf(ctx: WfContext) -> WorkflowResult<()> {
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
retry_policy: RetryPolicy {
maximum_attempts: 1,
..Default::default()
Expand Down Expand Up @@ -1003,13 +1003,13 @@ mod tests {
async fn two_la_wf(ctx: WfContext) -> WorkflowResult<()> {
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
Expand All @@ -1020,12 +1020,12 @@ mod tests {
tokio::join!(
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
}),
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
);
Expand Down Expand Up @@ -1131,14 +1131,14 @@ mod tests {
async fn la_timer_la(ctx: WfContext) -> WorkflowResult<()> {
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
ctx.timer(Duration::from_secs(5)).await;
ctx.local_activity(LocalActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
..Default::default()
})
.await;
Expand Down Expand Up @@ -1426,7 +1426,7 @@ mod tests {
worker.register_wf(DEFAULT_WORKFLOW_TYPE, move |ctx: WfContext| async move {
let la = ctx.local_activity(LocalActivityOptions {
cancel_type,
input: ().as_json_payload().unwrap(),
input: ().to_payload().unwrap(),
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
..Default::default()
});
Expand Down
10 changes: 5 additions & 5 deletions core/src/worker/workflow/machines/patch_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
};
use temporal_sdk_core_protos::{
constants::PATCH_MARKER_NAME,
coresdk::{common::build_has_change_marker_details, AsJsonPayloadExt},
coresdk::{common::build_has_change_marker_details, ToPayload},
temporal::api::{
command::v1::{
Command, RecordMarkerCommandAttributes, UpsertWorkflowSearchAttributesCommandAttributes,
Expand Down Expand Up @@ -135,7 +135,7 @@ pub(super) fn has_change<'a>(
let mut all_ids = BTreeSet::from_iter(existing_patch_ids);
all_ids.insert(machine.shared_state.patch_id.as_str());
let serialized = all_ids
.as_json_payload()
.to_payload()
.context("Could not serialize search attribute value for patch machine")
.map_err(|e| WFMachinesError::Fatal(e.to_string()))?;

Expand Down Expand Up @@ -296,7 +296,7 @@ mod tests {
coresdk::{
common::decode_change_marker_details,
workflow_activation::{workflow_activation_job, NotifyHasPatch, WorkflowActivationJob},
AsJsonPayloadExt, FromJsonPayloadExt,
FromPayload, ToPayload,
},
temporal::api::{
command::v1::{
Expand Down Expand Up @@ -606,7 +606,7 @@ mod tests {
{ search_attributes: Some(attrs) }
)
if attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap()
== &[MY_PATCH_ID].as_json_payload().unwrap()
== &[MY_PATCH_ID].to_payload().unwrap()
);
}
// The only time the "old" timer should fire is in v2, replaying, without a marker.
Expand Down Expand Up @@ -790,7 +790,7 @@ mod tests {
);
let expected_patches: HashSet<String, _> =
(1..i).map(|i| format!("patch-{i}")).collect();
let deserialized = HashSet::<String, RandomState>::from_json_payload(
let deserialized = HashSet::<String, RandomState>::from_payload(
attrs.indexed_fields.get(VERSION_SEARCH_ATTR_KEY).unwrap(),
)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod tests {
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::SetPatchMarker,
workflow_completion::WorkflowActivationCompletion,
AsJsonPayloadExt,
ToPayload,
},
temporal::api::{
command::v1::command::Attributes, common::v1::Payload,
Expand Down Expand Up @@ -370,7 +370,7 @@ mod tests {
let mut ver_upsert = HashMap::new();
ver_upsert.insert(
VERSION_SEARCH_ATTR_KEY.to_string(),
"hi".as_json_payload().unwrap(),
"hi".to_payload().unwrap(),
);
let act = core.poll_workflow_activation().await.unwrap();
let mut cmds = if with_patched_cmd {
Expand Down
11 changes: 7 additions & 4 deletions sdk-core-protos/src/history_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
},
external_data::LocalActivityMarkerData,
workflow_commands::ScheduleActivity,
AsJsonPayloadExt, IntoPayloadsExt,
IntoPayloadsExt, Json, ToPayload,
},
temporal::api::{
common::v1::{
Expand All @@ -17,8 +17,7 @@ use crate::{
failure::v1::{failure, CanceledFailureInfo, Failure},
history::v1::{history_event::Attributes, *},
taskqueue::v1::TaskQueue,
update,
update::v1::outcome,
update::{self, v1::outcome},
},
HistoryInfo,
};
Expand Down Expand Up @@ -429,7 +428,7 @@ impl TestHistoryBuilder {
let mut indexed_fields = HashMap::new();
indexed_fields.insert(
"TemporalChangeVersion".to_string(),
attribs.as_json_payload().unwrap(),
attribs.to_payload().unwrap(),
);
let attrs = UpsertWorkflowSearchAttributesEventAttributes {
workflow_task_completed_event_id: self.previous_task_completed_id,
Expand Down Expand Up @@ -605,6 +604,10 @@ impl TestHistoryBuilder {
}
}

impl ToPayload for &[String] {
type Encoder = Json;
}

fn default_attribs(et: EventType) -> Result<Attributes> {
Ok(match et {
EventType::WorkflowExecutionStarted => default_wes_attribs().into(),
Expand Down
Loading

0 comments on commit cd37e25

Please sign in to comment.