Kafka Dart

Important

EXPERIMENTAL - This package is experimental and might introduce breaking changes. Use with caution in production environments.

A high-performance Kafka client for Dart using librdkafka with Domain-Driven Design architecture. Supports both mock implementations for testing and real Kafka connectivity via FFI bindings.

🚀 Features

  • 🔒 Type-safe: Strong typing with value objects and domain entities
  • 🏗️ Clean Architecture: Domain-Driven Design with layered architecture
  • ⚠️ Error Handling: Comprehensive exception hierarchy with meaningful error messages
  • 🧪 Testable: Clean separation of concerns enables easy unit testing with mocks
  • ⚡ High Performance: Built on librdkafka FFI bindings for production use
  • 🎯 Simple API: Easy-to-use factory methods and service classes
  • 🐳 Docker Ready: Includes Docker Compose setup for local development
  • 📊 Full Coverage: 98% test coverage with comprehensive test suite

📦 Installation

Add this to your package's pubspec.yaml file:

dependencies:
  kafka_dart: ^1.0.0

Then run:

dart pub get

Prerequisites

You need to install librdkafka on your system:

Ubuntu/Debian:

sudo apt-get install librdkafka-dev

Fedora/RHEL:

sudo dnf install librdkafka-devel

macOS:

brew install librdkafka

## 🚀 Quick Start

### Producer Example

```dart
import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For real Kafka connectivity
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: 'localhost:9092',
  );

  try {
    await producer.sendMessage(
      topic: 'my-topic',
      payload: 'Hello, Kafka!',
      key: 'message-key',
    );
    await producer.flush();
    print('✅ Message sent to Kafka!');
  } finally {
    await producer.close();
  }
}
```

Consumer Example

import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For real Kafka connectivity  
  final consumer = await KafkaFactory.createAndInitializeConsumer(
    bootstrapServers: 'localhost:9092',
    groupId: 'my-consumer-group',
  );

  try {
    await consumer.subscribe(['my-topic']);
    
    for (int i = 0; i < 10; i++) {
      final message = await consumer.pollMessage();
      if (message != null) {
        print('📨 Received: ${message.payload.value}');
        print('🔑 Key: ${message.key.hasValue ? message.key.value : 'null'}');
        await consumer.commitAsync();
      }
      await Future.delayed(Duration(seconds: 1));
    }
  } finally {
    await consumer.close();
  }
}
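
The consumer example handles a message and then commits. A minimal sketch of that process-then-commit ordering follows, written against a hypothetical `MessageSource` interface with an in-memory fake so it runs without a broker. `MessageSource`, `FakeSource`, and `consume` are illustrative names, not part of kafka_dart.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical stand-in for a consumer; not part of kafka_dart.
abstract class MessageSource {
  Future<String?> poll();
  Future<void> commit();
}

/// In-memory fake so the sketch runs without a broker.
class FakeSource implements MessageSource {
  FakeSource(Iterable<String> messages) : _queue = Queue.of(messages);

  final Queue<String> _queue;
  int commits = 0;

  @override
  Future<String?> poll() async => _queue.isEmpty ? null : _queue.removeFirst();

  @override
  Future<void> commit() async {
    commits++;
  }
}

/// Polls up to [maxPolls] times, handles each message, and commits only
/// after the handler has completed without throwing.
Future<int> consume(
  MessageSource source,
  FutureOr<void> Function(String payload) handle, {
  int maxPolls = 10,
}) async {
  var handled = 0;
  for (var i = 0; i < maxPolls; i++) {
    final payload = await source.poll();
    if (payload == null) continue; // nothing available on this poll
    await handle(payload); // if this throws, no commit is issued
    await source.commit();
    handled++;
  }
  return handled;
}

Future<void> main() async {
  final source = FakeSource(['a', 'b', 'c']);
  final seen = <String>[];
  final handled = await consume(source, seen.add);
  print('handled=$handled commits=${source.commits} seen=$seen');
}
```

Committing after the handler returns means a failed handler leaves the message uncommitted, so it may be delivered again. Handlers written this way should tolerate duplicates.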

Testing with Mocks

import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  // For testing - uses mock implementation
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: 'localhost:9092',
    useMock: true, // Safe for testing
  );

  try {
    await producer.sendMessage(
      topic: 'test-topic',
      payload: 'Test message',
      key: 'test-key',
    );
    print('✅ Mock message sent!');
  } finally {
    await producer.close();
  }
}

πŸ—οΈ Architecture

This library follows Domain-Driven Design principles with a clean, layered architecture:

┌─────────────────────────────────────┐
│         Presentation Layer          │
│      (Public API & Factories)       │
├─────────────────────────────────────┤
│         Application Layer           │
│     (Services & Use Cases)          │
├─────────────────────────────────────┤
│           Domain Layer              │
│   (Entities, Value Objects, etc.)   │
├─────────────────────────────────────┤
│        Infrastructure Layer         │
│   (FFI Bindings & Repositories)     │
└─────────────────────────────────────┘

Layers

  • 🎨 Presentation Layer: Public API and factory classes
  • ⚙️ Application Layer: Use cases and application services
  • 🧠 Domain Layer: Core business logic with entities, value objects, and repository interfaces
  • 🔧 Infrastructure Layer: FFI bindings to librdkafka and repository implementations
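
To make the layering concrete, here is a small self-contained sketch of how a value object, a repository interface, an infrastructure implementation, and an application service can fit together. Every class name in it (`TopicName`, `ProducerRepository`, `InMemoryProducerRepository`, `ProducerService`) is hypothetical and chosen for illustration; none is taken from the library's source.

```dart
// Domain layer: a value object that validates on construction.
// All names here are hypothetical and do not mirror kafka_dart's classes.
class TopicName {
  TopicName(String value) : value = value.trim() {
    if (this.value.isEmpty) {
      throw ArgumentError('Topic name must not be empty');
    }
  }

  final String value;

  @override
  bool operator ==(Object other) => other is TopicName && other.value == value;

  @override
  int get hashCode => value.hashCode;
}

// Domain layer: the repository interface the application layer depends on.
abstract class ProducerRepository {
  Future<void> send(TopicName topic, String payload);
}

// Infrastructure layer: an in-memory implementation.
// A real one would call into the FFI bindings instead.
class InMemoryProducerRepository implements ProducerRepository {
  final sent = <String, List<String>>{};

  @override
  Future<void> send(TopicName topic, String payload) async {
    sent.putIfAbsent(topic.value, () => []).add(payload);
  }
}

// Application layer: a service that only knows the interface.
class ProducerService {
  ProducerService(this._repository);

  final ProducerRepository _repository;

  Future<void> sendMessage({
    required String topic,
    required String payload,
  }) async {
    // Raw strings are converted to validated value objects at the boundary.
    await _repository.send(TopicName(topic), payload);
  }
}

Future<void> main() async {
  final repository = InMemoryProducerRepository();
  final service = ProducerService(repository);
  await service.sendMessage(topic: 'my-topic', payload: 'Hello, Kafka!');
  print(repository.sent); // {my-topic: [Hello, Kafka!]}
}
```

Because the service depends only on the interface, a unit test can pass in the in-memory implementation and inspect `sent` without touching native code.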

🛠️ Development & Testing

This project includes a comprehensive Makefile for common development tasks:

Testing Commands

make test              # Run all tests (uses mocks)
make test-coverage     # Run tests with coverage (excludes infrastructure)
make coverage-html     # Generate HTML coverage report and open in browser
make lint              # Run Dart analyzer

Development Commands

make docs              # Generate API documentation
make generate          # Regenerate FFI bindings from librdkafka
make clean             # Clean coverage files

Kafka Development Environment

make kafka-setup       # Start Kafka + create test topics + open UI
make kafka-up          # Start Kafka cluster only
make kafka-down        # Stop Kafka cluster

The Kafka setup includes:

  • Kafka broker on localhost:9092
  • Kafka UI on http://localhost:8080 for monitoring
  • Pre-created topics for testing

📊 Testing

Automated Testing

# Run all unit tests (213 tests, 98% coverage)
make test

# Generate coverage report  
make test-coverage
make coverage-html

All tests use mock implementations for safety and speed. The test suite covers:

  • Domain entities and value objects
  • Application services
  • Repository interfaces
  • Integration scenarios

Manual Testing with Real Kafka

1. Start Kafka environment:

make kafka-setup

2. Test with real FFI bindings:

dart run example/real_kafka_test.dart

3. Monitor with Kafka UI: Visit http://localhost:8080

4. Use Kafka CLI tools:

# Send messages
docker exec -it kafka kafka-console-producer --topic test-topic --bootstrap-server localhost:9092

# Read messages  
docker exec -it kafka kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092

📈 Coverage

Current test coverage: 98%

Coverage excludes:

  • FFI bindings (auto-generated from librdkafka)
  • Infrastructure adapters (tested through integration)

Generate coverage reports:

make test-coverage    # Generate lcov.info
make coverage-html    # Generate HTML report and open

🔄 Implementation Modes

Mock Mode (Default for Testing)

final producer = await KafkaFactory.createAndInitializeProducer(
  bootstrapServers: 'localhost:9092',
  useMock: true, // Safe, no external dependencies
);

Real Kafka Mode (Production)

final producer = await KafkaFactory.createAndInitializeProducer(
  bootstrapServers: 'localhost:9092',
  // Uses librdkafka FFI bindings by default
);

The factory automatically chooses the appropriate repository implementation based on the useMock parameter.
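
The selection step can be pictured roughly as follows. This is a sketch of the general pattern rather than the library's actual code: `ProducerFactory`, `MockProducerRepository`, `NativeProducerRepository`, and the `kind` getter are hypothetical names, and the native branch is stubbed out so the snippet runs without librdkafka.

```dart
// Hypothetical names throughout; this is not kafka_dart's source.
abstract class ProducerRepository {
  Future<void> initialize();
  String get kind;
}

class MockProducerRepository implements ProducerRepository {
  @override
  Future<void> initialize() async {} // nothing to connect to

  @override
  String get kind => 'mock';
}

class NativeProducerRepository implements ProducerRepository {
  NativeProducerRepository(this.bootstrapServers);

  final String bootstrapServers;

  @override
  Future<void> initialize() async {
    // A real implementation would load librdkafka and create a client here.
    throw UnsupportedError('Native client is not available in this sketch');
  }

  @override
  String get kind => 'native';
}

class ProducerFactory {
  /// Picks the repository from [useMock], then initializes it.
  static Future<ProducerRepository> createAndInitialize({
    required String bootstrapServers,
    bool useMock = false,
  }) async {
    final ProducerRepository repository = useMock
        ? MockProducerRepository()
        : NativeProducerRepository(bootstrapServers);
    await repository.initialize();
    return repository;
  }
}

Future<void> main() async {
  final producer = await ProducerFactory.createAndInitialize(
    bootstrapServers: 'localhost:9092',
    useMock: true,
  );
  print(producer.kind); // mock
}
```

Keeping the choice behind a single flag means calling code is identical in tests and in production; only the factory argument changes.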

🛠️ Project Structure

lib/
├── src/
│   ├── application/         # Application services
│   │   └── services/        # Kafka producer/consumer services
│   ├── domain/             # Domain layer
│   │   ├── entities/       # Domain entities
│   │   ├── value_objects/  # Value objects
│   │   ├── repositories/   # Repository interfaces
│   │   └── exceptions/     # Domain exceptions
│   ├── infrastructure/     # Infrastructure layer
│   │   ├── bindings/       # FFI bindings to librdkafka
│   │   ├── repositories/   # Repository implementations
│   │   └── factories/      # Service factories
│   └── presentation/       # Public API (future)
└── kafka_dart.dart        # Main export file

example/
├── real_kafka_test.dart    # Real Kafka testing
├── manual_kafka_test.dart  # Manual testing with mocks
└── ...

test/                       # Comprehensive test suite
├── integration_test.dart   # Integration tests (mocks)
├── domain/                 # Domain layer tests
├── application/            # Application layer tests
└── ...

docker-compose.yml          # Kafka development environment
Makefile                   # Development commands
scripts/setup-kafka.sh     # Kafka setup automation

🔧 FFI Implementation

The library includes full FFI bindings to librdkafka:

Producer Features:

  • ✅ Message sending with keys and headers
  • ✅ Partition selection (specific or automatic)
  • ✅ Synchronous flushing
  • ✅ Proper resource cleanup

Consumer Features:

  • ✅ Topic subscription
  • ✅ Message polling with timeout
  • ✅ Offset management (sync/async commits)
  • ✅ Consumer group coordination

Configuration:

  • Uses librdkafka-compatible properties
  • Automatic platform detection (Linux/macOS/Windows)
  • Error handling with meaningful exceptions
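
As a rough illustration of platform detection, the sketch below uses `dart:ffi` to try loading librdkafka under common per-platform file names and then calls `rd_kafka_version()` from the librdkafka C API. The candidate names are assumptions about typical installs, and the library's own lookup logic may differ. On some systems (for example Homebrew on Apple Silicon) a full path may be needed instead of a bare file name.

```dart
import 'dart:ffi';
import 'dart:io';

/// Candidate shared-library names per platform. These are common install
/// names, assumed here; kafka_dart's own lookup may differ.
List<String> candidateLibraryNames(String os) {
  switch (os) {
    case 'linux':
      return ['librdkafka.so.1', 'librdkafka.so'];
    case 'macos':
      return ['librdkafka.dylib'];
    case 'windows':
      return ['librdkafka.dll', 'rdkafka.dll'];
    default:
      throw UnsupportedError('Unsupported platform: $os');
  }
}

/// Tries each candidate and returns the first library that loads, or null.
DynamicLibrary? tryLoadLibrdkafka() {
  for (final name in candidateLibraryNames(Platform.operatingSystem)) {
    try {
      return DynamicLibrary.open(name);
    } on ArgumentError {
      // Not found under this name; try the next one.
    }
  }
  return null;
}

void main() {
  final lib = tryLoadLibrdkafka();
  if (lib == null) {
    print('librdkafka not found; see Prerequisites.');
    return;
  }
  // rd_kafka_version() returns the library version packed into an int.
  final version =
      lib.lookupFunction<Int32 Function(), int Function()>('rd_kafka_version');
  print('librdkafka version: 0x${version().toRadixString(16)}');
}
```

Running a check like this before starting the application can turn a missing native dependency into a clear message instead of a failure deep inside the first producer or consumer call.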

📚 Examples

See the /example folder for complete working examples, such as real_kafka_test.dart (real Kafka) and manual_kafka_test.dart (mocks).

🤝 Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Run tests (make test)
  4. Run linting (make lint)
  5. Test with real Kafka (make kafka-setup && dart run example/real_kafka_test.dart)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

Development Guidelines

  • All new features must include tests
  • Use mock implementations in unit tests
  • Test real implementations manually with Docker Kafka
  • Follow Domain-Driven Design principles
  • Maintain 95%+ test coverage

📄 License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.

Copyright 2025 © Stefano Amorelli

🙏 Acknowledgments

  • Built on top of librdkafka - the high-performance C/C++ Kafka client;
  • Inspired by Domain-Driven Design principles;
  • Thanks to the Dart FFI contributors for excellent tooling;
  • Thanks to the Kafka community for a robust ecosystem.

🔗 Links