TDengine Sink #931

Merged · 22 commits · Jul 8, 2025
1 change: 1 addition & 0 deletions conda/meta.yaml
@@ -34,6 +34,7 @@ requirements:
- pandas >=1.0.0,<3.0
- elasticsearch >=8.17,<9
- rich >=13,<15
- python-dateutil >=2.8.2,<3

test:
imports:
189 changes: 189 additions & 0 deletions docs/connectors/sinks/tdengine-sink.md
@@ -0,0 +1,189 @@
# TDengine Sink

!!! info

This is a **Community** connector. Test it before using in production.

To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

TDengine is an open source time series database optimized for IoT, connected vehicles, and industrial applications.

Quix Streams provides a sink to write processed data to TDengine.

## How To Install
The dependencies for this sink are not included in the default `quixstreams` package.

To install them, run the following command:

```commandline
pip install quixstreams[tdengine]
```

## How To Use

To sink data to TDengine, you need to create an instance of `TDengineSink` and pass it to the `StreamingDataFrame.sink()` method:

```python
from quixstreams import Application
from quixstreams.sinks.community.tdengine import TDengineSink

app = Application(broker_address="127.0.0.1:9092")
topic = app.topic("cpu_topic")

def generate_subtable_name(row):
    return f"cpu_{row['cpu_id']}"

tdengine_sink = TDengineSink(
    host="http://localhost:6041",
    username="root",
    password="taosdata",
    database="test_cpu",
    supertable="cpu",
    subtable=generate_subtable_name,
    fields_keys=["percent"],
    tags_keys=["cpu_id", "host", "region"],
)

sdf = app.dataframe(topic)
sdf.sink(tdengine_sink)

if __name__ == '__main__':
app.run()

```

## Configuration

### Parameter Overview
TDengineSink accepts the following configuration parameters:

- `host` - TDengine host in format `"http[s]://<host>[:<port>]"` (e.g., `"http://localhost:6041"`).
- `database` - Database name (must exist before use).
- `supertable` - Supertable name as a string, or a callable that receives the current message data and returns a string.
- `subtable` - Subtable name as a string, or a callable that receives the current message data and returns a string. If empty, a hash value will be generated from the data as the subtable name.
- `fields_keys` - Iterable (list) of strings used as TDengine "fields", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `tags_keys`. If empty, the whole record value will be used. Default: `()`.
- `tags_keys` - Iterable (list) of strings used as TDengine "tags", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `fields_keys`. Given keys are popped from the value dictionary since the same key cannot be both a tag and field. If empty, no tags will be sent. Default: `()`.
- `time_key` - Optional key to use as the timestamp for TDengine. If not set, the record's Kafka timestamp is used. Default: `None`.
- `time_precision` - Time precision for the timestamp. One of: `"ms"`, `"ns"`, `"us"`, `"s"`. Default: `"ms"`.
- `allow_missing_fields` - If `True`, skip missing field keys instead of raising `KeyError`. Default: `False`.
- `include_metadata_tags` - If `True`, includes the record's key, topic, and partition as tags. Default: `False`.
- `convert_ints_to_floats` - If `True`, converts all integer values to floats. Default: `False`.
- `batch_size` - Number of records to write to TDengine in one request. Only affects the size of one write request, not the number of records flushed on each checkpoint. Default: `1000`.
- `enable_gzip` - If `True`, enables gzip compression for writes. Default: `True`.
- `request_timeout_ms` - HTTP request timeout in milliseconds. Default: `10000`.
- `verify_ssl` - If `True`, verifies the SSL certificate. Default: `True`.
- `username` - TDengine username. Default: `""` (empty string).
- `password` - TDengine password. Default: `""` (empty string).
- `token` - TDengine cloud token. Default: `""` (empty string). Either `token` or `username` and `password` must be provided.
- `on_client_connect_success` - Optional callback after successful client authentication.
- `on_client_connect_failure` - Optional callback after failed client authentication (should accept the raised Exception as an argument).

See the [What data can be sent to TDengine](#what-data-can-be-sent-to-tdengine) section for more info on `fields_keys` and `tags_keys`.
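For example, `fields_keys` and `tags_keys` also accept callables, which is useful when the keys to write vary per record. A minimal sketch, assuming a hypothetical `source` field and `load_avg` column in the incoming records:

```python
# Hypothetical routing: records from the "metrics" source carry an
# extra numeric column that should also be written as a field.
def select_fields(row: dict) -> list[str]:
    if row.get("source") == "metrics":
        return ["percent", "load_avg"]
    return ["percent"]

tdengine_sink = TDengineSink(
    host="http://localhost:6041",
    username="root",
    password="taosdata",
    database="test_cpu",
    supertable="cpu",
    subtable=lambda row: f"cpu_{row['cpu_id']}",
    fields_keys=select_fields,       # callable, resolved per record
    tags_keys=["cpu_id", "host"],    # static list
)
```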


### Parameter Examples


#### `subtable`
Accepts either:
- Static name (string)
- Dynamic generator (callable)

**Behavior:**
- If empty: Generates hash from data as subtable name
- Callable receives message data (dict) and returns string

**Examples:**
```python
# Static name
subtable = "meters"

# Dynamic name
def generate_subtable(row: dict) -> str:
"""Create subtable name from data"""
return f"meters_{row['id']}"

subtable = generate_subtable
```


#### `supertable`
Same interface as `subtable`:

- String for static name
- Callable for dynamic name
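
For example (the `device_type` field here is a hypothetical routing key, not part of the CPU example above):

```python
# Static name
supertable = "cpu"

# Dynamic name: route records to a supertable per device type
def generate_supertable(row: dict) -> str:
    return f"{row['device_type']}_metrics"

supertable = generate_supertable
```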

### Database Validation
- Verifies the database exists during setup
- Raises an error if missing: `Database 'your_database' does not exist`
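
The sink does not create the database for you, so create it before starting the application. For example, against a local instance you could use TDengine's REST API (credentials assume the Docker defaults from [Testing Locally](#testing-locally)):

```bash
curl -u root:taosdata \
  -d "CREATE DATABASE IF NOT EXISTS test_cpu" \
  http://localhost:6041/rest/sql
```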


### Expected Behavior
1. Prerequisite: the database must exist before operation.

   Error if missing: `Database 'your_database' does not exist`

2. After successful setup:

- Message example sent to Kafka:

```json
{"host":"192.168.1.98","region":"EU","percent":12.5,"cpu_id":1}
```

- Creates the following tables in TDengine:

```text
taos> show stables;
stable_name |
=================================
cpu |

taos> show tables;
table_name |
=================================
cpu_1 |
```

- Data verification:

```text
taos> select * from cpu;
_ts | percent | cpu_id | host | region |
========================================================================
2025-06-27 15:02:17.125 | 12.5 | 1 | 192.168.1.98 | EU |
```
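
To reproduce this end to end, you can publish the sample message yourself before running the sink application. A minimal sketch using the Quix Streams producer (the topic name matches the example above):

```python
from quixstreams import Application

app = Application(broker_address="127.0.0.1:9092")
topic = app.topic("cpu_topic")

with app.get_producer() as producer:
    # Serialize with the topic's default (JSON) serializer
    msg = topic.serialize(
        key="cpu_1",
        value={"host": "192.168.1.98", "region": "EU", "percent": 12.5, "cpu_id": 1},
    )
    producer.produce(topic=topic.name, key=msg.key, value=msg.value)
```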



## Testing Locally

Rather than connecting to a hosted TDengine instance, you can test your
application against a local TDengine instance running in Docker:

1. Execute in terminal:

```bash
docker run --rm -d --name tdengine \
-p 6030:6030 \
-p 6041:6041 \
-p 6043-6060:6043-6060 \
-e TZ=America/New_York \
-e LC_ALL=C.UTF-8 \
tdengine/tdengine:latest
```

2. Use the following authentication settings for `TDengineSink` to connect:

```python
TDengineSink(
host="http://localhost:6041",
username="root",
password="taosdata",
...
)
```

3. When finished, execute in terminal:

```bash
docker stop tdengine
```
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -43,7 +43,8 @@ all = [
"pymongo>=4.11,<5",
"pandas>=1.0.0,<3.0",
"elasticsearch>=8.17,<9",
"influxdb>=5.3,<6"
"influxdb>=5.3,<6",
"python-dateutil>=2.8.2,<3"
]

avro = ["fastavro>=1.8,<2.0"]
@@ -61,6 +62,7 @@ neo4j = ["neo4j>=5.27.0,<6"]
mongodb = ["pymongo>=4.11,<5"]
pandas = ["pandas>=1.0.0,<3.0"]
elasticsearch = ["elasticsearch>=8.17,<9"]
tdengine = ["python-dateutil>=2.8.2,<3"]

# AWS dependencies are separated by service to support
# different requirements in the future.
2 changes: 2 additions & 0 deletions quixstreams/sinks/community/tdengine/__init__.py
@@ -0,0 +1,2 @@
# ruff: noqa: F403
from .sink import *
109 changes: 109 additions & 0 deletions quixstreams/sinks/community/tdengine/date_utils.py
@@ -0,0 +1,109 @@
"""Utils to get right Date parsing function."""

import datetime
import threading
from datetime import timezone as tz
from sys import version_info

try:
from dateutil import parser
except ImportError as exc:
raise ImportError(
'Package "dateutil" is missing: '
"run pip install quixstreams[tdengine] to fix it"
) from exc


date_helper = None

lock_ = threading.Lock()


class DateHelper:
"""
    DateHelper groups different implementations of date operations.

    If you would like to serialize query results to a custom timezone, you can use the following code:

.. code-block:: python

        from quixstreams.sinks.community.tdengine import date_utils
        from quixstreams.sinks.community.tdengine.date_utils import DateHelper
import dateutil.parser
from dateutil import tz

def parse_date(date_string: str):
return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT+2'))

date_utils.date_helper = DateHelper()
date_utils.date_helper.parse_date = parse_date
"""

def __init__(self, timezone: datetime.tzinfo = tz.utc) -> None:
"""
Initialize defaults.

        :param timezone: Default timezone used for serializing "datetime" values without "tzinfo".
            Default value is "UTC".
"""
self.timezone = timezone

def parse_date(self, date_string: str):
"""
Parse string into Date or Timestamp.

        :return: Returns a :class:`datetime.datetime` object or a compliant implementation
            such as :class:`pandas.Timestamp`
"""
pass

def to_nanoseconds(self, delta):
"""
Get number of nanoseconds in timedelta.

        Solution adapted from the InfluxDB v1 Python client:
https://github.com/influxdata/influxdb-python/pull/811
"""
nanoseconds_in_days = delta.days * 86400 * 10**9
nanoseconds_in_seconds = delta.seconds * 10**9
nanoseconds_in_micros = delta.microseconds * 10**3

return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros

    def to_utc(self, value: datetime.datetime):
"""
Convert datetime to UTC timezone.

:param value: datetime
:return: datetime in UTC
"""
if not value.tzinfo:
return self.to_utc(value.replace(tzinfo=self.timezone))
else:
return value.astimezone(tz.utc)


def get_date_helper() -> DateHelper:
"""
Return DateHelper with proper implementation.

    If 'ciso8601' is available, use 'ciso8601.parse_datetime'; otherwise use
    'datetime.fromisoformat' (Python 3.11+) or 'dateutil.parser.parse'.
"""
global date_helper
if date_helper is None:
with lock_:
# avoid duplicate initialization
if date_helper is None:
_date_helper = DateHelper()
try:
import ciso8601

_date_helper.parse_date = ciso8601.parse_datetime
except ModuleNotFoundError:
if (version_info.major, version_info.minor) >= (3, 11):
_date_helper.parse_date = datetime.datetime.fromisoformat
else:
_date_helper.parse_date = parser.parse
date_helper = _date_helper

return date_helper