This repository was archived by the owner on Nov 29, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathbalancer.go
137 lines (117 loc) · 2.93 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package balancer
import (
"sync/atomic"
"time"
"gopkg.in/redis.v3"
)
type BalanceMode int
const (
// LeastConn picks the backend with the fewest connections.
ModeLeastConn BalanceMode = iota
// FirstUp always picks the first available backend.
ModeFirstUp
// ModeMinLatency always picks the backend with the minimal latency.
ModeMinLatency
// ModeRandom selects backends randomly.
ModeRandom
// ModeWeightedLatency uses latency as a weight for random selection.
ModeWeightedLatency
// ModeRoundRobin round-robins across available backends.
ModeRoundRobin
)
const minCheckInterval = 100 * time.Millisecond
// Balancer client
type Balancer struct {
selector pool
mode BalanceMode
cursor int32
}
// New initializes a new redis balancer
func New(opts []Options, mode BalanceMode) *Balancer {
if len(opts) == 0 {
opts = []Options{
{Options: redis.Options{Network: "tcp", Addr: "127.0.0.1:6379"}},
}
}
balancer := &Balancer{
selector: make(pool, len(opts)),
mode: mode,
}
for i, opt := range opts {
balancer.selector[i] = newRedisBackend(&opt)
}
return balancer
}
// Next returns the next available redis client
func (b *Balancer) Next() *redis.Client { return b.pickNext().client }
// Close closes all connecitons in the balancer
func (b *Balancer) Close() (err error) {
for _, b := range b.selector {
if e := b.Close(); e != nil {
err = e
}
}
return
}
// Pick the next backend
func (b *Balancer) pickNext() (backend *redisBackend) {
switch b.mode {
case ModeLeastConn:
backend = b.selector.MinUp(func(b *redisBackend) int64 {
return b.Connections()
})
case ModeFirstUp:
backend = b.selector.FirstUp()
case ModeMinLatency:
backend = b.selector.MinUp(func(b *redisBackend) int64 {
return int64(b.Latency())
})
case ModeRandom:
backend = b.selector.Up().Random()
case ModeWeightedLatency:
backend = b.selector.Up().WeightedRandom(func(b *redisBackend) int64 {
factor := int64(b.Latency())
return factor * factor
})
case ModeRoundRobin:
next := int(atomic.AddInt32(&b.cursor, 1))
backend = b.selector.Up().At(next)
}
// Fall back on random backend
if backend == nil {
backend = b.selector.Random()
}
// Increment the number of connections
backend.incConnections(1)
return
}
// --------------------------------------------------------------------
// Custom balancer options
type Options struct {
redis.Options
// Check interval, min 100ms, defaults to 1s
CheckInterval time.Duration
// Rise and Fall indicate the number of checks required to
// mark the instance as up or down, defaults to 1
Rise, Fall int
}
func (o *Options) getCheckInterval() time.Duration {
if o.CheckInterval == 0 {
return time.Second
} else if o.CheckInterval < minCheckInterval {
return minCheckInterval
}
return o.CheckInterval
}
func (o *Options) getRise() int {
if o.Rise < 1 {
return 1
}
return o.Rise
}
func (o *Options) getFall() int {
if o.Fall < 1 {
return 1
}
return o.Fall
}