TDengine Sink #931 (Merged)
22 commits:

- 814db0b - moving to branch and adding table template name (jbrass)
- d8ac006 - adding dateutil for tdengine sink (jbrass)
- 2e2adb8 - formatted with black (jbrass)
- a7bb19f - Merge branch 'main' into tdengine_sink (jbrass)
- 78d5bd9 - changes to address packaging (jbrass)
- 67d49ce - Sink updates (jbrass)
- e2ba7f7 - enh: enhance subtable handling and improve database status checks (huskar-t)
- 43b67da - refactor: clean up import order and improve code formatting (huskar-t)
- b6e074b - Merge pull request #1 from huskar-t/tdengine_sink (jbrass)
- fb113f5 - Merge branch 'quixio:main' into tdengine_sink (jbrass)
- 4625cf6 - Adding TDengine sink md doc (jbrass)
- 6fab192 - fix: update database status check (huskar-t)
- 26f3744 - Merge pull request #2 from huskar-t/tdengine_sink (jbrass)
- 6c67e6f - polishing it up (jbrass)
- 669ff23 - linting (jbrass)
- 995e05c - Update __init__.py (tim-quix)
- 9054a71 - Update sink.py (tim-quix)
- 054b13b - Update point.py (tim-quix)
- ed173a5 - Update point.py (tim-quix)
- 7246e30 - Update __init__.py (tim-quix)
- 81b4caf - Update point.py (tim-quix)
- 61f06f8 - Update point.py (tim-quix)
# TDengine Sink

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

TDengine is an open source time series database optimized for IoT, connected vehicles, and industrial applications.

Quix Streams provides a sink to write processed data to TDengine.
## How To Install

The dependencies for this sink are not included in the default `quixstreams` package.

To install them, run the following command:

```commandline
pip install quixstreams[tdengine]
```
## How To Use

To sink data to TDengine, create an instance of `TDengineSink` and pass it to the `StreamingDataFrame.sink()` method:

```python
from quixstreams import Application
from quixstreams.sinks.community.tdengine import TDengineSink

app = Application(broker_address="127.0.0.1:9092")
topic = app.topic("cpu_topic")

def generate_subtable_name(row):
    return f"cpu_{row['cpu_id']}"

tdengine_sink = TDengineSink(
    host="http://localhost:6041",
    username="root",
    password="taosdata",
    database="test_cpu",
    supertable="cpu",
    subtable=generate_subtable_name,
    fields_keys=["percent"],
    tags_keys=["cpu_id", "host", "region"],
)

sdf = app.dataframe(topic)
sdf.sink(tdengine_sink)

if __name__ == '__main__':
    app.run()
```
|
||
## Configuration

### Parameter Overview

`TDengineSink` accepts the following configuration parameters:

- `host` - TDengine host in the format `"http[s]://<host>[:<port>]"` (e.g., `"http://localhost:6041"`).
- `database` - Database name (must exist before use).
- `supertable` - Supertable name as a string, or a callable that receives the current message data and returns a string.
- `subtable` - Subtable name as a string, or a callable that receives the current message data and returns a string. If empty, a hash value generated from the data is used as the subtable name.
- `fields_keys` - Iterable of strings used as TDengine "fields", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `tags_keys`. If empty, the whole record value is used. Default: `()`.
- `tags_keys` - Iterable of strings used as TDengine "tags", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `fields_keys`. The given keys are popped from the value dictionary, since the same key cannot be both a tag and a field. If empty, no tags are sent. Default: `()`.
- `time_key` - Optional key to use as the timestamp for TDengine. If not set, the record's Kafka timestamp is used. Default: `None`.
- `time_precision` - Time precision for the timestamp. One of `"ms"`, `"ns"`, `"us"`, or `"s"`. Default: `"ms"`.
- `allow_missing_fields` - If `True`, skip missing field keys instead of raising `KeyError`. Default: `False`.
- `include_metadata_tags` - If `True`, include the record's key, topic, and partition as tags. Default: `False`.
- `convert_ints_to_floats` - If `True`, convert all integer values to floats. Default: `False`.
- `batch_size` - Number of records to write to TDengine in one request. Only affects the size of a single write request, not the number of records flushed on each checkpoint. Default: `1000`.
- `enable_gzip` - If `True`, enable gzip compression for writes. Default: `True`.
- `request_timeout_ms` - HTTP request timeout in milliseconds. Default: `10000`.
- `verify_ssl` - If `True`, verify the SSL certificate. Default: `True`.
- `username` - TDengine username. Default: `""` (empty string).
- `password` - TDengine password. Default: `""` (empty string).
- `token` - TDengine Cloud token. Default: `""` (empty string). Either `token` or both `username` and `password` must be provided.
- `on_client_connect_success` - Optional callback invoked after successful client authentication.
- `on_client_connect_failure` - Optional callback invoked after failed client authentication (should accept the raised Exception as an argument).

See the [What data can be sent to TDengine](#what-data-can-be-sent-to-tdengine) section for more info on `fields_keys` and `tags_keys`.
||
|
||
### Parameter Examples

#### `subtable`

Accepts either:

- a static name (string)
- a dynamic generator (callable)

**Behavior:**

- If empty, a hash generated from the data is used as the subtable name.
- A callable receives the message data (dict) and returns a string.

**Examples:**

```python
# Static name
subtable = "meters"

# Dynamic name
def generate_subtable(row: dict) -> str:
    """Create subtable name from data"""
    return f"meters_{row['id']}"

subtable = generate_subtable
```
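When `subtable` is left empty, the sink falls back to a hash derived from the data. The exact scheme is internal to the sink, but the idea can be sketched as follows (illustrative only, not the library's actual algorithm):

```python
import hashlib
import json

def hashed_subtable(row: dict) -> str:
    # Illustrative only: stable digest over the sorted key/value pairs,
    # truncated for readability. The sink's real scheme may differ.
    digest = hashlib.md5(json.dumps(row, sort_keys=True).encode()).hexdigest()
    return f"t_{digest[:12]}"

row = {"host": "192.168.1.98", "region": "EU", "percent": 12.5, "cpu_id": 1}
# Identical rows always map to the same subtable name.
assert hashed_subtable(row) == hashed_subtable(dict(row))
```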
|
||
#### `supertable`

Same interface as `subtable`:

- a string for a static name
- a callable for a dynamic name
||
### Database Validation

The sink verifies that the database exists during setup and raises an error if it is missing: `Database 'your_database' does not exist`.

### Expected Behavior

1. Prerequisite: the database must exist before operation. If it is missing, the sink raises: `Database 'your_database' does not exist`.
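Because the sink does not create the database, create it before starting the application. One way is TDengine's REST SQL endpoint; the host, credentials, and database name below match the earlier example configuration and assume a reachable server:

```shell
# Create the database used in the example configuration (idempotent).
curl -s -u root:taosdata \
  -d "CREATE DATABASE IF NOT EXISTS test_cpu" \
  http://localhost:6041/rest/sql
```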
|
||
2. After successful setup:

    - Example message sent to Kafka:

      ```json
      {"host":"192.168.1.98","region":"EU","percent":12.5,"cpu_id":1}
      ```

    - The sink creates the following tables in TDengine:

      ```text
      taos> show stables;
                stable_name      |
      =================================
       cpu                       |

      taos> show tables;
                table_name       |
      =================================
       cpu_1                     |
      ```

    - Data verification:

      ```text
      taos> select * from cpu;
                 _ts             | percent | cpu_id |     host     | region |
      ========================================================================
       2025-06-27 15:02:17.125   |  12.5   |   1    | 192.168.1.98 |   EU   |
      ```
|
||
## Testing Locally

Rather than connecting to a hosted TDengine instance, you can test your application against a local TDengine instance using Docker:

1. Execute in terminal:

    ```bash
    docker run --rm -d --name tdengine \
      -p 6030:6030 \
      -p 6041:6041 \
      -p 6043-6060:6043-6060 \
      -e TZ=America/New_York \
      -e LC_ALL=C.UTF-8 \
      tdengine/tdengine:latest
    ```

2. Use the following authentication settings for `TDengineSink` to connect:

    ```python
    TDengineSink(
        host="http://localhost:6041",
        username="root",
        password="taosdata",
        ...
    )
    ```

3. When finished, execute in terminal:

    ```bash
    docker stop tdengine
    ```
New file (2 lines):

```python
# ruff: noqa: F403
from .sink import *
```
New file (109 lines):

```python
"""Utils to get the right date parsing function."""

import datetime
import threading
from datetime import timezone as tz
from sys import version_info

try:
    from dateutil import parser
except ImportError as exc:
    raise ImportError(
        'Package "dateutil" is missing: '
        "run pip install quixstreams[tdengine] to fix it"
    ) from exc


date_helper = None

lock_ = threading.Lock()


class DateHelper:
    """
    DateHelper groups different implementations of date operations.

    If you would like to serialize the query results to a custom timezone, you can use the following code:

    .. code-block:: python

        from influxdb_client.client.util import date_utils
        from influxdb_client.client.util.date_utils import DateHelper
        import dateutil.parser
        from dateutil import tz

        def parse_date(date_string: str):
            return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT+2'))

        date_utils.date_helper = DateHelper()
        date_utils.date_helper.parse_date = parse_date
    """

    def __init__(self, timezone: datetime.tzinfo = tz.utc) -> None:
        """
        Initialize defaults.

        :param timezone: Default timezone used for serialization of "datetime" without "tzinfo".
                         Default value is "UTC".
        """
        self.timezone = timezone

    def parse_date(self, date_string: str):
        """
        Parse string into Date or Timestamp.

        :return: Returns a :class:`datetime.datetime` object or a compliant implementation
                 like :class:`pandas._libs.tslibs.timestamps.Timestamp`
        """
        pass

    def to_nanoseconds(self, delta):
        """
        Get number of nanoseconds in timedelta.

        Solution comes from v1 client. Thx.
        https://github.com/influxdata/influxdb-python/pull/811
        """
        nanoseconds_in_days = delta.days * 86400 * 10**9
        nanoseconds_in_seconds = delta.seconds * 10**9
        nanoseconds_in_micros = delta.microseconds * 10**3

        return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros

    def to_utc(self, value: datetime.datetime):
        """
        Convert datetime to UTC timezone.

        :param value: datetime
        :return: datetime in UTC
        """
        if not value.tzinfo:
            return self.to_utc(value.replace(tzinfo=self.timezone))
        else:
            return value.astimezone(tz.utc)


def get_date_helper() -> DateHelper:
    """
    Return DateHelper with proper implementation.

    If 'ciso8601' is available, use 'ciso8601.parse_datetime'; otherwise fall back to
    'datetime.fromisoformat' (Python 3.11+) or 'dateutil.parser.parse'.
    """
    global date_helper
    if date_helper is None:
        with lock_:
            # avoid duplicate initialization
            if date_helper is None:
                _date_helper = DateHelper()
                try:
                    import ciso8601

                    _date_helper.parse_date = ciso8601.parse_datetime
                except ModuleNotFoundError:
                    if (version_info.major, version_info.minor) >= (3, 11):
                        _date_helper.parse_date = datetime.datetime.fromisoformat
                    else:
                        _date_helper.parse_date = parser.parse
                date_helper = _date_helper

    return date_helper
```
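The helper's behavior can be illustrated with the standard library alone, mirroring `parse_date` on Python 3.11+, the naive-datetime handling in `to_utc`, and the `to_nanoseconds` arithmetic:

```python
import datetime
from datetime import timezone

# parse_date fallback on Python 3.11+: datetime.fromisoformat.
parsed = datetime.datetime.fromisoformat("2025-06-27T15:02:17.125")
assert parsed.tzinfo is None  # naive: to_utc() attaches the default timezone first

utc_value = parsed.replace(tzinfo=timezone.utc)
print(utc_value.isoformat())  # 2025-06-27T15:02:17.125000+00:00

# to_nanoseconds: days, seconds, and microseconds scaled to nanoseconds.
delta = datetime.timedelta(days=1, seconds=2, microseconds=3)
nanos = delta.days * 86400 * 10**9 + delta.seconds * 10**9 + delta.microseconds * 10**3
print(nanos)  # 86402000003000
```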