Skip to content

Commit 80cf49f

Browse files
Merge pull request #26 from InfluxCommunity/23-influxdbclient3-doesnt-pass-success_callback-and-error_callback-args-to-writeapi
added write_client_option
2 parents b0751be + 93a9ffe commit 80cf49f

File tree

2 files changed

+47
-22
lines changed

2 files changed

+47
-22
lines changed

Examples/batching-example.py

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,23 @@
55
import influxdb_client_3 as InfluxDBClient3
66
import pandas as pd
77
import numpy as np
8-
from influxdb_client_3 import write_options, WritePrecision
8+
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
99
import datetime
1010
import time
1111

1212

13+
class BatchingCallback(object):
14+
15+
def success(self, conf, data: str):
16+
print(f"Written batch: {conf}, data: {data}")
17+
18+
def error(self, conf, data: str, exception: InfluxDBError):
19+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
20+
21+
def retry(self, conf, data: str, exception: InfluxDBError):
22+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
23+
24+
1325
# Creating 5.000 gatewayId values as MongoDB ObjectIDs
1426
gatewayIds = [ObjectId() for x in range(0, 100)]
1527

@@ -18,27 +30,37 @@
1830

1931
# Setting timestamp for first sensor reading
2032
now = datetime.datetime.now()
21-
now = now - datetime.timedelta(days=366)
33+
now = now - datetime.timedelta(days=30)
2234
teststart = datetime.datetime.now()
2335

2436
# InfluxDB connection details
2537
token = ""
2638
org = ""
27-
bucket = ""
28-
url = ""
39+
database = ""
40+
url = "eu-central-1-1.aws.cloud2.influxdata.com"
41+
42+
callback = BatchingCallback()
43+
44+
write_options = WriteOptions(batch_size=5_000,
45+
flush_interval=10_000,
46+
jitter_interval=2_000,
47+
retry_interval=5_000,
48+
max_retries=5,
49+
max_retry_delay=30_000,
50+
exponential_base=2)
2951

52+
wco = write_client_options(success_callback=callback.success,
53+
error_callback=callback.error,
54+
retry_callback=callback.retry,
55+
WriteOptions=write_options
56+
)
3057
# Opening InfluxDB client with a batch size of 5k points or flush interval
3158
# of 10k ms and gzip compression
3259
with InfluxDBClient3.InfluxDBClient3(token=token,
3360
host=url,
3461
org=org,
35-
database="solarmanager", enable_gzip=True, write_options=write_options(batch_size=5_000,
36-
flush_interval=10_000,
37-
jitter_interval=2_000,
38-
retry_interval=5_000,
39-
max_retries=5,
40-
max_retry_delay=30_000,
41-
exponential_base=2, write_type='batching')) as _client:
62+
database=database, enable_gzip=True, write_client_options=wco) as _client:
63+
4264

4365
# Creating iterator for one hour worth of data (6 sensor readings per
4466
# minute)

influxdb_client_3/__init__.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
from pyarrow import csv
44
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
55
from influxdb_client import InfluxDBClient as _InfluxDBClient
6-
from influxdb_client import WriteOptions as _WriteOptions
6+
from influxdb_client import WriteOptions as WriteOptions
77
from influxdb_client.client.write_api import WriteApi as _WriteApi
8-
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
9-
from influxdb_client.client.write_api import PointSettings
8+
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, PointSettings
109
from influxdb_client.domain.write_precision import WritePrecision
10+
from influxdb_client.client.exceptions import InfluxDBError
1111
from influxdb_client import Point
1212
import json
1313

1414

15-
def write_options(**kwargs):
16-
return _WriteOptions(**kwargs)
15+
def write_client_options(**kwargs):
16+
return kwargs
1717

1818

1919
def flight_client_options(**kwargs):
@@ -27,7 +27,7 @@ def __init__(
2727
org=None,
2828
database=None,
2929
token=None,
30-
write_options=None,
30+
write_client_options=None,
3131
flight_client_options=None,
3232
**kwargs):
3333
"""
@@ -36,24 +36,26 @@ def __init__(
3636
* org (str, optional): The InfluxDB organization name to be used for operations. Defaults to None.
3737
* database (str, optional): The database to be used for InfluxDB operations. Defaults to None.
3838
* token (str, optional): The authentication token for accessing the InfluxDB server. Defaults to None.
39-
* write_options (enum, optional): Specifies the write mode (synchronous or asynchronous) to use when writing data points to InfluxDB. Defaults to SYNCHRONOUS.
39+
* write_options (ANY, optional): Exposes InfuxDB WriteAPI options.
4040
* **kwargs: Additional arguments to be passed to the InfluxDB Client.
4141
"""
4242
self._org = org
4343
self._database = database
44-
self.write_options = write_options if write_options is not None else SYNCHRONOUS
44+
self.write_client_options = write_client_options if write_client_options is not None else write_client_options(write_options=SYNCHRONOUS)
4545
self._client = _InfluxDBClient(
4646
url=f"https://{host}",
4747
token=token,
4848
org=self._org,
4949
**kwargs)
50+
5051
self._write_api = _WriteApi(
51-
self._client, write_options=self.write_options)
52+
self._client, **self.write_client_options)
5253

5354
self._flight_client_options = flight_client_options if flight_client_options is not None else {}
5455
self._flight_client = FlightClient(
5556
f"grpc+tls://{host}:443",
5657
**self._flight_client_options)
58+
5759
# create an authorization header
5860
self._options = FlightCallOptions(
5961
headers=[(b"authorization", f"Bearer {token}".encode('utf-8'))])
@@ -140,6 +142,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
140142
"PointSettings",
141143
"SYNCHRONOUS",
142144
"ASYNCHRONOUS",
143-
"write_options",
145+
"write_client_options",
144146
"WritePrecision",
145-
"flight_client_options"]
147+
"flight_client_options",
148+
"WriteOptions"]

0 commit comments

Comments
 (0)