Skip to content

Commit 15e73f1

Browse files
committed
fix unit_tests and batch yield
1 parent ca42a4e commit 15e73f1

File tree

4 files changed

+25
-129
lines changed

4 files changed

+25
-129
lines changed

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,8 @@ def __init__(self, api: 'API', jobs: Iterator[AsyncJob]):
4343
self._api = api
4444
self._jobs = jobs
4545
self._running_jobs = []
46-
self._empty = False
4746

48-
@property
49-
def done(self):
50-
return self._empty
51-
52-
def start_jobs(self):
47+
def _start_jobs(self):
5348
"""Enqueue new jobs."""
5449

5550
self._update_api_throttle_limit()
@@ -59,7 +54,7 @@ def start_jobs(self):
5954
self._get_current_throttle_value() < self.THROTTLE_LIMIT
6055
and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE
6156
):
62-
job = next(self._jobs, None)
57+
job = next(iter(self._jobs), None)
6358
if not job:
6459
self._empty = True
6560
break
@@ -80,7 +75,7 @@ def completed_jobs(self) -> Iterator[AsyncJob]:
8075
:yield: completed jobs
8176
"""
8277
if not self._running_jobs:
83-
self.start_jobs()
78+
self._start_jobs()
8479

8580
while self._running_jobs:
8681
completed_jobs = self._check_jobs_status_and_restart()
@@ -89,7 +84,7 @@ def completed_jobs(self) -> Iterator[AsyncJob]:
8984
time.sleep(self.JOB_STATUS_UPDATE_SLEEP_SECONDS)
9085
completed_jobs = self._check_jobs_status_and_restart()
9186
yield from completed_jobs
92-
self.start_jobs()
87+
self._start_jobs()
9388

9489
def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
9590
""" Checks jobs status in advance and restart if some failed.

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,9 @@ def failure(response: FacebookResponse):
8686
api_batch.add_request(request, success=success, failure=failure)
8787
if len(api_batch) == self.MAX_BATCH_SIZE:
8888
self._execute_batch(api_batch)
89-
90-
yield from records
91-
api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
92-
records = []
89+
yield from records
90+
api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
91+
records = []
9392

9493
self._execute_batch(api_batch)
9594
yield from records

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def test_completed_skipped(self, failed_job, api, adreport):
169169
assert failed_job.completed
170170
assert failed_job.failed
171171

172-
def test_completed_timeout(self, job, adreport, mocker):
172+
def test_completed_timeout(self, job, adreport):
173173
job.start()
174174
adreport["async_status"] = Status.STARTED.value
175175
adreport["async_percent_completion"] = 1
@@ -178,7 +178,7 @@ def test_completed_timeout(self, job, adreport, mocker):
178178
assert not job.completed
179179
assert not job.failed
180180

181-
def test_completed_timeout_not_started(self, job, adreport, mocker):
181+
def test_completed_timeout_not_started(self, job, adreport):
182182
job.start()
183183
adreport["async_status"] = Status.STARTED.value
184184
adreport["async_percent_completion"] = 0

airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py

+16-114
Original file line numberDiff line numberDiff line change
@@ -2,126 +2,28 @@
22
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
33
#
44

5-
from unittest.mock import MagicMock, PropertyMock, patch
6-
7-
import pendulum
85
import pytest
96
from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager
107

118

12-
@pytest.fixture(autouse=False)
13-
def logger_mock():
14-
with patch(
15-
"source_facebook_marketing.streams.async_job_manager.logger",
16-
) as log_mock:
17-
yield log_mock
18-
19-
20-
@pytest.fixture(scope="function")
21-
def job_mock():
22-
with patch("source_facebook_marketing.streams.async_job_manager.AsyncJob", PropertyMock()) as async_job_mock:
23-
async_job_mock.return_value = async_job_mock
24-
async_job_mock.failed = False
25-
yield async_job_mock
26-
27-
28-
def make_api_mock():
29-
api_mock = MagicMock()
30-
api_mock.api.ads_insights_throttle = 0.5, 0.5
31-
api_mock.api.new_batch.return_value = api_mock
32-
api_mock.execute.return_value = None
33-
return api_mock
34-
35-
36-
@pytest.mark.parametrize(
37-
"from_date,to_date",
38-
[
39-
("2020-10-10", "2021-10-10"),
40-
("2021-10-09", "2021-10-10"),
41-
],
42-
)
43-
def test_async_job_manager(job_mock, from_date, to_date):
44-
from_date, to_date = pendulum.parse(from_date), pendulum.parse(to_date)
45-
assert from_date <= to_date
46-
api_mock = make_api_mock()
47-
job_manager = InsightAsyncJobManager(
48-
api=api_mock,
49-
job_params={"breakdowns": []},
50-
from_date=from_date,
51-
to_date=to_date,
52-
)
53-
job_manager.add_async_jobs()
54-
assert not job_manager.done()
55-
jobs = []
56-
while not job_manager.done():
57-
jobs.append(job_manager.get_next_completed_job())
58-
assert len(jobs) == max((to_date - from_date).total_days(), 1)
59-
assert job_manager.done()
60-
61-
62-
@pytest.mark.skip("Now this case is failing, fix it later")
63-
def test_async_job_manager_to_date_greater_from(job_mock):
64-
from_date, to_date = pendulum.parse("2020-10-10"), pendulum.parse("2019-10-10")
65-
assert from_date > to_date
66-
api_mock = make_api_mock()
67-
job_manager = InsightAsyncJobManager(
68-
api=api_mock,
69-
job_params={"breakdowns": []},
70-
from_date=from_date,
71-
to_date=to_date,
72-
)
73-
job_manager.add_async_jobs()
74-
assert job_manager.done()
75-
9+
@pytest.fixture(name="api")
10+
def api_fixture(mocker):
11+
api = mocker.Mock()
12+
api.api.ads_insights_throttle = (0, 0)
13+
return api
7614

77-
def test_job_failed(job_mock):
78-
from_date, to_date = pendulum.parse("2019-10-10"), pendulum.parse("2019-10-10")
79-
api_mock = make_api_mock()
80-
job_manager = InsightAsyncJobManager(
81-
api=api_mock,
82-
job_params={"breakdowns": []},
83-
from_date=from_date,
84-
to_date=to_date,
85-
)
86-
job_mock.failed = True
87-
job_manager.add_async_jobs()
88-
assert not job_manager.done()
89-
with pytest.raises(Exception, match=r"^Job .* failed$"):
90-
job_manager.get_next_completed_job()
91-
assert job_mock.restart.called
9215

16+
class TestInsightAsyncManager:
17+
def test_jobs_empty(self, api):
18+
manager = InsightAsyncJobManager(api=api, jobs=[])
19+
jobs = list(manager.completed_jobs())
20+
assert not jobs
9321

94-
def test_job_failed_two_times(job_mock):
95-
from_date, to_date = pendulum.parse("2019-10-10"), pendulum.parse("2019-10-10")
96-
api_mock = make_api_mock()
97-
job_manager = InsightAsyncJobManager(
98-
api=api_mock,
99-
job_params={"breakdowns": []},
100-
from_date=from_date,
101-
to_date=to_date,
102-
)
103-
job_manager.add_async_jobs()
104-
assert not job_manager.done()
105-
type(job_mock).failed = PropertyMock(side_effect=[True, True, False])
106-
while not job_manager.done():
107-
job_manager.get_next_completed_job()
108-
assert job_mock.restart.called
109-
assert job_mock.restart.call_count == 2
22+
def test_job_restarted(self):
23+
"""TODO"""
11024

25+
def test_job_split(self):
26+
"""TODO"""
11127

112-
def test_job_wait_unitll_completed(job_mock, time_sleep_mock):
113-
from_date, to_date = pendulum.parse("2019-10-10"), pendulum.parse("2019-10-10")
114-
api_mock = make_api_mock()
115-
job_manager = InsightAsyncJobManager(
116-
api=api_mock,
117-
job_params={"breakdowns": []},
118-
from_date=from_date,
119-
to_date=to_date,
120-
)
121-
job_manager.add_async_jobs()
122-
assert not job_manager.done()
123-
type(job_mock).completed = PropertyMock(side_effect=[False, False, True])
124-
while not job_manager.done():
125-
job_manager.get_next_completed_job()
126-
assert not job_mock.restart.called
127-
time_sleep_mock.assert_called_with(30)
28+
def test_job_failed_too_many_times(self):
29+
"""TODO"""

0 commit comments

Comments
 (0)