
Commit a92d8b3

Pijukatel and vdusek authored
feat: Add basic OpenTelemetry instrumentation (#1255)
### Description

- Add `CrawlerInstrumentor` that can be used to add instrumentation for tracing requests and also to auto-instrument other `Crawlee` classes.
- Add documentation guide about OpenTelemetry.
- Add test for the `CrawlerInstrumentor`.
- Do a minor refactoring of `ContextPipeline` and middleware handling to make it easily observable.

### Issues

- Closes: #1254

---------

Co-authored-by: Vlada Dusek <v.dusek96@gmail.com>
1 parent 4f8004c commit a92d8b3

File tree

10 files changed: +715 −39 lines changed
code_examples/trace_and_monitor_crawlers/instrument_crawler.py

Lines changed: 57 additions & 0 deletions

```python
import asyncio

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import set_tracer_provider

from crawlee.crawlers import BasicCrawlingContext, ParselCrawler, ParselCrawlingContext
from crawlee.otel import CrawlerInstrumentor
from crawlee.storages import Dataset, KeyValueStore, RequestQueue


def instrument_crawler() -> None:
    """Add instrumentation to the crawler."""
    resource = Resource.create(
        {
            'service.name': 'ExampleCrawler',
            'service.version': '1.0.0',
            'environment': 'development',
        }
    )

    # Set up the OpenTelemetry tracer provider and exporter
    provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(endpoint='localhost:4317', insecure=True)
    provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
    set_tracer_provider(provider)
    # Instrument the crawler with OpenTelemetry
    CrawlerInstrumentor(
        instrument_classes=[RequestQueue, KeyValueStore, Dataset]
    ).instrument()


async def main() -> None:
    """Run the crawler."""
    instrument_crawler()

    crawler = ParselCrawler(max_requests_per_crawl=100)
    kvs = await KeyValueStore.open()

    @crawler.pre_navigation_hook
    async def pre_nav_hook(_: BasicCrawlingContext) -> None:
        # Simulate some pre-navigation processing
        await asyncio.sleep(0.01)

    @crawler.router.default_handler
    async def handler(context: ParselCrawlingContext) -> None:
        await context.push_data({'url': context.request.url})
        await kvs.set_value(key='url', value=context.request.url)
        await context.enqueue_links()

    await crawler.run(['https://crawlee.dev/'])


if __name__ == '__main__':
    asyncio.run(main())
```
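A note on the exporter choice in the snippet above: `SimpleSpanProcessor` exports each span synchronously the moment it ends, which keeps the feedback loop short when watching traces arrive in Jaeger, but it adds latency to the crawl itself. A minimal sketch of swapping in the SDK's `BatchSpanProcessor` instead (same endpoint assumption as above; `instrument_crawler_batched` is a hypothetical name, not part of the commit):

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider


def instrument_crawler_batched() -> None:
    """Like instrument_crawler above, but spans are buffered and exported in batches."""
    provider = TracerProvider()
    otlp_exporter = OTLPSpanExporter(endpoint='localhost:4317', insecure=True)
    # BatchSpanProcessor exports off the hot path, reducing per-request overhead.
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    set_tracer_provider(provider)
```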
trace_and_monitor_crawlers.mdx

Lines changed: 52 additions & 0 deletions

---
id: trace-and-monitor-crawlers
title: Trace and monitor crawlers
description: Learn how to instrument your crawlers with OpenTelemetry to trace request handling, identify bottlenecks, monitor performance, and visualize telemetry data using Jaeger for performance optimization.
---

import ApiLink from '@site/src/components/ApiLink';
import CodeBlock from '@theme/CodeBlock';

import InstrumentCrawler from '!!raw-loader!./code_examples/trace_and_monitor_crawlers/instrument_crawler.py';

[OpenTelemetry](https://opentelemetry.io/) is a collection of APIs, SDKs, and tools to instrument, generate, collect, and export telemetry data (metrics, logs, and traces) that help you analyze your software's performance and behavior. In the context of crawler development, it can be used to better understand how the crawler works internally, identify bottlenecks, debug, log metrics, and more. This guide requires at least a basic understanding of OpenTelemetry; a good place to start is [What is OpenTelemetry?](https://opentelemetry.io/docs/what-is-opentelemetry/).

In this guide, we will show how to set up OpenTelemetry and instrument a specific crawler to see traces of the individual requests it processes. OpenTelemetry on its own does not provide an out-of-the-box tool for convenient visualisation of the exported data (apart from printing to the console), but several good tools are available. In this guide, we will use [Jaeger](https://www.jaegertracing.io/) to visualise the telemetry data. To better understand concepts such as exporter, collector, and visualisation backend, please refer to the [OpenTelemetry documentation](https://opentelemetry.io/docs/collector/).
## Set up Jaeger

This guide shows how to set up the environment locally to run the example code and visualise the telemetry data in Jaeger, running locally in a [Docker](https://www.docker.com/) container.

To start the preconfigured Docker container, you can use the following command:

```bash
docker run -d --name jaeger -e COLLECTOR_OTLP_ENABLED=true -p 16686:16686 -p 4317:4317 -p 4318:4318 jaegertracing/all-in-one:latest
```

For more details about the Jaeger setup, see the [getting started](https://www.jaegertracing.io/docs/2.7/getting-started/) section of their documentation.
You can open the Jaeger UI in your browser by navigating to http://localhost:16686.
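The published ports follow Jaeger's defaults: 16686 serves the web UI, while 4317 and 4318 accept OTLP data over gRPC and HTTP, respectively. The example below exports spans to the OTLP gRPC endpoint on port 4317.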
## Instrument the crawler

Now you can proceed with instrumenting the crawler and running it, so that it sends telemetry data to Jaeger. To have the Python environment ready, install either **crawlee[all]** or **crawlee[otel]**. This ensures that the OpenTelemetry dependencies are installed, and you can run the example code snippet.
In the following example, the function `instrument_crawler` contains the instrumentation setup and is called before the crawler is started. If you have already set up Jaeger, you can just run the following code snippet.

<CodeBlock className="language-python">
{InstrumentCrawler}
</CodeBlock>
## Analyze the results

In the Jaeger UI, you can search for different traces, apply filtering, compare traces, view their detailed attributes, view timing details, and more. For a detailed description of the tool's capabilities, please refer to the [Jaeger documentation](https://www.jaegertracing.io/docs/1.47/deployment/frontend-ui/#trace-page).

![Jaeger search view](/img/guides/jaeger_otel_search_view_example.png 'Example visualisation of search view in Jaeger')
![Jaeger trace view](/img/guides/jaeger_otel_trace_example.png 'Example visualisation of crawler request trace in Jaeger')

You can use different tools to consume the OpenTelemetry data if they suit your needs better. Please see the list of known vendors in the [OpenTelemetry documentation](https://opentelemetry.io/ecosystem/vendors/).
## Customize the instrumentation

You can customize the <ApiLink to="class/CrawlerInstrumentor">`CrawlerInstrumentor`</ApiLink>. Depending on the arguments used during its initialization, the instrumentation will be applied to different parts of the Crawlee code. By default, it instruments a handful of functions that together give a good picture of how each individual request is handled. To turn this default instrumentation off, pass `request_handling_instrumentation=False` during initialization. You can also extend the instrumentation by passing the `instrument_classes=[...]` initialization argument with the classes you want auto-instrumented; all their public methods will be instrumented automatically. Bear in mind that instrumentation has runtime costs: the more instrumentation is used, the more overhead it adds to the crawler's execution. A minimal sketch of both options follows below.

You can also create your own instrumentation by selecting only the methods you want to instrument. For more details, see the <ApiLink to="class/CrawlerInstrumentor">`CrawlerInstrumentor`</ApiLink> source code and the [Python documentation for OpenTelemetry](https://opentelemetry.io/docs/languages/python/).
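A rough illustration of the options described above (a sketch, not part of the shipped example; the class choice is arbitrary):

```python
from crawlee.otel import CrawlerInstrumentor
from crawlee.storages import RequestQueue

# Keep the default request-handling spans off and auto-instrument only
# selected classes. All public methods of RequestQueue will get spans.
instrumentor = CrawlerInstrumentor(
    request_handling_instrumentation=False,
    instrument_classes=[RequestQueue],
)
instrumentor.instrument()
# ... create and run the crawler here ...
instrumentor.uninstrument()  # Removes the wrappers again when no longer needed.
```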
If you have questions or need assistance, feel free to reach out on our [GitHub](https://github.com/apify/crawlee-python) or join our [Discord community](https://discord.com/invite/jyEM2PRvMU).

pyproject.toml

Lines changed: 17 additions & 0 deletions

```diff
@@ -61,10 +61,17 @@ all = [
     "html5lib>=1.0",
     "inquirer>=3.3.0",
     "jaro-winkler>=2.0.3",
+    "opentelemetry-api>=1.34.1",
+    "opentelemetry-distro[otlp]>=0.54",
+    "opentelemetry-instrumentation>=0.54",
+    "opentelemetry-instrumentation-httpx>=0.54",
+    "opentelemetry-sdk>=1.34.1",
+    "opentelemetry-semantic-conventions>=0.54",
     "parsel>=1.10.0",
     "playwright>=1.27.0",
     "scikit-learn>=1.6.0",
     "typer>=0.12.0",
+    "wrapt>=1.17.0",
 ]
 adaptive-crawler = [
     "jaro-winkler>=2.0.3",
@@ -76,6 +83,15 @@ cli = ["cookiecutter>=2.6.0", "inquirer>=3.3.0", "rich>=13.9.0", "typer>=0.12.0"]
 curl-impersonate = ["curl-cffi>=0.9.0"]
 parsel = ["parsel>=1.10.0"]
 playwright = ["playwright>=1.27.0"]
+otel = [
+    "opentelemetry-api>=1.34.1",
+    "opentelemetry-distro[otlp]>=0.54",
+    "opentelemetry-instrumentation>=0.54",
+    "opentelemetry-instrumentation-httpx>=0.54",
+    "opentelemetry-sdk>=1.34.1",
+    "opentelemetry-semantic-conventions>=0.54",
+    "wrapt>=1.17.0",
+]

 [project.scripts]
 crawlee = "crawlee._cli:cli"
@@ -254,6 +270,7 @@ module = [
     "cookiecutter.*", # Untyped and stubs not available
     "inquirer.*", # Untyped and stubs not available
     "warcio.*", # Example code shows WARC files creation.
+    "wrapt" # Untyped and stubs not available
 ]
 ignore_missing_imports = true
```
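Note that the OpenTelemetry pins appear twice on purpose: once under the `all` extra and once under the new dedicated `otel` extra, so tracing support can be installed as `crawlee[otel]` without pulling in every other optional dependency.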

src/crawlee/crawlers/_basic/_context_pipeline.py

Lines changed: 36 additions & 13 deletions

```diff
@@ -21,6 +21,38 @@
 TMiddlewareCrawlingContext = TypeVar('TMiddlewareCrawlingContext', bound=BasicCrawlingContext)


+class _Middleware(Generic[TMiddlewareCrawlingContext, TCrawlingContext]):
+    """Helper wrapper class to make the middleware easily observable by OpenTelemetry instrumentation."""
+
+    def __init__(
+        self,
+        middleware: Callable[
+            [TCrawlingContext],
+            AsyncGenerator[TMiddlewareCrawlingContext, Exception | None],
+        ],
+        input_context: TCrawlingContext,
+    ) -> None:
+        self.generator = middleware(input_context)
+        self.input_context = input_context
+        self.output_context: TMiddlewareCrawlingContext | None = None
+
+    async def action(self) -> TMiddlewareCrawlingContext:
+        self.output_context = await self.generator.__anext__()
+        return self.output_context
+
+    async def cleanup(self, final_consumer_exception: Exception | None) -> None:
+        try:
+            await self.generator.asend(final_consumer_exception)
+        except StopAsyncIteration:
+            pass
+        except ContextPipelineInterruptedError as e:
+            raise RuntimeError('Invalid state - pipeline interrupted in the finalization step') from e
+        except Exception as e:
+            raise ContextPipelineFinalizationError(e, self.output_context or self.input_context) from e
+        else:
+            raise RuntimeError('The middleware yielded more than once')
+
+
 @docs_group('Classes')
 class ContextPipeline(Generic[TCrawlingContext]):
     """Encapsulates the logic of gradually enhancing the crawling context with additional information and utilities.
@@ -57,15 +89,15 @@ async def __call__(
         Exceptions from the consumer function are wrapped together with the final crawling context.
         """
         chain = list(self._middleware_chain())
-        cleanup_stack: list[AsyncGenerator[Any, Exception | None]] = []
+        cleanup_stack: list[_Middleware[Any]] = []
         final_consumer_exception: Exception | None = None

         try:
             for member in reversed(chain):
                 if member._middleware:  # noqa: SLF001
-                    middleware_instance = member._middleware(crawling_context)  # noqa: SLF001
+                    middleware_instance = _Middleware(middleware=member._middleware, input_context=crawling_context)  # noqa: SLF001
                     try:
-                        result = await middleware_instance.__anext__()
+                        result = await middleware_instance.action()
                     except SessionError:  # Session errors get special treatment
                         raise
                     except StopAsyncIteration as e:
@@ -88,16 +120,7 @@ async def __call__(
             raise RequestHandlerError(e, crawling_context) from e
         finally:
             for middleware_instance in reversed(cleanup_stack):
-                try:
-                    result = await middleware_instance.asend(final_consumer_exception)
-                except StopAsyncIteration:  # noqa: PERF203
-                    pass
-                except ContextPipelineInterruptedError as e:
-                    raise RuntimeError('Invalid state - pipeline interrupted in the finalization step') from e
-                except Exception as e:
-                    raise ContextPipelineFinalizationError(e, crawling_context) from e
-                else:
-                    raise RuntimeError('The middleware yielded more than once')
+                await middleware_instance.cleanup(final_consumer_exception)

     def compose(
         self,
```

src/crawlee/otel/__init__.py

Lines changed: 5 additions & 0 deletions
```python
from crawlee.otel.crawler_instrumentor import CrawlerInstrumentor

__all__ = [
    'CrawlerInstrumentor',
]
```
src/crawlee/otel/crawler_instrumentor.py

Lines changed: 152 additions & 0 deletions

```python
from __future__ import annotations

import inspect
from typing import TYPE_CHECKING, Any

from opentelemetry.instrumentation.instrumentor import (  # type:ignore[attr-defined] # Mypy has troubles with OTEL
    BaseInstrumentor,
)
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.attributes.code_attributes import CODE_FUNCTION_NAME
from opentelemetry.semconv.attributes.http_attributes import HTTP_REQUEST_METHOD
from opentelemetry.semconv.attributes.url_attributes import URL_FULL
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from crawlee._utils.docs import docs_group
from crawlee.crawlers import BasicCrawler, ContextPipeline
from crawlee.crawlers._basic._context_pipeline import _Middleware

if TYPE_CHECKING:
    from collections.abc import Callable

    from crawlee.crawlers import BasicCrawlingContext


@docs_group('Classes')
class CrawlerInstrumentor(BaseInstrumentor):
    """Helper class for instrumenting crawlers with OpenTelemetry."""

    def __init__(
        self, *, instrument_classes: list[type] | None = None, request_handling_instrumentation: bool = True
    ) -> None:
        """Initialize the instrumentor.

        Args:
            instrument_classes: List of classes to be instrumented - all their public methods and coroutines will be
                wrapped by a generic instrumentation wrapper that will create spans for them.
            request_handling_instrumentation: Handpicked most interesting methods to instrument in the request handling
                pipeline.
        """
        self._tracer = get_tracer(__name__)

        async def _simple_async_wrapper(wrapped: Any, _: Any, args: Any, kwargs: Any) -> Any:
            with self._tracer.start_as_current_span(
                name=wrapped.__name__, attributes={CODE_FUNCTION_NAME: wrapped.__qualname__}
            ):
                return await wrapped(*args, **kwargs)

        def _simple_wrapper(wrapped: Any, _: Any, args: Any, kwargs: Any) -> Any:
            with self._tracer.start_as_current_span(
                name=wrapped.__name__, attributes={CODE_FUNCTION_NAME: wrapped.__qualname__}
            ):
                return wrapped(*args, **kwargs)

        def _init_wrapper(wrapped: Any, _: Any, args: Any, kwargs: Any) -> None:
            with self._tracer.start_as_current_span(
                name=wrapped.__name__, attributes={CODE_FUNCTION_NAME: wrapped.__qualname__}
            ):
                wrapped(*args, **kwargs)

        self._instrumented: list[tuple[Any, str, Callable]] = []
        self._simple_wrapper = _simple_wrapper
        self._simple_async_wrapper = _simple_async_wrapper
        self._init_wrapper = _init_wrapper

        if instrument_classes:
            for _class in instrument_classes:
                self._instrument_all_public_methods(on_class=_class)

        if request_handling_instrumentation:

            async def middleware_wrapper(wrapped: Any, instance: _Middleware, args: Any, kwargs: Any) -> Any:
                with self._tracer.start_as_current_span(
                    name=f'{instance.generator.__name__}, {wrapped.__name__}',  # type:ignore[attr-defined] # valid in our context
                    attributes={
                        URL_FULL: instance.input_context.request.url,
                        CODE_FUNCTION_NAME: instance.generator.__qualname__,  # type:ignore[attr-defined] # valid in our context
                    },
                ):
                    return await wrapped(*args, **kwargs)

            async def context_pipeline_wrapper(
                wrapped: Any, _: ContextPipeline[BasicCrawlingContext], args: Any, kwargs: Any
            ) -> Any:
                context = args[0]
                final_context_consumer = args[1]

                async def wrapped_final_consumer(*args: Any, **kwargs: Any) -> Any:
                    with self._tracer.start_as_current_span(
                        name='request_handler',
                        attributes={URL_FULL: context.request.url, HTTP_REQUEST_METHOD: context.request.method},
                    ):
                        return await final_context_consumer(*args, **kwargs)

                with self._tracer.start_as_current_span(
                    name='ContextPipeline',
                    attributes={URL_FULL: context.request.url, HTTP_REQUEST_METHOD: context.request.method},
                ):
                    return await wrapped(context, wrapped_final_consumer, **kwargs)

            async def _commit_request_handler_result_wrapper(
                wrapped: Callable[[Any], Any], _: BasicCrawler, args: Any, kwargs: Any
            ) -> Any:
                context = args[0]
                with self._tracer.start_as_current_span(
                    name='Commit results',
                    attributes={URL_FULL: context.request.url, HTTP_REQUEST_METHOD: context.request.method},
                ):
                    return await wrapped(*args, **kwargs)

            # Handpicked interesting methods to instrument
            self._instrumented.extend(
                [
                    (_Middleware, 'action', middleware_wrapper),
                    (_Middleware, 'cleanup', middleware_wrapper),
                    (ContextPipeline, '__call__', context_pipeline_wrapper),
                    (BasicCrawler, '_BasicCrawler__run_task_function', self._simple_async_wrapper),
                    (BasicCrawler, '_commit_request_handler_result', _commit_request_handler_result_wrapper),
                ]
            )

    def instrumentation_dependencies(self) -> list[str]:
        """Return a list of python packages with versions that will be instrumented."""
        return ['crawlee']

    def _instrument_all_public_methods(self, on_class: type) -> None:
        public_coroutines = {
            name
            for name, member in inspect.getmembers(on_class, predicate=inspect.iscoroutinefunction)
            if not name.startswith('_')
        }
        public_methods = {
            name
            for name, member in inspect.getmembers(on_class, predicate=inspect.isfunction)
            if not name.startswith('_')
        } - public_coroutines

        for coroutine in public_coroutines:
            self._instrumented.append((on_class, coroutine, self._simple_async_wrapper))

        for method in public_methods:
            self._instrumented.append((on_class, method, self._simple_wrapper))

        self._instrumented.append((on_class, '__init__', self._init_wrapper))

    def _instrument(self, **_: Any) -> None:
        for _class, method, wrapper in self._instrumented:
            wrap_function_wrapper(_class, method, wrapper)

    def _uninstrument(self, **_: Any) -> None:
        for _class, method, __ in self._instrumented:
            unwrap(_class, method)
```
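The generic wrappers above rely on `wrapt.wrap_function_wrapper`, which patches a method in place while keeping a `__wrapped__` reference that `opentelemetry.instrumentation.utils.unwrap` uses to restore the original. A standalone sketch of that mechanism on a toy class (not from the commit):

```python
from typing import Any

from opentelemetry.instrumentation.utils import unwrap
from wrapt import wrap_function_wrapper


class Toy:
    def greet(self, name: str) -> str:
        return f'hello {name}'


def logging_wrapper(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
    # Same (wrapped, instance, args, kwargs) signature the instrumentor's wrappers use.
    print(f'entering {wrapped.__qualname__}')
    return wrapped(*args, **kwargs)


wrap_function_wrapper(Toy, 'greet', logging_wrapper)
print(Toy().greet('world'))  # Prints the log line, then 'hello world'.
unwrap(Toy, 'greet')         # Restores the original, unwrapped method.
```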
