Skip to content

refactor: botocore spans #3561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ def response_hook(span, service_name, operation_name, result):
from opentelemetry.semconv._incubating.attributes.cloud_attributes import (
CLOUD_REGION,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv._incubating.attributes.http_attributes import (
HTTP_STATUS_CODE,
)
from opentelemetry.semconv._incubating.attributes.rpc_attributes import (
RPC_METHOD,
RPC_SERVICE,
RPC_SYSTEM,
)
from opentelemetry.trace import get_tracer
from opentelemetry.trace.span import Span

Expand Down Expand Up @@ -276,9 +283,9 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
return original_func(*args, **kwargs)

attributes = {
SpanAttributes.RPC_SYSTEM: "aws-api",
SpanAttributes.RPC_SERVICE: call_context.service_id,
SpanAttributes.RPC_METHOD: call_context.operation,
RPC_SYSTEM: "aws-api",
RPC_SERVICE: call_context.service_id,
RPC_METHOD: call_context.operation,
CLOUD_REGION: call_context.region,
**get_server_attributes(call_context.endpoint_url),
}
Expand Down Expand Up @@ -375,7 +382,7 @@ def _apply_response_attributes(span: Span, result):

status_code = metadata.get("HTTPStatusCode")
if status_code is not None:
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
span.set_attribute(HTTP_STATUS_CODE, status_code)


def _determine_call_context(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,37 @@
_BotocoreInstrumentorContext,
_DynamoDbExtension,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv._incubating.attributes.aws_attributes import (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these are a lot maybe just import aws_attributes and reference them as aws_attributes.AWS_DYNAMO_WHATEVER?

AWS_DYNAMODB_ATTRIBUTE_DEFINITIONS,
AWS_DYNAMODB_ATTRIBUTES_TO_GET,
AWS_DYNAMODB_CONSISTENT_READ,
AWS_DYNAMODB_CONSUMED_CAPACITY,
AWS_DYNAMODB_COUNT,
AWS_DYNAMODB_EXCLUSIVE_START_TABLE,
AWS_DYNAMODB_GLOBAL_SECONDARY_INDEX_UPDATES,
AWS_DYNAMODB_GLOBAL_SECONDARY_INDEXES,
AWS_DYNAMODB_INDEX_NAME,
AWS_DYNAMODB_ITEM_COLLECTION_METRICS,
AWS_DYNAMODB_LIMIT,
AWS_DYNAMODB_LOCAL_SECONDARY_INDEXES,
AWS_DYNAMODB_PROJECTION,
AWS_DYNAMODB_PROVISIONED_READ_CAPACITY,
AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY,
AWS_DYNAMODB_SCAN_FORWARD,
AWS_DYNAMODB_SCANNED_COUNT,
AWS_DYNAMODB_SEGMENT,
AWS_DYNAMODB_SELECT,
AWS_DYNAMODB_TABLE_COUNT,
AWS_DYNAMODB_TABLE_NAMES,
AWS_DYNAMODB_TOTAL_SEGMENTS,
)
from opentelemetry.semconv.attributes.db_attributes import (
DB_OPERATION_NAME,
DB_SYSTEM_NAME,
)
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
)
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.span import Span

Expand Down Expand Up @@ -101,24 +131,22 @@ def assert_span(self, operation: str) -> Span:
self.assertEqual(1, len(spans))
span = spans[0]

self.assertEqual("dynamodb", span.attributes[SpanAttributes.DB_SYSTEM])
self.assertEqual(
operation, span.attributes[SpanAttributes.DB_OPERATION]
)
self.assertEqual("dynamodb", span.attributes[DB_SYSTEM_NAME])
self.assertEqual(operation, span.attributes[DB_OPERATION_NAME])
self.assertEqual(
"dynamodb.us-west-2.amazonaws.com",
span.attributes[SpanAttributes.NET_PEER_NAME],
span.attributes[SERVER_ADDRESS],
)
return span

def assert_table_names(self, span: Span, *table_names):
self.assertEqual(
tuple(table_names),
span.attributes[SpanAttributes.AWS_DYNAMODB_TABLE_NAMES],
span.attributes[AWS_DYNAMODB_TABLE_NAMES],
)

def assert_consumed_capacity(self, span: Span, *table_names):
cap = span.attributes[SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY]
cap = span.attributes[AWS_DYNAMODB_CONSUMED_CAPACITY]
self.assertEqual(len(cap), len(table_names))
cap_tables = set()
for item in cap:
Expand All @@ -129,52 +157,40 @@ def assert_consumed_capacity(self, span: Span, *table_names):
self.assertIn(table_name, cap_tables)

def assert_item_col_metrics(self, span: Span):
actual = span.attributes[
SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS
]
actual = span.attributes[AWS_DYNAMODB_ITEM_COLLECTION_METRICS]
self.assertIsNotNone(actual)
json.loads(actual)

def assert_provisioned_read_cap(self, span: Span, expected: int):
actual = span.attributes[
SpanAttributes.AWS_DYNAMODB_PROVISIONED_READ_CAPACITY
]
actual = span.attributes[AWS_DYNAMODB_PROVISIONED_READ_CAPACITY]
self.assertEqual(expected, actual)

def assert_provisioned_write_cap(self, span: Span, expected: int):
actual = span.attributes[
SpanAttributes.AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY
]
actual = span.attributes[AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY]
self.assertEqual(expected, actual)

def assert_consistent_read(self, span: Span, expected: bool):
actual = span.attributes[SpanAttributes.AWS_DYNAMODB_CONSISTENT_READ]
actual = span.attributes[AWS_DYNAMODB_CONSISTENT_READ]
self.assertEqual(expected, actual)

def assert_projection(self, span: Span, expected: str):
actual = span.attributes[SpanAttributes.AWS_DYNAMODB_PROJECTION]
actual = span.attributes[AWS_DYNAMODB_PROJECTION]
self.assertEqual(expected, actual)

def assert_attributes_to_get(self, span: Span, *attrs):
self.assertEqual(
tuple(attrs),
span.attributes[SpanAttributes.AWS_DYNAMODB_ATTRIBUTES_TO_GET],
span.attributes[AWS_DYNAMODB_ATTRIBUTES_TO_GET],
)

def assert_index_name(self, span: Span, expected: str):
self.assertEqual(
expected, span.attributes[SpanAttributes.AWS_DYNAMODB_INDEX_NAME]
)
self.assertEqual(expected, span.attributes[AWS_DYNAMODB_INDEX_NAME])

def assert_limit(self, span: Span, expected: int):
self.assertEqual(
expected, span.attributes[SpanAttributes.AWS_DYNAMODB_LIMIT]
)
self.assertEqual(expected, span.attributes[AWS_DYNAMODB_LIMIT])

def assert_select(self, span: Span, expected: str):
self.assertEqual(
expected, span.attributes[SpanAttributes.AWS_DYNAMODB_SELECT]
)
self.assertEqual(expected, span.attributes[AWS_DYNAMODB_SELECT])

def assert_extension_item_col_metrics(self, operation: str):
span = self.tracer_provider.get_tracer("test").start_span("test")
Expand Down Expand Up @@ -259,15 +275,11 @@ def test_create_table(self):
self.assert_table_names(span, self.default_table_name)
self.assertEqual(
(json.dumps(global_sec_idx),),
span.attributes[
SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEXES
],
span.attributes[AWS_DYNAMODB_GLOBAL_SECONDARY_INDEXES],
)
self.assertEqual(
(json.dumps(local_sec_idx),),
span.attributes[
SpanAttributes.AWS_DYNAMODB_LOCAL_SECONDARY_INDEXES
],
span.attributes[AWS_DYNAMODB_LOCAL_SECONDARY_INDEXES],
)
self.assert_provisioned_read_cap(span, 42)

Expand Down Expand Up @@ -365,12 +377,10 @@ def test_list_tables(self):
span = self.assert_span("ListTables")
self.assertEqual(
"my_table",
span.attributes[SpanAttributes.AWS_DYNAMODB_EXCLUSIVE_START_TABLE],
)
self.assertEqual(
1, span.attributes[SpanAttributes.AWS_DYNAMODB_TABLE_COUNT]
span.attributes[AWS_DYNAMODB_EXCLUSIVE_START_TABLE],
)
self.assertEqual(5, span.attributes[SpanAttributes.AWS_DYNAMODB_LIMIT])
self.assertEqual(1, span.attributes[AWS_DYNAMODB_TABLE_COUNT])
self.assertEqual(5, span.attributes[AWS_DYNAMODB_LIMIT])

@mock_aws
def test_put_item(self):
Expand Down Expand Up @@ -417,9 +427,7 @@ def test_query(self):

span = self.assert_span("Query")
self.assert_table_names(span, self.default_table_name)
self.assertTrue(
span.attributes[SpanAttributes.AWS_DYNAMODB_SCAN_FORWARD]
)
self.assertTrue(span.attributes[AWS_DYNAMODB_SCAN_FORWARD])
self.assert_attributes_to_get(span, "id")
self.assert_consistent_read(span, True)
self.assert_index_name(span, "lsi")
Expand Down Expand Up @@ -447,16 +455,10 @@ def test_scan(self):

span = self.assert_span("Scan")
self.assert_table_names(span, self.default_table_name)
self.assertEqual(
21, span.attributes[SpanAttributes.AWS_DYNAMODB_SEGMENT]
)
self.assertEqual(
17, span.attributes[SpanAttributes.AWS_DYNAMODB_TOTAL_SEGMENTS]
)
self.assertEqual(1, span.attributes[SpanAttributes.AWS_DYNAMODB_COUNT])
self.assertEqual(
1, span.attributes[SpanAttributes.AWS_DYNAMODB_SCANNED_COUNT]
)
self.assertEqual(21, span.attributes[AWS_DYNAMODB_SEGMENT])
self.assertEqual(17, span.attributes[AWS_DYNAMODB_TOTAL_SEGMENTS])
self.assertEqual(1, span.attributes[AWS_DYNAMODB_COUNT])
self.assertEqual(1, span.attributes[AWS_DYNAMODB_SCANNED_COUNT])
self.assert_attributes_to_get(span, "id", "idl")
self.assert_consistent_read(span, True)
self.assert_index_name(span, "lsi")
Expand Down Expand Up @@ -517,11 +519,9 @@ def test_update_table(self):
self.assert_provisioned_write_cap(span, 19)
self.assertEqual(
(json.dumps(attr_definition),),
span.attributes[SpanAttributes.AWS_DYNAMODB_ATTRIBUTE_DEFINITIONS],
span.attributes[AWS_DYNAMODB_ATTRIBUTE_DEFINITIONS],
)
self.assertEqual(
(json.dumps(global_sec_idx_updates),),
span.attributes[
SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEX_UPDATES
],
span.attributes[AWS_DYNAMODB_GLOBAL_SECONDARY_INDEX_UPDATES],
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,23 @@
from opentelemetry.semconv._incubating.attributes.cloud_attributes import (
CLOUD_REGION,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv._incubating.attributes.http_attributes import (
HTTP_STATUS_CODE,
)
from opentelemetry.semconv._incubating.attributes.rpc_attributes import (
RPC_METHOD,
RPC_SERVICE,
RPC_SYSTEM,
)
from opentelemetry.semconv.attributes.exception_attributes import (
EXCEPTION_MESSAGE,
EXCEPTION_STACKTRACE,
EXCEPTION_TYPE,
)
from opentelemetry.semconv.attributes.server_attributes import (
SERVER_ADDRESS,
SERVER_PORT,
)
from opentelemetry.test.mock_textmap import MockTextMapPropagator
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.span import format_span_id, format_trace_id
Expand Down Expand Up @@ -61,15 +77,15 @@ def _make_client(self, service: str):

def _default_span_attributes(self, service: str, operation: str):
return {
SpanAttributes.RPC_SYSTEM: "aws-api",
SpanAttributes.RPC_SERVICE: service,
SpanAttributes.RPC_METHOD: operation,
RPC_SYSTEM: "aws-api",
RPC_SERVICE: service,
RPC_METHOD: operation,
CLOUD_REGION: self.region,
"retry_attempts": 0,
SpanAttributes.HTTP_STATUS_CODE: 200,
HTTP_STATUS_CODE: 200,
# Some services like IAM or STS have a global endpoint and exclude specified region.
SpanAttributes.SERVER_ADDRESS: f"{service.lower()}.{'' if self.region == 'aws-global' else self.region + '.'}amazonaws.com",
SpanAttributes.SERVER_PORT: 443,
SERVER_ADDRESS: f"{service.lower()}.{'' if self.region == 'aws-global' else self.region + '.'}amazonaws.com",
SERVER_PORT: 443,
}

def assert_only_span(self):
Expand Down Expand Up @@ -150,16 +166,16 @@ def test_exception(self):
span = spans[0]

expected = self._default_span_attributes("S3", "ListObjects")
expected.pop(SpanAttributes.HTTP_STATUS_CODE)
expected.pop(HTTP_STATUS_CODE)
expected.pop("retry_attempts")
self.assertEqual(expected, span.attributes)
self.assertIs(span.status.status_code, trace_api.StatusCode.ERROR)

self.assertEqual(1, len(span.events))
event = span.events[0]
self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes)
self.assertIn(SpanAttributes.EXCEPTION_TYPE, event.attributes)
self.assertIn(SpanAttributes.EXCEPTION_MESSAGE, event.attributes)
self.assertIn(EXCEPTION_STACKTRACE, event.attributes)
self.assertIn(EXCEPTION_TYPE, event.attributes)
self.assertIn(EXCEPTION_MESSAGE, event.attributes)

@mock_aws
def test_s3_client(self):
Expand Down Expand Up @@ -337,7 +353,7 @@ def test_sts_client(self):
span = self.assert_only_span()
expected = self._default_span_attributes("STS", "GetCallerIdentity")
expected["aws.request_id"] = ANY
expected[SpanAttributes.SERVER_ADDRESS] = "sts.amazonaws.com"
expected[SERVER_ADDRESS] = "sts.amazonaws.com"
# check for exact attribute set to make sure not to leak any sts secrets
self.assertEqual(expected, dict(span.attributes))

Expand Down Expand Up @@ -515,8 +531,8 @@ def test_server_attributes(self):
"EC2",
"DescribeInstances",
attributes={
SpanAttributes.SERVER_ADDRESS: f"ec2.{self.region}.amazonaws.com",
SpanAttributes.SERVER_PORT: 443,
SERVER_ADDRESS: f"ec2.{self.region}.amazonaws.com",
SERVER_PORT: 443,
},
)
self.memory_exporter.clear()
Expand All @@ -528,8 +544,8 @@ def test_server_attributes(self):
"IAM",
"ListUsers",
attributes={
SpanAttributes.SERVER_ADDRESS: "iam.amazonaws.com",
SpanAttributes.SERVER_PORT: 443,
SERVER_ADDRESS: "iam.amazonaws.com",
SERVER_PORT: 443,
CLOUD_REGION: "aws-global",
},
)
Expand All @@ -552,7 +568,7 @@ def test_server_attributes_with_custom_endpoint(self):
"S3",
"ListBuckets",
attributes={
SpanAttributes.SERVER_ADDRESS: "proxy.amazon.org",
SpanAttributes.SERVER_PORT: 2025,
SERVER_ADDRESS: "proxy.amazon.org",
SERVER_PORT: 2025,
},
)
Loading
Loading