Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Feat Value: Add YdbList builder for List<Struct<...>> (protobuf-based); works with IBulkUpsertImporter.AddListAsync.
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.
Expand Down
99 changes: 86 additions & 13 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,112 @@ internal BulkUpsertImporter(
_cancellationToken = cancellationToken;
}

/// <summary>
/// Add a single row to the current BulkUpsert batch.
/// </summary>
/// <param name="values">Values in the same order as the configured <c>columns</c>.</param>
/// <remarks>
/// Supported per-cell types: <see cref="YdbValue"/>, <see cref="YdbParameter"/>.
/// Other CLR values are converted via <see cref="YdbParameter"/>.
/// Passing <see cref="YdbList"/> as a column value is not supported (tables do not accept list-typed columns).
/// Use <c>AddListAsync(YdbList)</c> to append many rows from a list parameter.
/// </remarks>
/// <exception cref="ArgumentException">Thrown when the number of values differs from the number of columns.</exception>
/// <exception cref="InvalidOperationException">Thrown when a value cannot be mapped to a YDB type.</exception>
/// <example>
/// <code>
/// // columns: ["Id", "Name"]
/// await importer.AddRowAsync(1, "Alice");
/// await importer.AddRowAsync(2, "Bob");
/// </code>
/// </example>
public async ValueTask AddRowAsync(params object[] values)
{
if (values.Length != _columns.Count)
throw new ArgumentException("Values count must match columns count", nameof(values));
throw new ArgumentException("Values count must match columns count.", nameof(values));

var ydbValues = values.Select(v => v switch
{
YdbValue ydbValue => ydbValue.GetProto(),
YdbParameter param => param.TypedValue,
_ => new YdbParameter { Value = v }.TypedValue
}
).ToArray();
{
YdbValue ydbValue => ydbValue.GetProto(),
YdbParameter param => param.TypedValue,
YdbList => throw new ArgumentException(
"YdbList cannot be used as a column value. Use AddListAsync(YdbList) to append multiple rows.",
nameof(values)),
_ => new YdbParameter { Value = v }.TypedValue
}).ToArray();

var protoStruct = new Ydb.Value();
foreach (var value in ydbValues) protoStruct.Items.Add(value.Value);
foreach (var tv in ydbValues)
protoStruct.Items.Add(tv.Value);

var rowSize = protoStruct.CalculateSize();

if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
{
await FlushAsync();
}
await FlushAsync().ConfigureAwait(false);

_rows.Add(protoStruct);
_currentBytes += rowSize;

_structType ??= new StructType
{ Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) } };
{
Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) }
};
}

/// <summary>
/// Add multiple rows from a <see cref="YdbList"/> shaped as <c>List&lt;Struct&lt;...&gt;&gt;</c>.
/// Struct member names and order must exactly match the configured <c>columns</c>.
/// </summary>
/// <param name="list">Rows as <c>List&lt;Struct&lt;...&gt;&gt;</c> with the exact column names and order.</param>
/// <exception cref="ArgumentException">
/// Thrown when the struct column set, order, or count does not match the importer’s <c>columns</c>.
/// </exception>
public async ValueTask AddListAsync(YdbList list)
{
var tv = list.ToTypedValue();

var incomingStruct = tv.Type.ListType.Item.StructType;

if (incomingStruct.Members.Count != _columns.Count)
throw new ArgumentException(
$"The number of columns in the List<Struct> ({incomingStruct.Members.Count}) " +
$"does not match the expected ({_columns.Count}).");

for (var i = 0; i < _columns.Count; i++)
{
var expected = _columns[i];
var actual = incomingStruct.Members[i].Name;
if (!string.Equals(expected, actual, StringComparison.Ordinal))
throw new ArgumentException(
$"Column name mismatch at position {i}: expected '{expected}', received '{actual}'.");
}

_structType ??= incomingStruct;

foreach (var rowValue in tv.Value.Items)
{
var rowSize = rowValue.CalculateSize();

if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
await FlushAsync().ConfigureAwait(false);

_rows.Add(rowValue);
_currentBytes += rowSize;
}
}

/// <summary>
/// Flush the current batch via BulkUpsert. No-op if the batch is empty.
/// </summary>
/// <remarks>
/// Uses the collected struct schema from the first added row (or the provided list) and sends
/// the accumulated rows in a single BulkUpsert request.
/// </remarks>
public async ValueTask FlushAsync()
{
if (_rows.Count == 0) return;
if (_rows.Count == 0)
return;

if (_structType == null)
throw new InvalidOperationException("structType is undefined");

Expand Down
23 changes: 23 additions & 0 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.BulkUpsert;

/// <summary>
/// Bulk upsert importer API: add rows and flush them to YDB in batches.
/// </summary>
public interface IBulkUpsertImporter
{
/// <summary>
/// Add a single row to the batch. Values must match the importer’s column order.
/// </summary>
/// <param name="row">Values in the same order as the configured <c>columns</c>.</param>
/// <exception cref="ArgumentException">Thrown when the number of values differs from the number of columns.</exception>
ValueTask AddRowAsync(params object[] row);

/// <summary>
/// Add multiple rows from a <see cref="YdbList"/> shaped as <c>List&lt;Struct&lt;...&gt;&gt;</c>.
/// Struct member names and order must exactly match the configured <c>columns</c>.
/// </summary>
/// <param name="list">Rows as <c>List&lt;Struct&lt;...&gt;&gt;</c> with the exact column names and order.</param>
/// <exception cref="ArgumentException">
/// Thrown when the struct column set, order, or count does not match the importer’s <c>columns</c>.
/// </exception>
ValueTask AddListAsync(YdbList list);

/// <summary>
/// Flush the current batch via BulkUpsert. No-op if the batch is empty.
/// </summary>
ValueTask FlushAsync();
}
3 changes: 1 addition & 2 deletions src/Ydb.Sdk/src/Ado/YdbParameter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ internal TypedValue TypedValue

private TypedValue Cast(object value) => value switch
{
YdbList ydbList => ydbList.ToTypedValue(),
string stringValue => stringValue.Text(),
bool boolValue => boolValue.Bool(),
sbyte sbyteValue => sbyteValue.Int8(),
Expand Down Expand Up @@ -285,9 +286,7 @@ private TypedValue Decimal(decimal value) =>
private TypedValue NullTypedValue()
{
if (YdbNullByDbType.TryGetValue(YdbDbType, out var value))
{
return value;
}

if (YdbDbType == YdbDbType.Decimal)
{
Expand Down
157 changes: 157 additions & 0 deletions src/Ydb.Sdk/src/Value/YdbList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
using Google.Protobuf.WellKnownTypes;
using Ydb.Sdk.Ado;
using Ydb.Sdk.Ado.YdbType;

namespace Ydb.Sdk.Value;

/// <summary>
/// Builder for YDB values shaped as <c>List&lt;Struct&lt;...&gt;&gt;</c>, working directly with protobuf.
/// Each call to <see cref="AddRow(object?[])"/> converts input values into protobuf cells and stores the row.
/// The struct schema is derived from explicit type hints or from the first non-null sample per column.
/// If a column contains at least one <c>null</c>, its member type becomes <c>Optional&lt;T&gt;</c>.
/// </summary>
public sealed class YdbList
{
private readonly string[] _columns;
private readonly YdbDbType[] _typeHints;

private readonly List<Ydb.Value> _rows = new();

private readonly Type?[] _observedBaseTypes;
private readonly bool[] _sawNull;

/// <summary>
/// Create a struct-mode list with column names. Types will be inferred from the
/// first non-null value per column (columns with any nulls become <c>Optional&lt;T&gt;</c>).
/// </summary>
/// <param name="columns">Struct member names, in order.</param>
public static YdbList Struct(params string[] columns) => new(columns);

/// <summary>
/// Create a struct-mode list with column names and explicit YDB type hints. The array length must match <paramref name="columns"/>.
/// Columns that contain <c>null</c> are emitted as <c>Optional&lt;hintedType&gt;</c>.
/// </summary>
/// <param name="columns">Struct member names, in order.</param>
/// <param name="types">YDB type hints for each column (same length as <paramref name="columns"/>).</param>
public static YdbList Struct(string[] columns, YdbDbType[] types) => new(columns, types);

/// <summary>
/// Construct a struct-mode list. If <paramref name="types"/> is null, schema is inferred from values.
/// </summary>
private YdbList(string[] columns, YdbDbType[]? types = null)
{
if (columns is null || columns.Length == 0)
throw new ArgumentException("Columns must be non-empty.", nameof(columns));
if (types is not null && types.Length != columns.Length)
throw new ArgumentException("Length of 'types' must match length of 'columns'.", nameof(types));

_columns = columns;
_typeHints = types ?? Enumerable.Repeat(YdbDbType.Unspecified, columns.Length).ToArray();
_observedBaseTypes = new Type[_columns.Length];
_sawNull = new bool[_columns.Length];
}

/// <summary>
/// Add a single positional row. The number of values must match the number of columns.
/// Values are converted to protobuf cells and the row is stored immediately.
/// </summary>
/// <param name="values">Row values in the same order as the declared columns.</param>
/// <exception cref="ArgumentException">Thrown when the number of values differs from the number of columns.</exception>
public YdbList AddRow(params object?[] values)
{
if (values.Length != _columns.Length)
throw new ArgumentException($"Expected {_columns.Length} values, got {values.Length}.");

var cells = new List<Ydb.Value>(_columns.Length);

for (var i = 0; i < _columns.Length; i++)
{
var v = values[i];

if (v is null || v == DBNull.Value)
{
_sawNull[i] = true;
cells.Add(new Ydb.Value { NullFlagValue = NullValue.NullValue });
continue;
}

var tv = v switch
{
YdbValue yv => yv.GetProto(),
YdbParameter p => p.TypedValue,
_ => new YdbParameter { Value = v }.TypedValue
};

var t = tv.Type;
if (t.TypeCase == Type.TypeOneofCase.OptionalType && t.OptionalType is not null)
t = t.OptionalType.Item;

_observedBaseTypes[i] ??= t;
cells.Add(tv.Value);
}

_rows.Add(new Ydb.Value { Items = { cells } });
return this;
}

/// <summary>
/// Convert the accumulated rows into a YDB <see cref="TypedValue"/> with shape <c>List&lt;Struct&lt;...&gt;&gt;</c>.
/// Columns that observed <c>null</c> values are emitted as <c>Optional&lt;T&gt;</c>.
/// </summary>
/// <returns>TypedValue representing <c>List&lt;Struct&lt;...&gt;&gt;</c> and the collected data rows.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when the schema cannot be inferred (e.g., empty list without type hints, or a column has only nulls
/// and no explicit type hint).
/// </exception>
internal TypedValue ToTypedValue()
{
if (_rows.Count == 0)
throw new InvalidOperationException(
"Cannot infer Struct schema from an empty list without explicit YdbDbType hints.");

var n = _columns.Length;
var memberTypes = new Type[n];

for (var i = 0; i < n; i++)
{
Type? baseType;

if (_typeHints[i] != YdbDbType.Unspecified)
{
baseType = new YdbParameter { YdbDbType = _typeHints[i], Value = DBNull.Value }.TypedValue.Type;

if (baseType.TypeCase == Type.TypeOneofCase.OptionalType && baseType.OptionalType is not null)
baseType = baseType.OptionalType.Item;
}
else
{
baseType = _observedBaseTypes[i];
if (baseType is null)
throw new InvalidOperationException(
$"Column '{_columns[i]}' has only nulls and no explicit YdbDbType. Provide a type hint.");
}

memberTypes[i] = _sawNull[i] && baseType.TypeCase != Type.TypeOneofCase.OptionalType
? new Type { OptionalType = new OptionalType { Item = baseType } }
: baseType;
}

var structType = new StructType
{
Members =
{
_columns.Select((name, idx) => new StructMember
{
Name = name,
Type = memberTypes[idx]
})
}
};

return new TypedValue
{
Type = new Type { ListType = new ListType { Item = new Type { StructType = structType } } },
Value = new Ydb.Value { Items = { _rows } }
};
}
}
Loading
Loading