From 7e4e464f3b75eb9401c1e04683617ce7033daddd Mon Sep 17 00:00:00 2001 From: Tharun Rajendran Date: Mon, 18 Dec 2023 12:29:04 +0530 Subject: [PATCH 1/4] feat(workflow): add upsert memo command --- packages/test/src/integration-tests-old.ts | 14 ++++++ packages/test/src/test-sinks.ts | 38 ++++++++++++++ packages/test/src/workflows/index.ts | 1 + .../src/workflows/upsert-and-read-memo.ts | 6 +++ packages/workflow/src/workflow.ts | 50 +++++++++++++++++++ 5 files changed, 109 insertions(+) create mode 100644 packages/test/src/workflows/upsert-and-read-memo.ts diff --git a/packages/test/src/integration-tests-old.ts b/packages/test/src/integration-tests-old.ts index 375c3b27f..67319906b 100644 --- a/packages/test/src/integration-tests-old.ts +++ b/packages/test/src/integration-tests-old.ts @@ -677,6 +677,20 @@ export function runIntegrationTests(codec?: PayloadCodec): void { ); }); + test('Workflow can upsert memo', async (t) => { + const { client } = t.context; + const workflow = await client.start(workflows.upsertAndReadMemo, { + taskQueue: 'test', + workflowId: uuid4(), + args: [{ note2: 'bar' }], + }); + const result = await workflow.result(); + t.deepEqual(result, { + note: 'foo', + note2: 'bar', + }); + }); + test('Workflow can read WorkflowInfo', async (t) => { const { client } = t.context; const workflowId = uuid4(); diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index a8b53af8a..2be6448b7 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -417,6 +417,44 @@ if (RUN_INTEGRATION_TESTS) { ]); }); + test('Sink functions contains upserted memo', async (t) => { + const taskQueue = `${__filename}-${t.title}`; + + const recordedMessages = Array<{ message: string; memo: Record | undefined }>(); + const sinks = asSdkLoggerSink(async (info, message, _attrs) => { + recordedMessages.push({ + message, + memo: info.memo, + }); + }); + + const client = new WorkflowClient(); + + const worker = await Worker.create({ + ...defaultOptions, + taskQueue, + sinks, + }); + await worker.runUntil( + client.execute(workflows.upsertAndReadMemo, { + taskQueue, + workflowId: uuid4(), + args: [{ note: 'foo' }], + }) + ); + + t.deepEqual(recordedMessages, [ + { + message: 'Workflow started', + memo: undefined, + }, + { + message: 'Workflow completed', + memo: { note: 'foo' }, + }, + ]); + }); + test('Core issue 589', async (t) => { const taskQueue = `${__filename}-${t.title}`; diff --git a/packages/test/src/workflows/index.ts b/packages/test/src/workflows/index.ts index e0bfa07f9..9c6748d3d 100644 --- a/packages/test/src/workflows/index.ts +++ b/packages/test/src/workflows/index.ts @@ -96,3 +96,4 @@ export * from './upsert-and-read-search-attributes'; export * from './url-whatwg'; export * from './wait-on-user'; export * from './workflow-cancellation-scenarios'; +export * from './upsert-and-read-memo'; diff --git a/packages/test/src/workflows/upsert-and-read-memo.ts b/packages/test/src/workflows/upsert-and-read-memo.ts new file mode 100644 index 000000000..e33676638 --- /dev/null +++ b/packages/test/src/workflows/upsert-and-read-memo.ts @@ -0,0 +1,6 @@ +import { upsertMemo, workflowInfo } from '@temporalio/workflow'; + +export async function upsertAndReadMemo(memo: Record): Promise | undefined> { + upsertMemo(memo); + return workflowInfo().memo; +} diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index f67a45029..3bf49af4e 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -1352,5 +1352,55 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void }); } +/** + * Updates this Workflow's Memo by merging the provided `memo` with the existing Memo, + * `workflowInfo().memo`. + * + * For example, this Workflow code: + * + * ```ts + * upsertMemo({ + * key1: value, + * }); + * upsertMemo({ + * key2: value, + * }); + * ``` + * + * would result in the Workflow having these Memo: + * + * ```ts + * { + * key1: value, + * key2: value, + * } + * ``` + * + * @param memo The Record to merge. Use a value of `null` to clear a key from the Memo. + */ +export function upsertMemo(memo: Record): void { + const activator = assertInWorkflowContext('Workflow.upsertMemo(...) may only be used from a Workflow Execution.'); + + if (memo == null) { + throw new Error('memo must be a non-null Record'); + } + + activator.pushCommand({ + modifyWorkflowProperties: { + upsertedMemo: mapToPayloads(activator.payloadConverter, memo), + }, + }); + + activator.mutateWorkflowInfo((info: WorkflowInfo): WorkflowInfo => { + return { + ...info, + memo: { + ...info.memo, + ...memo, + }, + }; + }); +} + export const stackTraceQuery = defineQuery('__stack_trace'); export const enhancedStackTraceQuery = defineQuery('__enhanced_stack_trace'); From d88631775839eac58b50ec31238b2fd7ae5f2c54 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Tue, 16 Jul 2024 17:24:28 -0400 Subject: [PATCH 2/4] Convert integration test --- packages/test/src/integration-tests-old.ts | 14 ------- .../test/src/test-integration-workflows.ts | 42 +++++++++++++++++++ packages/workflow/src/workflow.ts | 39 ++++++++++++----- 3 files changed, 71 insertions(+), 24 deletions(-) diff --git a/packages/test/src/integration-tests-old.ts b/packages/test/src/integration-tests-old.ts index abadacc31..a9bfdf542 100644 --- a/packages/test/src/integration-tests-old.ts +++ b/packages/test/src/integration-tests-old.ts @@ -677,20 +677,6 @@ export function runIntegrationTests(codec?: PayloadCodec): void { ); }); - test('Workflow can upsert memo', async (t) => { - const { client } = t.context; - const workflow = await client.start(workflows.upsertAndReadMemo, { - taskQueue: 'test', - workflowId: uuid4(), - args: [{ note2: 'bar' }], - }); - const result = await workflow.result(); - t.deepEqual(result, { - note: 'foo', - note2: 'bar', - }); - }); - test('Workflow can read WorkflowInfo', async (t) => { const { client } = t.context; const workflowId = uuid4(); diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index fd2d3fd4f..8ea2c75c6 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -947,6 +947,48 @@ if (RUN_TIME_SKIPPING_TESTS) { }); } +export async function upsertAndReadMemo(memo: Record): Promise | undefined> { + workflow.upsertMemo(memo); + return workflow.workflowInfo().memo; +} + +test('Workflow can upsert memo', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await startWorkflow(upsertAndReadMemo, { + memo: { + alpha: 'bar1', + bravo: 'bar3', + charlie: { delta: 'bar2', echo: 12 }, + foxtrot: 'bar4', + }, + args: [ + { + alpha: 'bar11', + bravo: null, + charlie: { echo: 34, golf: 'bar5' }, + hotel: 'bar6', + }, + ], + }); + const result = await handle.result(); + t.deepEqual(result, { + alpha: 'bar11', + charlie: { echo: 34, golf: 'bar5' }, + foxtrot: 'bar4', + hotel: 'bar6', + }); + const { memo } = await handle.describe(); + t.deepEqual(memo, { + alpha: 'bar11', + charlie: { echo: 34, golf: 'bar5' }, + foxtrot: 'bar4', + hotel: 'bar6', + }); + }); +}); + export const interceptors: workflow.WorkflowInterceptorsFactory = () => { const interceptorsFactoryFunc = module.exports[`${workflow.workflowInfo().workflowType}Interceptors`]; if (typeof interceptorsFactoryFunc === 'function') { diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 644fdacb2..7414619df 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -1367,17 +1367,26 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void } /** - * Updates this Workflow's Memo by merging the provided `memo` with the existing Memo, - * `workflowInfo().memo`. + * Updates this Workflow's Memos by merging the provided `memo` with existing + * Memos (as returned by `workflowInfo().memo`). * - * For example, this Workflow code: + * New memo is merged by replacing properties of the same name _at the first + * level only_. Setting a property to value `undefined` or `null` clears that + * key from the Memo. + * + * For example: * * ```ts * upsertMemo({ * key1: value, + * key3: { subkey1: value } + * key4: value, * }); * upsertMemo({ - * key2: value, + * key2: value + * key3: { subkey2: value } + * key3: value2, + * key4: undefined, * }); * ``` * @@ -1387,10 +1396,12 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void * { * key1: value, * key2: value, + * key3: { subkey2: value } // Note this object was completely replaced + * // Note that key4 was completely removed * } * ``` * - * @param memo The Record to merge. Use a value of `null` to clear a key from the Memo. + * @param memo The Record to merge. */ export function upsertMemo(memo: Record): void { const activator = assertInWorkflowContext('Workflow.upsertMemo(...) may only be used from a Workflow Execution.'); @@ -1401,17 +1412,25 @@ export function upsertMemo(memo: Record): void { activator.pushCommand({ modifyWorkflowProperties: { - upsertedMemo: mapToPayloads(activator.payloadConverter, memo), + upsertedMemo: { + fields: mapToPayloads( + activator.payloadConverter, + // Convert null to undefined + Object.fromEntries(Object.entries(memo).map(([k, v]) => [k, v ?? undefined])) + ), + }, }, }); activator.mutateWorkflowInfo((info: WorkflowInfo): WorkflowInfo => { return { ...info, - memo: { - ...info.memo, - ...memo, - }, + memo: Object.fromEntries( + Object.entries({ + ...info.memo, + ...memo, + }).filter(([_, v]) => v != null) + ), }; }); } From fa74d2ec30e73c64db1cfa6989766bb9a0216a52 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Tue, 16 Jul 2024 17:33:16 -0400 Subject: [PATCH 3/4] Fix incorrect comment --- packages/workflow/src/workflow.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/workflow/src/workflow.ts b/packages/workflow/src/workflow.ts index 7414619df..dc74bd491 100644 --- a/packages/workflow/src/workflow.ts +++ b/packages/workflow/src/workflow.ts @@ -1385,7 +1385,6 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void * upsertMemo({ * key2: value * key3: { subkey2: value } - * key3: value2, * key4: undefined, * }); * ``` From c71b52d340a4ec9d5f29310b98021ab547d3c7a4 Mon Sep 17 00:00:00 2001 From: James Watkins-Harvey Date: Tue, 16 Jul 2024 18:43:16 -0400 Subject: [PATCH 4/4] Also convert sinks test --- packages/test/src/helpers.ts | 18 ++++- .../test/src/test-integration-workflows.ts | 49 +++++++++++++- packages/test/src/test-sinks.ts | 65 +------------------ 3 files changed, 67 insertions(+), 65 deletions(-) diff --git a/packages/test/src/helpers.ts b/packages/test/src/helpers.ts index 318a53502..350f01aa6 100644 --- a/packages/test/src/helpers.ts +++ b/packages/test/src/helpers.ts @@ -5,7 +5,7 @@ import ava, { TestFn } from 'ava'; import * as grpc from '@grpc/grpc-js'; import asyncRetry from 'async-retry'; import { v4 as uuid4 } from 'uuid'; -import { inWorkflowContext } from '@temporalio/workflow'; +import { inWorkflowContext, WorkflowInfo } from '@temporalio/workflow'; import { Payload, PayloadCodec } from '@temporalio/common'; import { Worker as RealWorker, WorkerOptions } from '@temporalio/worker'; import * as worker from '@temporalio/worker'; @@ -16,6 +16,7 @@ import { TestWorkflowEnvironment as RealTestWorkflowEnvironment, TimeSkippingTestWorkflowEnvironmentOptions, } from '@temporalio/testing'; +import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs'; export function u8(s: string): Uint8Array { // TextEncoder requires lib "dom" @@ -260,3 +261,18 @@ export async function getRandomPort(fn = (_port: number) => Promise.resolve()): }); }); } + +export function asSdkLoggerSink( + fn: (info: WorkflowInfo, message: string, attrs?: Record) => Promise, + opts?: Omit, 'fn'> +): worker.InjectedSinks { + return { + __temporal_logger: { + trace: { fn, ...opts }, + debug: { fn, ...opts }, + info: { fn, ...opts }, + warn: { fn, ...opts }, + error: { fn, ...opts }, + }, + }; +} diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 8ea2c75c6..c6fd68b3e 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -15,7 +15,7 @@ import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; import { Context, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; -import { RUN_TIME_SKIPPING_TESTS } from './helpers'; +import { RUN_TIME_SKIPPING_TESTS, asSdkLoggerSink } from './helpers'; const test = makeTestFunction({ workflowsPath: __filename, workflowInterceptorModules: [__filename] }); @@ -989,6 +989,53 @@ test('Workflow can upsert memo', async (t) => { }); }); +test('Sink functions contains upserted memo', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const recordedMessages = Array<{ message: string; memo: Record | undefined }>(); + const sinks = asSdkLoggerSink(async (info, message, _attrs) => { + recordedMessages.push({ + message, + memo: info.memo, + }); + }); + const worker = await createWorker({ sinks }); + await worker.runUntil(async () => { + await executeWorkflow(upsertAndReadMemo, { + memo: { + note1: 'aaa', + note2: 'bbb', + note4: 'eee', + }, + args: [ + { + note2: 'ccc', + note3: 'ddd', + note4: null, + }, + ], + }); + }); + + t.deepEqual(recordedMessages, [ + { + message: 'Workflow started', + memo: { + note1: 'aaa', + note2: 'bbb', + note4: 'eee', + }, + }, + { + message: 'Workflow completed', + memo: { + note1: 'aaa', + note2: 'ccc', + note3: 'ddd', + }, + }, + ]); +}); + export const interceptors: workflow.WorkflowInterceptorsFactory = () => { const interceptorsFactoryFunc = module.exports[`${workflow.workflowInfo().workflowType}Interceptors`]; if (typeof interceptorsFactoryFunc === 'function') { diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index 45b67fba8..8f2462f09 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -2,19 +2,11 @@ import test from 'ava'; import { v4 as uuid4 } from 'uuid'; import { Connection, WorkflowClient } from '@temporalio/client'; -import { - DefaultLogger, - InjectedSinks, - Runtime, - InjectedSinkFunction, - WorkerOptions, - LogEntry, -} from '@temporalio/worker'; -import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs'; +import { DefaultLogger, InjectedSinks, Runtime, WorkerOptions, LogEntry } from '@temporalio/worker'; import { SearchAttributes, WorkflowInfo } from '@temporalio/workflow'; import { UnsafeWorkflowInfo } from '@temporalio/workflow/src/interfaces'; import { SdkComponent } from '@temporalio/common'; -import { RUN_INTEGRATION_TESTS, Worker, registerDefaultCustomSearchAttributes } from './helpers'; +import { RUN_INTEGRATION_TESTS, Worker, asSdkLoggerSink, registerDefaultCustomSearchAttributes } from './helpers'; import { defaultOptions } from './mock-native-worker'; import * as workflows from './workflows'; @@ -27,21 +19,6 @@ class DependencyError extends Error { } } -function asSdkLoggerSink( - fn: (info: WorkflowInfo, message: string, attrs?: Record) => Promise, - opts?: Omit, 'fn'> -): InjectedSinks { - return { - __temporal_logger: { - trace: { fn, ...opts }, - debug: { fn, ...opts }, - info: { fn, ...opts }, - warn: { fn, ...opts }, - error: { fn, ...opts }, - }, - }; -} - if (RUN_INTEGRATION_TESTS) { const recordedLogs: { [workflowId: string]: LogEntry[] } = {}; @@ -432,44 +409,6 @@ if (RUN_INTEGRATION_TESTS) { ]); }); - test('Sink functions contains upserted memo', async (t) => { - const taskQueue = `${__filename}-${t.title}`; - - const recordedMessages = Array<{ message: string; memo: Record | undefined }>(); - const sinks = asSdkLoggerSink(async (info, message, _attrs) => { - recordedMessages.push({ - message, - memo: info.memo, - }); - }); - - const client = new WorkflowClient(); - - const worker = await Worker.create({ - ...defaultOptions, - taskQueue, - sinks, - }); - await worker.runUntil( - client.execute(workflows.upsertAndReadMemo, { - taskQueue, - workflowId: uuid4(), - args: [{ note: 'foo' }], - }) - ); - - t.deepEqual(recordedMessages, [ - { - message: 'Workflow started', - memo: undefined, - }, - { - message: 'Workflow completed', - memo: { note: 'foo' }, - }, - ]); - }); - test('Core issue 589', async (t) => { const taskQueue = `${__filename}-${t.title}`;