Skip to content

Commit

Permalink
suricatals: add a flag to bypass engine analysis check
Browse files Browse the repository at this point in the history
Ref: #5
  • Loading branch information
sonicold committed Oct 16, 2023
1 parent 4582de7 commit 10b69c2
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 105 deletions.
8 changes: 7 additions & 1 deletion suricatals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def main():
'--batch-file', default=None,
help="Batch mode to parse only the file in argument"
)
parser.add_argument(
'--no-engine-analysis', action='store_true', default=False,
help='Disable suricata engine analysis (used with --batch-file only)'
)
args = parser.parse_args()
if args.version:
print("{0}".format(__version__))
Expand All @@ -77,6 +81,8 @@ def main():
"max_tracked_files": args.max_tracked_files,
}
#
if not args.batch_file and args.no_engine_analysis:
print('--no-engine-analysis must be used with --batch-file')

if args.batch_file is None:
stdin, stdout = _binary_stdio()
Expand All @@ -85,7 +91,7 @@ def main():
s.run()
else:
s = LangServer(conn=None, settings=settings)
_, diags = s.analyse_file(args.batch_file)
_, diags = s.analyse_file(args.batch_file, not args.no_engine_analysis)
for diag in diags:
print(json.dumps(diag.to_message()))

Expand Down
4 changes: 2 additions & 2 deletions suricatals/langserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,10 @@ def serve_default(self, request):
code=-32601,
message="method {} not found".format(request["method"]))

def analyse_file(self, filepath, **kwargs):
def analyse_file(self, filepath, engine_analysis=True, **kwargs):
file_obj = SuricataFile(filepath, self.rules_tester)
file_obj.load_from_disk()
return file_obj.check_file(**kwargs)
return file_obj.check_file(engine_analysis=engine_analysis, **kwargs)


class JSONRPC2Error(Exception):
Expand Down
151 changes: 76 additions & 75 deletions suricatals/parse_signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ def load_from_disk(self):
def sort_diagnosis(self, key):
return -key.severity

def check_file(self, workspace=None, **kwargs):
def check_file(self, workspace=None, engine_analysis=True, **kwargs):
diagnostics = []
result = {}
if not workspace:
workspace = {}
with open(self.path, 'r', encoding='utf-8', errors='replace') as fhandle:
result = self.rules_tester.check_rule_buffer(fhandle.read(), **kwargs)
result = self.rules_tester.check_rule_buffer(fhandle.read(), engine_analysis, **kwargs)
self.mpm = result.get('mpm', {}).get('buffer')
for sid in result.get('mpm', {}).get('sids', []):
signature = self.sigset.get_sig_by_sid(sid)
Expand Down Expand Up @@ -288,84 +288,85 @@ def check_file(self, workspace=None, **kwargs):
l_diag.sid = warning.get('sid', 'UNKNOWN')

diagnostics.append(l_diag)
for info in result.get('info', []):
line = None
signature = None
l_diag = Diagnosis()
l_diag.message = info['message']
l_diag.source = info['source']
l_diag.errno = info['error_code']
l_diag.severity = Diagnosis.INFO_LEVEL
if 'line' in info:
line = info['line']
elif 'content' in info:
signature = self.sigset.get_sig_by_content(info['content'])
if signature:
line = signature.line
if line is None:
continue
sig_range = FileRange(line, 0, line, 1)
if signature is not None:
if "Fast Pattern \"" in info['message']:
if signature.mpm is not None:
sig_range = signature.get_diag_range(mode='pattern', pattern=signature.mpm['pattern'])

if engine_analysis:
for info in result.get('info', []):
line = None
signature = None
l_diag = Diagnosis()
l_diag.message = info['message']
l_diag.source = info['source']
l_diag.severity = Diagnosis.INFO_LEVEL
if 'line' in info:
line = info['line']
elif 'content' in info:
signature = self.sigset.get_sig_by_content(info['content'])
if signature:
line = signature.line
if line is None:
continue
sig_range = FileRange(line, 0, line, 1)
if signature is not None:
if "Fast Pattern \"" in info['message']:
if signature.mpm is not None:
sig_range = signature.get_diag_range(mode='pattern', pattern=signature.mpm['pattern'])
else:
sig_range = signature.get_diag_range(mode='sid')
else:
sig_range = signature.get_diag_range(mode='sid')
else:
sig_range = signature.get_diag_range(mode='sid')

l_diag.content = signature.content
l_diag.sid = signature.sid
else:
l_diag.content = info.get('content', '')
l_diag.sid = info.get('sid', 'UNKNOWN')
l_diag.content = signature.content
l_diag.sid = signature.sid
else:
l_diag.content = info.get('content', '')
l_diag.sid = info.get('sid', 'UNKNOWN')

l_diag.range = sig_range
diagnostics.append(l_diag)
for sig in self.sigset.signatures:
if sig.mpm is None:
if sig.sid and sig.has_error is False:
message = "No Fast Pattern used, if possible add one content match to improve performance."
l_diag = Diagnosis()
l_diag.message = message
l_diag.source = "Suricata MPM Analysis"
l_diag.severity = Diagnosis.INFO_LEVEL
l_diag.range = sig.get_diag_range(mode="sid")
l_diag.sid = sig.sid
l_diag.content = sig.content
diagnostics.append(l_diag)
continue
# mpm is content:"$pattern"
pattern = self.mpm.get(sig.mpm['buffer'], {}).get(sig.mpm['pattern'])
if pattern is None:
continue
pattern_count = pattern['count']
for sig_file in workspace:
if sig_file != self.path:
file_obj = workspace.get(sig_file)
if file_obj is None or file_obj.mpm is None:
continue
f_pattern = file_obj.mpm.get(sig.mpm['buffer'], {}).get(sig.mpm['pattern'])
if f_pattern is None:
continue
pattern_count += f_pattern['count']
l_diag = Diagnosis()
if pattern_count > 1:
l_diag.message = "Fast Pattern '%s' on '%s' buffer is used in %d different signatures, " \
"consider using a unique fast pattern to improve performance." \
% (sig.mpm['pattern'], sig.mpm['buffer'], pattern_count)
l_diag.source = "SLS MPM Analysis"
else:
if sig.get_content_keyword_count() == 1:
l_diag.range = sig_range
diagnostics.append(l_diag)
for sig in self.sigset.signatures:
if sig.mpm is None:
if sig.sid and sig.has_error is False:
message = "No Fast Pattern used, if possible add one content match to improve performance."
l_diag = Diagnosis()
l_diag.message = message
l_diag.source = "Suricata MPM Analysis"
l_diag.severity = Diagnosis.INFO_LEVEL
l_diag.range = sig.get_diag_range(mode="sid")
l_diag.sid = sig.sid
l_diag.content = sig.content
diagnostics.append(l_diag)
continue
l_diag.message = "Fast Pattern '%s' on '%s' buffer" % (sig.mpm['pattern'], sig.mpm['buffer'])
l_diag.source = "Suricata MPM Analysis"
sig_range = sig.get_diag_range(mode="pattern", pattern=sig.mpm['pattern'])
l_diag.severity = Diagnosis.INFO_LEVEL
l_diag.range = sig.get_diag_range(mode="pattern", pattern=sig.mpm['pattern'])
l_diag.sid = sig.sid
l_diag.content = sig.content
diagnostics.append(l_diag)
# mpm is content:"$pattern"
pattern = self.mpm.get(sig.mpm['buffer'], {}).get(sig.mpm['pattern'])
if pattern is None:
continue
pattern_count = pattern['count']
for sig_file in workspace:
if sig_file != self.path:
file_obj = workspace.get(sig_file)
if file_obj is None or file_obj.mpm is None:
continue
f_pattern = file_obj.mpm.get(sig.mpm['buffer'], {}).get(sig.mpm['pattern'])
if f_pattern is None:
continue
pattern_count += f_pattern['count']
l_diag = Diagnosis()
if pattern_count > 1:
l_diag.message = "Fast Pattern '%s' on '%s' buffer is used in %d different signatures, " \
"consider using a unique fast pattern to improve performance." \
% (sig.mpm['pattern'], sig.mpm['buffer'], pattern_count)
l_diag.source = "SLS MPM Analysis"
else:
if sig.get_content_keyword_count() == 1:
continue
l_diag.message = "Fast Pattern '%s' on '%s' buffer" % (sig.mpm['pattern'], sig.mpm['buffer'])
l_diag.source = "Suricata MPM Analysis"
sig_range = sig.get_diag_range(mode="pattern", pattern=sig.mpm['pattern'])
l_diag.severity = Diagnosis.INFO_LEVEL
l_diag.range = sig.get_diag_range(mode="pattern", pattern=sig.mpm['pattern'])
l_diag.sid = sig.sid
l_diag.content = sig.content
diagnostics.append(l_diag)
for sig in self.sigset.signatures:
sls_diag = sig.sls_syntax_check()
if len(sls_diag):
Expand Down
57 changes: 30 additions & 27 deletions suricatals/tests_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def generate_config(self, tmpdir, config_buffer=None, related_files=None,

return config_file

def rule_buffer(self, rule_buffer, config_buffer=None, related_files=None,
def rule_buffer(self, rule_buffer, engine_analysis=True, config_buffer=None, related_files=None,
reference_config=None, classification_config=None, extra_buffers=None):
# create temp directory
tmpdir = tempfile.mkdtemp()
Expand Down Expand Up @@ -449,38 +449,41 @@ def rule_buffer(self, rule_buffer, config_buffer=None, related_files=None,
if suriprocess.returncode != 0:
result['status'] = False
result['errors'] = errdata.decode('utf-8')
# runs rules analysis to have warnings
suri_cmd = [self.suricata_binary, '--engine-analysis', '-l', tmpdir, '-S', rule_file, '-c', config_file]
# start suricata in test mode
suriprocess = subprocess.Popen(suri_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
suriprocess.communicate()
engine_analysis = self.parse_engine_analysis(tmpdir)
for signature in engine_analysis:
for warning in signature.get('warnings', []):
result['warnings'].append({
'message': warning,
'source': self.SURICATA_ENGINE_ANALYSIS,
'sid': signature.get('sid', 'UNKNOWN'),
'content': signature['content']
})

for info in signature.get('info', []):
result['info'].append({
'message': info,
'source': self.SURICATA_ENGINE_ANALYSIS,
'content': signature['content'],
'sid': signature.get('sid', 'UNKNOWN')
})

mpm_analysis = self.mpm_parse_rules_json(tmpdir)
result['mpm'] = mpm_analysis

if engine_analysis:
# runs rules analysis to have warnings
suri_cmd = [self.suricata_binary, '--engine-analysis', '-l', tmpdir, '-S', rule_file, '-c', config_file]
# start suricata in test mode
suriprocess = subprocess.Popen(suri_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
suriprocess.communicate()
engine_analysis = self.parse_engine_analysis(tmpdir)
for signature in engine_analysis:
for warning in signature.get('warnings', []):
result['warnings'].append({
'message': warning,
'source': self.SURICATA_ENGINE_ANALYSIS,
'sid': signature.get('sid', 'UNKNOWN'),
'content': signature['content']
})

for info in signature.get('info', []):
result['info'].append({
'message': info,
'source': self.SURICATA_ENGINE_ANALYSIS,
'content': signature['content'],
'sid': signature.get('sid', 'UNKNOWN')
})

mpm_analysis = self.mpm_parse_rules_json(tmpdir)
result['mpm'] = mpm_analysis
shutil.rmtree(tmpdir)
return result

def check_rule_buffer(self, rule_buffer, config_buffer=None, related_files=None, extra_buffers=None):
def check_rule_buffer(self, rule_buffer, engine_analysis=True, config_buffer=None, related_files=None, extra_buffers=None):
related_files = related_files or {}
prov_result = self.rule_buffer(
rule_buffer,
engine_analysis=engine_analysis,
config_buffer=config_buffer,
related_files=related_files,
extra_buffers=extra_buffers
Expand Down

0 comments on commit 10b69c2

Please sign in to comment.