@@ -50,7 +50,7 @@

 import logging

-__version__ = 'v2017.8.2'
+__version__ = 'v2017.8.3'

 _max_content_bytes = 100000
 http_event_collector_SSL_verify = False
@@ -62,106 +62,109 @@


 def returner(ret):
-    opts_list = _get_options()
-
-    # Get cloud details
-    clouds = get_cloud_details()
-
-    for opts in opts_list:
-        logging.info('Options: %s' % json.dumps(opts))
-        http_event_collector_key = opts['token']
-        http_event_collector_host = opts['indexer']
-        http_event_collector_port = opts['port']
-        hec_ssl = opts['http_event_server_ssl']
-        proxy = opts['proxy']
-        timeout = opts['timeout']
-        custom_fields = opts['custom_fields']
-
-        # Set up the fields to be extracted at index time. The field values must be strings.
-        # Note that these fields will also still be available in the event data
-        index_extracted_fields = ['aws_instance_id', 'aws_account_id', 'azure_vmId']
-        try:
-            index_extracted_fields.extend(opts['index_extracted_fields'])
-        except TypeError:
-            pass
-
-        # Set up the collector
-        hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_port=http_event_collector_port, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout)
-
-        # st = 'salt:hubble:nova'
-        data = ret['return']
-        minion_id = ret['id']
-        jid = ret['jid']
-        master = __grains__['master']
-        fqdn = __grains__['fqdn']
-        # Sometimes fqdn is blank. If it is, replace it with minion_id
-        fqdn = fqdn if fqdn else minion_id
-        try:
-            fqdn_ip4 = __grains__['fqdn_ip4'][0]
-        except IndexError:
-            fqdn_ip4 = __grains__['ipv4'][0]
-        if fqdn_ip4.startswith('127.'):
-            for ip4_addr in __grains__['ipv4']:
-                if ip4_addr and not ip4_addr.startswith('127.'):
-                    fqdn_ip4 = ip4_addr
-                    break
-
-        if not data:
-            return
-        else:
-            for query in data:
-                for query_name, query_results in query.iteritems():
-                    for query_result in query_results['data']:
-                        event = {}
-                        payload = {}
-                        event.update(query_result)
-                        event.update({'query': query_name})
-                        event.update({'job_id': jid})
-                        event.update({'master': master})
-                        event.update({'minion_id': minion_id})
-                        event.update({'dest_host': fqdn})
-                        event.update({'dest_ip': fqdn_ip4})
-
-                        for cloud in clouds:
-                            event.update(cloud)
-
-                        for custom_field in custom_fields:
-                            custom_field_name = 'custom_' + custom_field
-                            custom_field_value = __salt__['config.get'](custom_field, '')
-                            if isinstance(custom_field_value, str):
-                                event.update({custom_field_name: custom_field_value})
-                            elif isinstance(custom_field_value, list):
-                                custom_field_value = ','.join(custom_field_value)
-                                event.update({custom_field_name: custom_field_value})
-
-                        payload.update({'host': fqdn})
-                        payload.update({'index': opts['index']})
-                        if opts['add_query_to_sourcetype']:
-                            payload.update({'sourcetype': "%s_%s" % (opts['sourcetype'], query_name)})
-                        else:
-                            payload.update({'sourcetype': opts['sourcetype']})
-                        payload.update({'event': event})
-
-                        # Potentially add metadata fields:
-                        fields = {}
-                        for item in index_extracted_fields:
-                            if item in payload['event'] and not isinstance(payload['event'][item], (list, dict, tuple)):
-                                fields[item] = str(payload['event'][item])
-                        if fields:
-                            payload.update({'fields': fields})
-
-                        # If the osquery query includes a field called 'time' it will be checked.
-                        # If it's within the last year, it will be used as the eventtime.
-                        event_time = query_result.get('time', '')
-                        try:
-                            if (datetime.fromtimestamp(time.time()) - datetime.fromtimestamp(float(event_time))).days > 365:
+    try:
+        opts_list = _get_options()
+
+        # Get cloud details
+        clouds = get_cloud_details()
+
+        for opts in opts_list:
+            logging.info('Options: %s' % json.dumps(opts))
+            http_event_collector_key = opts['token']
+            http_event_collector_host = opts['indexer']
+            http_event_collector_port = opts['port']
+            hec_ssl = opts['http_event_server_ssl']
+            proxy = opts['proxy']
+            timeout = opts['timeout']
+            custom_fields = opts['custom_fields']
+
+            # Set up the fields to be extracted at index time. The field values must be strings.
+            # Note that these fields will also still be available in the event data
+            index_extracted_fields = ['aws_instance_id', 'aws_account_id', 'azure_vmId']
+            try:
+                index_extracted_fields.extend(opts['index_extracted_fields'])
+            except TypeError:
+                pass
+
+            # Set up the collector
+            hec = http_event_collector(http_event_collector_key, http_event_collector_host, http_event_port=http_event_collector_port, http_event_server_ssl=hec_ssl, proxy=proxy, timeout=timeout)
+
+            # st = 'salt:hubble:nova'
+            data = ret['return']
+            minion_id = ret['id']
+            jid = ret['jid']
+            master = __grains__['master']
+            fqdn = __grains__['fqdn']
+            # Sometimes fqdn is blank. If it is, replace it with minion_id
+            fqdn = fqdn if fqdn else minion_id
+            try:
+                fqdn_ip4 = __grains__['fqdn_ip4'][0]
+            except IndexError:
+                fqdn_ip4 = __grains__['ipv4'][0]
+            if fqdn_ip4.startswith('127.'):
+                for ip4_addr in __grains__['ipv4']:
+                    if ip4_addr and not ip4_addr.startswith('127.'):
+                        fqdn_ip4 = ip4_addr
+                        break
+
+            if not data:
+                return
+            else:
+                for query in data:
+                    for query_name, query_results in query.iteritems():
+                        for query_result in query_results['data']:
+                            event = {}
+                            payload = {}
+                            event.update(query_result)
+                            event.update({'query': query_name})
+                            event.update({'job_id': jid})
+                            event.update({'master': master})
+                            event.update({'minion_id': minion_id})
+                            event.update({'dest_host': fqdn})
+                            event.update({'dest_ip': fqdn_ip4})
+
+                            for cloud in clouds:
+                                event.update(cloud)
+
+                            for custom_field in custom_fields:
+                                custom_field_name = 'custom_' + custom_field
+                                custom_field_value = __salt__['config.get'](custom_field, '')
+                                if isinstance(custom_field_value, str):
+                                    event.update({custom_field_name: custom_field_value})
+                                elif isinstance(custom_field_value, list):
+                                    custom_field_value = ','.join(custom_field_value)
+                                    event.update({custom_field_name: custom_field_value})
+
+                            payload.update({'host': fqdn})
+                            payload.update({'index': opts['index']})
+                            if opts['add_query_to_sourcetype']:
+                                payload.update({'sourcetype': "%s_%s" % (opts['sourcetype'], query_name)})
+                            else:
+                                payload.update({'sourcetype': opts['sourcetype']})
+                            payload.update({'event': event})
+
+                            # Potentially add metadata fields:
+                            fields = {}
+                            for item in index_extracted_fields:
+                                if item in payload['event'] and not isinstance(payload['event'][item], (list, dict, tuple)):
+                                    fields[item] = str(payload['event'][item])
+                            if fields:
+                                payload.update({'fields': fields})
+
+                            # If the osquery query includes a field called 'time' it will be checked.
+                            # If it's within the last year, it will be used as the eventtime.
+                            event_time = query_result.get('time', '')
+                            try:
+                                if (datetime.fromtimestamp(time.time()) - datetime.fromtimestamp(float(event_time))).days > 365:
+                                    event_time = ''
+                            except:
                                 event_time = ''
-                        except:
-                            event_time = ''
-                        finally:
-                            hec.batchEvent(payload, eventtime=event_time)
+                            finally:
+                                hec.batchEvent(payload, eventtime=event_time)

-    hec.flushBatch()
+        hec.flushBatch()
+    except:
+        log.exception('Error ocurred in splunk_nebula_return')
     return

@@ -202,6 +205,7 @@ def _get_options():
     splunk_opts['proxy'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:proxy', {})
     splunk_opts['timeout'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:timeout', 9.05)
     splunk_opts['index_extracted_fields'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:index_extracted_fields', [])
+    splunk_opts['port'] = __salt__['config.get']('hubblestack:nebula:returner:splunk:port', '8088')

     return [splunk_opts]

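For anyone wiring this up, the new port option is read from the hubblestack:nebula:returner:splunk:port path shown above, with '8088' as the default. A minimal sketch of the corresponding minion config or pillar, assuming Salt's usual colon-to-nested-key layout; only the port path and its default come from this diff, while the sibling keys (indexer, token) and all example values are assumptions:

hubblestack:
  nebula:
    returner:
      splunk:
        indexer: splunk-hec.example.com                  # assumed key; feeds opts['indexer']
        token: 01234567-89ab-cdef-0123-456789abcdef      # assumed key; feeds opts['token']
        port: '8088'                                     # new in this change; falls back to '8088' when unset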
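As a rough illustration of what the returner loop above assembles, each osquery result row becomes one HTTP Event Collector payload handed to hec.batchEvent(). A hypothetical example follows; the keys mirror the code in the diff, but every value here is made up:

# Hypothetical example of one payload built per query_result; values are illustrative only.
payload = {
    'host': 'web01.example.com',                   # fqdn, or minion_id when fqdn is blank
    'index': 'hubble',                             # opts['index']
    'sourcetype': 'hubble_osquery_running_procs',  # opts['sourcetype'], plus the query name if add_query_to_sourcetype is set
    'event': {
        'pid': '1234',                             # columns from the osquery result itself
        'query': 'running_procs',
        'job_id': '20170803123456789012',
        'master': 'salt-master.example.com',
        'minion_id': 'web01',
        'dest_host': 'web01.example.com',
        'dest_ip': '10.0.1.5',
    },
    'fields': {'aws_instance_id': 'i-0123456789abcdef0'},  # index-time fields, only when present and scalar
}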