Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ stripe_analytics = [
"types-stripe>=3.5.2.14,<4",
]
asana_dlt = ["asana>=3.2.1,<4"]
facebook_ads = ["facebook-business>=17.0.2,<18"]
facebook_ads = ["facebook-business>=23.0.0,<24"]
google_ads = [
"google-ads>=21.1.0,<22",
"google-api-python-client>=2.129.0,<3",
Expand Down
30 changes: 20 additions & 10 deletions sources/facebook_ads/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Loads campaigns, ad sets, ads, leads and insight data from Facebook Marketing API"""

from typing import Iterator, Sequence
from typing import Iterator, Sequence, Union, cast

from facebook_business import FacebookAdsApi
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.api import FacebookResponse

import dlt
Expand Down Expand Up @@ -56,7 +57,7 @@ def facebook_ads_source(
access_token: str = dlt.secrets.value,
chunk_size: int = 50,
request_timeout: float = 300.0,
app_api_version: str = None,
app_api_version: Union[str, None] = None,
) -> Sequence[DltResource]:
"""Returns a list of resources to load campaigns, ad sets, ads, creatives and ad leads data from Facebook Marketing API.

Expand All @@ -83,35 +84,39 @@ def facebook_ads_source(

@dlt.resource(primary_key="id", write_disposition="replace")
def campaigns(
fields: Sequence[str] = DEFAULT_CAMPAIGN_FIELDS, states: Sequence[str] = None
fields: Sequence[str] = DEFAULT_CAMPAIGN_FIELDS,
states: Union[Sequence[str], None] = None,
) -> Iterator[TDataItems]:
yield get_data_chunked(account.get_campaigns, fields, states, chunk_size)

@dlt.resource(primary_key="id", write_disposition="replace")
def ads(
fields: Sequence[str] = DEFAULT_AD_FIELDS, states: Sequence[str] = None
fields: Sequence[str] = DEFAULT_AD_FIELDS,
states: Union[Sequence[str], None] = None,
) -> Iterator[TDataItems]:
yield get_data_chunked(account.get_ads, fields, states, chunk_size)

@dlt.resource(primary_key="id", write_disposition="replace")
def ad_sets(
fields: Sequence[str] = DEFAULT_ADSET_FIELDS, states: Sequence[str] = None
fields: Sequence[str] = DEFAULT_ADSET_FIELDS,
states: Union[Sequence[str], None] = None,
) -> Iterator[TDataItems]:
yield get_data_chunked(account.get_ad_sets, fields, states, chunk_size)

@dlt.transformer(primary_key="id", write_disposition="replace", selected=True)
def leads(
items: TDataItems,
fields: Sequence[str] = DEFAULT_LEAD_FIELDS,
states: Sequence[str] = None,
states: Union[Sequence[str], None] = None,
) -> Iterator[TDataItems]:
for item in items:
ad = Ad(item["id"])
yield get_data_chunked(ad.get_leads, fields, states, chunk_size)

@dlt.resource(primary_key="id", write_disposition="replace")
def ad_creatives(
fields: Sequence[str] = DEFAULT_ADCREATIVE_FIELDS, states: Sequence[str] = None
fields: Sequence[str] = DEFAULT_ADCREATIVE_FIELDS,
states: Union[Sequence[str], None] = None,
) -> Iterator[TDataItems]:
yield get_data_chunked(account.get_ad_creatives, fields, states, chunk_size)

Expand All @@ -132,7 +137,7 @@ def facebook_insights_source(
action_attribution_windows: Sequence[str] = ALL_ACTION_ATTRIBUTION_WINDOWS,
batch_size: int = 50,
request_timeout: int = 300,
app_api_version: str = None,
app_api_version: Union[str, None] = None,
) -> DltResource:
"""Incrementally loads insight reports with defined granularity level, fields, breakdowns etc.

Expand Down Expand Up @@ -177,7 +182,7 @@ def facebook_insights_source(
def facebook_insights(
date_start: dlt.sources.incremental[str] = dlt.sources.incremental(
"date_start", initial_value=initial_load_start_date_str
)
),
) -> Iterator[TDataItems]:
start_date = get_start_date(date_start, attribution_window_days_lag)
end_date = pendulum.now()
Expand Down Expand Up @@ -207,7 +212,12 @@ def facebook_insights(
}
],
}
job = execute_job(account.get_insights(params=query, is_async=True))
job = execute_job(
cast(
AdReportRun,
account.get_insights(params=query, is_async=True),
)
)
yield list(map(process_report_item, job.get_result()))
start_date = start_date.add(days=time_increment_days)

Expand Down
84 changes: 58 additions & 26 deletions sources/facebook_ads/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import itertools
import time
from typing import Any, Iterator, Sequence
from typing import Any, Iterator, Protocol, Sequence, Type, Union, cast

import dlt
import humanize
Expand All @@ -13,21 +13,23 @@
from dlt.common.configuration.inject import with_config
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny, TDataItem, TDataItems
from dlt.extract.items_transform import ItemTransformFunctionWithMeta
from dlt.sources.helpers import requests
from dlt.sources.helpers.requests import Client

from facebook_business import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.adobjects.user import User
from facebook_business.api import FacebookResponse
from facebook_business.api import Cursor, FacebookRequest, FacebookResponse

from .exceptions import InsightsJobTimeout
from .settings import (
FACEBOOK_INSIGHTS_RETENTION_PERIOD,
INSIGHTS_PRIMARY_KEY,
TFbMethod,
)
from .utils import AbstractCrudObject, AbstractObject
from .utils import AbstractObject


def get_start_date(
Expand Down Expand Up @@ -62,7 +64,7 @@ def get_start_date(


def process_report_item(item: AbstractObject) -> DictStrAny:
d: DictStrAny = item.export_all_data()
d: DictStrAny = cast(DictStrAny, item.export_all_data())
for pki in INSIGHTS_PRIMARY_KEY:
if pki not in d:
d[pki] = "no_" + pki
Expand All @@ -71,23 +73,41 @@ def process_report_item(item: AbstractObject) -> DictStrAny:


def get_data_chunked(
method: TFbMethod, fields: Sequence[str], states: Sequence[str], chunk_size: int
method: TFbMethod,
fields: Sequence[str],
states: Union[Sequence[str], None],
chunk_size: int,
) -> Iterator[TDataItems]:
# add pagination and chunk into lists
params: DictStrAny = {"limit": chunk_size}
if states:
params.update({"effective_status": states})
it: map[DictStrAny] = map(
lambda c: c.export_all_data(), method(fields=fields, params=params)
)
result = cast(Iterator[AbstractObject], method(fields=fields, params=params))
it: map[DictStrAny] = map(lambda c: cast(DictStrAny, c.export_all_data()), result)
while True:
chunk = list(itertools.islice(it, chunk_size))
if not chunk:
break
yield chunk


def enrich_ad_objects(fb_obj_type: AbstractObject, fields: Sequence[str]) -> Any:
class AbstractCrudObjectWithApiGet(Protocol):
    """Structural type for classes that are built from an object id and expose `api_get`.

    `enrich_ad_objects` takes a `Type[AbstractCrudObjectWithApiGet]`, instantiates it
    with an item's "id" and calls `api_get` on the resulting instance.
    """

    # Matches the way `enrich_ad_objects` constructs the object: the id is passed
    # as the first positional argument; the remaining arguments are optional.
    def __init__(self, fbid=None, parent_id=None, api=None): ...

    # All arguments are optional keyword-style parameters; the result type is
    # deliberately left unconstrained (`Any`).
    def api_get(
        self,
        fields=None,
        params=None,
        batch=None,
        success=None,
        failure=None,
        pending=False,
    ) -> Any: ...


def enrich_ad_objects(
fb_obj_type: Type[AbstractCrudObjectWithApiGet], fields: Sequence[str]
) -> ItemTransformFunctionWithMeta[TDataItems]:
"""Returns a transformation that will enrich any of the resources returned by `facebook_ads_source` with additional fields

In example below we add "thumbnail_url" to all objects loaded by `ad_creatives` resource:
Expand All @@ -105,16 +125,23 @@ def enrich_ad_objects(fb_obj_type: AbstractObject, fields: Sequence[str]) -> Any
"""

def _wrap(items: TDataItems, meta: Any = None) -> TDataItems:
api_batch = FacebookAdsApi.get_default_api().new_batch()
default_api = FacebookAdsApi.get_default_api()
if not default_api:
raise RuntimeError("FacebookAdsApi not initialized")

api_batch = default_api.new_batch()

def update_item(resp: FacebookResponse, item: TDataItem) -> None:
item.update(resp.json())

def fail(resp: FacebookResponse) -> None:
raise resp.error()
error = resp.error()
if error:
raise error
raise RuntimeError("Unknown error")

for item in items:
o: AbstractCrudObject = fb_obj_type(item["id"])
o = fb_obj_type(item["id"])
o.api_get(
fields=fields,
batch=api_batch,
Expand All @@ -132,17 +159,17 @@ def fail(resp: FacebookResponse) -> None:


def execute_job(
job: AbstractCrudObject,
job: AdReportRun,
insights_max_wait_to_start_seconds: int = 5 * 60,
insights_max_wait_to_finish_seconds: int = 30 * 60,
insights_max_async_sleep_seconds: int = 5 * 60,
) -> AbstractCrudObject:
status: str = None
) -> AdReportRun:
status: Union[str, None] = None
time_start = time.time()
sleep_time = 10
while status != "Job Completed":
duration = time.time() - time_start
job = job.api_get()
job = cast(AdReportRun, job.api_get())
status = job["async_status"]
percent_complete = job["async_percent_completion"]

Expand Down Expand Up @@ -181,11 +208,18 @@ def execute_job(


def get_ads_account(
account_id: str, access_token: str, request_timeout: float, app_api_version: str
account_id: str,
access_token: str,
request_timeout: float,
app_api_version: Union[str, None],
) -> AdAccount:
notify_on_token_expiration()

def retry_on_limit(response: requests.Response, exception: BaseException) -> bool:
def retry_on_limit(
response: Union[requests.Response, None], exception: Union[BaseException, None]
) -> bool:
if not response:
return False
try:
error = response.json()["error"]
code = error["code"]
Expand Down Expand Up @@ -229,20 +263,18 @@ def retry_on_limit(response: requests.Response, exception: BaseException) -> boo
API._session.requests = retry_session
user = User(fbid="me")

accounts = user.get_ad_accounts()
account: AdAccount = None
accounts = cast(Cursor, user.get_ad_accounts())
for acc in accounts:
if acc["account_id"] == account_id:
account = acc

if not account:
raise ValueError("Couldn't find account with id {}".format(account_id))
return acc

return account
raise ValueError("Couldn't find account with id {}".format(account_id))


@with_config(sections=("sources", "facebook_ads"))
def notify_on_token_expiration(access_token_expires_at: int = None) -> None:
def notify_on_token_expiration(
access_token_expires_at: Union[int, None] = None,
) -> None:
"""Notifies (currently via logger) if access token expires in less than 7 days. Needs `access_token_expires_at` to be configured."""
if not access_token_expires_at:
logger.warning(
Expand Down
4 changes: 2 additions & 2 deletions sources/facebook_ads/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
dlt>=0.5.1
facebook-business>=16.0.2
dlt>=1.0.0
facebook-business>=23.0.0
5 changes: 3 additions & 2 deletions sources/facebook_ads/settings.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Facebook ads source settings and constants"""

from typing import Any, Callable, Dict, Iterator, Literal
from typing import Any, Callable, Dict, Iterator, Literal, Union
from dlt.common.schema.typing import TTableSchemaColumns
from facebook_business.adobjects.abstractobject import AbstractObject
from facebook_business.api import FacebookRequest

TFbMethod = Callable[..., Iterator[AbstractObject]]
TFbMethod = Callable[..., Union[FacebookRequest, Iterator[AbstractObject], Any]]


DEFAULT_FIELDS = (
Expand Down
Loading