From 814db0bba1e7c16dc213b1764dd0e7182926e2b2 Mon Sep 17 00:00:00 2001
From: Joel Brass
Date: Tue, 10 Jun 2025 15:09:24 -0700
Subject: [PATCH 01/18] moving to branch and adding table template name

---
 .../sinks/community/tdengine/__init__.py      |   1 +
 .../sinks/community/tdengine/date_utils.py    | 100 +++++
 quixstreams/sinks/community/tdengine/point.py | 349 ++++++++++++++++++
 quixstreams/sinks/community/tdengine/sink.py  | 329 +++++++++++++++++
 4 files changed, 779 insertions(+)
 create mode 100644 quixstreams/sinks/community/tdengine/__init__.py
 create mode 100644 quixstreams/sinks/community/tdengine/date_utils.py
 create mode 100644 quixstreams/sinks/community/tdengine/point.py
 create mode 100644 quixstreams/sinks/community/tdengine/sink.py

diff --git a/quixstreams/sinks/community/tdengine/__init__.py b/quixstreams/sinks/community/tdengine/__init__.py
new file mode 100644
index 000000000..d765ad057
--- /dev/null
+++ b/quixstreams/sinks/community/tdengine/__init__.py
@@ -0,0 +1 @@
+from .sink import *
diff --git a/quixstreams/sinks/community/tdengine/date_utils.py b/quixstreams/sinks/community/tdengine/date_utils.py
new file mode 100644
index 000000000..e7e423893
--- /dev/null
+++ b/quixstreams/sinks/community/tdengine/date_utils.py
@@ -0,0 +1,100 @@
+"""Utils to get right Date parsing function."""
+import datetime
+import threading
+from datetime import timezone as tz
+from sys import version_info
+
+from dateutil import parser
+
+date_helper = None
+
+lock_ = threading.Lock()
+
+
+class DateHelper:
+    """
+    DateHelper groups different implementations of date operations.
+
+    If you would like to serialize the query results to a custom timezone, you can use the following code:
+
+    .. code-block:: python
+
+        from influxdb_client.client.util import date_utils
+        from influxdb_client.client.util.date_utils import DateHelper
+        import dateutil.parser
+        from dateutil import tz
+
+        def parse_date(date_string: str):
+            return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT+2'))
+
+        date_utils.date_helper = DateHelper()
+        date_utils.date_helper.parse_date = parse_date
+    """
+
+    def __init__(self, timezone: datetime.tzinfo = tz.utc) -> None:
+        """
+        Initialize defaults.
+
+        :param timezone: Default timezone used to serialize "datetime" values without "tzinfo".
+                         Default value is "UTC".
+        """
+        self.timezone = timezone
+
+    def parse_date(self, date_string: str):
+        """
+        Parse string into Date or Timestamp.
+
+        :return: Returns a :class:`datetime.datetime` object or a compliant implementation
+                 such as :class:`pandas.Timestamp`
+        """
+        pass
+
+    def to_nanoseconds(self, delta):
+        """
+        Get number of nanoseconds in timedelta.
+
+        Solution comes from v1 client. Thx.
+        https://github.com/influxdata/influxdb-python/pull/811
+        """
+        nanoseconds_in_days = delta.days * 86400 * 10 ** 9
+        nanoseconds_in_seconds = delta.seconds * 10 ** 9
+        nanoseconds_in_micros = delta.microseconds * 10 ** 3
+
+        return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros
+
+    def to_utc(self, value: datetime):
+        """
+        Convert datetime to UTC timezone.
+
+        :param value: datetime
+        :return: datetime in UTC
+        """
+        if not value.tzinfo:
+            return self.to_utc(value.replace(tzinfo=self.timezone))
+        else:
+            return value.astimezone(tz.utc)
+
+
+def get_date_helper() -> DateHelper:
+    """
+    Return DateHelper with proper implementation.
+
+    If 'ciso8601' is installed, use 'ciso8601.parse_datetime'; otherwise use
+    'datetime.datetime.fromisoformat' (Python 3.11+) or 'dateutil.parser.parse'.
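+
+    A minimal usage sketch (illustrative only; the parser actually backing
+    ``parse_date`` depends on which optional packages are installed):
+
+    .. code-block:: python
+
+        helper = get_date_helper()
+        dt = helper.parse_date("2009-11-10T23:00:00.123456Z")  # tz-aware datetime
+        utc = helper.to_utc(dt)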
+ """ + global date_helper + if date_helper is None: + with lock_: + # avoid duplicate initialization + if date_helper is None: + _date_helper = DateHelper() + try: + import ciso8601 + _date_helper.parse_date = ciso8601.parse_datetime + except ModuleNotFoundError: + if (version_info.major, version_info.minor) >= (3, 11): + _date_helper.parse_date = datetime.datetime.fromisoformat + else: + _date_helper.parse_date = parser.parse + date_helper = _date_helper + + return date_helper diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py new file mode 100644 index 000000000..43616d417 --- /dev/null +++ b/quixstreams/sinks/community/tdengine/point.py @@ -0,0 +1,349 @@ +import math +import warnings +from datetime import datetime, timezone, timedelta +from decimal import Decimal +from numbers import Integral + +from .date_utils import get_date_helper + +EPOCH = datetime.fromtimestamp(0, tz=timezone.utc) + +DEFAULT_WRITE_PRECISION = "ns" + +_ESCAPE_MEASUREMENT = str.maketrans({ + ',': r'\,', + ' ': r'\ ', + '\n': r'\n', + '\t': r'\t', + '\r': r'\r', +}) + +_ESCAPE_KEY = str.maketrans({ + ',': r'\,', + '=': r'\=', + ' ': r'\ ', + '\n': r'\n', + '\t': r'\t', + '\r': r'\r', +}) + +_ESCAPE_STRING = str.maketrans({ + '"': r'\"', + '\\': r'\\', +}) + +try: + import numpy as np + + _HAS_NUMPY = True +except ModuleNotFoundError: + _HAS_NUMPY = False + + +class Point(object): + @staticmethod + def measurement(measurement): + """Create a new Point with specified measurement name.""" + p = Point(measurement) + return p + + @staticmethod + def from_dict(dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs): + """ + Initialize point from 'dict' structure. + + The expected dict structure is: + - measurement + - tags + - fields + - time + + Example: + .. code-block:: python + + # Use default dictionary structure + dict_structure = { + "measurement": "h2o_feet", + "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 1.0}, + "time": 1 + } + point = Point.from_dict(dict_structure, "ns") + + Example: + .. code-block:: python + + # Use custom dictionary structure + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "version": "2021.06.05.5874", + "pressure": 125, + "temperature": 10, + "created": 1632208639, + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_key="name", + record_time_key="created", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + + Int Types: + The following example shows how to configure the types of integers fields. + It is useful when you want to serialize integers always as ``float`` to avoid ``field type conflict`` + or use ``unsigned 64-bit integer`` as the type for serialization. + + .. 
code-block:: python + + # Use custom dictionary structure + dict_structure = { + "measurement": "h2o_feet", + "tags": {"location": "coyote_creek"}, + "fields": { + "water_level": 1.0, + "some_counter": 108913123234 + }, + "time": 1 + } + + point = Point.from_dict(dict_structure, field_types={"some_counter": "uint"}) + + :param dictionary: dictionary for serialize into data Point + :param write_precision: sets the precision for the supplied time values + :key record_measurement_key: key of dictionary with specified measurement + :key record_measurement_name: static measurement name for data Point + :key record_time_key: key of dictionary with specified timestamp + :key record_tag_keys: list of dictionary keys to use as a tag + :key record_field_keys: list of dictionary keys to use as a field + :key field_types: optional dictionary to specify types of serialized fields. Currently, is supported customization for integer types. + Possible integers types: + - ``int`` - serialize integers as "**Signed 64-bit integers**" - ``9223372036854775807i`` (default behaviour) + - ``uint`` - serialize integers as "**Unsigned 64-bit integers**" - ``9223372036854775807u`` + - ``float`` - serialize integers as "**IEEE-754 64-bit floating-point numbers**". Useful for unify number types in your pipeline to avoid field type conflict - ``9223372036854775807`` + The ``field_types`` can be also specified as part of incoming dictionary. For more info see an example above. + :return: new data point + """ # noqa: E501 + measurement_ = kwargs.get('record_measurement_name', None) + if measurement_ is None: + measurement_ = dictionary[kwargs.get('record_measurement_key', 'measurement')] + point = Point(measurement_) + + record_tag_keys = kwargs.get('record_tag_keys', None) + if record_tag_keys is not None: + for tag_key in record_tag_keys: + if tag_key in dictionary: + point.tag(tag_key, dictionary[tag_key]) + elif 'tags' in dictionary: + for tag_key, tag_value in dictionary['tags'].items(): + point.tag(tag_key, tag_value) + + record_field_keys = kwargs.get('record_field_keys', None) + if record_field_keys is not None: + for field_key in record_field_keys: + if field_key in dictionary: + point.field(field_key, dictionary[field_key]) + else: + for field_key, field_value in dictionary['fields'].items(): + point.field(field_key, field_value) + + record_time_key = kwargs.get('record_time_key', 'time') + if record_time_key in dictionary: + point.time(dictionary[record_time_key], write_precision=write_precision) + + _field_types = kwargs.get('field_types', {}) + if 'field_types' in dictionary: + _field_types = dictionary['field_types'] + # Map API fields types to Line Protocol types postfix: + # - int: 'i' + # - uint: 'u' + # - float: '' + point._field_types = dict(map( + lambda item: (item[0], 'i' if item[1] == 'int' else 'u' if item[1] == 'uint' else ''), + _field_types.items() + )) + + return point + + def __init__(self, measurement_name): + """Initialize defaults.""" + self._tags = {} + self._fields = {} + self._name = measurement_name + self._time = None + self._write_precision = DEFAULT_WRITE_PRECISION + self._field_types = {} + + def time(self, time, write_precision=DEFAULT_WRITE_PRECISION): + """ + Specify timestamp for DataPoint with declared precision. + + If time doesn't have specified timezone we assume that timezone is UTC. 
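+
+        Integer timestamps are passed through as-is and are assumed to already
+        match the declared precision (see ``_convert_timestamp``).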
+ + Examples:: + Point.measurement("h2o").field("val", 1).time("2009-11-10T23:00:00.123456Z") + Point.measurement("h2o").field("val", 1).time(1257894000123456000) + Point.measurement("h2o").field("val", 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456)) + Point.measurement("h2o").field("val", 1).time(1257894000123456000, write_precision=WritePrecision.NS) + + + :param time: the timestamp for your data + :param write_precision: sets the precision for the supplied time values + :return: this point + """ + self._write_precision = write_precision + self._time = time + return self + + def tag(self, key, value): + """Add tag with key and value.""" + self._tags[key] = value + return self + + def field(self, field, value): + """Add field with key and value.""" + self._fields[field] = value + return self + + def to_line_protocol(self, precision=None): + """ + Create LineProtocol. + + :param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``. + """ + _measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT) + if _measurement.startswith("#"): + message = f"""The measurement name '{_measurement}' start with '#'. + +The output Line protocol will be interpret as a comment by InfluxDB. For more info see: + - https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments +""" + warnings.warn(message, SyntaxWarning) + _tags = _append_tags(self._tags) + _fields = _append_fields(self._fields, self._field_types) + if not _fields: + return "" + _time = _append_time(self._time, self._write_precision if precision is None else precision) + + return f"{_measurement}{_tags}{_fields}{_time}" + + @property + def write_precision(self): + """Get precision.""" + return self._write_precision + + @classmethod + def set_str_rep(cls, rep_function): + """Set the string representation for all Points.""" + cls.__str___rep = rep_function + + def __str__(self): + """Create string representation of this Point.""" + return self.to_line_protocol() + + +def _append_tags(tags): + _return = [] + for tag_key, tag_value in sorted(tags.items()): + + if tag_value is None: + continue + + tag = _escape_key(tag_key) + value = _escape_tag_value(tag_value) + if tag != '' and value != '': + _return.append(f'{tag}={value}') + + return f"{',' if _return else ''}{','.join(_return)} " + + +def _append_fields(fields, field_types): + _return = [] + + for field, value in sorted(fields.items()): + if value is None: + continue + + if isinstance(value, float) or isinstance(value, Decimal) or _np_is_subtype(value, 'float'): + if not math.isfinite(value): + continue + s = str(value) + # It's common to represent whole numbers as floats + # and the trailing ".0" that Python produces is unnecessary + # in line-protocol, inconsistent with other line-protocol encoders, + # and takes more space than needed, so trim it off. 
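+            # e.g. 25.0 is encoded as "25", while 25.5 stays "25.5".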
+ if s.endswith('.0'): + s = s[:-2] + _return.append(f'{_escape_key(field)}={s}') + elif (isinstance(value, int) or _np_is_subtype(value, 'int')) and not isinstance(value, bool): + _type = field_types.get(field, "i") + _return.append(f'{_escape_key(field)}={str(value)}{_type}') + elif isinstance(value, bool): + _return.append(f'{_escape_key(field)}={str(value).lower()}') + elif isinstance(value, str): + _return.append(f'{_escape_key(field)}="{_escape_string(value)}"') + else: + raise ValueError(f'Type: "{type(value)}" of field: "{field}" is not supported.') + + return f"{','.join(_return)}" + + +def _append_time(time, write_precision) -> str: + if time is None: + return '' + return f" {int(_convert_timestamp(time, write_precision))}" + + +def _escape_key(tag, escape_list=None) -> str: + if escape_list is None: + escape_list = _ESCAPE_KEY + return str(tag).translate(escape_list) + + +def _escape_tag_value(value) -> str: + ret = _escape_key(value) + if ret.endswith('\\'): + ret += ' ' + return ret + + +def _escape_string(value) -> str: + return str(value).translate(_ESCAPE_STRING) + + +def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): + date_helper = get_date_helper() + if isinstance(timestamp, Integral): + return timestamp # assume precision is correct if timestamp is int + + if isinstance(timestamp, str): + timestamp = date_helper.parse_date(timestamp) + + if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime): + + if isinstance(timestamp, datetime): + timestamp = date_helper.to_utc(timestamp) - EPOCH + + ns = date_helper.to_nanoseconds(timestamp) + + if precision is None or precision == "ns": + return ns + elif precision == "us": + return ns / 1e3 + elif precision == "ms": + return ns / 1e6 + elif precision == "s": + return ns / 1e9 + + raise ValueError(timestamp) + + +def _np_is_subtype(value, np_type): + if not _HAS_NUMPY or not hasattr(value, 'dtype'): + return False + + if np_type == 'float': + return np.issubdtype(value, np.floating) + elif np_type == 'int': + return np.issubdtype(value, np.integer) + return False diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py new file mode 100644 index 000000000..5be0b0bd5 --- /dev/null +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -0,0 +1,329 @@ +import base64 +import logging +import ssl +import sys +import time +from typing import Any, Callable, Iterable, Literal, Mapping, Optional, Union, get_args +from urllib.parse import urljoin, urlencode + +import urllib3 +from quixstreams.models import HeadersTuples + + +from quixstreams.sinks.base import ( + BatchingSink, + ClientConnectFailureCallback, + ClientConnectSuccessCallback, + SinkBackpressureError, + SinkBatch, +) + +from point import Point + +logger = logging.getLogger(__name__) + + +TimePrecision = Literal["ms", "ns", "us", "s"] + +InfluxDBValueMap = dict[str, Union[str, int, float, bool]] + +FieldsCallable = Callable[[InfluxDBValueMap], Iterable[str]] +MeasurementCallable = Callable[[InfluxDBValueMap], str] +TagsCallable = Callable[[InfluxDBValueMap], Iterable[str]] + + +FieldsSetter = Union[Iterable[str], FieldsCallable] +MeasurementSetter = Union[str, MeasurementCallable] +TagsSetter = Union[Iterable[str], TagsCallable] + + +class TDengineSink(BatchingSink): + + def __init__( + self, + host: str, + database: str, + measurement: MeasurementSetter, + table_name_key: str , + fields_keys: FieldsSetter = (), + tags_keys: TagsSetter = (), + time_key: Optional[str] = None, + time_precision: 
TimePrecision = "ms",
+        allow_missing_fields: bool = False,
+        include_metadata_tags: bool = False,
+        convert_ints_to_floats: bool = False,
+        batch_size: int = 1000,
+        enable_gzip: bool = True,
+        request_timeout_ms: int = 10_000,
+        on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
+        on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
+        verify_ssl: bool = True,
+        username: str = "",
+        password: str = "",
+        token: str = "",
+    ):
+        """
+        A connector to sink processed data to TDengine.
+
+        It batches the processed records in memory per topic partition, converts
+        them to the InfluxDB line protocol, and flushes them to TDengine at the checkpoint.
+
+        >***NOTE***: TDengineSink can accept only dictionaries.
+        > If the record values are not dicts, you need to convert them to dicts before
+        > sinking.
+
+        :param token: TDengine cloud token
+        :param host: TDengine host in format "https://"
+        :param username: TDengine username
+        :param password: TDengine password
+        :param verify_ssl: if `True`, verifies the SSL certificate.
+            Default - `True`.
+        :param database: database name
+        :param measurement: measurement name as a string.
+            Also accepts a single-argument callable that receives the current message
+            data as a dict and returns a string.
+        :param table_name_key: A tag key whose value is used as the subtable name when writing to TDengine.
+            If the data does not contain this tag key, a hash value will be generated from the data as the subtable name.
+        :param fields_keys: an iterable (list) of strings used as InfluxDB line protocol "fields".
+            Also accepts a singl e-argument callable that receives the current message
+            data as a dict and returns an iterable of strings.
+            - If present, it must not overlap with "tags_keys".
+            - If empty, the whole record value will be used.
+            >***NOTE*** The fields' values can only be strings, floats, integers, or booleans.
+            Default - `()`.
+        :param tags_keys: an iterable (list) of strings used as InfluxDB line protocol "tags".
+            Also accepts a single-argument callable that receives the current message
+            data as a dict and returns an iterable of strings.
+            - If present, it must not overlap with "fields_keys".
+            - Given keys are popped from the value dictionary since the same key
+              cannot be both a tag and field.
+            - If empty, no tags will be sent.
+            >***NOTE***: always converts tag values to strings.
+            Default - `()`.
+        :param time_key: a key to be used as "time" when converting to InfluxDB line protocol.
+            By default, the record timestamp will be used with "ms" time precision.
+            When using a custom key, you may need to adjust the `time_precision` setting
+            to match.
+        :param time_precision: a time precision to use when converting to InfluxDB line protocol.
+            Possible values: "ms", "ns", "us", "s".
+            Default - `"ms"`.
+        :param allow_missing_fields: if `True`, skip missing field keys, else raise `KeyError`.
+            Default - `False`.
+        :param include_metadata_tags: if True, includes record's key, topic,
+            and partition as tags.
+            Default - `False`.
+        :param convert_ints_to_floats: if True, converts all integer values to floats.
+            Default - `False`.
+        :param batch_size: how many records to write to TDengine in one request.
+            Note that it only affects the size of one write request, and not the number
+            of records flushed on each checkpoint.
+            Default - `1000`.
+        :param enable_gzip: if True, enables gzip compression for writes.
+            Default - `True`.
+        :param request_timeout_ms: an HTTP request timeout in milliseconds.
+            Default - `10000`.
+ :param on_client_connect_success: An optional callback made after successful + client authentication, primarily for additional logging. + :param on_client_connect_failure: An optional callback made after failed + client authentication (which should raise an Exception). + Callback should accept the raised Exception as an argument. + Callback must resolve (or propagate/re-raise) the Exception. + """ + + super().__init__( + on_client_connect_success=on_client_connect_success, + on_client_connect_failure=on_client_connect_failure, + ) + + if time_precision not in (time_args := get_args(TimePrecision)): + raise ValueError( + f"Invalid 'time_precision' argument {time_precision}; " + f"valid options: {time_args}" + ) + if not callable(fields_keys) and not callable(tags_keys): + fields_tags_keys_overlap = set(fields_keys) & set(tags_keys) + if fields_tags_keys_overlap: + overlap_str = ",".join(str(k) for k in fields_tags_keys_overlap) + raise ValueError( + f'Keys {overlap_str} are present in both "fields_keys" and "tags_keys"' + ) + url_path = "influxdb/v1/write" + base_url = urljoin(host, url_path) + precision = time_precision + if precision == "us": + precision = "u" + query_params = {"db":database,"precision": precision} + header = { + "Content-Type": "text/plain; charset=utf-8", + } + if enable_gzip: + header["Accept-Encoding"] = "gzip" + if token != "": + query_params["token"] = token + elif username != "" and password != "": + basic_auth = f"{username}:{password}" + header["authorization"] = f"Basic {base64.b64encode(basic_auth.encode('latin-1')).decode()}" + else: + raise ValueError( + "Either token or username and password must be provided" + ) + if table_name_key: + if table_name_key not in tags_keys: + raise ValueError( + f'table_name_key "{table_name_key}" must be present in tags_keys' + ) + query_params["table_name_key"] = table_name_key + query_string = urlencode(query_params) + full_url = f"{base_url}?{query_string}" + self._client_args = { + "url": full_url, + "header": header, + "timeout": request_timeout_ms, + "verify_ssl": verify_ssl, + } + self._client: Optional[urllib3.PoolManager] = None + self._measurement = self._measurement_callable(measurement) + self._fields_keys = self._fields_callable(fields_keys) + self._tags_keys = self._tags_callable(tags_keys) + self._include_metadata_tags = include_metadata_tags + self._time_key = time_key + self._write_precision = time_precision + self._batch_size = batch_size + self._allow_missing_fields = allow_missing_fields + self._convert_ints_to_floats = convert_ints_to_floats + + def _measurement_callable(self, setter: MeasurementSetter) -> MeasurementCallable: + if callable(setter): + return setter + return lambda value: setter + + def _fields_callable(self, setter: FieldsSetter) -> FieldsCallable: + if callable(setter): + return setter + return lambda value: setter + + def _tags_callable(self, setter: TagsSetter) -> TagsCallable: + if callable(setter): + return setter + return lambda value: setter + + def setup(self): + if self._client_args['verify_ssl']: + cert_reqs = ssl.CERT_REQUIRED + else: + cert_reqs = ssl.CERT_NONE + self._client = urllib3.PoolManager( + cert_reqs=cert_reqs, + ) + + + def add( + self, + value: Any, + key: Any, + timestamp: int, + headers: HeadersTuples, + topic: str, + partition: int, + offset: int, + ): + if not isinstance(value, Mapping): + raise TypeError( + f'Sink "{self.__class__.__name__}" supports only dictionaries,' + f" got {type(value)}" + ) + return super().add( + value=value, + key=key, + 
timestamp=timestamp, + headers=headers, + topic=topic, + partition=partition, + offset=offset, + ) + + def write(self, batch: SinkBatch): + measurement = self._measurement + fields_keys = self._fields_keys + tags_keys = self._tags_keys + time_key = self._time_key + for write_batch in batch.iter_chunks(n=self._batch_size): + records = [] + + min_timestamp = sys.maxsize + max_timestamp = -1 + + for item in write_batch: + value = item.value + # Evaluate these before we alter the value + _measurement = measurement(value) + _tags_keys = tags_keys(value) + _fields_keys = fields_keys(value) + + tags = {} + for tag_key in _tags_keys: + if tag_key in value: + tag = value.pop(tag_key) + tags[tag_key] = tag + + if self._include_metadata_tags: + tags["__key"] = item.key + tags["__topic"] = batch.topic + tags["__partition"] = batch.partition + + if _fields_keys: + fields = { + f: value[f] + for f in _fields_keys + if f in value or not self._allow_missing_fields + } + else: + fields = value + + if self._convert_ints_to_floats: + fields = { + k: float(v) if isinstance(v, int) else v + for k, v in fields.items() + } + + ts = value[time_key] if time_key is not None else item.timestamp + record = { + "measurement": _measurement, + "tags": tags, + "fields": fields, + "time": ts, + } + records.append(record) + min_timestamp = min(ts, min_timestamp) + max_timestamp = max(ts, max_timestamp) + if not records: + logger.debug("No records to write") + continue + _start = time.monotonic() + l: list[bytes] = [b''] * len(records) + for i, point in enumerate(records): + p = Point.from_dict(point, self._write_precision) + l[i] = p.to_line_protocol().encode('utf-8') + body = b"\n".join(l) + timeout = urllib3.Timeout(total=self._client_args['timeout'] / 1_000) + logger.debug(f"Sending data to {self._client_args['url']} : {body}") + resp = self._client.request("POST", self._client_args['url'], body=body, + headers=self._client_args['header'], timeout=timeout) + elapsed = round(time.monotonic() - _start, 2) + logger.info( + f"Sent data to TDengine; " + f"total_records={len(records)} " + f"min_timestamp={min_timestamp} " + f"max_timestamp={max_timestamp} " + f"time_elapsed={elapsed}s" + ) + err = urllib3.exceptions.HTTPError( + f"Failed to write data to TDengine: {resp.status} {resp.data}" + ) + if resp.status != 204: + if resp.status == 503: + retry_after = resp.getheader('Retry-After') + raise SinkBackpressureError( + retry_after=int(retry_after) + ) from err + raise err \ No newline at end of file From d8ac006487939d558b4a287276c449c30846edbc Mon Sep 17 00:00:00 2001 From: Joel Brass Date: Tue, 10 Jun 2025 15:11:52 -0700 Subject: [PATCH 02/18] adding dateutil for tdengine sink --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index dddee0185..7ebb5dc55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ jsonschema>=4.3.0 jsonlines>=4,<5 rich>=13,<15 jsonpath_ng>=1.7.0,<2 +python-dateutil From 2e2adb8fdeb63aacb83895c74268f576b914a6be Mon Sep 17 00:00:00 2001 From: Joel Brass Date: Wed, 11 Jun 2025 16:59:53 -0700 Subject: [PATCH 03/18] formatted with black --- .../sinks/community/tdengine/date_utils.py | 8 +- quixstreams/sinks/community/tdengine/point.py | 131 +++++++++++------- quixstreams/sinks/community/tdengine/sink.py | 47 ++++--- 3 files changed, 107 insertions(+), 79 deletions(-) diff --git a/quixstreams/sinks/community/tdengine/date_utils.py b/quixstreams/sinks/community/tdengine/date_utils.py index e7e423893..8219c3c4a 100644 --- 
a/quixstreams/sinks/community/tdengine/date_utils.py +++ b/quixstreams/sinks/community/tdengine/date_utils.py @@ -1,4 +1,5 @@ """Utils to get right Date parsing function.""" + import datetime import threading from datetime import timezone as tz @@ -56,9 +57,9 @@ def to_nanoseconds(self, delta): Solution comes from v1 client. Thx. https://github.com/influxdata/influxdb-python/pull/811 """ - nanoseconds_in_days = delta.days * 86400 * 10 ** 9 - nanoseconds_in_seconds = delta.seconds * 10 ** 9 - nanoseconds_in_micros = delta.microseconds * 10 ** 3 + nanoseconds_in_days = delta.days * 86400 * 10**9 + nanoseconds_in_seconds = delta.seconds * 10**9 + nanoseconds_in_micros = delta.microseconds * 10**3 return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros @@ -89,6 +90,7 @@ def get_date_helper() -> DateHelper: _date_helper = DateHelper() try: import ciso8601 + _date_helper.parse_date = ciso8601.parse_datetime except ModuleNotFoundError: if (version_info.major, version_info.minor) >= (3, 11): diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py index 43616d417..d667d8d92 100644 --- a/quixstreams/sinks/community/tdengine/point.py +++ b/quixstreams/sinks/community/tdengine/point.py @@ -10,27 +10,33 @@ DEFAULT_WRITE_PRECISION = "ns" -_ESCAPE_MEASUREMENT = str.maketrans({ - ',': r'\,', - ' ': r'\ ', - '\n': r'\n', - '\t': r'\t', - '\r': r'\r', -}) - -_ESCAPE_KEY = str.maketrans({ - ',': r'\,', - '=': r'\=', - ' ': r'\ ', - '\n': r'\n', - '\t': r'\t', - '\r': r'\r', -}) - -_ESCAPE_STRING = str.maketrans({ - '"': r'\"', - '\\': r'\\', -}) +_ESCAPE_MEASUREMENT = str.maketrans( + { + ",": r"\,", + " ": r"\ ", + "\n": r"\n", + "\t": r"\t", + "\r": r"\r", + } +) + +_ESCAPE_KEY = str.maketrans( + { + ",": r"\,", + "=": r"\=", + " ": r"\ ", + "\n": r"\n", + "\t": r"\t", + "\r": r"\r", + } +) + +_ESCAPE_STRING = str.maketrans( + { + '"': r"\"", + "\\": r"\\", + } +) try: import numpy as np @@ -48,7 +54,9 @@ def measurement(measurement): return p @staticmethod - def from_dict(dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs): + def from_dict( + dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs + ): """ Initialize point from 'dict' structure. @@ -124,44 +132,51 @@ def from_dict(dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, The ``field_types`` can be also specified as part of incoming dictionary. For more info see an example above. 
:return: new data point """ # noqa: E501 - measurement_ = kwargs.get('record_measurement_name', None) + measurement_ = kwargs.get("record_measurement_name", None) if measurement_ is None: - measurement_ = dictionary[kwargs.get('record_measurement_key', 'measurement')] + measurement_ = dictionary[ + kwargs.get("record_measurement_key", "measurement") + ] point = Point(measurement_) - record_tag_keys = kwargs.get('record_tag_keys', None) + record_tag_keys = kwargs.get("record_tag_keys", None) if record_tag_keys is not None: for tag_key in record_tag_keys: if tag_key in dictionary: point.tag(tag_key, dictionary[tag_key]) - elif 'tags' in dictionary: - for tag_key, tag_value in dictionary['tags'].items(): + elif "tags" in dictionary: + for tag_key, tag_value in dictionary["tags"].items(): point.tag(tag_key, tag_value) - record_field_keys = kwargs.get('record_field_keys', None) + record_field_keys = kwargs.get("record_field_keys", None) if record_field_keys is not None: for field_key in record_field_keys: if field_key in dictionary: point.field(field_key, dictionary[field_key]) else: - for field_key, field_value in dictionary['fields'].items(): + for field_key, field_value in dictionary["fields"].items(): point.field(field_key, field_value) - record_time_key = kwargs.get('record_time_key', 'time') + record_time_key = kwargs.get("record_time_key", "time") if record_time_key in dictionary: point.time(dictionary[record_time_key], write_precision=write_precision) - _field_types = kwargs.get('field_types', {}) - if 'field_types' in dictionary: - _field_types = dictionary['field_types'] + _field_types = kwargs.get("field_types", {}) + if "field_types" in dictionary: + _field_types = dictionary["field_types"] # Map API fields types to Line Protocol types postfix: # - int: 'i' # - uint: 'u' # - float: '' - point._field_types = dict(map( - lambda item: (item[0], 'i' if item[1] == 'int' else 'u' if item[1] == 'uint' else ''), - _field_types.items() - )) + point._field_types = dict( + map( + lambda item: ( + item[0], + "i" if item[1] == "int" else "u" if item[1] == "uint" else "", + ), + _field_types.items(), + ) + ) return point @@ -223,7 +238,9 @@ def to_line_protocol(self, precision=None): _fields = _append_fields(self._fields, self._field_types) if not _fields: return "" - _time = _append_time(self._time, self._write_precision if precision is None else precision) + _time = _append_time( + self._time, self._write_precision if precision is None else precision + ) return f"{_measurement}{_tags}{_fields}{_time}" @@ -251,8 +268,8 @@ def _append_tags(tags): tag = _escape_key(tag_key) value = _escape_tag_value(tag_value) - if tag != '' and value != '': - _return.append(f'{tag}={value}') + if tag != "" and value != "": + _return.append(f"{tag}={value}") return f"{',' if _return else ''}{','.join(_return)} " @@ -264,7 +281,11 @@ def _append_fields(fields, field_types): if value is None: continue - if isinstance(value, float) or isinstance(value, Decimal) or _np_is_subtype(value, 'float'): + if ( + isinstance(value, float) + or isinstance(value, Decimal) + or _np_is_subtype(value, "float") + ): if not math.isfinite(value): continue s = str(value) @@ -272,25 +293,29 @@ def _append_fields(fields, field_types): # and the trailing ".0" that Python produces is unnecessary # in line-protocol, inconsistent with other line-protocol encoders, # and takes more space than needed, so trim it off. 
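             # e.g. 25.0 is encoded as "25", while 25.5 stays "25.5".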
- if s.endswith('.0'): + if s.endswith(".0"): s = s[:-2] - _return.append(f'{_escape_key(field)}={s}') - elif (isinstance(value, int) or _np_is_subtype(value, 'int')) and not isinstance(value, bool): + _return.append(f"{_escape_key(field)}={s}") + elif ( + isinstance(value, int) or _np_is_subtype(value, "int") + ) and not isinstance(value, bool): _type = field_types.get(field, "i") - _return.append(f'{_escape_key(field)}={str(value)}{_type}') + _return.append(f"{_escape_key(field)}={str(value)}{_type}") elif isinstance(value, bool): - _return.append(f'{_escape_key(field)}={str(value).lower()}') + _return.append(f"{_escape_key(field)}={str(value).lower()}") elif isinstance(value, str): _return.append(f'{_escape_key(field)}="{_escape_string(value)}"') else: - raise ValueError(f'Type: "{type(value)}" of field: "{field}" is not supported.') + raise ValueError( + f'Type: "{type(value)}" of field: "{field}" is not supported.' + ) return f"{','.join(_return)}" def _append_time(time, write_precision) -> str: if time is None: - return '' + return "" return f" {int(_convert_timestamp(time, write_precision))}" @@ -302,8 +327,8 @@ def _escape_key(tag, escape_list=None) -> str: def _escape_tag_value(value) -> str: ret = _escape_key(value) - if ret.endswith('\\'): - ret += ' ' + if ret.endswith("\\"): + ret += " " return ret @@ -339,11 +364,11 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): def _np_is_subtype(value, np_type): - if not _HAS_NUMPY or not hasattr(value, 'dtype'): + if not _HAS_NUMPY or not hasattr(value, "dtype"): return False - if np_type == 'float': + if np_type == "float": return np.issubdtype(value, np.floating) - elif np_type == 'int': + elif np_type == "int": return np.issubdtype(value, np.integer) return False diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py index 5be0b0bd5..c27c35495 100644 --- a/quixstreams/sinks/community/tdengine/sink.py +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -9,7 +9,6 @@ import urllib3 from quixstreams.models import HeadersTuples - from quixstreams.sinks.base import ( BatchingSink, ClientConnectFailureCallback, @@ -44,7 +43,7 @@ def __init__( host: str, database: str, measurement: MeasurementSetter, - table_name_key: str , + table_name_key: str, fields_keys: FieldsSetter = (), tags_keys: TagsSetter = (), time_key: Optional[str] = None, @@ -152,7 +151,7 @@ def __init__( precision = time_precision if precision == "us": precision = "u" - query_params = {"db":database,"precision": precision} + query_params = {"db": database, "precision": precision} header = { "Content-Type": "text/plain; charset=utf-8", } @@ -162,11 +161,11 @@ def __init__( query_params["token"] = token elif username != "" and password != "": basic_auth = f"{username}:{password}" - header["authorization"] = f"Basic {base64.b64encode(basic_auth.encode('latin-1')).decode()}" - else: - raise ValueError( - "Either token or username and password must be provided" + header["authorization"] = ( + f"Basic {base64.b64encode(basic_auth.encode('latin-1')).decode()}" ) + else: + raise ValueError("Either token or username and password must be provided") if table_name_key: if table_name_key not in tags_keys: raise ValueError( @@ -208,14 +207,13 @@ def _tags_callable(self, setter: TagsSetter) -> TagsCallable: return lambda value: setter def setup(self): - if self._client_args['verify_ssl']: + if self._client_args["verify_ssl"]: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE self._client = 
urllib3.PoolManager( - cert_reqs=cert_reqs, - ) - + cert_reqs=cert_reqs, + ) def add( self, @@ -300,15 +298,20 @@ def write(self, batch: SinkBatch): logger.debug("No records to write") continue _start = time.monotonic() - l: list[bytes] = [b''] * len(records) + l: list[bytes] = [b""] * len(records) for i, point in enumerate(records): p = Point.from_dict(point, self._write_precision) - l[i] = p.to_line_protocol().encode('utf-8') + l[i] = p.to_line_protocol().encode("utf-8") body = b"\n".join(l) - timeout = urllib3.Timeout(total=self._client_args['timeout'] / 1_000) + timeout = urllib3.Timeout(total=self._client_args["timeout"] / 1_000) logger.debug(f"Sending data to {self._client_args['url']} : {body}") - resp = self._client.request("POST", self._client_args['url'], body=body, - headers=self._client_args['header'], timeout=timeout) + resp = self._client.request( + "POST", + self._client_args["url"], + body=body, + headers=self._client_args["header"], + timeout=timeout, + ) elapsed = round(time.monotonic() - _start, 2) logger.info( f"Sent data to TDengine; " @@ -318,12 +321,10 @@ def write(self, batch: SinkBatch): f"time_elapsed={elapsed}s" ) err = urllib3.exceptions.HTTPError( - f"Failed to write data to TDengine: {resp.status} {resp.data}" - ) + f"Failed to write data to TDengine: {resp.status} {resp.data}" + ) if resp.status != 204: if resp.status == 503: - retry_after = resp.getheader('Retry-After') - raise SinkBackpressureError( - retry_after=int(retry_after) - ) from err - raise err \ No newline at end of file + retry_after = resp.getheader("Retry-After") + raise SinkBackpressureError(retry_after=int(retry_after)) from err + raise err From 78d5bd9a9ee9ea2ee31bde163d8260a8efa1045d Mon Sep 17 00:00:00 2001 From: Joel Brass Date: Thu, 26 Jun 2025 17:28:14 -0700 Subject: [PATCH 04/18] changes to address packaging --- conda/meta.yaml | 1 + pyproject.toml | 1 + quixstreams/sinks/community/tdengine/date_utils.py | 10 +++++++++- requirements.txt | 3 +-- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/conda/meta.yaml b/conda/meta.yaml index 9f100a975..5ccc432d2 100644 --- a/conda/meta.yaml +++ b/conda/meta.yaml @@ -34,6 +34,7 @@ requirements: - pandas >=1.0.0,<3.0 - elasticsearch >=8.17,<9 - rich >=13,<15 + - python-dateutil >=2.8.2,<3 test: imports: diff --git a/pyproject.toml b/pyproject.toml index b95560611..d25c8da34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ neo4j = ["neo4j>=5.27.0,<6"] mongodb = ["pymongo>=4.11,<5"] pandas = ["pandas>=1.0.0,<3.0"] elasticsearch = ["elasticsearch>=8.17,<9"] +tdengine = ["python-dateutil>=2.8.2,<3"] # AWS dependencies are separated by service to support # different requirements in the future. 
diff --git a/quixstreams/sinks/community/tdengine/date_utils.py b/quixstreams/sinks/community/tdengine/date_utils.py index 8219c3c4a..b7837835d 100644 --- a/quixstreams/sinks/community/tdengine/date_utils.py +++ b/quixstreams/sinks/community/tdengine/date_utils.py @@ -5,7 +5,15 @@ from datetime import timezone as tz from sys import version_info -from dateutil import parser + +try: + from dateutil import parser +except ImportError as exc: + raise ImportError( + 'Package "dateutil" is missing: ' + "run pip install quixstreams[tdengine] to fix it" + ) from exc + date_helper = None diff --git a/requirements.txt b/requirements.txt index 7ebb5dc55..80b72291e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,4 @@ pydantic-settings>=2.3,<2.10 jsonschema>=4.3.0 jsonlines>=4,<5 rich>=13,<15 -jsonpath_ng>=1.7.0,<2 -python-dateutil +jsonpath_ng>=1.7.0,<2 \ No newline at end of file From 67d49cea00168f7cad91d3eb81ed414abd81aef5 Mon Sep 17 00:00:00 2001 From: Joel Brass Date: Thu, 26 Jun 2025 17:30:14 -0700 Subject: [PATCH 05/18] Sink updates --- quixstreams/sinks/community/tdengine/sink.py | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py index c27c35495..95576b4a8 100644 --- a/quixstreams/sinks/community/tdengine/sink.py +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -17,7 +17,7 @@ SinkBatch, ) -from point import Point +from .point import Point logger = logging.getLogger(__name__) @@ -42,8 +42,8 @@ def __init__( self, host: str, database: str, - measurement: MeasurementSetter, - table_name_key: str, + supertable: MeasurementSetter, + subtable: Optional[str] = None, fields_keys: FieldsSetter = (), tags_keys: TagsSetter = (), time_key: Optional[str] = None, @@ -78,13 +78,13 @@ def __init__( :param verify_ssl: if `True`, verifies the SSL certificate. Default - `True`. :param database: database name - :param measurement: measurement name as a string. + :param supertable: supertable name as a string. Also accepts a single-argument callable that receives the current message data as a dict and returns a string. :param table_name_key: A tag key whose value is used as the subtable name when writing to TDengine. If the data does not contain this tag key, a hash value will be generated from the data as the subtable name. :param fields_keys: an iterable (list) of strings used as InfluxDB line protocol "fields". - Also accepts a singl e-argument callable that receives the current message + Also accepts a single argument callable that receives the current message data as a dict and returns an iterable of strings. - If present, it must not overlap with "tags_keys". - If empty, the whole record value will be used. 
@@ -166,12 +166,12 @@ def __init__( ) else: raise ValueError("Either token or username and password must be provided") - if table_name_key: - if table_name_key not in tags_keys: + if subtable: + if subtable not in tags_keys: raise ValueError( - f'table_name_key "{table_name_key}" must be present in tags_keys' + f'table_name_key "{subtable}" must be present in tags_keys' ) - query_params["table_name_key"] = table_name_key + query_params["table_name_key"] = subtable query_string = urlencode(query_params) full_url = f"{base_url}?{query_string}" self._client_args = { @@ -181,7 +181,7 @@ def __init__( "verify_ssl": verify_ssl, } self._client: Optional[urllib3.PoolManager] = None - self._measurement = self._measurement_callable(measurement) + self._measurement = self._measurement_callable(supertable) self._fields_keys = self._fields_callable(fields_keys) self._tags_keys = self._tags_callable(tags_keys) self._include_metadata_tags = include_metadata_tags @@ -241,7 +241,7 @@ def add( ) def write(self, batch: SinkBatch): - measurement = self._measurement + supertable = self._measurement fields_keys = self._fields_keys tags_keys = self._tags_keys time_key = self._time_key @@ -254,7 +254,7 @@ def write(self, batch: SinkBatch): for item in write_batch: value = item.value # Evaluate these before we alter the value - _measurement = measurement(value) + _measurement = supertable(value) _tags_keys = tags_keys(value) _fields_keys = fields_keys(value) From e2ba7f763821bee29a32606c09def76ce25c3733 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Fri, 27 Jun 2025 15:43:21 +0800 Subject: [PATCH 06/18] enh: enhance subtable handling and improve database status checks --- quixstreams/sinks/community/tdengine/sink.py | 131 ++++++++++++------- 1 file changed, 82 insertions(+), 49 deletions(-) diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py index 95576b4a8..1e863420f 100644 --- a/quixstreams/sinks/community/tdengine/sink.py +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -1,4 +1,5 @@ import base64 +import json import logging import ssl import sys @@ -21,45 +22,45 @@ logger = logging.getLogger(__name__) - TimePrecision = Literal["ms", "ns", "us", "s"] InfluxDBValueMap = dict[str, Union[str, int, float, bool]] FieldsCallable = Callable[[InfluxDBValueMap], Iterable[str]] -MeasurementCallable = Callable[[InfluxDBValueMap], str] +SupertableCallable = Callable[[InfluxDBValueMap], str] TagsCallable = Callable[[InfluxDBValueMap], Iterable[str]] - +SubtableNameCallable = Callable[[InfluxDBValueMap], str] FieldsSetter = Union[Iterable[str], FieldsCallable] -MeasurementSetter = Union[str, MeasurementCallable] +SupertableSetter = Union[str, SupertableCallable] TagsSetter = Union[Iterable[str], TagsCallable] +SubtableNameSetter = Union[str, SubtableNameCallable] class TDengineSink(BatchingSink): def __init__( - self, - host: str, - database: str, - supertable: MeasurementSetter, - subtable: Optional[str] = None, - fields_keys: FieldsSetter = (), - tags_keys: TagsSetter = (), - time_key: Optional[str] = None, - time_precision: TimePrecision = "ms", - allow_missing_fields: bool = False, - include_metadata_tags: bool = False, - convert_ints_to_floats: bool = False, - batch_size: int = 1000, - enable_gzip: bool = True, - request_timeout_ms: int = 10_000, - on_client_connect_success: Optional[ClientConnectSuccessCallback] = None, - on_client_connect_failure: Optional[ClientConnectFailureCallback] = None, - verify_ssl: bool = True, - username: str = "", - 
password: str = "", - token: str = "", + self, + host: str, + database: str, + supertable: SupertableSetter, + subtable: SubtableNameSetter, + fields_keys: FieldsSetter = (), + tags_keys: TagsSetter = (), + time_key: Optional[str] = None, + time_precision: TimePrecision = "ms", + allow_missing_fields: bool = False, + include_metadata_tags: bool = False, + convert_ints_to_floats: bool = False, + batch_size: int = 1000, + enable_gzip: bool = True, + request_timeout_ms: int = 10_000, + on_client_connect_success: Optional[ClientConnectSuccessCallback] = None, + on_client_connect_failure: Optional[ClientConnectFailureCallback] = None, + verify_ssl: bool = True, + username: str = "", + password: str = "", + token: str = "", ): """ A connector to sink processed data to TDengine. @@ -72,7 +73,7 @@ def __init__( > sinking. :param token: TDengine cloud token - :param host: TDengine host in format "https://" + :param host: TDengine host in format "http[s]://[:]". :param username: TDengine username :param password: TDengine password :param verify_ssl: if `True`, verifies the SSL certificate. @@ -81,8 +82,10 @@ def __init__( :param supertable: supertable name as a string. Also accepts a single-argument callable that receives the current message data as a dict and returns a string. - :param table_name_key: A tag key whose value is used as the subtable name when writing to TDengine. - If the data does not contain this tag key, a hash value will be generated from the data as the subtable name. + :param subtable: subtable name as a string. + Also accepts a single-argument callable that receives the current message + data as a dict and returns a string. + If the subtable name is empty string, a hash value will be generated from the data as the subtable name. :param fields_keys: an iterable (list) of strings used as InfluxDB line protocol "fields". Also accepts a single argument callable that receives the current message data as a dict and returns an iterable of strings. 
@@ -148,10 +151,11 @@ def __init__( ) url_path = "influxdb/v1/write" base_url = urljoin(host, url_path) + sql_url = urljoin(host, "rest/sql") precision = time_precision if precision == "us": precision = "u" - query_params = {"db": database, "precision": precision} + query_params = {"db": database, "precision": precision, "table_name_key": "__subtable"} header = { "Content-Type": "text/plain; charset=utf-8", } @@ -159,6 +163,7 @@ def __init__( header["Accept-Encoding"] = "gzip" if token != "": query_params["token"] = token + sql_url = urljoin(sql_url, f"?token={token}") elif username != "" and password != "": basic_auth = f"{username}:{password}" header["authorization"] = ( @@ -166,22 +171,19 @@ def __init__( ) else: raise ValueError("Either token or username and password must be provided") - if subtable: - if subtable not in tags_keys: - raise ValueError( - f'table_name_key "{subtable}" must be present in tags_keys' - ) - query_params["table_name_key"] = subtable query_string = urlencode(query_params) full_url = f"{base_url}?{query_string}" self._client_args = { "url": full_url, + "sql_url": sql_url, "header": header, "timeout": request_timeout_ms, "verify_ssl": verify_ssl, + "database": database, } self._client: Optional[urllib3.PoolManager] = None - self._measurement = self._measurement_callable(supertable) + self._supertable_name = self._supertable_callable(supertable) + self._subtable_name = self._subtable_name_callable(subtable) self._fields_keys = self._fields_callable(fields_keys) self._tags_keys = self._tags_callable(tags_keys) self._include_metadata_tags = include_metadata_tags @@ -191,7 +193,7 @@ def __init__( self._allow_missing_fields = allow_missing_fields self._convert_ints_to_floats = convert_ints_to_floats - def _measurement_callable(self, setter: MeasurementSetter) -> MeasurementCallable: + def _supertable_callable(self, setter: SupertableSetter) -> SupertableCallable: if callable(setter): return setter return lambda value: setter @@ -206,6 +208,11 @@ def _tags_callable(self, setter: TagsSetter) -> TagsCallable: return setter return lambda value: setter + def _subtable_name_callable(self, setter: SubtableNameSetter) -> SubtableNameCallable: + if callable(setter): + return setter + return lambda value: setter + def setup(self): if self._client_args["verify_ssl"]: cert_reqs = ssl.CERT_REQUIRED @@ -214,16 +221,41 @@ def setup(self): self._client = urllib3.PoolManager( cert_reqs=cert_reqs, ) + # check if the database is alive + database = self._client_args["database"] + check_db_sql = f"SHOW `{database}`.alive" + timeout = urllib3.Timeout(total=self._client_args["timeout"] / 1_000) + logger.debug(f"Sending data to {self._client_args['sql_url']} : {check_db_sql}") + resp = self._client.request( + "POST", + self._client_args["sql_url"], + body=check_db_sql, + headers=self._client_args["header"], + timeout=timeout, + ) + if resp.status != 200: + err = urllib3.exceptions.HTTPError( + f"Failed to check database status: {resp.status} {resp.data}" + ) + raise err + resp_data = json.loads(resp.data.decode("utf-8")) + resp_code = resp_data.get("code") + if resp_code != 0: + error_message = resp_data.get("desc", "Unknown error") + err = urllib3.exceptions.HTTPError( + f"Failed to check database status, [{resp_code}]:{error_message}" + ) + raise err def add( - self, - value: Any, - key: Any, - timestamp: int, - headers: HeadersTuples, - topic: str, - partition: int, - offset: int, + self, + value: Any, + key: Any, + timestamp: int, + headers: HeadersTuples, + topic: str, + partition: 
int, + offset: int, ): if not isinstance(value, Mapping): raise TypeError( @@ -241,7 +273,8 @@ def add( ) def write(self, batch: SinkBatch): - supertable = self._measurement + supertable = self._supertable_name + subtable = self._subtable_name fields_keys = self._fields_keys tags_keys = self._tags_keys time_key = self._time_key @@ -257,7 +290,7 @@ def write(self, batch: SinkBatch): _measurement = supertable(value) _tags_keys = tags_keys(value) _fields_keys = fields_keys(value) - + _subtable_name = subtable(item.value) tags = {} for tag_key in _tags_keys: if tag_key in value: @@ -269,6 +302,7 @@ def write(self, batch: SinkBatch): tags["__topic"] = batch.topic tags["__partition"] = batch.partition + tags["__subtable"] = _subtable_name if _fields_keys: fields = { f: value[f] @@ -283,8 +317,7 @@ def write(self, batch: SinkBatch): k: float(v) if isinstance(v, int) else v for k, v in fields.items() } - - ts = value[time_key] if time_key is not None else item.timestamp + ts = value[time_key] if time_key is not None and time_key in value else item.timestamp record = { "measurement": _measurement, "tags": tags, From 43b67da6791510be58097a6ce5dde7696ba2d46f Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Fri, 27 Jun 2025 16:03:49 +0800 Subject: [PATCH 07/18] refactor: clean up import order and improve code formatting --- .../sinks/community/tdengine/date_utils.py | 1 - quixstreams/sinks/community/tdengine/point.py | 746 +++++++++--------- quixstreams/sinks/community/tdengine/sink.py | 79 +- 3 files changed, 416 insertions(+), 410 deletions(-) diff --git a/quixstreams/sinks/community/tdengine/date_utils.py b/quixstreams/sinks/community/tdengine/date_utils.py index b7837835d..967b09f6f 100644 --- a/quixstreams/sinks/community/tdengine/date_utils.py +++ b/quixstreams/sinks/community/tdengine/date_utils.py @@ -5,7 +5,6 @@ from datetime import timezone as tz from sys import version_info - try: from dateutil import parser except ImportError as exc: diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py index d667d8d92..b579689ce 100644 --- a/quixstreams/sinks/community/tdengine/point.py +++ b/quixstreams/sinks/community/tdengine/point.py @@ -1,374 +1,372 @@ -import math -import warnings -from datetime import datetime, timezone, timedelta -from decimal import Decimal -from numbers import Integral - -from .date_utils import get_date_helper - -EPOCH = datetime.fromtimestamp(0, tz=timezone.utc) - -DEFAULT_WRITE_PRECISION = "ns" - -_ESCAPE_MEASUREMENT = str.maketrans( - { - ",": r"\,", - " ": r"\ ", - "\n": r"\n", - "\t": r"\t", - "\r": r"\r", - } -) - -_ESCAPE_KEY = str.maketrans( - { - ",": r"\,", - "=": r"\=", - " ": r"\ ", - "\n": r"\n", - "\t": r"\t", - "\r": r"\r", - } -) - -_ESCAPE_STRING = str.maketrans( - { - '"': r"\"", - "\\": r"\\", - } -) - -try: - import numpy as np - - _HAS_NUMPY = True -except ModuleNotFoundError: - _HAS_NUMPY = False - - -class Point(object): - @staticmethod - def measurement(measurement): - """Create a new Point with specified measurement name.""" - p = Point(measurement) - return p - - @staticmethod - def from_dict( - dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs - ): - """ - Initialize point from 'dict' structure. - - The expected dict structure is: - - measurement - - tags - - fields - - time - - Example: - .. 
code-block:: python - - # Use default dictionary structure - dict_structure = { - "measurement": "h2o_feet", - "tags": {"location": "coyote_creek"}, - "fields": {"water_level": 1.0}, - "time": 1 - } - point = Point.from_dict(dict_structure, "ns") - - Example: - .. code-block:: python - - # Use custom dictionary structure - dictionary = { - "name": "sensor_pt859", - "location": "warehouse_125", - "version": "2021.06.05.5874", - "pressure": 125, - "temperature": 10, - "created": 1632208639, - } - point = Point.from_dict(dictionary, - write_precision=WritePrecision.S, - record_measurement_key="name", - record_time_key="created", - record_tag_keys=["location", "version"], - record_field_keys=["pressure", "temperature"]) - - Int Types: - The following example shows how to configure the types of integers fields. - It is useful when you want to serialize integers always as ``float`` to avoid ``field type conflict`` - or use ``unsigned 64-bit integer`` as the type for serialization. - - .. code-block:: python - - # Use custom dictionary structure - dict_structure = { - "measurement": "h2o_feet", - "tags": {"location": "coyote_creek"}, - "fields": { - "water_level": 1.0, - "some_counter": 108913123234 - }, - "time": 1 - } - - point = Point.from_dict(dict_structure, field_types={"some_counter": "uint"}) - - :param dictionary: dictionary for serialize into data Point - :param write_precision: sets the precision for the supplied time values - :key record_measurement_key: key of dictionary with specified measurement - :key record_measurement_name: static measurement name for data Point - :key record_time_key: key of dictionary with specified timestamp - :key record_tag_keys: list of dictionary keys to use as a tag - :key record_field_keys: list of dictionary keys to use as a field - :key field_types: optional dictionary to specify types of serialized fields. Currently, is supported customization for integer types. - Possible integers types: - - ``int`` - serialize integers as "**Signed 64-bit integers**" - ``9223372036854775807i`` (default behaviour) - - ``uint`` - serialize integers as "**Unsigned 64-bit integers**" - ``9223372036854775807u`` - - ``float`` - serialize integers as "**IEEE-754 64-bit floating-point numbers**". Useful for unify number types in your pipeline to avoid field type conflict - ``9223372036854775807`` - The ``field_types`` can be also specified as part of incoming dictionary. For more info see an example above. 
- :return: new data point - """ # noqa: E501 - measurement_ = kwargs.get("record_measurement_name", None) - if measurement_ is None: - measurement_ = dictionary[ - kwargs.get("record_measurement_key", "measurement") - ] - point = Point(measurement_) - - record_tag_keys = kwargs.get("record_tag_keys", None) - if record_tag_keys is not None: - for tag_key in record_tag_keys: - if tag_key in dictionary: - point.tag(tag_key, dictionary[tag_key]) - elif "tags" in dictionary: - for tag_key, tag_value in dictionary["tags"].items(): - point.tag(tag_key, tag_value) - - record_field_keys = kwargs.get("record_field_keys", None) - if record_field_keys is not None: - for field_key in record_field_keys: - if field_key in dictionary: - point.field(field_key, dictionary[field_key]) - else: - for field_key, field_value in dictionary["fields"].items(): - point.field(field_key, field_value) - - record_time_key = kwargs.get("record_time_key", "time") - if record_time_key in dictionary: - point.time(dictionary[record_time_key], write_precision=write_precision) - - _field_types = kwargs.get("field_types", {}) - if "field_types" in dictionary: - _field_types = dictionary["field_types"] - # Map API fields types to Line Protocol types postfix: - # - int: 'i' - # - uint: 'u' - # - float: '' - point._field_types = dict( - map( - lambda item: ( - item[0], - "i" if item[1] == "int" else "u" if item[1] == "uint" else "", - ), - _field_types.items(), - ) - ) - - return point - - def __init__(self, measurement_name): - """Initialize defaults.""" - self._tags = {} - self._fields = {} - self._name = measurement_name - self._time = None - self._write_precision = DEFAULT_WRITE_PRECISION - self._field_types = {} - - def time(self, time, write_precision=DEFAULT_WRITE_PRECISION): - """ - Specify timestamp for DataPoint with declared precision. - - If time doesn't have specified timezone we assume that timezone is UTC. - - Examples:: - Point.measurement("h2o").field("val", 1).time("2009-11-10T23:00:00.123456Z") - Point.measurement("h2o").field("val", 1).time(1257894000123456000) - Point.measurement("h2o").field("val", 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456)) - Point.measurement("h2o").field("val", 1).time(1257894000123456000, write_precision=WritePrecision.NS) - - - :param time: the timestamp for your data - :param write_precision: sets the precision for the supplied time values - :return: this point - """ - self._write_precision = write_precision - self._time = time - return self - - def tag(self, key, value): - """Add tag with key and value.""" - self._tags[key] = value - return self - - def field(self, field, value): - """Add field with key and value.""" - self._fields[field] = value - return self - - def to_line_protocol(self, precision=None): - """ - Create LineProtocol. - - :param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``. - """ - _measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT) - if _measurement.startswith("#"): - message = f"""The measurement name '{_measurement}' start with '#'. - -The output Line protocol will be interpret as a comment by InfluxDB. 
For more info see: - - https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments -""" - warnings.warn(message, SyntaxWarning) - _tags = _append_tags(self._tags) - _fields = _append_fields(self._fields, self._field_types) - if not _fields: - return "" - _time = _append_time( - self._time, self._write_precision if precision is None else precision - ) - - return f"{_measurement}{_tags}{_fields}{_time}" - - @property - def write_precision(self): - """Get precision.""" - return self._write_precision - - @classmethod - def set_str_rep(cls, rep_function): - """Set the string representation for all Points.""" - cls.__str___rep = rep_function - - def __str__(self): - """Create string representation of this Point.""" - return self.to_line_protocol() - - -def _append_tags(tags): - _return = [] - for tag_key, tag_value in sorted(tags.items()): - - if tag_value is None: - continue - - tag = _escape_key(tag_key) - value = _escape_tag_value(tag_value) - if tag != "" and value != "": - _return.append(f"{tag}={value}") - - return f"{',' if _return else ''}{','.join(_return)} " - - -def _append_fields(fields, field_types): - _return = [] - - for field, value in sorted(fields.items()): - if value is None: - continue - - if ( - isinstance(value, float) - or isinstance(value, Decimal) - or _np_is_subtype(value, "float") - ): - if not math.isfinite(value): - continue - s = str(value) - # It's common to represent whole numbers as floats - # and the trailing ".0" that Python produces is unnecessary - # in line-protocol, inconsistent with other line-protocol encoders, - # and takes more space than needed, so trim it off. - if s.endswith(".0"): - s = s[:-2] - _return.append(f"{_escape_key(field)}={s}") - elif ( - isinstance(value, int) or _np_is_subtype(value, "int") - ) and not isinstance(value, bool): - _type = field_types.get(field, "i") - _return.append(f"{_escape_key(field)}={str(value)}{_type}") - elif isinstance(value, bool): - _return.append(f"{_escape_key(field)}={str(value).lower()}") - elif isinstance(value, str): - _return.append(f'{_escape_key(field)}="{_escape_string(value)}"') - else: - raise ValueError( - f'Type: "{type(value)}" of field: "{field}" is not supported.' 
- ) - - return f"{','.join(_return)}" - - -def _append_time(time, write_precision) -> str: - if time is None: - return "" - return f" {int(_convert_timestamp(time, write_precision))}" - - -def _escape_key(tag, escape_list=None) -> str: - if escape_list is None: - escape_list = _ESCAPE_KEY - return str(tag).translate(escape_list) - - -def _escape_tag_value(value) -> str: - ret = _escape_key(value) - if ret.endswith("\\"): - ret += " " - return ret - - -def _escape_string(value) -> str: - return str(value).translate(_ESCAPE_STRING) - - -def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): - date_helper = get_date_helper() - if isinstance(timestamp, Integral): - return timestamp # assume precision is correct if timestamp is int - - if isinstance(timestamp, str): - timestamp = date_helper.parse_date(timestamp) - - if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime): - - if isinstance(timestamp, datetime): - timestamp = date_helper.to_utc(timestamp) - EPOCH - - ns = date_helper.to_nanoseconds(timestamp) - - if precision is None or precision == "ns": - return ns - elif precision == "us": - return ns / 1e3 - elif precision == "ms": - return ns / 1e6 - elif precision == "s": - return ns / 1e9 - - raise ValueError(timestamp) - - -def _np_is_subtype(value, np_type): - if not _HAS_NUMPY or not hasattr(value, "dtype"): - return False - - if np_type == "float": - return np.issubdtype(value, np.floating) - elif np_type == "int": - return np.issubdtype(value, np.integer) - return False +import math +import warnings +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from numbers import Integral + +from .date_utils import get_date_helper + +EPOCH = datetime.fromtimestamp(0, tz=timezone.utc) + +DEFAULT_WRITE_PRECISION = "ns" + +_ESCAPE_MEASUREMENT = str.maketrans( + { + ",": r"\,", + " ": r"\ ", + "\n": r"\n", + "\t": r"\t", + "\r": r"\r", + } +) + +_ESCAPE_KEY = str.maketrans( + { + ",": r"\,", + "=": r"\=", + " ": r"\ ", + "\n": r"\n", + "\t": r"\t", + "\r": r"\r", + } +) + +_ESCAPE_STRING = str.maketrans( + { + '"': r"\"", + "\\": r"\\", + } +) + +try: + import numpy as np + + _HAS_NUMPY = True +except ModuleNotFoundError: + _HAS_NUMPY = False + + +class Point(object): + @staticmethod + def measurement(measurement): + """Create a new Point with specified measurement name.""" + p = Point(measurement) + return p + + @staticmethod + def from_dict( + dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs + ): + """ + Initialize point from 'dict' structure. + + The expected dict structure is: + - measurement + - tags + - fields + - time + + Example: + .. code-block:: python + + # Use default dictionary structure + dict_structure = { + "measurement": "h2o_feet", + "tags": {"location": "coyote_creek"}, + "fields": {"water_level": 1.0}, + "time": 1 + } + point = Point.from_dict(dict_structure, "ns") + + Example: + .. code-block:: python + + # Use custom dictionary structure + dictionary = { + "name": "sensor_pt859", + "location": "warehouse_125", + "version": "2021.06.05.5874", + "pressure": 125, + "temperature": 10, + "created": 1632208639, + } + point = Point.from_dict(dictionary, + write_precision=WritePrecision.S, + record_measurement_key="name", + record_time_key="created", + record_tag_keys=["location", "version"], + record_field_keys=["pressure", "temperature"]) + + Int Types: + The following example shows how to configure the types of integers fields. 
+ It is useful when you want to serialize integers always as ``float`` to avoid ``field type conflict`` + or use ``unsigned 64-bit integer`` as the type for serialization. + + .. code-block:: python + + # Use custom dictionary structure + dict_structure = { + "measurement": "h2o_feet", + "tags": {"location": "coyote_creek"}, + "fields": { + "water_level": 1.0, + "some_counter": 108913123234 + }, + "time": 1 + } + + point = Point.from_dict(dict_structure, field_types={"some_counter": "uint"}) + + :param dictionary: dictionary for serialize into data Point + :param write_precision: sets the precision for the supplied time values + :key record_measurement_key: key of dictionary with specified measurement + :key record_measurement_name: static measurement name for data Point + :key record_time_key: key of dictionary with specified timestamp + :key record_tag_keys: list of dictionary keys to use as a tag + :key record_field_keys: list of dictionary keys to use as a field + :key field_types: optional dictionary to specify types of serialized fields. Currently, is supported customization for integer types. + Possible integers types: + - ``int`` - serialize integers as "**Signed 64-bit integers**" - ``9223372036854775807i`` (default behaviour) + - ``uint`` - serialize integers as "**Unsigned 64-bit integers**" - ``9223372036854775807u`` + - ``float`` - serialize integers as "**IEEE-754 64-bit floating-point numbers**". Useful for unify number types in your pipeline to avoid field type conflict - ``9223372036854775807`` + The ``field_types`` can be also specified as part of incoming dictionary. For more info see an example above. + :return: new data point + """ # noqa: E501 + measurement_ = kwargs.get("record_measurement_name", None) + if measurement_ is None: + measurement_ = dictionary[ + kwargs.get("record_measurement_key", "measurement") + ] + point = Point(measurement_) + + record_tag_keys = kwargs.get("record_tag_keys", None) + if record_tag_keys is not None: + for tag_key in record_tag_keys: + if tag_key in dictionary: + point.tag(tag_key, dictionary[tag_key]) + elif "tags" in dictionary: + for tag_key, tag_value in dictionary["tags"].items(): + point.tag(tag_key, tag_value) + + record_field_keys = kwargs.get("record_field_keys", None) + if record_field_keys is not None: + for field_key in record_field_keys: + if field_key in dictionary: + point.field(field_key, dictionary[field_key]) + else: + for field_key, field_value in dictionary["fields"].items(): + point.field(field_key, field_value) + + record_time_key = kwargs.get("record_time_key", "time") + if record_time_key in dictionary: + point.time(dictionary[record_time_key], write_precision=write_precision) + + _field_types = kwargs.get("field_types", {}) + if "field_types" in dictionary: + _field_types = dictionary["field_types"] + # Map API fields types to Line Protocol types postfix: + # - int: 'i' + # - uint: 'u' + # - float: '' + point._field_types = dict( + map( + lambda item: ( + item[0], + "i" if item[1] == "int" else "u" if item[1] == "uint" else "", + ), + _field_types.items(), + ) + ) + + return point + + def __init__(self, measurement_name): + """Initialize defaults.""" + self._tags = {} + self._fields = {} + self._name = measurement_name + self._time = None + self._write_precision = DEFAULT_WRITE_PRECISION + self._field_types = {} + + def time(self, time, write_precision=DEFAULT_WRITE_PRECISION): + """ + Specify timestamp for DataPoint with declared precision. 
+ + If time doesn't have specified timezone we assume that timezone is UTC. + + Examples:: + Point.measurement("h2o").field("val", 1).time("2009-11-10T23:00:00.123456Z") + Point.measurement("h2o").field("val", 1).time(1257894000123456000) + Point.measurement("h2o").field("val", 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456)) + Point.measurement("h2o").field("val", 1).time(1257894000123456000, write_precision=WritePrecision.NS) + + + :param time: the timestamp for your data + :param write_precision: sets the precision for the supplied time values + :return: this point + """ + self._write_precision = write_precision + self._time = time + return self + + def tag(self, key, value): + """Add tag with key and value.""" + self._tags[key] = value + return self + + def field(self, field, value): + """Add field with key and value.""" + self._fields[field] = value + return self + + def to_line_protocol(self, precision=None): + """ + Create LineProtocol. + + :param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``. + """ + _measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT) + if _measurement.startswith("#"): + message = f"""The measurement name '{_measurement}' start with '#'. + +The output Line protocol will be interpret as a comment by InfluxDB. For more info see: + - https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments +""" + warnings.warn(message, SyntaxWarning) + _tags = _append_tags(self._tags) + _fields = _append_fields(self._fields, self._field_types) + if not _fields: + return "" + _time = _append_time( + self._time, self._write_precision if precision is None else precision + ) + + return f"{_measurement}{_tags}{_fields}{_time}" + + @property + def write_precision(self): + """Get precision.""" + return self._write_precision + + @classmethod + def set_str_rep(cls, rep_function): + """Set the string representation for all Points.""" + cls.__str___rep = rep_function + + def __str__(self): + """Create string representation of this Point.""" + return self.to_line_protocol() + + +def _append_tags(tags): + _return = [] + for tag_key, tag_value in sorted(tags.items()): + if tag_value is None: + continue + + tag = _escape_key(tag_key) + value = _escape_tag_value(tag_value) + if tag != "" and value != "": + _return.append(f"{tag}={value}") + + return f"{',' if _return else ''}{','.join(_return)} " + + +def _append_fields(fields, field_types): + _return = [] + + for field, value in sorted(fields.items()): + if value is None: + continue + + if ( + isinstance(value, float) + or isinstance(value, Decimal) + or _np_is_subtype(value, "float") + ): + if not math.isfinite(value): + continue + s = str(value) + # It's common to represent whole numbers as floats + # and the trailing ".0" that Python produces is unnecessary + # in line-protocol, inconsistent with other line-protocol encoders, + # and takes more space than needed, so trim it off. + if s.endswith(".0"): + s = s[:-2] + _return.append(f"{_escape_key(field)}={s}") + elif ( + isinstance(value, int) or _np_is_subtype(value, "int") + ) and not isinstance(value, bool): + _type = field_types.get(field, "i") + _return.append(f"{_escape_key(field)}={str(value)}{_type}") + elif isinstance(value, bool): + _return.append(f"{_escape_key(field)}={str(value).lower()}") + elif isinstance(value, str): + _return.append(f'{_escape_key(field)}="{_escape_string(value)}"') + else: + raise ValueError( + f'Type: "{type(value)}" of field: "{field}" is not supported.' 
+ ) + + return f"{','.join(_return)}" + + +def _append_time(time, write_precision) -> str: + if time is None: + return "" + return f" {int(_convert_timestamp(time, write_precision))}" + + +def _escape_key(tag, escape_list=None) -> str: + if escape_list is None: + escape_list = _ESCAPE_KEY + return str(tag).translate(escape_list) + + +def _escape_tag_value(value) -> str: + ret = _escape_key(value) + if ret.endswith("\\"): + ret += " " + return ret + + +def _escape_string(value) -> str: + return str(value).translate(_ESCAPE_STRING) + + +def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): + date_helper = get_date_helper() + if isinstance(timestamp, Integral): + return timestamp # assume precision is correct if timestamp is int + + if isinstance(timestamp, str): + timestamp = date_helper.parse_date(timestamp) + + if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime): + if isinstance(timestamp, datetime): + timestamp = date_helper.to_utc(timestamp) - EPOCH + + ns = date_helper.to_nanoseconds(timestamp) + + if precision is None or precision == "ns": + return ns + elif precision == "us": + return ns / 1e3 + elif precision == "ms": + return ns / 1e6 + elif precision == "s": + return ns / 1e9 + + raise ValueError(timestamp) + + +def _np_is_subtype(value, np_type): + if not _HAS_NUMPY or not hasattr(value, "dtype"): + return False + + if np_type == "float": + return np.issubdtype(value, np.floating) + elif np_type == "int": + return np.issubdtype(value, np.integer) + return False diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py index 1e863420f..3324dc27d 100644 --- a/quixstreams/sinks/community/tdengine/sink.py +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -5,11 +5,11 @@ import sys import time from typing import Any, Callable, Iterable, Literal, Mapping, Optional, Union, get_args -from urllib.parse import urljoin, urlencode +from urllib.parse import urlencode, urljoin import urllib3 -from quixstreams.models import HeadersTuples +from quixstreams.models import HeadersTuples from quixstreams.sinks.base import ( BatchingSink, ClientConnectFailureCallback, @@ -38,29 +38,28 @@ class TDengineSink(BatchingSink): - def __init__( - self, - host: str, - database: str, - supertable: SupertableSetter, - subtable: SubtableNameSetter, - fields_keys: FieldsSetter = (), - tags_keys: TagsSetter = (), - time_key: Optional[str] = None, - time_precision: TimePrecision = "ms", - allow_missing_fields: bool = False, - include_metadata_tags: bool = False, - convert_ints_to_floats: bool = False, - batch_size: int = 1000, - enable_gzip: bool = True, - request_timeout_ms: int = 10_000, - on_client_connect_success: Optional[ClientConnectSuccessCallback] = None, - on_client_connect_failure: Optional[ClientConnectFailureCallback] = None, - verify_ssl: bool = True, - username: str = "", - password: str = "", - token: str = "", + self, + host: str, + database: str, + supertable: SupertableSetter, + subtable: SubtableNameSetter, + fields_keys: FieldsSetter = (), + tags_keys: TagsSetter = (), + time_key: Optional[str] = None, + time_precision: TimePrecision = "ms", + allow_missing_fields: bool = False, + include_metadata_tags: bool = False, + convert_ints_to_floats: bool = False, + batch_size: int = 1000, + enable_gzip: bool = True, + request_timeout_ms: int = 10_000, + on_client_connect_success: Optional[ClientConnectSuccessCallback] = None, + on_client_connect_failure: Optional[ClientConnectFailureCallback] = None, + verify_ssl: bool 
= True, + username: str = "", + password: str = "", + token: str = "", ): """ A connector to sink processed data to TDengine. @@ -155,7 +154,11 @@ def __init__( precision = time_precision if precision == "us": precision = "u" - query_params = {"db": database, "precision": precision, "table_name_key": "__subtable"} + query_params = { + "db": database, + "precision": precision, + "table_name_key": "__subtable", + } header = { "Content-Type": "text/plain; charset=utf-8", } @@ -208,7 +211,9 @@ def _tags_callable(self, setter: TagsSetter) -> TagsCallable: return setter return lambda value: setter - def _subtable_name_callable(self, setter: SubtableNameSetter) -> SubtableNameCallable: + def _subtable_name_callable( + self, setter: SubtableNameSetter + ) -> SubtableNameCallable: if callable(setter): return setter return lambda value: setter @@ -248,14 +253,14 @@ def setup(self): raise err def add( - self, - value: Any, - key: Any, - timestamp: int, - headers: HeadersTuples, - topic: str, - partition: int, - offset: int, + self, + value: Any, + key: Any, + timestamp: int, + headers: HeadersTuples, + topic: str, + partition: int, + offset: int, ): if not isinstance(value, Mapping): raise TypeError( @@ -317,7 +322,11 @@ def write(self, batch: SinkBatch): k: float(v) if isinstance(v, int) else v for k, v in fields.items() } - ts = value[time_key] if time_key is not None and time_key in value else item.timestamp + ts = ( + value[time_key] + if time_key is not None and time_key in value + else item.timestamp + ) record = { "measurement": _measurement, "tags": tags, From 4625cf6d1301a86d6799e5f732b05052ba88238e Mon Sep 17 00:00:00 2001 From: Joel Brass Date: Mon, 30 Jun 2025 11:04:53 -0700 Subject: [PATCH 08/18] Adding TDengine sink md doc --- docs/connectors/sinks/tdengine-sink.md | 153 +++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 docs/connectors/sinks/tdengine-sink.md diff --git a/docs/connectors/sinks/tdengine-sink.md b/docs/connectors/sinks/tdengine-sink.md new file mode 100644 index 000000000..765b00fff --- /dev/null +++ b/docs/connectors/sinks/tdengine-sink.md @@ -0,0 +1,153 @@ +# TDengine Sink + +!!! info + + This is a **Community** connector. Test it before using in production. + + To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page. + +TDengine is an open source time series database optimized for IoT, connected vehicles, and industrial applications. + +Quix Streams provides a sink to write processed data to TDengine. + +## How To Install +The dependencies for this sink are not included in the default `quixstreams` package. 
+
+To install them, run the following command:
+
+```commandline
+pip install quixstreams[tdengine]
+```
+
+## How To Use
+
+To sink data to TDengine, you need to create an instance of `TDengineSink` and pass it to the `StreamingDataFrame.sink()` method:
+
+```python
+from quixstreams import Application
+from quixstreams.sinks.community.tdengine import TDengineSink
+
+app = Application(broker_address="127.0.0.1:9092")
+topic = app.topic("cpu_topic")
+
+def generate_subtable_name(row):
+    return f"cpu_{row['cpu_id']}"
+
+tdengine_sink = TDengineSink(
+    host="http://localhost:6041",
+    username="root",
+    password="taosdata",
+    database="test_cpu",
+    supertable="cpu",
+    subtable=generate_subtable_name,
+    fields_keys=["percent"],
+    tags_keys=["cpu_id", "host", "region"],
+)
+
+sdf = app.dataframe(topic)
+sdf.sink(tdengine_sink)
+
+if __name__ == '__main__':
+    app.run()
+```
+
+## Configuration
+TDengineSink accepts the following configuration parameters:
+
+- `host` - TDengine host in format `"http[s]://<host>[:<port>]"` (e.g., `"http://localhost:6041"`).
+- `database` - Database name (must exist before use).
+- `supertable` - Supertable name as a string, or a callable that receives the current message data and returns a string.
+- `subtable` - Subtable name as a string, or a callable that receives the current message data and returns a string. If empty, a hash value will be generated from the data as the subtable name.
+- `fields_keys` - Iterable (list) of strings used as TDengine "fields", or a callable that receives the current message data and returns an iterable of strings (see the sketch after this list). If present, must not overlap with `tags_keys`. If empty, the whole record value will be used. Default: `()`.
+- `tags_keys` - Iterable (list) of strings used as TDengine "tags", or a callable that receives the current message data and returns an iterable of strings. If present, must not overlap with `fields_keys`. Given keys are popped from the value dictionary since the same key cannot be both a tag and a field. If empty, no tags will be sent. Default: `()`.
+- `time_key` - Optional key to use as the timestamp for TDengine. If not set, the record's Kafka timestamp is used. Default: `None`.
+- `time_precision` - Time precision for the timestamp. One of: `"ms"`, `"ns"`, `"us"`, `"s"`. Default: `"ms"`.
+- `allow_missing_fields` - If `True`, skip missing field keys instead of raising `KeyError`. Default: `False`.
+- `include_metadata_tags` - If `True`, includes the record's key, topic, and partition as tags. Default: `False`.
+- `convert_ints_to_floats` - If `True`, converts all integer values to floats. Default: `False`.
+- `batch_size` - Number of records to write to TDengine in one request. Only affects the size of one write request, not the number of records flushed on each checkpoint. Default: `1000`.
+- `enable_gzip` - If `True`, enables gzip compression for writes. Default: `True`.
+- `request_timeout_ms` - HTTP request timeout in milliseconds. Default: `10000`.
+- `verify_ssl` - If `True`, verifies the SSL certificate. Default: `True`.
+- `username` - TDengine username. Default: `""` (empty string).
+- `password` - TDengine password. Default: `""` (empty string).
+- `token` - TDengine cloud token. Default: `""` (empty string). Either `token` or `username` and `password` must be provided.
+- `on_client_connect_success` - Optional callback after successful client authentication.
+- `on_client_connect_failure` - Optional callback after failed client authentication (should accept the raised Exception as an argument).
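+
+As a quick illustration of the callable-based and token-based options above, here is a
+minimal sketch (the field-selection logic and the placeholder host and token are
+hypothetical; adapt them to your application):
+
+```python
+# Select field keys dynamically per record: everything that is not a tag.
+def select_fields(row: dict) -> list:
+    return [k for k in row if k not in ("cpu_id", "host", "region")]
+
+tdengine_sink = TDengineSink(
+    host="https://your-tdengine-cloud-host",
+    token="<your-cloud-token>",  # alternative to username/password
+    database="test_cpu",
+    supertable="cpu",
+    subtable="",  # empty: a hash of the record data is used as the subtable name
+    fields_keys=select_fields,
+    tags_keys=["cpu_id", "host", "region"],
+)
+```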
+ +See the [What data can be sent to TDengine](#what-data-can-be-sent-to-tdengine) section for more info on `fields_keys` and `tags_keys`. + + +## Parameters + + +### `subtable` +Accepts either: +- Static name (string) +- Dynamic generator (callable) + +**Behavior:** +- If empty: Generates hash from data as subtable name +- Callable receives message data (dict) and returns string + +#### Examples: +```python +# Static name +subtable = "meters" + +# Dynamic name +def generate_subtable(row: dict) -> str: + """Create subtable name from data""" + return f"meters_{row['id']}" + +subtable = generate_subtable +``` + + +### supertable +Same interface as subtable: + +- String for static name +- Callable for dynamic name + +### Database Validation +- Verifies database exists during setup + +- Raises error if missing: `Failed to check database status, [904]:Database does not exist` + + +### Expected Behavior +1. Prerequisite: Database must exist before operation + + Error if missing: [904]: Database does not exist + +2. After successful setup: + +- Message example sent to Kafka: + +```json +{"host":"192.168.1.98","region":"EU","percent":12.5,"cpu_id":1} +``` + +- Creates the following tables in TDengine: + +```text +taos> show stables; + stable_name | +================================= + cpu | + +taos> show tables; + table_name | +================================= + cpu_1 | +``` + +- Data verification: + +```text +taos> select * from cpu; + _ts | percent | cpu_id | host | region | +======================================================================== + 2025-06-27 15:02:17.125 | 12.5 | 1 | 192.168.1.98 | EU | +``` \ No newline at end of file From 6fab1922890e17bcfad7c00a00a37757a9c5ab47 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Tue, 1 Jul 2025 09:26:19 +0800 Subject: [PATCH 09/18] fix: update database status check --- docs/connectors/sinks/tdengine-sink.md | 4 ++-- quixstreams/sinks/community/tdengine/sink.py | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/docs/connectors/sinks/tdengine-sink.md b/docs/connectors/sinks/tdengine-sink.md index 765b00fff..d44f93eed 100644 --- a/docs/connectors/sinks/tdengine-sink.md +++ b/docs/connectors/sinks/tdengine-sink.md @@ -113,13 +113,13 @@ Same interface as subtable: ### Database Validation - Verifies database exists during setup -- Raises error if missing: `Failed to check database status, [904]:Database does not exist` +- Raises error if missing: `Database 'your_database' not exists` ### Expected Behavior 1. Prerequisite: Database must exist before operation - Error if missing: [904]: Database does not exist + Error if missing: Database 'your_database' not exists 2. 
After successful setup: diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py index 3324dc27d..0cd507533 100644 --- a/quixstreams/sinks/community/tdengine/sink.py +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -228,7 +228,7 @@ def setup(self): ) # check if the database is alive database = self._client_args["database"] - check_db_sql = f"SHOW `{database}`.alive" + check_db_sql = f"SHOW DATABASES" timeout = urllib3.Timeout(total=self._client_args["timeout"] / 1_000) logger.debug(f"Sending data to {self._client_args['sql_url']} : {check_db_sql}") resp = self._client.request( @@ -240,7 +240,7 @@ def setup(self): ) if resp.status != 200: err = urllib3.exceptions.HTTPError( - f"Failed to check database status: {resp.status} {resp.data}" + f"Failed to get databases: {resp.status} {resp.data}" ) raise err resp_data = json.loads(resp.data.decode("utf-8")) @@ -248,7 +248,13 @@ def setup(self): if resp_code != 0: error_message = resp_data.get("desc", "Unknown error") err = urllib3.exceptions.HTTPError( - f"Failed to check database status, [{resp_code}]:{error_message}" + f"Failed to get databases, [{resp_code}]:{error_message}" + ) + raise err + data = resp_data.get("data") + if not (isinstance(data, list) and any(database == sublist[0] for sublist in data if sublist)): + err = urllib3.exceptions.HTTPError( + f"Database '{database}' not exists" ) raise err From 6c67e6f7908438999a0900ce31d619e91b6160dc Mon Sep 17 00:00:00 2001 From: Joel Brass Date: Tue, 1 Jul 2025 13:53:39 -0700 Subject: [PATCH 10/18] polishing it up --- docs/connectors/sinks/tdengine-sink.md | 46 +++++++++++++++++--- pyproject.toml | 3 +- quixstreams/sinks/community/tdengine/sink.py | 3 +- 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/docs/connectors/sinks/tdengine-sink.md b/docs/connectors/sinks/tdengine-sink.md index d44f93eed..e791b34a0 100644 --- a/docs/connectors/sinks/tdengine-sink.md +++ b/docs/connectors/sinks/tdengine-sink.md @@ -52,6 +52,8 @@ if __name__ == '__main__': ``` ## Configuration + +### Parameter Overview TDengineSink accepts the following configuration parameters: - `host` - TDengine host in format `"http[s]://[:]"` (e.g., `"http://localhost:6041"`). @@ -78,10 +80,10 @@ TDengineSink accepts the following configuration parameters: See the [What data can be sent to TDengine](#what-data-can-be-sent-to-tdengine) section for more info on `fields_keys` and `tags_keys`. -## Parameters +### Parameter Examples -### `subtable` +#### `subtable` Accepts either: - Static name (string) - Dynamic generator (callable) @@ -113,13 +115,13 @@ Same interface as subtable: ### Database Validation - Verifies database exists during setup -- Raises error if missing: `Database 'your_database' not exists` +- Raises error if missing: `Database 'your_database' does not exist` ### Expected Behavior 1. Prerequisite: Database must exist before operation - Error if missing: Database 'your_database' not exists + Error if missing: Database 'your_database' does not exist 2. After successful setup: @@ -150,4 +152,38 @@ taos> select * from cpu; _ts | percent | cpu_id | host | region | ======================================================================== 2025-06-27 15:02:17.125 | 12.5 | 1 | 192.168.1.98 | EU | -``` \ No newline at end of file +``` + + + +## Testing Locally + +Rather than connect to a hosted TDengine instance, you can alternatively test your +application using a local instance of TDengine using Docker: + +1. 
Execute in terminal:
+
+    ```bash
+    docker run --rm -d --name tdengine \
+    -p 6030:6030 \
+    -p 6041:6041 \
+    -p 6043-6060:6043-6060 \
+    -e TZ=America/New_York \
+    -e LC_ALL=C.UTF-8 \
+    tdengine/tdengine:latest
+    ```
+
+2. Use the following authentication settings for `TDengineSink` to connect:
+
+    ```python
+    TDengineSink(
+        host="http://localhost:6041",
+        username="root",
+        password="taosdata",
+        ...
+    )
+    ```
+
+3. When finished, execute in terminal:
+
+    ```bash
+    docker stop tdengine
+    ```
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index d25c8da34..371d0ec2b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -43,7 +43,8 @@ all = [
     "pymongo>=4.11,<5",
     "pandas>=1.0.0,<3.0",
     "elasticsearch>=8.17,<9",
-    "influxdb>=5.3,<6"
+    "influxdb>=5.3,<6",
+    "python-dateutil>=2.8.2,<3"
 ]

 avro = ["fastavro>=1.8,<2.0"]
diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py
index 0cd507533..4990f20ab 100644
--- a/quixstreams/sinks/community/tdengine/sink.py
+++ b/quixstreams/sinks/community/tdengine/sink.py
@@ -252,9 +252,10 @@ def setup(self):
             )
             raise err
         data = resp_data.get("data")
+        # TODO: create the database if it does not exist
         if not (isinstance(data, list) and any(database == sublist[0] for sublist in data if sublist)):
             err = urllib3.exceptions.HTTPError(
-                f"Database '{database}' not exists"
+                f"Database '{database}' does not exist"
             )
             raise err

From 669ff238892a6106d9d34398cbc1743dbddcc487 Mon Sep 17 00:00:00 2001
From: Joel Brass
Date: Tue, 1 Jul 2025 17:18:39 -0700
Subject: [PATCH 11/18] linting

---
 quixstreams/sinks/community/tdengine/sink.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/quixstreams/sinks/community/tdengine/sink.py b/quixstreams/sinks/community/tdengine/sink.py
index 4990f20ab..41a343117 100644
--- a/quixstreams/sinks/community/tdengine/sink.py +++ b/quixstreams/sinks/community/tdengine/sink.py @@ -228,7 +228,7 @@ def setup(self): ) # check if the database is alive database = self._client_args["database"] - check_db_sql = f"SHOW DATABASES" + check_db_sql = "SHOW DATABASES" timeout = urllib3.Timeout(total=self._client_args["timeout"] / 1_000) logger.debug(f"Sending data to {self._client_args['sql_url']} : {check_db_sql}") resp = self._client.request( From 054b13b808a5d71d16da48eded06a023a20b13dc Mon Sep 17 00:00:00 2001 From: Tim Sawicki <136370015+tim-quix@users.noreply.github.com> Date: Tue, 1 Jul 2025 22:25:48 -0400 Subject: [PATCH 14/18] Update point.py remove internal field assignment, change a couple staticmethods to classmethods --- quixstreams/sinks/community/tdengine/point.py | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py index b579689ce..9f73a98bf 100644 --- a/quixstreams/sinks/community/tdengine/point.py +++ b/quixstreams/sinks/community/tdengine/point.py @@ -47,13 +47,12 @@ class Point(object): - @staticmethod - def measurement(measurement): + @classmethod + def measurement(cls, measurement): """Create a new Point with specified measurement name.""" - p = Point(measurement) - return p + return cls(measurement) - @staticmethod + @classmethod def from_dict( dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs ): @@ -137,7 +136,7 @@ def from_dict( measurement_ = dictionary[ kwargs.get("record_measurement_key", "measurement") ] - point = Point(measurement_) + point = cls(measurement_) record_tag_keys = kwargs.get("record_tag_keys", None) if record_tag_keys is not None: @@ -168,13 +167,15 @@ def from_dict( # - int: 'i' # - uint: 'u' # - float: '' - point._field_types = dict( - map( - lambda item: ( - item[0], - "i" if item[1] == "int" else "u" if item[1] == "uint" else "", - ), - _field_types.items(), + point.field_types( + dict( + map( + lambda item: ( + item[0], + "i" if item[1] == "int" else "u" if item[1] == "uint" else "", + ), + _field_types.items(), + ) ) ) @@ -189,6 +190,10 @@ def __init__(self, measurement_name): self._write_precision = DEFAULT_WRITE_PRECISION self._field_types = {} + def field_types(self, field_types: dict): + self._field_types = field_types + return self + def time(self, time, write_precision=DEFAULT_WRITE_PRECISION): """ Specify timestamp for DataPoint with declared precision. From ed173a5593932d13a4b1efd3ef2c024f8c214adf Mon Sep 17 00:00:00 2001 From: Tim Sawicki <136370015+tim-quix@users.noreply.github.com> Date: Tue, 1 Jul 2025 22:30:12 -0400 Subject: [PATCH 15/18] Update point.py add missing cls argument --- quixstreams/sinks/community/tdengine/point.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py index 9f73a98bf..646b0c360 100644 --- a/quixstreams/sinks/community/tdengine/point.py +++ b/quixstreams/sinks/community/tdengine/point.py @@ -54,7 +54,10 @@ def measurement(cls, measurement): @classmethod def from_dict( - dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs + cls, + dictionary: dict, + write_precision: str = DEFAULT_WRITE_PRECISION, + **kwargs ): """ Initialize point from 'dict' structure. 
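
For reference, the chainable `field_types()` setter introduced in PATCH 14 maps directly onto line-protocol integer suffixes. A minimal sketch (values are illustrative; note that `field_types()` takes the already-mapped suffixes such as `"u"`, while `from_dict()` accepts the `"int"`/`"uint"`/`"float"` aliases and maps them internally):

```python
from quixstreams.sinks.community.tdengine.point import Point

point = (
    Point("h2o_feet")
    .field_types({"some_counter": "u"})  # "u" -> unsigned 64-bit integer suffix
    .field("some_counter", 108913123234)
    .field("water_level", 1.0)           # trailing ".0" is trimmed on output
    .time(1, write_precision="ns")
)
print(point.to_line_protocol())
# h2o_feet some_counter=108913123234u,water_level=1 1
```
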
From 7246e309b9f67f6d08ab720b83819a82c1f052c6 Mon Sep 17 00:00:00 2001 From: Tim Sawicki <136370015+tim-quix@users.noreply.github.com> Date: Tue, 1 Jul 2025 22:36:27 -0400 Subject: [PATCH 16/18] Update __init__.py fix ruff noqa --- quixstreams/sinks/community/tdengine/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quixstreams/sinks/community/tdengine/__init__.py b/quixstreams/sinks/community/tdengine/__init__.py index 203d59eb1..6ffb1dec0 100644 --- a/quixstreams/sinks/community/tdengine/__init__.py +++ b/quixstreams/sinks/community/tdengine/__init__.py @@ -1,2 +1,2 @@ -# noqa: F403 +# ruff: noqa: F403 from .sink import * From 81b4cafed0ba1de2aa2680d3847799d8d27bfe97 Mon Sep 17 00:00:00 2001 From: Tim Sawicki <136370015+tim-quix@users.noreply.github.com> Date: Tue, 1 Jul 2025 22:39:25 -0400 Subject: [PATCH 17/18] Update point.py fix arguments --- quixstreams/sinks/community/tdengine/point.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py index 646b0c360..5fee744fd 100644 --- a/quixstreams/sinks/community/tdengine/point.py +++ b/quixstreams/sinks/community/tdengine/point.py @@ -53,12 +53,7 @@ def measurement(cls, measurement): return cls(measurement) @classmethod - def from_dict( - cls, - dictionary: dict, - write_precision: str = DEFAULT_WRITE_PRECISION, - **kwargs - ): + def from_dict(cls, dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs): """ Initialize point from 'dict' structure. From 61f06f8fea42dc4320c855cdedfec89a4d2acd54 Mon Sep 17 00:00:00 2001 From: Tim Sawicki <136370015+tim-quix@users.noreply.github.com> Date: Tue, 1 Jul 2025 22:42:31 -0400 Subject: [PATCH 18/18] Update point.py fix args...again! --- quixstreams/sinks/community/tdengine/point.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quixstreams/sinks/community/tdengine/point.py b/quixstreams/sinks/community/tdengine/point.py index 5fee744fd..b3d7d027d 100644 --- a/quixstreams/sinks/community/tdengine/point.py +++ b/quixstreams/sinks/community/tdengine/point.py @@ -53,7 +53,9 @@ def measurement(cls, measurement): return cls(measurement) @classmethod - def from_dict(cls, dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs): + def from_dict( + cls, dictionary: dict, write_precision: str = DEFAULT_WRITE_PRECISION, **kwargs + ): """ Initialize point from 'dict' structure.
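
With the series fully applied, `Point.from_dict` serializes a sink record into InfluxDB-style line protocol. A minimal end-to-end sketch against the final `point.py` (the record values are illustrative, shaped like the `"measurement"`/`"tags"`/`"fields"`/`"time"` dicts that `TDengineSink.write()` assembles):

```python
from quixstreams.sinks.community.tdengine.point import Point

# A record shaped like the ones the sink batches per message.
record = {
    "measurement": "cpu",
    "tags": {"cpu_id": 1, "host": "192.168.1.98", "region": "EU"},
    "fields": {"percent": 12.5},
    "time": 1751036537125,  # integer timestamps pass through unchanged ("ms" here)
}

point = Point.from_dict(record, write_precision="ms")
print(point.to_line_protocol())
# cpu,cpu_id=1,host=192.168.1.98,region=EU percent=12.5 1751036537125
```
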