Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit 8b283c1

Browse files
authored
Merge pull request #36 from azhard/bigquery-support
Added BigQuery support
2 parents ea94198 + ba48647 commit 8b283c1

15 files changed

+244
-23
lines changed

.circleci/config.yml

+24
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ jobs:
1010
steps:
1111
- checkout
1212

13+
- run:
14+
run: setup_creds
15+
command: |
16+
echo $BIGQUERY_SERVICE_ACCOUNT_JSON > ${HOME}/bigquery-service-key.json
17+
1318
- restore_cache:
1419
key: deps1-{{ .Branch }}
1520

@@ -78,6 +83,25 @@ jobs:
7883
dbt --warn-error run --target snowflake --full-refresh
7984
dbt --warn-error run --target snowflake
8085
86+
- run:
87+
name: "Run Tests - BigQuery"
88+
environment:
89+
BIGQUERY_SERVICE_KEY_PATH: "/home/circleci/bigquery-service-key.json"
90+
91+
command: |
92+
. venv/bin/activate
93+
echo `pwd`
94+
cd integration_tests
95+
dbt --warn-error deps --target bigquery
96+
dbt --warn-error run-operation drop_audit_schema --target bigquery
97+
dbt --warn-error run --target bigquery --full-refresh
98+
dbt --warn-error run --target bigquery
99+
100+
dbt --warn-error run-operation drop_audit_schema --target bigquery
101+
dbt --warn-error run-operation create_legacy_audit_table --target bigquery
102+
dbt --warn-error run --target bigquery --full-refresh
103+
dbt --warn-error run --target bigquery
104+
81105
- save_cache:
82106
key: deps1-{{ .Branch }}
83107
paths:

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
target/
33
dbt_modules/
44
logs/
5+
6+
# pycharm
7+
.idea/

README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
> this package, especially as a post-hook. Please consider if this package is
66
> appropriate for your use case before using it.
77
8-
Requires dbt >= 0.17.0
8+
Requires dbt >= 0.18.0
99

1010
This package provides out-of-the-box functionality to log events for all dbt
1111
invocations, including run start, run end, model start, and model end. It
@@ -49,9 +49,11 @@ For example to always log into a specific schema, say `analytics_meta`, regardle
4949

5050
### Adapter support
5151

52-
This package is currently compatible with dbt's Snowflake, Redshift, and
52+
This package is currently compatible with dbt's BigQuery<sup>1</sup>, Snowflake, Redshift, and
5353
Postgres integrations.
5454

55+
<sup>1</sup> BigQuery support may only work when `threads: 1` is set in your `profiles.yml` file; a higher thread count may result in "quota exceeded" errors.
56+
5557
### Migration guide
5658

5759
#### v0.1.17 -> v0.2.0

dbt_project.yml

+5-3
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@ name: 'logging'
22
version: '0.3.0'
33
config-version: 2
44

5-
require-dbt-version: [">=0.17.0"]
5+
require-dbt-version: ">=0.18.0"
66

77
source-paths: ["models"]
88
analysis-paths: ["analysis"]
99
test-paths: ["tests"]
1010
data-paths: ["data"]
1111
macro-paths: ["macros"]
1212

13-
require-dbt-version: ">=0.16.0"
14-
1513
target-path: "target"
1614
clean-targets:
1715
- "target"
@@ -31,3 +29,7 @@ on-run-end:
3129
models:
3230
logging:
3331
+schema: meta
32+
bigquery:
33+
+enabled: '{{ target.type == "bigquery" | as_bool }}'
34+
default:
35+
+enabled: '{{ target.type != "bigquery" | as_bool }}'

integration_tests/Makefile

+13-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,19 @@ test-snowflake:
3131
dbt run --target snowflake --full-refresh
3232
dbt run --target snowflake
3333

34-
test-all: test-postgres test-redshift test-snowflake
34+
35+
test-bigquery:
36+
dbt run-operation drop_audit_schema --target bigquery
37+
dbt run --target bigquery --full-refresh
38+
dbt run --target bigquery
39+
40+
41+
dbt run-operation drop_audit_schema --target bigquery
42+
dbt run-operation create_legacy_audit_table --target bigquery
43+
dbt run --target bigquery --full-refresh
44+
dbt run --target bigquery
45+
46+
test-all: test-postgres test-redshift test-snowflake test-bigquery
3547
echo "Completed successfully"
3648

3749
test-cloud: test-redshift test-snowflake

integration_tests/ci/sample.profiles.yml

+8
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,11 @@ integration_tests:
3939
warehouse: "{{ env_var('SNOWFLAKE_TEST_WAREHOUSE') }}"
4040
schema: event_logging_integration_tests_snowflake
4141
threads: 1
42+
43+
bigquery:
44+
type: bigquery
45+
method: service-account
46+
keyfile: "{{ env_var('BIGQUERY_SERVICE_KEY_PATH') }}"
47+
project: "{{ env_var('BIGQUERY_TEST_DATABASE') }}"
48+
schema: event_logging_integration_tests_bigquery
49+
threads: 1

integration_tests/macros/create_old_audit_table.sql

+6-6
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66

77
create table if not exists {{ logging.get_audit_relation() }}
88
(
9-
event_name varchar(512),
10-
event_timestamp {{dbt_utils.type_timestamp()}},
11-
event_schema varchar(512),
12-
event_model varchar(512),
13-
invocation_id varchar(512)
9+
event_name {{ dbt_utils.type_string() }},
10+
event_timestamp {{ dbt_utils.type_timestamp() }},
11+
event_schema {{ dbt_utils.type_string() }},
12+
event_model {{ dbt_utils.type_string() }},
13+
invocation_id {{ dbt_utils.type_string() }}
1414
)
15-
{{ dbt_utils.log_info("Created legacy audit table") }}
15+
{% do dbt_utils.log_info("Created legacy audit table") %}
1616
{% endmacro %}

integration_tests/macros/drop_audit_schema.sql

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
{% if adapter.check_schema_exists(target.database, audit_schema) %}
55
{% set audit_schema_relation = api.Relation.create(database=target.database, schema=audit_schema).without_identifier() %}
66
{% do drop_schema(audit_schema_relation) %}
7-
{% do run_query("commit;") %}
7+
{% if adapter.type() != 'bigquery' %}
8+
{% do run_query("commit;") %}
9+
{% endif %}
810
{{ dbt_utils.log_info("Audit schema dropped")}}
911

1012
{% else %}

macros/audit.sql

+33-9
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,14 @@
2525

2626
{% endmacro %}
2727

28-
{% macro log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}
28+
29+
{% macro log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) -%}
30+
31+
{{ return(adapter.dispatch('log_audit_event', packages=['logging'])(event_name, schema, relation, user, target_name, is_full_refresh)) }}
32+
33+
{% endmacro %}
34+
35+
{% macro default__log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}
2936

3037
insert into {{ logging.get_audit_relation() }} (
3138
event_name,
@@ -55,21 +62,31 @@
5562

5663

5764
{% macro create_audit_schema() %}
58-
create schema if not exists {{ logging.get_audit_schema() }}
65+
{% do create_schema(api.Relation.create(
66+
database=target.database,
67+
schema=logging.get_audit_schema())
68+
) %}
5969
{% endmacro %}
6070

6171

6272
{% macro create_audit_log_table() -%}
6373

74+
{{ return(adapter.dispatch('create_audit_log_table', packages=['logging'])()) }}
75+
76+
{% endmacro %}
77+
78+
79+
{% macro default__create_audit_log_table() -%}
80+
6481
{% set required_columns = [
65-
["event_name", "varchar(512)"],
82+
["event_name", dbt_utils.type_string()],
6683
["event_timestamp", dbt_utils.type_timestamp()],
67-
["event_schema", "varchar(512)"],
68-
["event_model", "varchar(512)"],
69-
["event_user", "varchar(512)"],
70-
["event_target", "varchar(512)"],
84+
["event_schema", dbt_utils.type_string()],
85+
["event_model", dbt_utils.type_string()],
86+
["event_user", dbt_utils.type_string()],
87+
["event_target", dbt_utils.type_string()],
7188
["event_is_full_refresh", "boolean"],
72-
["invocation_id", "varchar(512)"],
89+
["invocation_id", dbt_utils.type_string()],
7390
] -%}
7491

7592
{% set audit_table = logging.get_audit_relation() -%}
@@ -125,7 +142,7 @@
125142

126143

127144
{% macro log_model_start_event() %}
128-
{{logging.log_audit_event(
145+
{{ logging.log_audit_event(
129146
'model deployment started', schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
130147
) }}
131148
{% endmacro %}
@@ -136,3 +153,10 @@
136153
'model deployment completed', schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
137154
) }}
138155
{% endmacro %}
156+
157+
158+
{% macro log_custom_event(event_name) %}
159+
{{ logging.log_audit_event(
160+
event_name, schema=this.schema, relation=this.name, user=target.user, target_name=target.name, is_full_refresh=flags.FULL_REFRESH
161+
) }}
162+
{% endmacro %}

macros/bigquery.sql

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{% macro bigquery__log_audit_event(event_name, schema, relation, user, target_name, is_full_refresh) %}
2+
3+
insert into {{ logging.get_audit_relation() }} (
4+
event_name,
5+
event_timestamp,
6+
event_schema,
7+
event_model,
8+
event_target,
9+
event_is_full_refresh,
10+
invocation_id
11+
)
12+
13+
values (
14+
'{{ event_name }}',
15+
{{ dbt_utils.current_timestamp_in_utc() }},
16+
{% if schema != None %}'{{ schema }}'{% else %}null{% endif %},
17+
{% if relation != None %}'{{ relation }}'{% else %}null{% endif %},
18+
{% if target_name != None %}'{{ target_name }}'{% else %}null{% endif %},
19+
{% if is_full_refresh %}TRUE{% else %}FALSE{% endif %},
20+
'{{ invocation_id }}'
21+
);
22+
23+
{% endmacro %}
24+
25+
26+
{% macro bigquery__create_audit_log_table() -%}
27+
28+
{% set required_columns = [
29+
["event_name", dbt_utils.type_string()],
30+
["event_timestamp", dbt_utils.type_timestamp()],
31+
["event_schema", dbt_utils.type_string()],
32+
["event_model", dbt_utils.type_string()],
33+
["event_target", dbt_utils.type_string()],
34+
["event_is_full_refresh", "BOOLEAN"],
35+
["invocation_id", dbt_utils.type_string()],
36+
] -%}
37+
38+
{% set audit_table = logging.get_audit_relation() -%}
39+
40+
{% set audit_table_exists = adapter.get_relation(audit_table.database, audit_table.schema, audit_table.name) -%}
41+
42+
43+
{% if audit_table_exists -%}
44+
45+
{%- set columns_to_create = [] -%}
46+
47+
{# map to lower to cater for snowflake returning column names as upper case #}
48+
{%- set existing_columns = adapter.get_columns_in_relation(audit_table)|map(attribute='column')|map('lower')|list -%}
49+
50+
{%- for required_column in required_columns -%}
51+
{%- if required_column[0] not in existing_columns -%}
52+
{%- do columns_to_create.append(required_column) -%}
53+
54+
{%- endif -%}
55+
{%- endfor -%}
56+
57+
58+
{%- for column in columns_to_create -%}
59+
alter table {{ audit_table }}
60+
add column {{ column[0] }} {{ column[1] }}
61+
default null;
62+
{% endfor -%}
63+
64+
{%- else -%}
65+
create table if not exists {{ audit_table }}
66+
(
67+
{% for column in required_columns %}
68+
{{ column[0] }} {{ column[1] }}{% if not loop.last %},{% endif %}
69+
{% endfor %}
70+
)
71+
{%- endif -%}
72+
73+
{%- endmacro %}
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
with events as (
2+
3+
select * from {{ref('stg_dbt_audit_log')}}
4+
5+
),
6+
7+
aggregated as (
8+
9+
select
10+
11+
invocation_id,
12+
event_target as target,
13+
event_is_full_refresh as is_full_refresh,
14+
15+
min(case
16+
when event_name = 'run started' then event_timestamp
17+
end) as deployment_started_at,
18+
19+
min(case
20+
when event_name = 'run completed' then event_timestamp
21+
end) as deployment_completed_at,
22+
23+
count(distinct case
24+
when event_name like '%model%' then event_model
25+
end) as models_deployed
26+
27+
from events
28+
29+
{{ dbt_utils.group_by(n=3) }}
30+
31+
)
32+
33+
select * from aggregated
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
with events as (
2+
3+
select * from {{ ref('stg_dbt_audit_log') }}
4+
5+
),
6+
7+
aggregated as (
8+
9+
select
10+
11+
{{ dbt_utils.surrogate_key([
12+
'event_model',
13+
'invocation_id'
14+
]) }} as model_deployment_id,
15+
16+
invocation_id,
17+
event_model as model,
18+
event_schema as schema,
19+
event_target as target,
20+
event_is_full_refresh as is_full_refresh,
21+
22+
min(case
23+
when event_name = 'model deployment started' then event_timestamp
24+
end) as deployment_started_at,
25+
26+
min(case
27+
when event_name = 'model deployment completed' then event_timestamp
28+
end) as deployment_completed_at
29+
30+
from events
31+
32+
where event_name like '%model%'
33+
34+
{{ dbt_utils.group_by(n=6) }}
35+
36+
)
37+
38+
select * from aggregated
File renamed without changes.

packages.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
packages:
22
- package: fishtown-analytics/dbt_utils
3-
version: [">=0.4.0", "<0.7.0"]
3+
version: [">=0.6.0", "<0.7.0"]

0 commit comments

Comments
 (0)