Skip to content

Commit f5eca2e

Browse files
Merge pull request #60 from InfluxCommunity/59-add-custom-port-and-address
added custom url and port
2 parents 7e6bc49 + 9363038 commit f5eca2e

File tree

5 files changed

+148
-5
lines changed

5 files changed

+148
-5
lines changed
File renamed without changes.

Examples/community/custom_url.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options
2+
import pandas as pd
3+
import random
4+
5+
6+
class BatchingCallback(object):
7+
8+
def success(self, conf, data: str):
9+
print(f"Written batch: {conf}, data: {data}")
10+
11+
def error(self, conf, data: str, exception: InfluxDBError):
12+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
13+
14+
def retry(self, conf, data: str, exception: InfluxDBError):
15+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
16+
17+
callback = BatchingCallback()
18+
19+
20+
write_options = WriteOptions(batch_size=100,
21+
flush_interval=10_000,
22+
jitter_interval=2_000,
23+
retry_interval=5_000,
24+
max_retries=5,
25+
max_retry_delay=30_000,
26+
exponential_base=2)
27+
28+
wco = write_client_options(success_callback=callback.success,
29+
error_callback=callback.error,
30+
retry_callback=callback.retry,
31+
WriteOptions=write_options
32+
)
33+
34+
client = InfluxDBClient3(
35+
token="",
36+
host="https://eu-central-1-1.aws.cloud2.influxdata.com:442",
37+
org="6a841c0c08328fb1",
38+
database="pokemon-codex", enable_gzip=True, write_client_options=wco, write_port_overwrite=443, query_port_overwrite=443)
39+
40+
now = pd.Timestamp.now(tz='UTC').floor('ms')
41+
42+
# Lists of possible trainers
43+
trainers = ["ash", "brock", "misty", "gary", "jessie", "james"]
44+
45+
# Read the CSV into a DataFrame
46+
pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv")
47+
48+
# Creating an empty list to store the data
49+
data = []
50+
51+
# Dictionary to keep track of the number of times each trainer has caught each Pokémon
52+
trainer_pokemon_counts = {}
53+
54+
# Number of entries we want to create
55+
num_entries = 1000
56+
57+
# Generating random data
58+
for i in range(num_entries):
59+
trainer = random.choice(trainers)
60+
61+
# Randomly select a row from pokemon_df
62+
random_pokemon = pokemon_df.sample().iloc[0]
63+
caught = random_pokemon['Name']
64+
65+
# Count the number of times this trainer has caught this Pokémon
66+
if (trainer, caught) in trainer_pokemon_counts:
67+
trainer_pokemon_counts[(trainer, caught)] += 1
68+
else:
69+
trainer_pokemon_counts[(trainer, caught)] = 1
70+
71+
# Get the number for this combination of trainer and Pokémon
72+
num = trainer_pokemon_counts[(trainer, caught)]
73+
74+
entry = {
75+
"trainer": trainer,
76+
"id": f"{0000 + random_pokemon['#']:04d}",
77+
"num": str(num),
78+
"caught": caught,
79+
"level": random.randint(5, 20),
80+
"attack": random_pokemon['Attack'],
81+
"defense": random_pokemon['Defense'],
82+
"hp": random_pokemon['HP'],
83+
"speed": random_pokemon['Speed'],
84+
"type1": random_pokemon['Type 1'],
85+
"type2": random_pokemon['Type 2'],
86+
"timestamp": now
87+
}
88+
data.append(entry)
89+
90+
# Convert the list of dictionaries to a DataFrame
91+
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')
92+
93+
# Print the DataFrame
94+
print(caught_pokemon_df)
95+
96+
97+
try:
98+
client.write(caught_pokemon_df, data_frame_measurement_name='caught',
99+
data_frame_tag_columns=['trainer', 'id', 'num'])
100+
except Exception as e:
101+
print(f"Error writing point: {e}")
File renamed without changes.

Examples/debugging/get_trace.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from influxdb_client_3 import InfluxDBClient3
2+
import pandas as pd
3+
from influxdb_client_3.debug import TracingClientMiddleWareFactory
4+
5+
6+
7+
client = InfluxDBClient3(
8+
token="",
9+
host="eu-central-1-1.aws.cloud2.influxdata.com",
10+
org="6a841c0c08328fb1",
11+
database="pokemon-codex",
12+
flight_client_options={"middleware": (TracingClientMiddleWareFactory(),)})
13+
14+
15+
sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5'''
16+
table = client.query(query=sql, language='sql', mode='all')
17+
print(table)
18+
19+
20+
influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time > now() - 1h LIMIT 5'''
21+
reader = client.query(query=influxql, language='influxql', mode='chunk')
22+
try:
23+
while True:
24+
batch, buff = reader.read_chunk()
25+
print("batch:")
26+
print(buff)
27+
except StopIteration:
28+
print("No more chunks to read")

influxdb_client_3/__init__.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from influxdb_client.client.exceptions import InfluxDBError
77
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
88
from influxdb_client_3.read_file import UploadFile
9+
import urllib.parse
910

1011

1112

@@ -49,6 +50,8 @@ def __init__(
4950
token=None,
5051
write_client_options=None,
5152
flight_client_options=None,
53+
write_port_overwrite=None,
54+
query_port_overwrite=None,
5255
**kwargs):
5356
"""
5457
Initialize an InfluxDB client.
@@ -71,21 +74,32 @@ def __init__(
7174
self._database = database
7275
self._token = token
7376
self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS)
74-
75-
# Extracting the hostname from URL if provided
77+
78+
# Parse the host input
7679
parsed_url = urllib.parse.urlparse(host)
77-
host = parsed_url.hostname or host
80+
81+
# Determine the protocol (scheme), hostname, and port
82+
scheme = parsed_url.scheme if parsed_url.scheme else "https"
83+
hostname = parsed_url.hostname if parsed_url.hostname else host
84+
port = parsed_url.port if parsed_url.port else 443
85+
86+
# Construct the clients using the parsed values
87+
if write_port_overwrite is not None:
88+
port = write_port_overwrite
7889

7990
self._client = _InfluxDBClient(
80-
url=f"https://{host}",
91+
url=f"{scheme}://{hostname}:{port}",
8192
token=self._token,
8293
org=self._org,
8394
**kwargs)
8495

8596
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
8697
self._flight_client_options = flight_client_options or {}
87-
self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options)
8898

99+
if query_port_overwrite is not None:
100+
port = query_port_overwrite
101+
self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options)
102+
89103

90104

91105
def write(self, record=None, database=None ,**kwargs):

0 commit comments

Comments
 (0)