diff --git a/airbyte-integrations/connectors/source-facebook-marketing/README.md b/airbyte-integrations/connectors/source-facebook-marketing/README.md
index 77a2b5aca43fe..947a2cd657531 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/README.md
+++ b/airbyte-integrations/connectors/source-facebook-marketing/README.md
@@ -39,7 +39,7 @@ From the Airbyte repository root, run:
 **If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/facebook-marketing)
 to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_facebook_marketing/spec.json` file.
 Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
-See `sample_files/sample_config.json` for a sample config file.
+See `integration_tests/sample_config.json` for a sample config file.
 
 **If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source facebook-marketing test creds`
 and place them into `secrets/config.json`.
@@ -49,7 +49,7 @@ and place them into `secrets/config.json`.
 python main.py spec
 python main.py check --config secrets/config.json
 python main.py discover --config secrets/config.json
-python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
+python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
 ```
 
 ### Locally running the connector docker image
@@ -73,7 +73,7 @@ Then run any of the connector commands as follows:
 docker run --rm airbyte/source-facebook-marketing:dev spec
 docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev check --config /secrets/config.json
 docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev discover --config /secrets/config.json
-docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-facebook-marketing:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json
+docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-facebook-marketing:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
 ```
 ## Testing
    Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
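For reference, the `read` command shown above can also be reproduced programmatically. The sketch below is only an illustration and assumes the `secrets/config.json` and `integration_tests/configured_catalog.json` files referenced in this README exist; it is not part of the connector code.

```python
# Rough Python equivalent of `python main.py read ...` (illustrative only).
import json
import logging

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from source_facebook_marketing.source import SourceFacebookMarketing

with open("secrets/config.json") as fp:
    config = json.load(fp)
catalog = ConfiguredAirbyteCatalog.parse_file("integration_tests/configured_catalog.json")

source = SourceFacebookMarketing()
for message in source.read(logging.getLogger("airbyte"), config, catalog, state={}):
    print(message.json(exclude_unset=True))
```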
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml
index d426101a5ef5f..7f8f75981b355 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml
@@ -13,21 +13,10 @@ tests:
     - config_path: "secrets/config.json"
   basic_read:
     - config_path: "secrets/config.json"
-      configured_catalog_path: "integration_tests/configured_catalog.json"
-      timeout_seconds: 600
+      empty_streams: ["videos"]
   incremental:
     - config_path: "secrets/config.json"
       configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"
       future_state_path: "integration_tests/future_state.json"
   full_refresh:
     - config_path: "secrets/config.json"
-      configured_catalog_path: "integration_tests/configured_catalog.json"
-      # Ad Insights API has estimated metrics in response, which is calculated based on another metrics.
-      # Sometimes API doesn't return estimated metrics. E.g, cost_per_estimated_ad_recallers is calculated
-      # as total amount spent divided by estimated ad recall lift rate. When second metric is equal to zero
-      # API may or may not return value. Such behavior causes sequential reads test failing.
-      # Because one read response contains this metric, and other doesn't.
-      # Therefore, it's needed to ignore fields like this in API responses.
-      ignored_fields:
-        "ads_insights_age_and_gender": ["cost_per_estimated_ad_recallers"]
-        "ad_creatives": ["thumbnail_url"]
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json
index 67dc36d5d8071..b05433a47d649 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json
+++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json
@@ -64,31 +64,29 @@
       "stream": {
         "name": "ads_insights",
         "json_schema": {},
+        "default_cursor_field": ["date_start"],
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_cursor": true,
-        "default_cursor_field": ["date_start"],
-        "source_defined_primary_key": null,
-        "namespace": null
+        "source_defined_primary_key": []
       },
       "sync_mode": "incremental",
+      "primary_key": [],
       "cursor_field": ["date_start"],
-      "destination_sync_mode": "append",
-      "primary_key": null
+      "destination_sync_mode": "append"
     },
     {
       "stream": {
-        "name": "ads_insights_age_and_gender",
+        "name": "ads_insights_platform_and_device",
         "json_schema": {},
+        "default_cursor_field": ["date_start"],
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_cursor": true,
-        "default_cursor_field": ["date_start"],
-        "source_defined_primary_key": null,
-        "namespace": null
+        "source_defined_primary_key": []
       },
       "sync_mode": "incremental",
+      "primary_key": [],
       "cursor_field": ["date_start"],
-      "destination_sync_mode": "append",
-      "primary_key": null
+      "destination_sync_mode": "append"
     }
   ]
 }
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json
index fee4405518324..31324e26d5bc3 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json
+++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json
@@ -7,8 +7,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_cursor": true,
         "default_cursor_field": ["updated_time"],
-        "source_defined_primary_key": [["id"]],
-        "namespace": null
+        "source_defined_primary_key": [["id"]]
       },
       "sync_mode": "incremental",
       "cursor_field": null,
@@ -22,13 +21,11 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_cursor": true,
         "default_cursor_field": ["updated_time"],
-        "source_defined_primary_key": [["id"]],
-        "namespace": null
+        "source_defined_primary_key": [["id"]]
       },
       "sync_mode": "incremental",
       "cursor_field": null,
-      "destination_sync_mode": "append",
-      "primary_key": null
+      "destination_sync_mode": "append"
     },
     {
       "stream": {
@@ -37,8 +34,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_cursor": true,
         "default_cursor_field": ["updated_time"],
-        "source_defined_primary_key": [["id"]],
-        "namespace": null
+        "source_defined_primary_key": [["id"]]
       },
       "sync_mode": "incremental",
       "cursor_field": null,
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py
index df9639fab612b..8bcb821e03238 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py
@@ -26,4 +26,7 @@ def config_with_wrong_account_fixture(config):
 
 @pytest.fixture(scope="session", name="config_with_include_deleted")
 def config_with_include_deleted_fixture(config):
-    return {**config, "include_deleted": True}
+    new_config = {**config, "include_deleted": True}
+    new_config.pop("_limit", None)
+    new_config.pop("end_date", None)
+    return new_config
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json
index 1f4e623f63685..5debd07b2087f 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json
@@ -5,20 +5,10 @@
     "title": "Source Facebook Marketing",
     "type": "object",
     "properties": {
-      "account_id": {
-        "title": "Account Id",
-        "description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
-        "type": "string"
-      },
-      "access_token": {
-        "title": "Access Token",
-        "description": "The value of the access token generated. See the <a href=\"https://docs.airbyte.io/integrations/sources/facebook-marketing\">docs</a> for more information",
-        "airbyte_secret": true,
-        "type": "string"
-      },
       "start_date": {
         "title": "Start Date",
         "description": "The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
+        "order": 0,
         "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
         "examples": ["2017-01-25T00:00:00Z"],
         "type": "string",
@@ -27,42 +17,44 @@
       "end_date": {
         "title": "End Date",
         "description": "The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.",
+        "order": 1,
         "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
         "examples": ["2017-01-26T00:00:00Z"],
         "type": "string",
         "format": "date-time"
       },
-      "fetch_thumbnail_images": {
-        "title": "Fetch Thumbnail Images",
-        "description": "In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url",
-        "default": false,
-        "type": "boolean"
+      "account_id": {
+        "title": "Account ID",
+        "description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
+        "order": 2,
+        "examples": ["111111111111111"],
+        "type": "string"
+      },
+      "access_token": {
+        "title": "Access Token",
+        "description": "The value of the access token generated. See the <a href=\"https://docs.airbyte.io/integrations/sources/facebook-marketing\">docs</a> for more information",
+        "order": 3,
+        "airbyte_secret": true,
+        "type": "string"
       },
       "include_deleted": {
         "title": "Include Deleted",
-        "description": "Include data from deleted campaigns, ads, and adsets",
+        "description": "Include data from deleted Campaigns, Ads, and AdSets",
         "default": false,
+        "order": 4,
         "type": "boolean"
       },
-      "insights_lookback_window": {
-        "title": "Insights Lookback Window",
-        "description": "The attribution window for the actions",
-        "default": 28,
-        "minimum": 0,
-        "maximum": 28,
-        "type": "integer"
-      },
-      "insights_days_per_job": {
-        "title": "Insights Days Per Job",
-        "description": "Number of days to sync in one job (the more data you have, the smaller this parameter should be)",
-        "default": 7,
-        "minimum": 1,
-        "maximum": 30,
-        "type": "integer"
+      "fetch_thumbnail_images": {
+        "title": "Fetch Thumbnail Images",
+        "description": "In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url",
+        "default": false,
+        "order": 5,
+        "type": "boolean"
       },
       "custom_insights": {
         "title": "Custom Insights",
-        "description": "A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
+        "description": "A list which contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
+        "order": 6,
         "type": "array",
         "items": {
           "title": "InsightConfig",
@@ -105,48 +97,7 @@
         }
       }
     },
-    "required": ["account_id", "access_token", "start_date"],
-    "definitions": {
-      "InsightConfig": {
-        "title": "InsightConfig",
-        "type": "object",
-        "properties": {
-          "name": {
-            "title": "Name",
-            "description": "The name value of insight",
-            "type": "string"
-          },
-          "fields": {
-            "title": "Fields",
-            "description": "A list of chosen fields for fields parameter",
-            "default": [],
-            "type": "array",
-            "items": {
-              "type": "string"
-            }
-          },
-          "breakdowns": {
-            "title": "Breakdowns",
-            "description": "A list of chosen breakdowns for breakdowns",
-            "default": [],
-            "type": "array",
-            "items": {
-              "type": "string"
-            }
-          },
-          "action_breakdowns": {
-            "title": "Action Breakdowns",
-            "description": "A list of chosen action_breakdowns for action_breakdowns",
-            "default": [],
-            "type": "array",
-            "items": {
-              "type": "string"
-            }
-          }
-        },
-        "required": ["name"]
-      }
-    }
+    "required": ["start_date", "account_id", "access_token"]
   },
   "supportsIncremental": true,
   "supported_destination_sync_modes": ["append"],
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py
index c363a4adf9593..ef869e5eec897 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py
@@ -4,10 +4,10 @@
 
 
 import copy
+import logging
 from typing import Any, List, MutableMapping, Set, Tuple
 
 import pytest
-from airbyte_cdk import AirbyteLogger
 from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type
 from source_facebook_marketing.source import SourceFacebookMarketing
 
@@ -52,7 +52,7 @@ def test_streams_with_include_deleted(self, stream_name, deleted_id, config_with
         assert deleted_records, f"{stream_name} stream should have deleted records returned"
         assert is_specific_deleted_pulled, f"{stream_name} stream should have a deleted record with id={deleted_id}"
 
-    @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 1), ("ad_sets", 1)])
+    @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("ad_sets", 1)])
     def test_streams_with_include_deleted_and_state(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state):
         """Should ignore state because of include_deleted enabled"""
         catalog = self.slice_catalog(configured_catalog, {stream_name})
@@ -92,7 +92,7 @@ def slice_catalog(catalog: ConfiguredAirbyteCatalog, streams: Set[str]) -> Confi
     def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]:
         records = []
         states = []
-        for message in SourceFacebookMarketing().read(AirbyteLogger(), conf, catalog, state=state):
+        for message in SourceFacebookMarketing().read(logging.getLogger("airbyte"), conf, catalog, state=state):
             if message.type == Type.RECORD:
                 records.append(message)
             elif message.type == Type.STATE:
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/setup.py b/airbyte-integrations/connectors/source-facebook-marketing/setup.py
index 384ef83be414d..1d2b4267bb387 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/setup.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/setup.py
@@ -6,9 +6,9 @@
 from setuptools import find_packages, setup
 
 MAIN_REQUIREMENTS = [
-    "airbyte-cdk~=0.1.35",
-    "cached_property~=1.5",
-    "facebook_business~=12.0",
+    "airbyte-cdk==0.1.47",
+    "cached_property==1.5.2",
+    "facebook_business==12.0.1",
     "pendulum>=2,<3",
 ]
 
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
index 9ee3d73c4f8af..1f9433475e049 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
@@ -5,24 +5,41 @@
 import json
 import logging
 from time import sleep
+from typing import Tuple
 
+import backoff
 import pendulum
 from cached_property import cached_property
 from facebook_business import FacebookAdsApi
 from facebook_business.adobjects import user as fb_user
 from facebook_business.adobjects.adaccount import AdAccount
+from facebook_business.api import FacebookResponse
 from facebook_business.exceptions import FacebookRequestError
-from source_facebook_marketing.common import FacebookAPIException
+from source_facebook_marketing.streams.common import retry_pattern
 
 logger = logging.getLogger("airbyte")
 
 
+class FacebookAPIException(Exception):
+    """General class for all API errors"""
+
+
+backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+
+
 class MyFacebookAdsApi(FacebookAdsApi):
     """Custom Facebook API class to intercept all API calls and handle call rate limits"""
 
-    call_rate_threshold = 90  # maximum percentage of call limit utilization
+    call_rate_threshold = 95  # maximum percentage of call limit utilization
     pause_interval_minimum = pendulum.duration(minutes=1)  # default pause interval if reached or close to call rate limit
 
+    # Insights async jobs throttle, from 1 to 100
+    _ads_insights_throttle: Tuple[float, float]
+
+    @property
+    def ads_insights_throttle(self) -> Tuple[float, float]:
+        return self._ads_insights_throttle
+
     @staticmethod
     def parse_call_rate_header(headers):
         usage = 0
@@ -86,6 +103,19 @@ def handle_call_rate_limit(self, response, params):
                 logger.warning(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
                 sleep(pause_interval.total_seconds())
 
+    def _update_insights_throttle_limit(self, response: FacebookResponse):
+        """
+        Every response to an /insights call contains an x-fb-ads-insights-throttle
+        header with the current throttle values for async insights jobs for the
+        current app/account. We need this information to adjust the number of
+        running async jobs for optimal performance.
+        """
+        ads_insights_throttle = response.headers().get("x-fb-ads-insights-throttle")
+        if ads_insights_throttle:
+            ads_insights_throttle = json.loads(ads_insights_throttle)
+            self._ads_insights_throttle = ads_insights_throttle.get("app_id_util_pct", 0), ads_insights_throttle.get("acc_id_util_pct", 0)
+
+    @backoff_policy
     def call(
         self,
         method,
@@ -98,6 +128,7 @@ def call(
     ):
         """Makes an API call, delegate actual work to parent class and handles call rates"""
         response = super().call(method, path, params, headers, files, url_override, api_version)
+        self._update_insights_throttle_limit(response)
         self.handle_call_rate_limit(response, params)
         return response
 
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py
deleted file mode 100644
index 5bb217533da3b..0000000000000
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
-#
-
-import logging
-from enum import Enum
-from typing import Any, Mapping
-
-import backoff
-import pendulum
-from facebook_business.exceptions import FacebookRequestError
-from source_facebook_marketing.api import API
-
-from .common import JobException, JobTimeoutException, retry_pattern
-
-backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
-logger = logging.getLogger("airbyte")
-
-
-class Status(Enum):
-    """Async job statuses"""
-
-    COMPLETED = "Job Completed"
-    FAILED = "Job Failed"
-    SKIPPED = "Job Skipped"
-    STARTED = "Job Started"
-
-
-class AsyncJob:
-    """AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job"""
-
-    MAX_WAIT_TO_START = pendulum.duration(minutes=5)
-    MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30)
-
-    def __init__(self, api: API, params: Mapping[str, Any]):
-        """Initialize
-
-        :param api: Facebook Api wrapper
-        :param params: job params, required to start/restart job
-        """
-        self._params = params
-        self._api = api
-        self._job = None
-        self._start_time = None
-        self._finish_time = None
-        self._failed = False
-
-    @backoff_policy
-    def start(self):
-        """Start remote job"""
-        if self._job:
-            raise RuntimeError(f"{self}: Incorrect usage of start - the job already started, use restart instead")
-
-        self._job = self._api.account.get_insights(params=self._params, is_async=True)
-        self._start_time = pendulum.now()
-        job_id = self._job["report_run_id"]
-        time_range = self._params["time_range"]
-        breakdowns = self._params["breakdowns"]
-        logger.info(f"Created AdReportRun: {job_id} to sync insights {time_range} with breakdown {breakdowns}")
-
-    def restart(self):
-        """Restart failed job"""
-        if not self._job or not self.failed:
-            raise RuntimeError(f"{self}: Incorrect usage of restart - only failed jobs can be restarted")
-
-        self._job = None
-        self._failed = False
-        self._start_time = None
-        self._finish_time = None
-        self.start()
-        logger.info(f"{self}: restarted")
-
-    @property
-    def elapsed_time(self):
-        """Elapsed time since the job start"""
-        if not self._start_time:
-            return None
-
-        end_time = self._finish_time or pendulum.now()
-        return end_time - self._start_time
-
-    @property
-    def completed(self) -> bool:
-        """Check job status and return True if it is completed successfully
-
-        :return: True if completed successfully, False - if task still running
-        :raises: JobException in case job failed to start, failed or timed out
-        """
-        try:
-            return self._check_status()
-        except JobException:
-            self._failed = True
-            raise
-
-    @property
-    def failed(self) -> bool:
-        """Tell if the job previously failed"""
-        return self._failed
-
-    @backoff_policy
-    def _update_job(self):
-        if not self._job:
-            raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started")
-        self._job = self._job.api_get()
-
-    def _check_status(self) -> bool:
-        """Perform status check
-
-        :return: True if the job is completed, False - if the job is still running
-        :raises: errors if job failed or timed out
-        """
-        self._update_job()
-        job_progress_pct = self._job["async_percent_completion"]
-        logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})")
-        runtime = self.elapsed_time
-
-        if self._job["async_status"] == Status.COMPLETED.value:
-            self._finish_time = pendulum.now()
-            return True
-        elif self._job["async_status"] == Status.FAILED.value:
-            raise JobException(f"{self._job} failed after {runtime.in_seconds()} seconds.")
-        elif self._job["async_status"] == Status.SKIPPED.value:
-            raise JobException(f"{self._job} skipped after {runtime.in_seconds()} seconds.")
-
-        if runtime > self.MAX_WAIT_TO_START and self._job["async_percent_completion"] == 0:
-            raise JobTimeoutException(
-                f"{self._job} did not start after {runtime.in_seconds()} seconds."
-                f" This is an intermittent error which may be fixed by retrying the job. Aborting."
-            )
-        elif runtime > self.MAX_WAIT_TO_FINISH:
-            raise JobTimeoutException(
-                f"{self._job} did not finish after {runtime.in_seconds()} seconds."
-                f" This is an intermittent error which may be fixed by retrying the job. Aborting."
-            )
-        return False
-
-    @backoff_policy
-    def get_result(self) -> Any:
-        """Retrieve result of the finished job."""
-        if not self._job or self.failed:
-            raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started of failed")
-        return self._job.get_result()
-
-    def __str__(self) -> str:
-        """String representation of the job wrapper."""
-        job_id = self._job["report_run_id"] if self._job else "<None>"
-        time_range = self._params["time_range"]
-        breakdowns = self._params["breakdowns"]
-        return f"AdReportRun(id={job_id}, time_range={time_range}, breakdowns={breakdowns}"
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
index 72f1328a7151b..645af8a88c9be 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
@@ -3,26 +3,23 @@
 #
 
 import logging
-from datetime import datetime
-from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Type
+from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple, Type
 
-import pendulum
 from airbyte_cdk.logger import AirbyteLogger
 from airbyte_cdk.models import (
-    AirbyteConnectionStatus,
+    AirbyteMessage,
     AuthSpecification,
+    ConfiguredAirbyteStream,
     ConnectorSpecification,
     DestinationSyncMode,
     OAuth2Specification,
-    Status,
+    SyncMode,
 )
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import Stream
-from airbyte_cdk.sources.streams.core import package_name_from_class
-from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
-from jsonschema import RefResolver
-from pydantic import BaseModel, Field
+from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
 from source_facebook_marketing.api import API
+from source_facebook_marketing.spec import ConnectorConfig
 from source_facebook_marketing.streams import (
     AdCreatives,
     Ads,
@@ -40,101 +37,36 @@
 
 logger = logging.getLogger("airbyte")
 
-
-class InsightConfig(BaseModel):
-
-    name: str = Field(description="The name value of insight")
-
-    fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter", default=[])
-
-    breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns", default=[])
-
-    action_breakdowns: Optional[List[str]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[])
-
-
-class ConnectorConfig(BaseModel):
-    class Config:
-        title = "Source Facebook Marketing"
-
-    account_id: str = Field(description="The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.")
-
-    access_token: str = Field(
-        description='The value of the access token generated. See the <a href="https://docs.airbyte.io/integrations/sources/facebook-marketing">docs</a> for more information',
-        airbyte_secret=True,
-    )
-
-    start_date: datetime = Field(
-        description="The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
-        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
-        examples=["2017-01-25T00:00:00Z"],
-    )
-
-    end_date: Optional[datetime] = Field(
-        description="The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.",
-        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
-        examples=["2017-01-26T00:00:00Z"],
-        default_factory=pendulum.now,
-    )
-
-    fetch_thumbnail_images: bool = Field(
-        default=False, description="In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url"
-    )
-
-    include_deleted: bool = Field(default=False, description="Include data from deleted campaigns, ads, and adsets")
-
-    insights_lookback_window: int = Field(
-        default=28,
-        description="The attribution window for the actions",
-        minimum=0,
-        maximum=28,
-    )
-
-    insights_days_per_job: int = Field(
-        default=7,
-        description="Number of days to sync in one job (the more data you have, the smaller this parameter should be)",
-        minimum=1,
-        maximum=30,
-    )
-    custom_insights: Optional[List[InsightConfig]] = Field(
-        description="A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
-    )
+DOCS_URL = "https://docs.airbyte.io/integrations/sources/facebook-marketing"
 
 
 class SourceFacebookMarketing(AbstractSource):
-    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
+    def check_connection(self, _logger: "logging.Logger", config: Mapping[str, Any]) -> Tuple[bool, Any]:
         """Connection check to validate that the user-provided config can be used to connect to the underlying API
 
         :param config:  the user-input config object conforming to the connector's spec.json
-        :param logger:  logger object
+        :param _logger:  logger object
         :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
         """
-        ok = False
-        error_msg = None
-
-        try:
-            config = ConnectorConfig.parse_obj(config)  # FIXME: this will be not need after we fix CDK
-            api = API(account_id=config.account_id, access_token=config.access_token)
-            logger.info(f"Select account {api.account}")
-            ok = True
-        except Exception as exc:
-            error_msg = repr(exc)
+        config = ConnectorConfig.parse_obj(config)
+        api = API(account_id=config.account_id, access_token=config.access_token)
+        logger.info(f"Select account {api.account}")
 
-        return ok, error_msg
+        return True, None
 
     def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
         """Discovery method, returns available streams
 
         :param config: A Mapping of the user input configuration as defined in the connector spec.
+        :return: list of the stream instances
         """
-        config: ConnectorConfig = ConnectorConfig.parse_obj(config)  # FIXME: this will be not need after we fix CDK
+        config: ConnectorConfig = ConnectorConfig.parse_obj(config)
         api = API(account_id=config.account_id, access_token=config.access_token)
 
         insights_args = dict(
             api=api,
             start_date=config.start_date,
             end_date=config.end_date,
-            buffer_days=config.insights_lookback_window,
-            days_per_job=config.insights_days_per_job,
         )
 
         streams = [
@@ -154,30 +86,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
 
         return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams)
 
-    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
-        """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
-        try:
-            check_succeeded, error = self.check_connection(logger, config)
-            if not check_succeeded:
-                return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
-        except Exception as e:
-            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(e))
-
-        self._check_custom_insights_entries(config.get("custom_insights", []))
-
-        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
-
     def spec(self, *args, **kwargs) -> ConnectorSpecification:
-        """
-        Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
-        required to run this integration.
+        """Returns the spec for this integration.
+        The spec is a JSON-Schema object describing the required configurations
+        (e.g: username and password) required to run this integration.
         """
         return ConnectorSpecification(
             documentationUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing",
             changelogUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing",
             supportsIncremental=True,
             supported_destination_sync_modes=[DestinationSyncMode.append],
-            connectionSpecification=expand_local_ref(ConnectorConfig.schema()),
+            connectionSpecification=ConnectorConfig.schema(),
             authSpecification=AuthSpecification(
                 auth_type="oauth2.0",
                 oauth2Specification=OAuth2Specification(
@@ -189,7 +108,6 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification:
     def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]]:
         """Update method, if insights have values returns streams replacing the
         default insights streams else returns streams
-
         """
         if not insights:
             return streams
@@ -206,52 +124,64 @@ def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]
 
         return streams + insights_custom_streams
 
-    def _check_custom_insights_entries(self, insights: List[Mapping[str, Any]]):
-
-        loader = ResourceSchemaLoader(package_name_from_class(self.__class__))
-        default_fields = list(loader.get_schema("ads_insights").get("properties", {}).keys())
-        default_breakdowns = list(loader.get_schema("ads_insights_breakdowns").get("properties", {}).keys())
-        default_action_breakdowns = list(loader.get_schema("ads_insights_action_breakdowns").get("properties", {}).keys())
-
-        for insight in insights:
-            if insight.get("fields"):
-                value_checked, value = self._check_values(default_fields, insight.get("fields"))
-                if not value_checked:
-                    message = f"{value} is not a valid field name"
-                    raise Exception("Config validation error: " + message) from None
-            if insight.get("breakdowns"):
-                value_checked, value = self._check_values(default_breakdowns, insight.get("breakdowns"))
-                if not value_checked:
-                    message = f"{value} is not a valid breakdown name"
-                    raise Exception("Config validation error: " + message) from None
-            if insight.get("action_breakdowns"):
-                value_checked, value = self._check_values(default_action_breakdowns, insight.get("action_breakdowns"))
-                if not value_checked:
-                    message = f"{value} is not a valid action_breakdown name"
-                    raise Exception("Config validation error: " + message) from None
-
-        return True
-
-    def _check_values(self, default_value: List[str], custom_value: List[str]) -> Tuple[bool, Any]:
-        for e in custom_value:
-            if e not in default_value:
-                logger.error(f"{e} does not appear in {default_value}")
-                return False, e
-
-        return True, None
-
-
-def expand_local_ref(schema, resolver=None, **kwargs):
-    resolver = resolver or RefResolver("", schema)
-    if isinstance(schema, MutableMapping):
-        if "$ref" in schema:
-            ref_url = schema.pop("$ref")
-            url, resolved_schema = resolver.resolve(ref_url)
-            schema.update(resolved_schema)
-        for key, value in schema.items():
-            schema[key] = expand_local_ref(value, resolver=resolver)
-        return schema
-    elif isinstance(schema, List):
-        return [expand_local_ref(item, resolver=resolver) for item in schema]
-
-    return schema
+    def _read_incremental(
+        self,
+        _logger: AirbyteLogger,
+        stream_instance: Stream,
+        configured_stream: ConfiguredAirbyteStream,
+        connector_state: MutableMapping[str, Any],
+        internal_config: InternalConfig,
+    ) -> Iterator[AirbyteMessage]:
+        """We override this method because we need to inject new state handling.
+        Old way:
+            pass stream_state in read_records and other methods
+            call stream_state = stream_instance.get_updated_state(stream_state, record_data) for each record
+        New way:
+            stream_instance.state = stream_state
+            call stream_instance.state when we want to dump state message
+
+        :param _logger:
+        :param stream_instance:
+        :param configured_stream:
+        :param connector_state:
+        :param internal_config:
+        :return:
+        """
+        if not hasattr(stream_instance, "state"):
+            yield from super()._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config)
+            return
+
+        stream_name = configured_stream.stream.name
+        stream_state = connector_state.get(stream_name, {})
+        if stream_state:
+            logger.info(f"Setting state of {stream_name} stream to {stream_state}")
+            stream_instance.state = stream_state
+
+        slices = stream_instance.stream_slices(
+            cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
+        )
+        total_records_counter = 0
+        for _slice in slices:
+            records = stream_instance.read_records(
+                sync_mode=SyncMode.incremental,
+                stream_slice=_slice,
+                stream_state=stream_state,
+                cursor_field=configured_stream.cursor_field or None,
+            )
+            for record_counter, record_data in enumerate(records, start=1):
+                yield self._as_airbyte_record(stream_name, record_data)
+                checkpoint_interval = stream_instance.state_checkpoint_interval
+                if checkpoint_interval and record_counter % checkpoint_interval == 0:
+                    yield self._checkpoint_state(stream_name, stream_instance.state, connector_state, logger)
+
+                total_records_counter += 1
+                # This functionality should ideally live outside of this method
+                # but since state is managed inside this method, we keep track
+                # of it here.
+                if self._limit_reached(internal_config, total_records_counter):
+                    # Break from slice loop to save state and exit from _read_incremental function.
+                    break
+
+            yield self._checkpoint_state(stream_name, stream_instance.state, connector_state, logger)
+            if self._limit_reached(internal_config, total_records_counter):
+                return
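To make the `_read_incremental` override above concrete: it only takes effect for streams that expose a `state` property. Below is a minimal sketch of that contract; it is an illustration of the expected interface, not the connector's actual stream classes (those live in the `streams` package).

```python
# Illustrative stream that satisfies the hasattr(stream_instance, "state") check
# used by the overridden _read_incremental (not the connector's real code).
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream


class ExampleStatefulStream(Stream):
    primary_key = None
    cursor_field = "date_start"

    def __init__(self):
        self._cursor_value: Optional[str] = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Read by the source whenever it wants to checkpoint a STATE message.
        return {self.cursor_field: self._cursor_value} if self._cursor_value else {}

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        # Set once by _read_incremental before any slices are read.
        self._cursor_value = value.get(self.cursor_field)

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # A real stream would advance self._cursor_value as it yields records.
        yield from []
```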
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py
new file mode 100644
index 0000000000000..2edb95f2b6af7
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py
@@ -0,0 +1,99 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+from datetime import datetime
+from enum import Enum
+from typing import List, Optional
+
+import pendulum
+from airbyte_cdk.sources.config import BaseConfig
+from facebook_business.adobjects.adsinsights import AdsInsights
+from pydantic import BaseModel, Field
+
+logger = logging.getLogger("airbyte")
+
+
+ValidFields = Enum("ValidEnums", AdsInsights.Field.__dict__)
+ValidBreakdowns = Enum("ValidBreakdowns", AdsInsights.Breakdowns.__dict__)
+ValidActionBreakdowns = Enum("ValidActionBreakdowns", AdsInsights.ActionBreakdowns.__dict__)
+
+
+class InsightConfig(BaseModel):
+    """Config for custom insights"""
+
+    name: str = Field(description="The name value of insight")
+    fields: Optional[List[ValidFields]] = Field(description="A list of chosen fields for fields parameter", default=[])
+    breakdowns: Optional[List[ValidBreakdowns]] = Field(description="A list of chosen breakdowns for breakdowns", default=[])
+    action_breakdowns: Optional[List[ValidActionBreakdowns]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[])
+
+
+class ConnectorConfig(BaseConfig):
+    """Connector config"""
+
+    class Config:
+        title = "Source Facebook Marketing"
+
+    start_date: datetime = Field(
+        title="Start Date",
+        order=0,
+        description=(
+            "The date from which you'd like to replicate data for all incremental streams, "
+            "in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated."
+        ),
+        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
+        examples=["2017-01-25T00:00:00Z"],
+    )
+
+    end_date: Optional[datetime] = Field(
+        title="End Date",
+        order=1,
+        description=(
+            "The date until which you'd like to replicate data for all incremental streams, in the format YYYY-MM-DDT00:00:00Z. "
+            "All data generated between start_date and this date will be replicated. "
+            "Not setting this option will result in always syncing the latest data."
+        ),
+        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
+        examples=["2017-01-26T00:00:00Z"],
+        default_factory=pendulum.now,
+    )
+
+    account_id: str = Field(
+        title="Account ID",
+        order=2,
+        description="The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
+        examples=["111111111111111"],
+    )
+
+    access_token: str = Field(
+        title="Access Token",
+        order=3,
+        description=(
+            'The value of the access token generated. '
+            'See the <a href="https://docs.airbyte.io/integrations/sources/facebook-marketing">docs</a> for more information'
+        ),
+        airbyte_secret=True,
+    )
+
+    include_deleted: bool = Field(
+        title="Include Deleted",
+        order=4,
+        default=False,
+        description="Include data from deleted Campaigns, Ads, and AdSets",
+    )
+
+    fetch_thumbnail_images: bool = Field(
+        title="Fetch Thumbnail Images",
+        order=5,
+        default=False,
+        description="In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url",
+    )
+
+    custom_insights: Optional[List[InsightConfig]] = Field(
+        title="Custom Insights",
+        order=6,
+        description=(
+            "A list which contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
+        ),
+    )
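Since `ConnectorConfig` now drives both config parsing and the connection spec (its `schema()` output is what `spec()` returns and what `integration_tests/spec.json` mirrors), here is a hedged usage example with placeholder values:

```python
# Illustrative only - placeholder account id and token, not real credentials.
from source_facebook_marketing.spec import ConnectorConfig

config = ConnectorConfig.parse_obj(
    {
        "start_date": "2021-01-25T00:00:00Z",
        "account_id": "111111111111111",
        "access_token": "<access-token>",
    }
)
print(config.start_date, config.include_deleted)  # end_date defaults to "now"

# The generated schema carries no "definitions" block (local $refs are expanded
# by BaseConfig), which is why source.py dropped its expand_local_ref helper.
schema = ConnectorConfig.schema()
print(schema["required"])  # ["start_date", "account_id", "access_token"]
```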
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py
deleted file mode 100644
index fe0013a8966ea..0000000000000
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py
+++ /dev/null
@@ -1,490 +0,0 @@
-#
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
-#
-
-import base64
-import time
-import urllib.parse as urlparse
-from abc import ABC
-from collections import deque
-from datetime import datetime
-from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
-
-import airbyte_cdk.sources.utils.casing as casing
-import backoff
-import pendulum
-import requests
-from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.streams import Stream
-from airbyte_cdk.sources.streams.core import package_name_from_class
-from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
-from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
-from cached_property import cached_property
-from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
-from facebook_business.exceptions import FacebookRequestError
-from source_facebook_marketing.api import API
-
-from .async_job import AsyncJob
-from .common import FacebookAPIException, JobException, batch, deep_merge, retry_pattern
-
-backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
-
-
-def remove_params_from_url(url: str, params: List[str]) -> str:
-    """
-    Parses a URL and removes the query parameters specified in params
-    :param url: URL
-    :param params: list of query parameters
-    :return: URL with params removed
-    """
-    parsed = urlparse.urlparse(url)
-    query = urlparse.parse_qs(parsed.query, keep_blank_values=True)
-    filtered = dict((k, v) for k, v in query.items() if k not in params)
-    return urlparse.urlunparse(
-        [parsed.scheme, parsed.netloc, parsed.path, parsed.params, urlparse.urlencode(filtered, doseq=True), parsed.fragment]
-    )
-
-
-def fetch_thumbnail_data_url(url: str) -> str:
-    try:
-        response = requests.get(url)
-        if response.status_code == 200:
-            type = response.headers["content-type"]
-            data = base64.b64encode(response.content)
-            return f"data:{type};base64,{data.decode('ascii')}"
-    except requests.exceptions.RequestException:
-        pass
-    return None
-
-
-class FBMarketingStream(Stream, ABC):
-    """Base stream class"""
-
-    primary_key = "id"
-    transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
-
-    page_size = 100
-
-    enable_deleted = False
-    entity_prefix = None
-
-    def __init__(self, api: API, include_deleted: bool = False, **kwargs):
-        super().__init__(**kwargs)
-        self._api = api
-        self._include_deleted = include_deleted if self.enable_deleted else False
-
-    @cached_property
-    def fields(self) -> List[str]:
-        """List of fields that we want to query, for now just all properties from stream's schema"""
-        return list(self.get_json_schema().get("properties", {}).keys())
-
-    @backoff_policy
-    def execute_in_batch(self, requests: Iterable[FacebookRequest]) -> Sequence[MutableMapping[str, Any]]:
-        """Execute list of requests in batches"""
-        records = []
-
-        def success(response: FacebookResponse):
-            records.append(response.json())
-
-        def failure(response: FacebookResponse):
-            raise response.error()
-
-        api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
-        for request in requests:
-            api_batch.add_request(request, success=success, failure=failure)
-        retry_batch = api_batch.execute()
-        if retry_batch:
-            raise FacebookAPIException(f"Batch has failed {len(retry_batch)} requests")
-
-        return records
-
-    def read_records(
-        self,
-        sync_mode: SyncMode,
-        cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
-        stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
-        """Main read method used by CDK"""
-        for record in self._read_records(params=self.request_params(stream_state=stream_state)):
-            yield self._extend_record(record, fields=self.fields)
-
-    def _read_records(self, params: Mapping[str, Any]) -> Iterable:
-        """Wrapper around query to backoff errors.
-        We have default implementation because we still can override read_records so this method is not mandatory.
-        """
-        return []
-
-    @backoff_policy
-    def _extend_record(self, obj: Any, **kwargs):
-        """Wrapper around api_get to backoff errors"""
-        return obj.api_get(**kwargs).export_all_data()
-
-    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
-        """Parameters that should be passed to query_records method"""
-        params = {"limit": self.page_size}
-
-        if self._include_deleted:
-            params.update(self._filter_all_statuses())
-
-        return params
-
-    def _filter_all_statuses(self) -> MutableMapping[str, Any]:
-        """Filter that covers all possible statuses thus including deleted/archived records"""
-        filt_values = [
-            "active",
-            "archived",
-            "completed",
-            "limited",
-            "not_delivering",
-            "deleted",
-            "not_published",
-            "pending_review",
-            "permanently_deleted",
-            "recently_completed",
-            "recently_rejected",
-            "rejected",
-            "scheduled",
-            "inactive",
-        ]
-
-        return {
-            "filtering": [
-                {"field": f"{self.entity_prefix}.delivery_info", "operator": "IN", "value": filt_values},
-            ],
-        }
-
-
-class FBMarketingIncrementalStream(FBMarketingStream, ABC):
-    cursor_field = "updated_time"
-
-    def __init__(self, start_date: datetime, end_date: datetime, **kwargs):
-        super().__init__(**kwargs)
-        self._start_date = pendulum.instance(start_date)
-        self._end_date = pendulum.instance(end_date)
-
-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
-        """Update stream state from latest record"""
-        potentially_new_records_in_the_past = self._include_deleted and not current_stream_state.get("include_deleted", False)
-        record_value = latest_record[self.cursor_field]
-        state_value = current_stream_state.get(self.cursor_field) or record_value
-        max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value))
-        if potentially_new_records_in_the_past:
-            max_cursor = record_value
-
-        return {
-            self.cursor_field: str(max_cursor),
-            "include_deleted": self._include_deleted,
-        }
-
-    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
-        """Include state filter"""
-        params = super().request_params(**kwargs)
-        params = deep_merge(params, self._state_filter(stream_state=stream_state or {}))
-        return params
-
-    def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
-        """Additional filters associated with state if any set"""
-        state_value = stream_state.get(self.cursor_field)
-        filter_value = self._start_date if not state_value else pendulum.parse(state_value)
-
-        potentially_new_records_in_the_past = self._include_deleted and not stream_state.get("include_deleted", False)
-        if potentially_new_records_in_the_past:
-            self.logger.info(f"Ignoring bookmark for {self.name} because of enabled `include_deleted` option")
-            filter_value = self._start_date
-
-        return {
-            "filtering": [
-                {
-                    "field": f"{self.entity_prefix}.{self.cursor_field}",
-                    "operator": "GREATER_THAN",
-                    "value": filter_value.int_timestamp,
-                },
-            ],
-        }
-
-
-class AdCreatives(FBMarketingStream):
-    """AdCreative is append only stream
-    doc: https://developers.facebook.com/docs/marketing-api/reference/ad-creative
-    """
-
-    entity_prefix = "adcreative"
-    batch_size = 50
-
-    def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):
-        super().__init__(**kwargs)
-        self._fetch_thumbnail_images = fetch_thumbnail_images
-
-    def read_records(
-        self,
-        sync_mode: SyncMode,
-        cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
-        stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
-        """Read records using batch API"""
-        records = self._read_records(params=self.request_params(stream_state=stream_state))
-        # "thumbnail_data_url" is a field in our stream's schema because we
-        # output it (see fix_thumbnail_urls below), but it's not a field that
-        # we can request from Facebook
-        request_fields = [f for f in self.fields if f != "thumbnail_data_url"]
-        requests = [record.api_get(fields=request_fields, pending=True) for record in records]
-        for requests_batch in batch(requests, size=self.batch_size):
-            for record in self.execute_in_batch(requests_batch):
-                yield self.fix_thumbnail_urls(record)
-
-    def fix_thumbnail_urls(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
-        """Cleans and, if enabled, fetches thumbnail URLs for each creative."""
-        # The thumbnail_url contains some extra query parameters that don't affect the validity of the URL, but break SAT
-        thumbnail_url = record.get("thumbnail_url")
-        if thumbnail_url:
-            record["thumbnail_url"] = remove_params_from_url(thumbnail_url, ["_nc_hash", "d"])
-            if self._fetch_thumbnail_images:
-                record["thumbnail_data_url"] = fetch_thumbnail_data_url(thumbnail_url)
-        return record
-
-    @backoff_policy
-    def _read_records(self, params: Mapping[str, Any]) -> Iterator:
-        return self._api.account.get_ad_creatives(params=params)
-
-
-class Ads(FBMarketingIncrementalStream):
-    """doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup"""
-
-    entity_prefix = "ad"
-    enable_deleted = True
-
-    @backoff_policy
-    def _read_records(self, params: Mapping[str, Any]):
-        return self._api.account.get_ads(params=params, fields=[self.cursor_field])
-
-
-class AdSets(FBMarketingIncrementalStream):
-    """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign"""
-
-    entity_prefix = "adset"
-    enable_deleted = True
-
-    @backoff_policy
-    def _read_records(self, params: Mapping[str, Any]):
-        return self._api.account.get_ad_sets(params=params)
-
-
-class Campaigns(FBMarketingIncrementalStream):
-    """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign-group"""
-
-    entity_prefix = "campaign"
-    enable_deleted = True
-
-    @backoff_policy
-    def _read_records(self, params: Mapping[str, Any]):
-        return self._api.account.get_campaigns(params=params)
-
-
-class Videos(FBMarketingIncrementalStream):
-    """See: https://developers.facebook.com/docs/marketing-api/reference/video"""
-
-    entity_prefix = "video"
-    enable_deleted = True
-
-    @backoff_policy
-    def _read_records(self, params: Mapping[str, Any]) -> Iterator:
-        return self._api.account.get_ad_videos(params=params)
-
-
-class AdsInsights(FBMarketingIncrementalStream):
-    """doc: https://developers.facebook.com/docs/marketing-api/insights"""
-
-    cursor_field = "date_start"
-    primary_key = None
-
-    ALL_ACTION_ATTRIBUTION_WINDOWS = [
-        "1d_click",
-        "7d_click",
-        "28d_click",
-        "1d_view",
-        "7d_view",
-        "28d_view",
-    ]
-
-    ALL_ACTION_BREAKDOWNS = [
-        "action_type",
-        "action_target_id",
-        "action_destination",
-    ]
-
-    MAX_ASYNC_SLEEP = pendulum.duration(minutes=5)
-    MAX_ASYNC_JOBS = 10
-    INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30)
-
-    action_breakdowns = ALL_ACTION_BREAKDOWNS
-    level = "ad"
-    action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS
-    time_increment = 1
-
-    breakdowns = []
-
-    def __init__(
-        self,
-        buffer_days,
-        days_per_job,
-        name: str = None,
-        fields: List[str] = None,
-        breakdowns: List[str] = None,
-        action_breakdowns: List[str] = None,
-        **kwargs,
-    ):
-
-        super().__init__(**kwargs)
-        self.lookback_window = pendulum.duration(days=buffer_days)
-        self._days_per_job = days_per_job
-        self._fields = fields
-        self.action_breakdowns = action_breakdowns or self.action_breakdowns
-        self.breakdowns = breakdowns or self.breakdowns
-        self._new_class_name = name
-
-    @property
-    def name(self) -> str:
-        """
-        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
-        """
-        name = self._new_class_name or self.__class__.__name__
-        return casing.camel_to_snake(name)
-
-    def read_records(
-        self,
-        sync_mode: SyncMode,
-        cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
-        stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
-        """Waits for current job to finish (slice) and yield its result"""
-        job = self.wait_for_job(stream_slice["job"])
-        # because we query `lookback_window` days before actual cursor we might get records older then cursor
-
-        for obj in job.get_result():
-            yield obj.export_all_data()
-
-    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
-        """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
-        This solution for Async was chosen because:
-        1. we should commit state after each successful job
-        2. we should run as many job as possible before checking for result
-        3. we shouldn't proceed to consumption of the next job before previous succeed
-        """
-        stream_state = stream_state or {}
-        running_jobs = deque()
-        date_ranges = list(self._date_ranges(stream_state=stream_state))
-        for params in date_ranges:
-            params = deep_merge(params, self.request_params(stream_state=stream_state))
-            job = AsyncJob(api=self._api, params=params)
-            job.start()
-            running_jobs.append(job)
-            if len(running_jobs) >= self.MAX_ASYNC_JOBS:
-                yield {"job": running_jobs.popleft()}
-
-        while running_jobs:
-            yield {"job": running_jobs.popleft()}
-
-    @retry_pattern(backoff.expo, JobException, max_tries=10, factor=5)
-    def wait_for_job(self, job: AsyncJob) -> AsyncJob:
-        if job.failed:
-            job.restart()
-
-        factor = 2
-        sleep_seconds = factor
-        while not job.completed:
-            self.logger.info(f"{job}: sleeping {sleep_seconds} seconds while waiting for completion")
-            time.sleep(sleep_seconds)
-            if sleep_seconds < self.MAX_ASYNC_SLEEP.in_seconds():
-                sleep_seconds *= factor
-
-        return job
-
-    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
-        params = super().request_params(stream_state=stream_state, **kwargs)
-        params = deep_merge(
-            params,
-            {
-                "level": self.level,
-                "action_breakdowns": self.action_breakdowns,
-                "breakdowns": self.breakdowns,
-                "fields": self.fields,
-                "time_increment": self.time_increment,
-                "action_attribution_windows": self.action_attribution_windows,
-            },
-        )
-
-        return params
-
-    def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
-        """Works differently for insights, so remove it"""
-        return {}
-
-    def get_json_schema(self) -> Mapping[str, Any]:
-        """Add fields from breakdowns to the stream schema
-        :return: A dict of the JSON schema representing this stream.
-        """
-        loader = ResourceSchemaLoader(package_name_from_class(self.__class__))
-        schema = loader.get_schema("ads_insights")
-        if self._fields:
-            schema["properties"] = {k: v for k, v in schema["properties"].items() if k in self._fields}
-        if self.breakdowns:
-            breakdowns_properties = loader.get_schema("ads_insights_breakdowns")["properties"]
-            schema["properties"].update({prop: breakdowns_properties[prop] for prop in self.breakdowns})
-        return schema
-
-    @cached_property
-    def fields(self) -> List[str]:
-        """List of fields that we want to query, for now just all properties from stream's schema"""
-        if self._fields:
-            return self._fields
-        schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights")
-        return list(schema.get("properties", {}).keys())
-
-    def _date_ranges(self, stream_state: Mapping[str, Any]) -> Iterator[dict]:
-        """Iterate over period between start_date/state and now
-
-        Notes: Facebook freezes insight data 28 days after it was generated, which means that all data
-            from the past 28 days may have changed since we last emitted it, so we retrieve it again.
-        """
-        state_value = stream_state.get(self.cursor_field)
-        if state_value:
-            start_date = pendulum.parse(state_value) - self.lookback_window
-        else:
-            start_date = self._start_date
-        end_date = self._end_date
-        start_date = max(end_date - self.INSIGHTS_RETENTION_PERIOD, start_date)
-
-        for since in pendulum.period(start_date, end_date).range("days", self._days_per_job):
-            until = min(since.add(days=self._days_per_job - 1), end_date)  # -1 because time_range is inclusive
-            yield {
-                "time_range": {"since": since.to_date_string(), "until": until.to_date_string()},
-            }
-
-
-class AdsInsightsAgeAndGender(AdsInsights):
-    breakdowns = ["age", "gender"]
-
-
-class AdsInsightsCountry(AdsInsights):
-    breakdowns = ["country"]
-
-
-class AdsInsightsRegion(AdsInsights):
-    breakdowns = ["region"]
-
-
-class AdsInsightsDma(AdsInsights):
-    breakdowns = ["dma"]
-
-
-class AdsInsightsPlatformAndDevice(AdsInsights):
-    breakdowns = ["publisher_platform", "platform_position", "impression_device"]
-    action_breakdowns = ["action_type"]  # FB Async Job fails for unknown reason if we set other breakdowns
-
-
-class AdsInsightsActionType(AdsInsights):
-    breakdowns = []
-    action_breakdowns = ["action_type"]
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/__init__.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/__init__.py
new file mode 100644
index 0000000000000..784bf61081ffc
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/__init__.py
@@ -0,0 +1,33 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+from .streams import (
+    AdCreatives,
+    Ads,
+    AdSets,
+    AdsInsights,
+    AdsInsightsActionType,
+    AdsInsightsAgeAndGender,
+    AdsInsightsCountry,
+    AdsInsightsDma,
+    AdsInsightsPlatformAndDevice,
+    AdsInsightsRegion,
+    Campaigns,
+    Videos,
+)
+
+__all__ = [
+    "AdCreatives",
+    "Ads",
+    "AdSets",
+    "AdsInsights",
+    "AdsInsightsActionType",
+    "AdsInsightsAgeAndGender",
+    "AdsInsightsCountry",
+    "AdsInsightsDma",
+    "AdsInsightsPlatformAndDevice",
+    "AdsInsightsRegion",
+    "Campaigns",
+    "Videos",
+]
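
With the new `streams` subpackage in place, downstream modules (e.g. `source.py`) can import every stream class from one location. A one-line illustration of the intended usage:

```python
from source_facebook_marketing.streams import AdCreatives, Ads, AdsInsights, Campaigns, Videos
```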
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
new file mode 100644
index 0000000000000..7dc8c43948ec0
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
@@ -0,0 +1,276 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+import copy
+import logging
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, List, Mapping, Optional
+
+import pendulum
+from facebook_business.adobjects.adreportrun import AdReportRun
+from facebook_business.adobjects.campaign import Campaign
+from facebook_business.adobjects.objectparser import ObjectParser
+from facebook_business.api import FacebookAdsApiBatch, FacebookResponse
+
+logger = logging.getLogger("airbyte")
+
+
+def chunks(data, n):
+    """Yield successive n-sized chunks from lst."""
+    for i in range(0, len(data), n):
+        yield data[i : i + n]
+
+
+class Status(str, Enum):
+    """Async job statuses"""
+
+    COMPLETED = "Job Completed"
+    FAILED = "Job Failed"
+    SKIPPED = "Job Skipped"
+    STARTED = "Job Started"
+    RUNNING = "Job Running"
+    NOT_STARTED = "Job Not Started"
+
+
+class AsyncJob(ABC):
+    """Abstract AsyncJob base class"""
+
+    @abstractmethod
+    def start(self):
+        """Start remote job"""
+
+    @abstractmethod
+    def restart(self):
+        """Restart failed job"""
+
+    @property
+    @abstractmethod
+    def restart_number(self):
+        """Number of restarts"""
+
+    @property
+    @abstractmethod
+    def completed(self) -> bool:
+        """Check job status and return True if it is completed, use failed/succeeded to check if it was successful"""
+
+    @property
+    @abstractmethod
+    def failed(self) -> bool:
+        """Tell if the job previously failed"""
+
+    @abstractmethod
+    def update_job(self, batch=None):
+        """Method to retrieve job's status, separated because of retry handler"""
+
+    @abstractmethod
+    def get_result(self) -> Any:
+        """Retrieve result of the finished job."""
+
+
+class ParentAsyncJob(AsyncJob):
+    """Group of async jobs"""
+
+    def __init__(self, api, jobs: List[AsyncJob]):
+        self._api = api
+        self._jobs = jobs
+        self._restart_number = 0
+
+    def start(self):
+        """Start each job in the group."""
+        for job in self._jobs:
+            job.start()
+
+    def restart(self):
+        """Restart failed jobs"""
+        for job in self._jobs:
+            if job.failed:
+                job.restart()
+            self._restart_number = max(self._restart_number, job.restart_number)
+
+    @property
+    def restart_number(self):
+        """Number of restarts"""
+        return self._restart_number
+
+    @property
+    def completed(self) -> bool:
+        """Check job status and return True if all jobs are completed, use failed/succeeded to check if it was successful"""
+        return all(job.completed for job in self._jobs)
+
+    @property
+    def failed(self) -> bool:
+        """Tell if any job previously failed"""
+        return any(job.failed for job in self._jobs)
+
+    def update_job(self, batch=None):
+        """Checks jobs status in advance and restart if some failed."""
+        batch = self._api.new_batch()
+        unfinished_jobs = [job for job in self._jobs if not job.completed]
+        for jobs in chunks(unfinished_jobs, 50):
+            for job in jobs:
+                job.update_job(batch=batch)
+
+            while batch:
+                # If some of the calls from the batch have failed, execute() returns a new
+                # FacebookAdsApiBatch object containing those calls
+                batch = batch.execute()
+
+    def get_result(self) -> Any:
+        """Retrieve result of the finished job."""
+        for job in self._jobs:
+            yield from job.get_result()
+
+    def split_job(self) -> "AsyncJob":
+        """Split existing job in few smaller ones grouped by ParentAsyncJob class"""
+        raise RuntimeError("Splitting of ParentAsyncJob is not allowed.")
+
+
+class InsightAsyncJob(AsyncJob):
+    """AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job"""
+
+    def __init__(self, api, edge_object: Any, params: Mapping[str, Any], key: Optional[Any] = None):
+        """Initialize
+
+        :param api: FB API
+        :param edge_object: Account, Campaign, (AdSet or Ad in future)
+        :param params: job params, required to start/restart job
+        """
+        self._api = api
+        self._params = params
+        self._edge_object = edge_object
+        self._job: Optional[AdReportRun] = None
+        self._start_time = None
+        self._finish_time = None
+        self._failed = False
+        self._restart_number = 0
+        self.key = key
+
+    def split_job(self) -> ParentAsyncJob:
+        """Split existing job in few smaller ones grouped by ParentAsyncJob class.
+        TODO: use some cache to avoid expensive queries across different streams.
+        """
+        campaign_params = dict(copy.deepcopy(self._params))
+        # get campaigns from the attribution window as well (28 days + 1 current day)
+        new_start = pendulum.parse(self._params["time_range"]["since"]) - pendulum.duration(days=28 + 1)
+        campaign_params.update(fields=["campaign_id"], level="campaign")
+        campaign_params["time_range"].update(since=new_start.to_date_string())
+        campaign_params.pop("time_increment")  # query all days
+        result = self._edge_object.get_insights(params=campaign_params)
+        campaign_ids = set(row["campaign_id"] for row in result)
+        logger.info(f"Got {len(campaign_ids)} campaigns for period {self._params['time_range']}: {campaign_ids}")
+
+        return ParentAsyncJob(self._api, jobs=[InsightAsyncJob(self._api, Campaign(pk), self._params) for pk in campaign_ids])
+
+    def start(self):
+        """Start remote job"""
+        if self._job:
+            raise RuntimeError(f"{self}: Incorrect usage of start - the job already started, use restart instead")
+
+        self._job = self._edge_object.get_insights(params=self._params, is_async=True)
+        self._start_time = pendulum.now()
+        job_id = self._job["report_run_id"]
+        time_range = self._params["time_range"]
+        breakdowns = self._params["breakdowns"]
+        logger.info(f"Created AdReportRun: {job_id} to sync insights {time_range} with breakdown {breakdowns} for {self._edge_object}")
+
+    def restart(self):
+        """Restart failed job"""
+        if not self._job or not self.failed:
+            raise RuntimeError(f"{self}: Incorrect usage of restart - only failed jobs can be restarted")
+
+        self._job = None
+        self._failed = False
+        self._start_time = None
+        self._finish_time = None
+        self._restart_number += 1
+        self.start()
+        logger.info(f"{self}: restarted")
+
+    @property
+    def restart_number(self):
+        """Number of restarts"""
+        return self._restart_number
+
+    @property
+    def elapsed_time(self) -> Optional[pendulum.duration]:
+        """Elapsed time since the job start"""
+        if not self._start_time:
+            return None
+
+        end_time = self._finish_time or pendulum.now()
+        return end_time - self._start_time
+
+    @property
+    def completed(self) -> bool:
+        """Check job status and return True if it is completed, use failed/succeeded to check if it was successful
+
+        :return: True if completed, False if the task is still running
+        """
+        return self._finish_time is not None
+
+    @property
+    def failed(self) -> bool:
+        """Tell if the job previously failed"""
+        return self._failed
+
+    def _batch_success_handler(self, response: FacebookResponse):
+        """Update job status from response"""
+        print("GOT", response.json())
+        self._job = ObjectParser(reuse_object=self._job).parse_single(response.json())
+        self._check_status()
+
+    def _batch_failure_handler(self, response: FacebookResponse):
+        """Update job status from response"""
+        logger.info(f"Request failed with response: {response.body()}")
+
+    def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
+        """Method to retrieve job's status, separated because of retry handler"""
+        if not self._job:
+            raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started")
+
+        if self.completed:
+            job_progress_pct = self._job["async_percent_completion"]
+            logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})")
+            # No need to update job status if it's already completed
+            return
+
+        if batch is not None:
+            self._job.api_get(batch=batch, success=self._batch_success_handler, failure=self._batch_failure_handler)
+        else:
+            self._job = self._job.api_get()
+            self._check_status()
+
+    def _check_status(self) -> bool:
+        """Perform status check
+
+        :return: True if the job is completed, False - if the job is still running
+        """
+        job_progress_pct = self._job["async_percent_completion"]
+        job_status = self._job["async_status"]
+        logger.info(f"{self} is {job_progress_pct}% complete ({job_status})")
+
+        if job_status == Status.COMPLETED:
+            self._finish_time = pendulum.now()  # TODO: is not actual running time, but interval between check_status calls
+            return True
+        elif job_status in [Status.FAILED, Status.SKIPPED]:
+            self._finish_time = pendulum.now()
+            self._failed = True
+            logger.info(f"{self._job} has status {job_status} after {self.elapsed_time.in_seconds()} seconds.")
+            return True
+
+        return False
+
+    def get_result(self) -> Any:
+        """Retrieve result of the finished job."""
+        if not self._job or self.failed:
+            raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started of failed")
+        return self._job.get_result(params={"limit": 100})
+
+    def __str__(self) -> str:
+        """String representation of the job wrapper."""
+        job_id = self._job["report_run_id"] if self._job else "<None>"
+        time_range = self._params["time_range"]
+        breakdowns = self._params["breakdowns"]
+        return f"AdReportRun(id={job_id}, {self._edge_object}, time_range={time_range}, breakdowns={breakdowns}"
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
new file mode 100644
index 0000000000000..8c8ae23f13527
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
@@ -0,0 +1,162 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+import time
+from typing import TYPE_CHECKING, Iterator, List, Tuple
+
+from facebook_business.api import FacebookAdsApiBatch
+
+from .async_job import AsyncJob
+
+if TYPE_CHECKING:
+    from source_facebook_marketing.api import API
+
+logger = logging.getLogger("airbyte")
+
+
+class InsightAsyncJobManager:
+    """
+    Class for managing Ads Insights async jobs. Before running the next job it
+    checks the current insights throttle value; if it is greater than the THROTTLE_LIMIT
+    variable, no new jobs are added.
+    To consume completed jobs use the completed_jobs generator; jobs are returned in the order they finished.
+    """
+
+    # When the current insights throttle hits this value, no new jobs are added.
+    THROTTLE_LIMIT = 70
+    FAILED_JOBS_RESTART_COUNT = 5
+    # Time to wait before checking job status update again.
+    JOB_STATUS_UPDATE_SLEEP_SECONDS = 30
+    # Maximum number of concurrent jobs that can be scheduled. Since the throttling
+    # limit is not a reliable indicator of async workload capacity, we still
+    # have to use this parameter. It equals the maximum number of requests in a batch (FB API limit).
+    MAX_JOBS_IN_QUEUE = 100
+    MAX_JOBS_TO_CHECK = 50
+
+    def __init__(self, api: "API", jobs: Iterator[AsyncJob]):
+        """Init
+
+        :param api:
+        :param jobs:
+        """
+        self._api = api
+        self._jobs = jobs
+        self._running_jobs = []
+
+    def _start_jobs(self):
+        """Enqueue new jobs."""
+
+        self._update_api_throttle_limit()
+        self._wait_throttle_limit_down()
+        prev_jobs_count = len(self._running_jobs)
+        while self._get_current_throttle_value() < self.THROTTLE_LIMIT and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE:
+            job = next(iter(self._jobs), None)
+            if not job:
+                self._empty = True
+                break
+            job.start()
+            self._running_jobs.append(job)
+
+        logger.info(
+            f"Added: {len(self._running_jobs) - prev_jobs_count} jobs. "
+            f"Current throttle limit is {self._current_throttle()}, "
+            f"{len(self._running_jobs)}/{self.MAX_JOBS_IN_QUEUE} job(s) in queue"
+        )
+
+    def completed_jobs(self) -> Iterator[AsyncJob]:
+        """Wait until job is ready and return it. If job
+            failed try to restart it for FAILED_JOBS_RESTART_COUNT times. After job
+            is completed new jobs added according to current throttling limit.
+
+        :yield: completed jobs
+        """
+        if not self._running_jobs:
+            self._start_jobs()
+
+        while self._running_jobs:
+            completed_jobs = self._check_jobs_status_and_restart()
+            while not completed_jobs:
+                logger.info(f"No jobs ready to be consumed, wait for {self.JOB_STATUS_UPDATE_SLEEP_SECONDS} seconds")
+                time.sleep(self.JOB_STATUS_UPDATE_SLEEP_SECONDS)
+                completed_jobs = self._check_jobs_status_and_restart()
+            yield from completed_jobs
+            self._start_jobs()
+
+    def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
+        """Checks jobs status in advance and restart if some failed.
+
+        :return: list of completed jobs
+        """
+        completed_jobs = []
+        running_jobs = []
+        api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
+        for job in self._running_jobs:
+            # we check it here because job can be an instance of ParentAsyncJob, which uses its own batch object
+            if len(api_batch) >= self.MAX_JOBS_TO_CHECK:
+                logger.info("Reached batch queue limit")
+                break
+            job.update_job(batch=api_batch)
+
+        while api_batch:
+            # If some of the calls from the batch have failed, execute() returns a new
+            # FacebookAdsApiBatch object containing those calls
+            api_batch = api_batch.execute()
+
+        failed_num = 0
+        self._wait_throttle_limit_down()
+        for job in self._running_jobs:
+            if job.completed:
+                if job.failed:
+                    if job.restart_number >= self.FAILED_JOBS_RESTART_COUNT:
+                        raise Exception(f"Job {job} failed more than {self.FAILED_JOBS_RESTART_COUNT} times. Terminating...")
+                    elif job.restart_number:
+                        logger.info(f"Job {job} failed, trying to split job into smaller chunks (campaigns).")
+                        group_job = job.split_job()
+                        running_jobs.append(group_job)
+                        group_job.start()
+                    else:
+                        logger.info(f"Job {job} failed, restarting")
+                        job.restart()
+                        running_jobs.append(job)
+                    failed_num += 1
+                else:
+                    completed_jobs.append(job)
+            else:
+                running_jobs.append(job)
+
+        self._running_jobs = running_jobs
+        logger.info(f"Completed jobs: {len(completed_jobs)}, Failed jobs: {failed_num}, Running jobs: {len(self._running_jobs)}")
+
+        return completed_jobs
+
+    def _wait_throttle_limit_down(self):
+        while self._get_current_throttle_value() > self.THROTTLE_LIMIT:
+            logger.info(f"Current throttle is {self._current_throttle()}, wait {self.JOB_STATUS_UPDATE_SLEEP_SECONDS} seconds")
+            time.sleep(self.JOB_STATUS_UPDATE_SLEEP_SECONDS)
+            self._update_api_throttle_limit()
+
+    def _current_throttle(self) -> Tuple[float, float]:
+        """
+        Return tuple of 2 floats representing current ads insights throttle values for app id and account id
+        """
+        return self._api.api.ads_insights_throttle
+
+    def _get_current_throttle_value(self) -> float:
+        """
+        Get the current ads insights throttle value based on app id and account id.
+        It is evaluated as the minimum of those numbers, because when the account id throttle
+        hits 100 it cools down very slowly (i.e. it still says 100 despite no jobs
+        running and being able to serve new requests). Because of this behaviour the
+        Facebook throttle limit is not a reliable metric to estimate async workload.
+        """
+        return min(self._current_throttle()[0], self._current_throttle()[1])
+
+    def _update_api_throttle_limit(self):
+        """
+        Sends an <ACCOUNT_ID>/insights GET request with no parameters, so it
+        responds with an empty list of data; the API then uses the "x-fb-ads-insights-throttle"
+        header to update the current insights throttle limit.
+        """
+        self._api.account.get_insights()
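
To show how a stream consumes the manager, here is a hedged sketch that drives `InsightAsyncJobManager.completed_jobs()` entirely offline. The throttle tuple `(5.0, 5.0)`, the `new_batch().execute() -> None` stub and the pre-completed mock jobs are assumptions made purely so the snippet runs without calling Facebook.

```python
from unittest.mock import MagicMock, Mock

from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager

# Mocked API: throttle far below THROTTLE_LIMIT, and batches that "succeed" immediately.
api = MagicMock()
api.api.ads_insights_throttle = (5.0, 5.0)
api.api.new_batch.return_value.execute.return_value = None

# Jobs that report themselves as already completed and never failed.
jobs = iter(Mock(completed=True, failed=False, restart_number=0) for _ in range(3))

manager = InsightAsyncJobManager(api=api, jobs=jobs)
for job in manager.completed_jobs():
    # a real stream would call job.get_result() here and yield records
    print("completed:", job)
```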
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
new file mode 100644
index 0000000000000..9f5c2aae315bd
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
@@ -0,0 +1,260 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import copy
+import logging
+from typing import (
+    Any,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Union,
+)
+
+import airbyte_cdk.sources.utils.casing as casing
+import pendulum
+from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams.core import package_name_from_class
+from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
+from cached_property import cached_property
+from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
+from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager
+
+from .base_streams import FBMarketingIncrementalStream
+
+logger = logging.getLogger("airbyte")
+
+
+class AdsInsights(FBMarketingIncrementalStream):
+    """doc: https://developers.facebook.com/docs/marketing-api/insights"""
+
+    cursor_field = "date_start"
+
+    ALL_ACTION_ATTRIBUTION_WINDOWS = [
+        "1d_click",
+        "7d_click",
+        "28d_click",
+        "1d_view",
+        "7d_view",
+        "28d_view",
+    ]
+
+    ALL_ACTION_BREAKDOWNS = [
+        "action_type",
+        "action_target_id",
+        "action_destination",
+    ]
+
+    # Facebook stores metrics for a maximum of 37 months. Any time range
+    # older than 37 months from the current date results in a 400 Bad Request
+    # HTTP response.
+    # https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
+    INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)
+    # Facebook freezes insight data 28 days after it was generated, which means that all data
+    # from the past 28 days may have changed since we last emitted it, so we retrieve it again.
+    INSIGHTS_LOOKBACK_PERIOD = pendulum.duration(days=28)
+
+    action_breakdowns = ALL_ACTION_BREAKDOWNS
+    level = "ad"
+    action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS
+    time_increment = 1
+
+    breakdowns = []
+
+    def __init__(
+        self,
+        name: str = None,
+        fields: List[str] = None,
+        breakdowns: List[str] = None,
+        action_breakdowns: List[str] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._start_date = self._start_date.date()
+        self._end_date = self._end_date.date()
+        self._fields = fields
+        self.action_breakdowns = action_breakdowns or self.action_breakdowns
+        self.breakdowns = breakdowns or self.breakdowns
+        self._new_class_name = name
+
+        # state
+        self._cursor_value: Optional[pendulum.Date] = None  # latest period that was read
+        self._next_cursor_value = self._get_start_date()
+        self._completed_slices = set()
+
+    @property
+    def name(self) -> str:
+        """We override stream name to let the user change it via configuration."""
+        name = self._new_class_name or self.__class__.__name__
+        return casing.camel_to_snake(name)
+
+    @property
+    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
+        """Build complex PK based on slices and breakdowns"""
+        return ["date_start", "ad_id"] + self.breakdowns
+
+    def _get_campaign_ids(self, params) -> List[str]:
+        campaign_params = copy.deepcopy(params)
+        campaign_params.update(fields=["campaign_id"], level="campaign")
+        result = self._api.account.get_insights(params=campaign_params)
+        return list(set(row["campaign_id"] for row in result))
+
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        """Waits for current job to finish (slice) and yield its result"""
+        job = stream_slice["insight_job"]
+        for obj in job.get_result():
+            yield obj.export_all_data()
+
+        self._completed_slices.add(job.key)
+        if job.key == self._next_cursor_value:
+            self._advance_cursor()
+
+    @property
+    def state(self) -> MutableMapping[str, Any]:
+        """State getter, the result can be stored by the source"""
+        if self._cursor_value:
+            return {
+                self.cursor_field: self._cursor_value.isoformat(),
+                "slices": [d.isoformat() for d in self._completed_slices],
+            }
+
+        if self._completed_slices:
+            return {
+                "slices": [d.isoformat() for d in self._completed_slices],
+            }
+
+        return {}
+
+    @state.setter
+    def state(self, value: MutableMapping[str, Any]):
+        """State setter"""
+        self._cursor_value = pendulum.parse(value[self.cursor_field]).date() if value.get(self.cursor_field) else None
+        self._completed_slices = set(pendulum.parse(v).date() for v in value.get("slices", []))
+        self._next_cursor_value = self._get_start_date()
+
+    def _date_intervals(self) -> Iterator[pendulum.Date]:
+        date_range = self._end_date - self._next_cursor_value
+        return date_range.range("days", self.time_increment)
+
+    def _advance_cursor(self):
+        """Iterate over state, find continuing sequence of slices. Get last value, advance cursor there and remove slices from state"""
+        for ts_start in self._date_intervals():
+            if ts_start not in self._completed_slices:
+                self._next_cursor_value = ts_start
+                break
+            self._completed_slices.remove(ts_start)
+            self._cursor_value = ts_start
+
+    def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
+        """Generator of async jobs
+
+        :param params:
+        :return:
+        """
+
+        for ts_start in self._date_intervals():
+            if ts_start in self._completed_slices:
+                continue
+            ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
+            total_params = {
+                **params,
+                "time_range": {
+                    "since": ts_start.to_date_string(),
+                    "until": ts_end.to_date_string(),
+                },
+            }
+            yield InsightAsyncJob(self._api.api, edge_object=self._api.account, params=total_params, key=ts_start)
+
+    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
+        """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
+        This solution for Async was chosen because:
+        1. we should commit state after each successful job
+        2. we should run as many job as possible before checking for result
+        3. we shouldn't proceed to consumption of the next job before previous succeed
+
+        generate slice only if it is not in state,
+        when we finished reading slice (in read_records) we check if current slice is the next one and do advance cursor
+
+        when slice is not next one we just update state with it
+        to do so source will check state attribute and call get_state,
+        """
+        manager = InsightAsyncJobManager(api=self._api, jobs=self._generate_async_jobs(params=self.request_params()))
+        for job in manager.completed_jobs():
+            yield {"insight_job": job}
+
+    def _get_start_date(self) -> pendulum.Date:
+        """Get start date to begin sync with. It is not that trivial as it might seem.
+            There are few rules:
+            - don't read data older than start_date
+            - re-read data within last 28 days
+            - don't read data older than retention date
+
+        :return: the first date to sync
+        """
+        today = pendulum.today().date()
+        oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
+        refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
+
+        if self._cursor_value:
+            start_date = self._cursor_value + pendulum.duration(days=self.time_increment)
+            if start_date > refresh_date:
+                logger.info(
+                    f"The cursor value within refresh period ({self.INSIGHTS_LOOKBACK_PERIOD}), start sync from {refresh_date} instead."
+                )
+            # FIXME: change cursor logic to not update cursor earlier than 28 days, after that we don't need this line
+            start_date = min(start_date, refresh_date)
+
+            if start_date < self._start_date:
+                logger.warning(f"Ignore provided state and start sync from start_date ({self._start_date}).")
+            start_date = max(start_date, self._start_date)
+        else:
+            start_date = self._start_date
+        if start_date < oldest_date:
+            logger.warning(f"Loading insights older then {self.INSIGHTS_RETENTION_PERIOD} is not possible. Start sync from {oldest_date}.")
+
+        return max(oldest_date, start_date)
+
+    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
+        return {
+            "level": self.level,
+            "action_breakdowns": self.action_breakdowns,
+            "breakdowns": self.breakdowns,
+            "fields": self.fields,
+            "time_increment": self.time_increment,
+            "action_attribution_windows": self.action_attribution_windows,
+        }
+
+    def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
+        """Works differently for insights, so remove it"""
+        return {}
+
+    def get_json_schema(self) -> Mapping[str, Any]:
+        """Add fields from breakdowns to the stream schema
+        :return: A dict of the JSON schema representing this stream.
+        """
+        loader = ResourceSchemaLoader(package_name_from_class(self.__class__))
+        schema = loader.get_schema("ads_insights")
+        if self._fields:
+            schema["properties"] = {k: v for k, v in schema["properties"].items() if k in self._fields}
+        if self.breakdowns:
+            breakdowns_properties = loader.get_schema("ads_insights_breakdowns")["properties"]
+            schema["properties"].update({prop: breakdowns_properties[prop] for prop in self.breakdowns})
+        return schema
+
+    @cached_property
+    def fields(self) -> List[str]:
+        """List of fields that we want to query, for now just all properties from stream's schema"""
+        if self._fields:
+            return self._fields
+        schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights")
+        return list(schema.get("properties", {}).keys())
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py
new file mode 100644
index 0000000000000..42d00948a833c
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py
@@ -0,0 +1,186 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+from abc import ABC
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping
+
+import pendulum
+from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
+from cached_property import cached_property
+from facebook_business.adobjects.abstractobject import AbstractObject
+from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
+
+from .common import deep_merge
+
+if TYPE_CHECKING:
+    from source_facebook_marketing.api import API
+
+logger = logging.getLogger("airbyte")
+
+
+class FBMarketingStream(Stream, ABC):
+    """Base stream class"""
+
+    primary_key = "id"
+    transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
+
+    page_size = 100
+    use_batch = False
+
+    enable_deleted = False
+    entity_prefix = None
+
+    MAX_BATCH_SIZE = 50
+
+    def __init__(self, api: "API", include_deleted: bool = False, **kwargs):
+        super().__init__(**kwargs)
+        self._api = api
+        self._include_deleted = include_deleted if self.enable_deleted else False
+
+    @cached_property
+    def fields(self) -> List[str]:
+        """List of fields that we want to query, for now just all properties from stream's schema"""
+        return list(self.get_json_schema().get("properties", {}).keys())
+
+    def _execute_batch(self, batch):
+        """Execute batch, retry in case of failures"""
+        while batch:
+            batch = batch.execute()
+            if batch:
+                logger.info("Retry failed requests in batch")
+
+    def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]:
+        """Execute list of requests in batches"""
+        records = []
+
+        def success(response: FacebookResponse):
+            records.append(response.json())
+
+        def failure(response: FacebookResponse):
+            # FIXME: stop sync or retry
+            logger.warning(f"Request failed with response: {response.body()}")
+
+        api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
+        for request in pending_requests:
+            api_batch.add_request(request, success=success, failure=failure)
+            if len(api_batch) == self.MAX_BATCH_SIZE:
+                self._execute_batch(api_batch)
+                yield from records
+                api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
+                records = []
+
+        self._execute_batch(api_batch)
+        yield from records
+
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        """Main read method used by CDK"""
+        records_iter = self._read_records(params=self.request_params(stream_state=stream_state))
+        loaded_records_iter = (record.api_get(fields=self.fields, pending=self.use_batch) for record in records_iter)
+        if self.use_batch:
+            loaded_records_iter = self.execute_in_batch(loaded_records_iter)
+
+        for record in loaded_records_iter:
+            if isinstance(record, AbstractObject):
+                yield record.export_all_data()
+            else:
+                yield record
+
+    def _read_records(self, params: Mapping[str, Any]) -> Iterable:
+        """Wrapper around query to backoff errors.
+        We have default implementation because we still can override read_records so this method is not mandatory.
+        """
+        return []
+
+    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
+        """Parameters that should be passed to query_records method"""
+        params = {"limit": self.page_size}
+
+        if self._include_deleted:
+            params.update(self._filter_all_statuses())
+
+        return params
+
+    def _filter_all_statuses(self) -> MutableMapping[str, Any]:
+        """Filter that covers all possible statuses thus including deleted/archived records"""
+        filt_values = [
+            "active",
+            "archived",
+            "completed",
+            "limited",
+            "not_delivering",
+            "deleted",
+            "not_published",
+            "pending_review",
+            "permanently_deleted",
+            "recently_completed",
+            "recently_rejected",
+            "rejected",
+            "scheduled",
+            "inactive",
+        ]
+
+        return {
+            "filtering": [
+                {"field": f"{self.entity_prefix}.delivery_info", "operator": "IN", "value": filt_values},
+            ],
+        }
+
+
+class FBMarketingIncrementalStream(FBMarketingStream, ABC):
+    cursor_field = "updated_time"
+
+    def __init__(self, start_date: datetime, end_date: datetime, **kwargs):
+        super().__init__(**kwargs)
+        self._start_date = pendulum.instance(start_date)
+        self._end_date = pendulum.instance(end_date)
+
+    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+        """Update stream state from latest record"""
+        potentially_new_records_in_the_past = self._include_deleted and not current_stream_state.get("include_deleted", False)
+        record_value = latest_record[self.cursor_field]
+        state_value = current_stream_state.get(self.cursor_field) or record_value
+        max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value))
+        if potentially_new_records_in_the_past:
+            max_cursor = record_value
+
+        return {
+            self.cursor_field: str(max_cursor),
+            "include_deleted": self._include_deleted,
+        }
+
+    def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
+        """Include state filter"""
+        params = super().request_params(**kwargs)
+        params = deep_merge(params, self._state_filter(stream_state=stream_state or {}))
+        return params
+
+    def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
+        """Additional filters associated with state if any set"""
+        state_value = stream_state.get(self.cursor_field)
+        filter_value = self._start_date if not state_value else pendulum.parse(state_value)
+
+        potentially_new_records_in_the_past = self._include_deleted and not stream_state.get("include_deleted", False)
+        if potentially_new_records_in_the_past:
+            self.logger.info(f"Ignoring bookmark for {self.name} because of enabled `include_deleted` option")
+            filter_value = self._start_date
+
+        return {
+            "filtering": [
+                {
+                    "field": f"{self.entity_prefix}.{self.cursor_field}",
+                    "operator": "GREATER_THAN",
+                    "value": filter_value.int_timestamp,
+                },
+            ],
+        }
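
`_state_filter` turns the saved cursor into Facebook's `filtering` request parameter. A small sketch of the shape it produces, with an example state value, cursor field and entity prefix chosen purely for illustration:

```python
import pendulum

stream_state = {"updated_time": "2021-02-24T00:00:00+00:00"}
cursor_field = "updated_time"
entity_prefix = "campaign"

filter_value = pendulum.parse(stream_state[cursor_field])
params = {
    "filtering": [
        {
            "field": f"{entity_prefix}.{cursor_field}",
            "operator": "GREATER_THAN",
            "value": filter_value.int_timestamp,
        },
    ],
}
print(params)
# {'filtering': [{'field': 'campaign.updated_time', 'operator': 'GREATER_THAN', 'value': 1614124800}]}
```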
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py
similarity index 76%
rename from airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py
rename to airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py
index 0ac09cbacd48f..8acf339cd7026 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py
@@ -1,10 +1,10 @@
 #
 # Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 #
-
+import http.client
 import logging
 import sys
-from typing import Any, Iterable, Sequence
+from typing import Any
 
 import backoff
 import pendulum
@@ -13,31 +13,17 @@
 # The Facebook API error codes indicating rate-limiting are listed at
 # https://developers.facebook.com/docs/graph-api/overview/rate-limiting/
 FACEBOOK_RATE_LIMIT_ERROR_CODES = (4, 17, 32, 613, 80000, 80001, 80002, 80003, 80004, 80005, 80006, 80008)
+FACEBOOK_BATCH_ERROR_CODE = 960
 FACEBOOK_UNKNOWN_ERROR_CODE = 99
 DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1)
 
 logger = logging.getLogger("airbyte")
 
 
-class FacebookAPIException(Exception):
-    """General class for all API errors"""
-
-
 class JobException(Exception):
     """Scheduled job failed"""
 
 
-class JobTimeoutException(JobException):
-    """Scheduled job timed out"""
-
-
-def batch(iterable: Sequence, size: int = 1) -> Iterable:
-    """Split sequence in chunks"""
-    total_size = len(iterable)
-    for ndx in range(0, total_size, size):
-        yield iterable[ndx : min(ndx + size, total_size)]
-
-
 def retry_pattern(backoff_type, exception, **wait_gen_kwargs):
     def log_retry_attempt(details):
         _, exc, _ = sys.exc_info()
@@ -47,7 +33,9 @@ def log_retry_attempt(details):
     def should_retry_api_error(exc):
         if isinstance(exc, FacebookRequestError):
             call_rate_limit_error = exc.api_error_code() in FACEBOOK_RATE_LIMIT_ERROR_CODES
-            return exc.api_transient_error() or exc.api_error_subcode() == FACEBOOK_UNKNOWN_ERROR_CODE or call_rate_limit_error
+            batch_timeout_error = exc.http_status() == http.client.BAD_REQUEST and exc.api_error_code() == FACEBOOK_BATCH_ERROR_CODE
+            unknown_error = exc.api_error_subcode() == FACEBOOK_UNKNOWN_ERROR_CODE
+            return any((exc.api_transient_error(), unknown_error, call_rate_limit_error, batch_timeout_error))
         return True
 
     return backoff.on_exception(
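
`retry_pattern` wraps `backoff.on_exception` with a predicate that retries transient, rate-limit and (now) batch-timeout errors. A hedged sketch of wrapping a request-issuing function with it; the `max_tries`/`factor` values and the `list_campaigns` helper are illustrative, not prescribed by the module:

```python
import backoff
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams.common import retry_pattern

# Exponential backoff on retryable Facebook errors, in the same style the streams use.
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)


@backoff_policy
def list_campaigns(account):
    # hypothetical call; a FacebookRequestError raised here is retried only if
    # should_retry_api_error accepts it (transient, rate limit, batch timeout)
    return list(account.get_campaigns(params={"limit": 100}))
```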
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py
new file mode 100644
index 0000000000000..64eb702432627
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py
@@ -0,0 +1,131 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import base64
+import logging
+from typing import Any, Iterable, Iterator, List, Mapping, Optional
+
+import requests
+from airbyte_cdk.models import SyncMode
+from cached_property import cached_property
+
+from .base_insight_streams import AdsInsights
+from .base_streams import FBMarketingIncrementalStream, FBMarketingStream
+
+logger = logging.getLogger("airbyte")
+
+
+def fetch_thumbnail_data_url(url: str) -> Optional[str]:
+    """Request thumbnail image and return it embedded into the data-link"""
+    try:
+        response = requests.get(url)
+        if response.status_code == requests.status_codes.codes.OK:
+            _type = response.headers["content-type"]
+            data = base64.b64encode(response.content)
+            return f"data:{_type};base64,{data.decode('ascii')}"
+        else:
+            logger.warning(f"Got {repr(response)} while requesting thumbnail image.")
+    except requests.exceptions.RequestException as exc:
+        logger.warning(f"Got {str(exc)} while requesting thumbnail image.")
+    return None
+
+
+class AdCreatives(FBMarketingStream):
+    """AdCreative is append only stream
+    doc: https://developers.facebook.com/docs/marketing-api/reference/ad-creative
+    """
+
+    entity_prefix = "adcreative"
+    use_batch = True
+
+    def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):
+        super().__init__(**kwargs)
+        self._fetch_thumbnail_images = fetch_thumbnail_images
+
+    @cached_property
+    def fields(self) -> List[str]:
+        """Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook"""
+        return [f for f in super().fields if f != "thumbnail_data_url"]
+
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        """Read with super method and append thumbnail_data_url if enabled"""
+        for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
+            if self._fetch_thumbnail_images:
+                record["thumbnail_data_url"] = fetch_thumbnail_data_url(record.get("thumbnail_url"))
+            yield record
+
+    def _read_records(self, params: Mapping[str, Any]) -> Iterator:
+        return self._api.account.get_ad_creatives(params=params)
+
+
+class Ads(FBMarketingIncrementalStream):
+    """doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup"""
+
+    entity_prefix = "ad"
+    enable_deleted = True
+
+    def _read_records(self, params: Mapping[str, Any]):
+        return self._api.account.get_ads(params=params, fields=[self.cursor_field])
+
+
+class AdSets(FBMarketingIncrementalStream):
+    """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign"""
+
+    entity_prefix = "adset"
+    enable_deleted = True
+
+    def _read_records(self, params: Mapping[str, Any]):
+        return self._api.account.get_ad_sets(params=params)
+
+
+class Campaigns(FBMarketingIncrementalStream):
+    """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign-group"""
+
+    entity_prefix = "campaign"
+    enable_deleted = True
+
+    def _read_records(self, params: Mapping[str, Any]):
+        return self._api.account.get_campaigns(params=params)
+
+
+class Videos(FBMarketingIncrementalStream):
+    """See: https://developers.facebook.com/docs/marketing-api/reference/video"""
+
+    entity_prefix = "video"
+    enable_deleted = True
+
+    def _read_records(self, params: Mapping[str, Any]) -> Iterator:
+        return self._api.account.get_ad_videos(params=params)
+
+
+class AdsInsightsAgeAndGender(AdsInsights):
+    breakdowns = ["age", "gender"]
+
+
+class AdsInsightsCountry(AdsInsights):
+    breakdowns = ["country"]
+
+
+class AdsInsightsRegion(AdsInsights):
+    breakdowns = ["region"]
+
+
+class AdsInsightsDma(AdsInsights):
+    breakdowns = ["dma"]
+
+
+class AdsInsightsPlatformAndDevice(AdsInsights):
+    breakdowns = ["publisher_platform", "platform_position", "impression_device"]
+    action_breakdowns = ["action_type"]  # FB Async Job fails for unknown reason if we set other breakdowns
+
+
+class AdsInsightsActionType(AdsInsights):
+    breakdowns = []
+    action_breakdowns = ["action_type"]
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py
new file mode 100644
index 0000000000000..31945a5b92039
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py
@@ -0,0 +1,11 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+from pytest import fixture
+
+
+@fixture(autouse=True)
+def time_sleep_mock(mocker):
+    time_mock = mocker.patch("time.sleep")
+    yield time_mock
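
Because the fixture is `autouse`, every unit test in this package gets `time.sleep` patched automatically, so polling loops in the async-job code return instantly. A hedged sketch of how a test could rely on it (the test name is hypothetical):

```python
import time


def test_sleep_is_mocked(time_sleep_mock):
    # the autouse fixture has already replaced time.sleep with a mock,
    # so this returns immediately instead of pausing the test for 30 seconds
    time.sleep(30)
    time_sleep_mock.assert_called_with(30)
```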
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py
index 6bab56862fa87..9d36b3a95f3a2 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py
@@ -3,54 +3,112 @@
 #
 
 import time
+from typing import Iterator
+from unittest.mock import call
 
-import pendulum
 import pytest
-from source_facebook_marketing.api import API
-from source_facebook_marketing.async_job import AsyncJob, Status
-from source_facebook_marketing.common import JobException, JobTimeoutException
+from facebook_business.adobjects.adreportrun import AdReportRun
+from facebook_business.api import FacebookAdsApiBatch
+from source_facebook_marketing.api import MyFacebookAdsApi
+from source_facebook_marketing.streams.async_job import InsightAsyncJob, ParentAsyncJob, Status, chunks
 
 
 @pytest.fixture(name="adreport")
-def adreport_fixture(mocker):
-    response = {"report_run_id": 123}
-    job = mocker.MagicMock()
-    job.__getitem__.side_effect = response.__getitem__
-    job.__setitem__.side_effect = response.__setitem__
-    job.__iter__.side_effect = response.__iter__
-    job.__contains__.side_effect = response.__contains__
-    return job
+def adreport_fixture(mocker, api):
+    ao = AdReportRun(fbid=123, api=api)
+    ao["report_run_id"] = 123
+    mocker.patch.object(AdReportRun, 'api_get', return_value=ao)
+    mocker.patch.object(AdReportRun, 'get_result', return_value={})
+    return ao
+
+
+@pytest.fixture(name="account")
+def account_fixture(mocker, adreport):
+    account = mocker.Mock()
+    account.get_insights.return_value = adreport
+    return account
 
 
 @pytest.fixture(name="job")
-def job_fixture(api, mocker):
-    return AsyncJob(api=api, params=mocker.MagicMock())
+def job_fixture(api, account, mocker):
+    params = {
+        "level": "ad",
+        "action_breakdowns": [],
+        "breakdowns": [],
+        "fields": ["field1", "field2"],
+        "time_increment": 1,
+        "action_attribution_windows": [],
+        "time_range": {
+            "since": "2019-01-01",
+            "until": "2019-01-01",
+        },
+    }
 
+    return InsightAsyncJob(edge_object=account, api=api, params=params)
 
-@pytest.fixture(name="failed_job")
-def failed_job_fixture(job, adreport):
-    adreport["async_status"] = Status.FAILED.value
+
+@pytest.fixture(name="grouped_jobs")
+def grouped_jobs_fixture(mocker):
+    return [mocker.Mock(spec=InsightAsyncJob, restart_number=0, failed=False, completed=False) for _ in range(10)]
+
+
+@pytest.fixture(name="parent_job")
+def parent_job_fixture(api, grouped_jobs):
+    return ParentAsyncJob(api=api, jobs=grouped_jobs)
+
+
+@pytest.fixture(name="started_job")
+def started_job_fixture(job, adreport):
+    adreport["async_status"] = Status.RUNNING.value
     adreport["async_percent_completion"] = 0
     job.start()
 
-    try:
-        _ = job.completed
-    except JobException:
-        pass
-
     return job
 
 
+@pytest.fixture(name="completed_job")
+def completed_job_fixture(started_job, adreport):
+    adreport["async_status"] = Status.COMPLETED.value
+    adreport["async_percent_completion"] = 100
+    started_job.update_job()
+
+    return started_job
+
+
+@pytest.fixture(name="failed_job")
+def failed_job_fixture(started_job, adreport):
+    adreport["async_status"] = Status.FAILED.value
+    adreport["async_percent_completion"] = 0
+    started_job.update_job()
+
+    return started_job
+
+
 @pytest.fixture(name="api")
-def api_fixture(mocker, adreport):
-    api = mocker.Mock(spec=API)
-    api.account.get_insights.return_value = adreport
-    adreport.api_get.return_value = adreport
+def api_fixture(mocker):
+    api = mocker.Mock(spec=MyFacebookAdsApi)
+    api.call().json.return_value = {}
+    api.new_batch().execute.return_value = None  # short-circuit batch execution of failed jobs, prevent endless loop
 
     return api
 
 
-class TestAsyncJob:
+class TestChunks:
+    def test_two_or_more(self):
+        result = chunks([1, 2, 3, 4, 5], 2)
+
+        assert isinstance(result, Iterator), "should be iterator/generator"
+        assert list(result) == [[1, 2], [3, 4], [5]]
+
+    def test_single(self):
+        result = chunks([1, 2, 3, 4, 5], 6)
+
+        assert isinstance(result, Iterator), "should be iterator/generator"
+        assert list(result) == [[1, 2, 3, 4, 5]]
+
+
+class TestInsightAsyncJob:
     def test_start(self, job):
         job.start()
 
@@ -64,9 +122,12 @@ def test_start_already_started(self, job):
             job.start()
 
     def test_restart(self, failed_job, api, adreport):
+        assert failed_job.restart_number == 0
+
         failed_job.restart()
 
         assert not failed_job.failed, "restart should reset fail flag"
+        assert failed_job.restart_number == 1
 
     def test_restart_when_job_not_failed(self, job, api):
         job.start()
@@ -79,16 +140,49 @@ def test_restart_when_job_not_started(self, job):
         with pytest.raises(RuntimeError, match=r": Incorrect usage of restart - only failed jobs can be restarted"):
             job.restart()
 
+    def test_update_job_not_started(self, job):
+        with pytest.raises(RuntimeError, match=r": Incorrect usage of the method - the job is not started"):
+            job.update_job()
+
+    def test_update_job_on_completed_job(self, completed_job, adreport):
+        completed_job.update_job()
+
+        adreport.api_get.assert_called_once()
+
+    def test_update_job(self, started_job, adreport):
+        started_job.update_job()
+
+        adreport.api_get.assert_called_once()
+
+    def test_update_job_with_batch(self, started_job, adreport, mocker):
+        response = mocker.Mock()
+
+        response.json.return_value = {
+            "id": "1128003977936306",
+            "account_id": "212551616838260",
+            "time_ref": 1642989751,
+            "time_completed": 1642989754,
+            "async_status": "Job Completed",
+            "async_percent_completion": 100,
+            "date_start": "2021-02-24",
+            "date_stop": "2021-02-24",
+        }
+        response.body.return_value = "Some error"
+        batch_mock = mocker.Mock(spec=FacebookAdsApiBatch)
+
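+        # When a batch is supplied, the status request should be attached to it and the result delivered via the success/failure callbacks.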
+        started_job.update_job(batch=batch_mock)
+
+        adreport.api_get.assert_called_once()
+        args, kwargs = adreport.api_get.call_args
+        assert kwargs["batch"] == batch_mock
+
+        kwargs["success"](response)
+        assert started_job.completed
+
+        kwargs["failure"](response)
+
     def test_elapsed_time(self, job, api, adreport):
         assert job.elapsed_time is None, "should be None for the job that is not started"
 
         job.start()
         adreport["async_status"] = Status.COMPLETED.value
         adreport["async_percent_completion"] = 0
+        job.update_job()
 
         assert job.elapsed_time, "should be set for the job that is running"
 
-        _ = job.completed
         elapsed_1 = job.elapsed_time
         time.sleep(1)
         elapsed_2 = job.elapsed_time
@@ -96,47 +190,21 @@ def test_elapsed_time(self, job, api, adreport):
         assert elapsed_2 == elapsed_1, "should not change after job completed"
 
     def test_completed_without_start(self, job, api, adreport):
-        with pytest.raises(RuntimeError, match=r"Incorrect usage of the method - the job is not started"):
-            _ = job.completed
-
+        assert not job.completed
         assert not job.failed
 
-    def test_completed_ok(self, job, api, adreport):
-        job.start()
-        adreport["async_status"] = Status.COMPLETED.value
-        adreport["async_percent_completion"] = 0
-
-        assert job.completed, "should return True if the job was completed"
-        assert not job.failed, "failed should be set to False"
+    def test_completed_ok(self, completed_job, api, adreport):
+        assert completed_job.completed, "should return True if the job was completed"
+        assert not completed_job.failed, "failed should be set to False"
 
     def test_completed_failed(self, failed_job, api, adreport):
-        with pytest.raises(JobException, match=r"failed after \d* seconds."):
-            _ = failed_job.completed
+        assert failed_job.completed
+        assert failed_job.failed
 
     def test_completed_skipped(self, failed_job, api, adreport):
         adreport["async_status"] = Status.SKIPPED.value
-        with pytest.raises(JobException, match=r"skipped after \d* seconds."):
-            _ = failed_job.completed
-
-    def test_completed_timeout(self, job, adreport, mocker):
-        job.start()
-        adreport["async_status"] = Status.STARTED.value
-        adreport["async_percent_completion"] = 1
-        mocker.patch.object(job, "MAX_WAIT_TO_FINISH", pendulum.duration())
-        mocker.patch.object(job, "MAX_WAIT_TO_START", pendulum.duration())
-
-        with pytest.raises(JobTimeoutException, match=r" did not finish after \d* seconds."):
-            _ = job.completed
-
-    def test_completed_timeout_not_started(self, job, adreport, mocker):
-        job.start()
-        adreport["async_status"] = Status.STARTED.value
-        adreport["async_percent_completion"] = 0
-        mocker.patch.object(job, "MAX_WAIT_TO_FINISH", pendulum.duration())
-        mocker.patch.object(job, "MAX_WAIT_TO_START", pendulum.duration())
-
-        with pytest.raises(JobTimeoutException, match=r" did not start after \d* seconds."):
-            _ = job.completed
+        failed_job.update_job()
+        assert failed_job.completed
+        assert failed_job.failed
 
     def test_failed_no(self, job):
         assert not job.failed, "should return False for active job"
@@ -144,10 +212,10 @@ def test_failed_no(self, job):
     def test_failed_yes(self, failed_job):
         assert failed_job.failed, "should return True if the job previously failed"
 
-    def test_str(self, api, adreport):
-        job = AsyncJob(api=api, params={"time_range": 123, "breakdowns": [10, 20]})
+    def test_str(self, api, account):
+        job = InsightAsyncJob(edge_object=account, api=api, params={"time_range": 123, "breakdowns": [10, 20]})
 
-        assert str(job) == "AdReportRun(id=<None>, time_range=123, breakdowns=[10, 20]"
+        assert str(job) == f"AdReportRun(id=<None>, {account}, time_range=123, breakdowns=[10, 20]"
 
     def test_get_result(self, job, adreport):
         job.start()
@@ -164,3 +232,86 @@ def test_get_result_when_job_is_not_started(self, job):
     def test_get_result_when_job_is_failed(self, failed_job):
         with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started of failed"):
             failed_job.get_result()
+
+    def test_split_job(self, job, account, mocker, api):
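+        # split_job should break an account-level insights job into per-campaign child jobs wrapped in a ParentAsyncJob.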
+        account.get_insights.return_value = [{"campaign_id": 1}, {"campaign_id": 2}, {"campaign_id": 3}]
+        parent_job_mock = mocker.patch("source_facebook_marketing.streams.async_job.ParentAsyncJob")
+        campaign_mock = mocker.patch("source_facebook_marketing.streams.async_job.Campaign")
+
+        parent_job = job.split_job()
+
+        account.get_insights.assert_called_once()
+        campaign_mock.assert_has_calls([call(1), call(2), call(3)])
+        assert parent_job_mock.called
+        assert parent_job
+        args, kwargs = parent_job_mock.call_args
+        assert args == (api,)
+        assert len(kwargs["jobs"]) == 3, "number of jobs should match number of campaigns"
+
+
+class TestParentAsyncJob:
+    def test_start(self, parent_job, grouped_jobs):
+        parent_job.start()
+        for job in grouped_jobs:
+            job.start.assert_called_once()
+
+    def test_restart(self, parent_job, grouped_jobs):
+        assert not parent_job.failed, "initially not failed"
+
+        # fail some jobs
+        grouped_jobs[0].failed = True
+        grouped_jobs[0].restart_number = 1
+        grouped_jobs[5].failed = True
+        grouped_jobs[5].restart_number = 1
+        grouped_jobs[6].restart_number = 3
+
+        assert parent_job.failed, "should be failed if any job failed"
+        parent_job.restart()
+        assert parent_job.failed
+        assert parent_job.restart_number == 3, "restart should be max value of all jobs"
+
+    def test_completed(self, parent_job, grouped_jobs):
+        assert not parent_job.completed, "initially not completed"
+
+        # complete some jobs
+        grouped_jobs[0].completed = True
+        grouped_jobs[5].completed = True
+
+        assert not parent_job.completed, "not completed until all jobs completed"
+
+        # complete all jobs
+        for job in grouped_jobs:
+            job.completed = True
+
+        assert parent_job.completed, "completed because all jobs completed"
+
+    def test_update_job(self, parent_job, grouped_jobs, api):
+        """Checks jobs status in advance and restart if some failed."""
+        # finish some jobs
+        grouped_jobs[0].completed = True
+        grouped_jobs[5].completed = True
+
+        parent_job.update_job()
+
+        # assert
+        batch_mock = api.new_batch()
+        for i, job in enumerate(grouped_jobs):
+            if i in (0, 5):
+                job.update_job.assert_not_called()
+            else:
+                job.update_job.assert_called_once_with(batch=batch_mock)
+
+    def test_get_result(self, parent_job, grouped_jobs):
+        """Retrieve result of the finished job."""
+        for job in grouped_jobs:
+            job.get_result.return_value = []
+        grouped_jobs[0].get_result.return_value = range(3, 8)
+        grouped_jobs[6].get_result.return_value = range(4, 11)
+
+        generator = parent_job.get_result()
+
+        assert isinstance(generator, Iterator)
+        assert list(generator) == list(range(3, 8)) + list(range(4, 11))
+
+    def test_split_job(self, parent_job):
+        with pytest.raises(RuntimeError, match="Splitting of ParentAsyncJob is not allowed."):
+            parent_job.split_job()
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py
new file mode 100644
index 0000000000000..0beba538aead1
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py
@@ -0,0 +1,29 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import pytest
+from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager
+
+
+@pytest.fixture(name="api")
+def api_fixture(mocker):
+    api = mocker.Mock()
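+    # Report no insights throttling so the manager is not rate-limited in these tests.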
+    api.api.ads_insights_throttle = (0, 0)
+    return api
+
+
+class TestInsightAsyncJobManager:
+    def test_jobs_empty(self, api):
+        manager = InsightAsyncJobManager(api=api, jobs=[])
+        jobs = list(manager.completed_jobs())
+        assert not jobs
+
+    def test_job_restarted(self):
+        """TODO"""
+
+    def test_job_split(self):
+        """TODO"""
+
+    def test_job_failed_too_many_times(self):
+        """TODO"""
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py
index 046c5da95a748..f14d8ab2d598a 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py
@@ -28,7 +28,7 @@ def some_config_fixture(account_id):
 
 @pytest.fixture(autouse=True)
 def mock_default_sleep_interval(mocker):
-    mocker.patch("source_facebook_marketing.common.DEFAULT_SLEEP_INTERVAL", return_value=pendulum.duration(seconds=5))
+    mocker.patch("source_facebook_marketing.streams.common.DEFAULT_SLEEP_INTERVAL", return_value=pendulum.duration(seconds=5))
 
 
 @pytest.fixture(name="api")
@@ -42,7 +42,10 @@ def api_fixture(some_config, requests_mock, fb_account_response):
 @pytest.fixture(name="fb_call_rate_response")
 def fb_call_rate_response_fixture():
     error = {
-        "message": "(#80000) There have been too many calls from this ad-account. Wait a bit and try again. For more info, please refer to https://developers.facebook.com/docs/graph-api/overview/rate-limiting.",
+        "message": (
+            "(#80000) There have been too many calls from this ad-account. Wait a bit and try again. "
+            "For more info, please refer to https://developers.facebook.com/docs/graph-api/overview/rate-limiting."
+        ),
         "type": "OAuthException",
         "code": 80000,
         "error_subcode": 2446079,
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py
index 1941e61848474..2089e0cac64d3 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py
@@ -4,7 +4,7 @@
 
 from copy import deepcopy
 
-from source_facebook_marketing.common import deep_merge
+from source_facebook_marketing.streams.common import deep_merge
 
 
 def test_return_new_object():
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py
new file mode 100644
index 0000000000000..4da6c6cc673bc
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py
@@ -0,0 +1,64 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import pydantic
+import pytest
+from airbyte_cdk.models import ConnectorSpecification
+from source_facebook_marketing import SourceFacebookMarketing
+
+
+@pytest.fixture(name="config")
+def config_fixture():
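+    # Minimal connector config; values are dummy placeholders, not real credentials.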
+    config = {
+        "account_id": 123,
+        "access_token": "TOKEN",
+        "start_date": "2019-10-10T00:00:00"
+    }
+
+    return config
+
+
+@pytest.fixture(name="api")
+def api_fixture(mocker):
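+    # Patch the API class used in source.py so no real Facebook requests are made during these tests.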
+    api_mock = mocker.patch("source_facebook_marketing.source.API")
+    api_mock.return_value = mocker.Mock(account=123)
+    return api_mock
+
+
+@pytest.fixture(name="logger_mock")
+def logger_mock_fixture(mocker):
+    return mocker.patch("source_facebook_marketing.source.logger")
+
+
+class TestSourceFacebookMarketing:
+    def test_check_connection_ok(self, api, config, logger_mock):
+        ok, error_msg = SourceFacebookMarketing().check_connection(logger_mock, config=config)
+
+        assert ok
+        assert not error_msg
+        api.assert_called_once_with(account_id="123", access_token="TOKEN")
+        logger_mock.info.assert_called_once_with(f"Select account {api.return_value.account}")
+
+    def test_check_connection_invalid_config(self, api, config, logger_mock):
+        config.pop("start_date")
+
+        with pytest.raises(pydantic.ValidationError):
+            SourceFacebookMarketing().check_connection(logger_mock, config=config)
+
+        assert not api.called
+
+    def test_check_connection_exception(self, api, config, logger_mock):
+        api.side_effect = RuntimeError("Something went wrong!")
+
+        with pytest.raises(RuntimeError, match="Something went wrong!"):
+            SourceFacebookMarketing().check_connection(logger_mock, config=config)
+
+    def test_streams(self, config, api):
+        streams = SourceFacebookMarketing().streams(config)
+
+        assert len(streams) == 12
+
+    def test_spec(self):
+        spec = SourceFacebookMarketing().spec()
+
+        assert isinstance(spec, ConnectorSpecification)
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py
index a43e7493aa267..e69de29bb2d1d 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py
@@ -1,47 +0,0 @@
-#
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
-#
-
-from source_facebook_marketing.streams import remove_params_from_url
-
-
-class TestUrlParsing:
-    def test_empty_url(self):
-        url = ""
-        parsed_url = remove_params_from_url(url=url, params=[])
-        assert parsed_url == url
-
-    def test_does_not_raise_exception_for_invalid_url(self):
-        url = "abcd"
-        parsed_url = remove_params_from_url(url=url, params=["test"])
-        assert parsed_url == url
-
-    def test_escaped_characters(self):
-        url = "https://google.com?test=123%23%24%25%2A&test2=456"
-        parsed_url = remove_params_from_url(url=url, params=["test3"])
-        assert parsed_url == url
-
-    def test_no_params_url(self):
-        url = "https://google.com"
-        parsed_url = remove_params_from_url(url=url, params=["test"])
-        assert parsed_url == url
-
-    def test_no_params_arg(self):
-        url = "https://google.com?"
-        parsed_url = remove_params_from_url(url=url, params=["test"])
-        assert parsed_url == "https://google.com"
-
-    def test_partially_empty_params(self):
-        url = "https://google.com?test=122&&"
-        parsed_url = remove_params_from_url(url=url, params=[])
-        assert parsed_url == "https://google.com?test=122"
-
-    def test_no_matching_params(self):
-        url = "https://google.com?test=123"
-        parsed_url = remove_params_from_url(url=url, params=["test2"])
-        assert parsed_url == url
-
-    def test_removes_params(self):
-        url = "https://google.com?test=123&test2=456"
-        parsed_url = remove_params_from_url(url=url, params=["test2"])
-        assert parsed_url == "https://google.com?test=123"
diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md
index 88d3a98f16f9f..513d57ef76f0e 100644
--- a/docs/integrations/sources/facebook-marketing.md
+++ b/docs/integrations/sources/facebook-marketing.md
@@ -98,7 +98,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
 | :--- | :--- | :--- | :--- |
 | 0.2.31  | 2021-12-28 | [](https://github.com/airbytehq/airbyte/pull/) | Fixed incorrect type of the `format` field in the videos stream |
 | 0.2.30  | 2021-12-20 | [8962](https://github.com/airbytehq/airbyte/pull/8962) | Added `asset_feed_spec` field to `ad creatives` stream |
-| 0.2.29  | 2021-12-17 | [8649](https://github.com/airbytehq/airbyte/pull/8649) | Retrive ad_creatives image as data encoded |
+| 0.2.29  | 2021-12-17 | [8649](https://github.com/airbytehq/airbyte/pull/8649) | Retrieve ad_creatives image as data encoded |
 | 0.2.28  | 2021-12-13 | [8742](https://github.com/airbytehq/airbyte/pull/8742) | Fix for schema generation related to "breakdown" fields |
 | 0.2.27  | 2021-11-29 | [8257](https://github.com/airbytehq/airbyte/pull/8257) | Add fields to Campaign stream |
 | 0.2.26  | 2021-11-19 | [7855](https://github.com/airbytehq/airbyte/pull/7855) | Add Video stream |
@@ -107,7 +107,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
 | 0.2.23  | 2021-11-08 | [7734](https://github.com/airbytehq/airbyte/pull/7734) | Resolve $ref field for discover schema |
 | 0.2.22  | 2021-11-05 | [7605](https://github.com/airbytehq/airbyte/pull/7605) | Add job retry logics to AdsInsights stream |
 | 0.2.21  | 2021-10-05 | [4864](https://github.com/airbytehq/airbyte/pull/4864) | Update insights streams with custom entries for fields, breakdowns and action_breakdowns |
-| 0.2.20 | 2021-10-04 | [6719](https://github.com/airbytehq/airbyte/pull/6719) | Update version of facebook\_bussiness package to 12.0 |
+| 0.2.20 | 2021-10-04 | [6719](https://github.com/airbytehq/airbyte/pull/6719) | Update version of facebook\_business package to 12.0 |
 | 0.2.19 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification |
 | 0.2.18 | 2021-09-28 | [6499](https://github.com/airbytehq/airbyte/pull/6499) | Fix field values converting fail |
 | 0.2.17 | 2021-09-14 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
@@ -115,7 +115,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
 | 0.2.15 | 2021-09-14 | [5958](https://github.com/airbytehq/airbyte/pull/5958) | Fix url parsing and add report that exposes conversions |
 | 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management |
 | 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK: - Improve error handling. - Improve async job performance \(insights\). - Add new configuration parameter `insights_days_per_job`. - Rename stream `adsets` to `ad_sets`. - Refactor schema logic for insights, allowing to configure any possible insight stream. |
-| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook\_bussiness to 11.0 |
+| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook\_business to 11.0 |
 | 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
 | 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code |
 | 0.2.7 | 2021-06-03 | [3646](https://github.com/airbytehq/airbyte/pull/3646) | Add missing fields to AdInsights streams |