Skip to content

Commit 9d42127

Browse files
authored
fix: prepare api call kwargs correctly for synchronous methods when using FivetranHookAsync (#115)
* fix: always use (username,password) tuple for sync auth and aiohttp.BasicAuth for async auth This is related to issue #107 Changes: Always return kwargs["auth"] as a tuple from FivetranHook._prepare_api_call_kwargs Rename FivetranHookAsync._prepare_api_call_kwargs to FivetranHookAsync._prepare_api_call_kwargs_async
1 parent 7f3a96c commit 9d42127

File tree

2 files changed

+69
-5
lines changed

2 files changed

+69
-5
lines changed

fivetran_provider_async/hooks.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def _prepare_api_call_kwargs(self, method: str, endpoint: str, **kwargs: Any) ->
106106

107107
auth = (self.fivetran_conn.login, self.fivetran_conn.password)
108108

109-
kwargs.setdefault("auth", auth)
109+
kwargs["auth"] = auth
110110
kwargs.setdefault("headers", {})
111111

112112
kwargs["headers"].setdefault("User-Agent", self.api_user_agent + self._get_airflow_version())
@@ -634,8 +634,8 @@ class FivetranHookAsync(FivetranHook):
634634
def __init__(self, *args, **kwargs) -> None:
635635
super().__init__(*args, **kwargs)
636636

637-
def _prepare_api_call_kwargs(self, method: str, endpoint: str, **kwargs: Any) -> dict[str, Any]:
638-
kwargs = super()._prepare_api_call_kwargs(method, endpoint, **kwargs)
637+
def _prepare_api_call_kwargs_async(self, method: str, endpoint: str, **kwargs: Any) -> dict[str, Any]:
638+
kwargs = self._prepare_api_call_kwargs(method, endpoint, **kwargs)
639639
auth = kwargs.get("auth")
640640
if auth is not None and is_container(auth) and 2 <= len(auth) <= 3:
641641
kwargs["auth"] = aiohttp.BasicAuth(*auth)
@@ -666,7 +666,7 @@ async def _do_api_call_async(
666666

667667
url = f"{self.api_protocol}://{self.api_host}/{endpoint}"
668668

669-
kwargs = self._prepare_api_call_kwargs(method, endpoint, **kwargs)
669+
kwargs = self._prepare_api_call_kwargs_async(method, endpoint, **kwargs)
670670

671671
async with aiohttp.ClientSession() as session:
672672
attempt_num = 1

tests/hooks/test_fivetran.py

+65-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
import pendulum
66
import pytest
77
import requests_mock
8-
from aiohttp import ClientResponseError, RequestInfo
8+
from aiohttp import BasicAuth, ClientResponseError, RequestInfo
99
from airflow.exceptions import AirflowException
10+
from airflow.utils.helpers import is_container
1011

1112
from fivetran_provider_async.hooks import FivetranHook, FivetranHookAsync
1213
from tests.common.static import (
@@ -596,6 +597,34 @@ async def mock_fun(arg1, arg2, arg3, arg4):
596597
response = await hook._do_api_call_async(("POST", "v1/connectors/test"))
597598
assert response == {"status": "success"}
598599

600+
@pytest.mark.asyncio
601+
@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession")
602+
@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection")
603+
async def test_do_api_call_async_verify_using_async_kwargs_preparation(
604+
self, mock_get_connection, mock_session
605+
):
606+
"""Tests that _do_api_call_async calls _prepare_api_call_kwargs_async"""
607+
608+
async def mock_fun(arg1, arg2, arg3, arg4):
609+
return {"status": "success"}
610+
611+
mock_session.return_value.__aexit__.return_value = mock_fun
612+
mock_session.return_value.__aenter__.return_value.request.return_value.json.return_value = {
613+
"status": "success"
614+
}
615+
616+
hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran")
617+
618+
hook.fivetran_conn = mock_get_connection
619+
hook.fivetran_conn.login = LOGIN
620+
hook.fivetran_conn.password = PASSWORD
621+
with mock.patch(
622+
"fivetran_provider_async.hooks.FivetranHookAsync._prepare_api_call_kwargs_async"
623+
) as prep_func:
624+
await hook._do_api_call_async(("POST", "v1/connectors/test"))
625+
626+
prep_func.assert_called_once_with("POST", "v1/connectors/test")
627+
599628
@pytest.mark.asyncio
600629
@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession")
601630
@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection")
@@ -648,6 +677,24 @@ async def test_do_api_call_async_with_retryable_client_response_error(
648677

649678
assert str(exc.value) == "API requests to Fivetran failed 3 times. Giving up."
650679

680+
@pytest.mark.asyncio
681+
@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection")
682+
async def test_prepare_api_call_kwargs_async_returns_aiohttp_basicauth(self, mock_get_connection):
683+
"""Tests to verify that the 'auth' value returned from kwarg preparation is
684+
of type aiohttp.BasicAuth"""
685+
hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran")
686+
hook.fivetran_conn = mock_get_connection
687+
hook.fivetran_conn.login = LOGIN
688+
hook.fivetran_conn.password = PASSWORD
689+
690+
# Test first without passing in an auth kwarg
691+
kwargs = hook._prepare_api_call_kwargs_async("POST", "v1/connectors/test")
692+
assert isinstance(kwargs["auth"], BasicAuth)
693+
694+
# Pass in auth kwarg of a different type (using a string for the test)
695+
kwargs = hook._prepare_api_call_kwargs_async("POST", "v1/connectors/test", auth="BadAuth")
696+
assert isinstance(kwargs["auth"], BasicAuth)
697+
651698

652699
# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`)
653700
@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@")
@@ -794,3 +841,20 @@ def test_start_fivetran_sync(self, m):
794841
)
795842
result = hook.start_fivetran_sync(connector_id="interchangeable_revenge")
796843
assert result is not None
844+
845+
def test_prepare_api_call_kwargs_always_returns_tuple(self):
846+
"""Tests to verify that given a valid fivetran_conn _prepare_api_call_kwargs always returns
847+
a username/password tuple"""
848+
hook = FivetranHook(
849+
fivetran_conn_id="conn_fivetran",
850+
)
851+
852+
# Test first without passing in an auth kwarg
853+
kwargs = hook._prepare_api_call_kwargs("POST", "v1/connectors/test")
854+
assert not isinstance(kwargs["auth"], BasicAuth)
855+
assert is_container(kwargs["auth"]) and len(kwargs["auth"]) == 2
856+
857+
# Pass in auth kwarg of a different type (using a string for the test)
858+
kwargs = hook._prepare_api_call_kwargs("POST", "v1/connectors/test", auth="BadAuth")
859+
assert not isinstance(kwargs["auth"], BasicAuth)
860+
assert is_container(kwargs["auth"]) and len(kwargs["auth"]) == 2

0 commit comments

Comments
 (0)