Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native execution of very simple WASM jobs #816

Merged
merged 5 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/bacalhau/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func init() { //nolint:gochecknoinits // Using init in cobra command is idiomatic

// Plumbing commands (advanced usage)
RootCmd.AddCommand(dockerCmd)
RootCmd.AddCommand(wasmCmd)

// Porcelain commands (language specific easy to use commands)
RootCmd.AddCommand(runCmd)
Expand All @@ -51,8 +52,6 @@ func init() { //nolint:gochecknoinits // Using init in cobra command is idiomatic
RootCmd.AddCommand(idCmd)
RootCmd.AddCommand(devstackCmd)

// TODO: RootCmd.AddCommand(wasmCmd)

defaultAPIHost := system.Envs[system.Production].APIHost
defaultAPIPort := system.Envs[system.Production].APIPort

Expand Down
126 changes: 66 additions & 60 deletions cmd/bacalhau/run_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"time"

"github.com/c2h5oh/datasize"
"github.com/filecoin-project/bacalhau/pkg/job"
"github.com/filecoin-project/bacalhau/pkg/model"
"github.com/filecoin-project/bacalhau/pkg/system"
Expand All @@ -31,6 +32,8 @@ var (
OLR = NewLanguageRunOptions()
)

const maximumContextSize datasize.ByteSize = 10 * datasize.MB

// LanguageRunOptions declares the arguments accepted by the `'language' run` command
type LanguageRunOptions struct {
Deterministic bool // Execute this job deterministically
Expand Down Expand Up @@ -165,12 +168,6 @@ var runPythonCmd = &cobra.Command{
defer rootSpan.End()
cm.RegisterCallback(system.CleanupTraceProvider)

// error if determinism is false
if !OLR.Deterministic {
return fmt.Errorf("determinism=false not supported yet " +
"(python only supports wasm backend with forced determinism)")
}

// TODO: prepare context

var programPath string
Expand All @@ -192,69 +189,78 @@ var runPythonCmd = &cobra.Command{
OLR.InputVolumes = append(OLR.InputVolumes, "/inputs:/inputs")
}

//nolint:lll // it's ok to be long
// TODO: #450 These two code paths make me nervous - the fact that we have ConstructLanguageJob and ConstructDockerJob as separate means manually keeping them in sync.
j, err := job.ConstructLanguageJob(
model.APIVersionLatest(),
OLR.InputVolumes,
OLR.InputUrls,
OLR.OutputVolumes,
[]string{}, // no env vars (yet)
OLR.Concurrency,
OLR.Confidence,
OLR.MinBids,
"python",
"3.10",
OLR.Command,
programPath,
OLR.RequirementsPath,
OLR.ContextPath,
OLR.Deterministic,
OLR.Labels,
doNotTrack,
)
if err != nil {
return err
}

var buf bytes.Buffer

if OLR.ContextPath == "." && OLR.RequirementsPath == "" && programPath == "" {
cmd.Println("no program or requirements specified, not uploading context - set --context-path to full path to force context upload")
OLR.ContextPath = ""
}
return SubmitLanguageJob(cmd, ctx, "python", "3.10", programPath)
},
}

if OLR.ContextPath != "" {
// construct a tar file from the contextPath directory
// tar + gzip
cmd.Printf("Uploading %s to server to execute command in context, press Ctrl+C to cancel\n", OLR.ContextPath)
time.Sleep(1 * time.Second)
err = compress(ctx, OLR.ContextPath, &buf)
if err != nil {
return err
}
func SubmitLanguageJob(cmd *cobra.Command, ctx context.Context, language, version, programPath string) error {
//nolint:lll // it's ok to be long
// TODO: #450 These two code paths make me nervous - the fact that we have ConstructLanguageJob and ConstructDockerJob as separate means manually keeping them in sync.
j, err := job.ConstructLanguageJob(
model.APIVersionLatest(),
OLR.InputVolumes,
OLR.InputUrls,
OLR.OutputVolumes,
[]string{}, // no env vars (yet)
OLR.Concurrency,
OLR.Confidence,
OLR.MinBids,
language,
version,
OLR.Command,
programPath,
OLR.RequirementsPath,
OLR.ContextPath,
OLR.Deterministic,
OLR.Labels,
doNotTrack,
)
if err != nil {
return err
}

// check size of buf
if buf.Len() > 10*1024*1024 {
Fatal("context tar file is too large (>10MiB)", 1)
}
// error if determinism is false
if !OLR.Deterministic {
return fmt.Errorf("determinism=false not supported yet " +
"(languages only support wasm backend with forced determinism)")
}

}
var buf bytes.Buffer

log.Debug().Msgf(
"submitting job %+v", j)
if OLR.ContextPath == "." && OLR.RequirementsPath == "" && programPath == "" {
cmd.Println("no program or requirements specified, not uploading context - set --context-path to full path to force context upload")
OLR.ContextPath = ""
}

returnedJob, err := GetAPIClient().Submit(ctx, j, &buf)
if OLR.ContextPath != "" {
// construct a tar file from the contextPath directory
// tar + gzip
cmd.Printf("Uploading %s to server to execute command in context, press Ctrl+C to cancel\n", OLR.ContextPath)
time.Sleep(1 * time.Second)
err = compress(ctx, OLR.ContextPath, &buf)
if err != nil {
Fatal(fmt.Sprintf("Error submitting job: %s", err), 1)
return err
}

err = PrintResultsToUser(ctx, returnedJob)
if err != nil {
Fatal(fmt.Sprintf("Error submitting job: %s", err), 1)
// check size of buf
if buf.Len() > int(maximumContextSize) {
Fatal(fmt.Sprintf("context tar file is too large (> %s)", maximumContextSize.HumanReadable()), 1)
}
return nil
},
}

log.Debug().Msgf(
"submitting job %+v", j)

returnedJob, err := GetAPIClient().Submit(ctx, j, &buf)
if err != nil {
Fatal(fmt.Sprintf("Error submitting job: %s", err), 1)
}

err = PrintResultsToUser(ctx, returnedJob)
if err != nil {
Fatal(fmt.Sprintf("Error submitting job: %s", err), 1)
}
return nil
}

// from https://github.com/mimoo/eureka/blob/master/folders.go under Apache 2
Expand Down
86 changes: 86 additions & 0 deletions cmd/bacalhau/wasm_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package bacalhau

import (
"fmt"

"github.com/bytecodealliance/wasmtime-go"
"github.com/filecoin-project/bacalhau/pkg/executor/wasm"
"github.com/filecoin-project/bacalhau/pkg/system"
"github.com/filecoin-project/bacalhau/pkg/version"
"github.com/spf13/cobra"
)

// init attaches the `run` and `validate` subcommands to the parent `wasm`
// command so that they are reachable as `wasm run` and `wasm validate`.
func init() { //nolint:gochecknoinits // idiomatic for cobra commands
	wasmCmd.AddCommand(
		runWasmCommand,
		validateWasmCommand,
	)
}

// wasmCmd is the parent `wasm` command. It carries no run function of its own;
// its persistent pre-run hook performs a client/server version check that
// cobra runs ahead of the subcommands registered under it.
var wasmCmd = &cobra.Command{
	Use:   "wasm",
	Short: "Run and prepare WASM jobs on the network",
	PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
		ctx := cmd.Context()

		// The error from the version lookup is dropped on purpose: a failed
		// lookup is acceptable here because version validation will then skip.
		remoteVersion, _ := GetAPIClient().Version(ctx)

		// Compare the client's own version against whatever the server reported.
		err := ensureValidVersion(ctx, version.Get(), remoteVersion)
		if err == nil {
			return nil
		}

		Fatal(fmt.Sprintf("version validation failed: %s", err), 1)
		return err
	},
}

// runWasmCommand implements `wasm run <program> <command>`.
//
// It takes exactly two positional arguments. The first is used both as the
// program path and as the context path in the shared language-run options
// (OLR); the second is stored as the command in those options. The job is then
// submitted through SubmitLanguageJob as language "wasm", version "2.0".
var runWasmCommand = &cobra.Command{
	Use:     "run",
	Short:   "Run a WASM job on the network",
	Long:    languageRunLong,
	Example: languageRunExample,
	Args:    cobra.ExactArgs(2),
	RunE: func(cmd *cobra.Command, args []string) error {
		cleanup := system.NewCleanupManager()
		defer cleanup.Cleanup()

		ctx, span := system.NewRootSpan(
			cmd.Context(),
			system.GetTracer(),
			"cmd/bacalhau/wasm_run.runWasmCommand",
		)
		defer span.End()
		cleanup.RegisterCallback(system.CleanupTraceProvider)

		// cobra.ExactArgs(2) guarantees both positions are present.
		wasmPath, command := args[0], args[1]
		OLR.ContextPath = wasmPath
		OLR.Command = command

		return SubmitLanguageJob(cmd, ctx, "wasm", "2.0", wasmPath)
	},
}

// validateWasmCommand implements `wasm validate <program> <entry-point>`.
//
// It takes exactly two positional arguments: the path of a WASM file and the
// name of the entry point to check. The file is loaded into a fresh wasmtime
// engine and handed to wasm.ValidateModuleAsEntryPoint. On success "OK" is
// printed through the command's output.
//
// Failures are reported through Fatal: code 1 when the module cannot be
// loaded, code 2 when the entry-point validation rejects it. The error is also
// returned in case Fatal does not terminate the process.
var validateWasmCommand = &cobra.Command{
	Use:   "validate",
	Short: "Check that a WASM program is runnable on the network",
	Args:  cobra.ExactArgs(2),
	RunE: func(cmd *cobra.Command, args []string) error {
		cm := system.NewCleanupManager()
		defer cm.Cleanup()

		// Only the span is needed here; nothing below takes a context.
		_, rootSpan := system.NewRootSpan(cmd.Context(), system.GetTracer(), "cmd/bacalhau/wasm_run.validateWasmCommand")
		defer rootSpan.End()
		cm.RegisterCallback(system.CleanupTraceProvider)

		programPath := args[0]
		entryPoint := args[1]

		engine := wasmtime.NewEngine()
		module, err := wasmtime.NewModuleFromFile(engine, programPath)
		if err != nil {
			// Surface the underlying cause so the user can tell a missing or
			// unreadable file apart from an invalid module.
			Fatal(fmt.Sprintf("Could not load supplied WASM file: %s", err), 1)
			return err
		}

		if err = wasm.ValidateModuleAsEntryPoint(module, entryPoint); err != nil {
			Fatal(err.Error(), 2)
			return err
		}

		cmd.Println("OK")
		return nil
	},
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
)

require (
github.com/bytecodealliance/wasmtime-go v1.0.0
github.com/golang/mock v1.6.0
github.com/imdario/mergo v0.3.5
github.com/invopop/jsonschema v0.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/bytecodealliance/wasmtime-go v1.0.0 h1:9u9gqaUiaJeN5IoD1L7egD8atOnTGyJcNp8BhkL9cUU=
github.com/bytecodealliance/wasmtime-go v1.0.0/go.mod h1:jjlqQbWUfVSbehpErw3UoWFndBXRRMvfikYH6KsCwOg=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
Expand Down
28 changes: 21 additions & 7 deletions pkg/executor/language/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ type Executor struct {
executors executor.ExecutorProvider
}

type LanguageSpec struct {
Language, Version string
}

var supportedVersions = map[LanguageSpec]model.Engine{
{"python", "3.10"}: model.EnginePythonWasm,
{"wasm", "2.0"}: model.EngineWasm,
}

func NewExecutor(
ctx context.Context,
cm *system.CleanupManager,
Expand Down Expand Up @@ -50,23 +59,28 @@ func (e *Executor) RunShard(
shard model.JobShard,
jobResultsDir string,
) (*model.RunCommandResult, error) {
if shard.Job.Spec.Language.Language != "python" && shard.Job.Spec.Language.LanguageVersion != "3.10" {
err := fmt.Errorf("only python 3.10 is supported")
requiredLang := LanguageSpec{
Language: shard.Job.Spec.Language.Language,
Version: shard.Job.Spec.Language.LanguageVersion,
}

engineKey, exists := supportedVersions[requiredLang]
if !exists {
err := fmt.Errorf("%v is not supported", requiredLang)
return &model.RunCommandResult{ErrorMsg: err.Error()}, err
}

if shard.Job.Spec.Language.Deterministic {
log.Ctx(ctx).Debug().Msgf("running deterministic python 3.10")
log.Ctx(ctx).Debug().Msgf("Running deterministic %v", requiredLang)
// Look up the executor registered for the requested language/version
// TODO: mutate job as needed?
pythonWasmExecutor, err := e.executors.GetExecutor(ctx, model.EnginePythonWasm)
executor, err := e.executors.GetExecutor(ctx, engineKey)
if err != nil {
return nil, err
}
return pythonWasmExecutor.RunShard(ctx, shard, jobResultsDir)
return executor.RunShard(ctx, shard, jobResultsDir)
} else {
log.Ctx(ctx).Debug().Msgf("running arbitrary python 3.10")
err := fmt.Errorf("arbitrary python not supported yet")
err := fmt.Errorf("non-deterministic %v not supported yet", requiredLang)
// TODO: Instantiate a docker with python:3.10 image
return &model.RunCommandResult{ErrorMsg: err.Error()}, err
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/executor/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/bacalhau/pkg/executor/language"
noop_executor "github.com/filecoin-project/bacalhau/pkg/executor/noop"
pythonwasm "github.com/filecoin-project/bacalhau/pkg/executor/python_wasm"
"github.com/filecoin-project/bacalhau/pkg/executor/wasm"
"github.com/filecoin-project/bacalhau/pkg/model"
"github.com/filecoin-project/bacalhau/pkg/storage"
"github.com/filecoin-project/bacalhau/pkg/storage/combo"
Expand Down Expand Up @@ -111,19 +112,24 @@ func NewStandardExecutorProvider(
cm *system.CleanupManager,
executorOptions StandardExecutorOptions,
) (executor.ExecutorProvider, error) {
storageProviders, err := NewStandardStorageProvider(ctx, cm, executorOptions.Storage)
storageProvider, err := NewStandardStorageProvider(ctx, cm, executorOptions.Storage)
if err != nil {
return nil, err
}

dockerExecutor, err := docker.NewExecutor(ctx, cm, executorOptions.DockerID, storageProviders)
dockerExecutor, err := docker.NewExecutor(ctx, cm, executorOptions.DockerID, storageProvider)
if err != nil {
return nil, err
}

wasmExecutor, err := wasm.NewExecutor(ctx, storageProvider)
if err != nil {
return nil, err
}

executors := executor.NewTypeExecutorProvider(map[model.Engine]executor.Executor{
model.EngineDocker: dockerExecutor,
model.EngineWasm: wasmExecutor,
})

// language executors wrap other executors, so pass them a reference to all
Expand Down
Loading