Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Mailchimp: Get Member activities #3415

Merged
merged 19 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,174 @@
"sync_mode": "incremental",
"cursor_field": ["date_created"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "Email_activity",
"json_schema": {
"type": "object",
"title": "Email Activity",
"description": "A list of member's subscriber activity in a specific campaign.",
"properties": {
"emails": {
"type": "object",
"title": "List members",
"description": "An array of members that were sent the campaign.",
"properties": {
"campaign_id": {
"type": "string",
"title": "The unique id for the campaign.",
"description": "The unique id for the campaign."
},
"list_id": {
"type": "string",
"title": "The unique id for the list.",
"description": "The unique id for the list."
},
"list_is_active": {
"type": "boolean",
"title": "The status of the list used.",
"description": "The status of the list used, namely if it's deleted or disabled."
},
"email_id": {
"type": "string",
"title": "email MD5 hash.",
"description": "The MD5 hash of the lowercase version of the list member's email address."
},
"email_address": {
"type": "string",
"title": "Email address for a subscriber.",
"description": "Email address for a subscriber."
},
"activity": {
"type": "array",
"title": "Activity",
"description": "An array of objects, each showing an interaction with the email.",
"items": {
"type": "object",
"properties": {
"action": {
"type": "string",
"title": "action",
"enum": ["open", "click", "bounce"],
"description": "One of the following actions: 'open', 'click', or 'bounce'"
},
"type": {
"type": "string",
"title": "Type",
"enum": ["hard", "soft"],
"description": "If the action is a 'bounce', the type of bounce received: 'hard', 'soft'."
},
"timestamp": {
"type": "string",
"title": "Action date and time",
"description": "The date and time recorded for the action in ISO 8601 format.",
"format": "date-time"
},
"url": {
"type": "string",
"title": "The IP address.",
"description": "The IP address recorded for the action."
},
"ip": {
"type": "string",
"title": "Action ip address",
"description": "The IP address recorded for the action."
}
}
}
},
"_links": {
"type": "array",
"title": "link types",
"description": "A list of link types and descriptions for the API schema documents.",
"items": {
"type": "object",
"properties": {
"rel": {
"type": "string",
"title": "Type of link",
"description": "As with an HTML 'rel' attribute, this describes the type of link."
},
"href": {
"type": "string",
"title": "Fully-qualified URL",
"description": "This property contains a fully-qualified URL that can be called to retrieve the linked resource or perform the linked action."
},
"method": {
"type": "string",
"title": "HTTP method",
"description": "The HTTP method that should be used when accessing the URL defined in 'href'. Possible values: 'GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS', or 'HEAD'."
},
"targetSchema": {
"type": "string",
"title": "URL representing the schema for GET",
"description": "For GETs, this is a URL representing the schema that the response should conform to."
},
"schema": {
"type": "string",
"title": "URL representing the schema for POST and PUT",
"description": "For HTTP methods that can receive bodies (POST and PUT), this is a URL representing the schema that the body should conform to."
}
}
}
}
}
},
"campaign_id": {
"type": "string",
"title": "Campaign ID",
"description": "The unique id for the sent campaign."
},
"total_items": {
"type": "integer",
"title": "Total amount of items",
"description": "The total number of items matching the query regardless of pagination."
},
"_links": {
"type": "array",
"title": "link types",
"description": "A list of link types and descriptions for the API schema documents.",
"items": {
"type": "object",
"properties": {
"rel": {
"type": "string",
"title": "Type of link",
"description": "As with an HTML 'rel' attribute, this describes the type of link."
},
"href": {
"type": "string",
"title": "Fully-qualified URL",
"description": "This property contains a fully-qualified URL that can be called to retrieve the linked resource or perform the linked action."
},
"method": {
"type": "string",
"title": "HTTP method",
"description": "The HTTP method that should be used when accessing the URL defined in 'href'. Possible values: 'GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS', or 'HEAD'."
},
"targetSchema": {
"type": "string",
"title": "URL representing the schema for GET",
"description": "For GETs, this is a URL representing the schema that the response should conform to."
},
"schema": {
"type": "string",
"title": "URL representing the schema for POST and PUT",
"description": "For HTTP methods that can receive bodies (POST and PUT), this is a URL representing the schema that the body should conform to."
}
}
}
}
}
},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
"sync_mode": "incremental",
"cursor_field": [],
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"Campaigns": { "create_time": "2020-11-23T05:42:11+00:00" },
"Lists": { "date_created": "2020-09-25T04:47:31+00:00" }
"Lists": { "date_created": "2020-09-25T04:47:31+00:00" },
"Email_activity": { "create_time": "2020-11-23T05:42:11+00:00" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ class Client:
PAGINATION = 100
_CAMPAIGNS = "Campaigns"
_LISTS = "Lists"
_ENTITIES = [_CAMPAIGNS, _LISTS]
_EMAIL_ACTIVITY = "Email_activity"
_ENTITIES = [_CAMPAIGNS, _LISTS, _EMAIL_ACTIVITY]

def __init__(self, username: str, apikey: str):
    """Build the Mailchimp SDK client and start with an empty campaign-id list.

    ``username`` is forwarded to the SDK as ``mc_user`` and ``apikey`` as
    ``mc_api``.
    """
    self._client = MailChimp(mc_user=username, mc_api=apikey)
    # Creates self.campaign_ids so later reads never hit a missing attribute.
    self.reset_campaign_ids()

def reset_campaign_ids(self):
    """Rebind ``self.campaign_ids`` to a brand-new empty list.

    The previous list object is left untouched; only the attribute is replaced.
    """
    self.campaign_ids = list()

def health_check(self):
try:
Expand All @@ -63,45 +68,87 @@ def lists(self, state: DefaultDict[str, any]) -> Generator[AirbyteMessage, None,
date_created = self._get_cursor_or_none(state, stream_name, cursor_field)
default_params, max_date_created = self._get_default_params_and_cursor(cursor_field, date_created)

offset = 0
done = False
while not done:
params = dict(default_params, count=self.PAGINATION, offset=offset)
lists_response = self._client.lists.all(**params)["lists"]
for mc_list in lists_response:
list_created_at = parser.isoparse(mc_list[cursor_field])
max_date_created = max(max_date_created, list_created_at) if max_date_created else list_created_at
yield self._record(stream=self._LISTS, data=mc_list)

if max_date_created:
state[self._LISTS][cursor_field] = self._format_date_as_string(max_date_created)
yield self._state(state)

done = len(lists_response) < self.PAGINATION
offset += self.PAGINATION
generator = self.query_paginated(
state=state,
stream_name=stream_name,
params=dict(default_params, count=self.PAGINATION, offset=0),
query_subject=self._client.lists,
response_data_field="lists",
cursor_field=cursor_field,
max_cursor_field_value=max_date_created,
)
return generator

def campaigns(self, state: DefaultDict[str, any]) -> Generator[AirbyteMessage, None, None]:
    """Return a lazy generator over the campaigns stream.

    Nothing is fetched until the returned generator is iterated. While it is
    iterated, the ``id`` of every campaign entry is appended to
    ``self.campaign_ids``.

    :param state: per-stream state; the ``create_time`` cursor of the
        campaigns stream is looked up in it.
    :return: the generator produced by ``query_paginated``.
    """
    # Reset first: the list created here is the exact object handed to
    # query_paginated below, so the ids it collects land in self.campaign_ids.
    self.reset_campaign_ids()
    cursor_field = "create_time"
    stream_name = self._CAMPAIGNS
    create_time = self._get_cursor_or_none(state, stream_name, cursor_field)
    default_params, max_create_time = self._get_default_params_and_cursor(cursor_field, create_time)

    return self.query_paginated(
        state=state,
        stream_name=stream_name,
        params=dict(default_params, count=self.PAGINATION, offset=0),
        query_subject=self._client.campaigns,
        response_data_field="campaigns",
        cursor_field=cursor_field,
        max_cursor_field_value=max_create_time,
        store_responce_field={"where": self.campaign_ids, "field": "id"},
    )

def email_activity(self, state: DefaultDict[str, any]) -> Generator[AirbyteMessage, None, None]:
    """Yield email-activity messages for every known campaign id.

    If ``self.campaign_ids`` is empty, the campaigns stream is read once to
    fill it. Then, for each campaign id, the ``reports.email_activity``
    endpoint is paged through and the entries under ``"emails"`` are yielded.
    No cursor is tracked for this stream.

    :param state: per-stream state, forwarded to ``query_paginated``.
    """
    import copy

    if not self.campaign_ids:
        # campaigns() only returns a lazy generator; the ids are appended to
        # self.campaign_ids while it is iterated, so it must be drained here.
        # The campaign messages are discarded, so run it on a copy of the state
        # to avoid advancing the Campaigns cursor without emitting records.
        for _ in self.campaigns(state=copy.deepcopy(state)):
            pass

    stream_name = self._EMAIL_ACTIVITY
    for campaign_id in self.campaign_ids:
        # possible TODO - use batch
        params = {"campaign_id": campaign_id, "count": self.PAGINATION, "offset": 0}
        yield from self.query_paginated(
            state=state,
            stream_name=stream_name,
            params=params,
            query_subject=self._client.reports.email_activity,
            response_data_field="emails",
            cursor_field=None,
            max_cursor_field_value=None,
        )

def query_paginated(
    self,
    state: DefaultDict[str, any],
    stream_name: str,
    params: dict,
    query_subject,
    response_data_field: str,
    cursor_field: str = None,
    max_cursor_field_value: str = None,
    store_responce_field: Dict[str, str] = None,
) -> Generator[AirbyteMessage, None, None]:
    """Page through ``query_subject.all(...)`` and yield one record per entry.

    After each page, when a cursor is tracked and has a value, the stream's
    state entry is updated and a state message is yielded as well. Paging
    stops after the first page that holds fewer than ``self.PAGINATION``
    entries.

    :param state: per-stream state; ``state[stream_name][cursor_field]`` is
        written when a cursor is tracked.
    :param stream_name: stream name used for the records and the state key.
    :param params: keyword arguments for ``query_subject.all``. The dict is
        copied, so the caller's object is not modified; a missing ``offset``
        is treated as 0.
    :param query_subject: object exposing ``all(**params)``.
    :param response_data_field: key of the response that holds the entries.
    :param cursor_field: entry key holding an ISO 8601 timestamp, or ``None``
        when the stream has no cursor.
    :param max_cursor_field_value: largest cursor value seen so far. It is
        compared with the parsed timestamps, so presumably a datetime despite
        the ``str`` annotation - confirm against callers.
    :param store_responce_field: optional ``{"where": <list>, "field": <key>}``;
        ``entry[<key>]`` is appended to ``<list>`` for every entry.
    """
    # Work on a private copy: the offset is advanced in place on every page.
    page_params = dict(params)
    page_params.setdefault("offset", 0)

    done = False
    while not done:
        api_response = query_subject.all(**page_params)
        api_data = api_response[response_data_field]
        for entry in api_data:
            if store_responce_field:
                store_responce_field["where"].append(entry[store_responce_field["field"]])
            if cursor_field:
                created_at = parser.isoparse(entry[cursor_field])
                max_cursor_field_value = max(max_cursor_field_value, created_at) if max_cursor_field_value else created_at
            yield self._record(stream=stream_name, data=entry)

        # Without a cursor field there is no state key to write.
        if cursor_field and max_cursor_field_value:
            state[stream_name][cursor_field] = self._format_date_as_string(max_cursor_field_value)
            yield self._state(state)

        done = len(api_data) < self.PAGINATION
        page_params["offset"] += self.PAGINATION

@staticmethod
def _get_default_params_and_cursor(cursor_field_name: str, cursor_value: str) -> Tuple[Dict[str, str], datetime]:
Expand Down
Loading