Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"require-dev": {
"composer/xdebug-handler": "^2.0",
"open-telemetry/dev-tools": "dev-main"
"open-telemetry/dev-tools": "dev-main",
"phpunit/phpunit": "^8.0 || ^9.5 || ^10.0"
},
"autoload": {
"psr-4": {
Expand Down
97 changes: 76 additions & 21 deletions src/Instrumentation/Symfony/src/MessengerInstrumentation.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ public static function register(): void
{
$instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger');

/**
* MessageBusInterface dispatches messages to the handlers.
*/
hook(

// Instrument MessageBusInterface (message dispatching)
hook(
MessageBusInterface::class,
'dispatch',
pre: static function (
Expand All @@ -54,19 +53,17 @@ public static function register(): void
$message = $params[0];
$messageClass = \get_class($message);

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument dispatch as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('DISPATCH %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('publish %s', $messageClass))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not "dispatch"?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The span types and names should be consistent with the spec here:
https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#span-kind

But it's kind of confusing TBH, it mentions both "publish" and "send", in the example it gives publish shop.orders, but then it doesn't give publish in operation type or span kind. 🤯

@brettmc can you clarify the spec wording here maybe?

->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for dispatch
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
Expand Down Expand Up @@ -103,9 +100,7 @@ public static function register(): void
}
);

/**
* SenderInterface sends messages to a transport.
*/
// Instrument SenderInterface (sending messages to transport)
hook(
SenderInterface::class,
'send',
Expand All @@ -121,28 +116,88 @@ public static function register(): void
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument sending as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('send %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for sending
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);

Context::storage()->attach($context);

return $params;
},
post: static function (
SenderInterface $sender,
array $params,
?Envelope $result,
?\Throwable $exception
): void {
$scope = Context::storage()->scope();
if (null === $scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());

if (null !== $exception) {
$span->recordException($exception, [
TraceAttributes::EXCEPTION_ESCAPED => true,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
}
);

// Instrument the receiving of messages (consumer-side)
hook(
ReceiverInterface::class,
'get',
static function (
SenderInterface $bus,
array $params,
string $class,
string $function,
?string $filename,
?int $lineno,
) use ($instrumentation): array {
/** @var Envelope $envelope */
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

// Instrument receiving as a "consume" operation with SpanKind::KIND_CONSUMER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('SEND %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('consume %s', $messageClass))
->setSpanKind(SpanKind::KIND_CONSUMER) // Set KIND_CONSUMER for receiving
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();

$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);

Context::storage()->attach($context);

return $params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
use Symfony\Component\Messenger\Transport\InMemoryTransport as LegacyInMemoryTransport;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class SendEmailMessage
{
Expand All @@ -35,14 +35,26 @@ protected function getMessenger(): MessageBusInterface
{
return new MessageBus();
}

protected function getTransport()
{
// Symfony 6+
// Symfony 6+ version of the transport
if (class_exists(InMemoryTransport::class)) {
return new InMemoryTransport();
}

// Symfony 5+
// Symfony 5+ fallback
return new LegacyInMemoryTransport();
}

protected function getReceiver()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this different enough from getTransport to require a method? It doesn't return a receiver but a transport...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RichardChukwu have you had any time to look further into this?

Copy link
Contributor Author

@RichardChukwu RichardChukwu Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ChrisLightfootWild, thanks for asking.

Not really, I'll appreciate any assistance please

{
// Symfony 6+ version of the receiver
if (class_exists(ReceiverInterface::class)) {
return new InMemoryTransport(); // Example transport acting as a receiver
}

// Symfony 5+ fallback
return new LegacyInMemoryTransport();
}

Expand All @@ -56,8 +68,7 @@ protected function getTransport()
public function test_dispatch_message($message, string $spanName, int $kind, array $attributes)
{
$bus = $this->getMessenger();

$bus->dispatch($message);
$bus->dispatch($message); // Target the correct interface (MessageBusInterface)

$this->assertCount(1, $this->storage);

Expand All @@ -73,6 +84,36 @@ public function test_dispatch_message($message, string $spanName, int $kind, arr
}
}

/**
* Test consumer span when processing a message
*/
public function test_consume_message()
{
$transport = $this->getReceiver(); // Use the correct receiver interface
$message = new SendEmailMessage('Hello Consumer');
$envelope = new Envelope($message);

// Simulate receiving the message via ReceiverInterface::get
$transport->get();

// Simulate message consumption (processing)
$bus = $this->getMessenger();
$bus->dispatch($message);

// After message is consumed, we expect a consumer span
$this->assertCount(1, $this->storage);

/** @var ImmutableSpan $span */
$span = $this->storage[0];

// We expect this to be a consumer span
$this->assertEquals('CONSUME OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getName());
$this->assertEquals(SpanKind::KIND_CONSUMER, $span->getKind());

$this->assertTrue($span->getAttributes()->has(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
$this->assertEquals('OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getAttributes()->get(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
}

/**
* @dataProvider sendDataProvider
* @param mixed $message
Expand Down Expand Up @@ -172,7 +213,7 @@ public function dispatchDataProvider(): array
[
new SendEmailMessage('Hello Again'),
'DISPATCH OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
SpanKind::KIND_PRODUCER,
SpanKind::KIND_PROCESS, // Correct SpanKind for dispatching
[
MessengerInstrumentation::ATTRIBUTE_MESSENGER_BUS => 'Symfony\Component\Messenger\MessageBus',
MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
Expand Down
Loading