Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #455 from bcwaldon/fleetctl-blocking
Browse files Browse the repository at this point in the history
Block forever in fleetctl operations
  • Loading branch information
bcwaldon committed May 15, 2014
2 parents 20cd7e4 + abbbbd5 commit daa8cd1
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 36 deletions.
65 changes: 41 additions & 24 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,11 @@ func lazyStartJobs(args []string) ([]string, error) {

// waitForJobStates polls each of the indicated jobs until each of their
// states is equal to that which the caller indicates, or until the
// polling operation times out. waitForJobStates will retry up to N
// times before timing out. Returned is an error channel used to
// communicate when timeouts occur. The returned error channel will be
// closed after all polling operation is complete.
// polling operation times out. waitForJobStates will retry forever, or
// up to maxAttempts times before timing out if maxAttempts is greater
// than zero. Returned is an error channel used to communicate when
// timeouts occur. The returned error channel will be closed after all
// polling operation is complete.
func waitForJobStates(jobs []string, js job.JobState, maxAttempts int, out io.Writer) chan error {
errchan := make(chan error)
var wg sync.WaitGroup
Expand All @@ -465,33 +466,49 @@ func waitForJobStates(jobs []string, js job.JobState, maxAttempts int, out io.Wr
func checkJobState(jobName string, js job.JobState, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
defer wg.Done()

for attempts := 0; attempts < maxAttempts; attempts++ {
j, err := registryCtl.GetJob(jobName)
if err != nil {
log.Warningf("Error retrieving Job(%s) from Registry: %v", jobName, err)
continue
sleep := 100 * time.Millisecond

if maxAttempts < 1 {
for {
if assertJobState(jobName, js) {
return
}
time.Sleep(sleep)
}
if j == nil || j.State == nil || *(j.State) != js {
time.Sleep(100 * time.Millisecond)
continue
} else {
for attempt := 0; attempt < maxAttempts; attempt++ {
if assertJobState(jobName, js) {
return
}
time.Sleep(sleep)
}
errchan <- fmt.Errorf("Timed out waiting for job %s to report state %s", jobName, js)
}
}

msg := fmt.Sprintf("Job %s %s", jobName, *(j.State))
func assertJobState(name string, js job.JobState) bool {
j, err := registryCtl.GetJob(name)
if err != nil {
log.Warningf("Error retrieving Job(%s) from Registry: %v", name, err)
return false
}
if j == nil || j.State == nil || *(j.State) != js {
return false
}

tgt, err := registryCtl.GetJobTarget(jobName)
if err != nil {
log.Warningf("Error retrieving target information for Job(%s) from Registry: %v", jobName, err)
} else if tgt != "" {
if ms, _ := registryCtl.GetMachineState(tgt); ms != nil {
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(*ms, false))
}
}
msg := fmt.Sprintf("Job %s %s", name, *(j.State))

fmt.Fprintln(out, msg)
return
tgt, err := registryCtl.GetJobTarget(name)
if err != nil {
log.Warningf("Error retrieving target information for Job(%s) from Registry: %v", name, err)
} else if tgt != "" {
if ms, _ := registryCtl.GetMachineState(tgt); ms != nil {
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(*ms, false))
}
}

errchan <- fmt.Errorf("Timed out waiting for job %s to report state %s", jobName, js)
fmt.Fprintln(out, msg)
return true
}

// unitNameMangle tries to turn a string that might not be a unit name into a
Expand Down
2 changes: 1 addition & 1 deletion fleetctl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (

func init() {
cmdLoadUnits.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "Sign unit file signatures and verify submitted units using local SSH identities.")
cmdLoadUnits.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 10, "Wait until the jobs are loaded, performing up to N attempts before giving up.")
cmdLoadUnits.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are loaded, performing up to N attempts before giving up. A value of 0 indicates no limit.")
cmdLoadUnits.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have been loaded before exiting.")
}

Expand Down
6 changes: 1 addition & 5 deletions fleetctl/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import (
)

var (
flagRequire string
flagBlockAttempts int
flagNoBlock bool

cmdStartUnit = &Command{
Name: "start",
Summary: "Instruct systemd to start one or more units in the cluster, first submitting and loading if necessary.",
Expand All @@ -34,7 +30,7 @@ Machine metadata is located in the fleet configuration file.`,

func init() {
cmdStartUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "Sign unit file signatures using local SSH identities.")
cmdStartUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 10, "Wait until the jobs are launched, performing up to N attempts before giving up.")
cmdStartUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are launched, performing up to N attempts before giving up. A value of 0 indicates no limit.")
cmdStartUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have been launched before exiting.")
}

Expand Down
2 changes: 1 addition & 1 deletion fleetctl/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Stop an entire directory of units with glob matching:
}

func init() {
cmdStopUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 10, "Wait until the jobs are stopped, performing up to N attempts before giving up.")
cmdStopUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are stopped, performing up to N attempts before giving up. A value of 0 indicates no limit.")
cmdStopUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have stopped before exiting.")
}

Expand Down
2 changes: 1 addition & 1 deletion fleetctl/unload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
)

func init() {
cmdUnloadUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 10, "Wait until the jobs are inactive, performing up to N attempts before giving up.")
cmdUnloadUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are inactive, performing up to N attempts before giving up. A value of 0 indicates no limit.")
cmdUnloadUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have become inactive before exiting.")
}

Expand Down
6 changes: 2 additions & 4 deletions functional/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,13 @@ func TestScheduleOneWayConflict(t *testing.T) {

// Start a unit that conflicts with a yet-to-be-scheduled unit
name := "fixtures/units/conflicts-with-hello.service"
if _, _, err := cluster.Fleetctl("start", name); err != nil {
if _, _, err := cluster.Fleetctl("start", "--no-block", name); err != nil {
t.Fatalf("Failed starting unit %s: %v", name, err)
}

// Start a unit that has not defined conflicts
name = "fixtures/units/hello.service"
if _, _, err := cluster.Fleetctl("start", name); err == nil {
t.Fatalf("Unit %s unexpectedly started", name)
}
cluster.Fleetctl("start", "--no-block", name)

// Both units should show up, but only conflicts-with-hello.service
// should report ACTIVE
Expand Down

0 comments on commit daa8cd1

Please sign in to comment.