From b7687890598c35efc424f81ddf64bba3ef5ac582 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Thu, 3 Mar 2022 17:46:24 +0530 Subject: [PATCH] Add ability to stop routines on partial error. Signed-off-by: Harkishen-Singh --- CHANGELOG.md | 1 + go.mod | 1 + pkg/runner/runner.go | 108 +++++++++++++++++++++++++------------------ 3 files changed, 66 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 318a4c5cc5..cfea1df192 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use the following categories for changes: ### Fixed - Register `promscale_ingest_channel_len_bucket` metric and make it a gauge [#1177] - Log warning when failing to write response to remote read requests [#1180] +- Fix Promscale continuing to run when one of its components fails to start [#1217] ## [0.10.0] - 2022-02-17 diff --git a/go.mod b/go.mod index f829f058e7..f909a3869b 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/jackc/pgtype v1.10.0 github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d github.com/jaegertracing/jaeger v1.31.0 + github.com/oklog/run v1.1.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.46.0 github.com/opentracing-contrib/go-stdlib v1.0.0 github.com/opentracing/opentracing-go v1.2.0 diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 16f701dafd..5e7ff52660 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -12,10 +12,12 @@ import ( "fmt" "net" "net/http" - "sync" + "os" + "os/signal" "time" _ "github.com/jackc/pgx/v4/stdlib" + "github.com/oklog/run" "go.opentelemetry.io/collector/model/otlpgrpc" "go.opentelemetry.io/otel" "google.golang.org/grpc" @@ -126,9 +128,7 @@ func Run(cfg *Config) error { return fmt.Errorf("error registering metrics for telemetry: %w", err) } - log.Info("msg", "Started Prometheus HTTP server", "listening-port", cfg.ListenAddr) - - var wg sync.WaitGroup + var group run.Group if len(cfg.ThanosStoreAPIListenAddr) > 
0 { srv := thanos.NewStorage(client.Queryable()) options := make([]grpc.ServerOption, 0) @@ -143,22 +143,20 @@ func Run(cfg *Config) error { grpcServer := grpc.NewServer(options...) storepb.RegisterStoreServer(grpcServer, srv) - wg.Add(1) - go func() { - listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr) - if err != nil { - wg.Done() - log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err) - return - } - - log.Info("msg", "Started Thanos StoreAPI GRPC server", "listening-port", cfg.ThanosStoreAPIListenAddr) - wg.Done() - if err := grpcServer.Serve(listener); err != nil { - log.Error("msg", "Starting the Thanos store failed", "err", err) - return - } - }() + group.Add( + func() error { + listener, err := net.Listen("tcp", cfg.ThanosStoreAPIListenAddr) + if err != nil { + log.Error("msg", "Listening for Thanos StoreAPI failed", "err", err) + return err + } + log.Info("msg", "Started Thanos StoreAPI GRPC server", "listening-port", cfg.ThanosStoreAPIListenAddr) + return grpcServer.Serve(listener) + }, func(error) { + log.Info("msg", "Stopping Thanos StoreAPI GRPC server") + grpcServer.Stop() + }, + ) } options := []grpc.ServerOption{ @@ -194,40 +192,62 @@ func Run(cfg *Config) error { return err } - wg.Add(1) - go func() { + group.Add( + func() error { listener, err := net.Listen("tcp", cfg.OTLPGRPCListenAddr) if err != nil { - wg.Done() - log.Error("msg", "Listening for OTLP GRPC server failed", "err", err) - return + log.Error("msg", "Listening for OpenTelemetry OTLP GRPC server failed", "err", err) + return err } - log.Info("msg", "Started OpenTelemetry OTLP GRPC server", "listening-port", cfg.OTLPGRPCListenAddr) - wg.Done() - if err := grpcServer.Serve(listener); err != nil { - log.Error("msg", "Starting the OTLP GRPC server failed", "err", err) - return - } - }() - } + return grpcServer.Serve(listener) + }, func(error) { + log.Info("msg", "Stopping OpenTelemetry OTLP GRPC server") + grpcServer.Stop() + }, + ) mux := http.NewServeMux() 
mux.Handle("/", router) - wg.Wait() - log.Info("msg", "All components are ready!") + server := http.Server{ + Addr: cfg.ListenAddr, + Handler: mux, + } + group.Add( + func() error { + var err error + log.Info("msg", "Started Prometheus remote-storage HTTP server", "listening-port", cfg.ListenAddr) + if cfg.TLSCertFile != "" { + err = server.ListenAndServeTLS(cfg.TLSCertFile, cfg.TLSKeyFile) + } else { + err = server.ListenAndServe() + } + return err + }, func(error) { + log.Info("msg", "Stopping Prometheus remote-storage HTTP server") + err = server.Shutdown(context.Background()) + if err != nil { + log.Error("msg", "unable to shutdown Prometheus remote-storage HTTP server", "err", err.Error()) + } + }, + ) - if cfg.TLSCertFile != "" { - err = http.ListenAndServeTLS(cfg.ListenAddr, cfg.TLSCertFile, cfg.TLSKeyFile, mux) - } else { - err = http.ListenAndServe(cfg.ListenAddr, mux) - } + // Listen to OS interrupt signals. + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + group.Add( + func() error { + <-c + return nil + }, func(err error) { + close(c) + }, + ) + err = group.Run() if err != nil { - log.Error("msg", "Listen failure", "err", err) - return startupError + log.Error("msg", "Execution failure, stopping Promscale", "err", err) } - - return nil + return err }