diff --git a/docs/guides/code_examples/session_management/multi_sessions_http.py b/docs/guides/code_examples/session_management/multi_sessions_http.py index 74f1bafc4c..0bd4a88beb 100644 --- a/docs/guides/code_examples/session_management/multi_sessions_http.py +++ b/docs/guides/code_examples/session_management/multi_sessions_http.py @@ -49,7 +49,7 @@ async def session_init(context: HttpCrawlingContext) -> None: if context.session: context.log.info(f'Init session {context.session.id}') next_request = Request.from_url( - 'https://placeholder.dev', session_id=context.session.id + 'https://a.placeholder.com', session_id=context.session.id ) next_requests.append(next_request) diff --git a/src/crawlee/_log_config.py b/src/crawlee/_log_config.py index 914cc32f24..5fc9e94b8a 100644 --- a/src/crawlee/_log_config.py +++ b/src/crawlee/_log_config.py @@ -4,15 +4,19 @@ import logging import sys import textwrap -from typing import Any +from typing import TYPE_CHECKING, Any from colorama import Fore, Style, just_fix_windows_console from typing_extensions import assert_never from crawlee import service_locator +if TYPE_CHECKING: + from crawlee._types import LogLevel + just_fix_windows_console() + _LOG_NAME_COLOR = Fore.LIGHTBLACK_EX _LOG_LEVEL_COLOR = { @@ -34,22 +38,27 @@ _LOG_MESSAGE_INDENT = ' ' * 6 +def string_to_log_level(level: LogLevel) -> int: + """Convert a string representation of a log level to an integer log level.""" + if level == 'DEBUG': + return logging.DEBUG + if level == 'INFO': + return logging.INFO + if level == 'WARNING': + return logging.WARNING + if level == 'ERROR': + return logging.ERROR + if level == 'CRITICAL': + return logging.CRITICAL + + assert_never(level) + + def get_configured_log_level() -> int: config = service_locator.get_configuration() if 'log_level' in config.model_fields_set: - if config.log_level == 'DEBUG': - return logging.DEBUG - if config.log_level == 'INFO': - return logging.INFO - if config.log_level == 'WARNING': - return logging.WARNING - if config.log_level == 'ERROR': - return logging.ERROR - if config.log_level == 'CRITICAL': - return logging.CRITICAL - - assert_never(config.log_level) + return string_to_log_level(config.log_level) if sys.flags.dev_mode: return logging.DEBUG diff --git a/src/crawlee/_types.py b/src/crawlee/_types.py index ae047e4ce6..85c4ab0561 100644 --- a/src/crawlee/_types.py +++ b/src/crawlee/_types.py @@ -55,6 +55,8 @@ SkippedReason = Literal['robots_txt'] +LogLevel = Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] + def _normalize_headers(headers: Mapping[str, str]) -> dict[str, str]: """Convert all header keys to lowercase, strips whitespace, and returns them sorted by key.""" diff --git a/src/crawlee/configuration.py b/src/crawlee/configuration.py index c58aff9375..fbcfc6f04c 100644 --- a/src/crawlee/configuration.py +++ b/src/crawlee/configuration.py @@ -1,11 +1,12 @@ from __future__ import annotations from datetime import timedelta -from typing import TYPE_CHECKING, Annotated, Literal +from typing import TYPE_CHECKING, Annotated from pydantic import AliasChoices, BeforeValidator, Field from pydantic_settings import BaseSettings, SettingsConfigDict +from crawlee._types import LogLevel from crawlee._utils.docs import docs_group from crawlee._utils.models import timedelta_ms @@ -62,7 +63,7 @@ class Configuration(BaseSettings): https://playwright.dev/docs/api/class-browsertype#browser-type-launch.""" log_level: Annotated[ - Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], + LogLevel, Field( 
validation_alias=AliasChoices( 'apify_log_level', diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 0ca8422327..89ebde2413 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -25,7 +25,7 @@ from crawlee import EnqueueStrategy, Glob, RequestTransformAction, service_locator from crawlee._autoscaling import AutoscaledPool, Snapshotter, SystemStatus -from crawlee._log_config import configure_logger, get_configured_log_level +from crawlee._log_config import configure_logger, get_configured_log_level, string_to_log_level from crawlee._request import Request, RequestOptions, RequestState from crawlee._types import ( BasicCrawlingContext, @@ -33,12 +33,14 @@ GetKeyValueStoreFromRequestHandlerFunction, HttpHeaders, HttpPayload, + LogLevel, RequestHandlerRunResult, SendRequestFunction, SkippedReason, ) from crawlee._utils.docs import docs_group from crawlee._utils.file import export_csv_to_stream, export_json_to_stream +from crawlee._utils.recurring_task import RecurringTask from crawlee._utils.robots import RobotsTxtFile from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute from crawlee._utils.wait import wait_for @@ -53,6 +55,7 @@ SessionError, UserDefinedErrorHandlerError, ) +from crawlee.events._types import Event, EventCrawlerStatusData from crawlee.http_clients import HttpxHttpClient from crawlee.router import Router from crawlee.sessions import SessionPool @@ -191,6 +194,15 @@ class _BasicCrawlerOptions(TypedDict): """If set to `True`, the crawler will automatically try to fetch the robots.txt file for each domain, and skip those that are not allowed. This also prevents disallowed URLs to be added via `EnqueueLinksFunction`.""" + status_message_logging_interval: NotRequired[timedelta] + """Interval for logging the crawler status messages.""" + + status_message_callback: NotRequired[ + Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] + ] + """Allows overriding the default status message. The default status message is provided in the parameters. + Returning `None` suppresses the status message.""" + class _BasicCrawlerOptionsGeneric(Generic[TCrawlingContext, TStatisticsState], TypedDict): """Generic options the `BasicCrawler` constructor.""" @@ -273,6 +285,9 @@ def __init__( configure_logging: bool = True, statistics_log_format: Literal['table', 'inline'] = 'table', respect_robots_txt_file: bool = False, + status_message_logging_interval: timedelta = timedelta(seconds=10), + status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]] + | None = None, _context_pipeline: ContextPipeline[TCrawlingContext] | None = None, _additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None, _logger: logging.Logger | None = None, @@ -291,7 +306,6 @@ def __init__( max_request_retries: Specifies the maximum number of retries allowed for a request if its processing fails. This includes retries due to navigation errors or errors thrown from user-supplied functions (`request_handler`, `pre_navigation_hooks` etc.). - This limit does not apply to retries triggered by session rotation (see `max_session_rotations`). max_requests_per_crawl: Maximum number of pages to open during a crawl. The crawl stops upon reaching this limit. Setting this value can help avoid infinite loops in misconfigured crawlers. 
`None` means @@ -300,7 +314,6 @@ def __init__( `max_requests_per_crawl` is achieved. max_session_rotations: Maximum number of session rotations per request. The crawler rotates the session if a proxy error occurs or if the website blocks the request. - The session rotations are not counted towards the `max_request_retries` limit. max_crawl_depth: Specifies the maximum crawl depth. If set, the crawler will stop processing links beyond this depth. The crawl depth starts at 0 for initial requests and increases with each subsequent level @@ -324,6 +337,9 @@ def __init__( respect_robots_txt_file: If set to `True`, the crawler will automatically try to fetch the robots.txt file for each domain, and skip those that are not allowed. This also prevents disallowed URLs to be added via `EnqueueLinksFunction` + status_message_logging_interval: Interval for logging the crawler status messages. + status_message_callback: Allows overriding the default status message. The default status message is + provided in the parameters. Returning `None` suppresses the status message. _context_pipeline: Enables extending the request lifecycle and modifying the crawling context. Intended for use by subclasses rather than direct instantiation of `BasicCrawler`. _additional_context_managers: Additional context managers used throughout the crawler lifecycle. @@ -368,6 +384,9 @@ def __init__( self._on_skipped_request: SkippedRequestCallback | None = None self._abort_on_error = abort_on_error + # Crawler callbacks + self._status_message_callback = status_message_callback + # Context of each request with matching result of request handler. # Inheritors can use this to override the result of individual request handler runs in `_run_request_handler`. self._context_result_map = WeakKeyDictionary[BasicCrawlingContext, RequestHandlerRunResult]() @@ -428,6 +447,10 @@ def __init__( is_task_ready_function=self.__is_task_ready_function, run_task_function=self.__run_task_function, ) + self._crawler_state_rec_task = RecurringTask( + func=self._crawler_state_task, delay=status_message_logging_interval + ) + self._previous_crawler_state: TStatisticsState | None = None # State flags self._keep_alive = keep_alive @@ -632,6 +655,7 @@ def sigint_handler() -> None: except CancelledError: pass finally: + await self._crawler_state_rec_task.stop() if threading.current_thread() is threading.main_thread(): with suppress(NotImplementedError): asyncio.get_running_loop().remove_signal_handler(signal.SIGINT) @@ -663,6 +687,8 @@ def sigint_handler() -> None: async def _run_crawler(self) -> None: event_manager = service_locator.get_event_manager() + self._crawler_state_rec_task.start() + # Collect the context managers to be entered. Context managers that are already active are excluded, # as they were likely entered by the caller, who will also be responsible for exiting them. contexts_to_enter = [ @@ -1481,3 +1507,53 @@ async def _find_txt_file_for_url(self, url: str) -> RobotsTxtFile: url: The URL whose domain will be used to locate and fetch the corresponding robots.txt file. """ return await RobotsTxtFile.find(url, self._http_client) + + def _log_status_message(self, message: str, level: LogLevel = 'DEBUG') -> None: + """Log a status message for the crawler. + + Args: + message: The status message to log. + level: The logging level for the message. 
+ """ + log_level = string_to_log_level(level) + self.log.log(log_level, message) + + async def _crawler_state_task(self) -> None: + """Log the crawler status message and emit a crawler status event.""" + event_manager = service_locator.get_event_manager() + + current_state = self.statistics.state + + if ( + failed_requests := ( + current_state.requests_failed - (self._previous_crawler_state or current_state).requests_failed + ) + > 0 + ): + message = f'Experiencing problems, {failed_requests} failed requests since last status update.' + else: + request_manager = await self.get_request_manager() + total_count = await request_manager.get_total_count() + if total_count is not None and total_count > 0: + pages_info = f'{self._statistics.state.requests_finished}/{total_count}' + else: + pages_info = str(self._statistics.state.requests_finished) + + message = ( + f'Crawled {pages_info} pages, {self._statistics.state.requests_failed} failed requests, ' + f'desired concurrency {self._autoscaled_pool.desired_concurrency}.' + ) + + if self._status_message_callback: + new_message = await self._status_message_callback(current_state, self._previous_crawler_state, message) + if new_message: + message = new_message + self._log_status_message(message, level='INFO') + else: + self._log_status_message(message, level='INFO') + + event_manager.emit( + event=Event.CRAWLER_STATUS, event_data=EventCrawlerStatusData(message=message, crawler_id=id(self)) + ) + + self._previous_crawler_state = current_state diff --git a/src/crawlee/events/__init__.py b/src/crawlee/events/__init__.py index 1c2cda0173..2aa2beecfd 100644 --- a/src/crawlee/events/__init__.py +++ b/src/crawlee/events/__init__.py @@ -3,6 +3,7 @@ from ._types import ( Event, EventAbortingData, + EventCrawlerStatusData, EventData, EventExitData, EventListener, @@ -14,6 +15,7 @@ __all__ = [ 'Event', 'EventAbortingData', + 'EventCrawlerStatusData', 'EventData', 'EventExitData', 'EventListener', diff --git a/src/crawlee/events/_event_manager.py b/src/crawlee/events/_event_manager.py index a0138fb153..b2b072e330 100644 --- a/src/crawlee/events/_event_manager.py +++ b/src/crawlee/events/_event_manager.py @@ -19,6 +19,7 @@ from crawlee.events._types import ( Event, EventAbortingData, + EventCrawlerStatusData, EventExitData, EventListener, EventMigratingData, @@ -147,6 +148,8 @@ def on(self, *, event: Literal[Event.ABORTING], listener: EventListener[EventAbo @overload def on(self, *, event: Literal[Event.EXIT], listener: EventListener[EventExitData]) -> None: ... @overload + def on(self, *, event: Literal[Event.CRAWLER_STATUS], listener: EventListener[EventCrawlerStatusData]) -> None: ... + @overload def on(self, *, event: Event, listener: EventListener[None]) -> None: ... def on(self, *, event: Event, listener: EventListener[Any]) -> None: @@ -222,6 +225,8 @@ def emit(self, *, event: Literal[Event.ABORTING], event_data: EventAbortingData) @overload def emit(self, *, event: Literal[Event.EXIT], event_data: EventExitData) -> None: ... @overload + def emit(self, *, event: Literal[Event.CRAWLER_STATUS], event_data: EventCrawlerStatusData) -> None: ... + @overload def emit(self, *, event: Event, event_data: Any) -> None: ...
@ensure_context diff --git a/src/crawlee/events/_types.py b/src/crawlee/events/_types.py index f16b7a7b99..aeaa5d7611 100644 --- a/src/crawlee/events/_types.py +++ b/src/crawlee/events/_types.py @@ -31,6 +31,9 @@ class Event(str, Enum): PAGE_CREATED = 'pageCreated' PAGE_CLOSED = 'pageClosed' + # State events + CRAWLER_STATUS = 'crawlerStatus' + @docs_group('Event payloads') class EventPersistStateData(BaseModel): @@ -79,7 +82,27 @@ class EventExitData(BaseModel): model_config = ConfigDict(populate_by_name=True) -EventData = EventPersistStateData | EventSystemInfoData | EventMigratingData | EventAbortingData | EventExitData +@docs_group('Event payloads') +class EventCrawlerStatusData(BaseModel): + """Data for the crawler status event.""" + + model_config = ConfigDict(populate_by_name=True) + + message: str + """A message describing the current status of the crawler.""" + + crawler_id: int + """The ID of the crawler that emitted the event.""" + + +EventData = ( + EventPersistStateData + | EventSystemInfoData + | EventMigratingData + | EventAbortingData + | EventExitData + | EventCrawlerStatusData +) """A helper type for all possible event payloads""" WrappedListener = Callable[..., Coroutine[Any, Any, None]] diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 0e00f09934..4e8a513118 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -22,6 +22,7 @@ from crawlee.configuration import Configuration from crawlee.crawlers import BasicCrawler from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError +from crawlee.events import Event, EventCrawlerStatusData from crawlee.events._local_event_manager import LocalEventManager from crawlee.request_loaders import RequestList, RequestManagerTandem from crawlee.sessions import Session, SessionPool @@ -36,11 +37,12 @@ from yarl import URL from crawlee._types import JsonSerializable + from crawlee.statistics import StatisticsState async def test_processes_requests_from_explicit_queue() -> None: queue = await RequestQueue.open() - await queue.add_requests(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await queue.add_requests(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) crawler = BasicCrawler(request_manager=queue) calls = list[str]() @@ -51,14 +53,16 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run() - assert calls == ['http://a.com/', 'http://b.com/', 'http://c.com/'] + assert calls == ['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com'] async def test_processes_requests_from_request_source_tandem() -> None: request_queue = await RequestQueue.open() - await request_queue.add_requests(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await request_queue.add_requests( + ['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com'] + ) - request_list = RequestList(['http://a.com/', 'http://d.com', 'http://e.com']) + request_list = RequestList(['https://a.placeholder.com', 'https://d.placeholder.com', 'https://e.placeholder.com']) crawler = BasicCrawler(request_manager=RequestManagerTandem(request_list, request_queue)) calls = set[str]() @@ -69,7 +73,13 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run() - assert calls == {'http://a.com/', 'http://b.com/', 'http://c.com/', 'http://d.com', 'http://e.com'} + assert calls == 
{ + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', + 'https://d.placeholder.com', + 'https://e.placeholder.com', + } async def test_processes_requests_from_run_args() -> None: @@ -80,9 +90,9 @@ async def test_processes_requests_from_run_args() -> None: async def handler(context: BasicCrawlingContext) -> None: calls.append(context.request.url) - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) - assert calls == ['http://a.com/', 'http://b.com/', 'http://c.com/'] + assert calls == ['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com'] async def test_allows_multiple_run_calls() -> None: @@ -93,16 +103,16 @@ async def test_allows_multiple_run_calls() -> None: async def handler(context: BasicCrawlingContext) -> None: calls.append(context.request.url) - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) assert calls == [ - 'http://a.com/', - 'http://b.com/', - 'http://c.com/', - 'http://a.com/', - 'http://b.com/', - 'http://c.com/', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', ] @@ -114,17 +124,17 @@ async def test_retries_failed_requests() -> None: async def handler(context: BasicCrawlingContext) -> None: calls.append(context.request.url) - if context.request.url == 'http://b.com/': + if context.request.url == 'https://b.placeholder.com': raise RuntimeError('Arbitrary crash for testing purposes') - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) assert calls == [ - 'http://a.com/', - 'http://b.com/', - 'http://c.com/', - 'http://b.com/', - 'http://b.com/', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', + 'https://b.placeholder.com', + 'https://b.placeholder.com', ] @@ -137,16 +147,22 @@ async def handler(context: BasicCrawlingContext) -> None: calls.append(context.request.url) raise RuntimeError('Arbitrary crash for testing purposes') - await crawler.run(['http://a.com/', 'http://b.com/', Request.from_url(url='http://c.com/', no_retry=True)]) + await crawler.run( + [ + 'https://a.placeholder.com', + 'https://b.placeholder.com', + Request.from_url(url='https://c.placeholder.com', no_retry=True), + ] + ) assert calls == [ - 'http://a.com/', - 'http://b.com/', - 'http://c.com/', - 'http://a.com/', - 'http://b.com/', - 'http://a.com/', - 'http://b.com/', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://a.placeholder.com', + 'https://b.placeholder.com', ] @@ -161,19 +177,19 @@ async def handler(context: BasicCrawlingContext) -> None: await crawler.run( [ - 'http://a.com/', - 'http://b.com/', - Request.from_url(url='http://c.com/', user_data={'__crawlee': {'maxRetries': 4}}), + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 
Request.from_url(url='https://c.placeholder.com', user_data={'__crawlee': {'maxRetries': 4}}), ] ) assert calls == [ - 'http://a.com/', - 'http://b.com/', - 'http://c.com/', - 'http://c.com/', - 'http://c.com/', - 'http://c.com/', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', + 'https://c.placeholder.com', + 'https://c.placeholder.com', + 'https://c.placeholder.com', ] @@ -192,7 +208,7 @@ class Call: @crawler.router.default_handler async def handler(context: BasicCrawlingContext) -> None: - if context.request.url == 'http://b.com/': + if context.request.url == 'https://b.placeholder.com': raise RuntimeError('Arbitrary crash for testing purposes') @crawler.error_handler @@ -209,20 +225,20 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ request['headers'] = HttpHeaders({'custom_retry_count': str(custom_retry_count + 1)}) return Request.model_validate(request) - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) # Verify that the error handler was called twice assert len(calls) == 2 # Check the first call... first_call = calls[0] - assert first_call.url == 'http://b.com/' + assert first_call.url == 'https://b.placeholder.com' assert isinstance(first_call.error, RuntimeError) assert first_call.custom_retry_count == 0 # Check the second call... second_call = calls[1] - assert second_call.url == 'http://b.com/' + assert second_call.url == 'https://b.placeholder.com' assert isinstance(second_call.error, RuntimeError) assert second_call.custom_retry_count == 1 @@ -252,7 +268,7 @@ async def test_handles_error_in_error_handler() -> None: @crawler.router.default_handler async def handler(context: BasicCrawlingContext) -> None: - if context.request.url == 'http://b.com/': + if context.request.url == 'https://b.placeholder.com': raise RuntimeError('Arbitrary crash for testing purposes') @crawler.error_handler @@ -260,7 +276,7 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> None raise RuntimeError('Crash in error handler') with pytest.raises(UserDefinedErrorHandlerError): - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) async def test_calls_failed_request_handler() -> None: @@ -269,17 +285,17 @@ async def test_calls_failed_request_handler() -> None: @crawler.router.default_handler async def handler(context: BasicCrawlingContext) -> None: - if context.request.url == 'http://b.com/': + if context.request.url == 'https://b.placeholder.com': raise RuntimeError('Arbitrary crash for testing purposes') @crawler.failed_request_handler async def failed_request_handler(context: BasicCrawlingContext, error: Exception) -> None: calls.append((context, error)) - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) assert len(calls) == 1 - assert calls[0][0].request.url == 'http://b.com/' + assert calls[0][0].request.url == 'https://b.placeholder.com' assert isinstance(calls[0][1], RuntimeError) @@ -288,7 +304,7 @@ async def test_handles_error_in_failed_request_handler() -> None: @crawler.router.default_handler async def handler(context: BasicCrawlingContext) -> None: - if context.request.url == 'http://b.com/': + if 
context.request.url == 'https://b.placeholder.com': raise RuntimeError('Arbitrary crash for testing purposes') @crawler.failed_request_handler @@ -296,7 +312,7 @@ async def failed_request_handler(context: BasicCrawlingContext, error: Exception raise RuntimeError('Crash in failed request handler') with pytest.raises(UserDefinedErrorHandlerError): - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) @pytest.mark.parametrize( @@ -318,7 +334,7 @@ async def handler(context: BasicCrawlingContext) -> None: response_data['body'] = json.loads(response.read()) response_data['headers'] = response.headers - await crawler.run(['http://a.com/', 'http://b.com/', 'http://c.com/']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) response_body = response_data.get('body') assert response_body is not None @@ -363,15 +379,15 @@ class AddRequestsTestInput: # Basic use case pytest.param( AddRequestsTestInput( - start_url='https://a.com/', - loaded_url='https://a.com/', + start_url='https://a.placeholder.com', + loaded_url='https://a.placeholder.com', requests=[ - 'https://a.com/', - Request.from_url('http://b.com/'), - 'http://c.com/', + 'https://a.placeholder.com', + Request.from_url('https://b.placeholder.com'), + 'https://c.placeholder.com', ], kwargs={}, - expected_urls=['http://b.com/', 'http://c.com/'], + expected_urls=['https://b.placeholder.com', 'https://c.placeholder.com'], ), id='basic', ), @@ -667,7 +683,7 @@ async def handler(context: BasicCrawlingContext) -> None: await context.push_data({'b': 2}) raise RuntimeError('Watch me crash') - stats = await crawler.run(['https://a.com']) + stats = await crawler.run(['https://a.placeholder.com']) assert (await crawler.get_data()).items == [] assert stats.requests_total == 1 @@ -893,15 +909,15 @@ async def test_consecutive_runs_purge_request_queue() -> None: async def handler(context: BasicCrawlingContext) -> None: visit(context.request.url) - await crawler.run(['http://a.com', 'http://b.com', 'http://c.com']) - await crawler.run(['http://a.com', 'http://b.com', 'http://c.com']) - await crawler.run(['http://a.com', 'http://b.com', 'http://c.com']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) + await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) counter = Counter(args[0][0] for args in visit.call_args_list) assert counter == { - 'http://a.com': 3, - 'http://b.com': 3, - 'http://c.com': 3, + 'https://a.placeholder.com': 3, + 'https://b.placeholder.com': 3, + 'https://c.placeholder.com': 3, } @@ -1154,7 +1170,7 @@ async def handler(context: BasicCrawlingContext) -> None: # Timeout in pytest, because previous implementation would run crawler until following: # "The request queue seems to be stuck for 300.0s, resetting internal state." async with timeout(max_request_retries * double_handler_timeout_s): - await crawler.run(['http://a.com/']) + await crawler.run(['https://a.placeholder.com']) assert crawler.statistics.state.requests_finished == 1 assert mocked_handler_before_sleep.call_count == max_request_retries @@ -1175,7 +1191,7 @@ async def test_keep_alive( """Test that crawler can be kept alive without any requests and stopped with `crawler.stop()`. 
Crawler should stop if `max_requests_per_crawl` is reached regardless of the `keep_alive` flag.""" - additional_urls = ['http://a.com/', 'http://b.com/'] + additional_urls = ['https://a.placeholder.com', 'https://b.placeholder.com'] expected_handler_calls = [call(url) for url in additional_urls[:expected_handled_requests_count]] crawler = BasicCrawler( @@ -1224,9 +1240,9 @@ async def handler(context: BasicCrawlingContext) -> None: context.session.retire() if retire else None - await context.add_requests(['http://b.com/']) + await context.add_requests(['https://b.placeholder.com']) - await crawler.run(['http://a.com/']) + await crawler.run(['https://a.placeholder.com']) # The session should differ if `retire` was called and match otherwise since pool size == 1 if retire: @@ -1247,7 +1263,8 @@ async def handler(context: BasicCrawlingContext) -> None: used_sessions.append(context.session.id) requests = [ - Request.from_url('http://a.com/', session_id=check_session.id, always_enqueue=True) for _ in range(10) + Request.from_url('https://a.placeholder.com', session_id=check_session.id, always_enqueue=True) + for _ in range(10) ] await crawler.run(requests) @@ -1278,7 +1295,7 @@ async def handler(context: BasicCrawlingContext) -> None: used_sessions.append(context.session.id) requests = [ - Request.from_url('http://a.com/', session_id=str(session_id), use_extended_unique_key=True) + Request.from_url('https://a.placeholder.com', session_id=str(session_id), use_extended_unique_key=True) for session_id in range(10) ] @@ -1291,7 +1308,7 @@ async def handler(context: BasicCrawlingContext) -> None: async def test_error_bound_session_to_request() -> None: crawler = BasicCrawler(request_handler=AsyncMock()) - requests = [Request.from_url('http://a.com/', session_id='1', always_enqueue=True) for _ in range(10)] + requests = [Request.from_url('https://a.placeholder.com', session_id='1', always_enqueue=True) for _ in range(10)] stats = await crawler.run(requests) @@ -1309,7 +1326,7 @@ async def error_req_hook(context: BasicCrawlingContext, error: Exception) -> Non if isinstance(error, RequestCollisionError): await error_handler_mock(context, error) - requests = [Request.from_url('http://a.com/', session_id='1')] + requests = [Request.from_url('https://a.placeholder.com', session_id='1')] await crawler.run(requests) @@ -1328,7 +1345,7 @@ async def handler(context: BasicCrawlingContext) -> None: async def failed_request_handler(context: BasicCrawlingContext, error: Exception) -> None: handler_requests.add(context.request.url) - requests = ['http://a.com/', 'http://b.com/', 'http://c.com/'] + requests = ['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com'] await crawler.run(requests) @@ -1361,7 +1378,7 @@ async def handler(context: BasicCrawlingContext) -> None: # Capture all logs from the 'crawlee' logger at INFO level or higher with caplog.at_level(logging.INFO, logger='crawlee'): - await crawler.run([Request.from_url('http://a.com/')]) + await crawler.run([Request.from_url('https://a.placeholder.com')]) # Check for the timeout message in any of the logs found_timeout_message = False @@ -1374,3 +1391,61 @@ async def handler(context: BasicCrawlingContext) -> None: break assert found_timeout_message, 'Expected log message about request handler error was not found.' 
+ + +async def test_status_message_callback() -> None: + """Test that status message callback is called with the correct message.""" + status_message_callback = AsyncMock() + states: list[dict[str, StatisticsState | None]] = [] + + async def status_callback( + state: StatisticsState, previous_state: StatisticsState | None, message: str + ) -> str | None: + status_message_callback(message) + states.append({'state': state, 'previous_state': previous_state}) + return message + + crawler = BasicCrawler( + status_message_callback=status_callback, status_message_logging_interval=timedelta(seconds=0.01) + ) + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + await asyncio.sleep(0.1) # Simulate some processing time + + await crawler.run(['https://a.placeholder.com']) + + assert status_message_callback.called + + assert len(states) > 1 + + first_call = states[0] + second_call = states[1] + + # For the first call, `previous_state` is None + assert first_call['state'] is not None + assert first_call['previous_state'] is None + + # For second call, `previous_state` is the first state + assert second_call['state'] is not None + assert second_call['previous_state'] is not None + assert second_call['previous_state'] == first_call['state'] + + +async def test_status_message_emit() -> None: + event_manager = service_locator.get_event_manager() + + status_message_listener = Mock() + + def listener(event_data: EventCrawlerStatusData) -> None: + status_message_listener(event_data) + + event_manager.on(event=Event.CRAWLER_STATUS, listener=listener) + + crawler = BasicCrawler(request_handler=AsyncMock()) + + await crawler.run(['https://a.placeholder.com']) + + event_manager.off(event=Event.CRAWLER_STATUS, listener=listener) + + assert status_message_listener.called diff --git a/tests/unit/request_loaders/test_request_list.py b/tests/unit/request_loaders/test_request_list.py index 5142b7719d..e3ded91b7f 100644 --- a/tests/unit/request_loaders/test_request_list.py +++ b/tests/unit/request_loaders/test_request_list.py @@ -4,7 +4,7 @@ async def test_sync_traversal() -> None: - request_list = RequestList(['https://a.com', 'https://b.com', 'https://c.com']) + request_list = RequestList(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) while not await request_list.is_finished(): item = await request_list.fetch_next_request() @@ -17,9 +17,9 @@ async def test_sync_traversal() -> None: async def test_async_traversal() -> None: async def generator() -> AsyncGenerator[str]: - yield 'https://a.com' - yield 'https://b.com' - yield 'https://c.com' + yield 'https://a.placeholder.com' + yield 'https://b.placeholder.com' + yield 'https://c.placeholder.com' request_list = RequestList(generator()) @@ -33,7 +33,7 @@ async def generator() -> AsyncGenerator[str]: async def test_is_empty_does_not_depend_on_fetch_next_request() -> None: - request_list = RequestList(['https://a.com', 'https://b.com', 'https://c.com']) + request_list = RequestList(['https://a.placeholder.com', 'https://b.placeholder.com', 'https://c.placeholder.com']) item_1 = await request_list.fetch_next_request() assert item_1 is not None diff --git a/tests/unit/storages/test_request_manager_tandem.py b/tests/unit/storages/test_request_manager_tandem.py index 70240914ec..69bd944348 100644 --- a/tests/unit/storages/test_request_manager_tandem.py +++ b/tests/unit/storages/test_request_manager_tandem.py @@ -25,26 +25,30 @@ class TestInput: argvalues=[ pytest.param( TestInput( - 
request_loader_items=['http://a.com', 'http://b.com'], + request_loader_items=['https://a.placeholder.com', 'https://b.placeholder.com'], request_manager_items=[], - discovered_items=[Request.from_url('http://c.com')], + discovered_items=[Request.from_url('https://c.placeholder.com')], expected_result={ - 'http://a.com', - 'http://b.com', - 'http://c.com', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', }, ), id='basic_usage', ), pytest.param( TestInput( - request_loader_items=[Request.from_url('http://a.com'), None, Request.from_url('http://c.com')], - request_manager_items=['http://b.com', 'http://d.com'], + request_loader_items=[ + Request.from_url('https://a.placeholder.com'), + None, + Request.from_url('https://c.placeholder.com'), + ], + request_manager_items=['https://b.placeholder.com', 'http://d.com'], discovered_items=[], expected_result={ - 'http://a.com', - 'http://b.com', - 'http://c.com', + 'https://a.placeholder.com', + 'https://b.placeholder.com', + 'https://c.placeholder.com', 'http://d.com', }, ),
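The sketches below are illustrative usage notes for the changes above, not part of the diff. This first one exercises the new `status_message_callback` and `status_message_logging_interval` options of `BasicCrawler`, assuming the callback signature introduced in this changeset (`(StatisticsState, StatisticsState | None, str) -> Awaitable[str | None]`, where returning `None` suppresses the message); the URLs and handler body are placeholders.

```python
from __future__ import annotations

import asyncio
from datetime import timedelta

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext
from crawlee.statistics import StatisticsState


async def status_callback(
    state: StatisticsState, previous_state: StatisticsState | None, message: str
) -> str | None:
    # Prefix the default status message; return None instead to suppress it.
    return f'[my-crawler] {message}'


async def main() -> None:
    crawler = BasicCrawler(
        status_message_callback=status_callback,
        # Log a status message every 5 seconds instead of the default 10.
        status_message_logging_interval=timedelta(seconds=5),
    )

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    await crawler.run(['https://a.placeholder.com', 'https://b.placeholder.com'])


asyncio.run(main())
```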
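A minimal subscriber for the new `Event.CRAWLER_STATUS` event, mirroring `test_status_message_emit` above; forwarding to `print` is just a stand-in for whatever monitoring you actually use.

```python
from crawlee import service_locator
from crawlee.events import Event, EventCrawlerStatusData


def on_crawler_status(event_data: EventCrawlerStatusData) -> None:
    # Invoked with each periodic status update emitted by a running crawler.
    print(f'[crawler {event_data.crawler_id}] {event_data.message}')


event_manager = service_locator.get_event_manager()
event_manager.on(event=Event.CRAWLER_STATUS, listener=on_crawler_status)

# ... create and run a crawler here ...

event_manager.off(event=Event.CRAWLER_STATUS, listener=on_crawler_status)
```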
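Finally, a sketch of the extracted log-level plumbing: `LogLevel` is the shared `Literal` alias now used by `Configuration.log_level`, and `string_to_log_level` converts it to the stdlib integer constants. Note that `crawlee._log_config` is an internal module, so this mirrors the crawler's own usage rather than a public API.

```python
import logging

from crawlee._log_config import string_to_log_level
from crawlee.configuration import Configuration

# `log_level` accepts the same literals as the new `LogLevel` alias
# ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL') and, per the aliases above,
# can also be supplied via the `CRAWLEE_LOG_LEVEL` environment variable.
config = Configuration(log_level='WARNING')

assert string_to_log_level(config.log_level) == logging.WARNING
assert string_to_log_level('DEBUG') == logging.DEBUG
```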