Skip to content

Commit

Permalink
Renamed list method and removed _filter_dynamic_fields
Browse files Browse the repository at this point in the history
  • Loading branch information
lgomezm committed Jan 4, 2022
1 parent 6b54439 commit 42b1f42
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,8 @@ def name(self) -> str:
stream_name = stream_name[: -len("Stream")]
return stream_name

def list(self, fields) -> Iterable:
def list_records(self, fields) -> Iterable:
yield from self.read(partial(self._api.get, url=self.url))

def _filter_dynamic_fields(self, records: Iterable) -> Iterable:
"""Skip certain fields because they are too dynamic and change every call (timers, etc),
see https://github.com/airbytehq/airbyte/issues/2397
"""
for record in records:
if isinstance(record, Mapping) and "properties" in record:
for key in list(record["properties"].keys()):
if key.startswith("hs_time_in"):
record["properties"].pop(key)
yield record

@staticmethod
def _cast_value(declared_field_types: List, field_name: str, field_value: Any, declared_format: str = None) -> Any:
Expand Down Expand Up @@ -575,7 +564,7 @@ def search(
# As per their docs: `These search endpoints are rate limited to four requests per second per authentication token`.
return self._api.post(url=url, data=data, params=params)

def list(self, fields) -> Iterable:
def list_records(self, fields) -> Iterable:
params = {
"archived": str(self._include_archived_only).lower(),
"associations": self.associations,
Expand All @@ -584,7 +573,7 @@ def list(self, fields) -> Iterable:
generator = self.read(partial(self.search, url=self.url), params)
else:
generator = self.read(partial(self._api.get, url=self.url), params)
yield from self._flat_associations(self._filter_dynamic_fields(self._filter_old_records(generator)))
yield from self._flat_associations(self._filter_old_records(generator))

def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
"""Apply state filter to set of records, update cursor(state) if necessary in the end"""
Expand Down Expand Up @@ -652,7 +641,7 @@ def __init__(
if not self.entity:
raise ValueError("Entity must be set either on class or instance level")

def list(self, fields) -> Iterable:
def list_records(self, fields) -> Iterable:
params = {
"archived": str(self._include_archived_only).lower(),
"associations": self.associations,
Expand All @@ -679,7 +668,7 @@ class CampaignStream(Stream):
limit = 500
updated_at_field = "lastUpdatedTime"

def list(self, fields) -> Iterable:
def list_records(self, fields) -> Iterable:
for row in self.read(getter=partial(self._api.get, url=self.url)):
record = self._api.get(f"/email/public/v1/campaigns/{row['id']}")
yield {**row, **record}
Expand Down Expand Up @@ -718,7 +707,7 @@ def _transform(self, records: Iterable) -> Iterable:
if updated_at:
yield {"id": record.get("dealId"), "dealstage": dealstage, self.updated_at_field: updated_at}

def list(self, fields) -> Iterable:
def list_records(self, fields) -> Iterable:
params = {"propertiesWithHistory": "dealstage"}
yield from self.read(partial(self._api.get, url=self.url), params)

Expand All @@ -730,12 +719,12 @@ def __init__(self, **kwargs):
super().__init__(entity="deal", last_modified_field="hs_lastmodifieddate", **kwargs)
self._stage_history = DealStageHistoryStream(**kwargs)

def list(self, fields) -> Iterable:
def list_records(self, fields) -> Iterable:
history_by_id = {}
for record in self._stage_history.list(fields):
for record in self._stage_history.list_records(fields):
if all(field in record for field in ("id", "dealstage")):
history_by_id[record["id"]] = record["dealstage"]
for record in super().list(fields):
for record in super().list_records(fields):
if record.get("id") and int(record["id"]) in history_by_id:
record["dealstage"] = history_by_id[int(record["id"])]
yield record
Expand Down Expand Up @@ -815,7 +804,7 @@ def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator:
if self.state:
params['since'] = self._state
count = 0
for record in self._filter_dynamic_fields(self._filter_old_records(self._read(getter, params))):
for record in self._filter_old_records(self._read(getter, params)):
yield record
count += 1
cursor = record[self.updated_at_field]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(self, start_date, credentials, **kwargs):
super().__init__(**kwargs)

def _enumerate_methods(self) -> Mapping[str, Callable]:
return {name: api.list for name, api in self._apis.items()}
return {name: api.list_records for name, api in self._apis.items()}

@property
def streams(self) -> Iterator[AirbyteStream]:
Expand Down

0 comments on commit 42b1f42

Please sign in to comment.