diff --git a/temporalcli/client.go b/temporalcli/client.go index d243f668..cb75e936 100644 --- a/temporalcli/client.go +++ b/temporalcli/client.go @@ -39,20 +39,25 @@ func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error) // Headers if len(c.GrpcMeta) > 0 { - headers := make(stringMapHeadersProvider, len(c.GrpcMeta)) - for _, kv := range c.GrpcMeta { - pieces := strings.SplitN(kv, "=", 2) - if len(pieces) != 2 { - return nil, fmt.Errorf("gRPC meta of %q does not have '='", kv) - } - headers[pieces[0]] = pieces[1] + headers, err := NewStringMapHeaderProvider(c.GrpcMeta) + if err != nil { + return nil, fmt.Errorf("grpc-meta %s", err) } clientOptions.HeadersProvider = headers } // Remote codec if c.CodecEndpoint != "" { - interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, c.CodecAuth) + codecHeaders, err := NewStringMapHeaderProvider(c.CodecHeader) + if err != nil { + return nil, fmt.Errorf("codec-header %s", err) + } + + if c.CodecAuth != "" { + codecHeaders["Authorization"] = c.CodecAuth + } + + interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, codecHeaders) if err != nil { return nil, fmt.Errorf("failed creating payload codec interceptor: %w", err) } @@ -145,7 +150,7 @@ func fixedHeaderOverrideInterceptor( return invoker(ctx, method, req, reply, cc, opts...) } -func payloadCodecInterceptor(namespace, codecEndpoint, codecAuth string) (grpc.UnaryClientInterceptor, error) { +func payloadCodecInterceptor(namespace, codecEndpoint string, codecHeaders stringMapHeadersProvider) (grpc.UnaryClientInterceptor, error) { codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace) payloadCodec := converter.NewRemotePayloadCodec( @@ -153,8 +158,8 @@ func payloadCodecInterceptor(namespace, codecEndpoint, codecAuth string) (grpc.U Endpoint: codecEndpoint, ModifyRequest: func(req *http.Request) error { req.Header.Set("X-Namespace", namespace) - if codecAuth != "" { - req.Header.Set("Authorization", codecAuth) + for headerName, headerValue := range codecHeaders { + req.Header.Set(headerName, headerValue) } return nil }, @@ -185,6 +190,18 @@ func (s stringMapHeadersProvider) GetHeaders(context.Context) (map[string]string return s, nil } +func NewStringMapHeaderProvider(config []string) (stringMapHeadersProvider, error) { + headers := make(stringMapHeadersProvider, len(config)) + for _, kv := range config { + pieces := strings.SplitN(kv, "=", 2) + if len(pieces) != 2 { + return nil, fmt.Errorf("%q does not have '='", kv) + } + headers[pieces[0]] = pieces[1] + } + return headers, nil +} + var DataConverterWithRawValue = converter.NewCompositeDataConverter( rawValuePayloadConverter{}, converter.NewNilPayloadConverter(), diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 66fe02dc..04c62658 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -32,6 +32,7 @@ type ClientOptions struct { TlsServerName string CodecEndpoint string CodecAuth string + CodecHeader []string } func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { @@ -41,7 +42,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { cctx.BindFlagEnvVar(f.Lookup("namespace"), "TEMPORAL_NAMESPACE") f.StringVar(&v.ApiKey, "api-key", "", "API key for request.") cctx.BindFlagEnvVar(f.Lookup("api-key"), "TEMPORAL_API_KEY") - f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. format as a `KEY=VALUE` pair May be passed multiple times to set multiple headers.") + f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.") f.BoolVar(&v.Tls, "tls", false, "Enable base TLS encryption. Does not have additional options like mTLS or client certs.") cctx.BindFlagEnvVar(f.Lookup("tls"), "TEMPORAL_TLS") f.StringVar(&v.TlsCertPath, "tls-cert-path", "", "Path to x509 certificate. Can't be used with --tls-cert-data.") @@ -64,6 +65,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { cctx.BindFlagEnvVar(f.Lookup("codec-endpoint"), "TEMPORAL_CODEC_ENDPOINT") f.StringVar(&v.CodecAuth, "codec-auth", "", "Authorization header for Codec Server requests.") cctx.BindFlagEnvVar(f.Lookup("codec-auth"), "TEMPORAL_CODEC_AUTH") + f.StringArrayVar(&v.CodecHeader, "codec-header", nil, "HTTP headers for requests to codec server. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.") } type OverlapPolicyOptions struct { @@ -2861,6 +2863,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) @@ -3055,6 +3058,7 @@ type TemporalWorkflowListCommand struct { Query string Archived bool Limit int + PageSize int } func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowListCommand { @@ -3072,6 +3076,7 @@ func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkfl s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.") s.Command.Flags().BoolVar(&s.Archived, "archived", false, "Limit output to archived Workflow Executions.") s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Maximum number of Workflow Executions to display.") + s.Command.Flags().IntVar(&s.PageSize, "page-size", 0, "Maximum number of Workflow Executions to fetch at a time from the server.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) @@ -3288,6 +3293,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork return &s } +type TemporalWorkflowSignalWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + SignalName string + SignalInput []string + SignalInputFile []string + SignalInputMeta []string + SignalInputBase64 bool +} + +func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand { + var s TemporalWorkflowSignalWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "signal-with-start [flags]" + s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running" + if hasHighlighting { + s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name") + s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "signal-type": "signal-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowStackCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command diff --git a/temporalcli/commands.go b/temporalcli/commands.go index b9ba6322..217f2f29 100644 --- a/temporalcli/commands.go +++ b/temporalcli/commands.go @@ -23,8 +23,10 @@ import ( "github.com/temporalio/cli/temporalcli/internal/printer" "github.com/temporalio/ui-server/v2/server/version" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/failure/v1" "go.temporal.io/api/temporalproto" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" "go.temporal.io/server/common/headers" "google.golang.org/grpc" @@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err Details: deets, }, nil } + +func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { + if len(in) == 0 { + return nil, nil + } + // search attributes always use default dataconverter + dc := converter.GetDefaultDataConverter() + out := make(map[string]*commonpb.Payload, len(in)) + for key, val := range in { + payload, err := dc.ToPayload(val) + if err != nil { + return nil, err + } + out[key] = payload + } + return out, nil +} diff --git a/temporalcli/commands.schedule.go b/temporalcli/commands.schedule.go index e458ba91..3d645cb2 100644 --- a/temporalcli/commands.schedule.go +++ b/temporalcli/commands.schedule.go @@ -16,7 +16,6 @@ import ( schedpb "go.temporal.io/api/schedule/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" "go.temporal.io/server/common/primitives/timestamp" ) @@ -257,7 +256,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c if err != nil { return nil, err } - untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes) + untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes) if err != nil { return nil, err } @@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string { s = strings.TrimSpace(s) return s } - -func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) { - if len(in) == 0 { - return nil, nil - } - // search attributes always use default dataconverter - dc := converter.GetDefaultDataConverter() - out := make(map[string]*commonpb.Payload, len(in)) - for key, val := range in { - payload, err := dc.ToPayload(val) - if err != nil { - return nil, err - } - out[key] = payload - } - return out, nil -} diff --git a/temporalcli/commands.worker.deployment_test.go b/temporalcli/commands.worker.deployment_test.go index 3ebdcbdd..439f326d 100644 --- a/temporalcli/commands.worker.deployment_test.go +++ b/temporalcli/commands.worker.deployment_test.go @@ -61,20 +61,6 @@ type jsonDeploymentVersionInfoType struct { Metadata map[string]*common.Payload `json:"metadata"` } -/* -type jsonDeploymentReachabilityInfoType struct { - DeploymentInfo jsonDeploymentInfoType `json:"deploymentInfo"` - Reachability string `json:"reachability"` - LastUpdateTime time.Time `json:"lastUpdateTime"` -} - -type jsonDeploymentListEntryType struct { - Deployment jsonDeploymentType `json:"deployment"` - CreateTime time.Time `json:"createTime"` - IsCurrent bool `json:"isCurrent"` -} -*/ - func (s *SharedServerSuite) TestDeployment_Set_Current_Version() { deploymentName := uuid.NewString() buildId := uuid.NewString() diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index b885e4ca..08393a7d 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -12,12 +12,18 @@ import ( "time" "github.com/fatih/color" + "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/common/v1" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/temporalproto" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "google.golang.org/protobuf/types/known/durationpb" ) func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error { @@ -92,6 +98,112 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string return err } +func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error { + if c.SharedWorkflowStartOptions.WorkflowId == "" { + return fmt.Errorf("--workflow-id flag must be provided") + } + + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions) + if err != nil { + return err + } + wfInput, err := c.buildRawInputPayloads() + if err != nil { + return err + } + + signalPayloadInputOpts := PayloadInputOptions{ + Input: c.SignalInput, + InputFile: c.SignalInputFile, + InputMeta: c.InputMeta, + InputBase64: c.SignalInputBase64, + } + signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() + if err != nil { + return err + } + + var retryPolicy *common.RetryPolicy + if wfStartOpts.RetryPolicy != nil { + retryPolicy = &commonpb.RetryPolicy{ + MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval), + InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval), + BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient, + MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts, + NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes, + } + } + var memo *common.Memo + if wfStartOpts.Memo != nil { + fields, err := encodeMapToPayloads(wfStartOpts.Memo) + if err != nil { + return err + } + memo = &common.Memo{Fields: fields} + } + var searchAttr *common.SearchAttributes + if wfStartOpts.SearchAttributes != nil { + fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes) + if err != nil { + return err + } + searchAttr = &common.SearchAttributes{IndexedFields: fields} + } + + if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) { + cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command") + } + + // We have to use the raw signal service call here because the Go SDK's + // signal-with-start call doesn't accept multiple signal arguments. + resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution( + cctx, + &workflowservice.SignalWithStartWorkflowExecutionRequest{ + Namespace: c.Parent.Namespace, + RequestId: uuid.NewString(), + WorkflowId: c.WorkflowId, + WorkflowType: &common.WorkflowType{Name: c.Type}, + TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: wfInput, + WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout), + WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout), + WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout), + SignalName: c.SignalName, + SignalInput: signalInput, + Identity: clientIdentity(), + RetryPolicy: retryPolicy, + CronSchedule: wfStartOpts.CronSchedule, + Memo: memo, + SearchAttributes: searchAttr, + WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy, + WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy, + }, + ) + if err != nil { + return err + } + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + TaskQueue string `json:"taskQueue"` + }{ + WorkflowId: c.WorkflowId, + RunId: resp.RunId, + Type: c.Type, + Namespace: c.Parent.Namespace, + TaskQueue: c.TaskQueue, + }, printer.StructuredOptions{}) +} + type workflowJSONResult struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index 88a0a6d7..e706d946 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -664,3 +664,104 @@ func (s *SharedServerSuite) TestWorkflow_Execute_NullValue() { s.ContainsOnSameLine(out, "Status", "COMPLETED") s.ContainsOnSameLine(out, "Result", `{"foo":null}`) } + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { + res := s.Execute( + "workflow", "signal-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--signal-name", "sigName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { + wfId := uuid.NewString() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"wf-signal-with-start": "workflow-input"}`, + "--task-queue", "tq", + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + + s.NoError(res.Err) + + // Confirm text output has key/vals as expected + out := res.Stdout.String() + s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "TaskQueue", "tq") + s.ContainsOnSameLine(out, "Type", "DevWorkflow") + s.ContainsOnSameLine(out, "Namespace", "default") + + // Check that new workflow was started with expected workflow ID. + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) + + // Run workflow, block on signal. + var sigReceived any + s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Wait for workflow to complete. + var wfReturn any + err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) + + // Expect workflow to have received signal and given inputs from signal-with-start. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { + // Run workflow, block on signal. + var sigReceived any + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + return wfInput, nil + }) + + // Start workflow + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + + wfId := run.GetID() + + // Send signal-with-start command. + res := s.Execute( + "workflow", "signal-with-start", + "--address", s.Address(), + "--workflow-id", wfId, + "--type", "DevWorkflow", + "--input", `{"workflow": "workflow-input"}`, + "--task-queue", s.Worker().Options.TaskQueue, + "--signal-name", "sigName", + "--signal-input", `{"signal-with-start": "signal-input"}`, + ) + s.NoError(res.Err) + + // Confirm text output has key/vals as expected + out := res.Stdout.String() + s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) + s.ContainsOnSameLine(out, "Type", "DevWorkflow") + s.ContainsOnSameLine(out, "Namespace", "default") + + // Wait for workflow to complete. + var ret any + s.NoError(run.Get(s.Context, &ret)) + + // Expect workflow to have not been started by the signal-with-start command. + s.Equal("not-signal-with-start-input", ret) + // Expect signal to have been received with given input. + s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) +} diff --git a/temporalcli/commands.workflow_view.go b/temporalcli/commands.workflow_view.go index f416a82a..a1b96932 100644 --- a/temporalcli/commands.workflow_view.go +++ b/temporalcli/commands.workflow_view.go @@ -303,9 +303,7 @@ func (c *TemporalWorkflowListCommand) run(cctx *CommandContext, _ []string) erro cctx.Printer.StartList() defer cctx.Printer.EndList() - // Build request and start looping. We always use default page size regardless - // of user-defined limit, because we're ok w/ extra page data and the default - // is not clearly defined. + // Build request and start looping. pageFetcher := c.pageFetcher(cctx, cl) var nextPageToken []byte var execsProcessed int @@ -357,16 +355,22 @@ func (c *TemporalWorkflowListCommand) pageFetcher( cctx *CommandContext, cl client.Client, ) func(next []byte) (workflowPage, error) { + + if c.Limit > 0 && c.Limit < c.PageSize { + c.PageSize = c.Limit + } return func(next []byte) (workflowPage, error) { if c.Archived { return cl.ListArchivedWorkflow(cctx, &workflowservice.ListArchivedWorkflowExecutionsRequest{ Query: c.Query, NextPageToken: next, + PageSize: int32(c.PageSize), }) } return cl.ListWorkflow(cctx, &workflowservice.ListWorkflowExecutionsRequest{ Query: c.Query, NextPageToken: next, + PageSize: int32(c.PageSize), }) } } diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index 3348012c..fe169bbd 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -442,10 +442,12 @@ func (s *SharedServerSuite) TestWorkflow_List() { "workflow", "list", "--address", s.Address(), "--query", fmt.Sprintf(`TaskQueue="%s"`, s.Worker().Options.TaskQueue), + "--page-size", "1", ) s.NoError(res.Err) out := res.Stdout.String() s.ContainsOnSameLine(out, "Completed", "DevWorkflow") + s.Equal(3, strings.Count(out, "DevWorkflow")) // JSON res = s.Execute( diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 21e4b3c5..85ec87d2 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3138,6 +3138,9 @@ commands: - name: limit type: int description: Maximum number of Workflow Executions to display. + - name: page-size + type: int + description: Maximum number of Workflow Executions to fetch at a time from the server. - name: temporal workflow metadata summary: Query the Workflow for user-specified metadata @@ -3402,6 +3405,59 @@ commands: aliases: - type + - name: temporal workflow signal-with-start + summary: Send a message to a Workflow Execution, start the execution if it isn't running + description: | + Send an asynchronous notification (Signal) to a Workflow Execution. + If the Workflow Execution is not running or is not found, it starts the + workflow then sends the signal. + + ``` + temporal workflow signal-with-start \ + --signal-name YourSignal \ + --signal-input '{"some-key": "some-value"}' \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + # workflow-id is "required" (runtime check) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: signal-name + type: string + description: Signal name. + required: true + aliases: + - signal-type + - name: signal-input + type: string[] + description: | + Signal input value. + Use JSON content or set --signal-input-meta to override. + Can't be combined with --signal-input-file. + Can be passed multiple times to pass multiple arguments. + - name: signal-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --signal-input-meta to override. + Can't be combined with --signal-input. + Can be passed multiple times to pass multiple arguments. + - name: signal-input-meta + type: string[] + description: | + Input signal payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: signal-input-base64 + type: bool + description: | + Assume signal inputs are base64-encoded and attempt to decode them. + - name: temporal workflow stack summary: Trace a Workflow Execution description: | @@ -3654,7 +3710,7 @@ option-sets: type: string[] description: | HTTP headers for requests. - format as a `KEY=VALUE` pair + Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers. - name: tls type: bool @@ -3714,6 +3770,12 @@ option-sets: type: string description: Authorization header for Codec Server requests. env: TEMPORAL_CODEC_AUTH + - name: codec-header + type: string[] + description: | + HTTP headers for requests to codec server. + Format as a `KEY=VALUE` pair. + May be passed multiple times to set multiple headers. - name: overlap-policy options: @@ -3965,6 +4027,7 @@ option-sets: - Fail - UseExisting - TerminateExisting + - name: payload-input options: - name: input diff --git a/temporalcli/internal/cmd/gen-commands/main.go b/temporalcli/internal/cmd/gen-commands/main.go index 00df1848..817ff296 100644 --- a/temporalcli/internal/cmd/gen-commands/main.go +++ b/temporalcli/internal/cmd/gen-commands/main.go @@ -21,7 +21,7 @@ func run() error { _, file, _, _ := runtime.Caller(0) commandsDir := filepath.Join(file, "../../../../") - // Parse markdown + // Parse YAML cmds, err := commandsgen.ParseCommands() if err != nil { return fmt.Errorf("failed parsing markdown: %w", err)