From 2e8d67992a9900e054762e79e087d5c9307e036c Mon Sep 17 00:00:00 2001 From: Rafael Sampaio <5679073+r4f4ss@users.noreply.github.com> Date: Tue, 17 Sep 2024 20:09:41 -0300 Subject: [PATCH 1/6] Refactor of main to allow graceful shutdown of database, server, UDPv5 and networks --- cmd/shisui/main.go | 101 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 84 insertions(+), 17 deletions(-) diff --git a/cmd/shisui/main.go b/cmd/shisui/main.go index 16243775bd1c..2297b776c937 100644 --- a/cmd/shisui/main.go +++ b/cmd/shisui/main.go @@ -6,9 +6,11 @@ import ( "fmt" "net" "net/http" + "os/signal" "path" "slices" "strings" + "syscall" "os" @@ -45,6 +47,14 @@ type Config struct { Networks []string } +type Client struct { + DiscV5API *discover.DiscV5API + HistoryNetwork *history.HistoryNetwork + BeaconNetwork *beacon.BeaconNetwork + StateNetwork *state.StateNetwork + Server *http.Server +} + var app = flags.NewApp("the go-portal-network command line interface") var ( @@ -85,6 +95,9 @@ func shisui(ctx *cli.Context) error { setDefaultLogger(*config) + clientChan := make(chan *Client, 1) + go sigInterrupt(clientChan) + addr, err := net.ResolveUDPAddr("udp", config.Protocol.ListenAddr) if err != nil { return err @@ -94,7 +107,7 @@ func shisui(ctx *cli.Context) error { return err } - return startPortalRpcServer(*config, conn, config.RpcAddr) + return startPortalRpcServer(*config, conn, config.RpcAddr, clientChan) } func setDefaultLogger(config Config) { @@ -105,7 +118,52 @@ func setDefaultLogger(config Config) { log.SetDefault(defaultLogger) } -func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) error { +func sigInterrupt(clientChan <-chan *Client) { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(interrupt) + + <-interrupt + log.Warn("Closing Shisui gracefully (type CTRL-C again to force quit)") + + go func() { + if len(clientChan) == 0 { + log.Warn("Waiting for the client to start...") + } + c := <-clientChan + closePortalRpcServer(c) + }() + + <-interrupt + os.Exit(1) +} + +func closePortalRpcServer(client *Client) { + log.Info("Closing Database...") + client.DiscV5API.DiscV5.LocalNode().Database().Close() + log.Info("Closing UDPv5 protocol...") + client.DiscV5API.DiscV5.Close() + if client.HistoryNetwork != nil { + log.Info("Closing history network...") + client.HistoryNetwork.Stop() + } + if client.BeaconNetwork != nil { + log.Info("Closing beacon network...") + client.BeaconNetwork.Stop() + } + if client.StateNetwork != nil { + log.Info("Closing state network...") + client.StateNetwork.Stop() + } + log.Info("Closing servers...") + client.Server.Close() + + os.Exit(1) +} + +func startPortalRpcServer(config Config, conn discover.UDPConn, addr string, clientChan chan<- *Client) error { + client := &Client{} + discV5, localNode, err := initDiscV5(config, conn) if err != nil { return err @@ -117,6 +175,7 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err if err != nil { return err } + client.DiscV5API = discV5API api := &web3.API{} err = server.RegisterName("web3", api) @@ -130,20 +189,25 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err if err != nil { return err } + client.HistoryNetwork = historyNetwork } + var beaconNetwork *beacon.BeaconNetwork if slices.Contains(config.Networks, portalwire.Beacon.Name()) { - err = initBeacon(config, server, conn, localNode, discV5) + beaconNetwork, err = initBeacon(config, server, conn, localNode, discV5) if err != nil { return err } + client.BeaconNetwork = beaconNetwork } + var stateNetwork *state.StateNetwork if slices.Contains(config.Networks, portalwire.State.Name()) { - err = initState(config, server, conn, localNode, discV5) + stateNetwork, err = initState(config, server, conn, localNode, discV5) if err != nil { return err } + client.StateNetwork = stateNetwork } ethapi := ðapi.API{ @@ -160,6 +224,9 @@ func startPortalRpcServer(config Config, conn discover.UDPConn, addr string) err Addr: addr, Handler: server, } + client.Server = httpServer + + clientChan <- client return httpServer.ListenAndServe() } @@ -223,15 +290,15 @@ func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, local return historyNetwork, historyNetwork.Start() } -func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error { +func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) (*beacon.BeaconNetwork, error) { dbPath := path.Join(config.DataDir, "beacon") err := os.MkdirAll(dbPath, 0755) if err != nil { - return err + return nil, err } sqlDb, err := sql.Open("sqlite3", path.Join(dbPath, "beacon.sqlite")) if err != nil { - return err + return nil, err } contentStorage, err := beacon.NewBeaconStorage(storage.PortalStorageConfig{ @@ -241,32 +308,32 @@ func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localN Spec: configs.Mainnet, }) if err != nil { - return err + return nil, err } contentQueue := make(chan *discover.ContentElement, 50) protocol, err := discover.NewPortalProtocol(config.Protocol, portalwire.Beacon, config.PrivateKey, conn, localNode, discV5, contentStorage, contentQueue) if err != nil { - return err + return nil, err } portalApi := discover.NewPortalAPI(protocol) beaconAPI := beacon.NewBeaconNetworkAPI(portalApi) err = server.RegisterName("portal", beaconAPI) if err != nil { - return err + return nil, err } beaconNetwork := beacon.NewBeaconNetwork(protocol) - return beaconNetwork.Start() + return beaconNetwork, beaconNetwork.Start() } -func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) error { +func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNode *enode.LocalNode, discV5 *discover.UDPv5) (*state.StateNetwork, error) { networkName := portalwire.State.Name() db, err := history.NewDB(config.DataDir, networkName) if err != nil { - return err + return nil, err } contentStorage, err := history.NewHistoryStorage(storage.PortalStorageConfig{ StorageCapacityMB: config.DataCapacity, @@ -275,24 +342,24 @@ func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNo NetworkName: networkName, }) if err != nil { - return err + return nil, err } contentQueue := make(chan *discover.ContentElement, 50) protocol, err := discover.NewPortalProtocol(config.Protocol, portalwire.State, config.PrivateKey, conn, localNode, discV5, contentStorage, contentQueue) if err != nil { - return err + return nil, err } api := discover.NewPortalAPI(protocol) stateNetworkAPI := state.NewStateNetworkAPI(api) err = server.RegisterName("portal", stateNetworkAPI) if err != nil { - return err + return nil, err } client := rpc.DialInProc(server) historyNetwork := state.NewStateNetwork(protocol, client) - return historyNetwork.Start() + return historyNetwork, historyNetwork.Start() } func getPortalConfig(ctx *cli.Context) (*Config, error) { From d6a2dd3ec69380abc114b59f87a69e7777f95c19 Mon Sep 17 00:00:00 2001 From: Rafael Sampaio <5679073+r4f4ss@users.noreply.github.com> Date: Wed, 18 Sep 2024 18:04:30 -0300 Subject: [PATCH 2/6] Transformation of closePortalRpcServer a method of Client to form a centralized management object --- cmd/shisui/main.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/shisui/main.go b/cmd/shisui/main.go index 2297b776c937..c2aa00e46356 100644 --- a/cmd/shisui/main.go +++ b/cmd/shisui/main.go @@ -131,32 +131,32 @@ func sigInterrupt(clientChan <-chan *Client) { log.Warn("Waiting for the client to start...") } c := <-clientChan - closePortalRpcServer(c) + c.closePortalRpcServer() }() <-interrupt os.Exit(1) } -func closePortalRpcServer(client *Client) { +func (cli *Client) closePortalRpcServer() { log.Info("Closing Database...") - client.DiscV5API.DiscV5.LocalNode().Database().Close() + cli.DiscV5API.DiscV5.LocalNode().Database().Close() log.Info("Closing UDPv5 protocol...") - client.DiscV5API.DiscV5.Close() - if client.HistoryNetwork != nil { + cli.DiscV5API.DiscV5.Close() + if cli.HistoryNetwork != nil { log.Info("Closing history network...") - client.HistoryNetwork.Stop() + cli.HistoryNetwork.Stop() } - if client.BeaconNetwork != nil { + if cli.BeaconNetwork != nil { log.Info("Closing beacon network...") - client.BeaconNetwork.Stop() + cli.BeaconNetwork.Stop() } - if client.StateNetwork != nil { + if cli.StateNetwork != nil { log.Info("Closing state network...") - client.StateNetwork.Stop() + cli.StateNetwork.Stop() } log.Info("Closing servers...") - client.Server.Close() + cli.Server.Close() os.Exit(1) } From 145865d3c36364e51940d71bfcd52ebfe0118b9c Mon Sep 17 00:00:00 2001 From: Rafael Sampaio <5679073+r4f4ss@users.noreply.github.com> Date: Wed, 18 Sep 2024 18:06:56 -0300 Subject: [PATCH 3/6] Improves function name --- cmd/shisui/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/shisui/main.go b/cmd/shisui/main.go index c2aa00e46356..3bb01f35875d 100644 --- a/cmd/shisui/main.go +++ b/cmd/shisui/main.go @@ -96,7 +96,7 @@ func shisui(ctx *cli.Context) error { setDefaultLogger(*config) clientChan := make(chan *Client, 1) - go sigInterrupt(clientChan) + go handlerInterrupt(clientChan) addr, err := net.ResolveUDPAddr("udp", config.Protocol.ListenAddr) if err != nil { @@ -118,7 +118,7 @@ func setDefaultLogger(config Config) { log.SetDefault(defaultLogger) } -func sigInterrupt(clientChan <-chan *Client) { +func handlerInterrupt(clientChan <-chan *Client) { interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(interrupt) From 68dbb1a6b26e4ff3556eadadfb78ebaa58b10fd8 Mon Sep 17 00:00:00 2001 From: Rafael Sampaio <5679073+r4f4ss@users.noreply.github.com> Date: Wed, 18 Sep 2024 18:40:26 -0300 Subject: [PATCH 4/6] Removal of discV5 close in sub networks --- p2p/discover/portal_protocol.go | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index c59c268f741b..a1b32fc15dc0 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -265,7 +265,6 @@ func (p *PortalProtocol) Start() error { func (p *PortalProtocol) Stop() { p.cancelCloseCtx() p.table.close() - p.DiscV5.Close() err := p.utp.Close() if err != nil { p.Log.Error("failed to close utp listener", "err", err) From 2755aeec54aa019a75ac67520dd821e48b422279 Mon Sep 17 00:00:00 2001 From: Rafael Sampaio <5679073+r4f4ss@users.noreply.github.com> Date: Wed, 18 Sep 2024 19:22:21 -0300 Subject: [PATCH 5/6] Reorder of Close() calls in closePortalRpcServer --- cmd/shisui/main.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/shisui/main.go b/cmd/shisui/main.go index 3bb01f35875d..e4d3065dacd3 100644 --- a/cmd/shisui/main.go +++ b/cmd/shisui/main.go @@ -139,10 +139,6 @@ func handlerInterrupt(clientChan <-chan *Client) { } func (cli *Client) closePortalRpcServer() { - log.Info("Closing Database...") - cli.DiscV5API.DiscV5.LocalNode().Database().Close() - log.Info("Closing UDPv5 protocol...") - cli.DiscV5API.DiscV5.Close() if cli.HistoryNetwork != nil { log.Info("Closing history network...") cli.HistoryNetwork.Stop() @@ -155,9 +151,12 @@ func (cli *Client) closePortalRpcServer() { log.Info("Closing state network...") cli.StateNetwork.Stop() } + log.Info("Closing Database...") + cli.DiscV5API.DiscV5.LocalNode().Database().Close() + log.Info("Closing UDPv5 protocol...") + cli.DiscV5API.DiscV5.Close() log.Info("Closing servers...") cli.Server.Close() - os.Exit(1) } From aacb076ab1959cac5a4a1d6479184c4b0600941a Mon Sep 17 00:00:00 2001 From: Rafael Sampaio <5679073+r4f4ss@users.noreply.github.com> Date: Wed, 18 Sep 2024 22:07:04 -0300 Subject: [PATCH 6/6] Recolocation of discV5 close in sub networks --- p2p/discover/portal_protocol.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index a1b32fc15dc0..c59c268f741b 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -265,6 +265,7 @@ func (p *PortalProtocol) Start() error { func (p *PortalProtocol) Stop() { p.cancelCloseCtx() p.table.close() + p.DiscV5.Close() err := p.utp.Close() if err != nil { p.Log.Error("failed to close utp listener", "err", err)