Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Source FB Marketing: add incremental support #1699

Merged
merged 26 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing"
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing
- sourceDefinitionId: 57eb1576-8f52-463d-beb6-2e107cdf571d
name: Hubspot
Expand Down
52 changes: 39 additions & 13 deletions airbyte-integrations/bases/base-python/base_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Generator, List, Tuple
from typing import Any, Callable, Dict, Generator, List, Mapping, Tuple

import pkg_resources
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream
from airbyte_protocol import AirbyteStream, SyncMode
from jsonschema import RefResolver


Expand Down Expand Up @@ -114,17 +113,31 @@ def get_schema(self, name: str) -> dict:
return raw_schema


class BaseClient(ABC):
class StreamStateMixin:
    """Default per-stream state hooks.

    As written here, reading or writing state raises ``NotImplementedError``
    and ``stream_has_state`` answers ``False`` for every stream name.
    """

    def get_stream_state(self, name: str) -> Any:
        """Return the state of the stream called ``name``.

        Raises:
            NotImplementedError: always, in this default implementation.
        """
        raise NotImplementedError()

    def set_stream_state(self, name: str, state: Any):
        """Store ``state`` as the state of the stream called ``name``.

        Raises:
            NotImplementedError: always, in this default implementation.
        """
        raise NotImplementedError()

    def stream_has_state(self, name: str) -> bool:
        """Report whether the stream called ``name`` supports incremental sync.

        This default implementation returns ``False`` for any name.
        """
        return False


class BaseClient(StreamStateMixin, ABC):
"""Base client for API"""

schema_loader_class = ResourceSchemaLoader

def __init__(self):
def __init__(self, **kwargs):
package_name = package_name_from_class(self.__class__)
self._schema_loader = self.schema_loader_class(package_name)
self._stream_methods = self._enumerate_methods()

def _enumerate_methods(self) -> Dict[str, callable]:
def _enumerate_methods(self) -> Mapping[str, callable]:
"""Detect available streams and return mapping"""
prefix = "stream__"
mapping = {}
Expand All @@ -139,23 +152,36 @@ def _enumerate_methods(self) -> Dict[str, callable]:
def _get_fields_from_stream(stream: AirbyteStream) -> List[str]:
return list(stream.json_schema.get("properties", {}).keys())

def read_stream(self, stream: AirbyteStream) -> Generator[AirbyteRecordMessage, None, None]:
"""Yield records from stream"""
method = self._stream_methods.get(stream.name)
def _get_stream_method(self, name: str) -> Callable:
method = self._stream_methods.get(name)
if not method:
raise ValueError(f"Client does not know how to read stream `{stream.name}`")
raise ValueError(f"Client does not know how to read stream `{name}`")
return method

def read_stream(self, stream: AirbyteStream) -> Generator[Dict[str, Any], None, None]:
"""Yield records from stream"""
method = self._get_stream_method(stream.name)
fields = self._get_fields_from_stream(stream)

for message in method(fields=fields):
now = int(datetime.now().timestamp()) * 1000
yield AirbyteRecordMessage(stream=stream.name, data=message, emitted_at=now)
yield dict(message)

@property
def streams(self) -> Generator[AirbyteStream, None, None]:
"""List of available streams"""
for name, method in self._stream_methods.items():
yield AirbyteStream(name=name, json_schema=self._schema_loader.get_schema(name))
supported_sync_modes = [SyncMode.full_refresh]
source_defined_cursor = False
if self.stream_has_state(name):
supported_sync_modes = [SyncMode.incremental]
source_defined_cursor = True

yield AirbyteStream(
name=name,
json_schema=self._schema_loader.get_schema(name),
supported_sync_modes=supported_sync_modes,
source_defined_cursor=source_defined_cursor,
)

@abstractmethod
def health_check(self) -> Tuple[bool, str]:
Expand Down
37 changes: 31 additions & 6 deletions airbyte-integrations/bases/base-python/base_python/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@
"""

import json
from typing import Dict, Generator, Type

import airbyte_protocol
from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status
from datetime import datetime
from typing import Dict, Generator, Mapping, Type

from airbyte_protocol import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
Status,
)
from airbyte_protocol import Type as MessageType

from .client import BaseClient
from .integration import Source
Expand All @@ -38,7 +47,7 @@ class BaseSource(Source):

client_class: Type[BaseClient] = None

def _get_client(self, config: json):
def _get_client(self, config: Mapping):
"""Construct client"""
client = self.client_class(**config)

Expand All @@ -65,7 +74,23 @@ def read(
client = self._get_client(config)

logger.info(f"Starting syncing {self.__class__.__name__}")
total_state = {**state}
for configured_stream in catalog.streams:
stream_name = configured_stream.stream.name

if client.stream_has_state(stream_name) and state.get(stream_name):
logger.info(f"Set state of {stream_name} stream to {state.get(stream_name)}")
client.set_stream_state(stream_name, state.get(stream_name))

logger.info(f"Syncing {stream_name} stream")
for record in client.read_stream(configured_stream.stream):
yield AirbyteMessage(type=airbyte_protocol.Type.RECORD, record=record)
now = int(datetime.now().timestamp()) * 1000
message = AirbyteRecordMessage(stream=stream_name, data=record, emitted_at=now)
yield AirbyteMessage(type=MessageType.RECORD, record=message)

if client.stream_has_state(stream_name):
total_state[stream_name] = client.get_stream_state(stream_name)
# output state object only together with other stream states
yield AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=total_state))

logger.info(f"Finished syncing {self.__class__.__name__}")
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Loading