feat: add periodic status logging and status_message_callback parameter for customization #1265

Merged · 13 commits · Jul 2, 2025
@@ -49,7 +49,7 @@ async def session_init(context: HttpCrawlingContext) -> None:
        if context.session:
            context.log.info(f'Init session {context.session.id}')
            next_request = Request.from_url(
                'https://placeholder.dev', session_id=context.session.id
                'https://a.placeholder.com', session_id=context.session.id
            )
            next_requests.append(next_request)

35 changes: 22 additions & 13 deletions src/crawlee/_log_config.py
@@ -4,15 +4,19 @@
import logging
import sys
import textwrap
from typing import Any
from typing import TYPE_CHECKING, Any

from colorama import Fore, Style, just_fix_windows_console
from typing_extensions import assert_never

from crawlee import service_locator

if TYPE_CHECKING:
    from crawlee._types import LogLevel

just_fix_windows_console()


_LOG_NAME_COLOR = Fore.LIGHTBLACK_EX

_LOG_LEVEL_COLOR = {
@@ -34,22 +38,27 @@
_LOG_MESSAGE_INDENT = ' ' * 6


def string_to_log_level(level: LogLevel) -> int:
    """Convert a string representation of a log level to an integer log level."""
    if level == 'DEBUG':
        return logging.DEBUG
    if level == 'INFO':
        return logging.INFO
    if level == 'WARNING':
        return logging.WARNING
    if level == 'ERROR':
        return logging.ERROR
    if level == 'CRITICAL':
        return logging.CRITICAL

    assert_never(level)


def get_configured_log_level() -> int:
    config = service_locator.get_configuration()

    if 'log_level' in config.model_fields_set:
        if config.log_level == 'DEBUG':
            return logging.DEBUG
        if config.log_level == 'INFO':
            return logging.INFO
        if config.log_level == 'WARNING':
            return logging.WARNING
        if config.log_level == 'ERROR':
            return logging.ERROR
        if config.log_level == 'CRITICAL':
            return logging.CRITICAL

        assert_never(config.log_level)
        return string_to_log_level(config.log_level)

    if sys.flags.dev_mode:
        return logging.DEBUG
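For reference, a minimal sketch of the new helper used on its own; note that `crawlee._log_config` is a private module, so importing it directly is for illustration only:

```python
import logging

from crawlee._log_config import string_to_log_level

# Each LogLevel literal maps onto the corresponding stdlib logging constant.
assert string_to_log_level('WARNING') == logging.WARNING
```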
2 changes: 2 additions & 0 deletions src/crawlee/_types.py
@@ -55,6 +55,8 @@

SkippedReason = Literal['robots_txt']

LogLevel = Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']


def _normalize_headers(headers: Mapping[str, str]) -> dict[str, str]:
"""Convert all header keys to lowercase, strips whitespace, and returns them sorted by key."""
5 changes: 3 additions & 2 deletions src/crawlee/configuration.py
@@ -1,11 +1,12 @@
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Annotated, Literal
from typing import TYPE_CHECKING, Annotated

from pydantic import AliasChoices, BeforeValidator, Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from crawlee._types import LogLevel
from crawlee._utils.docs import docs_group
from crawlee._utils.models import timedelta_ms

@@ -62,7 +63,7 @@ class Configuration(BaseSettings):
    https://playwright.dev/docs/api/class-browsertype#browser-type-launch."""

    log_level: Annotated[
        Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
        LogLevel,
        Field(
            validation_alias=AliasChoices(
                'apify_log_level',
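As a quick sketch of the narrowed type in use, the field still accepts the same literal values; this snippet only exercises the public `Configuration` class and its behavior is unchanged by the PR:

```python
from crawlee.configuration import Configuration

# 'DEBUG' is one of the LogLevel literals; anything else is rejected by pydantic validation.
config = Configuration(log_level='DEBUG')
```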
82 changes: 79 additions & 3 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -25,20 +25,22 @@

from crawlee import EnqueueStrategy, Glob, RequestTransformAction, service_locator
from crawlee._autoscaling import AutoscaledPool, Snapshotter, SystemStatus
from crawlee._log_config import configure_logger, get_configured_log_level
from crawlee._log_config import configure_logger, get_configured_log_level, string_to_log_level
from crawlee._request import Request, RequestOptions, RequestState
from crawlee._types import (
    BasicCrawlingContext,
    EnqueueLinksKwargs,
    GetKeyValueStoreFromRequestHandlerFunction,
    HttpHeaders,
    HttpPayload,
    LogLevel,
    RequestHandlerRunResult,
    SendRequestFunction,
    SkippedReason,
)
from crawlee._utils.docs import docs_group
from crawlee._utils.file import export_csv_to_stream, export_json_to_stream
from crawlee._utils.recurring_task import RecurringTask
from crawlee._utils.robots import RobotsTxtFile
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
from crawlee._utils.wait import wait_for
@@ -53,6 +55,7 @@
    SessionError,
    UserDefinedErrorHandlerError,
)
from crawlee.events._types import Event, EventCrawlerStatusData
from crawlee.http_clients import HttpxHttpClient
from crawlee.router import Router
from crawlee.sessions import SessionPool
@@ -191,6 +194,15 @@ class _BasicCrawlerOptions(TypedDict):
"""If set to `True`, the crawler will automatically try to fetch the robots.txt file for each domain,
and skip those that are not allowed. This also prevents disallowed URLs to be added via `EnqueueLinksFunction`."""

    status_message_logging_interval: NotRequired[timedelta]
    """Interval for logging the crawler status messages."""

    status_message_callback: NotRequired[
        Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
    ]
    """Allows overriding the default status message. The default status message is provided in the parameters.
    Returning `None` suppresses the status message."""


class _BasicCrawlerOptionsGeneric(Generic[TCrawlingContext, TStatisticsState], TypedDict):
    """Generic options of the `BasicCrawler` constructor."""
@@ -273,6 +285,9 @@ def __init__(
        configure_logging: bool = True,
        statistics_log_format: Literal['table', 'inline'] = 'table',
        respect_robots_txt_file: bool = False,
        status_message_logging_interval: timedelta = timedelta(seconds=10),
        status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
        | None = None,
        _context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
        _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None,
        _logger: logging.Logger | None = None,
@@ -291,7 +306,6 @@ def __init__(
            max_request_retries: Specifies the maximum number of retries allowed for a request if its processing fails.
                This includes retries due to navigation errors or errors thrown from user-supplied functions
                (`request_handler`, `pre_navigation_hooks` etc.).

                This limit does not apply to retries triggered by session rotation (see `max_session_rotations`).
            max_requests_per_crawl: Maximum number of pages to open during a crawl. The crawl stops upon reaching
                this limit. Setting this value can help avoid infinite loops in misconfigured crawlers. `None` means
@@ -300,7 +314,6 @@ def __init__(
                `max_requests_per_crawl` is achieved.
            max_session_rotations: Maximum number of session rotations per request. The crawler rotates the session
                if a proxy error occurs or if the website blocks the request.

                The session rotations are not counted towards the `max_request_retries` limit.
            max_crawl_depth: Specifies the maximum crawl depth. If set, the crawler will stop processing links beyond
                this depth. The crawl depth starts at 0 for initial requests and increases with each subsequent level
@@ -324,6 +337,9 @@ def __init__(
            respect_robots_txt_file: If set to `True`, the crawler will automatically try to fetch the robots.txt file
                for each domain, and skip those that are not allowed. This also prevents disallowed URLs from being
                added via `EnqueueLinksFunction`.
            status_message_logging_interval: Interval for logging the crawler status messages.
            status_message_callback: Allows overriding the default status message. The default status message is
                provided in the parameters. Returning `None` suppresses the status message.
            _context_pipeline: Enables extending the request lifecycle and modifying the crawling context.
                Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
            _additional_context_managers: Additional context managers used throughout the crawler lifecycle.
@@ -368,6 +384,9 @@ def __init__(
        self._on_skipped_request: SkippedRequestCallback | None = None
        self._abort_on_error = abort_on_error

        # Crawler callbacks
        self._status_message_callback = status_message_callback

        # Context of each request with matching result of request handler.
        # Inheritors can use this to override the result of individual request handler runs in `_run_request_handler`.
        self._context_result_map = WeakKeyDictionary[BasicCrawlingContext, RequestHandlerRunResult]()
@@ -428,6 +447,10 @@ def __init__(
            is_task_ready_function=self.__is_task_ready_function,
            run_task_function=self.__run_task_function,
        )
        self._crawler_state_rec_task = RecurringTask(
            func=self._crawler_state_task, delay=status_message_logging_interval
        )
        self._previous_crawler_state: TStatisticsState | None = None

        # State flags
        self._keep_alive = keep_alive
@@ -632,6 +655,7 @@ def sigint_handler() -> None:
        except CancelledError:
            pass
        finally:
            await self._crawler_state_rec_task.stop()
            if threading.current_thread() is threading.main_thread():
                with suppress(NotImplementedError):
                    asyncio.get_running_loop().remove_signal_handler(signal.SIGINT)
@@ -663,6 +687,8 @@ def sigint_handler() -> None:
    async def _run_crawler(self) -> None:
        event_manager = service_locator.get_event_manager()

        self._crawler_state_rec_task.start()

        # Collect the context managers to be entered. Context managers that are already active are excluded,
        # as they were likely entered by the caller, who will also be responsible for exiting them.
        contexts_to_enter = [
@@ -1481,3 +1507,53 @@ async def _find_txt_file_for_url(self, url: str) -> RobotsTxtFile:
            url: The URL whose domain will be used to locate and fetch the corresponding robots.txt file.
        """
        return await RobotsTxtFile.find(url, self._http_client)

    def _log_status_message(self, message: str, level: LogLevel = 'DEBUG') -> None:
        """Log a status message for the crawler.

        Args:
            message: The status message to log.
            level: The logging level for the message.
        """
        log_level = string_to_log_level(level)
        self.log.log(log_level, message)

    async def _crawler_state_task(self) -> None:
        """Log the crawler status and emit a crawler status event."""
        event_manager = service_locator.get_event_manager()

        current_state = self.statistics.state

        if (
            failed_requests := (
                current_state.requests_failed - (self._previous_crawler_state or current_state).requests_failed
            )
            > 0
        ):
            message = f'Experiencing problems, {failed_requests} failed requests since last status update.'
        else:
            request_manager = await self.get_request_manager()
            total_count = await request_manager.get_total_count()
            if total_count is not None and total_count > 0:
                pages_info = f'{self._statistics.state.requests_finished}/{total_count}'
            else:
                pages_info = str(self._statistics.state.requests_finished)

            message = (
                f'Crawled {pages_info} pages, {self._statistics.state.requests_failed} failed requests, '
                f'desired concurrency {self._autoscaled_pool.desired_concurrency}.'
            )

        if self._status_message_callback:
            new_message = await self._status_message_callback(current_state, self._previous_crawler_state, message)
            if new_message:
                message = new_message
                self._log_status_message(message, level='INFO')
        else:
            self._log_status_message(message, level='INFO')

        event_manager.emit(
            event=Event.CRAWLER_STATUS, event_data=EventCrawlerStatusData(message=message, crawler_id=id(self))
        )

        self._previous_crawler_state = current_state
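Putting the new constructor parameters together, here is a rough usage sketch; the `HttpCrawler`, handler, and URLs are illustrative rather than part of this diff, and the callback signature follows the `status_message_callback` annotation above:

```python
import asyncio
from datetime import timedelta

from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.statistics import StatisticsState


async def status_message_callback(
    state: StatisticsState,
    previous_state: StatisticsState | None,
    default_message: str,
) -> str | None:
    # Prefix the default message; returning None would suppress the log line instead.
    return f'[my-crawl] {default_message}'


async def main() -> None:
    crawler = HttpCrawler(
        status_message_logging_interval=timedelta(seconds=30),
        status_message_callback=status_message_callback,
    )

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())
```

With the callback in place, the periodic status line is logged at INFO level every 30 seconds and the same message is emitted as a `CRAWLER_STATUS` event.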
2 changes: 2 additions & 0 deletions src/crawlee/events/__init__.py
@@ -3,6 +3,7 @@
from ._types import (
    Event,
    EventAbortingData,
    EventCrawlerStatusData,
    EventData,
    EventExitData,
    EventListener,
@@ -14,6 +15,7 @@
__all__ = [
    'Event',
    'EventAbortingData',
    'EventCrawlerStatusData',
    'EventData',
    'EventExitData',
    'EventListener',
5 changes: 5 additions & 0 deletions src/crawlee/events/_event_manager.py
@@ -19,6 +19,7 @@
from crawlee.events._types import (
    Event,
    EventAbortingData,
    EventCrawlerStatusData,
    EventExitData,
    EventListener,
    EventMigratingData,
@@ -147,6 +148,8 @@ def on(self, *, event: Literal[Event.ABORTING], listener: EventListener[EventAbo
    @overload
    def on(self, *, event: Literal[Event.EXIT], listener: EventListener[EventExitData]) -> None: ...
    @overload
    def on(self, *, event: Literal[Event.CRAWLER_STATUS], listener: EventListener[EventCrawlerStatusData]) -> None: ...
    @overload
    def on(self, *, event: Event, listener: EventListener[None]) -> None: ...

    def on(self, *, event: Event, listener: EventListener[Any]) -> None:
@@ -222,6 +225,8 @@ def emit(self, *, event: Literal[Event.ABORTING], event_data: EventAbortingData)
    @overload
    def emit(self, *, event: Literal[Event.EXIT], event_data: EventExitData) -> None: ...
    @overload
    def emit(self, *, event: Literal[Event.CRAWLER_STATUS], event_data: EventCrawlerStatusData) -> None: ...
    @overload
    def emit(self, *, event: Event, event_data: Any) -> None: ...

    @ensure_context
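For completeness, a small sketch of consuming the new event through the event manager; the listener itself is illustrative, and during a crawl the event is emitted by `_crawler_state_task` as shown earlier:

```python
from crawlee import service_locator
from crawlee.events import Event, EventCrawlerStatusData


async def on_crawler_status(event_data: EventCrawlerStatusData) -> None:
    # The payload carries the rendered status message and the id of the emitting crawler.
    print(f'[crawler {event_data.crawler_id}] {event_data.message}')


event_manager = service_locator.get_event_manager()
event_manager.on(event=Event.CRAWLER_STATUS, listener=on_crawler_status)
```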
25 changes: 24 additions & 1 deletion src/crawlee/events/_types.py
@@ -31,6 +31,9 @@ class Event(str, Enum):
    PAGE_CREATED = 'pageCreated'
    PAGE_CLOSED = 'pageClosed'

    # State events
    CRAWLER_STATUS = 'crawlerStatus'


@docs_group('Event payloads')
class EventPersistStateData(BaseModel):
@@ -79,7 +82,27 @@ class EventExitData(BaseModel):
    model_config = ConfigDict(populate_by_name=True)


EventData = EventPersistStateData | EventSystemInfoData | EventMigratingData | EventAbortingData | EventExitData
@docs_group('Event payloads')
class EventCrawlerStatusData(BaseModel):
    """Data for the crawler status event."""

    model_config = ConfigDict(populate_by_name=True)

    message: str
    """A message describing the current status of the crawler."""

    crawler_id: int
    """The ID of the crawler that emitted the event."""


EventData = (
    EventPersistStateData
    | EventSystemInfoData
    | EventMigratingData
    | EventAbortingData
    | EventExitData
    | EventCrawlerStatusData
)
"""A helper type for all possible event payloads"""

WrappedListener = Callable[..., Coroutine[Any, Any, None]]