diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 606c74cf..ef1769c4 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,4 @@ +- Feat Value: Add YdbList builder for List> (protobuf-based); works with IBulkUpsertImporter.AddListAsync. - Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation. - Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation. - Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods. diff --git a/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs b/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs index dc563257..92e171f4 100644 --- a/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs +++ b/src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs @@ -31,39 +31,112 @@ internal BulkUpsertImporter( _cancellationToken = cancellationToken; } + /// + /// Add a single row to the current BulkUpsert batch. + /// + /// Values in the same order as the configured columns. + /// + /// Supported per-cell types: , . + /// Other CLR values are converted via . + /// Passing as a column value is not supported (tables do not accept list-typed columns). + /// Use AddListAsync(YdbList) to append many rows from a list parameter. + /// + /// Thrown when the number of values differs from the number of columns. + /// Thrown when a value cannot be mapped to a YDB type. + /// + /// + /// // columns: ["Id", "Name"] + /// await importer.AddRowAsync(1, "Alice"); + /// await importer.AddRowAsync(2, "Bob"); + /// + /// public async ValueTask AddRowAsync(params object[] values) { if (values.Length != _columns.Count) - throw new ArgumentException("Values count must match columns count", nameof(values)); + throw new ArgumentException("Values count must match columns count.", nameof(values)); var ydbValues = values.Select(v => v switch - { - YdbValue ydbValue => ydbValue.GetProto(), - YdbParameter param => param.TypedValue, - _ => new YdbParameter { Value = v }.TypedValue - } - ).ToArray(); + { + YdbValue ydbValue => ydbValue.GetProto(), + YdbParameter param => param.TypedValue, + YdbList => throw new ArgumentException( + "YdbList cannot be used as a column value. Use AddListAsync(YdbList) to append multiple rows.", + nameof(values)), + _ => new YdbParameter { Value = v }.TypedValue + }).ToArray(); var protoStruct = new Ydb.Value(); - foreach (var value in ydbValues) protoStruct.Items.Add(value.Value); + foreach (var tv in ydbValues) + protoStruct.Items.Add(tv.Value); var rowSize = protoStruct.CalculateSize(); if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0) - { - await FlushAsync(); - } + await FlushAsync().ConfigureAwait(false); _rows.Add(protoStruct); _currentBytes += rowSize; _structType ??= new StructType - { Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) } }; + { + Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) } + }; + } + + /// + /// Add multiple rows from a shaped as List<Struct<...>>. + /// Struct member names and order must exactly match the configured columns. + /// + /// Rows as List<Struct<...>> with the exact column names and order. + /// + /// Thrown when the struct column set, order, or count does not match the importer’s columns. + /// + public async ValueTask AddListAsync(YdbList list) + { + var tv = list.ToTypedValue(); + + var incomingStruct = tv.Type.ListType.Item.StructType; + + if (incomingStruct.Members.Count != _columns.Count) + throw new ArgumentException( + $"The number of columns in the List ({incomingStruct.Members.Count}) " + + $"does not match the expected ({_columns.Count})."); + + for (var i = 0; i < _columns.Count; i++) + { + var expected = _columns[i]; + var actual = incomingStruct.Members[i].Name; + if (!string.Equals(expected, actual, StringComparison.Ordinal)) + throw new ArgumentException( + $"Column name mismatch at position {i}: expected '{expected}', received '{actual}'."); + } + + _structType ??= incomingStruct; + + foreach (var rowValue in tv.Value.Items) + { + var rowSize = rowValue.CalculateSize(); + + if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0) + await FlushAsync().ConfigureAwait(false); + + _rows.Add(rowValue); + _currentBytes += rowSize; + } } + /// + /// Flush the current batch via BulkUpsert. No-op if the batch is empty. + /// + /// + /// Uses the collected struct schema from the first added row (or the provided list) and sends + /// the accumulated rows in a single BulkUpsert request. + /// public async ValueTask FlushAsync() { - if (_rows.Count == 0) return; + if (_rows.Count == 0) + return; + if (_structType == null) throw new InvalidOperationException("structType is undefined"); diff --git a/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs b/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs index fa687609..9e66ddb0 100644 --- a/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs +++ b/src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs @@ -1,8 +1,31 @@ +using Ydb.Sdk.Value; + namespace Ydb.Sdk.Ado.BulkUpsert; +/// +/// Bulk upsert importer API: add rows and flush them to YDB in batches. +/// public interface IBulkUpsertImporter { + /// + /// Add a single row to the batch. Values must match the importer’s column order. + /// + /// Values in the same order as the configured columns. + /// Thrown when the number of values differs from the number of columns. ValueTask AddRowAsync(params object[] row); + /// + /// Add multiple rows from a shaped as List<Struct<...>>. + /// Struct member names and order must exactly match the configured columns. + /// + /// Rows as List<Struct<...>> with the exact column names and order. + /// + /// Thrown when the struct column set, order, or count does not match the importer’s columns. + /// + ValueTask AddListAsync(YdbList list); + + /// + /// Flush the current batch via BulkUpsert. No-op if the batch is empty. + /// ValueTask FlushAsync(); } diff --git a/src/Ydb.Sdk/src/Ado/YdbParameter.cs b/src/Ydb.Sdk/src/Ado/YdbParameter.cs index 2d4ac898..f7ebba8c 100644 --- a/src/Ydb.Sdk/src/Ado/YdbParameter.cs +++ b/src/Ydb.Sdk/src/Ado/YdbParameter.cs @@ -256,6 +256,7 @@ internal TypedValue TypedValue private TypedValue Cast(object value) => value switch { + YdbList ydbList => ydbList.ToTypedValue(), string stringValue => stringValue.Text(), bool boolValue => boolValue.Bool(), sbyte sbyteValue => sbyteValue.Int8(), @@ -285,9 +286,7 @@ private TypedValue Decimal(decimal value) => private TypedValue NullTypedValue() { if (YdbNullByDbType.TryGetValue(YdbDbType, out var value)) - { return value; - } if (YdbDbType == YdbDbType.Decimal) { diff --git a/src/Ydb.Sdk/src/Value/YdbList.cs b/src/Ydb.Sdk/src/Value/YdbList.cs new file mode 100644 index 00000000..2d93ae3f --- /dev/null +++ b/src/Ydb.Sdk/src/Value/YdbList.cs @@ -0,0 +1,157 @@ +using Google.Protobuf.WellKnownTypes; +using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.YdbType; + +namespace Ydb.Sdk.Value; + +/// +/// Builder for YDB values shaped as List<Struct<...>>, working directly with protobuf. +/// Each call to converts input values into protobuf cells and stores the row. +/// The struct schema is derived from explicit type hints or from the first non-null sample per column. +/// If a column contains at least one null, its member type becomes Optional<T>. +/// +public sealed class YdbList +{ + private readonly string[] _columns; + private readonly YdbDbType[] _typeHints; + + private readonly List _rows = new(); + + private readonly Type?[] _observedBaseTypes; + private readonly bool[] _sawNull; + + /// + /// Create a struct-mode list with column names. Types will be inferred from the + /// first non-null value per column (columns with any nulls become Optional<T>). + /// + /// Struct member names, in order. + public static YdbList Struct(params string[] columns) => new(columns); + + /// + /// Create a struct-mode list with column names and explicit YDB type hints. The array length must match . + /// Columns that contain null are emitted as Optional<hintedType>. + /// + /// Struct member names, in order. + /// YDB type hints for each column (same length as ). + public static YdbList Struct(string[] columns, YdbDbType[] types) => new(columns, types); + + /// + /// Construct a struct-mode list. If is null, schema is inferred from values. + /// + private YdbList(string[] columns, YdbDbType[]? types = null) + { + if (columns is null || columns.Length == 0) + throw new ArgumentException("Columns must be non-empty.", nameof(columns)); + if (types is not null && types.Length != columns.Length) + throw new ArgumentException("Length of 'types' must match length of 'columns'.", nameof(types)); + + _columns = columns; + _typeHints = types ?? Enumerable.Repeat(YdbDbType.Unspecified, columns.Length).ToArray(); + _observedBaseTypes = new Type[_columns.Length]; + _sawNull = new bool[_columns.Length]; + } + + /// + /// Add a single positional row. The number of values must match the number of columns. + /// Values are converted to protobuf cells and the row is stored immediately. + /// + /// Row values in the same order as the declared columns. + /// Thrown when the number of values differs from the number of columns. + public YdbList AddRow(params object?[] values) + { + if (values.Length != _columns.Length) + throw new ArgumentException($"Expected {_columns.Length} values, got {values.Length}."); + + var cells = new List(_columns.Length); + + for (var i = 0; i < _columns.Length; i++) + { + var v = values[i]; + + if (v is null || v == DBNull.Value) + { + _sawNull[i] = true; + cells.Add(new Ydb.Value { NullFlagValue = NullValue.NullValue }); + continue; + } + + var tv = v switch + { + YdbValue yv => yv.GetProto(), + YdbParameter p => p.TypedValue, + _ => new YdbParameter { Value = v }.TypedValue + }; + + var t = tv.Type; + if (t.TypeCase == Type.TypeOneofCase.OptionalType && t.OptionalType is not null) + t = t.OptionalType.Item; + + _observedBaseTypes[i] ??= t; + cells.Add(tv.Value); + } + + _rows.Add(new Ydb.Value { Items = { cells } }); + return this; + } + + /// + /// Convert the accumulated rows into a YDB with shape List<Struct<...>>. + /// Columns that observed null values are emitted as Optional<T>. + /// + /// TypedValue representing List<Struct<...>> and the collected data rows. + /// + /// Thrown when the schema cannot be inferred (e.g., empty list without type hints, or a column has only nulls + /// and no explicit type hint). + /// + internal TypedValue ToTypedValue() + { + if (_rows.Count == 0) + throw new InvalidOperationException( + "Cannot infer Struct schema from an empty list without explicit YdbDbType hints."); + + var n = _columns.Length; + var memberTypes = new Type[n]; + + for (var i = 0; i < n; i++) + { + Type? baseType; + + if (_typeHints[i] != YdbDbType.Unspecified) + { + baseType = new YdbParameter { YdbDbType = _typeHints[i], Value = DBNull.Value }.TypedValue.Type; + + if (baseType.TypeCase == Type.TypeOneofCase.OptionalType && baseType.OptionalType is not null) + baseType = baseType.OptionalType.Item; + } + else + { + baseType = _observedBaseTypes[i]; + if (baseType is null) + throw new InvalidOperationException( + $"Column '{_columns[i]}' has only nulls and no explicit YdbDbType. Provide a type hint."); + } + + memberTypes[i] = _sawNull[i] && baseType.TypeCase != Type.TypeOneofCase.OptionalType + ? new Type { OptionalType = new OptionalType { Item = baseType } } + : baseType; + } + + var structType = new StructType + { + Members = + { + _columns.Select((name, idx) => new StructMember + { + Name = name, + Type = memberTypes[idx] + }) + } + }; + + return new TypedValue + { + Type = new Type { ListType = new ListType { Item = new Type { StructType = structType } } }, + Value = new Ydb.Value { Items = { _rows } } + }; + } +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Value/YdbListTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Value/YdbListTests.cs new file mode 100644 index 00000000..d4a4c0b5 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Value/YdbListTests.cs @@ -0,0 +1,295 @@ +using Xunit; +using Ydb.Sdk.Ado.YdbType; +using Ydb.Sdk.Value; + +namespace Ydb.Sdk.Ado.Tests.Value; + +public class YdbListTests : TestBase +{ + private static async Task WithTempTableAsync( + YdbConnection conn, + string namePrefix, + string columns, + Func body) + { + var table = $"{namePrefix}_{Guid.NewGuid():N}"; + var createSql = $"CREATE TABLE {table} (\n{columns}\n)"; + var dropSql = $"DROP TABLE {table}"; + + await using (var create = conn.CreateCommand()) + { + create.CommandText = createSql; + await create.ExecuteNonQueryAsync(); + } + + try + { + await body(table); + } + finally + { + await using var drop = conn.CreateCommand(); + drop.CommandText = dropSql; + await drop.ExecuteNonQueryAsync(); + } + } + + private static async Task ExecAsTableAsync( + YdbConnection conn, + string verb, + string table, + string paramName, + YdbList rows) + { + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"{verb} INTO {table}\nSELECT * FROM AS_TABLE({paramName});"; + cmd.Parameters.Add(new YdbParameter(paramName, rows)); + await cmd.ExecuteNonQueryAsync(); + } + + private static async Task CountAsync(YdbConnection conn, string table) + { + await using var check = conn.CreateCommand(); + check.CommandText = $"SELECT COUNT(*) FROM {table}"; + return Convert.ToInt32(await check.ExecuteScalarAsync()); + } + + [Fact] + public async Task Insert_With_YdbList_Works() + { + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_insert", + """ + Id Int64, + Value Text, + PRIMARY KEY (Id) + """, + async table => + { + var rows = YdbList.Struct("Id", "Value") + .AddRow(1L, "a") + .AddRow(2L, "b"); + + await ExecAsTableAsync(conn, "INSERT", table, "$rows", rows); + + Assert.Equal(2, await CountAsync(conn, table)); + }); + } + + [Fact] + public async Task YdbList_WhenUpsertOperation_InsertAndUpdate() + { + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_upsert", + """ + Id Int64, + Value Text, + PRIMARY KEY (Id) + """, + async table => + { + await using (var seed = conn.CreateCommand()) + { + seed.CommandText = $"INSERT INTO {table} (Id, Value) VALUES (1, 'old')"; + await seed.ExecuteNonQueryAsync(); + } + + var rows = YdbList.Struct("Id", "Value") + .AddRow(1L, "new") + .AddRow(2L, "two"); + + await ExecAsTableAsync(conn, "UPSERT", table, "$rows", rows); + + await using (var check = conn.CreateCommand()) + { + check.CommandText = $"SELECT Value FROM {table} WHERE Id=1"; + Assert.Equal("new", (string)(await check.ExecuteScalarAsync())!); + + check.CommandText = $"SELECT Value FROM {table} WHERE Id=2"; + Assert.Equal("two", (string)(await check.ExecuteScalarAsync())!); + } + }); + } + + [Fact] + public async Task UpdateOn_With_YdbList_ChangesValues() + { + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_update_on", + """ + Id Int64, + Value Text, + PRIMARY KEY (Id) + """, + async table => + { + await using (var seed = conn.CreateCommand()) + { + seed.CommandText = $"INSERT INTO {table} (Id, Value) VALUES (1,'a'),(2,'b')"; + await seed.ExecuteNonQueryAsync(); + } + + var toUpdate = YdbList.Struct("Id", "Value") + .AddRow(1L, "x") + .AddRow(2L, "y"); + + await using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = $""" + UPDATE {table} ON + SELECT * FROM AS_TABLE($to_update); + """; + cmd.Parameters.Add(new YdbParameter("$to_update", toUpdate)); + await cmd.ExecuteNonQueryAsync(); + } + + await using (var check = conn.CreateCommand()) + { + check.CommandText = $"SELECT Value FROM {table} WHERE Id=1"; + Assert.Equal("x", (string)(await check.ExecuteScalarAsync())!); + + check.CommandText = $"SELECT Value FROM {table} WHERE Id=2"; + Assert.Equal("y", (string)(await check.ExecuteScalarAsync())!); + } + }); + } + + [Fact] + public async Task DeleteOn_With_YdbList_RemovesRows() + { + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_delete_on", + """ + Id Int64, + Value Text, + PRIMARY KEY (Id) + """, + async table => + { + await using (var seed = conn.CreateCommand()) + { + seed.CommandText = $"INSERT INTO {table} (Id, Value) VALUES (1,'a'),(2,'b'),(3,'c')"; + await seed.ExecuteNonQueryAsync(); + } + + var toDelete = YdbList.Struct("Id") + .AddRow(1L) + .AddRow(3L); + + await using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = $""" + DELETE FROM {table} ON + SELECT * FROM AS_TABLE($to_delete); + """; + cmd.Parameters.Add(new YdbParameter("$to_delete", toDelete)); + await cmd.ExecuteNonQueryAsync(); + } + + Assert.Equal(1, await CountAsync(conn, table)); + + await using (var check = conn.CreateCommand()) + { + check.CommandText = $"SELECT Value FROM {table} WHERE Id=2"; + Assert.Equal("b", (string)(await check.ExecuteScalarAsync())!); + } + }); + } + + [Fact] + public async Task Insert_With_OptionalText_And_Inference_NullThenNonNull() + { + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_nulls", + """ + Id Int64, + Name Text?, + PRIMARY KEY (Id) + """, + async table => + { + var rows1 = YdbList.Struct( + ["Id", "Name"], + [YdbDbType.Int64, YdbDbType.Text]) + .AddRow(1L, "A") + .AddRow(2L, null); + + await ExecAsTableAsync(conn, "INSERT", table, "$rows", rows1); + + var rows2 = YdbList.Struct( + ["Id", "Name"], + [YdbDbType.Int64, YdbDbType.Text]) + .AddRow(3L, null) + .AddRow(4L, "B"); + + await ExecAsTableAsync(conn, "INSERT", table, "$rows", rows2); + + await using (var check = conn.CreateCommand()) + { + check.CommandText = $"SELECT Name IS NULL FROM {table} WHERE Id=2"; + Assert.True((bool)(await check.ExecuteScalarAsync())!); + + check.CommandText = $"SELECT Name IS NULL FROM {table} WHERE Id=3"; + Assert.True((bool)(await check.ExecuteScalarAsync())!); + + check.CommandText = $"SELECT Name FROM {table} WHERE Id=4"; + Assert.Equal("B", (string)(await check.ExecuteScalarAsync())!); + } + }); + } + + [Fact] + public async Task Bulk_Load_With_List_Mode_Sanity() + { + const int n = 5_000; + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_load", + """ + Id Int64, + Name Text, + PRIMARY KEY (Id) + """, + async table => + { + for (var offset = 0; offset < n; offset += 1000) + { + var rows = YdbList.Struct("Id", "Name"); + for (var i = offset; i < Math.Min(n, offset + 1000); i++) + rows.AddRow((long)i, $"v{i}"); + + await ExecAsTableAsync(conn, "UPSERT", table, "$rows", rows); + } + + Assert.Equal(n, await CountAsync(conn, table)); + }); + } + + [Fact] + public async Task YdbList_WhenAnyRowHasNull_InsertsIntoOptionalColumn() + { + await using var conn = await CreateOpenConnectionAsync(); + await WithTempTableAsync(conn, "ydb_list_optional", + """ + Id Int64, + Name Text?, + PRIMARY KEY (Id) + """, + async table => + { + var rows = YdbList.Struct("Id", "Name") + .AddRow(1L, "X") + .AddRow(2L, null); + + await ExecAsTableAsync(conn, "UPSERT", table, "$rows", rows); + + await using (var check = conn.CreateCommand()) + { + check.CommandText = $"SELECT Name FROM {table} WHERE Id=1"; + Assert.Equal("X", (string)(await check.ExecuteScalarAsync())!); + + check.CommandText = $"SELECT Name IS NULL FROM {table} WHERE Id=2"; + Assert.True((bool)(await check.ExecuteScalarAsync())!); + } + }); + } +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs index 3cf2d07a..78201f67 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs @@ -2,6 +2,7 @@ using Xunit; using Ydb.Sdk.Ado.Tests.Utils; using Ydb.Sdk.Ado.YdbType; +using Ydb.Sdk.Value; namespace Ydb.Sdk.Ado.Tests; @@ -40,7 +41,7 @@ public async Task TlsSettings_WhenUseGrpcs_ReturnValidConnection() { await using var ydbConnection = new YdbConnection(_connectionStringTls); await ydbConnection.OpenAsync(); - var command = ydbConnection.CreateCommand(); + await using var command = ydbConnection.CreateCommand(); command.CommandText = Tables.CreateTables; await command.ExecuteNonQueryAsync(); command.CommandText = Tables.UpsertData; @@ -68,7 +69,7 @@ public async Task SetConnectionString_WhenConnectionIsOpen_ThrowException() [Fact] public async Task BeginTransaction_WhenConnectionIsClosed_ThrowException() { - var ydbConnection = await CreateOpenConnectionAsync(); + await using var ydbConnection = await CreateOpenConnectionAsync(); await ydbConnection.CloseAsync(); Assert.Equal("Connection is closed", Assert.Throws(() => ydbConnection.BeginTransaction()).Message); @@ -77,10 +78,10 @@ public async Task BeginTransaction_WhenConnectionIsClosed_ThrowException() [Fact] public async Task ExecuteScalar_WhenConnectionIsClosed_ThrowException() { - var ydbConnection = await CreateOpenConnectionAsync(); + await using var ydbConnection = await CreateOpenConnectionAsync(); await ydbConnection.CloseAsync(); - var ydbCommand = ydbConnection.CreateCommand(); + await using var ydbCommand = ydbConnection.CreateCommand(); ydbCommand.CommandText = "SELECT 1"; Assert.Equal("Connection is closed", @@ -90,11 +91,11 @@ public async Task ExecuteScalar_WhenConnectionIsClosed_ThrowException() [Fact] public async Task ClosedYdbDataReader_WhenConnectionIsClosed_ThrowException() { - var ydbConnection = await CreateOpenConnectionAsync(); + await using var ydbConnection = await CreateOpenConnectionAsync(); - var ydbCommand = ydbConnection.CreateCommand(); + await using var ydbCommand = ydbConnection.CreateCommand(); ydbCommand.CommandText = "SELECT 1; SELECT 2; SELECT 3;"; - var reader = await ydbCommand.ExecuteReaderAsync(); + await using var reader = await ydbCommand.ExecuteReaderAsync(); await reader.ReadAsync(); Assert.Equal(1, reader.GetInt32(0)); @@ -107,80 +108,85 @@ public async Task ClosedYdbDataReader_WhenConnectionIsClosed_ThrowException() public async Task SetNulls_WhenTableAllTypes_SussesSet() { await using var ydbConnection = await CreateOpenConnectionAsync(); - var ydbCommand = ydbConnection.CreateCommand(); + await using var ydbCommand = ydbConnection.CreateCommand(); var tableName = "AllTypes_" + Random.Shared.Next(); - ydbCommand.CommandText = $""" - CREATE TABLE {tableName} ( - id INT32, - bool_column BOOL, - bigint_column INT64, - smallint_column INT16, - tinyint_column INT8, - float_column FLOAT, - double_column DOUBLE, - decimal_column DECIMAL(22,9), - uint8_column UINT8, - uint16_column UINT16, - uint32_column UINT32, - uint64_column UINT64, - text_column TEXT, - binary_column BYTES, - json_column JSON, - jsondocument_column JSONDOCUMENT, - date_column DATE, - datetime_column DATETIME, - timestamp_column TIMESTAMP, - interval_column INTERVAL, - PRIMARY KEY (id) - ) - """; - await ydbCommand.ExecuteNonQueryAsync(); - ydbCommand.CommandText = - $""" - INSERT INTO {tableName} (id, bool_column, bigint_column, smallint_column, tinyint_column, float_column, - double_column, decimal_column, uint8_column, uint16_column, uint32_column, uint64_column, text_column, - binary_column, json_column, jsondocument_column, date_column, datetime_column, timestamp_column, - interval_column) VALUES (@name1, @name2, @name3, @name4, @name5, @name6, @name7, @name8, @name9, @name10, - @name11, @name12, @name13, @name14, @name15, @name16, @name17, @name18, @name19, @name20); - """; - - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name1", DbType = DbType.Int32, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name2", DbType = DbType.Boolean, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name3", DbType = DbType.Int64, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name4", DbType = DbType.Int16, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name5", DbType = DbType.SByte, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name6", DbType = DbType.Single, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name7", DbType = DbType.Double, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name8", DbType = DbType.Decimal, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name9", DbType = DbType.Byte, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name10", DbType = DbType.UInt16, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name11", DbType = DbType.UInt32, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name12", DbType = DbType.UInt64, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name13", DbType = DbType.String, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name14", DbType = DbType.Binary, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name15", YdbDbType = YdbDbType.Json }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name16", YdbDbType = YdbDbType.JsonDocument }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name17", DbType = DbType.Date, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter - { ParameterName = "name18", DbType = DbType.DateTime, Value = null }); - ydbCommand.Parameters.Add( - new YdbParameter { ParameterName = "name19", DbType = DbType.DateTime2, Value = null }); - ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name20", YdbDbType = YdbDbType.Interval }); - - await ydbCommand.ExecuteNonQueryAsync(); - ydbCommand.CommandText = $"SELECT NULL, t.* FROM {tableName} t"; - var ydbDataReader = await ydbCommand.ExecuteReaderAsync(); - Assert.True(await ydbDataReader.ReadAsync()); - for (var i = 0; i < 21; i++) + try { - Assert.True(ydbDataReader.IsDBNull(i)); - } - - Assert.False(await ydbDataReader.ReadAsync()); + ydbCommand.CommandText = @$" +CREATE TABLE {tableName} ( + id INT32, + bool_column BOOL, + bigint_column INT64, + smallint_column INT16, + tinyint_column INT8, + float_column FLOAT, + double_column DOUBLE, + decimal_column DECIMAL(22,9), + uint8_column UINT8, + uint16_column UINT16, + uint32_column UINT32, + uint64_column UINT64, + text_column TEXT, + binary_column BYTES, + json_column JSON, + jsondocument_column JSONDOCUMENT, + date_column DATE, + datetime_column DATETIME, + timestamp_column TIMESTAMP, + interval_column INTERVAL, + PRIMARY KEY (id) +) +"; + await ydbCommand.ExecuteNonQueryAsync(); + ydbCommand.CommandText = @$" +INSERT INTO {tableName} + (id, bool_column, bigint_column, smallint_column, tinyint_column, float_column, double_column, decimal_column, + uint8_column, uint16_column, uint32_column, uint64_column, text_column, binary_column, json_column, + jsondocument_column, date_column, datetime_column, timestamp_column, interval_column) VALUES +(@name1, @name2, @name3, @name4, @name5, @name6, @name7, @name8, @name9, @name10, @name11, @name12, @name13, @name14, + @name15, @name16, @name17, @name18, @name19, @name20); +"; + + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name1", DbType = DbType.Int32, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name2", DbType = DbType.Boolean, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name3", DbType = DbType.Int64, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name4", DbType = DbType.Int16, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name5", DbType = DbType.SByte, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name6", DbType = DbType.Single, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name7", DbType = DbType.Double, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name8", DbType = DbType.Decimal, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name9", DbType = DbType.Byte, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name10", DbType = DbType.UInt16, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name11", DbType = DbType.UInt32, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name12", DbType = DbType.UInt64, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name13", DbType = DbType.String, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name14", DbType = DbType.Binary, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name15", YdbDbType = YdbDbType.Json }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name16", YdbDbType = YdbDbType.JsonDocument }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name17", DbType = DbType.Date, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter + { ParameterName = "name18", DbType = DbType.DateTime, Value = null }); + ydbCommand.Parameters.Add( + new YdbParameter { ParameterName = "name19", DbType = DbType.DateTime2, Value = null }); + ydbCommand.Parameters.Add(new YdbParameter { ParameterName = "name20", YdbDbType = YdbDbType.Interval }); + + await ydbCommand.ExecuteNonQueryAsync(); + ydbCommand.CommandText = $"SELECT NULL, t.* FROM {tableName} t"; + await using var ydbDataReader = await ydbCommand.ExecuteReaderAsync(); + Assert.True(await ydbDataReader.ReadAsync()); + for (var i = 0; i < 21; i++) + { + Assert.True(ydbDataReader.IsDBNull(i)); + } - ydbCommand.CommandText = $"DROP TABLE {tableName}"; - await ydbCommand.ExecuteNonQueryAsync(); + Assert.False(await ydbDataReader.ReadAsync()); + } + finally + { + ydbCommand.CommandText = $"DROP TABLE {tableName}"; + await ydbCommand.ExecuteNonQueryAsync(); + } } [Fact] @@ -198,7 +204,7 @@ public async Task OpenAsync_WhenCancelTokenIsCanceled_ThrowYdbException() await using var connection = CreateConnection(); connection.ConnectionString = ConnectionString + ";MinSessionPool=1"; using var cts = new CancellationTokenSource(); - await cts.CancelAsync(); + cts.Cancel(); await Assert.ThrowsAnyAsync(async () => await connection.OpenAsync(cts.Token)); Assert.Equal(ConnectionState.Closed, connection.State); } @@ -207,36 +213,39 @@ public async Task OpenAsync_WhenCancelTokenIsCanceled_ThrowYdbException() public async Task YdbDataReader_WhenCancelTokenIsCanceled_ThrowYdbException() { await using var connection = await CreateOpenConnectionAsync(); - var command = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1; SELECT 1;" }; - var ydbDataReader = await command.ExecuteReaderAsync(); - using var cts = new CancellationTokenSource(); - await cts.CancelAsync(); - - await ydbDataReader.ReadAsync(cts.Token); // first part in memory - Assert.False(ydbDataReader.IsClosed); - Assert.Equal(1, ydbDataReader.GetValue(0)); - Assert.Equal(ConnectionState.Open, connection.State); - Assert.Equal(StatusCode.ClientTransportTimeout, - (await Assert.ThrowsAsync(async () => await ydbDataReader.NextResultAsync(cts.Token))).Code); - Assert.True(ydbDataReader.IsClosed); - Assert.Equal(ConnectionState.Broken, connection.State); - // CLOSE OLD CONNECTION! (return to pool) - await connection.CloseAsync(); - // ReSharper disable once MethodSupportsCancellation + await using var command = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1; SELECT 1;" }; + await using (var ydbDataReader = await command.ExecuteReaderAsync()) + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + await ydbDataReader.ReadAsync(cts.Token); // first part in memory + Assert.False(ydbDataReader.IsClosed); + Assert.Equal(1, ydbDataReader.GetValue(0)); + Assert.Equal(ConnectionState.Open, connection.State); + + var nextTask = ydbDataReader.NextResultAsync(cts.Token); + cts.Cancel(); + var ex = await Assert.ThrowsAsync(async () => await nextTask); + Assert.Equal(StatusCode.ClientTransportTimeout, ex.Code); + Assert.True(ydbDataReader.IsClosed); + Assert.Equal(ConnectionState.Broken, connection.State); + } await connection.OpenAsync(); - // ReSharper disable once MethodSupportsCancellation - ydbDataReader = await command.ExecuteReaderAsync(); - // ReSharper disable once MethodSupportsCancellation - await ydbDataReader.NextResultAsync(); - await ydbDataReader.ReadAsync(cts.Token); - Assert.False(ydbDataReader.IsClosed); - Assert.Equal(1, ydbDataReader.GetValue(0)); - Assert.False(ydbDataReader.IsClosed); - - Assert.Equal(StatusCode.ClientTransportTimeout, - (await Assert.ThrowsAsync(async () => await ydbDataReader.NextResultAsync(cts.Token))).Code); - Assert.True(ydbDataReader.IsClosed); + await using var ydbDataReader2 = await command.ExecuteReaderAsync(); + await ydbDataReader2.NextResultAsync(); + using var cts2 = new CancellationTokenSource(); + await ydbDataReader2.ReadAsync(cts2.Token); + Assert.False(ydbDataReader2.IsClosed); + Assert.Equal(1, ydbDataReader2.GetValue(0)); + Assert.False(ydbDataReader2.IsClosed); + + var nextTask2 = ydbDataReader2.NextResultAsync(cts2.Token); + cts2.Cancel(); + var ex2 = await Assert.ThrowsAsync(async () => await nextTask2); + Assert.Equal(StatusCode.ClientTransportTimeout, ex2.Code); + Assert.True(ydbDataReader2.IsClosed); Assert.Equal(ConnectionState.Broken, connection.State); } @@ -244,7 +253,7 @@ public async Task YdbDataReader_WhenCancelTokenIsCanceled_ThrowYdbException() public async Task ExecuteMethods_WhenCancelTokenIsCanceled_ConnectionIsBroken() { await using var connection = await CreateOpenConnectionAsync(); - var command = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1; SELECT 1;" }; + await using var command = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1; SELECT 1;" }; using var cts = new CancellationTokenSource(); await cts.CancelAsync(); @@ -265,17 +274,16 @@ await Assert.ThrowsAnyAsync(async () => public async Task ExecuteReaderAsync_WhenExecutedYdbDataReaderThenCancelTokenIsCanceled_ReturnValues() { await using var connection = await CreateOpenConnectionAsync(); - var ydbCommand = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1; " }; + await using var ydbCommand = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1; " }; var cts = new CancellationTokenSource(); - var ydbDataReader = await ydbCommand.ExecuteReaderAsync(cts.Token); + await using var ydbDataReader = await ydbCommand.ExecuteReaderAsync(cts.Token); await ydbDataReader.ReadAsync(cts.Token); Assert.Equal(1, ydbDataReader.GetValue(0)); Assert.True(await ydbDataReader.NextResultAsync(cts.Token)); - await cts.CancelAsync(); + cts.Cancel(); await ydbDataReader.ReadAsync(cts.Token); Assert.Equal(1, ydbDataReader.GetValue(0)); - // ReSharper disable once MethodSupportsCancellation Assert.False(await ydbDataReader.NextResultAsync()); } @@ -295,7 +303,7 @@ private List GenerateTasks(string connectionString) => Enumerable.Range(0, } await using var connection = ydbConnection; - var command = connection.CreateCommand(); + using var command = connection.CreateCommand(); command.CommandText = "SELECT " + i; var scalar = (int)(await command.ExecuteScalarAsync())!; Assert.Equal(i, scalar); @@ -307,10 +315,10 @@ public async Task BulkUpsertImporter_HappyPath_Add_Flush() { var tableName = $"BulkImporter_{Guid.NewGuid():N}"; - await using var ydbConnection = await CreateOpenConnectionAsync(); + await using var conn = await CreateOpenConnectionAsync(); try { - await using (var createCmd = ydbConnection.CreateCommand()) + await using (var createCmd = conn.CreateCommand()) { createCmd.CommandText = $""" CREATE TABLE {tableName} ( @@ -324,25 +332,24 @@ PRIMARY KEY (Id) var columns = new[] { "Id", "Name" }; - var importer = ydbConnection.BeginBulkUpsertImport(tableName, columns); - - await importer.AddRowAsync(1, "Alice"); - await importer.AddRowAsync(2, "Bob"); - await importer.FlushAsync(); + var importer1 = conn.BeginBulkUpsertImport(tableName, columns); + await importer1.AddRowAsync(1, "Alice"); + await importer1.AddRowAsync(2, "Bob"); + await importer1.FlushAsync(); - await using (var checkCmd = ydbConnection.CreateCommand()) + await using (var checkCmd = conn.CreateCommand()) { checkCmd.CommandText = $"SELECT COUNT(*) FROM {tableName}"; var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync()); Assert.Equal(2, count); } - importer = ydbConnection.BeginBulkUpsertImport(tableName, columns); - await importer.AddRowAsync(3, "Charlie"); - await importer.AddRowAsync(4, "Diana"); - await importer.FlushAsync(); + var importer2 = conn.BeginBulkUpsertImport(tableName, columns); + await importer2.AddRowAsync(3, "Charlie"); + await importer2.AddRowAsync(4, "Diana"); + await importer2.FlushAsync(); - await using (var checkCmd = ydbConnection.CreateCommand()) + await using (var checkCmd = conn.CreateCommand()) { checkCmd.CommandText = $"SELECT Name FROM {tableName} ORDER BY Id"; var names = new List(); @@ -357,7 +364,7 @@ PRIMARY KEY (Id) } finally { - await using var dropCmd = ydbConnection.CreateCommand(); + await using var dropCmd = conn.CreateCommand(); dropCmd.CommandText = $"DROP TABLE {tableName}"; await dropCmd.ExecuteNonQueryAsync(); } @@ -367,25 +374,31 @@ PRIMARY KEY (Id) public async Task BulkUpsertImporter_ThrowsOnInvalidRowCount() { var tableName = $"BulkImporter_{Guid.NewGuid():N}"; - await using var ydbConnection = await CreateOpenConnectionAsync(); + await using var conn = await CreateOpenConnectionAsync(); try { - await using (var createCmd = ydbConnection.CreateCommand()) + await using (var createCmd = conn.CreateCommand()) { - createCmd.CommandText = $"CREATE TABLE {tableName} (Id Int32, Name Utf8, PRIMARY KEY (Id))"; + createCmd.CommandText = $""" + CREATE TABLE {tableName} ( + Id Int32, + Name Text, + PRIMARY KEY (Id) + ) + """; await createCmd.ExecuteNonQueryAsync(); } var columns = new[] { "Id", "Name" }; - var importer = ydbConnection.BeginBulkUpsertImport(tableName, columns); + var importer = conn.BeginBulkUpsertImport(tableName, columns); await Assert.ThrowsAsync(async () => await importer.AddRowAsync(1)); await Assert.ThrowsAsync(async () => await importer.AddRowAsync(2)); } finally { - await using var dropCmd = ydbConnection.CreateCommand(); + await using var dropCmd = conn.CreateCommand(); dropCmd.CommandText = $"DROP TABLE {tableName}"; await dropCmd.ExecuteNonQueryAsync(); } @@ -397,16 +410,16 @@ public async Task BulkUpsertImporter_MultipleImporters_Parallel() var table1 = $"BulkImporter_{Guid.NewGuid():N}_1"; var table2 = $"BulkImporter_{Guid.NewGuid():N}_2"; - await using var ydbConnection = await CreateOpenConnectionAsync(); + var conn = await CreateOpenConnectionAsync(); try { foreach (var table in new[] { table1, table2 }) { - await using var createCmd = ydbConnection.CreateCommand(); + await using var createCmd = conn.CreateCommand(); createCmd.CommandText = $""" CREATE TABLE {table} ( Id Int32, - Name Utf8, + Name Text, PRIMARY KEY (Id) ) """; @@ -418,8 +431,7 @@ PRIMARY KEY (Id) await Task.WhenAll( Task.Run(async () => { - // ReSharper disable once AccessToDisposedClosure - var importer = ydbConnection.BeginBulkUpsertImport(table1, columns); + var importer = conn.BeginBulkUpsertImport(table1, columns); var rows = Enumerable.Range(0, 20) .Select(i => new object[] { i, $"A{i}" }) .ToArray(); @@ -429,8 +441,7 @@ await Task.WhenAll( }), Task.Run(async () => { - // ReSharper disable once AccessToDisposedClosure - var importer = ydbConnection.BeginBulkUpsertImport(table2, columns); + var importer = conn.BeginBulkUpsertImport(table2, columns); var rows = Enumerable.Range(0, 20) .Select(i => new object[] { i, $"B{i}" }) .ToArray(); @@ -442,7 +453,7 @@ await Task.WhenAll( foreach (var table in new[] { table1, table2 }) { - await using var checkCmd = ydbConnection.CreateCommand(); + await using var checkCmd = conn.CreateCommand(); checkCmd.CommandText = $"SELECT COUNT(*) FROM {table}"; var count = Convert.ToInt32(await checkCmd.ExecuteScalarAsync()); Assert.Equal(20, count); @@ -452,10 +463,12 @@ await Task.WhenAll( { foreach (var table in new[] { table1, table2 }) { - await using var dropCmd = ydbConnection.CreateCommand(); + await using var dropCmd = conn.CreateCommand(); dropCmd.CommandText = $"DROP TABLE {table}"; await dropCmd.ExecuteNonQueryAsync(); } + + await conn.DisposeAsync(); } } @@ -463,14 +476,140 @@ await Task.WhenAll( public async Task BulkUpsertImporter_ThrowsOnNonexistentTable() { var tableName = $"Nonexistent_{Guid.NewGuid():N}"; - await using var ydbConnection = await CreateOpenConnectionAsync(); + await using var conn = await CreateOpenConnectionAsync(); var columns = new[] { "Id", "Name" }; - var importer = ydbConnection.BeginBulkUpsertImport(tableName, columns); + var importer = conn.BeginBulkUpsertImport(tableName, columns); await importer.AddRowAsync(1, "NotExists"); await Assert.ThrowsAsync(async () => { await importer.FlushAsync(); }); } + + [Fact] + public async Task BulkUpsertImporter_AddListAsync_HappyPath_InsertsRows() + { + var table = $"BulkImporter_List_{Guid.NewGuid():N}"; + + await using var conn = await CreateOpenConnectionAsync(); + try + { + await using (var create = conn.CreateCommand()) + { + create.CommandText = $""" + CREATE TABLE {table} ( + Id Int64, + Name Text, + PRIMARY KEY (Id) + ) + """; + await create.ExecuteNonQueryAsync(); + } + + var importer = conn.BeginBulkUpsertImport(table, ["Id", "Name"]); + + // $rows: List> + var rows = YdbList + .Struct("Id", "Name") + .AddRow(1L, "A") + .AddRow(2L, "B"); + + await importer.AddListAsync(rows); + await importer.FlushAsync(); + + await using var check = conn.CreateCommand(); + check.CommandText = $"SELECT COUNT(*) FROM {table}"; + var count = Convert.ToInt32(await check.ExecuteScalarAsync()); + Assert.Equal(2, count); + } + finally + { + await using var drop = conn.CreateCommand(); + drop.CommandText = $"DROP TABLE {table}"; + await drop.ExecuteNonQueryAsync(); + } + } + + [Fact] + public async Task BulkUpsertImporter_AddListAsync_WrongStructColumns_ThrowsArgumentException() + { + var table = $"BulkImporter_List_{Guid.NewGuid():N}"; + + await using var conn = await CreateOpenConnectionAsync(); + try + { + await using (var create = conn.CreateCommand()) + { + create.CommandText = $""" + CREATE TABLE {table} ( + Id Int64, + Name Text, + PRIMARY KEY (Id) + ) + """; + await create.ExecuteNonQueryAsync(); + } + + var importer = conn.BeginBulkUpsertImport(table, ["Id", "Name"]); + + var wrong = YdbList + .Struct("Id", "Wrong") + .AddRow(1L, "A"); + + var ex = await Assert.ThrowsAsync(() => importer.AddListAsync(wrong).AsTask()); + Assert.Contains("mismatch", ex.Message, StringComparison.OrdinalIgnoreCase); + Assert.Contains("expected 'Name'", ex.Message, StringComparison.OrdinalIgnoreCase); + } + finally + { + await using var drop = conn.CreateCommand(); + drop.CommandText = $"DROP TABLE {table}"; + await drop.ExecuteNonQueryAsync(); + } + } + + [Fact] + public async Task BulkUpsertImporter_AddRowAsync_WhenLaterRowHasNull_AllowsNullValue() + { + var table = $"bulk_null_{Guid.NewGuid():N}"; + await using var conn = await CreateOpenConnectionAsync(); + try + { + await using (var create = conn.CreateCommand()) + { + create.CommandText = $""" + CREATE TABLE {table} ( + Id Int32, + Name Text?, + PRIMARY KEY (Id) + ) + """; + await create.ExecuteNonQueryAsync(); + } + + var importer = conn.BeginBulkUpsertImport(table, ["Id", "Name"]); + + await importer.AddRowAsync(1, "A"); + + await importer.AddRowAsync(2, new YdbParameter { YdbDbType = YdbDbType.Text, Value = null }); + + await importer.FlushAsync(); + + await using (var check = conn.CreateCommand()) + { + check.CommandText = $"SELECT Name FROM {table} WHERE Id=1"; + Assert.Equal("A", (string)(await check.ExecuteScalarAsync())!); + + check.CommandText = $"SELECT Name IS NULL FROM {table} WHERE Id=2"; + Assert.True((bool)(await check.ExecuteScalarAsync())!); + } + } + finally + { + await using var drop = conn.CreateCommand(); + drop.CommandText = $"DROP TABLE {table}"; + await drop.ExecuteNonQueryAsync(); + } + } }