Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 83 additions & 12 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/BulkUpsertImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,110 @@ internal BulkUpsertImporter(
_cancellationToken = cancellationToken;
}

/// <summary>
/// Add a single row to the current BulkUpsert batch.
/// </summary>
/// <param name="values">Column values in the same order as the configured <c>columns</c>.</param>
/// <remarks>
/// Supported element types: <see cref="YdbValue"/>, <see cref="YdbParameter"/>, <see cref="YdbList"/> (as-is);
/// other CLR values are converted via <see cref="YdbParameter"/>.
/// </remarks>
/// <example>
/// <code>
/// // columns: ["Id", "Name"]
/// await importer.AddRowAsync(1, "Alice");
/// </code>
/// </example>
/// <exception cref="ArgumentException">When the number of values doesn't equal the number of columns.</exception>
/// <exception cref="InvalidOperationException">When a value cannot be mapped to a YDB type.</exception>
public async ValueTask AddRowAsync(params object[] values)
{
if (values.Length != _columns.Count)
throw new ArgumentException("Values count must match columns count", nameof(values));
throw new ArgumentException("Values count must match columns count.", nameof(values));

var ydbValues = values.Select(v => v switch
{
YdbValue ydbValue => ydbValue.GetProto(),
YdbParameter param => param.TypedValue,
_ => new YdbParameter { Value = v }.TypedValue
}
).ToArray();
{
YdbValue ydbValue => ydbValue.GetProto(),
YdbParameter param => param.TypedValue,
YdbList list => list.ToTypedValue(),
_ => new YdbParameter { Value = v }.TypedValue
}).ToArray();

var protoStruct = new Ydb.Value();
foreach (var value in ydbValues) protoStruct.Items.Add(value.Value);
foreach (var value in ydbValues)
protoStruct.Items.Add(value.Value);

var rowSize = protoStruct.CalculateSize();

if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
{
await FlushAsync();
}

_rows.Add(protoStruct);
_currentBytes += rowSize;

_structType ??= new StructType
{ Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) } };
{
Members = { _columns.Select((col, i) => new StructMember { Name = col, Type = ydbValues[i].Type }) }
};
}

/// <summary>
/// Add multiple rows from a single <see cref="YdbList"/> parameter.
/// </summary>
/// <remarks>
/// Expects <c>List&lt;Struct&lt;...&gt;&gt;</c>; struct member names and order must exactly match the configured <c>columns</c>.
/// Example: <c>columns=["Id","Name"]</c> → <c>List&lt;Struct&lt;Id:Int64, Name:Utf8&gt;&gt;</c>.
/// </remarks>
public async ValueTask AddListAsync(YdbList list)
{
var tv = list.ToTypedValue();

if (tv.Type.TypeCase != Type.TypeOneofCase.ListType ||
tv.Type.ListType.Item.TypeCase != Type.TypeOneofCase.StructType)
{
throw new ArgumentException(
"BulkUpsertImporter.AddListAsync expects a YdbList with a value like List<Struct<...>>",
nameof(list));
}

var incomingStruct = tv.Type.ListType.Item.StructType;

if (incomingStruct.Members.Count != _columns.Count)
throw new ArgumentException(
$"The number of columns in the List<Struct> ({incomingStruct.Members.Count}) " +
$"does not match the expected ({_columns.Count}).");

for (var i = 0; i < _columns.Count; i++)
{
var expected = _columns[i];
var actual = incomingStruct.Members[i].Name;
if (!string.Equals(expected, actual, StringComparison.Ordinal))
throw new ArgumentException(
$"Column name mismatch at position {i}: expected '{expected}', received '{actual}'.");
}

_structType ??= incomingStruct;

foreach (var rowValue in tv.Value.Items)
{
var rowSize = rowValue.CalculateSize();

if (_currentBytes + rowSize > _maxBatchByteSize && _rows.Count > 0)
await FlushAsync().ConfigureAwait(false);

_rows.Add(rowValue);
_currentBytes += rowSize;
}
}

/// <summary>
/// Flush the current batch via BulkUpsert. No-op if the batch is empty.
/// </summary>
public async ValueTask FlushAsync()
{
if (_rows.Count == 0) return;
if (_rows.Count == 0)
return;

if (_structType == null)
throw new InvalidOperationException("structType is undefined");

Expand Down
11 changes: 11 additions & 0 deletions src/Ydb.Sdk/src/Ado/BulkUpsert/IBulkUpsertImporter.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.BulkUpsert;

public interface IBulkUpsertImporter
{
/// <summary>Add a single row to the batch. Values must match the importer column order.</summary>
/// <param name="row">Column values in the same order as the configured <c>columns</c>.</param>
ValueTask AddRowAsync(params object[] row);

/// <summary>
/// Add many rows from <see cref="YdbList"/> (shape: <c>List&lt;Struct&lt;...&gt;&gt;</c>).
/// Struct member names and order must exactly match the configured <c>columns</c>.
/// </summary>
ValueTask AddListAsync(YdbList list);

/// <summary>Flush the current batch via BulkUpsert (no-op if empty).</summary>
ValueTask FlushAsync();
}
18 changes: 6 additions & 12 deletions src/Ydb.Sdk/src/Ado/YdbParameter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,10 @@ internal TypedValue TypedValue
var value = Value;

if (value is YdbValue ydbValue)
{
return ydbValue.GetProto();
}

if (value == null || value == DBNull.Value)
{
return NullTypedValue();
}

return YdbDbType switch
{
Expand All @@ -149,13 +145,13 @@ internal TypedValue TypedValue
YdbDbType.Double => MakeDouble(value),
YdbDbType.Decimal when value is decimal decimalValue => Decimal(decimalValue),
YdbDbType.Bytes => MakeBytes(value),
YdbDbType.Json when value is string stringValue => stringValue.Json(),
YdbDbType.JsonDocument when value is string stringValue => stringValue.JsonDocument(),
YdbDbType.Json when value is string sJson => sJson.Json(),
YdbDbType.JsonDocument when value is string sJsonDoc => sJsonDoc.JsonDocument(),
YdbDbType.Uuid when value is Guid guidValue => guidValue.Uuid(),
YdbDbType.Date => MakeDate(value),
YdbDbType.DateTime when value is DateTime dateTimeValue => dateTimeValue.Datetime(),
YdbDbType.DateTime when value is DateTime dt => dt.Datetime(),
YdbDbType.Timestamp => MakeTimestamp(value),
YdbDbType.Interval when value is TimeSpan timeSpanValue => timeSpanValue.Interval(),
YdbDbType.Interval when value is TimeSpan ts => ts.Interval(),
YdbDbType.Unspecified => Cast(value),
_ => throw ValueTypeNotSupportedException
};
Expand Down Expand Up @@ -246,6 +242,7 @@ internal TypedValue TypedValue

private TypedValue Cast(object value) => value switch
{
YdbList ydbList => ydbList.ToTypedValue(),
string stringValue => stringValue.Text(),
bool boolValue => boolValue.Bool(),
sbyte sbyteValue => sbyteValue.Int8(),
Expand Down Expand Up @@ -278,9 +275,7 @@ private TypedValue Decimal(decimal value) =>
private TypedValue NullTypedValue()
{
if (YdbNullByDbType.TryGetValue(YdbDbType, out var value))
{
return value;
}

if (YdbDbType == YdbDbType.Decimal)
{
Expand All @@ -290,8 +285,7 @@ private TypedValue NullTypedValue()
}

throw new InvalidOperationException(
"Writing value of 'null' is not supported without explicit mapping to the YdbDbType"
);
"Writing value of 'null' is not supported without explicit mapping to the YdbDbType");
}

private InvalidOperationException ValueTypeNotSupportedException =>
Expand Down
114 changes: 114 additions & 0 deletions src/Ydb.Sdk/src/Value/YdbList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using Google.Protobuf.WellKnownTypes;
using Ydb.Sdk.Ado;
using Ydb.Sdk.Ado.YdbType;

namespace Ydb.Sdk.Value;

/// <summary>
/// Struct-only builder for YDB <c>List&lt;Struct&lt;...&gt;&gt;</c>.
/// Define columns (optionally YDB types) and add positional rows; no external MakeStruct is needed.
/// </summary>
public sealed class YdbList
{
private readonly string[] _columns;
private readonly YdbDbType[]? _types;
private readonly List<object?[]> _rows = new();

/// <summary>Create Struct-mode list with column names; types will be inferred from the first non-null per column.</summary>
public static YdbList Struct(params string[] columns) => new(columns);

/// <summary>Create Struct-mode list with column names and explicit YDB types (same length as columns).</summary>
public static YdbList Struct(string[] columns, YdbDbType[] types) => new(columns, types);

/// <summary>Constructs Struct-mode list. If <paramref name="types"/> is null, types are inferred per column.</summary>
public YdbList(string[] columns, YdbDbType[]? types = null)
{
if (columns is null || columns.Length == 0)
throw new ArgumentException("Columns must be non-empty.", nameof(columns));
if (types is not null && types.Length != columns.Length)
throw new ArgumentException("Length of 'types' must match length of 'columns'.", nameof(types));

_columns = columns;
_types = types;
}

/// <summary>Add one positional row. Value count must match the number of columns.</summary>
public YdbList AddRow(params object?[] values)
{
if (values.Length != _columns.Length)
throw new ArgumentException($"Expected {_columns.Length} values, got {values.Length}.");
_rows.Add(values);
return this;
}

/// <summary>Convert to YDB <see cref="TypedValue"/> with shape <c>List&lt;Struct&lt;...&gt;&gt;</c>.</summary>
internal TypedValue ToTypedValue()
{
if (_rows.Count == 0 && (_types is null || _types.All(t => t == YdbDbType.Unspecified)))
throw new InvalidOperationException(
"Cannot infer Struct schema from an empty list without explicit YdbDbType hints.");

var memberTypes = new Type[_columns.Length];
for (var i = 0; i < _columns.Length; i++)
{
if (_types is not null && _types[i] != YdbDbType.Unspecified)
{
memberTypes[i] = new YdbParameter { YdbDbType = _types[i] }.TypedValue.Type;
continue;
}

object? sample = null;
foreach (var r in _rows)
{
var v = r[i];
if (v is not null && v != DBNull.Value)
{
sample = v;
break;
}
}

if (sample is null)
throw new InvalidOperationException(
$"Column '{_columns[i]}' has only nulls and no explicit YdbDbType. Provide a type hint.");

memberTypes[i] = new YdbParameter { Value = sample }.TypedValue.Type;
}

var structType = new StructType
{
Members = { _columns.Select((name, idx) => new StructMember { Name = name, Type = memberTypes[idx] }) }
};

var ydbRows = new List<Ydb.Value>(_rows.Count);
foreach (var r in _rows)
{
var fields = new List<Ydb.Value>(_columns.Length);
for (var i = 0; i < _columns.Length; i++)
{
var v = r[i];
if (v is null || v == DBNull.Value)
{
fields.Add(new Ydb.Value { NullFlagValue = NullValue.NullValue });
continue;
}

var tv = v switch
{
YdbValue yv => yv.GetProto(),
YdbParameter p => p.TypedValue,
_ => new YdbParameter { Value = v }.TypedValue
};
fields.Add(tv.Value);
}

ydbRows.Add(new Ydb.Value { Items = { fields } });
}

return new TypedValue
{
Type = new Type { ListType = new ListType { Item = new Type { StructType = structType } } },
Value = new Ydb.Value { Items = { ydbRows } }
};
}
}
Loading
Loading