Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the issue of candidates gc #12

Merged
merged 1 commit into from
Oct 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type serverOption struct {
mode string
externalAddress string
gcDuration string
proxyCenter string
}

var defaultGCDuration = time.Minute * 4
Expand All @@ -42,6 +43,8 @@ func createServerCommand() (cmd *cobra.Command) {
flags.StringVarP(&opt.mode, "mode", "m", "server", "This could be a normal server or a proxy")
flags.StringVarP(&opt.externalAddress, "externalAddress", "", "",
"The external address which used to registry to the center proxy")
flags.StringVarP(&opt.proxyCenter, "proxyCenter", "", "http://goget.surenpi.com",
"The address of the center proxy")
flags.StringVarP(&opt.gcDuration, "gc-duration", "", defaultGCDuration.String(),
"The duration of not alive candidates gc")
return
Expand All @@ -51,7 +54,7 @@ func (o *serverOption) runE(cmd *cobra.Command, args []string) (err error) {
switch o.mode {
case "server":
http.HandleFunc("/", server.GogetHandler)
if err = server.IntervalSelfRegistry(o.externalAddress, time.Minute*1); err != nil {
if err = server.IntervalSelfRegistry(o.proxyCenter, o.externalAddress, time.Minute*1); err != nil {
err = fmt.Errorf("failed to self registry to the center proxy, error: %v", err)
return
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/proxy/candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,39 @@ func TestCandidate(t *testing.T) {
_, ok = expiredCandidates.findAlive()
assert.False(t, ok)
}

func TestCandidatesHelper(t *testing.T) {
// invalid candidates array
candidatesArray := []interface{}{
struct {
}{},
}
candidates := newFromArray(candidatesArray)
assert.Equal(t, 0, candidates.size())

// valid candidates array
candidatesArray = []interface{}{
map[interface{}]interface{}{
"address": "fake",
"heartBeat": time.Now().Format(timeFormat),
},
}
candidates = newFromArray(candidatesArray)
assert.Equal(t, 1, candidates.size())
aliveCandidate, ok := candidates.findAlive()
assert.True(t, ok)
assert.Equal(t, "fake", aliveCandidate.address)

// from map
candidatesMap := []map[interface{}]interface{}{
{
"address": "fake",
"heartBeat": time.Now(),
},
}
candidates = newFromMap(candidatesMap)
assert.Equal(t, 1, candidates.size())
aliveCandidate, ok = candidates.findAlive()
assert.True(t, ok)
assert.Equal(t, "fake", aliveCandidate.address)
}
63 changes: 47 additions & 16 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/viper"
"net/http"
"strings"
"sync"
"time"
)

Expand All @@ -15,17 +16,28 @@ const KCandidates = "candidates"
// RedirectionHandler is the handler of proxy
func RedirectionHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("received a request", r.RequestURI)
if !isValid(r.RequestURI) {
// TODO do the validation check
w.WriteHeader(http.StatusBadRequest)
_,_ = w.Write([]byte("invalid request, please check https://github.com/LinuxSuRen/goget"))
return
}

candidates := getCandidatesFromConfig()

fmt.Println("found possible candidates", candidates.size())
if candidate, ok := candidates.findAlive(); ok {
fmt.Println("redirect to", candidate.address)
http.Redirect(w, r, fmt.Sprintf("https://%s/%s", candidate.address, r.RequestURI), http.StatusMovedPermanently)
http.Redirect(w, r, fmt.Sprintf("http://%s/%s", candidate.address, r.RequestURI), http.StatusMovedPermanently)
return
}
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("no candidates found"))
_, _ = w.Write([]byte("no candidates found, please feel free to be a candidate with command 'goget-server --mode proxy --externalAddress your-ip:port'"))
}

func isValid(uri string) bool {
return strings.HasPrefix(uri, "/github.com/") ||
strings.HasPrefix(uri, "/gitee.com/")
}

// RegistryHandler receive the proxy registry request
Expand All @@ -44,11 +56,12 @@ func RegistryHandler(w http.ResponseWriter, r *http.Request) {
candidates = newFromMap(candidatesRaw)
}
} else {
candidates = newArray(candidatesRaw)
candidates = newFromArray(candidatesRaw)
}

candidates.addCandidate(address)

fmt.Println("receive candidate server", address)
if err := saveCandidates(candidates); err == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
Expand All @@ -58,11 +71,6 @@ func RegistryHandler(w http.ResponseWriter, r *http.Request) {
}
}

func saveCandidates(candidates *candidateSlice) (err error) {
viper.Set(KCandidates, candidates.getMap())
return viper.WriteConfig()
}

// CandidatesGC removes the not alive candidates
func CandidatesGC(ctx context.Context, duration time.Duration) {
go func(ctx context.Context) {
Expand All @@ -82,16 +90,39 @@ func CandidatesGC(ctx context.Context, duration time.Duration) {
}(ctx)
}

var mutex = &sync.Mutex{}
func saveCandidates(candidates *candidateSlice) (err error) {
mutex.Lock()
defer func() {
mutex.Unlock()
}()
viper.Set(KCandidates, candidates.getMap())
return viper.WriteConfig()
}

func getCandidatesFromConfig() (candidates *candidateSlice) {
if candidatesRaw, ok := viper.Get(KCandidates).([]interface{}); !ok {
if candidatesRaw, ok := viper.Get(KCandidates).([]map[interface{}]interface{}); !ok {
candidates = newFromMap(candidatesRaw)
} else {
candidates = newFromMap(candidatesRaw)
}
} else {
candidates = newArray(candidatesRaw)
mutex.Lock()
defer func() {
mutex.Unlock()
}()
switch val := viper.Get(KCandidates).(type) {
case []interface{}:
candidates = newFromArray(val)
case []map[interface{}]interface{}:
candidates = newFromMap(val)
default:
fmt.Println(val)
candidates = &candidateSlice{}
}
//if candidatesRaw, ok := viper.Get(KCandidates).([]interface{}); !ok {
// if candidatesRaw, ok := viper.Get(KCandidates).([]map[interface{}]interface{}); !ok {
// candidates = newFromMap(candidatesRaw)
// } else {
// candidates = newFromMap(candidatesRaw)
// }
//} else {
// candidates = newFromArray(candidatesRaw)
//}
return
}

Expand Down
57 changes: 32 additions & 25 deletions pkg/proxy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type candidate struct {

var aliveDuration = time.Minute * 2

const timeFormat = time.RFC3339

type candidateSlice struct {
candidates []candidate
}
Expand Down Expand Up @@ -57,7 +59,25 @@ func (c *candidateSlice) size() int {
return len(c.candidates)
}

func newArray(candidates []interface{}) *candidateSlice {
func (c *candidateSlice) getMap() (result []map[interface{}]interface{}) {
result = make([]map[interface{}]interface{}, 0)
for i, _ := range c.candidates {
can := c.candidates[i]
// don't persistent the expired candidates
if can.expired {
fmt.Println("skip expired candidate:", can.address)
continue
}

result = append(result, map[interface{}]interface{}{
"address": can.address,
"heartBeat": can.heartBeat.Format(timeFormat),
})
}
return
}

func newFromArray(candidates []interface{}) *candidateSlice {
targetCandidates := make([]candidate, 0)

for i, _ := range candidates {
Expand All @@ -82,32 +102,19 @@ func newFromMap(candidates []map[interface{}]interface{}) *candidateSlice {
for i, _ := range candidates {
can := candidates[i]

heartBeat, _ := time.Parse(time.RFC3339, fmt.Sprintf("%v", can["heartBeat"]))
targetCandidates = append(targetCandidates, candidate{
address: fmt.Sprintf("%v", can["address"]),
heartBeat: heartBeat,
})
targetCandidate := candidate{
address: fmt.Sprintf("%v", can["address"]),
}

switch v := can["heartBeat"].(type) {
case time.Time:
targetCandidate.heartBeat = v
case string:
targetCandidate.heartBeat, _ = time.Parse(timeFormat, v)
}
targetCandidates = append(targetCandidates, targetCandidate)
}
return &candidateSlice{
candidates: targetCandidates,
}
}

func (c *candidateSlice) getMap() (result []map[interface{}]interface{}) {
result = make([]map[interface{}]interface{}, 0)
fmt.Println(c.candidates)
for i, _ := range c.candidates {
can := c.candidates[i]
// don't persistent the expired candidates
if can.expired {
fmt.Println("skip expired candidate:", can.address)
continue
}

result = append(result, map[interface{}]interface{}{
"address": can.address,
"heartBeat": can.heartBeat.Format(time.RFC3339),
})
}
return
}
12 changes: 7 additions & 5 deletions pkg/server/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,29 @@ import (
)

// selfRegistry registries myself to the center proxy
func selfRegistry(address string) (err error) {
func selfRegistry(center, address string) (err error) {
if address == "" {
err = fmt.Errorf("the external address is empty")
}

var resp *http.Response
if resp, err = http.Post(fmt.Sprintf("http://goget.surenpi.com/registry?address=%s", address), "", nil); err == nil {
if resp, err = http.Post(fmt.Sprintf("%s/registry?address=%s", center, address), "", nil); err == nil {
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
}
return
}

func IntervalSelfRegistry(address string, duration time.Duration) (err error) {
err = selfRegistry(address)
func IntervalSelfRegistry(center, address string, duration time.Duration) (err error) {
err = selfRegistry(center, address)

go func() {
ticker := time.NewTicker(duration)
for range ticker.C {
selfRegistry(address)
if err = selfRegistry(center, address); err != nil {
fmt.Println("self registry failed", err)
}
}
}()
return
Expand Down