Skip to content

Commit

Permalink
Merge pull request #5 from upfluence/shutdown-behavour
Browse files Browse the repository at this point in the history
Shutdown behaviours implementation
  • Loading branch information
AlexisMontagne committed May 12, 2015
2 parents 9b6df65 + 1d34f79 commit 9e64841
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 91 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
etcdenv-linux-amd64-*
etcdenv-darwin-amd64-*
etcdenv-*
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@ GOPATH=`pwd`/Godeps/_workspace go build -o etcdenv .
| ------ | ------- | ----------- |
| `server`, `s` | http://127.0.0.1:4001 | Location of the etcd server |
| `namespace`, `n`| /environments/production | Etcd directory where the environment variables are fetched. You can watch multiple namespaces by using a comma-separated list (/environments/production,/environments/global) |
| `r` | false | Not restart the command when a key watch change |
| `shutdown-behaviour`, `b` | keepalive | Strategy to apply when the process exit, further information into the next paragraph |
| `watched`, `w` | `""` | A comma-separated list of environment variables triggering the command restart when they change |


### Shutdown strategies

* `restart`: `etcdenv` rerun the command when the wrapped process exits
* `exit`: The `etcdenv` process exits with the same exit status as the
wrapped process's
* `keepalive`: The `etcdenv` process will stay alive and keep looking
for etcd changes to re-run the command

### Command line

The CLI interface supports all of the options detailed above.
Expand Down
26 changes: 16 additions & 10 deletions etcdenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ import (
"flag"
"fmt"
"github.com/upfluence/etcdenv/etcdenv"
"log"
"os"
"os/signal"
"strings"
)

const currentVersion = "0.1.3"
const currentVersion = "0.2.0"

var (
flagset = flag.NewFlagSet("etcdenv", flag.ExitOnError)
flags = struct {
Version bool
RestartOnChange bool

Server string
Namespace string
WatchedKeys string
Version bool
ShutdownBehaviour string
Server string
Namespace string
WatchedKeys string
}{}
)

Expand All @@ -53,7 +53,8 @@ func init() {
flagset.BoolVar(&flags.Version, "version", false, "Print the version and exit")
flagset.BoolVar(&flags.Version, "v", false, "Print the version and exit")

flagset.BoolVar(&flags.RestartOnChange, "r", false, "Not restart the command when a value change")
flagset.StringVar(&flags.ShutdownBehaviour, "b", "keepalive", "Behaviour when the process stop [exit|keepalive|restart]")
flagset.StringVar(&flags.ShutdownBehaviour, "shutdown-behaviour", "keepalive", "Behaviour when the process stop [exit|keepalive|restart]")

flagset.StringVar(&flags.Server, "server", "http://127.0.0.1:4001", "Location of the etcd server")
flagset.StringVar(&flags.Server, "s", "http://127.0.0.1:4001", "Location of the etcd server")
Expand Down Expand Up @@ -90,14 +91,19 @@ func main() {
watchedKeysList = strings.Split(flags.WatchedKeys, ",")
}

ctx := etcdenv.NewContext(
ctx, err := etcdenv.NewContext(
strings.Split(flags.Namespace, ","),
[]string{flags.Server},
flagset.Args(),
!flags.RestartOnChange,
flags.ShutdownBehaviour,
watchedKeysList,
)

if err != nil {
log.Fatalf(err.Error())
os.Exit(1)
}

go ctx.Run()

select {
Expand Down
149 changes: 72 additions & 77 deletions etcdenv/context.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
package etcdenv

import (
"errors"
"fmt"
"github.com/cenkalti/backoff"
"github.com/coreos/go-etcd/etcd"
"log"
"os"
"os/exec"
"reflect"
"strings"
"syscall"
"time"
)

type Context struct {
Namespaces []string
Runner *Runner
ExitChan chan bool
RestartOnChange bool
WatchedKeys []string
CurrentEnv map[string]string
maxRetry int
etcdClient *etcd.Client
Namespaces []string
Runner *Runner
ExitChan chan bool
ShutdownBehaviour string
WatchedKeys []string
CurrentEnv map[string]string
maxRetry int
etcdClient *etcd.Client
}

func NewContext(namespaces []string, endpoints, command []string, restart bool, watchedKeys []string) *Context {
return &Context{
Namespaces: namespaces,
Runner: NewRunner(command),
etcdClient: etcd.NewClient(endpoints),
RestartOnChange: restart,
ExitChan: make(chan bool),
WatchedKeys: watchedKeys,
CurrentEnv: make(map[string]string),
maxRetry: 3,
func NewContext(namespaces []string, endpoints, command []string,
shutdownBehaviour string, watchedKeys []string) (*Context, error) {

if shutdownBehaviour != "keepalive" && shutdownBehaviour != "restart" &&
shutdownBehaviour != "exit" {
return nil,
errors.New("Choose a correct shutdown behaviour : keepalive | exit | restart")
}

return &Context{
Namespaces: namespaces,
Runner: NewRunner(command),
etcdClient: etcd.NewClient(endpoints),
ShutdownBehaviour: shutdownBehaviour,
ExitChan: make(chan bool),
WatchedKeys: watchedKeys,
CurrentEnv: make(map[string]string),
maxRetry: 3,
}, nil
}

func (ctx *Context) escapeNamespace(key string) string {
Expand Down Expand Up @@ -122,82 +130,69 @@ func (ctx *Context) Run() {
ctx.CurrentEnv = ctx.fetchEtcdVariables()
ctx.Runner.Start(ctx.CurrentEnv)

if ctx.RestartOnChange {
responseChan := make(chan *etcd.Response)
responseChan := make(chan *etcd.Response)
processExitChan := make(chan int)

for _, namespace := range ctx.Namespaces {
go func() {
var t time.Duration
b := backoff.NewExponentialBackOff()
b.Reset()

for {
resp, err := ctx.etcdClient.Watch(namespace, 0, true, nil, ctx.ExitChan)

if err != nil {
log.Println(err.Error())
for _, namespace := range ctx.Namespaces {
go func() {
var t time.Duration
b := backoff.NewExponentialBackOff()
b.Reset()

if !reflect.TypeOf(err).ConvertibleTo(etcdErrorType) {
continue
}
for {
resp, err := ctx.etcdClient.Watch(namespace, 0, true, nil, ctx.ExitChan)

if err.(*etcd.EtcdError).ErrorCode == etcd.ErrCodeEtcdNotReachable {
t = b.NextBackOff()
log.Printf("Can't join the etcd server, wait %v", t)
time.Sleep(t)
}
if err != nil {
log.Println(err.Error())

if t == backoff.Stop {
return
} else {
continue
}
if !reflect.TypeOf(err).ConvertibleTo(etcdErrorType) {
continue
}

log.Printf("%s key changed", resp.Node.Key)
if err.(*etcd.EtcdError).ErrorCode == etcd.ErrCodeEtcdNotReachable {
t = b.NextBackOff()
log.Printf("Can't join the etcd server, wait %v", t)
time.Sleep(t)
}

if ctx.shouldRestart(ctx.escapeNamespace(resp.Node.Key), resp.Node.Value) {
responseChan <- resp
if t == backoff.Stop {
return
} else {
continue
}
}
}()
}

for {
select {
case <-responseChan:
log.Println("Process restarted")
ctx.CurrentEnv = ctx.fetchEtcdVariables()
ctx.Runner.Restart(ctx.CurrentEnv)
case <-ctx.ExitChan:
ctx.Runner.Stop()
}
}

} else {
processExitChan := make(chan int)
log.Printf("%s key changed", resp.Node.Key)

time.Sleep(200 * time.Millisecond)

go func() {
err := ctx.Runner.Wait()
if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
processExitChan <- status.ExitStatus()
} else {
processExitChan <- 0
if ctx.shouldRestart(ctx.escapeNamespace(resp.Node.Key), resp.Node.Value) {
responseChan <- resp
}
} else {
processExitChan <- 0
}
}()
}

go ctx.Runner.WatchProcess(processExitChan)

for {
select {
case <-responseChan:
log.Println("Process restarted")
ctx.CurrentEnv = ctx.fetchEtcdVariables()
ctx.Runner.Restart(ctx.CurrentEnv)
case <-ctx.ExitChan:
ctx.Runner.Stop()
case status := <-processExitChan:
ctx.ExitChan <- true
os.Exit(status)
log.Println(fmt.Sprintf("Process exited with the status %d", status))

if ctx.ShutdownBehaviour == "exit" {
ctx.ExitChan <- true
os.Exit(status)
} else if ctx.ShutdownBehaviour == "restart" {
log.Println("Process restarted")
ctx.CurrentEnv = ctx.fetchEtcdVariables()
ctx.Runner.Restart(ctx.CurrentEnv)
go ctx.Runner.WatchProcess(processExitChan)
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions etcdenv/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"os"
"os/exec"
"syscall"
"time"
)

type Runner struct {
Expand Down Expand Up @@ -75,3 +77,17 @@ func (r *Runner) Wait() error {

return err
}

func (r *Runner) WatchProcess(exitStatus chan int) {
time.Sleep(200 * time.Millisecond)
err := r.Wait()
if exiterr, ok := err.(*exec.ExitError); ok {
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
exitStatus <- status.ExitStatus()
} else {
exitStatus <- 0
}
} else {
exitStatus <- 0
}
}
3 changes: 2 additions & 1 deletion release
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/bin/sh

export GOPATH=`pwd`/Godeps/_workspace:$GOPATH
version=$1

version=`grep currentVersion etcdenv.go | head -n 1 | cut -d\" -f2`

git tag v$version

Expand Down

0 comments on commit 9e64841

Please sign in to comment.