Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebase to main #768

Merged
merged 9 commits into from
Feb 27, 2025
39 changes: 28 additions & 11 deletions temporalcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@ func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error)

// Headers
if len(c.GrpcMeta) > 0 {
headers := make(stringMapHeadersProvider, len(c.GrpcMeta))
for _, kv := range c.GrpcMeta {
pieces := strings.SplitN(kv, "=", 2)
if len(pieces) != 2 {
return nil, fmt.Errorf("gRPC meta of %q does not have '='", kv)
}
headers[pieces[0]] = pieces[1]
headers, err := NewStringMapHeaderProvider(c.GrpcMeta)
if err != nil {
return nil, fmt.Errorf("grpc-meta %s", err)
}
clientOptions.HeadersProvider = headers
}

// Remote codec
if c.CodecEndpoint != "" {
interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, c.CodecAuth)
codecHeaders, err := NewStringMapHeaderProvider(c.CodecHeader)
if err != nil {
return nil, fmt.Errorf("codec-header %s", err)
}

if c.CodecAuth != "" {
codecHeaders["Authorization"] = c.CodecAuth
}

interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, codecHeaders)
if err != nil {
return nil, fmt.Errorf("failed creating payload codec interceptor: %w", err)
}
Expand Down Expand Up @@ -145,16 +150,16 @@ func fixedHeaderOverrideInterceptor(
return invoker(ctx, method, req, reply, cc, opts...)
}

func payloadCodecInterceptor(namespace, codecEndpoint, codecAuth string) (grpc.UnaryClientInterceptor, error) {
func payloadCodecInterceptor(namespace, codecEndpoint string, codecHeaders stringMapHeadersProvider) (grpc.UnaryClientInterceptor, error) {
codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace)

payloadCodec := converter.NewRemotePayloadCodec(
converter.RemotePayloadCodecOptions{
Endpoint: codecEndpoint,
ModifyRequest: func(req *http.Request) error {
req.Header.Set("X-Namespace", namespace)
if codecAuth != "" {
req.Header.Set("Authorization", codecAuth)
for headerName, headerValue := range codecHeaders {
req.Header.Set(headerName, headerValue)
}
return nil
},
Expand Down Expand Up @@ -185,6 +190,18 @@ func (s stringMapHeadersProvider) GetHeaders(context.Context) (map[string]string
return s, nil
}

func NewStringMapHeaderProvider(config []string) (stringMapHeadersProvider, error) {
headers := make(stringMapHeadersProvider, len(config))
for _, kv := range config {
pieces := strings.SplitN(kv, "=", 2)
if len(pieces) != 2 {
return nil, fmt.Errorf("%q does not have '='", kv)
}
headers[pieces[0]] = pieces[1]
}
return headers, nil
}

var DataConverterWithRawValue = converter.NewCompositeDataConverter(
rawValuePayloadConverter{},
converter.NewNilPayloadConverter(),
Expand Down
53 changes: 52 additions & 1 deletion temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ClientOptions struct {
TlsServerName string
CodecEndpoint string
CodecAuth string
CodecHeader []string
}

func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
Expand All @@ -41,7 +42,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
cctx.BindFlagEnvVar(f.Lookup("namespace"), "TEMPORAL_NAMESPACE")
f.StringVar(&v.ApiKey, "api-key", "", "API key for request.")
cctx.BindFlagEnvVar(f.Lookup("api-key"), "TEMPORAL_API_KEY")
f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. format as a `KEY=VALUE` pair May be passed multiple times to set multiple headers.")
f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.")
f.BoolVar(&v.Tls, "tls", false, "Enable base TLS encryption. Does not have additional options like mTLS or client certs.")
cctx.BindFlagEnvVar(f.Lookup("tls"), "TEMPORAL_TLS")
f.StringVar(&v.TlsCertPath, "tls-cert-path", "", "Path to x509 certificate. Can't be used with --tls-cert-data.")
Expand All @@ -64,6 +65,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
cctx.BindFlagEnvVar(f.Lookup("codec-endpoint"), "TEMPORAL_CODEC_ENDPOINT")
f.StringVar(&v.CodecAuth, "codec-auth", "", "Authorization header for Codec Server requests.")
cctx.BindFlagEnvVar(f.Lookup("codec-auth"), "TEMPORAL_CODEC_AUTH")
f.StringArrayVar(&v.CodecHeader, "codec-header", nil, "HTTP headers for requests to codec server. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.")
}

type OverlapPolicyOptions struct {
Expand Down Expand Up @@ -2861,6 +2863,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command)
Expand Down Expand Up @@ -3055,6 +3058,7 @@ type TemporalWorkflowListCommand struct {
Query string
Archived bool
Limit int
PageSize int
}

func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowListCommand {
Expand All @@ -3072,6 +3076,7 @@ func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkfl
s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.")
s.Command.Flags().BoolVar(&s.Archived, "archived", false, "Limit output to archived Workflow Executions.")
s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Maximum number of Workflow Executions to display.")
s.Command.Flags().IntVar(&s.PageSize, "page-size", 0, "Maximum number of Workflow Executions to fetch at a time from the server.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down Expand Up @@ -3288,6 +3293,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork
return &s
}

type TemporalWorkflowSignalWithStartCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SharedWorkflowStartOptions
WorkflowStartOptions
PayloadInputOptions
SignalName string
SignalInput []string
SignalInputFile []string
SignalInputMeta []string
SignalInputBase64 bool
}

func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand {
var s TemporalWorkflowSignalWithStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "signal-with-start [flags]"
s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running"
if hasHighlighting {
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name")
s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.")
s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.")
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"name": "type",
"signal-type": "signal-name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowStackCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Expand Down
19 changes: 19 additions & 0 deletions temporalcli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/temporalio/cli/temporalcli/internal/printer"
"github.com/temporalio/ui-server/v2/server/version"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/failure/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/common/headers"
"google.golang.org/grpc"
Expand Down Expand Up @@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err
Details: deets,
}, nil
}

func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
if len(in) == 0 {
return nil, nil
}
// search attributes always use default dataconverter
dc := converter.GetDefaultDataConverter()
out := make(map[string]*commonpb.Payload, len(in))
for key, val := range in {
payload, err := dc.ToPayload(val)
if err != nil {
return nil, err
}
out[key] = payload
}
return out, nil
}
20 changes: 1 addition & 19 deletions temporalcli/commands.schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/server/common/primitives/timestamp"
)

Expand Down Expand Up @@ -257,7 +256,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c
if err != nil {
return nil, err
}
untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes)
untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string {
s = strings.TrimSpace(s)
return s
}

func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
if len(in) == 0 {
return nil, nil
}
// search attributes always use default dataconverter
dc := converter.GetDefaultDataConverter()
out := make(map[string]*commonpb.Payload, len(in))
for key, val := range in {
payload, err := dc.ToPayload(val)
if err != nil {
return nil, err
}
out[key] = payload
}
return out, nil
}
14 changes: 0 additions & 14 deletions temporalcli/commands.worker.deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,6 @@ type jsonDeploymentVersionInfoType struct {
Metadata map[string]*common.Payload `json:"metadata"`
}

/*
type jsonDeploymentReachabilityInfoType struct {
DeploymentInfo jsonDeploymentInfoType `json:"deploymentInfo"`
Reachability string `json:"reachability"`
LastUpdateTime time.Time `json:"lastUpdateTime"`
}

type jsonDeploymentListEntryType struct {
Deployment jsonDeploymentType `json:"deployment"`
CreateTime time.Time `json:"createTime"`
IsCurrent bool `json:"isCurrent"`
}
*/

func (s *SharedServerSuite) TestDeployment_Set_Current_Version() {
deploymentName := uuid.NewString()
buildId := uuid.NewString()
Expand Down
112 changes: 112 additions & 0 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ import (
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/types/known/durationpb"
)

func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -92,6 +98,112 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
return err
}

func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error {
if c.SharedWorkflowStartOptions.WorkflowId == "" {
return fmt.Errorf("--workflow-id flag must be provided")
}

cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions)
if err != nil {
return err
}
wfInput, err := c.buildRawInputPayloads()
if err != nil {
return err
}

signalPayloadInputOpts := PayloadInputOptions{
Input: c.SignalInput,
InputFile: c.SignalInputFile,
InputMeta: c.InputMeta,
InputBase64: c.SignalInputBase64,
}
signalInput, err := signalPayloadInputOpts.buildRawInputPayloads()
if err != nil {
return err
}

var retryPolicy *common.RetryPolicy
if wfStartOpts.RetryPolicy != nil {
retryPolicy = &commonpb.RetryPolicy{
MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval),
InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval),
BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient,
MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts,
NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes,
}
}
var memo *common.Memo
if wfStartOpts.Memo != nil {
fields, err := encodeMapToPayloads(wfStartOpts.Memo)
if err != nil {
return err
}
memo = &common.Memo{Fields: fields}
}
var searchAttr *common.SearchAttributes
if wfStartOpts.SearchAttributes != nil {
fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes)
if err != nil {
return err
}
searchAttr = &common.SearchAttributes{IndexedFields: fields}
}

if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) {
cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command")
}

// We have to use the raw signal service call here because the Go SDK's
// signal-with-start call doesn't accept multiple signal arguments.
resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution(
cctx,
&workflowservice.SignalWithStartWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
RequestId: uuid.NewString(),
WorkflowId: c.WorkflowId,
WorkflowType: &common.WorkflowType{Name: c.Type},
TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: wfInput,
WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout),
WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout),
WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout),
SignalName: c.SignalName,
SignalInput: signalInput,
Identity: clientIdentity(),
RetryPolicy: retryPolicy,
CronSchedule: wfStartOpts.CronSchedule,
Memo: memo,
SearchAttributes: searchAttr,
WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy,
},
)
if err != nil {
return err
}
cctx.Printer.Println(color.MagentaString("Running execution:"))
return cctx.Printer.PrintStructured(struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Type string `json:"type"`
Namespace string `json:"namespace"`
TaskQueue string `json:"taskQueue"`
}{
WorkflowId: c.WorkflowId,
RunId: resp.RunId,
Type: c.Type,
Namespace: c.Parent.Namespace,
TaskQueue: c.TaskQueue,
}, printer.StructuredOptions{})
}

type workflowJSONResult struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Expand Down
Loading
Loading