Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TTopicFilters : public ITopicFilters {
, Counters_(std::move(counters))
{}

void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) override {
void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) override {
LOG_ROW_DISPATCHER_TRACE("ProcessData for " << RunHandlers_.size() << " clients, number rows: " << numberRows);

if (!numberRows) {
Expand Down Expand Up @@ -220,18 +220,18 @@ class TTopicFilters : public ITopicFilters {
RunHandlers_.erase(iter);
}

void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector<ui64>& /* offsets */, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector<ui64>& /* offsets */, const TVector<ui64>& columnIndex, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) {
const auto consumer = programRunHandler->GetConsumer();
const auto& columnIds = consumer->GetColumnIds();

TVector<const TVector<NYql::NUdf::TUnboxedValue>*> result;
TVector<std::span<NYql::NUdf::TUnboxedValue>> result;
result.reserve(columnIds.size());
for (ui64 columnId : columnIds) {
Y_ENSURE(columnId < columnIndex.size(), "Unexpected column id " << columnId << ", it is larger than index array size " << columnIndex.size());
const ui64 index = columnIndex[columnId];

Y_ENSURE(index < values.size(), "Unexpected column index " << index << ", it is larger than values array size " << values.size());
if (const auto value = values[index]) {
if (const auto value = values[index]; !value.empty()) {
result.emplace_back(value);
} else {
LOG_ROW_DISPATCHER_TRACE("Ignore processing for " << consumer->GetClientId() << ", client got parsing error for column " << columnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ITopicFilters : public TThrRefBase, public TNonCopyable {

public:
// columnIndex - mapping from stable column id to index in values array
virtual void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) = 0;
virtual void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) = 0;
virtual void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& ev) = 0;

virtual TStatus AddPrograms(IProcessedDataConsumer::TPtr consumer, std::unordered_map<TString, IProgramHolder::TPtr> programHolders) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ NYT::TNode CreateTypeNode(NYT::TNode&& typeNode) {
return CreateNamedNode("DataType", std::move(typeNode));
}

// Builds an "OptionalType" named node around the given inner type node.
// The inner node is consumed (moved) and handed to CreateNamedNode.
NYT::TNode CreateOptionalTypeNode(NYT::TNode&& typeNode) {
    NYT::TNode optionalNode = CreateNamedNode("OptionalType", std::move(typeNode));
    return optionalNode;
}

// Builds a "StructType" named node around the given members node.
// The members node is consumed (moved) and handed to CreateNamedNode.
NYT::TNode CreateStructTypeNode(NYT::TNode&& membersNode) {
    NYT::TNode structNode = CreateNamedNode("StructType", std::move(membersNode));
    return structNode;
}
Expand Down Expand Up @@ -70,12 +74,12 @@ NYT::TNode MakeWatermarkOutputSchema() {
return CreateStructTypeNode(
NYT::TNode::CreateList()
.Add(CreateFieldNode(OFFSET_FIELD_NAME, CreateTypeNode("Uint64")))
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateTypeNode("Timestamp")))
.Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateOptionalTypeNode(CreateTypeNode("Timestamp"))))
);
}

struct TInputType {
const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;
const TVector<std::span<NYql::NUdf::TUnboxedValue>>& Values;
ui64 NumberRows;
};

Expand Down Expand Up @@ -152,8 +156,9 @@ class TInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(rowId);

for (ui64 fieldId = 0; const auto column : input.Values) {
items[FieldsPositions[fieldId++]] = column->at(rowId);
for (ui64 fieldId = 0; const auto& column : input.Values) {
Y_DEBUG_ABORT_UNLESS(column.size() > rowId);
items[FieldsPositions[fieldId++]] = column[rowId];
}

Worker->Push(std::move(result));
Expand Down Expand Up @@ -415,7 +420,7 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable
ActiveFilters_->Dec();
}

void ProcessData(const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) const override {
void ProcessData(const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) const override {
LOG_ROW_DISPATCHER_TRACE("ProcessData for " << numberRows << " rows");

if (!ProgramHolder_) {
Expand Down Expand Up @@ -456,21 +461,10 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable

TStringBuilder sb;
sb << R"(PRAGMA config.flags("LLVM", ")" << (settings.EnabledLLVM ? "ON" : "OFF") << R"(");)" << '\n';
sb << "$input ="
<< " SELECT "
<< OFFSET_FIELD_NAME << ", "
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
<< " FROM Input;\n";
sb << "$output ="
<< " SELECT "
<< OFFSET_FIELD_NAME << ", "
<< WATERMARK_FIELD_NAME
<< " FROM $input"
<< " WHERE " << WATERMARK_FIELD_NAME << " IS NOT NULL;\n";
sb << "SELECT "
<< OFFSET_FIELD_NAME << ", "
<< "Unwrap(" << WATERMARK_FIELD_NAME << ") AS " << WATERMARK_FIELD_NAME
<< " FROM $output;\n";
<< watermarkExpr << " AS " << WATERMARK_FIELD_NAME
<< " FROM Input;\n";

TString result = sb;
LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class IProgramRunHandler : public TThrRefBase {
return ProgramHolder_;
}

virtual void ProcessData(const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) const = 0;
virtual void ProcessData(const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) const = 0;

protected:
TString Name_;
Expand Down
55 changes: 33 additions & 22 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
void OnParsedData(ui64 numberRows) override {
LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows);

Self.ParsedData.assign(ParerSchema.size(), nullptr);
Self.ParsedData.assign(ParerSchema.size(), std::span<NYql::NUdf::TUnboxedValue>());
for (size_t i = 0; i < ParerSchema.size(); ++i) {
auto columnStatus = Self.Parser->GetParsedColumn(i);
if (Y_LIKELY(columnStatus.IsSuccess())) {
Expand Down Expand Up @@ -221,17 +221,31 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Client->StartClientSession();
}

private:
// Handles one (row id, optional watermark) pair.
//
// rowIdValue     - holds the ui64 row id used to look up the row's offset in Self.Offsets.
// maybeWatermark - optional value holding the watermark in microseconds; an empty value
//                  is ignored and leaves Offset and Watermark untouched.
//
// For a non-empty watermark, Offset is set to the offset of the row and Watermark is
// raised to the new value when the new value is greater than the stored one.
void OnWatermark(const NYql::NUdf::TUnboxedValue& rowIdValue, const NYql::NUdf::TUnboxedValue& maybeWatermark) {
    if (maybeWatermark) {
        const ui64 rowIndex = rowIdValue.Get<ui64>();
        // at() keeps the bounds check on the offsets array
        Offset = Self.Offsets->at(rowIndex);

        const TInstant candidate = TInstant::MicroSeconds(maybeWatermark.Get<ui64>());
        if (Watermark < candidate) {
            Watermark = candidate;
        }

        LOG_ROW_DISPATCHER_TRACE("OnWatermark, row id: " << rowIndex << ", watermark: " << candidate);
    }
}

public:
void OnData(const NYql::NUdf::TUnboxedValue* value) override {
ui64 rowId;
TMaybe<ui64> watermarkUs;
if (value->IsEmbedded()) {
rowId = value->Get<ui64>();
} else if (value->IsBoxed()) {
if (value->GetListLength() == 1) {
rowId = value->GetElement(0).Get<ui64>();
} else if (value->GetListLength() == 2) {
rowId = value->GetElement(0).Get<ui64>();
watermarkUs = value->GetElement(1).Get<ui64>();
OnWatermark(value->GetElement(0), value->GetElement(1));
return;
} else {
Y_ENSURE(false, "Unexpected output schema size");
}
Expand All @@ -246,23 +260,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

FilteredOffsets.insert(Offset);
if (watermarkUs) {
WatermarksUs.push_back(*watermarkUs);

const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};
LOG_ROW_DISPATCHER_TRACE("OnData, row id: " << rowId << ", offset: " << Offset << ", watermark: " << watermark);

return;
}

Y_DEFER {
// Values allocated on parser allocator and should be released
FilteredRow.assign(Columns.size(), NYql::NUdf::TUnboxedValue());
};

for (size_t i = 0; const ui64 columnId : ColumnsIds) {
auto& parsedData = Self.ParsedData[Self.ParserSchemaIndex[columnId]];
Y_DEBUG_ABORT_UNLESS(parsedData.size() > rowId);

// All data was locked in parser, so copy is safe
FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId);
FilteredRow[i++] = parsedData[rowId];
}
DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size());

Expand All @@ -272,7 +281,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void OnBatchFinish() override {
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && WatermarksUs.empty()) {
if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && !Watermark) {
return;
}
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) {
Expand All @@ -282,11 +291,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

const auto numberRows = NewNumberRows - NumberRows;
const auto rowSize = NewDataPackerSize - DataPackerSize;
const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe<TInstant>{TInstant::MicroSeconds(WatermarksUs.back())};

LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << watermark);
LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);

Client->AddDataToClient(Offset, numberRows, rowSize, watermark);
Client->AddDataToClient(Offset, numberRows, rowSize, Watermark);

NumberRows = NewNumberRows;
DataPackerSize = NewDataPackerSize;
Expand Down Expand Up @@ -315,15 +323,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void FinishPacking() {
if (!DataPacker->IsEmpty() || !WatermarksUs.empty()) {
if (!DataPacker->IsEmpty() || !Watermark.Empty()) {
LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size());
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), FilteredOffsets, WatermarksUs);
if (FilteredOffsets.empty()) {
FilteredOffsets.emplace(Offset);
}
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark);
NumberRows = 0;
NewNumberRows = 0;
DataPackerSize = 0;
NewDataPackerSize = 0;
FilteredOffsets.clear();
WatermarksUs.clear();
Watermark.Clear();
}
}

Expand All @@ -345,7 +356,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataPacker;
TSet<ui64> FilteredOffsets; // Offsets of current batch in DataPacker
TVector<ui64> WatermarksUs;
TMaybe<TInstant> Watermark;
TQueue<TDataBatch> ClientData;
};

Expand Down Expand Up @@ -653,7 +664,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

// Parsed data
const TVector<ui64>* Offsets;
TVector<const TVector<NYql::NUdf::TUnboxedValue>*> ParsedData;
TVector<std::span<NYql::NUdf::TUnboxedValue>> ParsedData;
bool RefreshScheduled = false;

// Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class IClientDataConsumer : public TThrRefBase {
struct TDataBatch {
TRope SerializedData;
TSet<ui64> Offsets;
TVector<ui64> WatermarksUs;
TMaybe<TInstant> Watermark;
};

class ITopicFormatHandler : public TNonCopyable {
Expand Down
Loading
Loading