From ed9e709f4969c84b4c3b7a3b0ad348ba697a1cae Mon Sep 17 00:00:00 2001 From: nick2wang Date: Tue, 1 Mar 2022 14:01:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81ClickHouse=E4=B8=8A=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 支持ClickHouse上线 --- README.md | 2 +- sql/engines/clickhouse.py | 238 ++++++++++++++++++++++++++++++++++- sql/engines/tests.py | 95 ++++++++++++++ sql/templates/sqlsubmit.html | 4 + 4 files changed, 336 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2bf986717f..4cfb89eb98 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Archery是[archer](https://github.com/jly8866/archer)的分支项目,定位于 | MongoDB | √ | √ | √ | × | × | × | × | × | × | × | | Phoenix | √ | × | √ | × | × | × | × | × | × | × | | ODPS | √ | × | × | × | × | × | × | × | × | × | -| ClickHouse | √ | × | × | × | × | × | × | × | × | × | +| ClickHouse | √ | √ | √ | × | × | × | × | × | × | × | diff --git a/sql/engines/clickhouse.py b/sql/engines/clickhouse.py index e2d1caf25c..b7d4d0eae8 100644 --- a/sql/engines/clickhouse.py +++ b/sql/engines/clickhouse.py @@ -1,7 +1,10 @@ # -*- coding: UTF-8 -*- from clickhouse_driver import connect +from sql.utils.sql_utils import get_syntax_type +from .models import ResultSet, ReviewResult, ReviewSet +from common.utils.timer import FuncTimer +from common.config import SysConfig from . import EngineBase -from .models import ResultSet import sqlparse import logging import re @@ -13,6 +16,7 @@ class ClickHouseEngine(EngineBase): def __init__(self, instance=None): super(ClickHouseEngine, self).__init__(instance=instance) + self.config = SysConfig() def get_connection(self, db_name=None): if self.conn: @@ -45,12 +49,25 @@ def server_version(self): version = result.rows[0][0].split(' ')[1] return tuple([int(n) for n in version.split('.')[:3]]) + def get_table_engine(self, tb_name): + """获取某个table的engine type""" + sql = f"""select engine + from system.tables + where database='{tb_name.split('.')[0]}' + and name='{tb_name.split('.')[1]}'""" + query_result = self.query(sql=sql) + if query_result.rows: + result = {'status': 1, 'engine': query_result.rows[0][0]} + else: + result = {'status': 0, 'engine': 'None'} + return result + def get_all_databases(self): """获取数据库列表, 返回一个ResultSet""" sql = "show databases" result = self.query(sql=sql) db_list = [row[0] for row in result.rows - if row[0] not in ('system','INFORMATION_SCHEMA', 'information_schema', 'datasets')] + if row[0] not in ('system', 'INFORMATION_SCHEMA', 'information_schema', 'datasets')] result.rows = db_list return result @@ -170,6 +187,223 @@ def filter_sql(self, sql='', limit_num=0): sql = f'{sql};' return sql + def explain_check(self, check_result, db_name=None, line=0, statement=''): + """使用explain ast检查sql语法, 返回Review set""" + result = ReviewResult(id=line, errlevel=0, + stagestatus='Audit completed', + errormessage='None', + sql=statement, + affected_rows=0, + execute_time=0, ) + # clickhouse版本>=20.6.3才使用explain检查 + if self.server_version >= (20, 6, 3): + explain_result = self.query(db_name=db_name, sql=f"explain ast {statement}") + if explain_result.error: + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回未通过检查SQL', + errormessage=f'explain语法检查错误:{explain_result.error}', + sql=statement) + return result + + def execute_check(self, db_name=None, sql=''): + """上线单执行前的检查, 返回Review set""" + sql = sqlparse.format(sql, strip_comments=True) + sql_list = sqlparse.split(sql) + + # 禁用/高危语句检查 + check_result = ReviewSet(full_sql=sql) + line = 1 + critical_ddl_regex = self.config.get('critical_ddl_regex', '') + p = re.compile(critical_ddl_regex) + check_result.syntax_type = 2 # TODO 工单类型 0、其他 1、DDL,2、DML + + for statement in sql_list: + statement = statement.rstrip(';') + # 禁用语句 + if re.match(r"^select|^show", statement.lower()): + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回不支持语句', + errormessage='仅支持DML和DDL语句,查询语句请使用SQL查询功能!', + sql=statement) + # 高危语句 + elif critical_ddl_regex and p.match(statement.strip().lower()): + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回高危SQL', + errormessage='禁止提交匹配' + critical_ddl_regex + '条件的语句!', + sql=statement) + # alter语句 + elif re.match(r"^alter", statement.lower()): + # alter table语句 + if re.match(r"^alter\s+table\s+(.+?)\s+", statement.lower()): + table_name = re.match(r"^alter\s+table\s+(.+?)\s+", statement.lower(), re.M).group(1) + if '.' not in table_name: + table_name = f"{db_name}.{table_name}" + table_engine = self.get_table_engine(table_name)['engine'] + table_exist = self.get_table_engine(table_name)['status'] + if table_exist == 1: + if not table_engine.endswith('MergeTree') and table_engine not in ('Merge', 'Distributed'): + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回不支持SQL', + errormessage='ALTER TABLE仅支持*MergeTree,Merge以及Distributed等引擎表!', + sql=statement) + else: + # delete与update语句,实际是alter语句的变种 + if re.match(r"^alter\s+table\s+(.+?)\s+(delete|update)\s+", statement.lower()): + if not table_engine.endswith('MergeTree'): + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回不支持SQL', + errormessage='DELETE与UPDATE仅支持*MergeTree引擎表!', + sql=statement) + else: + result = self.explain_check(check_result, db_name, line, statement) + else: + result = self.explain_check(check_result, db_name, line, statement) + else: + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='表不存在', + errormessage=f'表 {table_name} 不存在!', + sql=statement) + # 其他alter语句 + else: + result = self.explain_check(check_result, db_name, line, statement) + # truncate语句 + elif re.match(r"^truncate\s+table\s+(.+?)(\s|$)", statement.lower()): + table_name = re.match(r"^truncate\s+table\s+(.+?)(\s|$)", statement.lower(), re.M).group(1) + if '.' not in table_name: + table_name = f"{db_name}.{table_name}" + table_engine = self.get_table_engine(table_name)['engine'] + table_exist = self.get_table_engine(table_name)['status'] + if table_exist == 1: + if table_engine in ('View', 'File,', 'URL', 'Buffer', 'Null'): + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回不支持SQL', + errormessage='TRUNCATE不支持View,File,URL,Buffer和Null表引擎!', + sql=statement) + else: + result = self.explain_check(check_result, db_name, line, statement) + else: + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='表不存在', + errormessage=f'表 {table_name} 不存在!', + sql=statement) + # insert语句,explain无法正确判断,暂时只做表存在性检查与简单关键字匹配 + elif re.match(r"^insert", statement.lower()): + if re.match(r"^insert\s+into\s+(.+?)(\s+|\s*\(.+?)(values|format|select)(\s+|\()", statement.lower()): + table_name = re.match(r"^insert\s+into\s+(.+?)(\s+|\s*\(.+?)(values|format|select)(\s+|\()", + statement.lower(), re.M).group(1) + if '.' not in table_name: + table_name = f"{db_name}.{table_name}" + table_exist = self.get_table_engine(table_name)['status'] + if table_exist == 1: + result = ReviewResult(id=line, errlevel=0, + stagestatus='Audit completed', + errormessage='None', + sql=statement, + affected_rows=0, + execute_time=0, ) + else: + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='表不存在', + errormessage=f'表 {table_name} 不存在!', + sql=statement) + else: + check_result.is_critical = True + result = ReviewResult(id=line, errlevel=2, + stagestatus='驳回不支持SQL', + errormessage='INSERT语法不正确!', + sql=statement) + # 其他语句使用explain ast简单检查 + else: + result = self.explain_check(check_result, db_name, line, statement) + + # 没有找出DDL语句的才继续执行此判断 + if check_result.syntax_type == 2: + if get_syntax_type(statement, parser=False, db_type='mysql') == 'DDL': + check_result.syntax_type = 1 + check_result.rows += [result] + + # 遇到禁用和高危语句直接返回 + if check_result.is_critical: + check_result.error_count += 1 + return check_result + line += 1 + return check_result + + def execute_workflow(self, workflow): + """执行上线单,返回Review set""" + sql = workflow.sqlworkflowcontent.sql_content + execute_result = ReviewSet(full_sql=sql) + sqls = sqlparse.format(sql, strip_comments=True) + sql_list = sqlparse.split(sqls) + + line = 1 + for statement in sql_list: + with FuncTimer() as t: + result = self.execute(db_name=workflow.db_name, sql=statement, close_conn=True) + if not result.error: + execute_result.rows.append(ReviewResult( + id=line, + errlevel=0, + stagestatus='Execute Successfully', + errormessage='None', + sql=statement, + affected_rows=0, + execute_time=t.cost, + )) + line += 1 + else: + # 追加当前报错语句信息到执行结果中 + execute_result.error = result.error + execute_result.rows.append(ReviewResult( + id=line, + errlevel=2, + stagestatus='Execute Failed', + errormessage=f'异常信息:{result.error}', + sql=statement, + affected_rows=0, + execute_time=0, + )) + line += 1 + # 报错语句后面的语句标记为审核通过、未执行,追加到执行结果中 + for statement in sql_list[line - 1:]: + execute_result.rows.append(ReviewResult( + id=line, + errlevel=0, + stagestatus='Audit completed', + errormessage=f'前序语句失败, 未执行', + sql=statement, + affected_rows=0, + execute_time=0, + )) + line += 1 + break + return execute_result + + def execute(self, db_name=None, sql='', close_conn=True): + """原生执行语句""" + result = ResultSet(full_sql=sql) + conn = self.get_connection(db_name=db_name) + try: + cursor = conn.cursor() + for statement in sqlparse.split(sql): + cursor.execute(statement) + cursor.close() + except Exception as e: + logger.warning(f"ClickHouse语句执行报错,语句:{sql},错误信息{e}") + result.error = str(e).split('Stack trace')[0] + if close_conn: + self.close() + return result + def close(self): if self.conn: self.conn.close() diff --git a/sql/engines/tests.py b/sql/engines/tests.py index 159fdefc7e..7678431105 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -1656,10 +1656,26 @@ def setUp(self): port=9000, user='ins_user', password='some_str') self.ins1.save() self.sys_config = SysConfig() + self.wf = SqlWorkflow.objects.create( + workflow_name='some_name', + group_id=1, + group_name='g1', + engineer_display='', + audit_auth_groups='some_group', + create_time=datetime.now() - timedelta(days=1), + status='workflow_finish', + is_backup=False, + instance=self.ins1, + db_name='some_db', + syntax_type=1 + ) + SqlWorkflowContent.objects.create(workflow=self.wf) def tearDown(self): self.ins1.delete() self.sys_config.purge() + SqlWorkflow.objects.all().delete() + SqlWorkflowContent.objects.all().delete() @patch.object(ClickHouseEngine, 'query') def test_server_version(self, mock_query): @@ -1670,6 +1686,16 @@ def test_server_version(self, mock_query): server_version = new_engine.server_version self.assertTupleEqual(server_version, (22, 1, 3)) + @patch.object(ClickHouseEngine, 'query') + def test_table_engine(self, mock_query): + table_name = 'default.tb_test' + result = ResultSet() + result.rows = [('MergeTree',)] + mock_query.return_value = result + new_engine = ClickHouseEngine(instance=self.ins1) + table_engine = new_engine.get_table_engine(table_name) + self.assertDictEqual(table_engine, {'status': 1, 'engine': 'MergeTree'}) + @patch('clickhouse_driver.connect') def test_engine_base_info(self, _conn): new_engine = ClickHouseEngine(instance=self.ins1) @@ -1740,6 +1766,36 @@ def test_query_check_update_sql(self): {'msg': '不支持的查询语法类型!', 'bad_query': True, 'filtered_sql': 'update user set id=0', 'has_star': False}) + @patch.object(ClickHouseEngine, 'query') + def test_explain_check(self, mock_query): + result = ResultSet() + result.rows = [('ClickHouse 20.1.3.7',)] + mock_query.return_value = result + new_engine = ClickHouseEngine(instance=self.ins1) + server_version = new_engine.server_version + sql = "insert into tb_test(note) values ('xbb');" + check_result = ReviewSet(full_sql=sql) + explain_result = new_engine.explain_check(check_result, db_name='some_db', line=1, statement=sql) + self.assertEqual(explain_result.stagestatus, "Audit completed") + + def test_execute_check_select_sql(self): + new_engine = ClickHouseEngine(instance=self.ins1) + select_sql = 'select id,name from tb_test' + check_result = new_engine.execute_check(db_name='some_db', sql=select_sql) + self.assertEqual(check_result.rows[0].errormessage, "仅支持DML和DDL语句,查询语句请使用SQL查询功能!") + + @patch.object(ClickHouseEngine, 'query') + def test_execute_check_alter_sql(self, mock_query): + table_name = 'default.tb_test' + result = ResultSet() + result.rows = [('Log',)] + mock_query.return_value = result + new_engine = ClickHouseEngine(instance=self.ins1) + table_engine = new_engine.get_table_engine(table_name) + alter_sql = "alter table default.tb_test add column remark String" + check_result = new_engine.execute_check(db_name='some_db', sql=alter_sql) + self.assertEqual(check_result.rows[0].errormessage, "ALTER TABLE仅支持*MergeTree,Merge以及Distributed等引擎表!") + def test_filter_sql_with_delimiter(self): new_engine = ClickHouseEngine(instance=self.ins1) sql_without_limit = 'select user from usertable;' @@ -1787,3 +1843,42 @@ def test_filter_sql_not_select(self): sql_without_limit = 'show create table usertable;' check_result = new_engine.filter_sql(sql=sql_without_limit, limit_num=1) self.assertEqual(check_result, 'show create table usertable;') + + @patch('clickhouse_driver.connect.cursor.execute') + @patch('clickhouse_driver.connect.cursor') + @patch('clickhouse_driver.connect') + def test_execute(self, _connect, _cursor, _execute): + new_engine = ClickHouseEngine(instance=self.ins1) + execute_result = new_engine.execute(self.wf) + self.assertIsInstance(execute_result, ResultSet) + + @patch('clickhouse_driver.connect.cursor.execute') + @patch('clickhouse_driver.connect.cursor') + @patch('clickhouse_driver.connect') + def test_execute_workflow_success(self, _conn, _cursor, _execute): + sql = "insert into tb_test values('test')" + row = ReviewResult(id=1, + errlevel=0, + stagestatus='Execute Successfully', + errormessage='None', + sql=sql, + affected_rows=0, + execute_time=0) + wf = SqlWorkflow.objects.create( + workflow_name='some_name', + group_id=1, + group_name='g1', + engineer_display='', + audit_auth_groups='some_group', + create_time=datetime.now() - timedelta(days=1), + status='workflow_finish', + is_backup=False, + instance=self.ins1, + db_name='some_db', + syntax_type=1 + ) + SqlWorkflowContent.objects.create(workflow=wf, sql_content=sql) + new_engine = ClickHouseEngine(instance=self.ins1) + execute_result = new_engine.execute_workflow(workflow=wf) + self.assertIsInstance(execute_result, ReviewSet) + self.assertEqual(execute_result.rows[0].__dict__.keys(), row.__dict__.keys()) diff --git a/sql/templates/sqlsubmit.html b/sql/templates/sqlsubmit.html index 7d09238a50..7dd2622d8f 100644 --- a/sql/templates/sqlsubmit.html +++ b/sql/templates/sqlsubmit.html @@ -60,6 +60,7 @@ +
@@ -621,6 +622,7 @@ $("#optgroup-pgsql").empty(); $("#optgroup-phoenix").empty(); $("#optgroup-mongo").empty(); + $("#optgroup-clickhouse").empty(); for (var i = 0; i < result.length; i++) { var instance = ""; if (result[i]['db_type'] === 'mysql') { @@ -637,6 +639,8 @@ $("#optgroup-mongo").append(instance); } else if (result[i]['db_type'] === 'phoenix') { $("#optgroup-phoenix").append(instance); + } else if (result[i]['db_type'] === 'clickhouse') { + $("#optgroup-clickhouse").append(instance); } } $('#instance_name').selectpicker('render');