Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement --dry-run, formerly --plan #862

Merged
merged 14 commits into from
Mar 11, 2022
56 changes: 22 additions & 34 deletions cli/internal/core/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ type Task struct {
Deps util.Set
// TopoDeps are dependencies across packages within the same topological graph (e.g. parent `build` -> child `build`) */
TopoDeps util.Set
Cache *bool
Run func(cwd string) error
}

type scheduler struct {
type Visitor = func(taskID string) error

type Scheduler struct {
// TopologicGraph is a graph of workspaces
TopologicGraph *dag.AcyclicGraph
// TaskGraph is a graph of package-tasks
Expand All @@ -30,15 +30,11 @@ type scheduler struct {
Tasks map[string]*Task
taskDeps [][]string
PackageTaskDeps [][]string
// Concurrency is the number of concurrent tasks that can be executed
Concurrency int
// Parallel is whether to run tasks in parallel
Parallel bool
}

// NewScheduler creates a new scheduler given a topologic graph of workspace package names
func NewScheduler(topologicalGraph *dag.AcyclicGraph) *scheduler {
return &scheduler{
func NewScheduler(topologicalGraph *dag.AcyclicGraph) *Scheduler {
return &Scheduler{
Tasks: make(map[string]*Task),
TopologicGraph: topologicalGraph,
TaskGraph: &dag.AcyclicGraph{},
Expand All @@ -53,15 +49,11 @@ type SchedulerExecutionOptions struct {
Packages []string
// TaskNames in the execution scope, if nil, all tasks will be executed
TaskNames []string
// Concurrency is the number of concurrent tasks that can be executed
Concurrency int
// Parallel is whether to run tasks in parallel
Parallel bool
// Restrict execution to only the listed task names
TasksOnly bool
}

func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error {
func (p *Scheduler) Prepare(options *SchedulerExecutionOptions) error {
pkgs := options.Packages
if len(pkgs) == 0 {
// TODO(gsoltis): Is this behavior only used in tests?
Expand All @@ -72,49 +64,45 @@ func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error {

tasks := options.TaskNames
if len(tasks) == 0 {
// TODO(gsoltis): Is this behavior used?
for key := range p.Tasks {
tasks = append(tasks, key)
}
}

p.Concurrency = options.Concurrency

p.Parallel = options.Parallel

if err := p.generateTaskGraph(pkgs, tasks, options.TasksOnly); err != nil {
return err
}

return nil
}

// ExecOpts controls a single walk of the task graph
type ExecOpts struct {
// Parallel is whether to run tasks in parallel
Parallel bool
// Concurrency is the number of concurrent tasks that can be executed
Concurrency int
}

// Execute executes the pipeline, constructing an internal task graph and walking it accordingly.
func (p *scheduler) Execute() []error {
var sema = util.NewSemaphore(p.Concurrency)
func (p *Scheduler) Execute(visitor Visitor, opts ExecOpts) []error {
var sema = util.NewSemaphore(opts.Concurrency)
return p.TaskGraph.Walk(func(v dag.Vertex) error {
// Always return if it is the root node
if strings.Contains(dag.VertexName(v), ROOT_NODE_NAME) {
return nil
}
// Acquire the semaphore unless parallel
if !p.Parallel {
if !opts.Parallel {
sema.Acquire()
defer sema.Release()
}
// Find and run the task for the current vertex
_, taskName := util.GetPackageTaskFromId(dag.VertexName(v))
task, ok := p.Tasks[taskName]
if !ok {
return fmt.Errorf("task %s not found", dag.VertexName(v))
}
if task.Run != nil {
return task.Run(dag.VertexName(v))
}
return nil
return visitor(dag.VertexName(v))
})
}

func (p *scheduler) generateTaskGraph(scope []string, taskNames []string, tasksOnly bool) error {
func (p *Scheduler) generateTaskGraph(scope []string, taskNames []string, tasksOnly bool) error {
if p.PackageTaskDeps == nil {
p.PackageTaskDeps = [][]string{}
}
Expand Down Expand Up @@ -238,12 +226,12 @@ func getPackageTaskDepsMap(packageTaskDeps [][]string) map[string][]string {
return depMap
}

func (p *scheduler) AddTask(task *Task) *scheduler {
func (p *Scheduler) AddTask(task *Task) *Scheduler {
p.Tasks[task.Name] = task
return p
}

func (p *scheduler) AddDep(fromTaskId string, toTaskId string) *scheduler {
func (p *Scheduler) AddDep(fromTaskId string, toTaskId string) *Scheduler {
p.PackageTaskDeps = append(p.PackageTaskDeps, []string{fromTaskId, toTaskId})
return p
}
58 changes: 18 additions & 40 deletions cli/internal/core/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import (
"fmt"
"strings"
"testing"

"github.com/vercel/turborepo/cli/internal/util"

"github.com/pyr-sh/dag"
)

func testVisitor(taskID string) error {
fmt.Println(taskID)
return nil
}

func TestSchedulerDefault(t *testing.T) {
var g dag.AcyclicGraph
g.Add("a")
Expand All @@ -26,34 +32,18 @@ func TestSchedulerDefault(t *testing.T) {
Name: "build",
TopoDeps: topoDeps,
Deps: deps,
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})
p.AddTask(&Task{
Name: "test",
TopoDeps: topoDeps,
Deps: deps,
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})
p.AddTask(&Task{
Name: "prepare",
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})
p.AddTask(&Task{
Name: "side-quest", // not in the build/test tree
Deps: deps,
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})

if _, ok := p.Tasks["build"]; !ok {
Expand All @@ -65,18 +55,18 @@ func TestSchedulerDefault(t *testing.T) {
}

err := p.Prepare(&SchedulerExecutionOptions{
Packages: nil,
TaskNames: []string{"test"},
Concurrency: 10,
Parallel: false,
TasksOnly: false,
Packages: []string{"a", "b", "c"},
TaskNames: []string{"test"},
TasksOnly: false,
})

if err != nil {
t.Fatalf("%v", err)
}

errs := p.Execute()
errs := p.Execute(testVisitor, ExecOpts{
Concurrency: 10,
})

for _, err := range errs {
t.Fatalf("%v", err)
Expand Down Expand Up @@ -106,26 +96,14 @@ func TestSchedulerTasksOnly(t *testing.T) {
Name: "build",
TopoDeps: topoDeps,
Deps: deps,
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})
p.AddTask(&Task{
Name: "test",
TopoDeps: topoDeps,
Deps: deps,
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})
p.AddTask(&Task{
Name: "prepare",
Run: func(cwd string) error {
fmt.Println(cwd)
return nil
},
})

if _, ok := p.Tasks["build"]; !ok {
Expand All @@ -137,18 +115,18 @@ func TestSchedulerTasksOnly(t *testing.T) {
}

err := p.Prepare(&SchedulerExecutionOptions{
Packages: nil,
TaskNames: []string{"test"},
Concurrency: 10,
Parallel: false,
TasksOnly: true,
Packages: []string{"a", "b", "c"},
TaskNames: []string{"test"},
TasksOnly: true,
})

if err != nil {
t.Fatalf("%v", err)
}

errs := p.Execute()
errs := p.Execute(testVisitor, ExecOpts{
Concurrency: 10,
})

for _, err := range errs {
t.Fatalf("%v", err)
Expand Down
Loading