@@ -57,7 +57,7 @@ def __init__(
57
57
org = self ._org ,
58
58
** kwargs )
59
59
60
- self ._write_api = _WriteApi (self ._client , ** self ._write_client_options )
60
+ self ._write_api = _WriteApi (influxdb_client = self ._client , ** self ._write_client_options )
61
61
self ._flight_client_options = flight_client_options or {}
62
62
self ._flight_client = FlightClient (f"grpc+tls://{ host } :443" , ** self ._flight_client_options )
63
63
@@ -75,7 +75,8 @@ def write(self, record=None, **kwargs):
75
75
try :
76
76
self ._write_api .write (bucket = self ._database , record = record , ** kwargs )
77
77
except InfluxDBError as e :
78
- print (f"InfluxDB Error: { e } " )
78
+ raise e
79
+
79
80
80
81
def write_file (self , file , measurement_name = None , tag_columns = None , timestamp_column = 'time' , ** kwargs ):
81
82
"""
@@ -96,7 +97,8 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co
96
97
df = table .to_pandas () if isinstance (table , pa .Table ) else table
97
98
self ._process_dataframe (df , measurement_name , tag_columns or [], timestamp_column )
98
99
except Exception as e :
99
- print (f"Error writing file: { e } " )
100
+ raise e
101
+
100
102
101
103
def _process_dataframe (self , df , measurement_name , tag_columns , timestamp_column ):
102
104
# This function is factored out for clarity.
@@ -133,19 +135,22 @@ def query(self, query, language="sql", mode="all"):
133
135
:type mode: str
134
136
:return: The queried data.
135
137
"""
136
- ticket_data = {"database" : self ._database , "sql_query" : query , "query_type" : language }
137
- ticket = Ticket (json .dumps (ticket_data ).encode ('utf-8' ))
138
- flight_reader = self ._flight_client .do_get (ticket , self ._options )
139
-
140
- mode_func = {
141
- "all" : flight_reader .read_all ,
142
- "pandas" : flight_reader .read_pandas ,
143
- "chunk" : lambda : flight_reader ,
144
- "reader" : flight_reader .to_reader ,
145
- "schema" : lambda : flight_reader .schema
146
- }.get (mode , flight_reader .read_all )
147
-
148
- return mode_func () if callable (mode_func ) else mode_func
138
+ try :
139
+ ticket_data = {"database" : self ._database , "sql_query" : query , "query_type" : language }
140
+ ticket = Ticket (json .dumps (ticket_data ).encode ('utf-8' ))
141
+ flight_reader = self ._flight_client .do_get (ticket , self ._options )
142
+
143
+ mode_func = {
144
+ "all" : flight_reader .read_all ,
145
+ "pandas" : flight_reader .read_pandas ,
146
+ "chunk" : lambda : flight_reader ,
147
+ "reader" : flight_reader .to_reader ,
148
+ "schema" : lambda : flight_reader .schema
149
+ }.get (mode , flight_reader .read_all )
150
+
151
+ return mode_func () if callable (mode_func ) else mode_func
152
+ except Exception as e :
153
+ raise e
149
154
150
155
def close (self ):
151
156
"""Close the client and clean up resources."""
0 commit comments