-
Notifications
You must be signed in to change notification settings - Fork 208
/
Copy pathvertica_metadata_extractor.py
140 lines (114 loc) · 4.98 KB
/
vertica_metadata_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from collections import namedtuple
from itertools import groupby
from typing import (
Any, Dict, Iterator, Union,
)
from pyhocon import ConfigFactory, ConfigTree
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata
# Schema-qualified table identity, built per result row by VerticaMetadataExtractor._get_table_key.
TableKey = namedtuple('TableKey', ('schema', 'table_name'))
# Logger named after this module.
LOGGER = logging.getLogger(name=__name__)
class VerticaMetadataExtractor(Extractor):
    """
    Extracts vertica table and column metadata from underlying meta store database using SQLAlchemyExtractor.

    - V_CATALOG does not have table and column description columns, so every table and
      column description is emitted as None.
    - The CLUSTER_KEY config value is used as the cluster name, unless
      USE_CATALOG_AS_CLUSTER_NAME is true, in which case each row's c.table_catalog is used.
    - Tables and views are not distinguished for now: every table is emitted with is_view=False.
    """
    # SELECT statement against vertica V_CATALOG to extract table and column metadata.
    # The ORDER BY matters: _get_extract_iter uses itertools.groupby, which only merges
    # consecutive rows, so all columns of one table must arrive next to each other.
    SQL_STATEMENT = """
SELECT
lower(c.column_name) AS col_name,
lower(c.data_type) AS col_type,
c.ordinal_position AS col_sort_order,
{cluster_source} AS cluster,
lower(c.table_schema) AS "schema",
lower(c.table_name) AS name,
False AS is_view
FROM
V_CATALOG.COLUMNS AS c
LEFT JOIN
V_CATALOG.TABLES t
ON c.TABLE_NAME = t.TABLE_NAME
AND c.TABLE_SCHEMA = t.TABLE_SCHEMA
{where_clause_suffix}
ORDER by cluster, "schema", name, col_sort_order ;
"""

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    DATABASE_KEY = 'database_key'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: False}
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Build the metadata SQL from config and initialise the wrapped SQLAlchemyExtractor.

        :param conf: extractor config; keys missing from it fall back to DEFAULT_CONFIG.
            An EXTRACT_SQL value in the scoped SQLAlchemyExtractor config takes precedence
            over the statement generated here.
        """
        conf = conf.with_fallback(VerticaMetadataExtractor.DEFAULT_CONFIG)
        self._cluster = '{}'.format(conf.get_string(VerticaMetadataExtractor.CLUSTER_KEY))

        if conf.get_bool(VerticaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
            cluster_source = "c.table_catalog"
        else:
            # The configured name is embedded in the SQL as a string literal; double any single
            # quotes (standard SQL escaping) so a name containing "'" cannot break the statement.
            cluster_source = "'{}'".format(self._cluster.replace("'", "''"))

        # DATABASE_KEY is not part of DEFAULT_CONFIG, so its default is supplied here.
        self._database = conf.get_string(VerticaMetadataExtractor.DATABASE_KEY, default='vertica')

        self.sql_stmt = VerticaMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(VerticaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
            cluster_source=cluster_source,
        )

        self._alchemy_extractor = SQLAlchemyExtractor()
        # The scoped SQLAlchemyExtractor config wins; the generated SQL is only a fallback.
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        # Record the statement that will actually run (possibly overridden via scoped config).
        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

        LOGGER.info('SQL for vertica metadata: %s', self.sql_stmt)

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter: Union[None, Iterator[TableMetadata]] = None

    def extract(self) -> Union[TableMetadata, None]:
        """
        Return the next TableMetadata, or None once every table has been emitted.
        """
        # The iterator is created lazily on the first call so the query runs only when needed.
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        return next(self._extract_iter, None)

    def get_scope(self) -> str:
        """
        Return the config scope under which this extractor's settings live.
        """
        return 'extractor.vertica_metadata'

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Group the raw column rows per table and yield one TableMetadata per table.

        itertools.groupby only merges *consecutive* rows, so this relies on the SQL's
        ORDER BY (cluster, schema, name, col_sort_order). The grouping key includes the
        cluster as well as the schema/table key, matching that ordering, so same-named
        tables from different clusters are never merged into one TableMetadata.

        :return: iterator of TableMetadata, one per (cluster, schema, table)
        """
        def group_key(row: Dict[str, Any]) -> Any:
            return row['cluster'], self._get_table_key(row)

        for _, group in groupby(self._get_raw_extract_iter(), group_key):
            rows = list(group)  # groupby never yields an empty group
            columns = [
                # V_CATALOG has no column descriptions, hence None.
                ColumnMetadata(row['col_name'], None, row['col_type'], row['col_sort_order'])
                for row in rows
            ]
            last_row = rows[-1]
            yield TableMetadata(self._database, last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                None,  # no table description available in V_CATALOG
                                columns,
                                is_view=last_row['is_view'])

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Yield result rows from the SQLAlchemyExtractor until it signals exhaustion with None.

        :return: iterator over the raw query result rows
        """
        row = self._alchemy_extractor.extract()
        while row is not None:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        """
        Table key consists of schema and table name.

        :param row: one raw result row containing 'schema' and 'name'
        :return: TableKey for the row, or None if the row is falsy
        """
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])
        return None