Skip to content

Commit ba42a20

Browse files
author
anrs
committed
feat: transfer tool for renaming to workload
1 parent 9490bac commit ba42a20

File tree

2 files changed

+279
-16
lines changed

2 files changed

+279
-16
lines changed
+263
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
import argparse
4+
import functools
5+
import json
6+
import os
7+
import sys
8+
9+
import etcd3
10+
11+
def remove_prefix(s, prefix):
12+
return s[len(prefix):].lstrip('/') if s.startswith(prefix) else s
13+
14+
def range_prefix(meta, obj_prefix, fn):
15+
orig_prefix = os.path.join(meta.orig_root_prefix, obj_prefix)
16+
17+
for orig_value, orig_meta in meta.etcd.get_prefix(orig_prefix):
18+
orig_key = orig_meta.key.decode('utf-8')
19+
objname = remove_prefix(orig_key, orig_prefix)
20+
21+
new_key = fn(objname, orig_value.decode('utf-8'))
22+
if new_key:
23+
print('convert %s to %s' % (orig_key, new_key))
24+
25+
26+
class Pod(object):
27+
28+
def __init__(self, meta):
29+
"""initializes a pod transfer"""
30+
self.meta = meta
31+
self.pod_prefix = 'pod/info'
32+
self.range_prefix = functools.partial(range_prefix, self.meta)
33+
34+
def trans(self):
35+
self.range_prefix(self.pod_prefix, self._trans)
36+
37+
def _trans(self, podname, orig_value):
38+
new_key = os.path.join(self.meta.new_root_prefix, self.pod_prefix, podname)
39+
self.meta.etcd.put(new_key, orig_value)
40+
return new_key
41+
42+
43+
class Node(object):
44+
45+
def __init__(self, meta):
46+
"""initializes a node transfer"""
47+
self.meta = meta
48+
self.info_prefix = 'node'
49+
self.range_prefix = functools.partial(range_prefix, self.meta, self.info_prefix)
50+
self.nodes = {}
51+
52+
def trans(self):
53+
self.range_prefix(self._trans_info)
54+
self.range_prefix(self._trans_pod)
55+
self.range_prefix(self._trans_cert)
56+
self.range_prefix(self._trans_workload)
57+
58+
def _trans_info(self, nodename, orig_value):
59+
# skipping extra info.
60+
if ':' in nodename:
61+
return
62+
63+
self.nodes[nodename] = json.loads(orig_value)
64+
65+
new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, nodename)
66+
self.meta.etcd.put(new_key, orig_value)
67+
return new_key
68+
69+
def _trans_pod(self, node_pod_pair, orig_value):
70+
# parsering node-pod-pair itself only
71+
if ':pod/' not in node_pod_pair:
72+
return
73+
74+
podname, _, nodename = node_pod_pair.partition(':pod/')
75+
if not (podname and nodename):
76+
raise ValueError('invalid podname or nodename for %s' % node_pod_pair)
77+
78+
new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:pod' % podname, nodename)
79+
self.meta.etcd.put(new_key, orig_value)
80+
return new_key
81+
82+
def _trans_cert(self, cert_key, orig_value):
83+
nodename, _, cert_type = cert_key.partition(':')
84+
85+
# parsering orig_key which ends with :ca, :cert, :key only
86+
if cert_type not in ('ca', 'cert', 'key'):
87+
return
88+
89+
new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:%s' % (nodename, cert_type))
90+
self.meta.etcd.put(new_key, orig_value)
91+
return new_key
92+
93+
def _trans_workload(self, node_wrk_pair, orig_value):
94+
nodename, _, wrk_id = node_wrk_pair.partition(':containers/')
95+
96+
# parsering orig_key which belongs node-workload pair only.
97+
if not (nodename and wrk_id):
98+
return
99+
100+
new_key = os.path.join(self.meta.new_root_prefix, self.info_prefix, '%s:worklods' % nodename, wrk_id)
101+
wrk = Workload.conv(orig_value, self)
102+
self.meta.etcd.put(new_key, json.dumps(wrk))
103+
return new_key
104+
105+
def get_numa_node(self, cpumap, nodename):
106+
"""ref core types/core.go GetNUMANode func"""
107+
numa_node_id = ""
108+
109+
node = self.nodes.get(nodename)
110+
if not node:
111+
raise ValueError('invalid nodename %s' % nodename)
112+
113+
numa = node.get('numa')
114+
if not numa:
115+
return numa_node_id
116+
117+
for cpu_id in cpumap:
118+
mem_node = numa.get(cpu_id)
119+
if not mem_node:
120+
continue
121+
122+
if numa_node_id == '':
123+
numa_node_id = mem_node
124+
elif numa_node_id != mem_node:
125+
numa_node_id = ''
126+
127+
return numa_node_id
128+
129+
130+
class Workload(object):
131+
132+
def __init__(self, meta, node_transfer):
133+
"""initializes a workload transfer"""
134+
self.meta = meta
135+
self.container_prefix = 'containers'
136+
self.deploy_prefix = 'deploy'
137+
self.range_prefix = functools.partial(range_prefix, self.meta)
138+
self.node_transfer = node_transfer
139+
140+
def trans(self):
141+
self.range_prefix(self.container_prefix, self._trans_container)
142+
self.range_prefix(self.deploy_prefix, self._trans_deploy)
143+
144+
def _trans_container(self, wrk_id, orig_value):
145+
new_key = os.path.join(self.meta.new_root_prefix, self.container_prefix, wrk_id)
146+
wrk = self.conv(orig_value, self.node_transfer)
147+
self.meta.etcd.put(new_key, json.dumps(wrk))
148+
return new_key
149+
150+
def _trans_deploy(self, deploy_key, orig_value):
151+
parts = deploy_key.split('/')
152+
if len(parts) != 4:
153+
raise ValueError('invalid deploy key: %s' % deploy_key)
154+
155+
appname, entrypoint, nodename, wrk_id = parts
156+
157+
new_key = os.path.join(self.meta.new_root_prefix, self.deploy_prefix, appname, entrypoint, nodename, wrk_id)
158+
wrk = self.conv(orig_value, self.node_transfer)
159+
self.meta.etcd.put(new_key, json.dumps(wrk))
160+
161+
return new_key
162+
163+
@classmethod
164+
def conv(cls, orig_value, node_transfer):
165+
166+
def delete(*keys):
167+
for k in keys:
168+
try:
169+
del dic[k]
170+
except KeyError:
171+
pass
172+
173+
del_keys = set()
174+
def get(new_field, orig_field, transit_field, default=None):
175+
value = None
176+
177+
# don't use dic.get(new_field, dic[orig_field]),
178+
# due to there isn't orig_field but has new_field.
179+
if new_field in dic:
180+
value = dic[new_field]
181+
elif transit_field in dic:
182+
value = dic[transit_field]
183+
else:
184+
if default is None:
185+
value = dic[orig_field]
186+
else:
187+
value = dic.get(orig_field, default)
188+
189+
del_keys.update({orig_field, transit_field})
190+
191+
return value
192+
193+
dic = json.loads(orig_value)
194+
dic['cpu'] = get('CPU', 'cpu', 'CPU')
195+
196+
dic.update(dict(
197+
create_time=1553990400,
198+
cpu_quota_request=get('cpu_quota_request', 'quota', 'CPUQuotaRequest'),
199+
cpu_quota_limit=get('cpu_quota_limit', 'quota', 'CPUQuotaLimit'),
200+
memory_request=get('memory_request', 'memory', 'MemoryRequest'),
201+
memory_limit=get('memory_limit', 'memory', 'MemoryLimit'),
202+
volume_request=get('volume_request', 'volumes', 'VolumeRequest'),
203+
volume_limit=get('volume_limit', 'volumes', 'VolumeLimit'),
204+
volume_plan_request=get('volume_plan_request', 'volume_plan', 'VolumePlanRequest', default={}),
205+
volume_plan_limit=get('volume_plan_limit', 'volume_plan', 'VolumePlanLimit', default={}),
206+
volume_changed=dic.get('volume_changed', False),
207+
storage_request=get('storage_request', 'storage', 'StorageRequest'),
208+
storage_limit=get('storage_limit', 'storage', 'StorageLimit'),
209+
))
210+
211+
dic['numa_node'] = ''
212+
if dic['cpu'] and node_transfer:
213+
numa_node = node_transfer.get_numa_node(dic['cpu'], dic['nodename'])
214+
dic['numa_node'] = dic.get('NUMANode', numa_node)
215+
216+
# don't removing *cpu* from the original dict.
217+
try:
218+
del_keys.remove('cpu')
219+
except KeyError:
220+
pass
221+
222+
del_keys.update({'softlimit', 'VolumeChanged', 'NUMANode'})
223+
delete(*list(del_keys))
224+
225+
return dic
226+
227+
228+
class Transfer(object):
229+
230+
def __init__(self, etcd, orig_root_prefix, new_root_prefix):
231+
"""initializes a transfer which includes common utilities"""
232+
self.etcd = etcd
233+
self.orig_root_prefix = orig_root_prefix
234+
self.new_root_prefix = new_root_prefix
235+
236+
def trans(self):
237+
Pod(self).trans()
238+
239+
node_transfer = Node(self)
240+
node_transfer.trans()
241+
242+
Workload(self, node_transfer).trans()
243+
244+
def getargs():
245+
ap = argparse.ArgumentParser()
246+
ap.add_argument('-o', dest='orig_root_prefix', help='original prefix', default='/eru')
247+
ap.add_argument('-n', dest='new_root_prefix', help='new prefix', default='/eru2')
248+
ap.add_argument('--etcd-host', default='127.0.0.1')
249+
ap.add_argument('--etcd-port', type=int, default=2379)
250+
return ap.parse_args()
251+
252+
def connect_etcd(host, port):
253+
return etcd3.client(host=host, port=port)
254+
255+
def main():
256+
args = getargs()
257+
etcd = connect_etcd(args.etcd_host, args.etcd_port)
258+
trans = Transfer(etcd, args.orig_root_prefix, args.new_root_prefix)
259+
trans.trans()
260+
return 0
261+
262+
if __name__ == '__main__':
263+
sys.exit(main())

types/resource.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,22 @@ type ResourceOptions struct {
2323

2424
// ResourceMeta for messages and workload to store
2525
type ResourceMeta struct {
26-
CPUQuotaRequest float64
27-
CPUQuotaLimit float64
28-
CPU ResourceMap
29-
NUMANode string
30-
31-
MemoryRequest int64
32-
MemoryLimit int64
33-
34-
VolumeRequest VolumeBindings
35-
VolumeLimit VolumeBindings
36-
VolumePlanRequest VolumePlan
37-
VolumePlanLimit VolumePlan
38-
VolumeChanged bool
39-
40-
StorageRequest int64
41-
StorageLimit int64
26+
CPUQuotaRequest float64 `json:"cpu_quota_request"`
27+
CPUQuotaLimit float64 `json:"cpu_quota_limit"`
28+
CPU ResourceMap `json:"cpu"`
29+
NUMANode string `json:"numa_node"`
30+
31+
MemoryRequest int64 `json:"memory_request"`
32+
MemoryLimit int64 `json:"memory_limit"`
33+
34+
VolumeRequest VolumeBindings `json:"volume_request"`
35+
VolumeLimit VolumeBindings `json:"volume_limit"`
36+
VolumePlanRequest VolumePlan `json:"volume_plan_request"`
37+
VolumePlanLimit VolumePlan `json:"volume_plan_limit"`
38+
VolumeChanged bool `json:"volume_changed"`
39+
40+
StorageRequest int64 `json:"storage_request"`
41+
StorageLimit int64 `json:"storage_limit"`
4242
}
4343

4444
// ResourceType .

0 commit comments

Comments
 (0)