
Commit 983a4e1

Author: tonic (committed)

Merge branch 'dev' into 'master'

Update: 1. embed eru-alloc into core; 2. add complexscheduler; 3. delete simplescheduler. See merge request !5

2 parents a9580c6 + 5f701ad, commit 983a4e1

File tree: 12 files changed, +692 -10 lines


README.md (+3)

@@ -31,6 +31,9 @@ agent_port: "12345"
 permdir: "/mnt/mfs/permdirs"
 etcd:
     - "http://127.0.0.1:2379"
+etcd_lock_key: "erucore"
+etcd_lock_ttl: 10
+scheduler: "complex"
 
 git:
     public_key: "[path_to_pub_key]"

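The three new keys feed the config.EtcdLockKey, config.EtcdLockTTL and config.Scheduler fields read in cluster/calcium/cluster.go below. The actual types.Config struct is not part of this diff; the following is only a sketch of what those fields could look like, with assumed yaml tags and types:

	// Hypothetical excerpt of types.Config; the field names follow the
	// config.Scheduler / config.EtcdLockKey / config.EtcdLockTTL usages below.
	type Config struct {
		EtcdLockKey string `yaml:"etcd_lock_key"` // e.g. "erucore"
		EtcdLockTTL int    `yaml:"etcd_lock_ttl"` // lock TTL in seconds, e.g. 10
		Scheduler   string `yaml:"scheduler"`     // "simple" or "complex"
		// existing fields (permdir, etcd, git, ...) omitted
	}
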
cluster/calcium/cluster.go (+8 -1)

@@ -3,9 +3,11 @@ package calcium
 import (
 	"sync"
 
+	"github.com/coreos/etcd/client"
 	"gitlab.ricebook.net/platform/core/network"
 	"gitlab.ricebook.net/platform/core/network/calico"
 	"gitlab.ricebook.net/platform/core/scheduler"
+	"gitlab.ricebook.net/platform/core/scheduler/complex"
 	"gitlab.ricebook.net/platform/core/scheduler/simple"
 	"gitlab.ricebook.net/platform/core/source"
 	"gitlab.ricebook.net/platform/core/source/gitlab"
@@ -29,7 +31,12 @@ func New(config types.Config) (*Calcium, error) {
 		return nil, err
 	}
 
-	scheduler := simplescheduler.New()
+	if config.Scheduler == "simple" {
+		scheduler := simplescheduler.New()
+	} else {
+		scheduler := complexscheduler.NewPotassim(store, config.EtcdLockKey, config.EtcdLockTTL)
+	}
+
 	titanium := calico.New()
 	source := gitlab.New(config)

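Note that, as written, both scheduler values are short-declared inside the if/else blocks, so they go out of scope immediately and are never used. A compiling variant would declare the variable once before branching; a minimal sketch, assuming the scheduler package exposes a common interface implemented by both schedulers and that store is the etcd-backed store created earlier in New:

	// Sketch only: declare once, assign per branch, so the value is still
	// in scope when the Calcium struct is built below.
	// scheduler.Scheduler is assumed to be the shared interface.
	var sched scheduler.Scheduler
	if config.Scheduler == "simple" {
		sched = simplescheduler.New()
	} else {
		sched = complexscheduler.NewPotassim(store, config.EtcdLockKey, config.EtcdLockTTL)
	}
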
cluster/calcium/create_container.go (+3 -4)

@@ -78,7 +78,6 @@ func (c *Calcium) prepareNodes(podname string, quota float64, num int) (map[stri
 	c.Lock()
 	defer c.Unlock()
 
-	q := int(quota)
 	r := make(map[string][]types.CPUMap)
 
 	nodes, err := c.ListPodNodes(podname)
@@ -87,21 +86,21 @@ func (c *Calcium) prepareNodes(podname string, quota float64, num int) (map[stri
 	}
 
 	// if public, use only public nodes
-	if q == 0 {
+	if quota == 0.0 { // a bit tricky here, because requests like quota=0.5 have to be supported
 		nodes = filterNodes(nodes, true)
 	} else {
 		nodes = filterNodes(nodes, false)
 	}
 
 	cpumap := makeCPUMap(nodes)
-	r, err = c.scheduler.SelectNodes(cpumap, q, num)
+	r, err = c.scheduler.SelectNodes(cpumap, quota, num) // this interface now uses float64 across the board
 	if err != nil {
 		return r, err
 	}
 
 	// if quota is set to 0
 	// then no cpu is required
-	if q > 0 {
+	if quota > 0 {
 		// cpus remained
 		// update data to etcd
 		// `SelectNodes` reduces count in cpumap

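Dropping the int(quota) conversion means fractional requests such as quota=0.5 now reach the scheduler intact; previously they truncated to 0 and were treated as a public, no-CPU deployment. A purely illustrative sketch of the call, assuming makeCPUMap yields a map from node name to a types.CPUMap of per-CPU share counts:

	// Hypothetical input: two nodes, two CPUs each, 10 shares per CPU.
	cpumap := map[string]types.CPUMap{
		"node1": {"0": 10, "1": 10},
		"node2": {"0": 10, "1": 10},
	}
	// Three containers at half a core each; with the old int(quota)
	// truncation this request would have degenerated to quota == 0.
	plan, err := c.scheduler.SelectNodes(cpumap, 0.5, 3)
	if err != nil {
		return plan, err
	}
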
lock/lock.go (new file, +148)

@@ -0,0 +1,148 @@
+package lock
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/coreos/etcd/client"
+	"golang.org/x/net/context"
+)
+
+const (
+	defaultTTL = 60
+	defaultTry = 3
+)
+
+// A Mutex is a mutual exclusion lock which is distributed across a cluster.
+type Mutex struct {
+	key   string
+	id    string // The identity of the caller
+	kapi  client.KeysAPI
+	ttl   time.Duration
+	mutex sync.Mutex
+}
+
+// NewMutex creates a Mutex with the given key, which must be the same
+// across the cluster nodes.
+// c is the KeysAPI of the etcd cluster holding the lock.
+func NewMutex(c client.KeysAPI, key string, ttl int) *Mutex {
+	hostname, err := os.Hostname()
+	if err != nil || key == "" {
+		return nil
+	}
+
+	if !strings.HasPrefix(key, "/") {
+		key = fmt.Sprintf("/%s", key)
+	}
+
+	if ttl < 1 {
+		ttl = defaultTTL
+	}
+
+	return &Mutex{
+		key:   key,
+		id:    fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")),
+		kapi:  c,
+		ttl:   time.Second * time.Duration(ttl),
+		mutex: sync.Mutex{},
+	}
+}
+
+// Lock locks m.
+// If the lock is already in use, the calling goroutine
+// blocks until the mutex is available.
+func (m *Mutex) Lock() (err error) {
+	m.mutex.Lock()
+	for try := 1; try <= defaultTry; try++ {
+		err = m.lock()
+		if err == nil {
+			return nil
+		}
+
+		log.Debugf("%s Lock node %v ERROR %v", m.id, m.key, err)
+
+		if try < defaultTry {
+			log.Debugf("%s Try to lock node %v again ERROR, %v", m.id, m.key, err)
+		}
+	}
+	return err
+}
+
+func (m *Mutex) lock() (err error) {
+	log.Debugf("%s Trying to create a node : key=%v", m.id, m.key)
+	setOptions := &client.SetOptions{
+		PrevExist: client.PrevNoExist,
+		TTL:       m.ttl,
+	}
+	for {
+		resp, err := m.kapi.Set(context.TODO(), m.key, m.id, setOptions)
+		if err == nil {
+			log.Debugf("%s Create node %v OK [%q]", m.id, m.key, resp)
+			return nil
+		}
+
+		log.Debugf("%s Create node %v failed [%v]", m.id, m.key, err)
+		e, ok := err.(client.Error)
+		if !ok {
+			return err
+		}
+
+		if e.Code != client.ErrorCodeNodeExist {
+			return err
+		}
+
+		// Get the existing node's value.
+		resp, err = m.kapi.Get(context.TODO(), m.key, nil)
+		if err != nil {
+			return err
+		}
+		log.Debugf("%s, Get node %v OK", m.id, m.key)
+		watcherOptions := &client.WatcherOptions{
+			AfterIndex: resp.Index,
+			Recursive:  false,
+		}
+		watcher := m.kapi.Watcher(m.key, watcherOptions)
+		for {
+			log.Debugf("%s Watch %v ...", m.id, m.key)
+			resp, err = watcher.Next(context.TODO())
+			if err != nil {
+				return err
+			}
+
+			log.Debugf("%s Received an event: %q", m.id, resp)
+			if resp.Action == "delete" || resp.Action == "expire" {
+				// break this for-loop, and try to create the node again.
+				break
+			}
+		}
+	}
+	return err
+}
+
+// Unlock unlocks m.
+// It is a run-time error if m is not locked on entry to Unlock.
+//
+// A locked Mutex is not associated with a particular goroutine.
+// It is allowed for one goroutine to lock a Mutex and then
+// arrange for another goroutine to unlock it.
+func (m *Mutex) Unlock() (err error) {
+	defer m.mutex.Unlock()
+	for i := 1; i <= defaultTry; i++ {
+		var resp *client.Response
+		resp, err = m.kapi.Delete(context.TODO(), m.key, nil)
+		if err == nil {
+			log.Debugf("%s Delete %v OK", m.id, m.key)
+			return nil
+		}
+		log.Debugf("%s Delete %v failed: %q", m.id, m.key, resp)
+		e, ok := err.(client.Error)
+		if ok && e.Code == client.ErrorCodeKeyNotFound {
+			return nil
+		}
+	}
+	return err
+}

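For context, a minimal usage sketch of the new lock. The import path of the lock package and the key/TTL values are assumptions based on this repository layout and the sample config above; error handling is kept short:

	package main

	import (
		"log"

		"github.com/coreos/etcd/client"
		"gitlab.ricebook.net/platform/core/lock"
	)

	func main() {
		// Connect to the etcd endpoint from the sample config.
		c, err := client.New(client.Config{Endpoints: []string{"http://127.0.0.1:2379"}})
		if err != nil {
			log.Fatal(err)
		}
		kapi := client.NewKeysAPI(c)

		// Key and TTL mirror the new etcd_lock_key / etcd_lock_ttl settings.
		m := lock.NewMutex(kapi, "erucore", 10)
		if m == nil {
			log.Fatal("could not create mutex")
		}
		if err := m.Lock(); err != nil {
			log.Fatal(err)
		}
		defer m.Unlock()

		// ... critical section, e.g. a scheduler allocating CPUs ...
	}

NewMutex returns nil when the key is empty or the hostname cannot be resolved, hence the nil check before calling Lock.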