|
1 |
| -# -*- coding: utf-8 -*- |
2 |
| -# |
3 |
| -# Licensed to the Apache Software Foundation (ASF) under one |
4 |
| -# or more contributor license agreements. See the NOTICE file |
5 |
| -# distributed with this work for additional information |
6 |
| -# regarding copyright ownership. The ASF licenses this file |
7 |
| -# to you under the Apache License, Version 2.0 (the |
8 |
| -# "License"); you may not use this file except in compliance |
9 |
| -# with the License. You may obtain a copy of the License at |
10 |
| -# |
11 |
| -# http://www.apache.org/licenses/LICENSE-2.0 |
12 |
| -# |
13 |
| -# Unless required by applicable law or agreed to in writing, |
14 |
| -# software distributed under the License is distributed on an |
15 |
| -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
16 |
| -# KIND, either express or implied. See the License for the |
17 |
| -# specific language governing permissions and limitations |
18 |
| -# under the License. |
19 |
| -from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook |
20 |
| -from airflow.sensors.base_sensor_operator import BaseSensorOperator |
21 |
| -from airflow.utils.decorators import apply_defaults |
22 |
| - |
23 |
| - |
24 |
| -class AzureCosmosDocumentSensor(BaseSensorOperator): |
25 |
| - """ |
26 |
| - Checks for the existence of a document which |
27 |
| - matches the given query in CosmosDB. Example: |
28 |
| -
|
29 |
| - >>> azure_cosmos_sensor = AzureCosmosDocumentSensor(database_name="somedatabase_name", |
30 |
| - ... collection_name="somecollection_name", |
31 |
| - ... document_id="unique-doc-id", |
32 |
| - ... azure_cosmos_conn_id="azure_cosmos_default", |
33 |
| - ... task_id="azure_cosmos_sensor") |
34 |
| - """ |
35 |
| - template_fields = ('database_name', 'collection_name', 'document_id') |
36 |
| - |
37 |
| - @apply_defaults |
38 |
| - def __init__( |
39 |
| - self, |
40 |
| - database_name, |
41 |
| - collection_name, |
42 |
| - document_id, |
43 |
| - azure_cosmos_conn_id="azure_cosmos_default", |
44 |
| - *args, |
45 |
| - **kwargs): |
46 |
| - """ |
47 |
| - Create a new AzureCosmosDocumentSensor |
48 |
| -
|
49 |
| - :param database_name: Target CosmosDB database_name. |
50 |
| - :type database_name: str |
51 |
| - :param collection_name: Target CosmosDB collection_name. |
52 |
| - :type collection_name: str |
53 |
| - :param document_id: The ID of the target document. |
54 |
| - :type query: str |
55 |
| - :param azure_cosmos_conn_id: Reference to the Azure CosmosDB connection. |
56 |
| - :type azure_cosmos_conn_id: str |
57 |
| - """ |
58 |
| - super(AzureCosmosDocumentSensor, self).__init__(*args, **kwargs) |
59 |
| - self.azure_cosmos_conn_id = azure_cosmos_conn_id |
60 |
| - self.database_name = database_name |
61 |
| - self.collection_name = collection_name |
62 |
| - self.document_id = document_id |
63 |
| - |
64 |
| - def poke(self, context): |
65 |
| - self.log.info("*** Intering poke") |
66 |
| - hook = AzureCosmosDBHook(self.azure_cosmos_conn_id) |
67 |
| - return hook.get_document(self.document_id, self.database_name, self.collection_name) is not None |
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +# |
| 3 | +# Licensed to the Apache Software Foundation (ASF) under one |
| 4 | +# or more contributor license agreements. See the NOTICE file |
| 5 | +# distributed with this work for additional information |
| 6 | +# regarding copyright ownership. The ASF licenses this file |
| 7 | +# to you under the Apache License, Version 2.0 (the |
| 8 | +# "License"); you may not use this file except in compliance |
| 9 | +# with the License. You may obtain a copy of the License at |
| 10 | +# |
| 11 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 12 | +# |
| 13 | +# Unless required by applicable law or agreed to in writing, |
| 14 | +# software distributed under the License is distributed on an |
| 15 | +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 16 | +# KIND, either express or implied. See the License for the |
| 17 | +# specific language governing permissions and limitations |
| 18 | +# under the License. |
| 19 | +from airflow.contrib.hooks.azure_cosmos_hook import AzureCosmosDBHook |
| 20 | +from airflow.sensors.base_sensor_operator import BaseSensorOperator |
| 21 | +from airflow.utils.decorators import apply_defaults |
| 22 | + |
| 23 | + |
| 24 | +class AzureCosmosDocumentSensor(BaseSensorOperator): |
| 25 | + """ |
| 26 | + Checks for the existence of a document which |
| 27 | + matches the given query in CosmosDB. Example: |
| 28 | +
|
| 29 | + >>> azure_cosmos_sensor = AzureCosmosDocumentSensor(database_name="somedatabase_name", |
| 30 | + ... collection_name="somecollection_name", |
| 31 | + ... document_id="unique-doc-id", |
| 32 | + ... azure_cosmos_conn_id="azure_cosmos_default", |
| 33 | + ... task_id="azure_cosmos_sensor") |
| 34 | + """ |
| 35 | + template_fields = ('database_name', 'collection_name', 'document_id') |
| 36 | + |
| 37 | + @apply_defaults |
| 38 | + def __init__( |
| 39 | + self, |
| 40 | + database_name, |
| 41 | + collection_name, |
| 42 | + document_id, |
| 43 | + azure_cosmos_conn_id="azure_cosmos_default", |
| 44 | + *args, |
| 45 | + **kwargs): |
| 46 | + """ |
| 47 | + Create a new AzureCosmosDocumentSensor |
| 48 | +
|
| 49 | + :param database_name: Target CosmosDB database_name. |
| 50 | + :type database_name: str |
| 51 | + :param collection_name: Target CosmosDB collection_name. |
| 52 | + :type collection_name: str |
| 53 | + :param document_id: The ID of the target document. |
| 54 | + :type query: str |
| 55 | + :param azure_cosmos_conn_id: Reference to the Azure CosmosDB connection. |
| 56 | + :type azure_cosmos_conn_id: str |
| 57 | + """ |
| 58 | + super(AzureCosmosDocumentSensor, self).__init__(*args, **kwargs) |
| 59 | + self.azure_cosmos_conn_id = azure_cosmos_conn_id |
| 60 | + self.database_name = database_name |
| 61 | + self.collection_name = collection_name |
| 62 | + self.document_id = document_id |
| 63 | + |
| 64 | + def poke(self, context): |
| 65 | + self.log.info("*** Intering poke") |
| 66 | + hook = AzureCosmosDBHook(self.azure_cosmos_conn_id) |
| 67 | + return hook.get_document(self.document_id, self.database_name, self.collection_name) is not None |
0 commit comments