Skip to content

Commit 66776f1

Browse files
alespour and bednar authored
fix: skip infinite values during serialization (#99)
* fix: skip infinite values during serialization to line protocol * fix: deprecated method call * fix: use apply() with lambda instead of replace() for future Pandas 3.x compatibility * refactor: backport tests from v2 client --------- Co-authored-by: Jakub Bednar <jakub.bednar@gmail.com>
1 parent 1fe1460 commit 66776f1

File tree

4 files changed

+468
-11
lines changed

4 files changed

+468
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Bug Fixes
66

77
1. [#95](https://github.com/InfluxCommunity/influxdb3-python/pull/95): `Polars` is optional dependency
8+
1. [#99](https://github.com/InfluxCommunity/influxdb3-python/pull/99): Skip infinite values during serialization to line protocol
89

910
## 0.6.1 [2024-06-25]
1011

influxdb_client_3/write_client/client/write/dataframe_serializer.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
132132
keys = []
133133
# tags holds a list of tag f-string segments ordered alphabetically by tag key.
134134
tags = []
135-
# fields holds a list of field f-string segments ordered alphebetically by field key
135+
# fields holds a list of field f-string segments ordered alphabetically by field key
136136
fields = []
137137
# field_indexes holds the index into each row of all the fields.
138138
field_indexes = []
@@ -160,6 +160,11 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
160160
# null_columns has a bool value for each column holding
161161
# whether that column contains any null (NaN or None) values.
162162
null_columns = data_frame.isnull().any()
163+
164+
# inf_columns has a bool value for each column holding
165+
# whether that column contains any Inf values.
166+
inf_columns = data_frame.isin([np.inf, -np.inf]).any()
167+
163168
timestamp_index = 0
164169

165170
# Iterate through the columns building up the expression for each column.
@@ -175,9 +180,10 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
175180

176181
if key in data_frame_tag_columns:
177182
# This column is a tag column.
178-
if null_columns.iloc[index]:
183+
if null_columns.iloc[index] or inf_columns.iloc[index]:
179184
key_value = f"""{{
180-
'' if {val_format} == '' or pd.isna({val_format}) else
185+
'' if {val_format} == '' or pd.isna({val_format}) or
186+
({inf_columns.iloc[index]} and np.isinf({val_format})) else
181187
f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
182188
}}"""
183189
else:
@@ -199,16 +205,17 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
199205
if (issubclass(value.type, np.integer) or issubclass(value.type, np.floating) or
200206
issubclass(value.type, np.bool_)):
201207
suffix = 'i' if issubclass(value.type, np.integer) else ''
202-
if null_columns.iloc[index]:
208+
if null_columns.iloc[index] or inf_columns.iloc[index]:
203209
field_value = (
204-
f"""{{"" if pd.isna({val_format}) else f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
210+
f"""{{"" if pd.isna({val_format}) or ({inf_columns.iloc[index]} and np.isinf({val_format})) else
211+
f"{sep}{key_format}={{{val_format}}}{suffix}"}}"""
205212
)
206213
else:
207214
field_value = f'{sep}{key_format}={{{val_format}}}{suffix}'
208215
else:
209-
if null_columns.iloc[index]:
216+
if null_columns.iloc[index] or inf_columns.iloc[index]:
210217
field_value = f"""{{
211-
'' if pd.isna({val_format}) else
218+
'' if pd.isna({val_format}) or ({inf_columns.iloc[index]} and np.isinf({val_format})) else
212219
f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
213220
}}"""
214221
else:
@@ -234,11 +241,12 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
234241
'_ESCAPE_STRING': _ESCAPE_STRING,
235242
'keys': keys,
236243
'pd': pd,
244+
'np': np,
237245
})
238246

239247
for k, v in dict(data_frame.dtypes).items():
240248
if k in data_frame_tag_columns:
241-
data_frame[k].replace('', np.nan, inplace=True)
249+
data_frame[k] = data_frame[k].apply(lambda x: np.nan if x == '' else x)
242250

243251
self.data_frame = data_frame
244252
self.f = f

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def serialize(self, chunk_idx: int = None):
136136
chunk = df[chunk_idx * self.chunk_size:(chunk_idx + 1) * self.chunk_size]
137137

138138
# Apply the UDF to each row
139-
line_protocol_expr = chunk.apply(self.to_line_protocol, return_dtype=pl.Object)
139+
line_protocol_expr = chunk.map_rows(self.to_line_protocol, return_dtype=pl.Object)
140140

141141
lp = line_protocol_expr['map'].to_list()
142142

0 commit comments

Comments
 (0)