-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_cli_mappings.py
131 lines (102 loc) · 4.2 KB
/
generate_cli_mappings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import json
from pathlib import Path
import awscli.customizations.cloudfront
import awscli.customizations.cloudtrail
import awscli.customizations.cloudformation
import awscli.customizations.waiters
import awscli.clidriver
import urllib.request
import botocore.model
import boto3
def main():
driver = awscli.clidriver.create_clidriver()
cmd_table = driver._get_command_table()
p = Path(__file__).parent/'map.json'
if p.exists():
data = p.read_text()
else:
resp = urllib.request.urlopen('https://raw.githubusercontent.com/iann0036/iam-dataset/main/aws/map.json')
data = resp.read().decode('utf-8')
p.write_text(data)
iam_map = json.loads(data)
# iam_map['sdk_method_iam_mappings']
svc_id_to_js_names = json.loads(
Path('svc_id_to_js_names.json').read_text(),
)
def boto3_service(svc_name: str) -> botocore.model.ServiceModel:
client: boto3.client = boto3.client(svc_name)
svc_model: botocore.model.ServiceModel = client.meta.service_model
return svc_model
cli_svc_info = {}
for key, value in cmd_table.items():
if key in ['configure', 'history']:
continue
if hasattr(value, 'service_model'):
cli_svc_info[key] = {
"cmd_table": value,
"model": value.service_model,
"js_sdk_name": svc_id_to_js_names[value.service_model.service_id],
}
else:
svc_model = boto3_service(key)
cli_svc_info[key] = {
"cmd_table": value,
"model": svc_model,
"js_sdk_name": svc_id_to_js_names[svc_model.service_id],
}
awscli_iam_mappings = {}
for cli_svc_name, v in cli_svc_info.items():
try:
svc_cmd_table = v['cmd_table']._get_command_table()
except AttributeError:
print(f"Could not find command table: {cli_svc_name}")
continue
awscli_iam_mappings[cli_svc_name] = {}
for cli_cmd_name, cli_cmd in svc_cmd_table.items():
cli_cmd: awscli.clidriver.ServiceOperation
try:
op_model: botocore.model.OperationModel = cli_cmd._operation_model
except AttributeError:
if type(cli_cmd) in [
awscli.customizations.cloudformation.package.PackageCommand,
awscli.customizations.cloudfront.SignCommand,
awscli.customizations.cloudtrail.subscribe.CloudTrailSubscribe,
awscli.customizations.cloudtrail.subscribe.CloudTrailUpdate,
awscli.customizations.cloudtrail.validation.CloudTrailValidateLogs,
]:
continue
op_models = []
for name, subcmd in cli_cmd.subcommand_table.items():
try:
op_models.append(subcmd._operation_model)
except AttributeError:
continue
if op_models:
# TODO: probably need to handle all of these
op_model = op_models[0]
else:
print(f"Could not find operation_model: {cli_svc_name}.{cli_cmd_name}: {type(cli_cmd)}")
continue
try:
map = iam_map['sdk_method_iam_mappings'][f"{v['js_sdk_name']}.{op_model.name}"]
awscli_iam_mappings[cli_svc_name][cli_cmd_name] = map
except KeyError:
print(f"NotFound: {v['js_sdk_name']}.{op_model.name}")
awscli_iam_mappings[cli_svc_name][cli_cmd_name] = [{
"note": "NotFound",
"action": f"{v['model'].service_id}:{op_model.name}",
"resource_mappings": {},
}]
continue
Path('../../cmd/aws/cli_iam_map.json').write_text(
json.dumps(awscli_iam_mappings, indent=2),
)
# for name, mapping in iam_map['sdk_method_iam_mappings'].items():
# js_name, action_name = name.split('.')
# svc_id = js_names_to_svc_ids[js_name]
# cmd_name = svc_ids_cmd_names[svc_id]
#
# xform_name
#
if __name__ == '__main__':
main()