From 4cb0c1829464624b0fcf3e537b507552c6a31190 Mon Sep 17 00:00:00 2001 From: js-ts Date: Tue, 2 Aug 2022 10:51:33 +0530 Subject: [PATCH 01/26] bacalhau --local --- benchmark/explode.sh | 0 benchmark/quick_read.sh | 0 benchmark/quick_submit.sh | 0 benchmark/single_explode.sh | 0 benchmark/start_and_run.sh | 0 benchmark/submit.sh | 0 cmd/bacalhau/docker_run.go | 88 ++++--- go.mod | 5 +- go.sum | 9 +- pkg/executor/local/executor.go | 418 +++++++++++++++++++++++++++++++++ pkg/executor/types.go | 18 ++ pkg/executor/util/utils.go | 43 ++++ pkg/local/utils.go | 252 ++++++++++++++++++++ 13 files changed, 794 insertions(+), 39 deletions(-) mode change 100755 => 100644 benchmark/explode.sh mode change 100755 => 100644 benchmark/quick_read.sh mode change 100755 => 100644 benchmark/quick_submit.sh mode change 100755 => 100644 benchmark/single_explode.sh mode change 100755 => 100644 benchmark/start_and_run.sh mode change 100755 => 100644 benchmark/submit.sh create mode 100644 pkg/executor/local/executor.go create mode 100644 pkg/local/utils.go diff --git a/benchmark/explode.sh b/benchmark/explode.sh old mode 100755 new mode 100644 diff --git a/benchmark/quick_read.sh b/benchmark/quick_read.sh old mode 100755 new mode 100644 diff --git a/benchmark/quick_submit.sh b/benchmark/quick_submit.sh old mode 100755 new mode 100644 diff --git a/benchmark/single_explode.sh b/benchmark/single_explode.sh old mode 100755 new mode 100644 diff --git a/benchmark/start_and_run.sh b/benchmark/start_and_run.sh old mode 100755 new mode 100644 diff --git a/benchmark/submit.sh b/benchmark/submit.sh old mode 100755 new mode 100644 diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index af9d14f4a2..8c43f0f248 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -13,7 +13,7 @@ import ( "github.com/filecoin-project/bacalhau/pkg/executor" "github.com/filecoin-project/bacalhau/pkg/ipfs" pjob "github.com/filecoin-project/bacalhau/pkg/job" - + "github.com/filecoin-project/bacalhau/pkg/local" "github.com/filecoin-project/bacalhau/pkg/system" "github.com/filecoin-project/bacalhau/pkg/verifier" "github.com/rs/zerolog/log" @@ -36,6 +36,7 @@ var jobCPU string var jobMemory string var jobGPU string var skipSyntaxChecking bool +var isLocal bool var waitForJobToFinishAndPrintOutput bool var jobLabels []string @@ -353,6 +354,11 @@ func init() { // nolint:gochecknoinits // Using init in cobra command is idomati `Wait For Job To Finish And Print Output`, ) + dockerRunCmd.PersistentFlags().BoolVar( + &isLocal, "local", false, + `Run the job locally. Docker is required`, + ) + // ipfs get wait time dockerRunCmd.PersistentFlags().IntVarP( &jobIpfsGetTimeOut, "gettimeout", "g", 10, //nolint: gomnd @@ -454,47 +460,61 @@ var dockerRunCmd = &cobra.Command{ } } - job, err := getAPIClient().Submit(ctx, spec, deal, nil) - if err != nil { - return err - } - - states, err := getAPIClient().GetExecutionStates(ctx, job.ID) - if err != nil { - return err - } - - cmd.Printf("%s\n", job.ID) - currentNodeID, _ := pjob.GetCurrentJobState(states) - nodeIds := []string{currentNodeID} - - // TODO: #424 Should we refactor all this waiting out? I worry about putting this all here \ - // feels like we're overloading the surface of the CLI command a lot. - if waitForJobToFinishAndPrintOutput { - err = WaitForJob(ctx, job.ID, job, - WaitForJobThrowErrors(job, []executor.JobStateType{ - executor.JobStateCancelled, - executor.JobStateError, - }), - WaitForJobAllHaveState(nodeIds, executor.JobStateComplete), - ) + if isLocal { + client, err := local.NewDockerClient() if err != nil { + cmd.Printf("%t\n", local.IsInstalled(client)) return err } + std, err := local.RunJobLocally(ctx, spec) + cmd.Printf("%v", std) - cidl := Get(job.ID, jobIpfsGetTimeOut, jobLocalOutput) - - // TODO: #425 Can you explain what the below is doing? Please comment. - var cidv string - for cid := range cidl { - cidv = filepath.Join(jobLocalOutput, cid) + if err != nil { + return err } - body, err := os.ReadFile(cidv + "/stdout") + } else { + job, err := getAPIClient().Submit(ctx, spec, deal, nil) if err != nil { return err } - fmt.Println() - fmt.Println(string(body)) + + states, err := getAPIClient().GetExecutionStates(ctx, job.ID) + if err != nil { + return err + } + + cmd.Printf("%s\n", job.ID) + currentNodeID, _ := pjob.GetCurrentJobState(states) + nodeIds := []string{currentNodeID} + + // TODO: #424 Should we refactor all this waiting out? I worry about putting this all here \ + // feels like we're overloading the surface of the CLI command a lot. + if waitForJobToFinishAndPrintOutput { + err = WaitForJob(ctx, job.ID, job, + WaitForJobThrowErrors(job, []executor.JobStateType{ + executor.JobStateCancelled, + executor.JobStateError, + }), + WaitForJobAllHaveState(nodeIds, executor.JobStateComplete), + ) + if err != nil { + return err + } + + cidl := Get(job.ID, jobIpfsGetTimeOut, jobLocalOutput) + + // TODO: #425 Can you explain what the below is doing? Please comment. + var cidv string + for cid := range cidl { + cidv = filepath.Join(jobLocalOutput, cid) + } + body, err := os.ReadFile(cidv + "/stdout") + if err != nil { + return err + } + fmt.Println() + fmt.Println(string(body)) + } } return nil diff --git a/go.mod b/go.mod index 241944b814..4c1b5b7f47 100644 --- a/go.mod +++ b/go.mod @@ -206,7 +206,7 @@ require ( github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect github.com/miekg/dns v1.1.48 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect @@ -231,7 +231,7 @@ require ( github.com/onsi/ginkgo v1.16.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect - github.com/opencontainers/runtime-spec v1.0.2 // indirect + github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/openzipkin/zipkin-go v0.4.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect @@ -289,6 +289,7 @@ require ( golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index a273417c38..8e8afb848a 100644 --- a/go.sum +++ b/go.sum @@ -1206,8 +1206,9 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= +github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= @@ -1375,8 +1376,9 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= -github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 h1:3snG66yBm59tKhhSPQrQ/0bCrv1LQbKt40LnUPiUxdc= +github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -2040,8 +2042,9 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s= +golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go new file mode 100644 index 0000000000..0237ef2317 --- /dev/null +++ b/pkg/executor/local/executor.go @@ -0,0 +1,418 @@ +package local + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "runtime/debug" + + "github.com/google/uuid" + + dockertypes "github.com/docker/docker/api/types" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/api/types/network" + dockerclient "github.com/docker/docker/client" + "github.com/filecoin-project/bacalhau/pkg/capacitymanager" + "github.com/filecoin-project/bacalhau/pkg/config" + "github.com/filecoin-project/bacalhau/pkg/docker" + "github.com/filecoin-project/bacalhau/pkg/executor" + "github.com/filecoin-project/bacalhau/pkg/storage" + "github.com/filecoin-project/bacalhau/pkg/storage/util" + "github.com/filecoin-project/bacalhau/pkg/system" + "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel/trace" +) + +const NanoCPUCoefficient = 1000000000 + +type Local struct { + ID string + + jobID string + + ResultsDir string + + StorageProviders map[storage.StorageSourceType]storage.StorageProvider + + Client *dockerclient.Client +} + +// var jobID string + +func NewExecutor( + cm *system.CleanupManager, + id string, + storageProviders map[storage.StorageSourceType]storage.StorageProvider, +) (*Local, error) { + dockerClient, err := docker.NewDockerClient() + jobID := genId() + if err != nil { + return nil, err + } + + dir, err := ioutil.TempDir("", "bacalhau-docker-executor") + if err != nil { + return nil, err + } + + de := &Local{ + ID: id, + jobID: jobID, + ResultsDir: dir, + StorageProviders: storageProviders, + Client: dockerClient, + } + + cm.RegisterCallback(func() error { + de.cleanupAll() + return nil + }) + + return de, nil +} + +// write a function for interactive mode + +func (e *Local) getStorageProvider(ctx context.Context, engine storage.StorageSourceType) (storage.StorageProvider, error) { + return util.GetStorageProvider(ctx, engine, e.StorageProviders) +} + +// IsInstalled checks if docker itself is installed. +func (e *Local) IsInstalled(ctx context.Context) (bool, error) { + return docker.IsInstalled(e.Client), nil +} + +func (e *Local) HasStorageLocally(ctx context.Context, volume storage.StorageSpec) (bool, error) { + ctx, span := newSpan(ctx, "HasStorageLocally") + defer span.End() + + s, err := e.getStorageProvider(ctx, volume.Engine) + if err != nil { + return false, err + } + + return s.HasStorageLocally(ctx, volume) +} + +func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) (uint64, error) { + storageProvider, err := e.getStorageProvider(ctx, volume.Engine) + if err != nil { + return 0, err + } + return storageProvider.GetVolumeSize(ctx, volume) +} + +// TODO: #289 Clean up RunJob +// nolint:funlen,gocyclo // will clean up +func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { + ctx, span := newSpan(ctx, "RunJob") + defer span.End() + + jobResultsDir, err := e.ensureJobResultsDir() + if err != nil { + return "", err + } + + // the actual mounts we will give to the container + // these are paths for both input and output data + mounts := []mount.Mount{} + + // loop over the job storage inputs and prepare them + for _, inputStorage := range jobSpec.Inputs { + var storageProvider storage.StorageProvider + var volumeMount storage.StorageVolume + storageProvider, err = e.getStorageProvider(ctx, inputStorage.Engine) + if err != nil { + return "", err + } + + volumeMount, err = storageProvider.PrepareStorage(ctx, inputStorage) + if err != nil { + return "", err + } + + if volumeMount.Type == storage.StorageVolumeConnectorBind { + log.Trace().Msgf("Input Volume: %+v %+v", inputStorage, volumeMount) + + mounts = append(mounts, mount.Mount{ + Type: "bind", + + // this is an input volume so is read only + ReadOnly: true, + Source: volumeMount.Source, + Target: volumeMount.Target, + }) + } else { + return "", fmt.Errorf( + "unknown storage volume type: %s", volumeMount.Type) + } + } + + // for this phase of the outputs we ignore the engine because it's just about collecting the + // data from the job and keeping it locally + // the engine property of the output storage spec is how we will "publish" the output volume + // if and when the deal is settled + for _, output := range jobSpec.Outputs { + if output.Name == "" { + return "", fmt.Errorf("output volume has no name: %+v", output) + } + + if output.Path == "" { + return "", fmt.Errorf("output volume has no path: %+v", output) + } + + srcd := fmt.Sprintf("%s/%s", jobResultsDir, output.Name) + err = os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W) + if err != nil { + return "", err + } + + log.Trace().Msgf("Output Volume: %+v", output) + + // create a mount so the output data does not need to be copied back to the host + mounts = append(mounts, mount.Mount{ + + Type: "bind", + // this is an output volume so can be written to + ReadOnly: false, + + // we create a named folder in the job results folder for this output + Source: srcd, + + // the path of the output volume is from the perspective of inside the container + Target: output.Path, + }) + } + + if os.Getenv("SKIP_IMAGE_PULL") == "" { + // TODO: #283 work out why this does not work in github actions + // err = docker.PullImage(e.Client, job.Spec.Vm.Image) + var im dockertypes.ImageInspect + im, _, err = e.Client.ImageInspectWithRaw(ctx, jobSpec.Docker.Image) + if err == nil { + log.Debug().Msgf("Not pulling image %s, already have %+v", jobSpec.Docker.Image, im) + } else if dockerclient.IsErrNotFound(err) { + stdout, err := system.RunCommandGetResults( // nolint:govet // shadowing ok + "docker", + []string{"pull", jobSpec.Docker.Image}, + ) + if err != nil { + return "", fmt.Errorf("error pulling %s: %s, %s", jobSpec.Docker.Image, err, stdout) + } + log.Trace().Msgf("Pull image output: %s\n%s", jobSpec.Docker.Image, stdout) + } else { + return "", fmt.Errorf("error checking if we have %s locally: %s", jobSpec.Docker.Image, err) + } + } + + // json the job spec and pass it into all containers + // TODO: check if this will overwrite a user supplied version of this value + // (which is what we actually want to happen) + jsonJobSpec, err := json.Marshal(jobSpec) + if err != nil { + return "", err + } + + useEnv := append(jobSpec.Docker.Env, fmt.Sprintf("BACALHAU_JOB_SPEC=%s", string(jsonJobSpec))) // nolint:gocritic + + containerConfig := &container.Config{ + Image: jobSpec.Docker.Image, + Tty: false, + Env: useEnv, + Entrypoint: jobSpec.Docker.Entrypoint, + Labels: e.jobContainerLabels(), + NetworkDisabled: true, + } + + log.Trace().Msgf("Container: %+v %+v", containerConfig, mounts) + + resourceRequirements := capacitymanager.ParseResourceUsageConfig(jobSpec.Resources) + + // Create GPU request if the job requests it + var deviceRequests []container.DeviceRequest + if resourceRequirements.GPU > 0 { + deviceRequests = append(deviceRequests, + container.DeviceRequest{ + DeviceIDs: []string{"0"}, // TODO: how do we know which device ID to use? + Capabilities: [][]string{{"gpu"}}, + }, + ) + log.Trace().Msgf("Adding %d GPUs to request", resourceRequirements.GPU) + } + + jobContainer, err := e.Client.ContainerCreate( + ctx, + containerConfig, + &container.HostConfig{ + Mounts: mounts, + Resources: container.Resources{ + Memory: int64(resourceRequirements.Memory), + NanoCPUs: int64(resourceRequirements.CPU * NanoCPUCoefficient), + DeviceRequests: deviceRequests, + }, + }, + &network.NetworkingConfig{}, + nil, + e.jobContainerName(), + ) + if err != nil { + return "", fmt.Errorf("failed to create container: %w", err) + } + + err = e.Client.ContainerStart( + ctx, + jobContainer.ID, + dockertypes.ContainerStartOptions{}, + ) + if err != nil { + return "", fmt.Errorf("failed to start container: %w", err) + } + defer e.cleanupJob() + + // the idea here is even if the container errors + // we want to capture stdout, stderr and feed it back to the user + var containerError error + var containerExitStatusCode int64 + statusCh, errCh := e.Client.ContainerWait( + ctx, + jobContainer.ID, + container.WaitConditionNotRunning, + ) + select { + case err = <-errCh: + containerError = err + case exitStatus := <-statusCh: + containerExitStatusCode = exitStatus.StatusCode + if exitStatus.Error != nil { + containerError = errors.New(exitStatus.Error.Message) + } + } + if containerExitStatusCode != 0 { + if containerError == nil { + containerError = fmt.Errorf("exit code was not zero: %d", containerExitStatusCode) + } + log.Info().Msgf("container error %s", containerError) + } + + stdout, stderr, err := system.RunCommandGetStdoutAndStderr( + "docker", + []string{ + "logs", + "-f", + jobContainer.ID, + }, + ) + if err != nil { + return "", fmt.Errorf("failed to get logs: %w", err) + } + + err = os.WriteFile( + fmt.Sprintf("%s/exitCode", jobResultsDir), + []byte(fmt.Sprintf("%d", containerExitStatusCode)), + util.OS_ALL_R|util.OS_USER_RW, + ) + if err != nil { + msg := fmt.Sprintf("could not write results to exitCode: %s", err) + log.Error().Msg(msg) + return "", errors.New(msg) + } + + err = os.WriteFile( + fmt.Sprintf("%s/stdout", jobResultsDir), + []byte(stdout), + util.OS_ALL_R|util.OS_USER_RW, + ) + if err != nil { + msg := fmt.Sprintf("could not write results to stdout: %s", err) + log.Error().Msg(msg) + return "", errors.New(msg) + } + + err = os.WriteFile( + fmt.Sprintf("%s/stderr", jobResultsDir), + []byte(stderr), + util.OS_ALL_R|util.OS_USER_RW, + ) + if err != nil { + msg := fmt.Sprintf("could not write results to stderr: %s", err) + log.Error().Msg(msg) + return "", errors.New(msg) + } + fmt.Println(stdout) + fmt.Println(stderr) + return jobResultsDir, containerError +} + +func (e *Local) cleanupJob() { + if config.ShouldKeepStack() { + return + } + + err := docker.RemoveContainer(e.Client, e.jobContainerName()) + if err != nil { + log.Error().Msgf("Docker remove container error: %s", err.Error()) + debug.PrintStack() + } +} + +func (e *Local) cleanupAll() { + if config.ShouldKeepStack() { + return + } + + log.Info().Msgf("Cleaning up all bacalhau containers for executor %s...", e.ID) + containersWithLabel, err := docker.GetContainersWithLabel(e.Client, "bacalhau-executor", e.ID) + if err != nil { + log.Error().Msgf("Docker executor stop error: %s", err.Error()) + return + } + // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) + // nolint:gocritic // will fix when we care + for _, container := range containersWithLabel { + err = docker.RemoveContainer(e.Client, container.ID) + if err != nil { + log.Error().Msgf("Non-critical error cleaning up container: %s", err.Error()) + } + } +} + +func (e *Local) jobContainerName() string { + return fmt.Sprintf("bacalhau-%s-%s", e.ID, e.jobID) +} + +func (e *Local) jobContainerLabels() map[string]string { + return map[string]string{ + "bacalhau-executor": e.ID, + "bacalhau-jobID": e.jobID, + } +} + +func (e *Local) jobResultsDir() string { + return fmt.Sprintf("%s/%s", e.ResultsDir, e.jobID) +} + +func (e *Local) ensureJobResultsDir() (string, error) { + dir := e.jobResultsDir() + err := os.MkdirAll(dir, util.OS_ALL_RWX) + info, _ := os.Stat(dir) + log.Trace().Msgf("Created job results dir (%s). Permissions: %s", dir, info.Mode()) + return dir, err +} + +func newSpan(ctx context.Context, apiName string) (context.Context, trace.Span) { + return system.Span(ctx, "executor/local", apiName) +} + +func genId() string { + jobUUID, err := uuid.NewRandom() + jobID := jobUUID.String() + if err != nil { + fmt.Printf("error creating job id: %v", err) + } + return jobID +} diff --git a/pkg/executor/types.go b/pkg/executor/types.go index 5c438318cf..f77595fef8 100644 --- a/pkg/executor/types.go +++ b/pkg/executor/types.go @@ -31,6 +31,24 @@ type Executor interface { RunJob(context.Context, Job) (string, error) } +type Local interface { + // tells you if the required software is installed on this machine + // this is used in job selection + IsInstalled(context.Context) (bool, error) + + // used to filter and select jobs + // tells us if the storage resource is "close" i.e. cheap to access + HasStorageLocally(context.Context, storage.StorageSpec) (bool, error) + // tells us how much storage the given volume would consume + // which we then use to calculate if there is capacity + // alongside cpu & memory usage + GetVolumeSize(context.Context, storage.StorageSpec) (uint64, error) + + // run the given job - it's expected that we have already prepared the job + // this will return a local filesystem path to the jobs results + RunJobLocally(context.Context, JobSpec) (string, error) +} + // Job contains data about a job in the bacalhau network. type Job struct { // The unique global ID of this job in the bacalhau network. diff --git a/pkg/executor/util/utils.go b/pkg/executor/util/utils.go index 83a8c61b14..b7d5755069 100644 --- a/pkg/executor/util/utils.go +++ b/pkg/executor/util/utils.go @@ -4,6 +4,7 @@ import ( "github.com/filecoin-project/bacalhau/pkg/executor" "github.com/filecoin-project/bacalhau/pkg/executor/docker" "github.com/filecoin-project/bacalhau/pkg/executor/language" + "github.com/filecoin-project/bacalhau/pkg/executor/local" noop_executor "github.com/filecoin-project/bacalhau/pkg/executor/noop" pythonwasm "github.com/filecoin-project/bacalhau/pkg/executor/python_wasm" "github.com/filecoin-project/bacalhau/pkg/storage" @@ -66,6 +67,48 @@ func NewStandardExecutors( return executors, nil } +func NewLocalStandardExecutors( + cm *system.CleanupManager, + ipfsMultiAddress, + dockerID string, +) (*local.Local, error) { + // Don't allow user to choose the fuse driver in case it has security issues. + // ipfsFuseStorage, err := fusedocker.NewStorageProvider(cm, ipfsMultiAddress) + // if err != nil { + // return nil, err + // } + + ipfsAPICopyStorage, err := apicopy.NewStorageProvider(cm, ipfsMultiAddress) + if err != nil { + return nil, err + } + + urlDownloadStorage, err := urldownload.NewStorageProvider(cm) + if err != nil { + return nil, err + } + + exDocker, err := local.NewExecutor(cm, dockerID, + map[storage.StorageSourceType]storage.StorageProvider{ + // fuse driver is disabled so that - in case it poses a security + // risk - arbitrary users can't request it + // storage.IPFS_FUSE_DOCKER: ipfsFuseStorage, + storage.StorageSourceIPFS: ipfsAPICopyStorage, + // we make the copy driver the "default" storage driver for docker + // users have to specify the fuse driver explicitly + storage.StorageSourceURLDownload: urlDownloadStorage, + }) + if err != nil { + return nil, err + } + + // executors := map[executor.EngineType]local.Local{ + // executor.EngineDocker: exDocker, + // } + + return exDocker, nil +} + // return noop executors for all engines func NewNoopExecutors( cm *system.CleanupManager, diff --git a/pkg/local/utils.go b/pkg/local/utils.go new file mode 100644 index 0000000000..b0ecb2301e --- /dev/null +++ b/pkg/local/utils.go @@ -0,0 +1,252 @@ +package local + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/docker/docker/api/types" + dockerclient "github.com/docker/docker/client" + "github.com/filecoin-project/bacalhau/pkg/config" + "github.com/filecoin-project/bacalhau/pkg/executor" + executor_util "github.com/filecoin-project/bacalhau/pkg/executor/util" + "github.com/filecoin-project/bacalhau/pkg/system" + "github.com/filecoin-project/bacalhau/pkg/transport/libp2p" + "github.com/moby/moby/pkg/stdcopy" + ma "github.com/multiformats/go-multiaddr" + "github.com/rs/zerolog/log" +) + +// ErrContainerMarkedForRemoval indicates that the docker daemon is about to +// delete, or has already deleted, the given container. +var ErrContainerMarkedForRemoval = fmt.Errorf("docker container marked for removal") + +var DefaultBootstrapAddresses = []string{ + "/ip4/35.245.115.191/tcp/1235/p2p/QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL", + "/ip4/35.245.61.251/tcp/1235/p2p/QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF", + "/ip4/35.245.251.239/tcp/1235/p2p/QmYgxZiySj3MRkwLSL4X2MF5F9f2PMhAE3LV49XkfNL1o3", +} +var DefaultSwarmPort = 1235 + +func NewDockerClient() (*dockerclient.Client, error) { + return dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) +} + +func IsInstalled(dockerClient *dockerclient.Client) bool { + _, err := dockerClient.Info(context.Background()) + return err == nil +} + +func GetContainer(dockerClient *dockerclient.Client, nameOrID string) (*types.Container, error) { + containers, err := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{ + All: true, + }) + if err != nil { + return nil, err + } + + // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) + // nolint:gocritic // will fix when we care + for _, container := range containers { + if container.ID == nameOrID { + return &container, nil + } + + if container.ID[0:11] == nameOrID { + return &container, nil + } + + for _, containerName := range container.Names { + if containerName == fmt.Sprintf("/%s", nameOrID) { + return &container, nil + } + } + } + + return nil, nil +} + +func GetContainersWithLabel(dockerClient *dockerclient.Client, labelName, labelValue string) ([]types.Container, error) { + results := []types.Container{} + containers, err := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{ + All: true, + }) + + if err != nil { + return nil, err + } + // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) + // nolint:gocritic // will fix when we care + for _, container := range containers { + value, ok := container.Labels[labelName] + if !ok { + continue + } + if value == labelValue { + results = append(results, container) + } + } + return results, nil +} + +func GetLogs(dockerClient *dockerclient.Client, nameOrID string) (stdout, stderr string, err error) { + container, err := GetContainer(dockerClient, nameOrID) + if err != nil { + return "", "", fmt.Errorf("failed to get container: %w", err) + } + if container == nil { + return "", "", fmt.Errorf("no container found: %s", nameOrID) + } + + logsReader, err := dockerClient.ContainerLogs(context.Background(), container.ID, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + }) + if err != nil { + // String checking is unfortunately the best we have, as errors are + // returned by the docker server as strings, and aren't strongly typed. + if strings.Contains(err.Error(), "can not get logs from container which is dead or marked for removal") { + return "", "", ErrContainerMarkedForRemoval + } + + return "", "", fmt.Errorf("failed to get container logs: %w", err) + } + + stdoutBuffer := bytes.NewBuffer([]byte{}) + stderrBuffer := bytes.NewBuffer([]byte{}) + _, err = stdcopy.StdCopy(stdoutBuffer, stderrBuffer, logsReader) + if err != nil { + return "", "", err + } + + return stdoutBuffer.String(), stderrBuffer.String(), nil +} + +func RemoveContainer(dockerClient *dockerclient.Client, nameOrID string) error { + ctx := context.Background() + + container, err := GetContainer(dockerClient, nameOrID) + if err != nil { + return err + } + if container == nil { + return nil + } + log.Debug().Msgf("Container Stop: %s", container.ID) + timeout := time.Millisecond * 100 + err = dockerClient.ContainerStop(ctx, container.ID, &timeout) + if err != nil { + return err + } + err = dockerClient.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{ + RemoveVolumes: true, + Force: true, + }) + if err != nil { + return err + } + return nil +} + +func WaitForContainer(client *dockerclient.Client, id string, maxAttempts int, delay time.Duration) error { + waiter := &system.FunctionWaiter{ + Name: fmt.Sprintf("wait for container to be running: %s", id), + MaxAttempts: maxAttempts, + Delay: delay, + Handler: func() (bool, error) { + container, err := GetContainer(client, id) + if err != nil { + return false, err + } + if container == nil { + return false, nil + } + return container.State == "running", nil + }, + } + return waiter.Wait() +} + +func WaitForContainerLogs(client *dockerclient.Client, id string, maxAttempts int, delay time.Duration, findString string) (string, error) { + lastLogs := "" + waiter := &system.FunctionWaiter{ + Name: fmt.Sprintf("wait for container to be running: %s", id), + MaxAttempts: maxAttempts, + Delay: delay, + Handler: func() (bool, error) { + container, err := GetContainer(client, id) + if err != nil { + return false, err + } + if container == nil { + return false, nil + } + if container.State != "running" { + return false, nil + } + stdout, stderr, err := GetLogs(client, id) + if err != nil { + return false, err + } + lastLogs = stdout + "\n" + stderr + return strings.Contains(stdout, findString) || strings.Contains(stderr, findString), nil + }, + } + err := waiter.Wait() + return lastLogs, err +} + +func PullImage(dockerClient *dockerclient.Client, image string) error { + imagePullStream, err := dockerClient.ImagePull( + context.Background(), + image, + types.ImagePullOptions{}, + ) + + if err != nil { + return err + } + + if config.IsDebug() { + _, err = io.Copy(os.Stdout, imagePullStream) + if err != nil { + return err + } + } + + return imagePullStream.Close() +} + +func RunJobLocally(ctx context.Context, jobspec executor.JobSpec) (string, error) { + log.Debug().Msgf("in your local docker executor!") + + cm := system.NewCleanupManager() + cm.RegisterCallback(system.CleanupTracer) + defer cm.Cleanup() + + peers := DefaultBootstrapAddresses // Default to connecting to defaults + log.Debug().Msgf("libp2p connecting to: %s", strings.Join(peers, ", ")) + + hostPort := DefaultSwarmPort + transport, err := libp2p.NewTransport(cm, hostPort, peers) + if err != nil { + fmt.Printf("error is : %v", err) + } + hostID, err := transport.HostID(context.Background()) + if err != nil { + fmt.Printf("error is : %v", err) + } + + addrStr := "/ip4/0.0.0.0/tcp/8080" + addr, err := ma.NewMultiaddr(addrStr) + if err != nil { + fmt.Printf("error is : %v", err) + } + e, _ := executor_util.NewLocalStandardExecutors(cm, addr.String(), fmt.Sprintf("bacalhau-%s", hostID)) + return e.RunJobLocally(ctx, jobspec) + +} From 711d48c4afcf0e34b63b8ff64e54346239309448 Mon Sep 17 00:00:00 2001 From: js-ts Date: Wed, 3 Aug 2022 07:05:45 +0530 Subject: [PATCH 02/26] adding test for --local --- cmd/bacalhau/apply_test.go | 2 -- cmd/bacalhau/docker_run_test.go | 17 +++++++++++++++++ cmd/bacalhau/utils.go | 28 ++++++++++++++++++++++++++++ pkg/executor/local/executor.go | 4 ++-- pkg/executor/util/utils.go | 4 ---- 5 files changed, 47 insertions(+), 8 deletions(-) diff --git a/cmd/bacalhau/apply_test.go b/cmd/bacalhau/apply_test.go index 77bab96829..d1ce2458a3 100644 --- a/cmd/bacalhau/apply_test.go +++ b/cmd/bacalhau/apply_test.go @@ -23,7 +23,6 @@ func TestApplySuite(t *testing.T) { suite.Run(t, new(ApplySuite)) } - //before all the suite func (suite *ApplySuite) SetupAllSuite() { @@ -112,4 +111,3 @@ func (suite *ApplySuite) TestApplyYAML_GenericSubmit() { } } } - diff --git a/cmd/bacalhau/docker_run_test.go b/cmd/bacalhau/docker_run_test.go index 91372e93f9..997e2dda4f 100644 --- a/cmd/bacalhau/docker_run_test.go +++ b/cmd/bacalhau/docker_run_test.go @@ -177,6 +177,23 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitWait() { } } +func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { + args := []string{"docker", "run", "ubuntu", "echo", "hello", "--local"} + expectedStdout := "hello" + done := capture() + _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) + out, _ := done() + + if err != nil { + fmt.Println(err.Error()) + } + trimmedStdout := strings.TrimSpace(string(out)) + fmt.Println(trimmedStdout) + + require.Equal(suite.T(), trimmedStdout, expectedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) + +} + func (suite *DockerRunSuite) TestRun_SubmitInputs() { tests := []struct { numberOfJobs int diff --git a/cmd/bacalhau/utils.go b/cmd/bacalhau/utils.go index 609a9e59a1..5f3d6d1940 100644 --- a/cmd/bacalhau/utils.go +++ b/cmd/bacalhau/utils.go @@ -3,6 +3,7 @@ package bacalhau import ( "bytes" "fmt" + "io" "os" "strings" "testing" @@ -102,6 +103,33 @@ func ReverseList(s []string) []string { return s } +func capture() func() (string, error) { + r, w, err := os.Pipe() + if err != nil { + panic(err) + } + + done := make(chan error, 1) + + save := os.Stdout + os.Stdout = w + + var buf strings.Builder + + go func() { + _, err := io.Copy(&buf, r) + r.Close() + done <- err + }() + + return func() (string, error) { + os.Stdout = save + w.Close() + err := <-done + return buf.String(), err + } +} + // func RandInt(i int) int { // n, err := rand.Int(rand.Reader, big.NewInt(int64(i))) // if err != nil { diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index 0237ef2317..584c791f93 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -50,7 +50,7 @@ func NewExecutor( storageProviders map[storage.StorageSourceType]storage.StorageProvider, ) (*Local, error) { dockerClient, err := docker.NewDockerClient() - jobID := genId() + jobID := genID() if err != nil { return nil, err } @@ -408,7 +408,7 @@ func newSpan(ctx context.Context, apiName string) (context.Context, trace.Span) return system.Span(ctx, "executor/local", apiName) } -func genId() string { +func genID() string { jobUUID, err := uuid.NewRandom() jobID := jobUUID.String() if err != nil { diff --git a/pkg/executor/util/utils.go b/pkg/executor/util/utils.go index b7d5755069..22d48772ae 100644 --- a/pkg/executor/util/utils.go +++ b/pkg/executor/util/utils.go @@ -102,10 +102,6 @@ func NewLocalStandardExecutors( return nil, err } - // executors := map[executor.EngineType]local.Local{ - // executor.EngineDocker: exDocker, - // } - return exDocker, nil } From 7df27a441985defd81b06d2aec893f1f89f0867b Mon Sep 17 00:00:00 2001 From: js-ts Date: Wed, 3 Aug 2022 07:10:04 +0530 Subject: [PATCH 03/26] //nolint:unused,deadcode --- cmd/bacalhau/utils.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/bacalhau/utils.go b/cmd/bacalhau/utils.go index 5f3d6d1940..5ba9fca5f1 100644 --- a/cmd/bacalhau/utils.go +++ b/cmd/bacalhau/utils.go @@ -103,6 +103,7 @@ func ReverseList(s []string) []string { return s } +//nolint:unused,deadcode func capture() func() (string, error) { r, w, err := os.Pipe() if err != nil { From 7e610745a58820ae5008891c5723ab667d578b21 Mon Sep 17 00:00:00 2001 From: js-ts Date: Wed, 3 Aug 2022 07:19:11 +0530 Subject: [PATCH 04/26] removing line --- pkg/local/utils.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/local/utils.go b/pkg/local/utils.go index b0ecb2301e..72e66c0e89 100644 --- a/pkg/local/utils.go +++ b/pkg/local/utils.go @@ -248,5 +248,4 @@ func RunJobLocally(ctx context.Context, jobspec executor.JobSpec) (string, error } e, _ := executor_util.NewLocalStandardExecutors(cm, addr.String(), fmt.Sprintf("bacalhau-%s", hostID)) return e.RunJobLocally(ctx, jobspec) - } From b010732d059549cc62a9ae05c7648eaf6a827156 Mon Sep 17 00:00:00 2001 From: js-ts Date: Wed, 3 Aug 2022 13:30:13 +0530 Subject: [PATCH 05/26] add require.Noeror --- cmd/bacalhau/docker_run_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/bacalhau/docker_run_test.go b/cmd/bacalhau/docker_run_test.go index 997e2dda4f..683d6cfaac 100644 --- a/cmd/bacalhau/docker_run_test.go +++ b/cmd/bacalhau/docker_run_test.go @@ -184,9 +184,7 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) out, _ := done() - if err != nil { - fmt.Println(err.Error()) - } + require.NoError(suite.T(), err) trimmedStdout := strings.TrimSpace(string(out)) fmt.Println(trimmedStdout) From eff3486ece33c0da3d758c65ee7930de0003debc Mon Sep 17 00:00:00 2001 From: vedant Date: Wed, 3 Aug 2022 14:57:19 +0530 Subject: [PATCH 06/26] chainging permissions --- benchmark/explode.sh | 0 benchmark/quick_read.sh | 0 benchmark/quick_submit.sh | 0 benchmark/single_explode.sh | 0 benchmark/start_and_run.sh | 0 benchmark/submit.sh | 0 6 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 benchmark/explode.sh mode change 100644 => 100755 benchmark/quick_read.sh mode change 100644 => 100755 benchmark/quick_submit.sh mode change 100644 => 100755 benchmark/single_explode.sh mode change 100644 => 100755 benchmark/start_and_run.sh mode change 100644 => 100755 benchmark/submit.sh diff --git a/benchmark/explode.sh b/benchmark/explode.sh old mode 100644 new mode 100755 diff --git a/benchmark/quick_read.sh b/benchmark/quick_read.sh old mode 100644 new mode 100755 diff --git a/benchmark/quick_submit.sh b/benchmark/quick_submit.sh old mode 100644 new mode 100755 diff --git a/benchmark/single_explode.sh b/benchmark/single_explode.sh old mode 100644 new mode 100755 diff --git a/benchmark/start_and_run.sh b/benchmark/start_and_run.sh old mode 100644 new mode 100755 diff --git a/benchmark/submit.sh b/benchmark/submit.sh old mode 100644 new mode 100755 From 970a1943599b1dd31f15a4a0bbc9987d5ba7f91e Mon Sep 17 00:00:00 2001 From: js-ts Date: Wed, 3 Aug 2022 18:35:33 +0530 Subject: [PATCH 07/26] adding comments, and default for isLocal --- cmd/bacalhau/docker_run.go | 1 + cmd/bacalhau/utils.go | 9 +++++++++ pkg/executor/local/executor.go | 1 + 3 files changed, 11 insertions(+) diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index 8c43f0f248..921393bf22 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -397,6 +397,7 @@ var dockerRunCmd = &cobra.Command{ jobCPU = "" jobMemory = "" jobGPU = "" + isLocal = false skipSyntaxChecking = false waitForJobToFinishAndPrintOutput = false jobIpfsGetTimeOut = 10 diff --git a/cmd/bacalhau/utils.go b/cmd/bacalhau/utils.go index 5ba9fca5f1..0fa4abb9ef 100644 --- a/cmd/bacalhau/utils.go +++ b/cmd/bacalhau/utils.go @@ -103,6 +103,15 @@ func ReverseList(s []string) []string { return s } +// this function captures the output of all functions running in it between capture() and done() +// example: +// done := capture() +// fmt.Println("hello") +// s, _ := done() +// after trimming str := strings.TrimSpace(s) it will return "hello" +// so if we want to compare the output in the console with a expected output like "hello" we could do that +// this is mainly used in testing --local +// go playground link https://go.dev/play/p/cuGIaIorWfD //nolint:unused,deadcode func capture() func() (string, error) { r, w, err := os.Pipe() diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index 584c791f93..a24a99f81b 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -76,6 +76,7 @@ func NewExecutor( return de, nil } +// to do // write a function for interactive mode func (e *Local) getStorageProvider(ctx context.Context, engine storage.StorageSourceType) (storage.StorageProvider, error) { From a65907fbce3a7eaaa6e217fd7c9866564e69fc0b Mon Sep 17 00:00:00 2001 From: js-ts Date: Fri, 5 Aug 2022 22:57:21 +0530 Subject: [PATCH 08/26] Make Input Volumes Work --- pkg/executor/local/executor.go | 2 +- pkg/local/utils.go | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index a24a99f81b..1746d66da6 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -111,7 +111,7 @@ func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) ( // TODO: #289 Clean up RunJob // nolint:funlen,gocyclo // will clean up func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { - ctx, span := newSpan(ctx, "RunJob") + ctx, span := newSpan(ctx, "RunJobLocally") defer span.End() jobResultsDir, err := e.ensureJobResultsDir() diff --git a/pkg/local/utils.go b/pkg/local/utils.go index 72e66c0e89..46fd33daab 100644 --- a/pkg/local/utils.go +++ b/pkg/local/utils.go @@ -17,7 +17,6 @@ import ( "github.com/filecoin-project/bacalhau/pkg/system" "github.com/filecoin-project/bacalhau/pkg/transport/libp2p" "github.com/moby/moby/pkg/stdcopy" - ma "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog/log" ) @@ -241,11 +240,10 @@ func RunJobLocally(ctx context.Context, jobspec executor.JobSpec) (string, error fmt.Printf("error is : %v", err) } - addrStr := "/ip4/0.0.0.0/tcp/8080" - addr, err := ma.NewMultiaddr(addrStr) + addrStr := "/ip4/127.0.0.1/tcp/5001" if err != nil { fmt.Printf("error is : %v", err) } - e, _ := executor_util.NewLocalStandardExecutors(cm, addr.String(), fmt.Sprintf("bacalhau-%s", hostID)) + e, _ := executor_util.NewLocalStandardExecutors(cm, addrStr, fmt.Sprintf("bacalhau-%s", hostID)) return e.RunJobLocally(ctx, jobspec) } From 0e8305b8c98e580ba7e3620c8488571e363f20ab Mon Sep 17 00:00:00 2001 From: js-ts Date: Sat, 6 Aug 2022 14:16:51 +0530 Subject: [PATCH 09/26] resolving merge errors --- cmd/bacalhau/docker_run.go | 97 ++++++++++++-------------------------- cmd/bacalhau/utils.go | 2 +- 2 files changed, 30 insertions(+), 69 deletions(-) diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index fc6cddd64c..1e4dd460c5 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -10,9 +10,9 @@ import ( "github.com/filecoin-project/bacalhau/pkg/executor" "github.com/filecoin-project/bacalhau/pkg/ipfs" - pjob "github.com/filecoin-project/bacalhau/pkg/job" - "github.com/filecoin-project/bacalhau/pkg/local" jobutils "github.com/filecoin-project/bacalhau/pkg/job" + "github.com/filecoin-project/bacalhau/pkg/local" + "github.com/filecoin-project/bacalhau/pkg/system" "github.com/filecoin-project/bacalhau/pkg/verifier" "github.com/rs/zerolog/log" @@ -35,7 +35,6 @@ var jobMemory string var jobGPU string var jobWorkingDir string var skipSyntaxChecking bool - var isLocal bool var waitForJobToFinish bool var waitForJobToFinishAndPrintOutput bool @@ -271,19 +270,8 @@ var dockerRunCmd = &cobra.Command{ return err } } - if isLocal { client, err := local.NewDockerClient() - job, err := getAPIClient().Submit(ctx, spec, deal, nil) - if err != nil { - return err - } - - cmd.Printf("%s\n", job.ID) - if waitForJobToFinish { - resolver := getAPIClient().GetJobStateResolver() - resolver.SetWaitTime(waitForJobTimeoutSecs, time.Second*1) - err = resolver.WaitUntilComplete(ctx, job.ID) if err != nil { cmd.Printf("%t\n", local.IsInstalled(client)) return err @@ -299,67 +287,40 @@ var dockerRunCmd = &cobra.Command{ if err != nil { return err } - states, err := getAPIClient().GetExecutionStates(ctx, job.ID) - if err != nil { - return err - } cmd.Printf("%s\n", job.ID) - currentNodeID, _ := pjob.GetCurrentJobState(states) - nodeIds := []string{currentNodeID} - - // TODO: #424 Should we refactor all this waiting out? I worry about putting this all here \ - // feels like we're overloading the surface of the CLI command a lot. - if waitForJobToFinishAndPrintOutput { - err = WaitForJob(ctx, job.ID, job, - WaitForJobThrowErrors(job, []executor.JobStateType{ - executor.JobStateCancelled, - executor.JobStateError, - }), - WaitForJobAllHaveState(nodeIds, executor.JobStateComplete), - ) + if waitForJobToFinish { + resolver := getAPIClient().GetJobStateResolver() + resolver.SetWaitTime(waitForJobTimeoutSecs, time.Second*1) + err = resolver.WaitUntilComplete(ctx, job.ID) if err != nil { return err } - cidl := Get(job.ID, jobIpfsGetTimeOut, jobLocalOutput) - - // TODO: #425 Can you explain what the below is doing? Please comment. - var cidv string - for cid := range cidl { - cidv = filepath.Join(jobLocalOutput, cid) - } - body, err := os.ReadFile(cidv + "/stdout") - if err != nil { - return err - } - cmd.Println(string(body)) - } - - - if waitForJobToFinishAndPrintOutput { - results, err := getAPIClient().GetResults(ctx, job.ID) - if err != nil { - return err - } - if len(results) == 0 { - return fmt.Errorf("no results found") - } - err = ipfs.DownloadJob( - cm, - job, - results, - runDownloadFlags, - ) - if err != nil { - return err - } - body, err := os.ReadFile(filepath.Join(runDownloadFlags.OutputDir, "stdout")) - if err != nil { - return err + if waitForJobToFinishAndPrintOutput { + results, err := getAPIClient().GetResults(ctx, job.ID) + if err != nil { + return err + } + if len(results) == 0 { + return fmt.Errorf("no results found") + } + err = ipfs.DownloadJob( + cm, + job, + results, + runDownloadFlags, + ) + if err != nil { + return err + } + body, err := os.ReadFile(filepath.Join(runDownloadFlags.OutputDir, "stdout")) + if err != nil { + return err + } + fmt.Println() + fmt.Println(string(body)) } - fmt.Println() - fmt.Println(string(body)) } } diff --git a/cmd/bacalhau/utils.go b/cmd/bacalhau/utils.go index 230b67e3a3..bdd30d56aa 100644 --- a/cmd/bacalhau/utils.go +++ b/cmd/bacalhau/utils.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "strings" "testing" "time" @@ -152,4 +153,3 @@ func setupDownloadFlags(cmd *cobra.Command, settings *ipfs.DownloadSettings) { cmd.Flags().StringVar(&settings.IPFSSwarmAddrs, "ipfs-swarm-addrs", settings.IPFSSwarmAddrs, "Comma-separated list of IPFS nodes to connect to.") } - From f1d29961436bb83f71f8af635144e864298a9ee9 Mon Sep 17 00:00:00 2001 From: js-ts Date: Sat, 6 Aug 2022 14:30:03 +0530 Subject: [PATCH 10/26] removing nolint spaces --- cmd/bacalhau/utils.go | 1 + pkg/executor/local/executor.go | 8 ++++---- pkg/local/utils.go | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cmd/bacalhau/utils.go b/cmd/bacalhau/utils.go index bdd30d56aa..acf9b14dda 100644 --- a/cmd/bacalhau/utils.go +++ b/cmd/bacalhau/utils.go @@ -108,6 +108,7 @@ func ReverseList(s []string) []string { // so if we want to compare the output in the console with a expected output like "hello" we could do that // this is mainly used in testing --local // go playground link https://go.dev/play/p/cuGIaIorWfD + //nolint:unused,deadcode func capture() func() (string, error) { r, w, err := os.Pipe() diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index 1746d66da6..7bd0ff13a2 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -109,7 +109,7 @@ func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) ( } // TODO: #289 Clean up RunJob -// nolint:funlen,gocyclo // will clean up +//nolint:funlen,gocyclo // will clean up func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { ctx, span := newSpan(ctx, "RunJobLocally") defer span.End() @@ -198,7 +198,7 @@ func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (st if err == nil { log.Debug().Msgf("Not pulling image %s, already have %+v", jobSpec.Docker.Image, im) } else if dockerclient.IsErrNotFound(err) { - stdout, err := system.RunCommandGetResults( // nolint:govet // shadowing ok + stdout, err := system.RunCommandGetResults( //nolint:govet // shadowing ok "docker", []string{"pull", jobSpec.Docker.Image}, ) @@ -219,7 +219,7 @@ func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (st return "", err } - useEnv := append(jobSpec.Docker.Env, fmt.Sprintf("BACALHAU_JOB_SPEC=%s", string(jsonJobSpec))) // nolint:gocritic + useEnv := append(jobSpec.Docker.Env, fmt.Sprintf("BACALHAU_JOB_SPEC=%s", string(jsonJobSpec))) //nolint:gocritic containerConfig := &container.Config{ Image: jobSpec.Docker.Image, @@ -373,7 +373,7 @@ func (e *Local) cleanupAll() { return } // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) - // nolint:gocritic // will fix when we care + //nolint:gocritic // will fix when we care for _, container := range containersWithLabel { err = docker.RemoveContainer(e.Client, container.ID) if err != nil { diff --git a/pkg/local/utils.go b/pkg/local/utils.go index 46fd33daab..714b7ab8c7 100644 --- a/pkg/local/utils.go +++ b/pkg/local/utils.go @@ -49,7 +49,7 @@ func GetContainer(dockerClient *dockerclient.Client, nameOrID string) (*types.Co } // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) - // nolint:gocritic // will fix when we care + //nolint:gocritic // will fix when we care for _, container := range containers { if container.ID == nameOrID { return &container, nil @@ -79,7 +79,7 @@ func GetContainersWithLabel(dockerClient *dockerclient.Client, labelName, labelV return nil, err } // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) - // nolint:gocritic // will fix when we care + //nolint:gocritic // will fix when we care for _, container := range containers { value, ok := container.Labels[labelName] if !ok { From d37a1065acf256b95d0c3d9a363ea1558a929f6a Mon Sep 17 00:00:00 2001 From: js-ts Date: Sat, 6 Aug 2022 14:55:50 +0530 Subject: [PATCH 11/26] lint issues --- pkg/executor/local/executor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index 7bd0ff13a2..d090e26c61 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -109,7 +109,8 @@ func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) ( } // TODO: #289 Clean up RunJob -//nolint:funlen,gocyclo // will clean up +// will clean up +//nolint:funlen,gocyclo func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { ctx, span := newSpan(ctx, "RunJobLocally") defer span.End() From 5302bdb39bd215ac7b8242123f78f092f969ef41 Mon Sep 17 00:00:00 2001 From: js-ts Date: Sat, 6 Aug 2022 15:01:01 +0530 Subject: [PATCH 12/26] removing comment --- pkg/executor/local/executor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index d090e26c61..27db055316 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -109,7 +109,6 @@ func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) ( } // TODO: #289 Clean up RunJob -// will clean up //nolint:funlen,gocyclo func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { ctx, span := newSpan(ctx, "RunJobLocally") From 6712e7619c066af64419da8de0c61ed39563d044 Mon Sep 17 00:00:00 2001 From: js-ts Date: Mon, 8 Aug 2022 19:33:45 +0530 Subject: [PATCH 13/26] add test for --local input --- cmd/bacalhau/docker_run_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/cmd/bacalhau/docker_run_test.go b/cmd/bacalhau/docker_run_test.go index e86e0873c1..7673f65bfc 100644 --- a/cmd/bacalhau/docker_run_test.go +++ b/cmd/bacalhau/docker_run_test.go @@ -196,6 +196,27 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { } +func (suite *DockerRunSuite) TestRun_GenericSubmitLocalInput() { + // -v QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN:/hello.txt ubuntu cat hello.txt + CID := "QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN" + args := []string{"docker", "run", + "--local", + "-v", fmt.Sprintf("%s:/hello.txt", CID), + "ubuntu", + "cat", "hello.txt"} + expectedStdout := "hello" + done := capture() + _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) + out, _ := done() + + require.NoError(suite.T(), err) + trimmedStdout := strings.TrimSpace(string(out)) + fmt.Println(trimmedStdout) + + require.Equal(suite.T(), trimmedStdout, expectedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) + +} + func (suite *DockerRunSuite) TestRun_SubmitInputs() { tests := []struct { numberOfJobs int From b1815c6bc7e8c4fdbfba9fe73fdb17eaa80a1d2e Mon Sep 17 00:00:00 2001 From: js-ts Date: Mon, 8 Aug 2022 20:56:59 +0530 Subject: [PATCH 14/26] try fix lint error --- pkg/executor/local/executor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go index 27db055316..e98a46ffd9 100644 --- a/pkg/executor/local/executor.go +++ b/pkg/executor/local/executor.go @@ -108,8 +108,7 @@ func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) ( return storageProvider.GetVolumeSize(ctx, volume) } -// TODO: #289 Clean up RunJob -//nolint:funlen,gocyclo +//nolint:funlen,gocyclo // will clean up func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { ctx, span := newSpan(ctx, "RunJobLocally") defer span.End() From 7623cf81f02b102dcd5b67f72e41dc9d9fdfb47b Mon Sep 17 00:00:00 2001 From: js-ts Date: Mon, 8 Aug 2022 21:54:44 +0530 Subject: [PATCH 15/26] pull .circleci\config.yaml from main --- .circleci/config.yml | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 302739217c..a0ca35eb34 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -277,14 +277,16 @@ workflows: ignore: main tags: ignore: /.*/ - - deploy: - name: deploy-development-cluster - requires: - - build-linux-amd64 - rollout_stage: development - GOOGLE_APPLICATION_CREDENTIALS_VARIABLE: "GOOGLE_APPLICATION_DEVELOPMENT_CREDENTIALS_B64" - filters: - <<: *filters_dev_branches # this is calling the previously set yaml anchor + ## deploying to dev terraform cluster should not happen from non-main branch builds in CI + ## See https://github.com/filecoin-project/bacalhau/issues/434 + # - deploy: + # name: deploy-development-cluster + # requires: + # - build-linux-amd64 + # rollout_stage: development + # GOOGLE_APPLICATION_CREDENTIALS_VARIABLE: "GOOGLE_APPLICATION_DEVELOPMENT_CREDENTIALS_B64" + # filters: + # <<: *filters_dev_branches # this is calling the previously set yaml anchor main_only: # This workflow will only run on 'main' and will not run on tags jobs: @@ -333,4 +335,4 @@ workflows: - run_perf: name: Running perf requires: - - build-linux-amd64 + - build-linux-amd64 \ No newline at end of file From fd380cd4e927ecc5a3f1900d193942041999d387 Mon Sep 17 00:00:00 2001 From: Enrico Rotundo Date: Tue, 9 Aug 2022 10:34:08 +0200 Subject: [PATCH 16/26] Update config.yml --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a0ca35eb34..635c089c07 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -335,4 +335,4 @@ workflows: - run_perf: name: Running perf requires: - - build-linux-amd64 \ No newline at end of file + - build-linux-amd64 From fc23661448cf5170c1bbc09804613f1e4e23f83d Mon Sep 17 00:00:00 2001 From: Kai Davenport Date: Wed, 10 Aug 2022 15:39:31 +0100 Subject: [PATCH 17/26] add a local devstack mode that is the same but with a publically connected ipfs server --- cmd/bacalhau/devstack.go | 1 + pkg/devstack/devstack.go | 75 ++++++++++++++++++++++++++++++++++++-- pkg/test/devstack/utils.go | 1 + 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/cmd/bacalhau/devstack.go b/cmd/bacalhau/devstack.go index 00718c05c0..5b4ffba952 100644 --- a/cmd/bacalhau/devstack.go +++ b/cmd/bacalhau/devstack.go @@ -128,6 +128,7 @@ var devstackCmd = &cobra.Command{ getVerifiers, computeNodeConfig, devStackPeer, + false, ) if err != nil { return err diff --git a/pkg/devstack/devstack.go b/pkg/devstack/devstack.go index 99a358f352..ef50b86e4a 100644 --- a/pkg/devstack/devstack.go +++ b/pkg/devstack/devstack.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/bacalhau/pkg/config" "github.com/filecoin-project/bacalhau/pkg/controller" "github.com/filecoin-project/bacalhau/pkg/executor" + executor_util "github.com/filecoin-project/bacalhau/pkg/executor/util" "github.com/filecoin-project/bacalhau/pkg/ipfs" "github.com/filecoin-project/bacalhau/pkg/localdb/inmemory" "github.com/filecoin-project/bacalhau/pkg/publicapi" @@ -20,6 +21,7 @@ import ( "github.com/filecoin-project/bacalhau/pkg/system" "github.com/filecoin-project/bacalhau/pkg/transport/libp2p" "github.com/filecoin-project/bacalhau/pkg/verifier" + verifier_util "github.com/filecoin-project/bacalhau/pkg/verifier/util" "github.com/ipfs/go-datastore" "github.com/phayes/freeport" "github.com/rs/zerolog/log" @@ -69,6 +71,63 @@ type GetVerifiersFunc func( error, ) +func NewDevStackForRunLocal( + cm *system.CleanupManager, + count int, //nolint:unparam // Incorrectly assumed as unused +) (*DevStack, error) { + getStorageProviders := func(ipfsMultiAddress string, nodeIndex int) (map[storage.StorageSourceType]storage.StorageProvider, error) { + return executor_util.NewStandardStorageProviders(cm, ipfsMultiAddress) + } + getExecutors := func( + ipfsMultiAddress string, + nodeIndex int, + ctrl *controller.Controller, + ) ( + map[executor.EngineType]executor.Executor, + error, + ) { + ipfsParts := strings.Split(ipfsMultiAddress, "/") + ipfsSuffix := ipfsParts[len(ipfsParts)-1] + return executor_util.NewStandardExecutors( + cm, + ipfsMultiAddress, + fmt.Sprintf("devstacknode%d-%s", nodeIndex, ipfsSuffix), + ) + } + getVerifiers := func( + ipfsMultiAddress string, + nodeIndex int, + ctrl *controller.Controller, + ) ( + map[verifier.VerifierType]verifier.Verifier, + error, + ) { + jobLoader := func(ctx context.Context, id string) (executor.Job, error) { + return ctrl.GetJob(ctx, id) + } + stateLoader := func(ctx context.Context, id string) (executor.JobState, error) { + return ctrl.GetJobState(ctx, id) + } + return verifier_util.NewIPFSVerifiers(cm, ipfsMultiAddress, jobLoader, stateLoader) + } + + return NewDevStack( + cm, + count, 0, + getStorageProviders, + getExecutors, + getVerifiers, + computenode.ComputeNodeConfig{ + JobSelectionPolicy: computenode.JobSelectionPolicy{ + Locality: computenode.Anywhere, + RejectStatelessJobs: false, + }, + }, + "", + true, + ) +} + //nolint:funlen,gocyclo func NewDevStack( cm *system.CleanupManager, @@ -79,6 +138,7 @@ func NewDevStack( //nolint:gocritic config computenode.ComputeNodeConfig, peer string, + publicIPFSMode bool, ) (*DevStack, error) { ctx, span := newSpan("NewDevStack") defer span.End() @@ -92,6 +152,8 @@ func NewDevStack( ////////////////////////////////////// var err error var ipfsSwarmAddrs []string + var ipfsNode *ipfs.Node + if i > 0 { ipfsSwarmAddrs, err = nodes[0].IpfsNode.SwarmAddresses() if err != nil { @@ -99,9 +161,16 @@ func NewDevStack( } } - ipfsNode, err := ipfs.NewLocalNode(cm, ipfsSwarmAddrs) - if err != nil { - return nil, fmt.Errorf("failed to create ipfs node: %w", err) + if publicIPFSMode { + ipfsNode, err = ipfs.NewNode(cm, []string{}) + if err != nil { + return nil, fmt.Errorf("failed to create ipfs node: %w", err) + } + } else { + ipfsNode, err = ipfs.NewLocalNode(cm, ipfsSwarmAddrs) + if err != nil { + return nil, fmt.Errorf("failed to create ipfs node: %w", err) + } } ipfsClient, err := ipfsNode.Client() diff --git a/pkg/test/devstack/utils.go b/pkg/test/devstack/utils.go index 048edec566..e13291219a 100644 --- a/pkg/test/devstack/utils.go +++ b/pkg/test/devstack/utils.go @@ -77,6 +77,7 @@ func SetupTest( getVerifiers, config, "", + false, ) require.NoError(t, err) From 94a89bc6e8f65afe0e2e1173a364f7d34bcb4ab5 Mon Sep 17 00:00:00 2001 From: Kai Davenport Date: Wed, 10 Aug 2022 15:41:30 +0100 Subject: [PATCH 18/26] remove local executor - local is more about having a devstack locally to post the job --- cmd/bacalhau/docker_run.go | 15 +- pkg/executor/local/executor.go | 418 --------------------------------- pkg/executor/util/utils.go | 39 --- pkg/local/utils.go | 249 -------------------- 4 files changed, 3 insertions(+), 718 deletions(-) delete mode 100644 pkg/executor/local/executor.go delete mode 100644 pkg/local/utils.go diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index 1e4dd460c5..add8600a64 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -8,10 +8,10 @@ import ( "strings" "time" + "github.com/davecgh/go-spew/spew" "github.com/filecoin-project/bacalhau/pkg/executor" "github.com/filecoin-project/bacalhau/pkg/ipfs" jobutils "github.com/filecoin-project/bacalhau/pkg/job" - "github.com/filecoin-project/bacalhau/pkg/local" "github.com/filecoin-project/bacalhau/pkg/system" "github.com/filecoin-project/bacalhau/pkg/verifier" @@ -271,17 +271,8 @@ var dockerRunCmd = &cobra.Command{ } } if isLocal { - client, err := local.NewDockerClient() - if err != nil { - cmd.Printf("%t\n", local.IsInstalled(client)) - return err - } - std, err := local.RunJobLocally(ctx, spec) - cmd.Printf("%v", std) - - if err != nil { - return err - } + fmt.Printf("LOCAL --------------------------------------\n") + spew.Dump("LOCAL") } else { job, err := getAPIClient().Submit(ctx, spec, deal, nil) if err != nil { diff --git a/pkg/executor/local/executor.go b/pkg/executor/local/executor.go deleted file mode 100644 index e98a46ffd9..0000000000 --- a/pkg/executor/local/executor.go +++ /dev/null @@ -1,418 +0,0 @@ -package local - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "os" - "runtime/debug" - - "github.com/google/uuid" - - dockertypes "github.com/docker/docker/api/types" - - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/mount" - "github.com/docker/docker/api/types/network" - dockerclient "github.com/docker/docker/client" - "github.com/filecoin-project/bacalhau/pkg/capacitymanager" - "github.com/filecoin-project/bacalhau/pkg/config" - "github.com/filecoin-project/bacalhau/pkg/docker" - "github.com/filecoin-project/bacalhau/pkg/executor" - "github.com/filecoin-project/bacalhau/pkg/storage" - "github.com/filecoin-project/bacalhau/pkg/storage/util" - "github.com/filecoin-project/bacalhau/pkg/system" - "github.com/rs/zerolog/log" - "go.opentelemetry.io/otel/trace" -) - -const NanoCPUCoefficient = 1000000000 - -type Local struct { - ID string - - jobID string - - ResultsDir string - - StorageProviders map[storage.StorageSourceType]storage.StorageProvider - - Client *dockerclient.Client -} - -// var jobID string - -func NewExecutor( - cm *system.CleanupManager, - id string, - storageProviders map[storage.StorageSourceType]storage.StorageProvider, -) (*Local, error) { - dockerClient, err := docker.NewDockerClient() - jobID := genID() - if err != nil { - return nil, err - } - - dir, err := ioutil.TempDir("", "bacalhau-docker-executor") - if err != nil { - return nil, err - } - - de := &Local{ - ID: id, - jobID: jobID, - ResultsDir: dir, - StorageProviders: storageProviders, - Client: dockerClient, - } - - cm.RegisterCallback(func() error { - de.cleanupAll() - return nil - }) - - return de, nil -} - -// to do -// write a function for interactive mode - -func (e *Local) getStorageProvider(ctx context.Context, engine storage.StorageSourceType) (storage.StorageProvider, error) { - return util.GetStorageProvider(ctx, engine, e.StorageProviders) -} - -// IsInstalled checks if docker itself is installed. -func (e *Local) IsInstalled(ctx context.Context) (bool, error) { - return docker.IsInstalled(e.Client), nil -} - -func (e *Local) HasStorageLocally(ctx context.Context, volume storage.StorageSpec) (bool, error) { - ctx, span := newSpan(ctx, "HasStorageLocally") - defer span.End() - - s, err := e.getStorageProvider(ctx, volume.Engine) - if err != nil { - return false, err - } - - return s.HasStorageLocally(ctx, volume) -} - -func (e *Local) GetVolumeSize(ctx context.Context, volume storage.StorageSpec) (uint64, error) { - storageProvider, err := e.getStorageProvider(ctx, volume.Engine) - if err != nil { - return 0, err - } - return storageProvider.GetVolumeSize(ctx, volume) -} - -//nolint:funlen,gocyclo // will clean up -func (e *Local) RunJobLocally(ctx context.Context, jobSpec executor.JobSpec) (string, error) { - ctx, span := newSpan(ctx, "RunJobLocally") - defer span.End() - - jobResultsDir, err := e.ensureJobResultsDir() - if err != nil { - return "", err - } - - // the actual mounts we will give to the container - // these are paths for both input and output data - mounts := []mount.Mount{} - - // loop over the job storage inputs and prepare them - for _, inputStorage := range jobSpec.Inputs { - var storageProvider storage.StorageProvider - var volumeMount storage.StorageVolume - storageProvider, err = e.getStorageProvider(ctx, inputStorage.Engine) - if err != nil { - return "", err - } - - volumeMount, err = storageProvider.PrepareStorage(ctx, inputStorage) - if err != nil { - return "", err - } - - if volumeMount.Type == storage.StorageVolumeConnectorBind { - log.Trace().Msgf("Input Volume: %+v %+v", inputStorage, volumeMount) - - mounts = append(mounts, mount.Mount{ - Type: "bind", - - // this is an input volume so is read only - ReadOnly: true, - Source: volumeMount.Source, - Target: volumeMount.Target, - }) - } else { - return "", fmt.Errorf( - "unknown storage volume type: %s", volumeMount.Type) - } - } - - // for this phase of the outputs we ignore the engine because it's just about collecting the - // data from the job and keeping it locally - // the engine property of the output storage spec is how we will "publish" the output volume - // if and when the deal is settled - for _, output := range jobSpec.Outputs { - if output.Name == "" { - return "", fmt.Errorf("output volume has no name: %+v", output) - } - - if output.Path == "" { - return "", fmt.Errorf("output volume has no path: %+v", output) - } - - srcd := fmt.Sprintf("%s/%s", jobResultsDir, output.Name) - err = os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W) - if err != nil { - return "", err - } - - log.Trace().Msgf("Output Volume: %+v", output) - - // create a mount so the output data does not need to be copied back to the host - mounts = append(mounts, mount.Mount{ - - Type: "bind", - // this is an output volume so can be written to - ReadOnly: false, - - // we create a named folder in the job results folder for this output - Source: srcd, - - // the path of the output volume is from the perspective of inside the container - Target: output.Path, - }) - } - - if os.Getenv("SKIP_IMAGE_PULL") == "" { - // TODO: #283 work out why this does not work in github actions - // err = docker.PullImage(e.Client, job.Spec.Vm.Image) - var im dockertypes.ImageInspect - im, _, err = e.Client.ImageInspectWithRaw(ctx, jobSpec.Docker.Image) - if err == nil { - log.Debug().Msgf("Not pulling image %s, already have %+v", jobSpec.Docker.Image, im) - } else if dockerclient.IsErrNotFound(err) { - stdout, err := system.RunCommandGetResults( //nolint:govet // shadowing ok - "docker", - []string{"pull", jobSpec.Docker.Image}, - ) - if err != nil { - return "", fmt.Errorf("error pulling %s: %s, %s", jobSpec.Docker.Image, err, stdout) - } - log.Trace().Msgf("Pull image output: %s\n%s", jobSpec.Docker.Image, stdout) - } else { - return "", fmt.Errorf("error checking if we have %s locally: %s", jobSpec.Docker.Image, err) - } - } - - // json the job spec and pass it into all containers - // TODO: check if this will overwrite a user supplied version of this value - // (which is what we actually want to happen) - jsonJobSpec, err := json.Marshal(jobSpec) - if err != nil { - return "", err - } - - useEnv := append(jobSpec.Docker.Env, fmt.Sprintf("BACALHAU_JOB_SPEC=%s", string(jsonJobSpec))) //nolint:gocritic - - containerConfig := &container.Config{ - Image: jobSpec.Docker.Image, - Tty: false, - Env: useEnv, - Entrypoint: jobSpec.Docker.Entrypoint, - Labels: e.jobContainerLabels(), - NetworkDisabled: true, - } - - log.Trace().Msgf("Container: %+v %+v", containerConfig, mounts) - - resourceRequirements := capacitymanager.ParseResourceUsageConfig(jobSpec.Resources) - - // Create GPU request if the job requests it - var deviceRequests []container.DeviceRequest - if resourceRequirements.GPU > 0 { - deviceRequests = append(deviceRequests, - container.DeviceRequest{ - DeviceIDs: []string{"0"}, // TODO: how do we know which device ID to use? - Capabilities: [][]string{{"gpu"}}, - }, - ) - log.Trace().Msgf("Adding %d GPUs to request", resourceRequirements.GPU) - } - - jobContainer, err := e.Client.ContainerCreate( - ctx, - containerConfig, - &container.HostConfig{ - Mounts: mounts, - Resources: container.Resources{ - Memory: int64(resourceRequirements.Memory), - NanoCPUs: int64(resourceRequirements.CPU * NanoCPUCoefficient), - DeviceRequests: deviceRequests, - }, - }, - &network.NetworkingConfig{}, - nil, - e.jobContainerName(), - ) - if err != nil { - return "", fmt.Errorf("failed to create container: %w", err) - } - - err = e.Client.ContainerStart( - ctx, - jobContainer.ID, - dockertypes.ContainerStartOptions{}, - ) - if err != nil { - return "", fmt.Errorf("failed to start container: %w", err) - } - defer e.cleanupJob() - - // the idea here is even if the container errors - // we want to capture stdout, stderr and feed it back to the user - var containerError error - var containerExitStatusCode int64 - statusCh, errCh := e.Client.ContainerWait( - ctx, - jobContainer.ID, - container.WaitConditionNotRunning, - ) - select { - case err = <-errCh: - containerError = err - case exitStatus := <-statusCh: - containerExitStatusCode = exitStatus.StatusCode - if exitStatus.Error != nil { - containerError = errors.New(exitStatus.Error.Message) - } - } - if containerExitStatusCode != 0 { - if containerError == nil { - containerError = fmt.Errorf("exit code was not zero: %d", containerExitStatusCode) - } - log.Info().Msgf("container error %s", containerError) - } - - stdout, stderr, err := system.RunCommandGetStdoutAndStderr( - "docker", - []string{ - "logs", - "-f", - jobContainer.ID, - }, - ) - if err != nil { - return "", fmt.Errorf("failed to get logs: %w", err) - } - - err = os.WriteFile( - fmt.Sprintf("%s/exitCode", jobResultsDir), - []byte(fmt.Sprintf("%d", containerExitStatusCode)), - util.OS_ALL_R|util.OS_USER_RW, - ) - if err != nil { - msg := fmt.Sprintf("could not write results to exitCode: %s", err) - log.Error().Msg(msg) - return "", errors.New(msg) - } - - err = os.WriteFile( - fmt.Sprintf("%s/stdout", jobResultsDir), - []byte(stdout), - util.OS_ALL_R|util.OS_USER_RW, - ) - if err != nil { - msg := fmt.Sprintf("could not write results to stdout: %s", err) - log.Error().Msg(msg) - return "", errors.New(msg) - } - - err = os.WriteFile( - fmt.Sprintf("%s/stderr", jobResultsDir), - []byte(stderr), - util.OS_ALL_R|util.OS_USER_RW, - ) - if err != nil { - msg := fmt.Sprintf("could not write results to stderr: %s", err) - log.Error().Msg(msg) - return "", errors.New(msg) - } - fmt.Println(stdout) - fmt.Println(stderr) - return jobResultsDir, containerError -} - -func (e *Local) cleanupJob() { - if config.ShouldKeepStack() { - return - } - - err := docker.RemoveContainer(e.Client, e.jobContainerName()) - if err != nil { - log.Error().Msgf("Docker remove container error: %s", err.Error()) - debug.PrintStack() - } -} - -func (e *Local) cleanupAll() { - if config.ShouldKeepStack() { - return - } - - log.Info().Msgf("Cleaning up all bacalhau containers for executor %s...", e.ID) - containersWithLabel, err := docker.GetContainersWithLabel(e.Client, "bacalhau-executor", e.ID) - if err != nil { - log.Error().Msgf("Docker executor stop error: %s", err.Error()) - return - } - // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) - //nolint:gocritic // will fix when we care - for _, container := range containersWithLabel { - err = docker.RemoveContainer(e.Client, container.ID) - if err != nil { - log.Error().Msgf("Non-critical error cleaning up container: %s", err.Error()) - } - } -} - -func (e *Local) jobContainerName() string { - return fmt.Sprintf("bacalhau-%s-%s", e.ID, e.jobID) -} - -func (e *Local) jobContainerLabels() map[string]string { - return map[string]string{ - "bacalhau-executor": e.ID, - "bacalhau-jobID": e.jobID, - } -} - -func (e *Local) jobResultsDir() string { - return fmt.Sprintf("%s/%s", e.ResultsDir, e.jobID) -} - -func (e *Local) ensureJobResultsDir() (string, error) { - dir := e.jobResultsDir() - err := os.MkdirAll(dir, util.OS_ALL_RWX) - info, _ := os.Stat(dir) - log.Trace().Msgf("Created job results dir (%s). Permissions: %s", dir, info.Mode()) - return dir, err -} - -func newSpan(ctx context.Context, apiName string) (context.Context, trace.Span) { - return system.Span(ctx, "executor/local", apiName) -} - -func genID() string { - jobUUID, err := uuid.NewRandom() - jobID := jobUUID.String() - if err != nil { - fmt.Printf("error creating job id: %v", err) - } - return jobID -} diff --git a/pkg/executor/util/utils.go b/pkg/executor/util/utils.go index e37c31adb1..32993ec9d9 100644 --- a/pkg/executor/util/utils.go +++ b/pkg/executor/util/utils.go @@ -4,7 +4,6 @@ import ( "github.com/filecoin-project/bacalhau/pkg/executor" "github.com/filecoin-project/bacalhau/pkg/executor/docker" "github.com/filecoin-project/bacalhau/pkg/executor/language" - "github.com/filecoin-project/bacalhau/pkg/executor/local" noop_executor "github.com/filecoin-project/bacalhau/pkg/executor/noop" pythonwasm "github.com/filecoin-project/bacalhau/pkg/executor/python_wasm" "github.com/filecoin-project/bacalhau/pkg/storage" @@ -87,44 +86,6 @@ func NewStandardExecutors( return executors, nil } -func NewLocalStandardExecutors( - cm *system.CleanupManager, - ipfsMultiAddress, - dockerID string, -) (*local.Local, error) { - // Don't allow user to choose the fuse driver in case it has security issues. - // ipfsFuseStorage, err := fusedocker.NewStorageProvider(cm, ipfsMultiAddress) - // if err != nil { - // return nil, err - // } - - ipfsAPICopyStorage, err := apicopy.NewStorageProvider(cm, ipfsMultiAddress) - if err != nil { - return nil, err - } - - urlDownloadStorage, err := urldownload.NewStorageProvider(cm) - if err != nil { - return nil, err - } - - exDocker, err := local.NewExecutor(cm, dockerID, - map[storage.StorageSourceType]storage.StorageProvider{ - // fuse driver is disabled so that - in case it poses a security - // risk - arbitrary users can't request it - // storage.IPFS_FUSE_DOCKER: ipfsFuseStorage, - storage.StorageSourceIPFS: ipfsAPICopyStorage, - // we make the copy driver the "default" storage driver for docker - // users have to specify the fuse driver explicitly - storage.StorageSourceURLDownload: urlDownloadStorage, - }) - if err != nil { - return nil, err - } - - return exDocker, nil -} - // return noop executors for all engines func NewNoopExecutors( cm *system.CleanupManager, diff --git a/pkg/local/utils.go b/pkg/local/utils.go deleted file mode 100644 index 714b7ab8c7..0000000000 --- a/pkg/local/utils.go +++ /dev/null @@ -1,249 +0,0 @@ -package local - -import ( - "bytes" - "context" - "fmt" - "io" - "os" - "strings" - "time" - - "github.com/docker/docker/api/types" - dockerclient "github.com/docker/docker/client" - "github.com/filecoin-project/bacalhau/pkg/config" - "github.com/filecoin-project/bacalhau/pkg/executor" - executor_util "github.com/filecoin-project/bacalhau/pkg/executor/util" - "github.com/filecoin-project/bacalhau/pkg/system" - "github.com/filecoin-project/bacalhau/pkg/transport/libp2p" - "github.com/moby/moby/pkg/stdcopy" - "github.com/rs/zerolog/log" -) - -// ErrContainerMarkedForRemoval indicates that the docker daemon is about to -// delete, or has already deleted, the given container. -var ErrContainerMarkedForRemoval = fmt.Errorf("docker container marked for removal") - -var DefaultBootstrapAddresses = []string{ - "/ip4/35.245.115.191/tcp/1235/p2p/QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL", - "/ip4/35.245.61.251/tcp/1235/p2p/QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF", - "/ip4/35.245.251.239/tcp/1235/p2p/QmYgxZiySj3MRkwLSL4X2MF5F9f2PMhAE3LV49XkfNL1o3", -} -var DefaultSwarmPort = 1235 - -func NewDockerClient() (*dockerclient.Client, error) { - return dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) -} - -func IsInstalled(dockerClient *dockerclient.Client) bool { - _, err := dockerClient.Info(context.Background()) - return err == nil -} - -func GetContainer(dockerClient *dockerclient.Client, nameOrID string) (*types.Container, error) { - containers, err := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{ - All: true, - }) - if err != nil { - return nil, err - } - - // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) - //nolint:gocritic // will fix when we care - for _, container := range containers { - if container.ID == nameOrID { - return &container, nil - } - - if container.ID[0:11] == nameOrID { - return &container, nil - } - - for _, containerName := range container.Names { - if containerName == fmt.Sprintf("/%s", nameOrID) { - return &container, nil - } - } - } - - return nil, nil -} - -func GetContainersWithLabel(dockerClient *dockerclient.Client, labelName, labelValue string) ([]types.Container, error) { - results := []types.Container{} - containers, err := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{ - All: true, - }) - - if err != nil { - return nil, err - } - // TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop) - //nolint:gocritic // will fix when we care - for _, container := range containers { - value, ok := container.Labels[labelName] - if !ok { - continue - } - if value == labelValue { - results = append(results, container) - } - } - return results, nil -} - -func GetLogs(dockerClient *dockerclient.Client, nameOrID string) (stdout, stderr string, err error) { - container, err := GetContainer(dockerClient, nameOrID) - if err != nil { - return "", "", fmt.Errorf("failed to get container: %w", err) - } - if container == nil { - return "", "", fmt.Errorf("no container found: %s", nameOrID) - } - - logsReader, err := dockerClient.ContainerLogs(context.Background(), container.ID, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - }) - if err != nil { - // String checking is unfortunately the best we have, as errors are - // returned by the docker server as strings, and aren't strongly typed. - if strings.Contains(err.Error(), "can not get logs from container which is dead or marked for removal") { - return "", "", ErrContainerMarkedForRemoval - } - - return "", "", fmt.Errorf("failed to get container logs: %w", err) - } - - stdoutBuffer := bytes.NewBuffer([]byte{}) - stderrBuffer := bytes.NewBuffer([]byte{}) - _, err = stdcopy.StdCopy(stdoutBuffer, stderrBuffer, logsReader) - if err != nil { - return "", "", err - } - - return stdoutBuffer.String(), stderrBuffer.String(), nil -} - -func RemoveContainer(dockerClient *dockerclient.Client, nameOrID string) error { - ctx := context.Background() - - container, err := GetContainer(dockerClient, nameOrID) - if err != nil { - return err - } - if container == nil { - return nil - } - log.Debug().Msgf("Container Stop: %s", container.ID) - timeout := time.Millisecond * 100 - err = dockerClient.ContainerStop(ctx, container.ID, &timeout) - if err != nil { - return err - } - err = dockerClient.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{ - RemoveVolumes: true, - Force: true, - }) - if err != nil { - return err - } - return nil -} - -func WaitForContainer(client *dockerclient.Client, id string, maxAttempts int, delay time.Duration) error { - waiter := &system.FunctionWaiter{ - Name: fmt.Sprintf("wait for container to be running: %s", id), - MaxAttempts: maxAttempts, - Delay: delay, - Handler: func() (bool, error) { - container, err := GetContainer(client, id) - if err != nil { - return false, err - } - if container == nil { - return false, nil - } - return container.State == "running", nil - }, - } - return waiter.Wait() -} - -func WaitForContainerLogs(client *dockerclient.Client, id string, maxAttempts int, delay time.Duration, findString string) (string, error) { - lastLogs := "" - waiter := &system.FunctionWaiter{ - Name: fmt.Sprintf("wait for container to be running: %s", id), - MaxAttempts: maxAttempts, - Delay: delay, - Handler: func() (bool, error) { - container, err := GetContainer(client, id) - if err != nil { - return false, err - } - if container == nil { - return false, nil - } - if container.State != "running" { - return false, nil - } - stdout, stderr, err := GetLogs(client, id) - if err != nil { - return false, err - } - lastLogs = stdout + "\n" + stderr - return strings.Contains(stdout, findString) || strings.Contains(stderr, findString), nil - }, - } - err := waiter.Wait() - return lastLogs, err -} - -func PullImage(dockerClient *dockerclient.Client, image string) error { - imagePullStream, err := dockerClient.ImagePull( - context.Background(), - image, - types.ImagePullOptions{}, - ) - - if err != nil { - return err - } - - if config.IsDebug() { - _, err = io.Copy(os.Stdout, imagePullStream) - if err != nil { - return err - } - } - - return imagePullStream.Close() -} - -func RunJobLocally(ctx context.Context, jobspec executor.JobSpec) (string, error) { - log.Debug().Msgf("in your local docker executor!") - - cm := system.NewCleanupManager() - cm.RegisterCallback(system.CleanupTracer) - defer cm.Cleanup() - - peers := DefaultBootstrapAddresses // Default to connecting to defaults - log.Debug().Msgf("libp2p connecting to: %s", strings.Join(peers, ", ")) - - hostPort := DefaultSwarmPort - transport, err := libp2p.NewTransport(cm, hostPort, peers) - if err != nil { - fmt.Printf("error is : %v", err) - } - hostID, err := transport.HostID(context.Background()) - if err != nil { - fmt.Printf("error is : %v", err) - } - - addrStr := "/ip4/127.0.0.1/tcp/5001" - if err != nil { - fmt.Printf("error is : %v", err) - } - e, _ := executor_util.NewLocalStandardExecutors(cm, addrStr, fmt.Sprintf("bacalhau-%s", hostID)) - return e.RunJobLocally(ctx, jobspec) -} From 8472c0e538b801d6740ff0a2149a8399d40956d3 Mon Sep 17 00:00:00 2001 From: Kai Davenport Date: Wed, 10 Aug 2022 15:58:50 +0100 Subject: [PATCH 19/26] pass the job into local devstack --- cmd/bacalhau/docker_run.go | 75 +++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index add8600a64..ce9b7067b6 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -8,10 +8,11 @@ import ( "strings" "time" - "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/bacalhau/pkg/devstack" "github.com/filecoin-project/bacalhau/pkg/executor" "github.com/filecoin-project/bacalhau/pkg/ipfs" jobutils "github.com/filecoin-project/bacalhau/pkg/job" + "github.com/filecoin-project/bacalhau/pkg/publicapi" "github.com/filecoin-project/bacalhau/pkg/system" "github.com/filecoin-project/bacalhau/pkg/verifier" @@ -270,48 +271,56 @@ var dockerRunCmd = &cobra.Command{ return err } } + + var apiClient *publicapi.APIClient if isLocal { - fmt.Printf("LOCAL --------------------------------------\n") - spew.Dump("LOCAL") + stack, err := devstack.NewDevStackForRunLocal(cm, 1) + if err != nil { + return err + } + apiUri := stack.Nodes[0].APIServer.GetURI() + apiClient = publicapi.NewAPIClient(apiUri) } else { - job, err := getAPIClient().Submit(ctx, spec, deal, nil) + apiClient = getAPIClient() + } + + job, err := apiClient.Submit(ctx, spec, deal, nil) + if err != nil { + return err + } + + cmd.Printf("%s\n", job.ID) + if waitForJobToFinish { + resolver := getAPIClient().GetJobStateResolver() + resolver.SetWaitTime(waitForJobTimeoutSecs, time.Second*1) + err = resolver.WaitUntilComplete(ctx, job.ID) if err != nil { return err } - cmd.Printf("%s\n", job.ID) - if waitForJobToFinish { - resolver := getAPIClient().GetJobStateResolver() - resolver.SetWaitTime(waitForJobTimeoutSecs, time.Second*1) - err = resolver.WaitUntilComplete(ctx, job.ID) + if waitForJobToFinishAndPrintOutput { + results, err := getAPIClient().GetResults(ctx, job.ID) if err != nil { return err } - - if waitForJobToFinishAndPrintOutput { - results, err := getAPIClient().GetResults(ctx, job.ID) - if err != nil { - return err - } - if len(results) == 0 { - return fmt.Errorf("no results found") - } - err = ipfs.DownloadJob( - cm, - job, - results, - runDownloadFlags, - ) - if err != nil { - return err - } - body, err := os.ReadFile(filepath.Join(runDownloadFlags.OutputDir, "stdout")) - if err != nil { - return err - } - fmt.Println() - fmt.Println(string(body)) + if len(results) == 0 { + return fmt.Errorf("no results found") + } + err = ipfs.DownloadJob( + cm, + job, + results, + runDownloadFlags, + ) + if err != nil { + return err + } + body, err := os.ReadFile(filepath.Join(runDownloadFlags.OutputDir, "stdout")) + if err != nil { + return err } + fmt.Println() + fmt.Println(string(body)) } } From 9adb6e79d3fed29832475105224006b278dc007d Mon Sep 17 00:00:00 2001 From: Enrico Rotundo Date: Thu, 11 Aug 2022 15:09:51 +0200 Subject: [PATCH 20/26] Fix expected vs acutal comparison in --local test --- cmd/bacalhau/docker_run_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cmd/bacalhau/docker_run_test.go b/cmd/bacalhau/docker_run_test.go index 7673f65bfc..0369824dc0 100644 --- a/cmd/bacalhau/docker_run_test.go +++ b/cmd/bacalhau/docker_run_test.go @@ -182,7 +182,8 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitWait() { } func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { - args := []string{"docker", "run", "ubuntu", "echo", "hello", "--local"} + // TODO @enricorotundo check if --wait is working + args := []string{"docker", "run", "ubuntu", "echo", "hello", "--local", "--wait"} expectedStdout := "hello" done := capture() _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) @@ -192,12 +193,10 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { trimmedStdout := strings.TrimSpace(string(out)) fmt.Println(trimmedStdout) - require.Equal(suite.T(), trimmedStdout, expectedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) - + require.Equal(suite.T(), expectedStdout, trimmedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) } func (suite *DockerRunSuite) TestRun_GenericSubmitLocalInput() { - // -v QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN:/hello.txt ubuntu cat hello.txt CID := "QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN" args := []string{"docker", "run", "--local", @@ -214,7 +213,6 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocalInput() { fmt.Println(trimmedStdout) require.Equal(suite.T(), trimmedStdout, expectedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) - } func (suite *DockerRunSuite) TestRun_SubmitInputs() { From 76887f688ef346c37880622d1c7481fdb2ae3263 Mon Sep 17 00:00:00 2001 From: Enrico Rotundo Date: Thu, 11 Aug 2022 16:29:31 +0200 Subject: [PATCH 21/26] Fix subtle bug --- cmd/bacalhau/docker_run.go | 4 ++-- pkg/executor/docker/executor.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index ce9b7067b6..655bf3d279 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -291,7 +291,7 @@ var dockerRunCmd = &cobra.Command{ cmd.Printf("%s\n", job.ID) if waitForJobToFinish { - resolver := getAPIClient().GetJobStateResolver() + resolver := apiClient.GetJobStateResolver() resolver.SetWaitTime(waitForJobTimeoutSecs, time.Second*1) err = resolver.WaitUntilComplete(ctx, job.ID) if err != nil { @@ -299,7 +299,7 @@ var dockerRunCmd = &cobra.Command{ } if waitForJobToFinishAndPrintOutput { - results, err := getAPIClient().GetResults(ctx, job.ID) + results, err := apiClient.GetResults(ctx, job.ID) if err != nil { return err } diff --git a/pkg/executor/docker/executor.go b/pkg/executor/docker/executor.go index 38b87c4887..9f37811b04 100644 --- a/pkg/executor/docker/executor.go +++ b/pkg/executor/docker/executor.go @@ -105,6 +105,8 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) ctx, span := newSpan(ctx, "RunJob") defer span.End() + log.Debug().Msgf("Running job %s on executor %s", j.ID, e.ID) + jobResultsDir, err := e.ensureShardResultsDir(j, shardIndex) if err != nil { return "", err @@ -240,7 +242,7 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) NetworkDisabled: true, WorkingDir: j.Spec.Docker.WorkingDir, } - + log.Trace().Msgf("Container: %+v %+v", containerConfig, mounts) resourceRequirements := capacitymanager.ParseResourceUsageConfig(j.Spec.Resources) @@ -274,8 +276,10 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) ) if err != nil { return "", fmt.Errorf("failed to create container: %w", err) + } else { + log.Trace().Msgf("Created container: %s", jobContainer.ID) } - + err = e.Client.ContainerStart( ctx, jobContainer.ID, @@ -283,7 +287,10 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) ) if err != nil { return "", fmt.Errorf("failed to start container: %w", err) + } else { + log.Trace().Msgf("Started container: %s", jobContainer.ID) } + defer e.cleanupJob(j, shardIndex) // the idea here is even if the container errors From 0b8637fd4b83e8bfc3affd3f39947ae79f6801cd Mon Sep 17 00:00:00 2001 From: Enrico Rotundo Date: Thu, 11 Aug 2022 19:36:12 +0200 Subject: [PATCH 22/26] Fix test cases for --local flag --- cmd/bacalhau/docker_run_test.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/cmd/bacalhau/docker_run_test.go b/cmd/bacalhau/docker_run_test.go index 0369824dc0..c8fa7e524b 100644 --- a/cmd/bacalhau/docker_run_test.go +++ b/cmd/bacalhau/docker_run_test.go @@ -182,28 +182,46 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitWait() { } func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { - // TODO @enricorotundo check if --wait is working - args := []string{"docker", "run", "ubuntu", "echo", "hello", "--local", "--wait"} expectedStdout := "hello" + args := []string{"docker", "run", "ubuntu", "echo", expectedStdout, "--local", "--wait", "--download"} done := capture() + + dir, _ := ioutil.TempDir("", "bacalhau-TestRun_GenericSubmitLocal-") + defer func() { + err := os.RemoveAll(dir) + require.NoError(suite.T(), err) + }() + runDownloadFlags.OutputDir = dir + _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) out, _ := done() require.NoError(suite.T(), err) trimmedStdout := strings.TrimSpace(string(out)) - fmt.Println(trimmedStdout) require.Equal(suite.T(), expectedStdout, trimmedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) + + runDownloadFlags.OutputDir = "." } func (suite *DockerRunSuite) TestRun_GenericSubmitLocalInput() { CID := "QmZULkCELmmk5XNfCgTnCyFgAVxBRBXyDHGGMVoLFLiXEN" args := []string{"docker", "run", "--local", + "--wait", + "--download", "-v", fmt.Sprintf("%s:/hello.txt", CID), "ubuntu", "cat", "hello.txt"} expectedStdout := "hello" + + dir, _ := ioutil.TempDir("", "bacalhau-TestRun_GenericSubmitLocalInput-") + defer func() { + err := os.RemoveAll(dir) + require.NoError(suite.T(), err) + }() + runDownloadFlags.OutputDir = dir + done := capture() _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) out, _ := done() @@ -212,7 +230,9 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocalInput() { trimmedStdout := strings.TrimSpace(string(out)) fmt.Println(trimmedStdout) - require.Equal(suite.T(), trimmedStdout, expectedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) + require.Equal(suite.T(), expectedStdout, trimmedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) + + runDownloadFlags.OutputDir = "." } func (suite *DockerRunSuite) TestRun_SubmitInputs() { From 5a248cd403414bb87b4364b039beb1defd715e4f Mon Sep 17 00:00:00 2001 From: Enrico Rotundo Date: Thu, 11 Aug 2022 20:12:16 +0200 Subject: [PATCH 23/26] Fix CI errors --- cmd/bacalhau/docker_run.go | 10 +++++----- pkg/executor/docker/executor.go | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index 5135a99c49..5d67fc0a9e 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -283,12 +283,12 @@ var dockerRunCmd = &cobra.Command{ var apiClient *publicapi.APIClient if isLocal { - stack, err := devstack.NewDevStackForRunLocal(cm, 1) - if err != nil { - return err + stack, errLocalDevStack := devstack.NewDevStackForRunLocal(cm, 1) + if errLocalDevStack != nil { + return errLocalDevStack } - apiUri := stack.Nodes[0].APIServer.GetURI() - apiClient = publicapi.NewAPIClient(apiUri) + apiURI := stack.Nodes[0].APIServer.GetURI() + apiClient = publicapi.NewAPIClient(apiURI) } else { apiClient = getAPIClient() } diff --git a/pkg/executor/docker/executor.go b/pkg/executor/docker/executor.go index 9f37811b04..a276dae262 100644 --- a/pkg/executor/docker/executor.go +++ b/pkg/executor/docker/executor.go @@ -242,7 +242,7 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) NetworkDisabled: true, WorkingDir: j.Spec.Docker.WorkingDir, } - + log.Trace().Msgf("Container: %+v %+v", containerConfig, mounts) resourceRequirements := capacitymanager.ParseResourceUsageConfig(j.Spec.Resources) @@ -279,7 +279,7 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) } else { log.Trace().Msgf("Created container: %s", jobContainer.ID) } - + err = e.Client.ContainerStart( ctx, jobContainer.ID, @@ -290,7 +290,7 @@ func (e *Executor) RunShard(ctx context.Context, j executor.Job, shardIndex int) } else { log.Trace().Msgf("Started container: %s", jobContainer.ID) } - + defer e.cleanupJob(j, shardIndex) // the idea here is even if the container errors From 33d5bc2477815e3dbbe2bc23635f64edfb5b71fb Mon Sep 17 00:00:00 2001 From: js-ts Date: Fri, 12 Aug 2022 07:13:11 +0530 Subject: [PATCH 24/26] Pull to fix tests, Remove the Local interface --- pkg/executor/types.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/pkg/executor/types.go b/pkg/executor/types.go index 3dce938502..9225c8e488 100644 --- a/pkg/executor/types.go +++ b/pkg/executor/types.go @@ -31,24 +31,6 @@ type Executor interface { RunShard(ctx context.Context, job Job, shardIndex int) (string, error) } -type Local interface { - // tells you if the required software is installed on this machine - // this is used in job selection - IsInstalled(context.Context) (bool, error) - - // used to filter and select jobs - // tells us if the storage resource is "close" i.e. cheap to access - HasStorageLocally(context.Context, storage.StorageSpec) (bool, error) - // tells us how much storage the given volume would consume - // which we then use to calculate if there is capacity - // alongside cpu & memory usage - GetVolumeSize(context.Context, storage.StorageSpec) (uint64, error) - - // run the given job - it's expected that we have already prepared the job - // this will return a local filesystem path to the jobs results - RunJobLocally(context.Context, JobSpec) (string, error) -} - // Job contains data about a job in the bacalhau network. type Job struct { // The unique global ID of this job in the bacalhau network. From 5de5dda0ec0d2a2753afe70aeb6971ba774fa701 Mon Sep 17 00:00:00 2001 From: js-ts Date: Fri, 12 Aug 2022 11:21:21 +0530 Subject: [PATCH 25/26] Add GPU support --- cmd/bacalhau/docker_run.go | 2 +- pkg/devstack/devstack.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cmd/bacalhau/docker_run.go b/cmd/bacalhau/docker_run.go index 5d67fc0a9e..ebc6c39ff8 100644 --- a/cmd/bacalhau/docker_run.go +++ b/cmd/bacalhau/docker_run.go @@ -283,7 +283,7 @@ var dockerRunCmd = &cobra.Command{ var apiClient *publicapi.APIClient if isLocal { - stack, errLocalDevStack := devstack.NewDevStackForRunLocal(cm, 1) + stack, errLocalDevStack := devstack.NewDevStackForRunLocal(cm, 1, jobGPU) if errLocalDevStack != nil { return errLocalDevStack } diff --git a/pkg/devstack/devstack.go b/pkg/devstack/devstack.go index cf3919e857..7de4130125 100644 --- a/pkg/devstack/devstack.go +++ b/pkg/devstack/devstack.go @@ -7,6 +7,7 @@ import ( "os" "strings" + "github.com/filecoin-project/bacalhau/pkg/capacitymanager" "github.com/filecoin-project/bacalhau/pkg/computenode" "github.com/filecoin-project/bacalhau/pkg/config" "github.com/filecoin-project/bacalhau/pkg/controller" @@ -73,7 +74,8 @@ type GetVerifiersFunc func( func NewDevStackForRunLocal( cm *system.CleanupManager, - count int, //nolint:unparam // Incorrectly assumed as unused + count int, + jobGPU string, //nolint:unparam // Incorrectly assumed as unused ) (*DevStack, error) { getStorageProviders := func(ipfsMultiAddress string, nodeIndex int) (map[storage.StorageSourceType]storage.StorageProvider, error) { return executor_util.NewStandardStorageProviders(cm, ipfsMultiAddress) @@ -121,6 +123,10 @@ func NewDevStackForRunLocal( JobSelectionPolicy: computenode.JobSelectionPolicy{ Locality: computenode.Anywhere, RejectStatelessJobs: false, + }, CapacityManagerConfig: capacitymanager.Config{ + ResourceLimitTotal: capacitymanager.ResourceUsageConfig{ + GPU: jobGPU, + }, }, }, "", From c199751cea65e5f883d3146e91d28ae0b6a787dc Mon Sep 17 00:00:00 2001 From: js-ts Date: Fri, 12 Aug 2022 18:11:30 +0530 Subject: [PATCH 26/26] Add test for local output --- cmd/bacalhau/docker_run_test.go | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/cmd/bacalhau/docker_run_test.go b/cmd/bacalhau/docker_run_test.go index 93a7cd8af1..4b2a058e7f 100644 --- a/cmd/bacalhau/docker_run_test.go +++ b/cmd/bacalhau/docker_run_test.go @@ -200,7 +200,7 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocal() { trimmedStdout := strings.TrimSpace(string(out)) require.Equal(suite.T(), expectedStdout, trimmedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) - + runDownloadFlags.OutputDir = "." } @@ -235,6 +235,35 @@ func (suite *DockerRunSuite) TestRun_GenericSubmitLocalInput() { runDownloadFlags.OutputDir = "." } +func (suite *DockerRunSuite) TestRun_GenericSubmitLocalOutput() { + args := []string{"docker", "run", + "ubuntu", + "--local", + "--wait", + "--download", + "-w", "/outputs", + "--", + "/bin/bash", "-c", "printf hello > hello.txt"} + expectedStdout := "hello" + + // done := capture() + _, _, err := ExecuteTestCobraCommand(suite.T(), suite.rootCmd, args...) + if err != nil { + fmt.Print(err) + } + // out, _ := done() + + require.NoError(suite.T(), err) + content, _ := ioutil.ReadFile("volumes/outputs/hello.txt") + out := string(content) + trimmedStdout := strings.TrimSpace(string(out)) + fmt.Println(trimmedStdout) + + require.Equal(suite.T(), expectedStdout, trimmedStdout, "Expected %s as output, but got %s", expectedStdout, trimmedStdout) + + runDownloadFlags.OutputDir = "." +} + func (suite *DockerRunSuite) TestRun_SubmitInputs() { tests := []struct { numberOfJobs int