35 changes: 22 additions & 13 deletions src/crawlee/_log_config.py
@@ -4,15 +4,19 @@
import logging
import sys
import textwrap
from typing import Any
from typing import TYPE_CHECKING, Any

from colorama import Fore, Style, just_fix_windows_console
from typing_extensions import assert_never

from crawlee import service_locator

if TYPE_CHECKING:
from crawlee._types import LogLevel

just_fix_windows_console()


_LOG_NAME_COLOR = Fore.LIGHTBLACK_EX

_LOG_LEVEL_COLOR = {
@@ -34,22 +38,27 @@
_LOG_MESSAGE_INDENT = ' ' * 6


def string_to_log_level(level: LogLevel) -> int:
"""Convert a string representation of a log level to an integer log level."""
if level == 'DEBUG':
return logging.DEBUG
if level == 'INFO':
return logging.INFO
if level == 'WARNING':
return logging.WARNING
if level == 'ERROR':
return logging.ERROR
if level == 'CRITICAL':
return logging.CRITICAL

assert_never(level)


def get_configured_log_level() -> int:
config = service_locator.get_configuration()

if 'log_level' in config.model_fields_set:
if config.log_level == 'DEBUG':
return logging.DEBUG
if config.log_level == 'INFO':
return logging.INFO
if config.log_level == 'WARNING':
return logging.WARNING
if config.log_level == 'ERROR':
return logging.ERROR
if config.log_level == 'CRITICAL':
return logging.CRITICAL

assert_never(config.log_level)
return string_to_log_level(config.log_level)

if sys.flags.dev_mode:
return logging.DEBUG
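For illustration, a minimal sketch of the extracted helper in use (the import path is the internal module shown in this diff and may change):

```python
import logging

from crawlee._log_config import string_to_log_level

# Each LogLevel literal maps onto the corresponding stdlib logging constant.
assert string_to_log_level('WARNING') == logging.WARNING
assert string_to_log_level('DEBUG') == logging.DEBUG
```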
2 changes: 2 additions & 0 deletions src/crawlee/_types.py
@@ -52,6 +52,8 @@

SkippedReason: TypeAlias = Literal['robots_txt']

LogLevel: TypeAlias = Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']


def _normalize_headers(headers: Mapping[str, str]) -> dict[str, str]:
"""Convert all header keys to lowercase, strips whitespace, and returns them sorted by key."""
5 changes: 3 additions & 2 deletions src/crawlee/configuration.py
@@ -1,11 +1,12 @@
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Annotated, Literal
from typing import TYPE_CHECKING, Annotated

from pydantic import AliasChoices, BeforeValidator, Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from crawlee._types import LogLevel
from crawlee._utils.docs import docs_group
from crawlee._utils.models import timedelta_ms

@@ -62,7 +63,7 @@ class Configuration(BaseSettings):
https://playwright.dev/docs/api/class-browsertype#browser-type-launch."""

log_level: Annotated[
Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
LogLevel,
Field(
validation_alias=AliasChoices(
'apify_log_level',
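A small sketch of the relocated `LogLevel` alias in action. It assumes `Configuration` accepts initialization by field name, in line with the `populate_by_name` convention used elsewhere in the codebase:

```python
from crawlee.configuration import Configuration

# Valid values are exactly the LogLevel literals: 'DEBUG', 'INFO', 'WARNING', 'ERROR' or 'CRITICAL'.
config = Configuration(log_level='DEBUG')
```

`get_configured_log_level` then resolves this value to the numeric level via `string_to_log_level`.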
82 changes: 79 additions & 3 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -25,19 +25,21 @@

from crawlee import EnqueueStrategy, Glob, RequestTransformAction, service_locator
from crawlee._autoscaling import AutoscaledPool, Snapshotter, SystemStatus
from crawlee._log_config import configure_logger, get_configured_log_level
from crawlee._log_config import configure_logger, get_configured_log_level, string_to_log_level
from crawlee._request import Request, RequestOptions, RequestState
from crawlee._types import (
BasicCrawlingContext,
EnqueueLinksKwargs,
GetKeyValueStoreFromRequestHandlerFunction,
HttpHeaders,
HttpPayload,
LogLevel,
RequestHandlerRunResult,
SendRequestFunction,
SkippedReason,
)
from crawlee._utils.docs import docs_group
from crawlee._utils.recurring_task import RecurringTask
from crawlee._utils.robots import RobotsTxtFile
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
from crawlee._utils.wait import wait_for
@@ -52,6 +54,7 @@
SessionError,
UserDefinedErrorHandlerError,
)
from crawlee.events._types import Event, EventCrawlerStatusData
from crawlee.http_clients import HttpxHttpClient
from crawlee.router import Router
from crawlee.sessions import SessionPool
@@ -189,6 +192,15 @@ class _BasicCrawlerOptions(TypedDict):
"""If set to `True`, the crawler will automatically try to fetch the robots.txt file for each domain,
and skip those that are not allowed. This also prevents disallowed URLs to be added via `EnqueueLinksFunction`."""

status_message_logging_interval: NotRequired[timedelta]
"""Interval for logging the crawler status messages."""

status_message_callback: NotRequired[
Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
]
"""Allows overriding the default status message. The default status message is provided in the parameters.
Returning `None` suppresses the status message."""


class _BasicCrawlerOptionsGeneric(Generic[TCrawlingContext, TStatisticsState], TypedDict):
"""Generic options the `BasicCrawler` constructor."""
@@ -271,6 +283,9 @@ def __init__(
configure_logging: bool = True,
statistics_log_format: Literal['table', 'inline'] = 'table',
respect_robots_txt_file: bool = False,
status_message_logging_interval: timedelta = timedelta(seconds=10),
status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
| None = None,
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
_additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None,
_logger: logging.Logger | None = None,
@@ -289,7 +304,6 @@ def __init__(
max_request_retries: Specifies the maximum number of retries allowed for a request if its processing fails.
This includes retries due to navigation errors or errors thrown from user-supplied functions
(`request_handler`, `pre_navigation_hooks` etc.).

This limit does not apply to retries triggered by session rotation (see `max_session_rotations`).
max_requests_per_crawl: Maximum number of pages to open during a crawl. The crawl stops upon reaching
this limit. Setting this value can help avoid infinite loops in misconfigured crawlers. `None` means
@@ -298,7 +312,6 @@
`max_requests_per_crawl` is achieved.
max_session_rotations: Maximum number of session rotations per request. The crawler rotates the session
if a proxy error occurs or if the website blocks the request.

The session rotations are not counted towards the `max_request_retries` limit.
max_crawl_depth: Specifies the maximum crawl depth. If set, the crawler will stop processing links beyond
this depth. The crawl depth starts at 0 for initial requests and increases with each subsequent level
@@ -322,6 +335,9 @@ def __init__(
respect_robots_txt_file: If set to `True`, the crawler will automatically try to fetch the robots.txt file
for each domain, and skip those that are not allowed. This also prevents disallowed URLs from being added
via `EnqueueLinksFunction`.
status_message_logging_interval: Interval for logging crawler status messages.
status_message_callback: Allows overriding the default status message. The default status message is
provided in the parameters. Returning `None` suppresses the status message.
_context_pipeline: Enables extending the request lifecycle and modifying the crawling context.
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
_additional_context_managers: Additional context managers used throughout the crawler lifecycle.
@@ -366,6 +382,9 @@ def __init__(
self._on_skipped_request: SkippedRequestCallback | None = None
self._abort_on_error = abort_on_error

# Crawler callbacks
self._status_message_callback = status_message_callback

# Context of each request with matching result of request handler.
# Inheritors can use this to override the result of individual request handler runs in `_run_request_handler`.
self._context_result_map = WeakKeyDictionary[BasicCrawlingContext, RequestHandlerRunResult]()
@@ -426,6 +445,10 @@ def __init__(
is_task_ready_function=self.__is_task_ready_function,
run_task_function=self.__run_task_function,
)
self._crawler_state_rec_task = RecurringTask(
func=self._crawler_state_task, delay=status_message_logging_interval
)
self._previous_crawler_state: TStatisticsState | None = None

# State flags
self._keep_alive = keep_alive
@@ -630,6 +653,7 @@ def sigint_handler() -> None:
except CancelledError:
pass
finally:
await self._crawler_state_rec_task.stop()
if threading.current_thread() is threading.main_thread():
with suppress(NotImplementedError):
asyncio.get_running_loop().remove_signal_handler(signal.SIGINT)
@@ -661,6 +685,8 @@ def sigint_handler() -> None:
async def _run_crawler(self) -> None:
event_manager = service_locator.get_event_manager()

self._crawler_state_rec_task.start()

# Collect the context managers to be entered. Context managers that are already active are excluded,
# as they were likely entered by the caller, who will also be responsible for exiting them.
contexts_to_enter = [
@@ -1522,3 +1548,53 @@ async def _find_txt_file_for_url(self, url: str) -> RobotsTxtFile:
url: The URL whose domain will be used to locate and fetch the corresponding robots.txt file.
"""
return await RobotsTxtFile.find(url, self._http_client)

def _log_status_message(self, message: str, level: LogLevel = 'DEBUG') -> None:
"""Log a status message for the crawler.

Args:
message: The status message to log.
level: The logging level for the message.
"""
log_level = string_to_log_level(level)
self.log.log(log_level, message)

async def _crawler_state_task(self) -> None:
"""Emit a persist state event with the given migration status."""
event_manager = service_locator.get_event_manager()

current_state = self.statistics.state

if (
failed_requests := current_state.requests_failed
- (self._previous_crawler_state or current_state).requests_failed
) > 0:
message = f'Experiencing problems, {failed_requests} failed requests since last status update.'
else:
request_manager = await self.get_request_manager()
total_count = await request_manager.get_total_count()
if total_count is not None and total_count > 0:
pages_info = f'{self._statistics.state.requests_finished}/{total_count}'
else:
pages_info = str(self._statistics.state.requests_finished)

message = (
f'Crawled {pages_info} pages, {self._statistics.state.requests_failed} failed requests, '
f'desired concurrency {self._autoscaled_pool.desired_concurrency}.'
)

if self._status_message_callback:
new_message = await self._status_message_callback(current_state, self._previous_crawler_state, message)
if new_message:
message = new_message
self._log_status_message(message, level='INFO')
else:
self._log_status_message(message, level='INFO')

event_manager.emit(
event=Event.CRAWLER_STATUS, event_data=EventCrawlerStatusData(message=message, crawler_id=id(self))
)

self._previous_crawler_state = current_state
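As a usage sketch (not part of the diff), a custom status message callback might look roughly like this, relying only on the callback signature and the `requests_failed` counter referenced above:

```python
from __future__ import annotations

from datetime import timedelta

from crawlee.crawlers import BasicCrawler
from crawlee.statistics import StatisticsState


async def status_message_callback(
    state: StatisticsState,
    previous_state: StatisticsState | None,
    default_message: str,
) -> str | None:
    # Return a string to replace the default message, or None to suppress it.
    return f'{default_message} Failed so far: {state.requests_failed}.'


crawler = BasicCrawler(
    status_message_callback=status_message_callback,
    status_message_logging_interval=timedelta(seconds=30),
)
```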
2 changes: 2 additions & 0 deletions src/crawlee/events/__init__.py
@@ -3,6 +3,7 @@
from ._types import (
Event,
EventAbortingData,
EventCrawlerStatusData,
EventData,
EventExitData,
EventListener,
@@ -14,6 +15,7 @@
__all__ = [
'Event',
'EventAbortingData',
'EventCrawlerStatusData',
'EventData',
'EventExitData',
'EventListener',
5 changes: 5 additions & 0 deletions src/crawlee/events/_event_manager.py
@@ -19,6 +19,7 @@
from crawlee.events._types import (
Event,
EventAbortingData,
EventCrawlerStatusData,
EventExitData,
EventListener,
EventMigratingData,
@@ -147,6 +148,8 @@ def on(self, *, event: Literal[Event.ABORTING], listener: EventListener[EventAbo
@overload
def on(self, *, event: Literal[Event.EXIT], listener: EventListener[EventExitData]) -> None: ...
@overload
def on(self, *, event: Literal[Event.CRAWLER_STATUS], listener: EventListener[EventCrawlerStatusData]) -> None: ...
@overload
def on(self, *, event: Event, listener: EventListener[None]) -> None: ...

def on(self, *, event: Event, listener: EventListener[Any]) -> None:
@@ -222,6 +225,8 @@ def emit(self, *, event: Literal[Event.ABORTING], event_data: EventAbortingData)
@overload
def emit(self, *, event: Literal[Event.EXIT], event_data: EventExitData) -> None: ...
@overload
def emit(self, *, event: Literal[Event.CRAWLER_STATUS], event_data: EventCrawlerStatusData) -> None: ...
@overload
def emit(self, *, event: Event, event_data: Any) -> None: ...

@ensure_context
23 changes: 22 additions & 1 deletion src/crawlee/events/_types.py
@@ -31,6 +31,9 @@ class Event(str, Enum):
PAGE_CREATED = 'pageCreated'
PAGE_CLOSED = 'pageClosed'

# State events
CRAWLER_STATUS = 'crawlerStatus'


@docs_group('Event payloads')
class EventPersistStateData(BaseModel):
@@ -79,7 +82,25 @@ class EventExitData(BaseModel):
model_config = ConfigDict(populate_by_name=True)


EventData = Union[EventPersistStateData, EventSystemInfoData, EventMigratingData, EventAbortingData, EventExitData]
@docs_group('Event payloads')
class EventCrawlerStatusData(BaseModel):
"""Data for the crawler status event."""

model_config = ConfigDict(populate_by_name=True)

message: str

crawler_id: int


EventData = Union[
EventPersistStateData,
EventSystemInfoData,
EventMigratingData,
EventAbortingData,
EventExitData,
EventCrawlerStatusData,
]
"""A helper type for all possible event payloads"""

WrappedListener = Callable[..., Coroutine[Any, Any, None]]
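A hedged sketch of consuming the new event; the listener registration follows the `on` overload added in `_event_manager.py` and the exports added to `crawlee.events`:

```python
from crawlee import service_locator
from crawlee.events import Event, EventCrawlerStatusData


async def on_crawler_status(event_data: EventCrawlerStatusData) -> None:
    # message is the same text the crawler logs; crawler_id identifies the emitting crawler instance.
    print(f'[crawler {event_data.crawler_id}] {event_data.message}')


event_manager = service_locator.get_event_manager()
event_manager.on(event=Event.CRAWLER_STATUS, listener=on_crawler_status)
```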