A simple .NET input provider demonstrating clean architecture patterns in the DStream ecosystem. It generates sequential counter data with timestamps and communicates over stdin/stdout, which makes it useful for testing output providers, validating data pipelines, and demonstrating the DStream .NET SDK.
```
counter-input-provider/
├── Program.cs   ← Top-level statement entry point (5 lines)
├── Config.cs    ← Configuration class (CounterConfig)
└── Reader.cs    ← Core data reading logic (ReadAsync implementation)
```
This provider demonstrates clean architecture patterns for DStream input providers:
Purpose: Generate streaming data and emit envelopes
Interface: `IInputProvider` from `Katasec.DStream.Abstractions`
```csharp
public interface IInputProvider : IProvider
{
    IAsyncEnumerable<Envelope> ReadAsync(IPluginContext ctx, CancellationToken ct);
}
```
Required Implementation:
- `ReadAsync` method: returns `IAsyncEnumerable<Envelope>` for streaming data generation
- `Envelope` structure: `record struct Envelope(object Payload, IReadOnlyDictionary<string, object?> Meta)`
- Error handling: respect the `CancellationToken` for graceful shutdown
- Streaming pattern: use `yield return` for continuous data generation
Key Responsibilities:
- ✅ Generate data streams (counters, CDC, API polling, file watching)
- ✅ Create `Envelope` objects with rich payload and metadata
- ✅ Handle timing/intervals for data generation
- ✅ Implement graceful shutdown on cancellation
- ✅ Add metadata for downstream routing and debugging
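The contract above can be exercised without the SDK. The sketch below is written as top-level C# statements and uses a named tuple `(Payload, Meta)` as a stand-in for the real `Envelope` record struct; `ReadSketchAsync` and its `onCleanup` callback are hypothetical and not part of `Katasec.DStream.Abstractions`. It shows both halves of the streaming pattern (the producer uses `yield return`, the consumer uses `await foreach`) and one aspect of graceful shutdown: a `finally` block in the iterator runs when the consumer stops early.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

bool cleanedUp = false;
var received = new List<(object Payload, IReadOnlyDictionary<string, object?> Meta)>();

// The consumer (the host, in DStream's case) may stop at any time. Breaking out of
// await foreach disposes the iterator, which runs its finally block.
await foreach (var env in ReadSketchAsync(() => cleanedUp = true))
{
    received.Add(env);
    if (received.Count == 3) break;
}
Console.WriteLine($"received {received.Count}, cleaned up: {cleanedUp}");

// Hypothetical stand-in for IInputProvider.ReadAsync; the tuple mirrors
// Envelope(object Payload, IReadOnlyDictionary<string, object?> Meta).
static async IAsyncEnumerable<(object Payload, IReadOnlyDictionary<string, object?> Meta)> ReadSketchAsync(
    Action onCleanup, [EnumeratorCancellation] CancellationToken ct = default)
{
    // Acquire resources here (connections, file handles, ...).
    try
    {
        for (long seq = 1; !ct.IsCancellationRequested; seq++)
        {
            var payload = new { value = seq };
            var meta = new Dictionary<string, object?> { ["seq"] = seq, ["provider"] = "sketch" };
            yield return (payload, meta);
            await Task.Yield(); // stand-in for real I/O or a delay
        }
    }
    finally
    {
        // Runs whether the loop ends, the token is cancelled, or the consumer stops early.
        onCleanup();
    }
}
```

Keeping cleanup in `finally` rather than after the loop matters because code after the loop never runs when the consumer breaks out.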
Simple configuration class with JSON binding attributes:
```csharp
using System.Text.Json.Serialization;

public sealed record CounterConfig
{
    /// <summary>Interval in milliseconds between counter increments</summary>
    [JsonPropertyName("interval")]
    public int Interval { get; init; } = 1000;

    /// <summary>Maximum count before stopping (0 = infinite)</summary>
    [JsonPropertyName("max_count")]
    public int MaxCount { get; init; } = 0;
}
```
Configuration Features:
- ✅ Uses record types for immutable configuration
- ✅ JSON property name mapping for HCL/JSON compatibility
- ✅ Default values for all properties
- ✅ Clear documentation with XML comments
Modern C# top-level statements:
```csharp
using Katasec.DStream.SDK.Core;
using CounterInputProvider;

// Top-level program entry point
await StdioProviderHost.RunInputProviderAsync<CounterInputProvider.CounterInputProvider, CounterInputProvider.CounterConfig>();
```
What `StdioProviderHost` handles for you:
- ✅ JSON configuration parsing from stdin
- ✅ Provider instantiation and initialization
- ✅ Envelope serialization to JSON for stdout
- ✅ Process lifecycle and graceful shutdown
- ✅ Error handling and logging to stderr
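The SDK's internals are not shown in this README, so the following is only an approximation of the stdio contract listed above, under these assumptions: the config is a single JSON document on stdin, each envelope becomes one JSON line on stdout with `source`, `type`, `data` and `metadata` keys (matching the sample output further down), and diagnostics go to stderr. `RunHostSketchAsync` is hypothetical, not the SDK's `StdioProviderHost`; it takes a `TextReader`/`TextWriter` instead of the real console streams so it can be exercised in memory, and an inline counter loop stands in for the provider's `ReadAsync`.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;

var stdin = new StringReader("{\"interval\": 0, \"max_count\": 2}");
var stdout = new StringWriter();
var stderr = new StringWriter();
await RunHostSketchAsync(stdin, stdout, stderr, CancellationToken.None);
Console.Write(stdout.ToString());

// Hypothetical approximation of a stdio host loop; not the SDK source.
static async Task RunHostSketchAsync(TextReader input, TextWriter output, TextWriter errors, CancellationToken ct)
{
    try
    {
        // 1. Parse the JSON configuration from stdin (defaults assumed to match CounterConfig).
        JsonNode? config = JsonNode.Parse(await input.ReadToEndAsync());
        int interval = config?["interval"]?.GetValue<int>() ?? 1000;
        int maxCount = config?["max_count"]?.GetValue<int>() ?? 0;

        // 2. Drive the stream; this inline counter stands in for the provider's ReadAsync.
        for (int seq = 1; (maxCount == 0 || seq <= maxCount) && !ct.IsCancellationRequested; seq++)
        {
            var line = new
            {
                source = "",
                type = "",
                data = new { value = seq },
                metadata = new Dictionary<string, object?> { ["seq"] = seq, ["interval_ms"] = interval },
            };
            // 3. One envelope per line, flushed immediately so downstream readers see it.
            await output.WriteLineAsync(JsonSerializer.Serialize(line));
            await output.FlushAsync();
            await Task.Delay(interval, ct);
        }
    }
    catch (OperationCanceledException)
    {
        // Graceful shutdown: cancellation is not treated as an error.
    }
    catch (Exception ex)
    {
        // 4. Diagnostics go to stderr so stdout carries only envelopes.
        await errors.WriteLineAsync($"error: {ex.Message}");
    }
}
```

Keeping stdout reserved for envelopes is what lets the commands further down redirect stderr with `2>/dev/null` without corrupting the data stream.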
- ✅ Inherit from `ProviderBase<TConfig>`
- ✅ Implement `IInputProvider`
- ✅ Implement the `ReadAsync` method:

```csharp
// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
public async IAsyncEnumerable<Envelope> ReadAsync(IPluginContext ctx, [EnumeratorCancellation] CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        // Your data generation logic here:
        // - Create data payload
        // - Add rich metadata for downstream processing
        // - Yield envelope for streaming
        var data = new { /* your data structure */ };
        var metadata = new Dictionary<string, object?> { /* routing/debug info */ };

        yield return new Envelope(data, metadata);

        // Wait/delay between items if needed
        await Task.Delay(Config.Interval, ct);
    }
}
```

- ✅ Use the `[EnumeratorCancellation]` attribute for proper cancellation handling
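Reader.cs itself is not reproduced in this README, so here is a hedged sketch of how a counter `ReadAsync` could combine the template with `max_count` (0 = infinite) and cooperative cancellation. It is written as a static local function with a named tuple standing in for `Envelope`; `CounterReadAsync` and its parameters are hypothetical. The metadata keys mirror the sample output. The token is delivered through `WithCancellation`, which reaches the iterator only because the parameter carries `[EnumeratorCancellation]`.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var values = new List<long>();

// WithCancellation flows cts.Token into the iterator's [EnumeratorCancellation] parameter.
await foreach (var env in CounterReadAsync(intervalMs: 10, maxCount: 0).WithCancellation(cts.Token))
{
    values.Add((long)env.Meta["seq"]!);
    if (values.Count == 2) cts.Cancel(); // simulate Ctrl+C after two items
}
Console.WriteLine(string.Join(",", values));

// Hypothetical counter reader; the tuple mirrors Envelope(Payload, Meta).
static async IAsyncEnumerable<(object Payload, IReadOnlyDictionary<string, object?> Meta)> CounterReadAsync(
    int intervalMs, int maxCount, [EnumeratorCancellation] CancellationToken ct = default)
{
    for (long seq = 1; maxCount == 0 || seq <= maxCount; seq++)
    {
        if (ct.IsCancellationRequested) yield break;

        var data = new { value = seq, timestamp = DateTimeOffset.UtcNow };
        var meta = new Dictionary<string, object?>
        {
            ["seq"] = seq,
            ["interval_ms"] = intervalMs,
            ["provider"] = "counter-input-provider",
        };
        yield return (data, meta);

        // If cancellation arrives during the wait, end the stream quietly instead of throwing.
        try { await Task.Delay(intervalMs, ct); }
        catch (OperationCanceledException) { yield break; }
    }
}
```

Without the attribute, the token passed to `WithCancellation` would be ignored and the loop above would run forever.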
```bash
# Generate 3 counter items with 500ms intervals
echo '{"interval": 500, "max_count": 3}' | bin/Release/net9.0/osx-x64/counter-input-provider

# Infinite counter (stop with Ctrl+C)
echo '{"interval": 1000}' | bin/Release/net9.0/osx-x64/counter-input-provider

# Using dotnet run for development
echo '{"interval": 500, "max_count": 3}' | /usr/local/share/dotnet/dotnet run
```
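```bash
# Via DStream CLI
cd ~/progs/dstream/dstream
go run . run counter-to-console

# Manual pipeline testing: `echo` ignores its stdin, so the counter output cannot be
# piped into it. This assumes the output provider reads its config as the first
# stdin line, followed by envelopes.
{ echo '{"outputFormat": "simple"}'; \
  echo '{"interval": 500, "max_count": 3}' | ./counter-input-provider 2>/dev/null; } \
  | ./console-output-provider
```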
```bash
# Clean build using Makefile
make clean && make build

# Manual build
/usr/local/share/dotnet/dotnet build -c Release
/usr/local/share/dotnet/dotnet publish -c Release -r osx-x64 --self-contained
```
```json
{"source":"","type":"","data":{"value":1,"timestamp":"2025-09-20T17:05:56.424303+00:00"},"metadata":{"seq":1,"interval_ms":500,"provider":"counter-input-provider"}}
{"source":"","type":"","data":{"value":2,"timestamp":"2025-09-20T17:05:56.980548+00:00"},"metadata":{"seq":2,"interval_ms":500,"provider":"counter-input-provider"}}
{"source":"","type":"","data":{"value":3,"timestamp":"2025-09-20T17:05:57.492708+00:00"},"metadata":{"seq":3,"interval_ms":500,"provider":"counter-input-provider"}}
```
- `data.value`: Sequential counter number (1, 2, 3, ...)
- `data.timestamp`: ISO 8601 timestamp when the item was generated
- `metadata.seq`: Sequence number for debugging
- `metadata.interval_ms`: Configured interval, for reference
- `metadata.provider`: Provider identification
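When validating a pipeline with this provider, one quick check is to read the JSON lines and confirm that `metadata.seq` has no gaps. A minimal sketch using System.Text.Json, assuming one envelope per line as in the sample above; `CheckSequence` is a hypothetical helper, and the input is the sample output itself.

```csharp
#nullable enable
using System;
using System.IO;
using System.Text.Json;

const string captured = """
{"source":"","type":"","data":{"value":1,"timestamp":"2025-09-20T17:05:56.424303+00:00"},"metadata":{"seq":1,"interval_ms":500,"provider":"counter-input-provider"}}
{"source":"","type":"","data":{"value":2,"timestamp":"2025-09-20T17:05:56.980548+00:00"},"metadata":{"seq":2,"interval_ms":500,"provider":"counter-input-provider"}}
{"source":"","type":"","data":{"value":3,"timestamp":"2025-09-20T17:05:57.492708+00:00"},"metadata":{"seq":3,"interval_ms":500,"provider":"counter-input-provider"}}
""";

var (count, gaps) = CheckSequence(new StringReader(captured));
Console.WriteLine($"{count} envelopes, {gaps} gaps");

// Hypothetical helper: counts envelopes and places where metadata.seq skips a value.
static (int Count, int Gaps) CheckSequence(TextReader reader)
{
    int count = 0, gaps = 0;
    long? previous = null;
    string? line;
    while ((line = reader.ReadLine()) is not null)
    {
        if (string.IsNullOrWhiteSpace(line)) continue; // tolerate blank lines
        using var doc = JsonDocument.Parse(line);
        long seq = doc.RootElement.GetProperty("metadata").GetProperty("seq").GetInt64();
        if (previous is long p && seq != p + 1) gaps++;
        previous = seq;
        count++;
    }
    return (count, gaps);
}
```

The same helper could read from `Console.In` when placed at the end of a shell pipeline.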
Accepts JSON configuration via stdin:
```json
{
  "interval": 1000,
  "max_count": 100
}
```
Configuration Properties:
- `interval` (default: 1000): Milliseconds between counter increments
- `max_count` (default: 0): Maximum items to generate (0 = infinite)
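The defaults above mean a partial (or empty) JSON object is still a valid configuration. The sketch below shows that fallback with `System.Text.Json.Nodes` instead of the SDK's own binding; `ParseCounterConfig` is hypothetical, and the range checks are an assumption, since this README does not say how negative values are handled (a negative `interval` would otherwise reach `Task.Delay`, which treats -1 as "wait forever" and rejects smaller values).

```csharp
#nullable enable
using System;
using System.Text.Json.Nodes;

var full = ParseCounterConfig("{\"interval\": 250, \"max_count\": 100}");
var empty = ParseCounterConfig("{}");
Console.WriteLine($"{full.Interval}/{full.MaxCount} and {empty.Interval}/{empty.MaxCount}");

// Hypothetical parser: missing keys fall back to the documented defaults.
// The range checks are an assumption, not documented provider behavior.
static (int Interval, int MaxCount) ParseCounterConfig(string json)
{
    JsonNode? root = JsonNode.Parse(json);
    int interval = root?["interval"]?.GetValue<int>() ?? 1000; // default: 1000 ms
    int maxCount = root?["max_count"]?.GetValue<int>() ?? 0;   // default: 0 = infinite

    if (interval < 0)
        throw new ArgumentOutOfRangeException(nameof(json), "interval must be >= 0 milliseconds");
    if (maxCount < 0)
        throw new ArgumentOutOfRangeException(nameof(json), "max_count must be >= 0 (0 = infinite)");
    return (interval, maxCount);
}
```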
- 🧩 Clear Separation: Configuration, business logic, and entry point are isolated
- 🔧 Maintainable: Easy to modify data generation logic in Reader.cs
- 🧪 Testable: Each component can be tested independently
- 📦 Reusable: Pattern works for any input provider (APIs, databases, files)
- ⚡ Modern: Uses current C# features (top-level statements, records)
```hcl
task "counter-to-console" {
  input {
    provider_path = "./counter-input-provider"
    config = {
      interval  = 1000
      max_count = 50
    }
  }

  output {
    provider_path = "./console-output-provider"
    config = {
      outputFormat = "structured"
    }
  }
}
```
```bash
# Run via DStream CLI
dstream run counter-to-console

# The CLI handles:
# 1. Launching input/output provider processes
# 2. Piping data: input.stdout → CLI → output.stdin
# 3. Configuration injection and process lifecycle
```
- DStream .NET SDK - Main SDK documentation
- Console Output Provider - Companion output provider
- DStream CLI - Go-based orchestration engine
This counter provider demonstrates patterns used in production providers:
- SQL Server CDC Provider: `ReadAsync` → Monitor CDC tables, emit change events
- REST API Provider: `ReadAsync` → Poll endpoints, emit API responses
- File Watcher Provider: `ReadAsync` → Watch directories, emit file change events
- Kafka Consumer Provider: `ReadAsync` → Consume topics, emit Kafka messages
- Azure Service Bus Provider: `ReadAsync` → Receive queue messages, emit envelopes

The same clean architecture and `ReadAsync` pattern applies regardless of the data source technology.
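To illustrate that claim, here is a hedged sketch of a generic polling reader in which only the fetch step is source-specific. `PollingReadAsync` and the `fetch` delegate are hypothetical, the named tuple again stands in for `Envelope`, and the fake fetch function replaces what would be an HTTP call, CDC query, or queue receive in a real provider.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

int calls = 0;
// Fake data source standing in for an API call, a CDC query, a queue receive, etc.
Func<CancellationToken, Task<string>> fakeFetch = _ => Task.FromResult($"response-{++calls}");

var payloads = new List<object>();
await foreach (var env in PollingReadAsync(fakeFetch, "fake-api", intervalMs: 0, maxItems: 3))
    payloads.Add(env.Payload);
Console.WriteLine(string.Join(", ", payloads));

// Hypothetical generic polling reader: loop, fetch, wrap in an envelope, yield, wait.
static async IAsyncEnumerable<(object Payload, IReadOnlyDictionary<string, object?> Meta)> PollingReadAsync(
    Func<CancellationToken, Task<string>> fetch, string source, int intervalMs, int maxItems,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (long seq = 1; (maxItems == 0 || seq <= maxItems) && !ct.IsCancellationRequested; seq++)
    {
        string body = await fetch(ct); // the only technology-specific step
        var meta = new Dictionary<string, object?> { ["seq"] = seq, ["source"] = source };
        yield return (body, meta);
        await Task.Delay(intervalMs, ct);
    }
}
```

Injecting the fetch step as a delegate also makes the reader testable with a fake source, as done here.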
- Clone this repository as a template for your input provider
- Modify `Config.cs` with your provider's configuration needs
- Update `Reader.cs` with your data generation logic
- Test independently with echo/stdin before integrating with the DStream CLI
- Build and distribute as a self-contained binary or OCI container
This provider is intended as a starting template for building your own DStream input providers with a clean, maintainable architecture.