**Important:** This package is experimental and might introduce breaking changes. Use with caution in production environments.

A high-performance Kafka client for Dart, built on librdkafka with a Domain-Driven Design architecture. It supports both mock implementations for testing and real Kafka connectivity via FFI bindings.
## Features

- **Type-safe**: Strong typing with value objects and domain entities
- **Clean Architecture**: Domain-Driven Design with layered architecture
- **Error Handling**: Comprehensive exception hierarchy with meaningful error messages
- **Testable**: Clean separation of concerns enables easy unit testing with mocks
- **High Performance**: Built on librdkafka FFI bindings for production use
- **Simple API**: Easy-to-use factory methods and service classes
- **Docker Ready**: Includes Docker Compose setup for local development
- **Full Coverage**: 98% test coverage with a comprehensive test suite
## Installation

Add this to your package's `pubspec.yaml` file:

    dependencies:
      kafka_dart: ^1.0.0

Then run:

    dart pub get
You need to install librdkafka on your system.

Ubuntu/Debian:

    sudo apt-get install librdkafka-dev

Fedora/RHEL:

    sudo dnf install librdkafka-devel

macOS:

    brew install librdkafka
## Quick Start

### Producer Example

```dart
import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For real Kafka connectivity
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: 'localhost:9092',
  );

  try {
    await producer.sendMessage(
      topic: 'my-topic',
      payload: 'Hello, Kafka!',
      key: 'message-key',
    );
    await producer.flush();
    print('Message sent to Kafka!');
  } finally {
    await producer.close();
  }
}
```

### Consumer Example

```dart
import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For real Kafka connectivity
  final consumer = await KafkaFactory.createAndInitializeConsumer(
    bootstrapServers: 'localhost:9092',
    groupId: 'my-consumer-group',
  );

  try {
    await consumer.subscribe(['my-topic']);

    for (int i = 0; i < 10; i++) {
      final message = await consumer.pollMessage();
      if (message != null) {
        print('Received: ${message.payload.value}');
        print('Key: ${message.key.hasValue ? message.key.value : 'null'}');
        await consumer.commitAsync();
      }
      await Future.delayed(const Duration(seconds: 1));
    }
  } finally {
    await consumer.close();
  }
}
```

### Mock Example

```dart
import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For testing - uses mock implementation
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: 'localhost:9092',
    useMock: true, // Safe for testing
  );

  try {
    await producer.sendMessage(
      topic: 'test-topic',
      payload: 'Test message',
      key: 'test-key',
    );
    print('Mock message sent!');
  } finally {
    await producer.close();
  }
}
```
## Architecture

This library follows Domain-Driven Design principles with a clean, layered architecture:

    ┌─────────────────────────────────────┐
    │         Presentation Layer          │
    │      (Public API & Factories)       │
    ├─────────────────────────────────────┤
    │          Application Layer          │
    │       (Services & Use Cases)        │
    ├─────────────────────────────────────┤
    │            Domain Layer             │
    │   (Entities, Value Objects, etc.)   │
    ├─────────────────────────────────────┤
    │        Infrastructure Layer         │
    │    (FFI Bindings & Repositories)    │
    └─────────────────────────────────────┘

- **Presentation Layer**: Public API and factory classes
- **Application Layer**: Use cases and application services
- **Domain Layer**: Core business logic with entities, value objects, and repository interfaces
- **Infrastructure Layer**: FFI bindings to librdkafka and repository implementations
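
To make the layering concrete, here is a minimal, self-contained sketch of how a value object, a repository interface, an infrastructure implementation, and an application service might fit together. Every name in it (`Topic`, `MessageRepository`, `InMemoryMessageRepository`, `ProducerService`) is hypothetical and chosen for illustration; none of them is taken from the package's actual source.

```dart
/// Domain layer: a value object that validates itself on construction.
/// (Hypothetical; not the package's real class.)
class Topic {
  final String value;

  Topic(this.value) {
    if (value.isEmpty) {
      throw ArgumentError('Topic name must not be empty');
    }
  }
}

/// Domain layer: the repository interface the application layer depends on.
abstract class MessageRepository {
  Future<void> send(Topic topic, String payload);
}

/// Infrastructure layer: an in-memory stand-in for an FFI-backed repository.
class InMemoryMessageRepository implements MessageRepository {
  final List<String> sent = [];

  @override
  Future<void> send(Topic topic, String payload) async {
    sent.add('${topic.value}:$payload');
  }
}

/// Application layer: a service that turns raw input into domain objects
/// and delegates the actual delivery to whichever repository it was given.
class ProducerService {
  final MessageRepository _repository;

  ProducerService(this._repository);

  Future<void> sendMessage(String topic, String payload) =>
      _repository.send(Topic(topic), payload);
}

Future<void> main() async {
  final repository = InMemoryMessageRepository();
  final service = ProducerService(repository);

  await service.sendMessage('my-topic', 'Hello, Kafka!');
  print(repository.sent); // [my-topic:Hello, Kafka!]
}
```

Because the service only knows the interface, the in-memory repository could be swapped for a real one without touching the application or domain code, which is the property the layered design is meant to provide.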
## Development

This project includes a Makefile for common development tasks:

    make test             # Run all tests (uses mocks)
    make test-coverage    # Run tests with coverage (excludes infrastructure)
    make coverage-html    # Generate HTML coverage report and open in browser
    make lint             # Run Dart analyzer
    make docs             # Generate API documentation
    make generate         # Regenerate FFI bindings from librdkafka
    make clean            # Clean coverage files

    make kafka-setup      # Start Kafka + create test topics + open UI
    make kafka-up         # Start Kafka cluster only
    make kafka-down       # Stop Kafka cluster

The Kafka setup includes:

- Kafka broker on `localhost:9092`
- Kafka UI on http://localhost:8080 for monitoring
- Pre-created topics for testing
## Testing

    # Run all unit tests (213 tests, 98% coverage)
    make test

    # Generate coverage report
    make test-coverage
    make coverage-html

All tests use mock implementations for safety and speed. The test suite covers:
- Domain entities and value objects
- Application services
- Repository interfaces
- Integration scenarios
### Testing with Real Kafka

1. Start the Kafka environment:

       make kafka-setup

2. Test with real FFI bindings:

       dart run example/real_kafka_test.dart

3. Monitor with Kafka UI: visit http://localhost:8080
4. Use the Kafka CLI tools:

       # Send messages
       docker exec -it kafka kafka-console-producer --topic test-topic --bootstrap-server localhost:9092

       # Read messages
       docker exec -it kafka kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092

Current test coverage: 98%

Coverage excludes:

- FFI bindings (auto-generated from librdkafka)
- Infrastructure adapters (tested through integration)

Generate coverage reports:

    make test-coverage    # Generate lcov.info
    make coverage-html    # Generate HTML report and open

## Mock vs Real Implementation

Mock implementation (for tests):

    final producer = await KafkaFactory.createAndInitializeProducer(
      bootstrapServers: 'localhost:9092',
      useMock: true, // Safe, no external dependencies
    );

Real implementation (production):

    final producer = await KafkaFactory.createAndInitializeProducer(
      bootstrapServers: 'localhost:9092',
      // Uses librdkafka FFI bindings by default
    );

The factory automatically chooses the appropriate repository implementation based on the `useMock` parameter.
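
The selection described above can be pictured as a small factory that returns one of two implementations behind a shared interface. The sketch below is an illustration under assumptions, not the package's code: `ProducerRepository`, `MockProducerRepository`, `FfiProducerRepository`, and `ProducerRepositoryFactory` are hypothetical names, and the FFI class here only records its configuration rather than calling librdkafka.

```dart
/// Hypothetical repository interface; stands in for the package's own.
abstract class ProducerRepository {
  String get kind;
}

/// Hypothetical mock: needs no broker and no native library.
class MockProducerRepository implements ProducerRepository {
  @override
  String get kind => 'mock';
}

/// Hypothetical FFI-backed repository. A real one would load librdkafka;
/// this placeholder only stores the broker address.
class FfiProducerRepository implements ProducerRepository {
  final String bootstrapServers;

  FfiProducerRepository(this.bootstrapServers);

  @override
  String get kind => 'ffi';
}

/// Hypothetical factory: a single flag picks the implementation, and
/// callers only ever see the interface.
class ProducerRepositoryFactory {
  static ProducerRepository create({
    required String bootstrapServers,
    bool useMock = false,
  }) {
    return useMock
        ? MockProducerRepository()
        : FfiProducerRepository(bootstrapServers);
  }
}

void main() {
  final mock = ProducerRepositoryFactory.create(
    bootstrapServers: 'localhost:9092',
    useMock: true,
  );
  final real = ProducerRepositoryFactory.create(
    bootstrapServers: 'localhost:9092',
  );
  print('${mock.kind} ${real.kind}'); // mock ffi
}
```

Defaulting the flag to `false` mirrors the behavior noted in the snippets above, where the real bindings are used unless a mock is requested explicitly.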
## Project Structure

    lib/
    ├── src/
    │   ├── application/          # Application services
    │   │   └── services/         # Kafka producer/consumer services
    │   ├── domain/               # Domain layer
    │   │   ├── entities/         # Domain entities
    │   │   ├── value_objects/    # Value objects
    │   │   ├── repositories/     # Repository interfaces
    │   │   └── exceptions/       # Domain exceptions
    │   ├── infrastructure/       # Infrastructure layer
    │   │   ├── bindings/         # FFI bindings to librdkafka
    │   │   ├── repositories/     # Repository implementations
    │   │   └── factories/        # Service factories
    │   └── presentation/         # Public API (future)
    └── kafka_dart.dart           # Main export file
    example/
    ├── real_kafka_test.dart      # Real Kafka testing
    ├── manual_kafka_test.dart    # Manual testing with mocks
    └── ...
    test/                         # Comprehensive test suite
    ├── integration_test.dart     # Integration tests (mocks)
    ├── domain/                   # Domain layer tests
    ├── application/              # Application layer tests
    └── ...
    docker-compose.yml            # Kafka development environment
    Makefile                      # Development commands
    scripts/setup-kafka.sh        # Kafka setup automation

The library includes full FFI bindings to librdkafka.

Producer Features:

- Message sending with keys and headers
- Partition selection (specific or automatic)
- Synchronous flushing
- Proper resource cleanup

Consumer Features:

- Topic subscription
- Message polling with timeout
- Offset management (sync/async commits)
- Consumer group coordination

Configuration:

- Uses librdkafka-compatible properties
- Automatic platform detection (Linux/macOS/Windows)
- Error handling with meaningful exceptions
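
As a rough illustration of what an exception hierarchy with meaningful messages can look like, the sketch below defines a base exception and two subtypes, then catches them from most to least specific. All class and function names (`KafkaClientException`, `InvalidTopicException`, `BrokerUnavailableException`, `validateTopic`, `handle`) are hypothetical; the package's real exception types may be named and structured differently.

```dart
/// Hypothetical base type for every failure raised by a Kafka client.
abstract class KafkaClientException implements Exception {
  final String message;

  KafkaClientException(this.message);

  @override
  String toString() => 'KafkaClientException: $message';
}

/// Hypothetical subtype: input was rejected before reaching the broker.
class InvalidTopicException extends KafkaClientException {
  InvalidTopicException(String topic) : super('Invalid topic name: "$topic"');
}

/// Hypothetical subtype: the broker could not be reached.
class BrokerUnavailableException extends KafkaClientException {
  BrokerUnavailableException(String servers)
      : super('No broker reachable at $servers');
}

/// Rejects blank topic names with a descriptive exception.
void validateTopic(String topic) {
  if (topic.trim().isEmpty) throw InvalidTopicException(topic);
}

/// Runs [action] and maps each failure to a short status string.
/// The specific subtype is matched first, the base type acts as a fallback.
String handle(void Function() action) {
  try {
    action();
    return 'ok';
  } on InvalidTopicException catch (e) {
    return 'rejected: ${e.message}';
  } on KafkaClientException catch (e) {
    return 'failed: ${e.message}';
  }
}

void main() {
  print(handle(() => validateTopic('my-topic')));
  print(handle(() => validateTopic('')));
  print(handle(() => throw BrokerUnavailableException('localhost:9092')));
}
```

A shared base type lets callers decide how fine-grained their handling should be: a single `on KafkaClientException` clause covers everything, while specific clauses can be added where recovery differs.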
## Examples

See the `/example` folder for complete working examples:

- `real_kafka_test.dart`: Real FFI implementation test
- `manual_kafka_test.dart`: Mock implementation test
- `producer_example.dart`: Producer patterns
- `consumer_example.dart`: Consumer patterns
## Contributing

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Run tests (`make test`)
4. Run linting (`make lint`)
5. Test with real Kafka (`make kafka-setup && dart run example/real_kafka_test.dart`)
6. Commit your changes (`git commit -m 'Add amazing feature'`)
7. Push to the branch (`git push origin feature/amazing-feature`)
8. Open a Pull Request

### Development Guidelines

- All new features must include tests
- Use mock implementations in unit tests
- Test real implementations manually with Docker Kafka
- Follow Domain-Driven Design principles
- Maintain 95%+ test coverage
## License

This project is licensed under the Apache 2.0 License. See the LICENSE file for details.

Copyright © 2025 Stefano Amorelli
## Acknowledgments

- Built on top of librdkafka, the high-performance C/C++ Kafka client
- Inspired by Domain-Driven Design principles
- Thanks to the Dart FFI contributors for excellent tooling
- Thanks to the Kafka community for a robust ecosystem
## Links

- Package: https://pub.dev/packages/kafka_dart
- Repository: https://github.com/stefanoamorelli/kafka_dart
- Issues: https://github.com/stefanoamorelli/kafka_dart/issues
- librdkafka: https://github.com/confluentinc/librdkafka
- Kafka Documentation: https://kafka.apache.org/documentation/