Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 83 additions & 12 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,110 @@ internal BulkUpsertImporter(
_cancellationToken = cancellationToken;
}

/// <summary>
/// Adds a single row to the current BulkUpsert batch.
/// </summary>
/// <param name="values">Column values in the same order as the configured <c>columns</c>.</param>
/// <remarks>
/// Supported element types: <see cref="YdbValue"/>, <see cref="YdbParameter"/>, <see cref="YdbList"/> (as-is);
/// other CLR values are converted via <see cref="YdbParameter"/>.
/// When appending the row would push a non-empty batch above the configured byte limit,
/// the rows collected so far are flushed before the new row is stored.
/// </remarks>
/// <example>
/// <code>
/// // columns: ["Id", "Name"]
/// await importer.AddRowAsync(1, "Alice");
/// </code>
/// </example>
/// <exception cref="ArgumentNullException">When <paramref name="values"/> itself is <c>null</c>.</exception>
/// <exception cref="ArgumentException">When the number of values doesn't equal the number of columns.</exception>
/// <exception cref="InvalidOperationException">When a value cannot be mapped to a YDB type.</exception>
public async ValueTask AddRowAsync(params object[] values)
{
    // A literal `AddRowAsync(null)` binds null to the params array itself.
    if (values is null)
        throw new ArgumentNullException(nameof(values));

    if (values.Length != _columns.Count)
        throw new ArgumentException("Values count must match columns count.", nameof(values));

    var ydbValues = values.Select(v => v switch
    {
        YdbValue ydbValue => ydbValue.GetProto(),
        YdbParameter param => param.TypedValue,
        YdbList list => list.ToTypedValue(),
        _ => new YdbParameter { Value = v }.TypedValue
    }).ToArray();

    var protoStruct = new Ydb.Value();
    foreach (var value in ydbValues)
        protoStruct.Items.Add(value.Value);

    var rowSize = protoStruct.CalculateSize();

    // Flush first so the new row starts the next batch; an empty batch is never flushed,
    // which lets a single oversized row still go through.
    if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
    {
        await FlushAsync().ConfigureAwait(false);
    }

    _rows.Add(protoStruct);
    _currentBytes += rowSize;

    // The struct schema is taken from the first row that is ever added.
    _structType ??= new StructType
    {
        Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) }
    };
}

/// <summary>
/// Adds multiple rows from a single <see cref="YdbList"/> parameter.
/// </summary>
/// <param name="list">
/// A list shaped as <c>List&lt;Struct&lt;...&gt;&gt;</c>; struct member names and order must exactly match
/// the configured <c>columns</c>.
/// Example: <c>columns=["Id","Name"]</c> → <c>List&lt;Struct&lt;Id:Int64, Name:Utf8&gt;&gt;</c>.
/// </param>
/// <remarks>
/// Rows are appended one by one; whenever the next row would push a non-empty batch above the configured
/// byte limit, the pending rows are flushed first. If rows with a different struct schema are already
/// pending (for example <c>T</c> versus <c>Optional&lt;T&gt;</c> for the same column), they are flushed
/// under their own schema before the incoming schema takes over.
/// </remarks>
/// <exception cref="ArgumentNullException">When <paramref name="list"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">
/// When the list is not a <c>List&lt;Struct&lt;...&gt;&gt;</c>, or its member count or member names do not match
/// the configured columns.
/// </exception>
public async ValueTask AddListAsync(YdbList list)
{
    if (list is null)
        throw new ArgumentNullException(nameof(list));

    var tv = list.ToTypedValue();

    if (tv.Type.TypeCase != Type.TypeOneofCase.ListType ||
        tv.Type.ListType.Item.TypeCase != Type.TypeOneofCase.StructType)
    {
        throw new ArgumentException(
            "BulkUpsertImporter.AddListAsync expects a YdbList with a value like List<Struct<...>>",
            nameof(list));
    }

    var incomingStruct = tv.Type.ListType.Item.StructType;

    if (incomingStruct.Members.Count != _columns.Count)
        throw new ArgumentException(
            $"The number of columns in the List<Struct> ({incomingStruct.Members.Count}) " +
            $"does not match the expected ({_columns.Count}).");

    for (var i = 0; i < _columns.Count; i++)
    {
        var expected = _columns[i];
        var actual = incomingStruct.Members[i].Name;
        if (!string.Equals(expected, actual, StringComparison.Ordinal))
            throw new ArgumentException(
                $"Column name mismatch at position {i}: expected '{expected}', received '{actual}'.");
    }

    // Names match, but member types may still differ from the schema of the rows already pending.
    // Sending both under one struct type would mislabel one of the two sets, so the old rows are
    // flushed under their own schema first.
    // NOTE(review): relies on FlushAsync emptying the pending batch; its tail is not visible here.
    if (_structType is not null && !_structType.Equals(incomingStruct))
    {
        await FlushAsync().ConfigureAwait(false);
        _structType = incomingStruct;
    }

    _structType ??= incomingStruct;

    foreach (var rowValue in tv.Value.Items)
    {
        var rowSize = rowValue.CalculateSize();

        if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
            await FlushAsync().ConfigureAwait(false);

        _rows.Add(rowValue);
        _currentBytes += rowSize;
    }
}

/// <summary>
/// Flush the current batch via BulkUpsert. No-op if the batch is empty.
/// </summary>
public async ValueTask FlushAsync()
{
if (_rows.Count == 0) return;
if (_rows.Count == 0)
return;

if (_structType == null)
throw new InvalidOperationException("structType is undefined");

Expand Down
11 changes: 11 additions & 0 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.BulkUpsert;

public interface IBulkUpsertImporter
{
    /// <summary>
    /// Appends one row to the pending batch.
    /// </summary>
    /// <param name="row">
    /// Cell values for the row, positioned to line up with the importer's configured <c>columns</c>.
    /// </param>
    /// <returns>A <see cref="ValueTask"/> representing the asynchronous add operation.</returns>
    ValueTask AddRowAsync(params object[] row);

    /// <summary>
    /// Appends every row held by a <see cref="YdbList"/> to the pending batch.
    /// </summary>
    /// <param name="list">
    /// A list whose shape is <c>List&lt;Struct&lt;...&gt;&gt;</c>. The struct's member names, and their order,
    /// have to be identical to the importer's configured <c>columns</c>.
    /// </param>
    /// <returns>A <see cref="ValueTask"/> representing the asynchronous add operation.</returns>
    ValueTask AddListAsync(YdbList list);

    /// <summary>
    /// Sends the pending batch through BulkUpsert. Does nothing when no rows are pending.
    /// </summary>
    /// <returns>A <see cref="ValueTask"/> representing the asynchronous flush operation.</returns>
    ValueTask FlushAsync();
}
11 changes: 5 additions & 6 deletions src/Ydb.Sdk/src/Ado/YdbParameter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ internal TypedValue TypedValue
YdbDbType.Double => MakeDouble(value),
YdbDbType.Decimal when value is decimal decimalValue => Decimal(decimalValue),
YdbDbType.Bytes => MakeBytes(value),
YdbDbType.Json when value is string stringValue => stringValue.Json(),
YdbDbType.JsonDocument when value is string stringValue => stringValue.JsonDocument(),
YdbDbType.Json when value is string stringJsonValue => stringJsonValue.Json(),
YdbDbType.JsonDocument when value is string stringJsonDocumentValue => stringJsonDocumentValue
.JsonDocument(),
YdbDbType.Uuid when value is Guid guidValue => guidValue.Uuid(),
YdbDbType.Date => MakeDate(value),
YdbDbType.DateTime when value is DateTime dateTimeValue => dateTimeValue.Datetime(),
Expand Down Expand Up @@ -246,6 +247,7 @@ internal TypedValue TypedValue

private TypedValue Cast(object value) => value switch
{
YdbList ydbList => ydbList.ToTypedValue(),
string stringValue => stringValue.Text(),
bool boolValue => boolValue.Bool(),
sbyte sbyteValue => sbyteValue.Int8(),
Expand Down Expand Up @@ -278,9 +280,7 @@ private TypedValue Decimal(decimal value) =>
private TypedValue NullTypedValue()
{
if (YdbNullByDbType.TryGetValue(YdbDbType, out var value))
{
return value;
}

if (YdbDbType == YdbDbType.Decimal)
{
Expand All @@ -290,8 +290,7 @@ private TypedValue NullTypedValue()
}

throw new InvalidOperationException(
"Writing value of 'null' is not supported without explicit mapping to the YdbDbType"
);
"Writing value of 'null' is not supported without explicit mapping to the YdbDbType");
}

private InvalidOperationException ValueTypeNotSupportedException =>
Expand Down
150 changes: 150 additions & 0 deletions src/Ydb.Sdk/src/Value/YdbList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
using Google.Protobuf.WellKnownTypes;
using Ydb.Sdk.Ado;
using Ydb.Sdk.Ado.YdbType;

namespace Ydb.Sdk.Value;

/// <summary>
/// Struct-only builder for YDB <c>List&lt;Struct&lt;...&gt;&gt;</c>.
/// Works directly with protobuf:
/// - Each call to <see cref="AddRow(object?[])"/> converts values into protobuf cells (<see cref="Ydb.Value"/>) and stores a row immediately.
/// - The struct schema (<see cref="StructType"/>) is derived from column type hints or from the first non-null sample per column.
/// - A column's type hint, when present, is also used to encode plain CLR values of that column.
/// - Within a column every typed cell must have the same base type; a row that disagrees is rejected as a whole.
/// - If a column has at least one <c>null</c>, its type becomes <c>Optional&lt;T&gt;</c>; individual null cells are encoded via <see cref="NullValue.NullValue"/>.
/// </summary>
public sealed class YdbList
{
    private readonly string[] _columns;
    private readonly YdbDbType[]? _typeHints;

    private readonly List<Ydb.Value> _rows = new();

    // Per column: base (Optional-unwrapped) type of the first typed cell, and whether any null was seen.
    private readonly Type?[] _observedBaseTypes;
    private readonly bool[] _sawNull;

    /// <summary>
    /// Create a struct-mode list with column names; types will be inferred from the
    /// first non-null value per column (columns with any nulls become Optional&lt;T&gt;).
    /// </summary>
    /// <param name="columns">Struct member names, in order. Must be non-empty.</param>
    /// <exception cref="ArgumentException">When <paramref name="columns"/> is null or empty.</exception>
    public static YdbList Struct(params string[] columns) => new(columns);

    /// <summary>
    /// Create a struct-mode list with column names and explicit YDB type hints
    /// (array length must match <paramref name="columns"/>). Columns with any nulls
    /// will be emitted as Optional&lt;hintedType&gt;.
    /// </summary>
    /// <param name="columns">Struct member names, in order. Must be non-empty.</param>
    /// <param name="types">
    /// One hint per column; <see cref="YdbDbType.Unspecified"/> means "infer this column from its values".
    /// </param>
    /// <exception cref="ArgumentException">
    /// When <paramref name="columns"/> is null or empty, or the two arrays differ in length.
    /// </exception>
    public static YdbList Struct(string[] columns, YdbDbType[] types) => new(columns, types);

    /// <summary>
    /// Construct a struct-mode list. If <paramref name="types"/> is null, schema is inferred from values.
    /// </summary>
    private YdbList(string[] columns, YdbDbType[]? types = null)
    {
        if (columns is null || columns.Length == 0)
            throw new ArgumentException("Columns must be non-empty.", nameof(columns));
        if (types is not null && types.Length != columns.Length)
            throw new ArgumentException("Length of 'types' must match length of 'columns'.", nameof(types));

        // Copy the caller's arrays so later mutation on their side cannot change the schema mid-build.
        _columns = (string[])columns.Clone();
        _typeHints = (YdbDbType[]?)types?.Clone();
        _observedBaseTypes = new Type?[_columns.Length];
        _sawNull = new bool[_columns.Length];
    }

    /// <summary>
    /// Add a single positional row. The number of values must match the number of columns.
    /// Values are converted to protobuf cells and the row is stored immediately.
    /// </summary>
    /// <param name="values">
    /// One value per column. <c>null</c> and <see cref="DBNull.Value"/> become null cells;
    /// <see cref="YdbValue"/> and <see cref="YdbParameter"/> are used as-is; any other CLR value is converted
    /// through <see cref="YdbParameter"/>, using the column's type hint when one was supplied.
    /// </param>
    /// <returns>This instance, to allow chaining.</returns>
    /// <exception cref="ArgumentException">
    /// When the number of values differs from the number of columns, or a cell's type differs from the type
    /// already observed for its column. The list is left unchanged in both cases.
    /// </exception>
    public YdbList AddRow(params object?[] values)
    {
        // `AddRow(null)` binds null to the params array itself; read it as a single null cell
        // so that a one-column list accepts it and wider lists get the regular count error.
        values ??= new object?[] { null };

        if (values.Length != _columns.Length)
            throw new ArgumentException($"Expected {_columns.Length} values, got {values.Length}.");

        var width = _columns.Length;
        var cells = new Ydb.Value[width];
        var cellTypes = new Type?[width];
        var cellIsNull = new bool[width];

        // Pass 1: convert and validate without touching instance state, so a rejected row leaves no trace.
        for (var i = 0; i < width; i++)
        {
            var v = values[i];

            if (v is null or DBNull)
            {
                cellIsNull[i] = true;
                cells[i] = new Ydb.Value { NullFlagValue = NullValue.NullValue };
                continue;
            }

            var tv = ToTypedCell(v, i);

            var t = tv.Type;
            if (t.TypeCase == Type.TypeOneofCase.OptionalType && t.OptionalType is not null)
                t = t.OptionalType.Item;

            var known = _observedBaseTypes[i];
            if (known is not null && !known.Equals(t))
                throw new ArgumentException(
                    $"Column '{_columns[i]}' already contains values of type {known}, " +
                    $"but received a value of type {t}.");

            // A typed null (e.g. an Optional<T> value holding no data) still makes the column nullable.
            cellIsNull[i] = tv.Value.ValueCase == Ydb.Value.ValueOneofCase.NullFlagValue;
            cellTypes[i] = t;
            cells[i] = tv.Value;
        }

        // Pass 2: commit what this row taught us about the schema.
        for (var i = 0; i < width; i++)
        {
            if (cellIsNull[i])
                _sawNull[i] = true;

            if (cellTypes[i] is not null)
                _observedBaseTypes[i] ??= cellTypes[i];
        }

        _rows.Add(new Ydb.Value { Items = { cells } });
        return this;
    }

    /// <summary>
    /// Convert to a YDB <see cref="TypedValue"/> shaped as <c>List&lt;Struct&lt;...&gt;&gt;</c>.
    /// Columns that observed <c>null</c> values are emitted as <c>Optional&lt;T&gt;</c>;
    /// individual <c>null</c> cells are encoded via <see cref="NullValue.NullValue"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// When the list is empty and no type hints were given, or when a column without a hint
    /// never received a typed value.
    /// </exception>
    internal TypedValue ToTypedValue()
    {
        if (_rows.Count == 0 && (_typeHints is null || _typeHints.All(t => t == YdbDbType.Unspecified)))
            throw new InvalidOperationException(
                "Cannot infer Struct schema from an empty list without explicit YdbDbType hints.");

        var structType = new StructType();

        for (var i = 0; i < _columns.Length; i++)
        {
            Type? baseType;

            if (_typeHints is not null && _typeHints[i] != YdbDbType.Unspecified)
            {
                // A typed null of the hinted kind is used only to obtain the matching protobuf type.
                baseType = new YdbParameter { YdbDbType = _typeHints[i], Value = DBNull.Value }.TypedValue.Type;

                if (baseType.TypeCase == Type.TypeOneofCase.OptionalType && baseType.OptionalType is not null)
                    baseType = baseType.OptionalType.Item;
            }
            else
            {
                baseType = _observedBaseTypes[i];
                if (baseType is null)
                    throw new InvalidOperationException(
                        $"Column '{_columns[i]}' has only nulls and no explicit YdbDbType. Provide a type hint.");
            }

            var memberType = _sawNull[i] && baseType.TypeCase != Type.TypeOneofCase.OptionalType
                ? new Type { OptionalType = new OptionalType { Item = baseType } }
                : baseType;

            structType.Members.Add(new StructMember { Name = _columns[i], Type = memberType });
        }

        return new TypedValue
        {
            Type = new Type { ListType = new ListType { Item = new Type { StructType = structType } } },
            Value = new Ydb.Value { Items = { _rows } }
        };
    }

    /// <summary>
    /// Converts one non-null cell to a typed protobuf value. Pre-typed inputs are taken as-is;
    /// plain CLR values are encoded with the column's type hint when it is specified, so that the
    /// cell encoding agrees with the type later declared for the column.
    /// </summary>
    private TypedValue ToTypedCell(object value, int column) => value switch
    {
        YdbValue yv => yv.GetProto(),
        YdbParameter p => p.TypedValue,
        _ when _typeHints is not null && _typeHints[column] != YdbDbType.Unspecified =>
            new YdbParameter { YdbDbType = _typeHints[column], Value = value }.TypedValue,
        _ => new YdbParameter { Value = value }.TypedValue
    };
}
Loading
Loading