From 447296afc55a49433403e099dc70823b584ce3c8 Mon Sep 17 00:00:00 2001 From: rick <1450685+LinuxSuRen@users.noreply.github.com> Date: Sun, 24 Oct 2021 20:23:58 +0800 Subject: [PATCH] Fix the issue of candidates gc --- cmd/server/root.go | 5 ++- pkg/proxy/candidate_test.go | 36 +++++++++++++++++++++ pkg/proxy/handler.go | 63 +++++++++++++++++++++++++++---------- pkg/proxy/types.go | 57 ++++++++++++++++++--------------- pkg/server/registry.go | 12 ++++--- 5 files changed, 126 insertions(+), 47 deletions(-) diff --git a/cmd/server/root.go b/cmd/server/root.go index c67ce6f..b309ec5 100644 --- a/cmd/server/root.go +++ b/cmd/server/root.go @@ -23,6 +23,7 @@ type serverOption struct { mode string externalAddress string gcDuration string + proxyCenter string } var defaultGCDuration = time.Minute * 4 @@ -42,6 +43,8 @@ func createServerCommand() (cmd *cobra.Command) { flags.StringVarP(&opt.mode, "mode", "m", "server", "This could be a normal server or a proxy") flags.StringVarP(&opt.externalAddress, "externalAddress", "", "", "The external address which used to registry to the center proxy") + flags.StringVarP(&opt.proxyCenter, "proxyCenter", "", "http://goget.surenpi.com", + "The address of the center proxy") flags.StringVarP(&opt.gcDuration, "gc-duration", "", defaultGCDuration.String(), "The duration of not alive candidates gc") return @@ -51,7 +54,7 @@ func (o *serverOption) runE(cmd *cobra.Command, args []string) (err error) { switch o.mode { case "server": http.HandleFunc("/", server.GogetHandler) - if err = server.IntervalSelfRegistry(o.externalAddress, time.Minute*1); err != nil { + if err = server.IntervalSelfRegistry(o.proxyCenter, o.externalAddress, time.Minute*1); err != nil { err = fmt.Errorf("failed to self registry to the center proxy, error: %v", err) return } diff --git a/pkg/proxy/candidate_test.go b/pkg/proxy/candidate_test.go index b9d58c1..f9deb31 100644 --- a/pkg/proxy/candidate_test.go +++ b/pkg/proxy/candidate_test.go @@ -47,3 +47,39 @@ func TestCandidate(t *testing.T) { _, ok = expiredCandidates.findAlive() assert.False(t, ok) } + +func TestCandidatesHelper(t *testing.T) { + // invalid candidates array + candidatesArray := []interface{}{ + struct { + }{}, + } + candidates := newFromArray(candidatesArray) + assert.Equal(t, 0, candidates.size()) + + // valid candidates array + candidatesArray = []interface{}{ + map[interface{}]interface{}{ + "address": "fake", + "heartBeat": time.Now().Format(timeFormat), + }, + } + candidates = newFromArray(candidatesArray) + assert.Equal(t, 1, candidates.size()) + aliveCandidate, ok := candidates.findAlive() + assert.True(t, ok) + assert.Equal(t, "fake", aliveCandidate.address) + + // from map + candidatesMap := []map[interface{}]interface{}{ + { + "address": "fake", + "heartBeat": time.Now(), + }, + } + candidates = newFromMap(candidatesMap) + assert.Equal(t, 1, candidates.size()) + aliveCandidate, ok = candidates.findAlive() + assert.True(t, ok) + assert.Equal(t, "fake", aliveCandidate.address) +} diff --git a/pkg/proxy/handler.go b/pkg/proxy/handler.go index 279b527..58794b2 100644 --- a/pkg/proxy/handler.go +++ b/pkg/proxy/handler.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/viper" "net/http" "strings" + "sync" "time" ) @@ -15,17 +16,28 @@ const KCandidates = "candidates" // RedirectionHandler is the handler of proxy func RedirectionHandler(w http.ResponseWriter, r *http.Request) { fmt.Println("received a request", r.RequestURI) + if !isValid(r.RequestURI) { + // TODO do the validation check + w.WriteHeader(http.StatusBadRequest) + _,_ = w.Write([]byte("invalid request, please check https://github.com/LinuxSuRen/goget")) + return + } candidates := getCandidatesFromConfig() fmt.Println("found possible candidates", candidates.size()) if candidate, ok := candidates.findAlive(); ok { fmt.Println("redirect to", candidate.address) - http.Redirect(w, r, fmt.Sprintf("https://%s/%s", candidate.address, r.RequestURI), http.StatusMovedPermanently) + http.Redirect(w, r, fmt.Sprintf("http://%s/%s", candidate.address, r.RequestURI), http.StatusMovedPermanently) return } w.WriteHeader(http.StatusBadRequest) - _, _ = w.Write([]byte("no candidates found")) + _, _ = w.Write([]byte("no candidates found, please feel free to be a candidate with command 'goget-server --mode proxy --externalAddress your-ip:port'")) +} + +func isValid(uri string) bool { + return strings.HasPrefix(uri, "/github.com/") || + strings.HasPrefix(uri, "/gitee.com/") } // RegistryHandler receive the proxy registry request @@ -44,11 +56,12 @@ func RegistryHandler(w http.ResponseWriter, r *http.Request) { candidates = newFromMap(candidatesRaw) } } else { - candidates = newArray(candidatesRaw) + candidates = newFromArray(candidatesRaw) } candidates.addCandidate(address) + fmt.Println("receive candidate server", address) if err := saveCandidates(candidates); err == nil { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) @@ -58,11 +71,6 @@ func RegistryHandler(w http.ResponseWriter, r *http.Request) { } } -func saveCandidates(candidates *candidateSlice) (err error) { - viper.Set(KCandidates, candidates.getMap()) - return viper.WriteConfig() -} - // CandidatesGC removes the not alive candidates func CandidatesGC(ctx context.Context, duration time.Duration) { go func(ctx context.Context) { @@ -82,16 +90,39 @@ func CandidatesGC(ctx context.Context, duration time.Duration) { }(ctx) } +var mutex = &sync.Mutex{} +func saveCandidates(candidates *candidateSlice) (err error) { + mutex.Lock() + defer func() { + mutex.Unlock() + }() + viper.Set(KCandidates, candidates.getMap()) + return viper.WriteConfig() +} + func getCandidatesFromConfig() (candidates *candidateSlice) { - if candidatesRaw, ok := viper.Get(KCandidates).([]interface{}); !ok { - if candidatesRaw, ok := viper.Get(KCandidates).([]map[interface{}]interface{}); !ok { - candidates = newFromMap(candidatesRaw) - } else { - candidates = newFromMap(candidatesRaw) - } - } else { - candidates = newArray(candidatesRaw) + mutex.Lock() + defer func() { + mutex.Unlock() + }() + switch val := viper.Get(KCandidates).(type) { + case []interface{}: + candidates = newFromArray(val) + case []map[interface{}]interface{}: + candidates = newFromMap(val) + default: + fmt.Println(val) + candidates = &candidateSlice{} } + //if candidatesRaw, ok := viper.Get(KCandidates).([]interface{}); !ok { + // if candidatesRaw, ok := viper.Get(KCandidates).([]map[interface{}]interface{}); !ok { + // candidates = newFromMap(candidatesRaw) + // } else { + // candidates = newFromMap(candidatesRaw) + // } + //} else { + // candidates = newFromArray(candidatesRaw) + //} return } diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 13b0ad8..0bb5398 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -13,6 +13,8 @@ type candidate struct { var aliveDuration = time.Minute * 2 +const timeFormat = time.RFC3339 + type candidateSlice struct { candidates []candidate } @@ -57,7 +59,25 @@ func (c *candidateSlice) size() int { return len(c.candidates) } -func newArray(candidates []interface{}) *candidateSlice { +func (c *candidateSlice) getMap() (result []map[interface{}]interface{}) { + result = make([]map[interface{}]interface{}, 0) + for i, _ := range c.candidates { + can := c.candidates[i] + // don't persistent the expired candidates + if can.expired { + fmt.Println("skip expired candidate:", can.address) + continue + } + + result = append(result, map[interface{}]interface{}{ + "address": can.address, + "heartBeat": can.heartBeat.Format(timeFormat), + }) + } + return +} + +func newFromArray(candidates []interface{}) *candidateSlice { targetCandidates := make([]candidate, 0) for i, _ := range candidates { @@ -82,32 +102,19 @@ func newFromMap(candidates []map[interface{}]interface{}) *candidateSlice { for i, _ := range candidates { can := candidates[i] - heartBeat, _ := time.Parse(time.RFC3339, fmt.Sprintf("%v", can["heartBeat"])) - targetCandidates = append(targetCandidates, candidate{ - address: fmt.Sprintf("%v", can["address"]), - heartBeat: heartBeat, - }) + targetCandidate := candidate{ + address: fmt.Sprintf("%v", can["address"]), + } + + switch v := can["heartBeat"].(type) { + case time.Time: + targetCandidate.heartBeat = v + case string: + targetCandidate.heartBeat, _ = time.Parse(timeFormat, v) + } + targetCandidates = append(targetCandidates, targetCandidate) } return &candidateSlice{ candidates: targetCandidates, } } - -func (c *candidateSlice) getMap() (result []map[interface{}]interface{}) { - result = make([]map[interface{}]interface{}, 0) - fmt.Println(c.candidates) - for i, _ := range c.candidates { - can := c.candidates[i] - // don't persistent the expired candidates - if can.expired { - fmt.Println("skip expired candidate:", can.address) - continue - } - - result = append(result, map[interface{}]interface{}{ - "address": can.address, - "heartBeat": can.heartBeat.Format(time.RFC3339), - }) - } - return -} diff --git a/pkg/server/registry.go b/pkg/server/registry.go index baabbe2..67f0c9c 100644 --- a/pkg/server/registry.go +++ b/pkg/server/registry.go @@ -7,13 +7,13 @@ import ( ) // selfRegistry registries myself to the center proxy -func selfRegistry(address string) (err error) { +func selfRegistry(center, address string) (err error) { if address == "" { err = fmt.Errorf("the external address is empty") } var resp *http.Response - if resp, err = http.Post(fmt.Sprintf("http://goget.surenpi.com/registry?address=%s", address), "", nil); err == nil { + if resp, err = http.Post(fmt.Sprintf("%s/registry?address=%s", center, address), "", nil); err == nil { if resp.StatusCode != http.StatusOK { err = fmt.Errorf("unexpected status code: %d", resp.StatusCode) } @@ -21,13 +21,15 @@ func selfRegistry(address string) (err error) { return } -func IntervalSelfRegistry(address string, duration time.Duration) (err error) { - err = selfRegistry(address) +func IntervalSelfRegistry(center, address string, duration time.Duration) (err error) { + err = selfRegistry(center, address) go func() { ticker := time.NewTicker(duration) for range ticker.C { - selfRegistry(address) + if err = selfRegistry(center, address); err != nil { + fmt.Println("self registry failed", err) + } } }() return