Skip to content

Commit

Permalink
Merge pull request #12 from LinuxSuRen/binary-cache
Browse files Browse the repository at this point in the history
Fix the issue of candidates gc
  • Loading branch information
LinuxSuRen authored Oct 24, 2021
2 parents b96d991 + 447296a commit ac6fcd3
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 47 deletions.
5 changes: 4 additions & 1 deletion cmd/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type serverOption struct {
mode string
externalAddress string
gcDuration string
proxyCenter string
}

var defaultGCDuration = time.Minute * 4
Expand All @@ -42,6 +43,8 @@ func createServerCommand() (cmd *cobra.Command) {
flags.StringVarP(&opt.mode, "mode", "m", "server", "This could be a normal server or a proxy")
flags.StringVarP(&opt.externalAddress, "externalAddress", "", "",
"The external address which used to registry to the center proxy")
flags.StringVarP(&opt.proxyCenter, "proxyCenter", "", "http://goget.surenpi.com",
"The address of the center proxy")
flags.StringVarP(&opt.gcDuration, "gc-duration", "", defaultGCDuration.String(),
"The duration of not alive candidates gc")
return
Expand All @@ -51,7 +54,7 @@ func (o *serverOption) runE(cmd *cobra.Command, args []string) (err error) {
switch o.mode {
case "server":
http.HandleFunc("/", server.GogetHandler)
if err = server.IntervalSelfRegistry(o.externalAddress, time.Minute*1); err != nil {
if err = server.IntervalSelfRegistry(o.proxyCenter, o.externalAddress, time.Minute*1); err != nil {
err = fmt.Errorf("failed to self registry to the center proxy, error: %v", err)
return
}
Expand Down
36 changes: 36 additions & 0 deletions pkg/proxy/candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,39 @@ func TestCandidate(t *testing.T) {
_, ok = expiredCandidates.findAlive()
assert.False(t, ok)
}

func TestCandidatesHelper(t *testing.T) {
// invalid candidates array
candidatesArray := []interface{}{
struct {
}{},
}
candidates := newFromArray(candidatesArray)
assert.Equal(t, 0, candidates.size())

// valid candidates array
candidatesArray = []interface{}{
map[interface{}]interface{}{
"address": "fake",
"heartBeat": time.Now().Format(timeFormat),
},
}
candidates = newFromArray(candidatesArray)
assert.Equal(t, 1, candidates.size())
aliveCandidate, ok := candidates.findAlive()
assert.True(t, ok)
assert.Equal(t, "fake", aliveCandidate.address)

// from map
candidatesMap := []map[interface{}]interface{}{
{
"address": "fake",
"heartBeat": time.Now(),
},
}
candidates = newFromMap(candidatesMap)
assert.Equal(t, 1, candidates.size())
aliveCandidate, ok = candidates.findAlive()
assert.True(t, ok)
assert.Equal(t, "fake", aliveCandidate.address)
}
63 changes: 47 additions & 16 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/viper"
"net/http"
"strings"
"sync"
"time"
)

Expand All @@ -15,17 +16,28 @@ const KCandidates = "candidates"
// RedirectionHandler is the handler of proxy
func RedirectionHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("received a request", r.RequestURI)
if !isValid(r.RequestURI) {
// TODO do the validation check
w.WriteHeader(http.StatusBadRequest)
_,_ = w.Write([]byte("invalid request, please check https://github.com/LinuxSuRen/goget"))
return
}

candidates := getCandidatesFromConfig()

fmt.Println("found possible candidates", candidates.size())
if candidate, ok := candidates.findAlive(); ok {
fmt.Println("redirect to", candidate.address)
http.Redirect(w, r, fmt.Sprintf("https://%s/%s", candidate.address, r.RequestURI), http.StatusMovedPermanently)
http.Redirect(w, r, fmt.Sprintf("http://%s/%s", candidate.address, r.RequestURI), http.StatusMovedPermanently)
return
}
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("no candidates found"))
_, _ = w.Write([]byte("no candidates found, please feel free to be a candidate with command 'goget-server --mode proxy --externalAddress your-ip:port'"))
}

func isValid(uri string) bool {
return strings.HasPrefix(uri, "/github.com/") ||
strings.HasPrefix(uri, "/gitee.com/")
}

// RegistryHandler receive the proxy registry request
Expand All @@ -44,11 +56,12 @@ func RegistryHandler(w http.ResponseWriter, r *http.Request) {
candidates = newFromMap(candidatesRaw)
}
} else {
candidates = newArray(candidatesRaw)
candidates = newFromArray(candidatesRaw)
}

candidates.addCandidate(address)

fmt.Println("receive candidate server", address)
if err := saveCandidates(candidates); err == nil {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
Expand All @@ -58,11 +71,6 @@ func RegistryHandler(w http.ResponseWriter, r *http.Request) {
}
}

func saveCandidates(candidates *candidateSlice) (err error) {
viper.Set(KCandidates, candidates.getMap())
return viper.WriteConfig()
}

// CandidatesGC removes the not alive candidates
func CandidatesGC(ctx context.Context, duration time.Duration) {
go func(ctx context.Context) {
Expand All @@ -82,16 +90,39 @@ func CandidatesGC(ctx context.Context, duration time.Duration) {
}(ctx)
}

var mutex = &sync.Mutex{}
func saveCandidates(candidates *candidateSlice) (err error) {
mutex.Lock()
defer func() {
mutex.Unlock()
}()
viper.Set(KCandidates, candidates.getMap())
return viper.WriteConfig()
}

func getCandidatesFromConfig() (candidates *candidateSlice) {
if candidatesRaw, ok := viper.Get(KCandidates).([]interface{}); !ok {
if candidatesRaw, ok := viper.Get(KCandidates).([]map[interface{}]interface{}); !ok {
candidates = newFromMap(candidatesRaw)
} else {
candidates = newFromMap(candidatesRaw)
}
} else {
candidates = newArray(candidatesRaw)
mutex.Lock()
defer func() {
mutex.Unlock()
}()
switch val := viper.Get(KCandidates).(type) {
case []interface{}:
candidates = newFromArray(val)
case []map[interface{}]interface{}:
candidates = newFromMap(val)
default:
fmt.Println(val)
candidates = &candidateSlice{}
}
//if candidatesRaw, ok := viper.Get(KCandidates).([]interface{}); !ok {
// if candidatesRaw, ok := viper.Get(KCandidates).([]map[interface{}]interface{}); !ok {
// candidates = newFromMap(candidatesRaw)
// } else {
// candidates = newFromMap(candidatesRaw)
// }
//} else {
// candidates = newFromArray(candidatesRaw)
//}
return
}

Expand Down
57 changes: 32 additions & 25 deletions pkg/proxy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type candidate struct {

var aliveDuration = time.Minute * 2

const timeFormat = time.RFC3339

type candidateSlice struct {
candidates []candidate
}
Expand Down Expand Up @@ -57,7 +59,25 @@ func (c *candidateSlice) size() int {
return len(c.candidates)
}

func newArray(candidates []interface{}) *candidateSlice {
func (c *candidateSlice) getMap() (result []map[interface{}]interface{}) {
result = make([]map[interface{}]interface{}, 0)
for i, _ := range c.candidates {
can := c.candidates[i]
// don't persistent the expired candidates
if can.expired {
fmt.Println("skip expired candidate:", can.address)
continue
}

result = append(result, map[interface{}]interface{}{
"address": can.address,
"heartBeat": can.heartBeat.Format(timeFormat),
})
}
return
}

func newFromArray(candidates []interface{}) *candidateSlice {
targetCandidates := make([]candidate, 0)

for i, _ := range candidates {
Expand All @@ -82,32 +102,19 @@ func newFromMap(candidates []map[interface{}]interface{}) *candidateSlice {
for i, _ := range candidates {
can := candidates[i]

heartBeat, _ := time.Parse(time.RFC3339, fmt.Sprintf("%v", can["heartBeat"]))
targetCandidates = append(targetCandidates, candidate{
address: fmt.Sprintf("%v", can["address"]),
heartBeat: heartBeat,
})
targetCandidate := candidate{
address: fmt.Sprintf("%v", can["address"]),
}

switch v := can["heartBeat"].(type) {
case time.Time:
targetCandidate.heartBeat = v
case string:
targetCandidate.heartBeat, _ = time.Parse(timeFormat, v)
}
targetCandidates = append(targetCandidates, targetCandidate)
}
return &candidateSlice{
candidates: targetCandidates,
}
}

func (c *candidateSlice) getMap() (result []map[interface{}]interface{}) {
result = make([]map[interface{}]interface{}, 0)
fmt.Println(c.candidates)
for i, _ := range c.candidates {
can := c.candidates[i]
// don't persistent the expired candidates
if can.expired {
fmt.Println("skip expired candidate:", can.address)
continue
}

result = append(result, map[interface{}]interface{}{
"address": can.address,
"heartBeat": can.heartBeat.Format(time.RFC3339),
})
}
return
}
12 changes: 7 additions & 5 deletions pkg/server/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,29 @@ import (
)

// selfRegistry registries myself to the center proxy
func selfRegistry(address string) (err error) {
func selfRegistry(center, address string) (err error) {
if address == "" {
err = fmt.Errorf("the external address is empty")
}

var resp *http.Response
if resp, err = http.Post(fmt.Sprintf("http://goget.surenpi.com/registry?address=%s", address), "", nil); err == nil {
if resp, err = http.Post(fmt.Sprintf("%s/registry?address=%s", center, address), "", nil); err == nil {
if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
}
return
}

func IntervalSelfRegistry(address string, duration time.Duration) (err error) {
err = selfRegistry(address)
func IntervalSelfRegistry(center, address string, duration time.Duration) (err error) {
err = selfRegistry(center, address)

go func() {
ticker := time.NewTicker(duration)
for range ticker.C {
selfRegistry(address)
if err = selfRegistry(center, address); err != nil {
fmt.Println("self registry failed", err)
}
}
}()
return
Expand Down

0 comments on commit ac6fcd3

Please sign in to comment.