diff --git a/CHANGELOG.md b/CHANGELOG.md index 3776d69..f01a54d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features 1. [#174](https://github.com/InfluxCommunity/influxdb3-csharp/pull/174): Support passing HttpClient to InfluxDBClient. +1. [#175](https://github.com/InfluxCommunity/influxdb3-csharp/pull/175): Add QueryTimeout and WriteTimeout to ClientConfig. ## 1.3.0 [2025-08-12] diff --git a/Client.Test.Integration/QueryWriteTest.cs b/Client.Test.Integration/QueryWriteTest.cs index 1c9acc6..1f18c56 100644 --- a/Client.Test.Integration/QueryWriteTest.cs +++ b/Client.Test.Integration/QueryWriteTest.cs @@ -169,22 +169,92 @@ public async Task MaxReceiveMessageSize() } [Test] - public void GrpcDeadline() + public async Task TimeoutExceededByDeadline() { using var client = new InfluxDBClient(new ClientConfig { Host = Host, Token = Token, Database = Database, + WriteTimeout = TimeSpan.FromSeconds(11), + QueryTimeout = TimeSpan.FromSeconds(11), QueryOptions = new QueryOptions() { - Deadline = DateTime.UtcNow.AddMicroseconds(1) + Deadline = DateTime.UtcNow.AddTicks(1) // Deadline will have a higher priority than QueryTimeout } }); + await client.WriteRecordAsync("mem,tag=a field=1"); + TestQuery(client); + TestQueryBatches(client); + TestQueryPoints(client); + } + + [Test] + public async Task TimeoutExceededByQueryTimeout() + { + using var client = new InfluxDBClient(new ClientConfig + { + Host = Host, + Token = Token, + Database = Database, + WriteTimeout = TimeSpan.FromSeconds(11), + QueryTimeout = TimeSpan.FromTicks(1) + }); + await client.WriteRecordAsync("mem,tag=a field=1"); + TestQuery(client); + TestQueryBatches(client); + TestQueryPoints(client); + } + + [Test] + public async Task TimeoutExceeded() + { + using var client = new InfluxDBClient(new ClientConfig + { + Host = Host, + Token = Token, + Database = Database, + WriteTimeout = TimeSpan.FromSeconds(11), + QueryTimeout = TimeSpan.FromSeconds(11), + QueryOptions = + { + Deadline = DateTime.UtcNow.AddSeconds(11), + } + }); + await client.WriteRecordAsync("mem,tag=a field=1"); + var timeout = TimeSpan.FromTicks(1); + TestQuery(client, timeout); + TestQueryBatches(client, timeout); + TestQueryPoints(client, timeout); + } + + private static void TestQuery(InfluxDBClient client, TimeSpan? timeout = null) + { + var ex = Assert.ThrowsAsync(async () => + { + await foreach (var _ in client.Query("SELECT * FROM mem", timeout: timeout)) + { + } + }); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCode.DeadlineExceeded)); + } + private static void TestQueryBatches(InfluxDBClient client, TimeSpan? timeout = null) + { + var ex = Assert.ThrowsAsync(async () => + { + await foreach (var _ in client.QueryBatches("SELECT * FROM mem", timeout: timeout)) + { + } + }); + Assert.That(ex.StatusCode, Is.EqualTo(StatusCode.DeadlineExceeded)); + } + + private static void TestQueryPoints(InfluxDBClient client, TimeSpan? timeout = null) + { var ex = Assert.ThrowsAsync(async () => { - await foreach (var _ in client.Query("SELECT value FROM stat")) + await foreach (var _ in client.QueryPoints("SELECT * FROM mem", timeout: timeout)) { } }); diff --git a/Client.Test.Integration/WriteTest.cs b/Client.Test.Integration/WriteTest.cs index 328b7be..5ddb051 100644 --- a/Client.Test.Integration/WriteTest.cs +++ b/Client.Test.Integration/WriteTest.cs @@ -33,6 +33,7 @@ public async Task WriteWithError() Assert.That(iaex.Message, Does.Contain("Found trailing content") .Or.Contain("partial write of line protocol occurred") + .Or.Contain("write buffer error: parsing for line protocol failed") ); Assert.That(iaex.StatusCode.ToString(), Is.EqualTo("BadRequest")); Assert.That(iaex.StatusCode, Is.EqualTo(HttpStatusCode.BadRequest)); diff --git a/Client.Test/InfluxDBClientQueryTest.cs b/Client.Test/InfluxDBClientQueryTest.cs index e34008d..18250ea 100644 --- a/Client.Test/InfluxDBClientQueryTest.cs +++ b/Client.Test/InfluxDBClientQueryTest.cs @@ -54,7 +54,7 @@ public async Task PassNamedParametersToFlightClient() var mockFlightSqlClient = new Mock(); mockFlightSqlClient .Setup(m => m.Execute(It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny>(), It.IsAny>())) + It.IsAny>(), It.IsAny>(), It.IsAny())) .Returns(new List().ToAsyncEnumerable()); // @@ -74,9 +74,9 @@ public async Task PassNamedParametersToFlightClient() { "max-frequency", 3.5 } }; - _ = await _client.QueryPoints(query, database: "my-db", queryType: queryType, namedParameters: namedParameters) + _ = await _client.QueryPoints(query, database: "my-db", queryType: queryType, namedParameters: namedParameters, timeout: TimeSpan.MaxValue) .ToListAsync(); - mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, namedParameters, new Dictionary()), Times.Exactly(1)); + mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, namedParameters, new Dictionary(), TimeSpan.MaxValue), Times.Exactly(1)); } [Test] @@ -109,7 +109,7 @@ public async Task PassHeadersToFlightClient() var mockFlightSqlClient = new Mock(); mockFlightSqlClient .Setup(m => m.Execute(It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny>(), It.IsAny>())) + It.IsAny>(), It.IsAny>(), It.IsAny())) .Returns(new List().ToAsyncEnumerable()); // @@ -128,6 +128,6 @@ public async Task PassHeadersToFlightClient() }}; _ = await _client.QueryPoints(query, database: "my-db", queryType: queryType, headers: headers) .ToListAsync(); - mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, new Dictionary(), headers), Times.Exactly(1)); + mockFlightSqlClient.Verify(m => m.Execute(query, "my-db", queryType, new Dictionary(), headers, null), Times.Exactly(1)); } } \ No newline at end of file diff --git a/Client.Test/InfluxDBClientWriteTest.cs b/Client.Test/InfluxDBClientWriteTest.cs index c97c09a..a089e96 100644 --- a/Client.Test/InfluxDBClientWriteTest.cs +++ b/Client.Test/InfluxDBClientWriteTest.cs @@ -2,13 +2,14 @@ using System.Collections.Generic; using System.Linq; using System.Net; -using System.Net.Http; +using System.Threading; using System.Threading.Tasks; using InfluxDB3.Client.Config; using InfluxDB3.Client.Write; using WireMock.Matchers; using WireMock.RequestBuilders; using WireMock.ResponseBuilders; +using WriteOptions = InfluxDB3.Client.Config.WriteOptions; namespace InfluxDB3.Client.Test; @@ -493,55 +494,113 @@ public void WriteNoSyncTrueNotSupported() } [Test] - public async Task TestSetHttpClient() + public void TimeoutExceededByTimeout() { MockServer .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) - .RespondWith(Response.Create().WithStatusCode(HttpStatusCode.OK)); - - var httpClient = new HttpClient(); - httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("my-user-agent"); - httpClient.DefaultRequestHeaders.Add("X-Client-ID", "123"); + .RespondWith(Response.Create().WithStatusCode(204).WithDelay(TimeSpan.FromSeconds(2))); _client = new InfluxDBClient(new ClientConfig { Host = MockServerUrl, Token = "my-token", Database = "my-database", - HttpClient = httpClient + Timeout = TimeSpan.FromTicks(1) }); + TestWriteRecordAsync(_client); + TestWriteRecordsAsync(_client); + TestWritePointAsync(_client); + TestWritePointsAsync(_client); + } - await _client.WriteRecordAsync("mem,tag=a field=1"); - var requests = MockServer.LogEntries.ToList(); - using (Assert.EnterMultipleScope()) + [Test] + public void TimeoutExceededByWriteTimeout() + { + MockServer + .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) + .RespondWith(Response.Create().WithStatusCode(204).WithDelay(TimeSpan.FromSeconds(2))); + + _client = new InfluxDBClient(new ClientConfig { - Assert.That(requests[0].RequestMessage.Headers?["User-Agent"].First(), Is.EqualTo("my-user-agent")); - Assert.That(requests[0].RequestMessage.Headers["X-Client-ID"].First(), Is.EqualTo("123")); - } - Assert.Pass(); + Host = MockServerUrl, + Token = "my-token", + Database = "my-database", + QueryTimeout = TimeSpan.FromSeconds(11), + Timeout = TimeSpan.FromSeconds(11), + WriteTimeout = TimeSpan.FromTicks(1) // WriteTimeout has a higher priority than Timeout + }); + TestWriteRecordAsync(_client); + TestWriteRecordsAsync(_client); + TestWritePointAsync(_client); + TestWritePointsAsync(_client); + } [Test] - public void TestCheckHttpClientStillOpen() + public void TimeoutExceededByToken() { MockServer - .Given(Request.Create().WithPath("/test").UsingGet()) - .RespondWith( - Response.Create() - .WithStatusCode(HttpStatusCode.OK) - .WithBody("Still ok")); + .Given(Request.Create().WithPath("/api/v2/write").UsingPost()) + .RespondWith(Response.Create().WithStatusCode(204).WithDelay(TimeSpan.FromSeconds(2))); - var httpClient = new HttpClient(new HttpClientHandler()); _client = new InfluxDBClient(new ClientConfig { Host = MockServerUrl, Token = "my-token", Database = "my-database", - HttpClient = httpClient + QueryTimeout = TimeSpan.FromSeconds(11), + Timeout = TimeSpan.FromSeconds(11), + WriteTimeout = TimeSpan.FromSeconds(11) }); - _client.Dispose(); + var cancellationToken = new CancellationTokenSource(TimeSpan.FromTicks(1)).Token; + TestWriteRecordAsync(_client, cancellationToken); + TestWriteRecordsAsync(_client, cancellationToken); + TestWritePointAsync(_client, cancellationToken); + TestWritePointsAsync(_client, cancellationToken); + } - var httpResponseMessage = httpClient.Send(new HttpRequestMessage(HttpMethod.Get, "test")); - Assert.That(httpResponseMessage.Content.ReadAsStringAsync().Result, Is.EqualTo("Still ok")); + private static void TestWriteRecordAsync(InfluxDBClient client, CancellationToken? cancellationToken = null) + { + Assert.ThrowsAsync(async () => + { + await client.WriteRecordAsync("mem,tag=a field=1", cancellationToken: cancellationToken ?? CancellationToken.None); + }); + } + + private static void TestWriteRecordsAsync(InfluxDBClient client, CancellationToken? cancellationToken = null) + { + Assert.ThrowsAsync(async () => + { + await client.WriteRecordsAsync( + records: new[] { "stat,unit=temperature value=24.5", "stat,unit=temperature value=25.5" }, + cancellationToken: cancellationToken ?? CancellationToken.None + ); + }); + } + + private static void TestWritePointAsync(InfluxDBClient client, CancellationToken? cancellationToken = null) + { + Assert.ThrowsAsync(async () => + { + await client.WritePointAsync( + PointData.Measurement("h2o").SetTag("location", "europe").SetField("level", 2), + cancellationToken: cancellationToken ?? CancellationToken.None + ); + }); + } + + private static void TestWritePointsAsync(InfluxDBClient client, CancellationToken? cancellationToken = null) + { + Assert.ThrowsAsync(async () => + { + await client.WritePointsAsync( + points: new[] + { + PointData.Measurement("h2o").SetTag("location", "europe").SetField("level", 2), + PointData.Measurement("h2o").SetTag("location", "us-west").SetField("level", 4), + }, + cancellationToken: cancellationToken ?? CancellationToken.None + ); + }); } } \ No newline at end of file diff --git a/Client.Test/Internal/RestClientTest.cs b/Client.Test/Internal/RestClientTest.cs index 07a02bb..d4a0b5f 100644 --- a/Client.Test/Internal/RestClientTest.cs +++ b/Client.Test/Internal/RestClientTest.cs @@ -279,19 +279,6 @@ public void AllowHttpRedirects() Assert.That(_client, Is.Not.Null); } - [Test] - public void Timeout() - { - CreateAndConfigureRestClient(new ClientConfig - { - Host = MockServerUrl, - Timeout = TimeSpan.FromSeconds(45) - }); - - var httpClient = GetDeclaredField(_client.GetType(), _client, "_httpClient"); - Assert.That(httpClient.Timeout, Is.EqualTo(TimeSpan.FromSeconds(45))); - } - private void CreateAndConfigureRestClient(ClientConfig config) { _httpClient = InfluxDBClient.CreateOrGetHttpClient(config); diff --git a/Client/Config/ClientConfig.cs b/Client/Config/ClientConfig.cs index 7d318af..c687bf6 100644 --- a/Client/Config/ClientConfig.cs +++ b/Client/Config/ClientConfig.cs @@ -19,7 +19,9 @@ namespace InfluxDB3.Client.Config; /// - Organization: The organization to be used for operations. /// - Database: The database to be used for InfluxDB operations. /// - Headers: The set of HTTP headers to be included in requests. -/// - Timeout: Timeout to wait before the HTTP request times out. Default to '10 seconds'. +/// - [Deprecated] Timeout: Timeout to wait before the HTTP request times out. Default to '10 seconds'. +/// - QueryTimeout: The maximum duration to wait for a query to complete before timing out. +/// - WriteTimeout: The duration to wait before timing out a write operation to the InfluxDB server. /// - AllowHttpRedirects: Automatically following HTTP 3xx redirects. Default to 'false'. /// - DisableServerCertificateValidation: Disable server SSL certificate validation. Default to 'false'. /// - DisableCertificateRevocationListCheck: Disable SSL certificate revocation list (CRL) checking. Default to 'false'. @@ -165,8 +167,19 @@ public string Host /// /// Timeout to wait before the HTTP request times out. Default to '10 seconds'. /// + [Obsolete("Please use more informative properties like WriteTimeout or QueryTimeout")] public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10); + /// + /// The maximum duration to wait for a query to complete before timing out. + /// + public TimeSpan? QueryTimeout { get; set; } + + /// + /// The duration to wait before timing out a write operation to the InfluxDB server. + /// + public TimeSpan? WriteTimeout { get; set; } + /// /// Automatically following HTTP 3xx redirects. Default to 'false'. /// diff --git a/Client/InfluxDBClient.cs b/Client/InfluxDBClient.cs index 90b1e1e..af6fb66 100644 --- a/Client/InfluxDBClient.cs +++ b/Client/InfluxDBClient.cs @@ -62,10 +62,13 @@ public interface IInfluxDBClient : IDisposable /// The headers to be added to query request. The headers specified here are preferred over /// the headers specified in the client configuration. /// + /// + /// The timeout to use for only this query request. + /// /// Batches of rows /// The client is already disposed IAsyncEnumerable Query(string query, QueryType? queryType = null, string? database = null, - Dictionary? namedParameters = null, Dictionary? headers = null); + Dictionary? namedParameters = null, Dictionary? headers = null, TimeSpan? timeout = null); /// /// Query data from InfluxDB IOx using FlightSQL. @@ -108,10 +111,13 @@ public interface IInfluxDBClient : IDisposable /// The headers to be added to query request. The headers specified here are preferred over /// the headers specified in the client configuration. /// + /// + /// The timeout to use for only this query request. + /// /// Batches of rows /// The client is already disposed IAsyncEnumerable QueryBatches(string query, QueryType? queryType = null, string? database = null, - Dictionary? namedParameters = null, Dictionary? headers = null); + Dictionary? namedParameters = null, Dictionary? headers = null, TimeSpan? timeout = null); /// /// Query data from InfluxDB IOx into PointData structure using FlightSQL. @@ -153,11 +159,14 @@ IAsyncEnumerable QueryBatches(string query, QueryType? queryType = /// The headers to be added to query request. The headers specified here are preferred over /// the headers specified in the client configuration. /// + /// + /// The timeout to use for only this query request. + /// /// Batches of rows /// The client is already disposed IAsyncEnumerable QueryPoints(string query, QueryType? queryType = null, string? database = null, Dictionary? namedParameters = null, - Dictionary? headers = null); + Dictionary? headers = null, TimeSpan? timeout = null); /// /// Write data to InfluxDB. @@ -186,33 +195,6 @@ IAsyncEnumerable QueryPoints(string query, QueryType? queryType Task WriteRecordAsync(string record, string? database = null, WritePrecision? precision = null, Dictionary? headers = null, CancellationToken cancellationToken = default); - /// - /// Write data to InfluxDB. - /// - /// - /// - /// The following example shows how to write multiple records with custom headers: - /// - /// - /// using var client = new InfluxDBClient(host: "http://localhost:8086", token: "my-token", organization: "my-org", database: "my-database"); - /// - /// await client.WriteRecordsAsync( - /// records: new[] { "stat,unit=temperature value=24.5", "stat,unit=temperature value=25.5" }, - /// headers: new Dictionary<string, string> { { "X-Tracing-Id", "123" } } - /// ); - /// - /// - /// Specifies the records in InfluxDB Line Protocol. The is considered as one batch unit. - /// The database to be used for InfluxDB operations. - /// The to use for the timestamp in the write API call. - /// - /// The headers to be added to write request. The headers specified here are preferred over - /// the headers specified in the client configuration. - /// - /// specifies the token to monitor for cancellation requests. - Task WriteRecordsAsync(IEnumerable records, string? database = null, WritePrecision? precision = null, - Dictionary? headers = null, CancellationToken cancellationToken = default); - /// /// Write data to InfluxDB. /// @@ -467,13 +449,16 @@ public InfluxDBClient() : this( /// The headers to be added to query request. The headers specified here are preferred over /// the headers specified in the client configuration. /// + /// + /// The timeout to use for only this query request. + /// /// Batches of rows /// The client is already disposed public async IAsyncEnumerable Query(string query, QueryType? queryType = null, string? database = null, Dictionary? namedParameters = null, - Dictionary? headers = null) + Dictionary? headers = null, TimeSpan? timeout = null) { - await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers) + await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers, timeout) .ConfigureAwait(false)) { for (var i = 0; i < batch.Column(0).Length; i++) @@ -537,13 +522,16 @@ public InfluxDBClient() : this( /// The headers to be added to query request. The headers specified here are preferred over /// the headers specified in the client configuration. /// + /// + /// The timeout to use for only this query request. + /// /// Batches of rows /// The client is already disposed public async IAsyncEnumerable QueryPoints(string query, QueryType? queryType = null, string? database = null, Dictionary? namedParameters = null, - Dictionary? headers = null) + Dictionary? headers = null, TimeSpan? timeout = null) { - await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers) + await foreach (var batch in QueryBatches(query, queryType, database, namedParameters, headers, timeout) .ConfigureAwait(false)) { for (var i = 0; i < batch.Column(0).Length; i++) @@ -595,11 +583,14 @@ public async IAsyncEnumerable QueryPoints(string query, QueryTy /// The headers to be added to query request. The headers specified here are preferred over /// the headers specified in the client configuration. /// + /// + /// The timeout to use for only this query request. + /// /// Batches of rows /// The client is already disposed public IAsyncEnumerable QueryBatches(string query, QueryType? queryType = null, string? database = null, Dictionary? namedParameters = null, - Dictionary? headers = null) + Dictionary? headers = null, TimeSpan? timeout = null) { if (_disposed) { @@ -610,7 +601,7 @@ public IAsyncEnumerable QueryBatches(string query, QueryType? query (database ?? _config.Database) ?? throw new InvalidOperationException(OptionMessage("database")), queryType ?? QueryType.SQL, namedParameters ?? new Dictionary(), - headers ?? new Dictionary()); + headers ?? new Dictionary(), timeout); } /// @@ -629,7 +620,7 @@ public IAsyncEnumerable QueryBatches(string query, QueryType? query /// ); /// /// - /// + /// /// Specifies the record in InfluxDB Line Protocol. The is considered as one batch unit. /// The database to be used for InfluxDB operations. /// The to use for the timestamp in the write API call. @@ -786,10 +777,20 @@ var databaseNotNull }; } + var cancelToken = new CancellationTokenSource(_config.Timeout).Token; // Just for compatibility with the old API + if (cancellationToken != CancellationToken.None) + { + cancelToken = cancellationToken; + } + else if (_config.WriteTimeout.HasValue) + { + cancelToken = new CancellationTokenSource(_config.WriteTimeout.Value).Token; + } + try { await _restClient - .Request(path, HttpMethod.Post, content, queryParams, headers, cancellationToken) + .Request(path, HttpMethod.Post, content, queryParams, headers, cancelToken) .ConfigureAwait(false); } catch (InfluxDBApiException e) @@ -938,10 +939,7 @@ private static HttpClient CreateHttpClient(ClientConfig config) handler.CheckCertificateRevocationList = false; } - var client = new HttpClient(handler) - { - Timeout = config.Timeout - }; + var client = new HttpClient(handler); client.DefaultRequestHeaders.UserAgent.ParseAdd(AssemblyHelper.GetUserAgent()); return client; diff --git a/Client/Internal/FlightSqlClient.cs b/Client/Internal/FlightSqlClient.cs index 52ae53e..eb7b682 100644 --- a/Client/Internal/FlightSqlClient.cs +++ b/Client/Internal/FlightSqlClient.cs @@ -25,7 +25,7 @@ internal interface IFlightSqlClient : IDisposable /// Execute the query and return the result as a sequence of record batches. /// internal IAsyncEnumerable Execute(string query, string database, QueryType queryType, - Dictionary namedParameters, Dictionary headers); + Dictionary namedParameters, Dictionary headers, TimeSpan? timeout = null); /// /// Prepare the FlightTicket for the query. @@ -77,7 +77,7 @@ internal FlightSqlClient(ClientConfig config, HttpClient httpClient) } async IAsyncEnumerable IFlightSqlClient.Execute(string query, string database, QueryType queryType, - Dictionary namedParameters, Dictionary headers) + Dictionary namedParameters, Dictionary headers, TimeSpan? timeout) { // // verify that values of namedParameters is supported type @@ -97,7 +97,21 @@ async IAsyncEnumerable IFlightSqlClient.Execute(string query, strin var ticket = ((IFlightSqlClient)this).PrepareFlightTicket(query, database, queryType, namedParameters); - using var stream = _flightClient.GetStream(ticket, metadata, _config.QueryOptions.Deadline); + DateTime? deadline = null; + if (timeout.HasValue) + { + deadline = DateTime.UtcNow.Add(timeout.Value); + } + else if (_config.QueryOptions.Deadline.HasValue) + { + deadline = _config.QueryOptions.Deadline.Value; + } + else if (_config.QueryTimeout.HasValue) + { + deadline = DateTime.UtcNow.Add(_config.QueryTimeout.Value); + } + + using var stream = _flightClient.GetStream(ticket, metadata, deadline); while (await stream.ResponseStream.MoveNext().ConfigureAwait(false)) { yield return stream.ResponseStream.Current;