Skip to content

Commit

Permalink
[apache#26902] Add switch for supporting cross compile arm64 for data…
Browse files Browse the repository at this point in the history
…flow
  • Loading branch information
lostluck committed Jul 25, 2023
1 parent 193b720 commit 27bde86
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 6 deletions.
1 change: 1 addition & 0 deletions sdks/go/container/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ docker {
project.rootProject.hasProperty(["isRelease"])])
buildx project.containerPlatforms() != [project.nativeArchitecture()]
platform(*project.containerPlatforms())
push true
}
dockerPrepare.dependsOn tasks.named("goBuild")

Expand Down
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"os"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
Expand All @@ -47,7 +48,12 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
} else {
// Cross-compile as last resort.

worker, err := runnerlib.BuildTempWorkerBinary(ctx)
var copts runnerlib.CompileOpts
if strings.HasPrefix(opts.MachineType, "t2a") {
copts.Arch = "arm64"
}

worker, err := runnerlib.BuildTempWorkerBinary(ctx, copts)
if err != nil {
return presult, err
}
Expand Down
24 changes: 20 additions & 4 deletions sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,20 @@ func IsWorkerCompatibleBinary() (string, bool) {

var unique int32

// CompileOpts are additional options for dynamic compiles of the local code
// for development purposes. Production runs should build the worker binary
// separately for the target environment.
// See https://beam.apache.org/documentation/sdks/go-cross-compilation/ for details.
type CompileOpts struct {
OS, Arch string
}

// BuildTempWorkerBinary creates a local worker binary in the tmp directory
// for linux/amd64. Caller responsible for deleting the binary.
func BuildTempWorkerBinary(ctx context.Context) (string, error) {
func BuildTempWorkerBinary(ctx context.Context, opts CompileOpts) (string, error) {
id := atomic.AddInt32(&unique, 1)
filename := filepath.Join(os.TempDir(), fmt.Sprintf("worker-%v-%v", id, time.Now().UnixNano()))
if err := buildWorkerBinary(ctx, filename); err != nil {
if err := buildWorkerBinary(ctx, filename, opts); err != nil {
return "", err
}
return filename, nil
Expand All @@ -59,7 +67,7 @@ func BuildTempWorkerBinary(ctx context.Context) (string, error) {
// * /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go (skip: 3)
// /usr/local/go/src/runtime/proc.go (skip: 4) // not always present
// /usr/local/go/src/runtime/asm_amd64.s (skip: 4 or 5)
func buildWorkerBinary(ctx context.Context, filename string) error {
func buildWorkerBinary(ctx context.Context, filename string, opts CompileOpts) error {
program := ""
var isTest bool
for i := 3; ; i++ {
Expand All @@ -77,9 +85,17 @@ func buildWorkerBinary(ctx context.Context, filename string) error {
}
goos := "linux"
goarch := "amd64"

if opts.OS != "" {
goos = opts.OS
}
if opts.Arch != "" {
goarch = opts.Arch
}

cgo := "0"

log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", goos, goarch, cgo, program, filename)
log.Infof(ctx, "Cross-compiling %v with GOOS=%s GOARCH=%s CGO_ENABLED=%s as %v", program, goos, goarch, cgo, filename)

// Cross-compile given go program. Not awesome.
program = program[:strings.LastIndex(program, "/")+1]
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
} else {
// Cross-compile as last resort.

worker, err := BuildTempWorkerBinary(ctx)
worker, err := BuildTempWorkerBinary(ctx, CompileOpts{})
if err != nil {
return presult, err
}
Expand Down
25 changes: 25 additions & 0 deletions sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ task dataflowValidatesRunner() {
}
}

// ValidatesRunner tests for Dataflow. Runs tests in the integration directory
// with Dataflow to validate that the runner behaves as expected, on arm64 machines.
task dataflowValidatesRunnerARM64() {
group = "Verification"

dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"

doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--machine_type=t2a-standard-1",
]
def options = [
"--runner dataflow",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
}
}
}


// ValidatesRunner tests for Flink. Runs tests in the integration directory
// with Flink to validate that the runner behaves as expected.
task flinkValidatesRunner {
Expand Down

0 comments on commit 27bde86

Please sign in to comment.