From f9bca2b618f0a76f3ac10d1232c8cf89b3c8f555 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 26 Sep 2025 12:56:59 +0300 Subject: [PATCH 1/4] YQ-4723 Use own driver in leader election (#25770) --- .../libs/row_dispatcher/leader_election.cpp | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp index 5a4d5f9b1a49..4e8aae9cee1f 100644 --- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp +++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp @@ -9,10 +9,13 @@ #include #include #include +#include #include #include +#include + namespace NFq { using namespace NActors; @@ -83,7 +86,11 @@ struct TLeaderElectionMetrics { ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged; }; -class TLeaderElection: public TActorBootstrapped { +struct TActorSystemPtrMixin { + NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared(nullptr); +}; + +class TLeaderElection: public TActorBootstrapped, public TActorSystemPtrMixin { enum class EState { Init, @@ -93,8 +100,8 @@ class TLeaderElection: public TActorBootstrapped { Started }; NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig Config; - const NKikimr::TYdbCredentialsProviderFactory& CredentialsProviderFactory; - NYdb::TDriver Driver; + NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; + std::unique_ptr Driver; TYdbConnectionPtr YdbConnection; TString TablePathPrefix; const TString TenantId; @@ -165,6 +172,7 @@ class TLeaderElection: public TActorBootstrapped { void ProcessState(); void ResetState(); void SetTimeout(); + NYdb::TDriverConfig GetYdbDriverConfig() const; }; TLeaderElection::TLeaderElection( @@ -172,13 +180,11 @@ TLeaderElection::TLeaderElection( NActors::TActorId coordinatorId, const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - NYdb::TDriver driver, + NYdb::TDriver /*driver*/, const TString& tenant, const ::NMonitoring::TDynamicCounterPtr& counters) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) - , Driver(driver) - , YdbConnection(config.GetLocalMode() ? nullptr : NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, Driver)) , TablePathPrefix(JoinPath(config.GetDatabase().GetDatabase(), config.GetCoordinationNodePath())) , TenantId(JoinSeq(":", NKikimr::SplitPath(tenant))) , CoordinationNodePath(JoinPath(TablePathPrefix, TenantId)) @@ -218,6 +224,9 @@ TYdbSdkRetryPolicy::TPtr MakeSchemaRetryPolicy() { void TLeaderElection::Bootstrap() { Become(&TLeaderElection::StateFunc); + Y_ABORT_UNLESS(!ActorSystemPtr->load(std::memory_order_relaxed), "Double ActorSystemPtr init"); + ActorSystemPtr->store(TActivationContext::ActorSystem(), std::memory_order_relaxed); + LogPrefix = "TLeaderElection " + SelfId().ToString() + " "; LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString() << ", tenant id " << TenantId << ", local mode " << Config.GetLocalMode() << ", coordination node path " << CoordinationNodePath); @@ -225,6 +234,9 @@ void TLeaderElection::Bootstrap() { TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId, 0)); return; } + + Driver = std::make_unique(GetYdbDriverConfig()); + YdbConnection = NewYdbConnection(Config.GetDatabase(), CredentialsProviderFactory, *Driver); ProcessState(); } @@ -469,6 +481,13 @@ void TLeaderElection::HandleException(const std::exception& e) { ResetState(); } +NYdb::TDriverConfig TLeaderElection::GetYdbDriverConfig() const { + NYdb::TDriverConfig cfg; + cfg.SetDiscoveryMode(NYdb::EDiscoveryMode::Async); + cfg.SetLog(std::make_unique(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK)); + return cfg; +} + } // anonymous namespace //////////////////////////////////////////////////////////////////////////////// From a0860b0ecd12b04aa99e88e8e548d611b277f615 Mon Sep 17 00:00:00 2001 From: yumkam Date: Fri, 26 Sep 2025 16:49:56 +0300 Subject: [PATCH 2/4] rd + watermarks: transfer single watermark per batch (#25209) --- .../filters/purecalc_filter.cpp | 21 ++---- .../format_handler/format_handler.cpp | 46 +++++++------ .../format_handler/format_handler.h | 2 +- .../format_handler/ut/format_handler_ut.cpp | 68 +++++++++++++------ .../fq/libs/row_dispatcher/topic_session.cpp | 6 +- 5 files changed, 86 insertions(+), 57 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp index a0844d0aa72a..b83b75a51095 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp @@ -21,6 +21,10 @@ NYT::TNode CreateTypeNode(NYT::TNode&& typeNode) { return CreateNamedNode("DataType", std::move(typeNode)); } +NYT::TNode CreateOptionalTypeNode(NYT::TNode&& typeNode) { + return CreateNamedNode("OptionalType", std::move(typeNode)); +} + NYT::TNode CreateStructTypeNode(NYT::TNode&& membersNode) { return CreateNamedNode("StructType", std::move(membersNode)); } @@ -70,7 +74,7 @@ NYT::TNode MakeWatermarkOutputSchema() { return CreateStructTypeNode( NYT::TNode::CreateList() .Add(CreateFieldNode(OFFSET_FIELD_NAME, CreateTypeNode("Uint64"))) - .Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateTypeNode("Timestamp"))) + .Add(CreateFieldNode(WATERMARK_FIELD_NAME, CreateOptionalTypeNode(CreateTypeNode("Timestamp")))) ); } @@ -456,21 +460,10 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable TStringBuilder sb; sb << R"(PRAGMA config.flags("LLVM", ")" << (settings.EnabledLLVM ? "ON" : "OFF") << R"(");)" << '\n'; - sb << "$input =" - << " SELECT " - << OFFSET_FIELD_NAME << ", " - << watermarkExpr << " AS " << WATERMARK_FIELD_NAME - << " FROM Input;\n"; - sb << "$output =" - << " SELECT " - << OFFSET_FIELD_NAME << ", " - << WATERMARK_FIELD_NAME - << " FROM $input" - << " WHERE " << WATERMARK_FIELD_NAME << " IS NOT NULL;\n"; sb << "SELECT " << OFFSET_FIELD_NAME << ", " - << "Unwrap(" << WATERMARK_FIELD_NAME << ") AS " << WATERMARK_FIELD_NAME - << " FROM $output;\n"; + << watermarkExpr << " AS " << WATERMARK_FIELD_NAME + << " FROM Input;\n"; TString result = sb; LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << result); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 699d896fada7..80d980f5f202 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -221,17 +221,31 @@ class TTopicFormatHandler : public NActors::TActor, public Client->StartClientSession(); } + private: + void OnWatermark(const NYql::NUdf::TUnboxedValue& rowIdValue, const NYql::NUdf::TUnboxedValue& maybeWatermark) { + if (!maybeWatermark) { + return; + } + auto rowId = rowIdValue.Get(); + Offset = Self.Offsets->at(rowId); + auto watermark = TInstant::MicroSeconds(maybeWatermark.Get()); + if (Watermark < watermark) { + Watermark = watermark; + } + LOG_ROW_DISPATCHER_TRACE("OnWatermark, row id: " << rowId << ", watermark: " << watermark); + } + + public: void OnData(const NYql::NUdf::TUnboxedValue* value) override { ui64 rowId; - TMaybe watermarkUs; if (value->IsEmbedded()) { rowId = value->Get(); } else if (value->IsBoxed()) { if (value->GetListLength() == 1) { rowId = value->GetElement(0).Get(); } else if (value->GetListLength() == 2) { - rowId = value->GetElement(0).Get(); - watermarkUs = value->GetElement(1).Get(); + OnWatermark(value->GetElement(0), value->GetElement(1)); + return; } else { Y_ENSURE(false, "Unexpected output schema size"); } @@ -246,14 +260,6 @@ class TTopicFormatHandler : public NActors::TActor, public } FilteredOffsets.insert(Offset); - if (watermarkUs) { - WatermarksUs.push_back(*watermarkUs); - - const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe{TInstant::MicroSeconds(WatermarksUs.back())}; - LOG_ROW_DISPATCHER_TRACE("OnData, row id: " << rowId << ", offset: " << Offset << ", watermark: " << watermark); - - return; - } Y_DEFER { // Values allocated on parser allocator and should be released @@ -272,7 +278,7 @@ class TTopicFormatHandler : public NActors::TActor, public } void OnBatchFinish() override { - if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && WatermarksUs.empty()) { + if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && !Watermark) { return; } if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) { @@ -282,11 +288,10 @@ class TTopicFormatHandler : public NActors::TActor, public const auto numberRows = NewNumberRows - NumberRows; const auto rowSize = NewDataPackerSize - DataPackerSize; - const auto watermark = WatermarksUs.empty() ? Nothing() : TMaybe{TInstant::MicroSeconds(WatermarksUs.back())}; - LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << watermark); + LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark); - Client->AddDataToClient(Offset, numberRows, rowSize, watermark); + Client->AddDataToClient(Offset, numberRows, rowSize, Watermark); NumberRows = NewNumberRows; DataPackerSize = NewDataPackerSize; @@ -315,15 +320,18 @@ class TTopicFormatHandler : public NActors::TActor, public } void FinishPacking() { - if (!DataPacker->IsEmpty() || !WatermarksUs.empty()) { + if (!DataPacker->IsEmpty() || !Watermark.Empty()) { LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size()); - ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), FilteredOffsets, WatermarksUs); + if (FilteredOffsets.empty()) { + FilteredOffsets.emplace(Offset); + } + ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark); NumberRows = 0; NewNumberRows = 0; DataPackerSize = 0; NewDataPackerSize = 0; FilteredOffsets.clear(); - WatermarksUs.clear(); + Watermark.Clear(); } } @@ -345,7 +353,7 @@ class TTopicFormatHandler : public NActors::TActor, public TVector FilteredRow; // Temporary value holder for DataPacket std::unique_ptr> DataPacker; TSet FilteredOffsets; // Offsets of current batch in DataPacker - TVector WatermarksUs; + TMaybe Watermark; TQueue ClientData; }; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index bf400909731d..266aadaa3a63 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -38,7 +38,7 @@ class IClientDataConsumer : public TThrRefBase { struct TDataBatch { TRope SerializedData; TSet Offsets; - TVector WatermarksUs; + TMaybe Watermark; }; class ITopicFormatHandler : public TNonCopyable { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index 47eea0fb2e69..6788e996a689 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -11,7 +11,7 @@ class TFormatHandlerFixture : public TBaseFixture { using TCallback = std::function&&)>; struct TMessages { TVector Offsets; - TVector Watermark; + TMaybe Watermark; TBatch Batch; }; @@ -555,14 +555,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { auto messages = TVector{ { {firstOffset + 2, firstOffset + 3}, - {39'000'000, 40'000'000}, + TInstant::Seconds(40), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:44Z")) .AddRow(TRow().AddString("1970-01-01T00:00:45Z")) }, { {firstOffset + 4, firstOffset + 5}, - {41'000'000, 42'000'000}, + TInstant::Seconds(42), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:46Z")) .AddRow(TRow().AddString("1970-01-01T00:00:47Z")) @@ -584,18 +584,33 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { messages = TVector{ { {firstOffset + 4, firstOffset + 5}, - {41'000'000, 42'000'000}, + TInstant::Seconds(42), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:46Z")) .AddRow(TRow().AddString("1970-01-01T00:00:47Z")) }, { {firstOffset + 6, firstOffset + 7}, - {43'000'000, 44'000'000}, + TInstant::Seconds(44), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:48Z")) .AddRow(TRow().AddString("1970-01-01T00:00:49Z")) }, + { + {firstOffset + 60, firstOffset + 70}, + Nothing(), + TBatch() + .AddRow(TRow().AddString("1970-01-01T00:00:01Z")) // watermark = NULL + .AddRow(TRow().AddString("1970-01-01T00:00:02Z")) // watermark = NULL + }, + { + {firstOffset + 600, firstOffset + 700, firstOffset + 800}, + TInstant::Seconds(0), + TBatch() + .AddRow(TRow().AddString("1970-01-01T00:00:03Z")) // watermark = NULL + .AddRow(TRow().AddString("1970-01-01T00:00:05Z")) + .AddRow(TRow().AddString("1970-01-01T00:00:04Z")) // watermark = NULL + }, }; CheckSuccess(MakeClient( {{"ts", "[DataType; String]"}}, @@ -617,6 +632,17 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { GetMessage(firstOffset + 7, R"({"ts": "1970-01-01T00:00:49Z"})"), }); + ParseMessages({ + GetMessage(firstOffset + 60, R"({"ts": "1970-01-01T00:00:01Z"})"), + GetMessage(firstOffset + 70, R"({"ts": "1970-01-01T00:00:02Z"})"), + }); + + ParseMessages({ + GetMessage(firstOffset + 600, R"({"ts": "1970-01-01T00:00:03Z"})"), + GetMessage(firstOffset + 700, R"({"ts": "1970-01-01T00:00:05Z"})"), + GetMessage(firstOffset + 800, R"({"ts": "1970-01-01T00:00:04Z"})"), + }); + RemoveClient(ClientIds[1]); ParseMessages({ @@ -635,14 +661,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { auto messages = TVector{ { - {firstOffset + 2, firstOffset + 3}, - {39'000'000, 40'000'000}, + {firstOffset + 2}, + TInstant::Seconds(40), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1)) }, { - {firstOffset + 4, firstOffset + 5}, - {41'000'000, 42'000'000}, + {firstOffset + 4}, + TInstant::Seconds(42), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1)) }, @@ -662,14 +688,14 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { messages = TVector{ { - {firstOffset + 4, firstOffset + 5}, - {41'000'000, 42'000'000}, + {firstOffset + 4}, + TInstant::Seconds(42), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1)) }, { - {firstOffset + 6, firstOffset + 7}, - {43'000'000, 44'000'000}, + {firstOffset + 6}, + TInstant::Seconds(44), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1)) }, @@ -712,13 +738,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { auto messages = TVector{ { - {firstOffset + 2, firstOffset + 3}, - {39'000'000, 40'000'000}, + {firstOffset + 3}, + TInstant::Seconds(40), TBatch() }, { - {firstOffset + 4, firstOffset + 5}, - {41'000'000, 42'000'000}, + {firstOffset + 5}, + TInstant::Seconds(42), TBatch() }, }; @@ -737,13 +763,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { messages = TVector{ { - {firstOffset + 4, firstOffset + 5}, - {41'000'000, 42'000'000}, + {firstOffset + 5}, + TInstant::Seconds(42), TBatch() }, { - {firstOffset + 6, firstOffset + 7}, - {43'000'000, 44'000'000}, + {firstOffset + 7}, + TInstant::Seconds(44), TBatch() }, }; diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 10116ac3b3e0..8fcb960f5925 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -697,7 +697,7 @@ void TTopicSession::SendData(TClientsInfo& info) { ui64 batchSize = 0; while (!buffer.empty()) { - auto [serializedData, offsets, watermarksUs] = std::move(buffer.front()); + auto [serializedData, offsets, watermark] = std::move(buffer.front()); Y_ENSURE(!offsets.empty(), "Expected non empty message batch"); buffer.pop(); @@ -706,7 +706,9 @@ void TTopicSession::SendData(TClientsInfo& info) { NFq::NRowDispatcherProto::TEvMessage message; message.SetPayloadId(event->AddPayload(std::move(serializedData))); message.MutableOffsets()->Assign(offsets.begin(), offsets.end()); - message.MutableWatermarksUs()->Assign(watermarksUs.begin(), watermarksUs.end()); + if (watermark) { + message.AddWatermarksUs(watermark->MicroSeconds()); + } event->Record.AddMessages()->CopyFrom(std::move(message)); event->Record.SetNextMessageOffset(*offsets.rbegin() + 1); From 8ca9017ad81b379afee3a676e655f64889aebcbe Mon Sep 17 00:00:00 2001 From: yumkam Date: Fri, 26 Sep 2025 19:00:32 +0300 Subject: [PATCH 3/4] pq rd: get rid of New watermark (empty maybe) log spam (#25882) --- .../yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 509201c7ba9f..7cb47fdcb6f1 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -1148,8 +1148,10 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) newWatermark = *maybeNewWatermark; } - SRC_LOG_D("SessionId: " << GetSessionId() << " New watermark " << newWatermark << " was generated"); - activeBatch.Watermark = newWatermark; + if (newWatermark) { + SRC_LOG_D("SessionId: " << GetSessionId() << " New watermark " << newWatermark << " was generated"); + activeBatch.Watermark = newWatermark; + } } Parent->NotifyCA(); From cf11b9d0a03a0a6bd30fd316eedef879fbcd2c6a Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Mon, 29 Sep 2025 22:30:51 +0500 Subject: [PATCH 4/4] YQ-4736 optimized json parser memory usage (#25907) --- .../format_handler/filters/filters_set.cpp | 8 +- .../format_handler/filters/filters_set.h | 2 +- .../filters/purecalc_filter.cpp | 9 +- .../format_handler/filters/purecalc_filter.h | 2 +- .../format_handler/format_handler.cpp | 9 +- .../format_handler/parsers/json_parser.cpp | 77 +++++++---- .../format_handler/parsers/json_parser.h | 2 +- .../format_handler/parsers/parser_abstract.h | 2 +- .../format_handler/parsers/raw_parser.cpp | 4 +- .../format_handler/ut/topic_filter_ut.cpp | 18 +-- .../format_handler/ut/topic_parser_ut.cpp | 124 +++++++++--------- 11 files changed, 141 insertions(+), 116 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index a34c6c0bd9c6..bdbd49831d75 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -30,7 +30,7 @@ class TTopicFilters : public ITopicFilters { , Counters_(std::move(counters)) {} - void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector*>& values, ui64 numberRows) override { + void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector>& values, ui64 numberRows) override { LOG_ROW_DISPATCHER_TRACE("ProcessData for " << RunHandlers_.size() << " clients, number rows: " << numberRows); if (!numberRows) { @@ -220,18 +220,18 @@ class TTopicFilters : public ITopicFilters { RunHandlers_.erase(iter); } - void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector& /* offsets */, const TVector& columnIndex, const TVector*>& values, ui64 numberRows) { + void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector& /* offsets */, const TVector& columnIndex, const TVector>& values, ui64 numberRows) { const auto consumer = programRunHandler->GetConsumer(); const auto& columnIds = consumer->GetColumnIds(); - TVector*> result; + TVector> result; result.reserve(columnIds.size()); for (ui64 columnId : columnIds) { Y_ENSURE(columnId < columnIndex.size(), "Unexpected column id " << columnId << ", it is larger than index array size " << columnIndex.size()); const ui64 index = columnIndex[columnId]; Y_ENSURE(index < values.size(), "Unexpected column index " << index << ", it is larger than values array size " << values.size()); - if (const auto value = values[index]) { + if (const auto value = values[index]; !value.empty()) { result.emplace_back(value); } else { LOG_ROW_DISPATCHER_TRACE("Ignore processing for " << consumer->GetClientId() << ", client got parsing error for column " << columnId); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h index 74ef42b0f3ed..eaeacd63d5bb 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h @@ -15,7 +15,7 @@ class ITopicFilters : public TThrRefBase, public TNonCopyable { public: // columnIndex - mapping from stable column id to index in values array - virtual void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector*>& values, ui64 numberRows) = 0; + virtual void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector>& values, ui64 numberRows) = 0; virtual void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& ev) = 0; virtual TStatus AddPrograms(IProcessedDataConsumer::TPtr consumer, std::unordered_map programHolders) = 0; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp index b83b75a51095..44c147fc662d 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp @@ -79,7 +79,7 @@ NYT::TNode MakeWatermarkOutputSchema() { } struct TInputType { - const TVector*>& Values; + const TVector>& Values; ui64 NumberRows; }; @@ -156,8 +156,9 @@ class TInputConsumer : public NYql::NPureCalc::IConsumer { items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(rowId); - for (ui64 fieldId = 0; const auto column : input.Values) { - items[FieldsPositions[fieldId++]] = column->at(rowId); + for (ui64 fieldId = 0; const auto& column : input.Values) { + Y_DEBUG_ABORT_UNLESS(column.size() > rowId); + items[FieldsPositions[fieldId++]] = column[rowId]; } Worker->Push(std::move(result)); @@ -419,7 +420,7 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable ActiveFilters_->Dec(); } - void ProcessData(const TVector*>& values, ui64 numberRows) const override { + void ProcessData(const TVector>& values, ui64 numberRows) const override { LOG_ROW_DISPATCHER_TRACE("ProcessData for " << numberRows << " rows"); if (!ProgramHolder_) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h index 9c0c69353f72..d7971aad4b96 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h @@ -73,7 +73,7 @@ class IProgramRunHandler : public TThrRefBase { return ProgramHolder_; } - virtual void ProcessData(const TVector*>& values, ui64 numberRows) const = 0; + virtual void ProcessData(const TVector>& values, ui64 numberRows) const = 0; protected: TString Name_; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 80d980f5f202..e7f8c1949dd1 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -79,7 +79,7 @@ class TTopicFormatHandler : public NActors::TActor, public void OnParsedData(ui64 numberRows) override { LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows); - Self.ParsedData.assign(ParerSchema.size(), nullptr); + Self.ParsedData.assign(ParerSchema.size(), std::span()); for (size_t i = 0; i < ParerSchema.size(); ++i) { auto columnStatus = Self.Parser->GetParsedColumn(i); if (Y_LIKELY(columnStatus.IsSuccess())) { @@ -267,8 +267,11 @@ class TTopicFormatHandler : public NActors::TActor, public }; for (size_t i = 0; const ui64 columnId : ColumnsIds) { + auto& parsedData = Self.ParsedData[Self.ParserSchemaIndex[columnId]]; + Y_DEBUG_ABORT_UNLESS(parsedData.size() > rowId); + // All data was locked in parser, so copy is safe - FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId); + FilteredRow[i++] = parsedData[rowId]; } DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size()); @@ -661,7 +664,7 @@ class TTopicFormatHandler : public NActors::TActor, public // Parsed data const TVector* Offsets; - TVector*> ParsedData; + TVector> ParsedData; bool RefreshScheduled = false; // Metrics diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp index cdc7f7152d9d..10844d2e862b 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp @@ -26,7 +26,7 @@ namespace { if (Y_UNLIKELY(error)) \ struct TJsonParserBuffer { - size_t NumberValues = 0; + ui16 NumberValues = 0; bool Finished = false; TInstant CreationStartTime = TInstant::Now(); TVector Offsets = {}; @@ -84,20 +84,21 @@ class TColumnParser { TString TypeYson; public: - TStatus InitParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, const NKikimr::NMiniKQL::TType* typeMkql) { + TStatus InitParser(const TString& name, const TString& typeYson, std::span parsedRows, const NKikimr::NMiniKQL::TType* typeMkql) { Name = name; TypeYson = typeYson; IsOptional = false; Status = TStatus::Success(); - - if (2 * ParsedRows.capacity() < maxNumberRows) { - ParsedRows.reserve(maxNumberRows); - } - + ParsedRowsCount = 0; + ParsedRows = parsedRows; return Status = ExtractDataSlot(typeMkql); } - const TVector& GetParsedRows() const { + ui16 GetParsedRowsCount() const { + return ParsedRowsCount; + } + + const std::span& GetParsedRows() const { return ParsedRows; } @@ -105,11 +106,11 @@ class TColumnParser { return Status; } - void ParseJsonValue(ui64 offset, ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + void ParseJsonValue(ui64 offset, ui16 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { if (Y_UNLIKELY(Status.IsFail())) { return; } - ParsedRows.emplace_back(rowId); + ParsedRows[ParsedRowsCount++] = rowId; if (DataSlot != NYql::NUdf::EDataSlot::Json) { ParseDataType(std::move(jsonValue), resultValue, Status); @@ -126,17 +127,17 @@ class TColumnParser { } } - void ValidateNumberValues(size_t expectedNumberValues, const TVector& offsets) { + void ValidateNumberValues(ui16 expectedNumberValues, const TVector& offsets) { if (Status.IsFail()) { return; } - if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) { - Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ' , offsets)); + if (Y_UNLIKELY(!IsOptional && ParsedRowsCount < expectedNumberValues)) { + Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRowsCount << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ' , offsets)); } } void ClearParsedRows() { - ParsedRows.clear(); + ParsedRowsCount = 0; Status = TStatus::Success(); } @@ -318,7 +319,8 @@ class TColumnParser { TString DataTypeName; bool IsOptional = false; - TVector ParsedRows; + ui16 ParsedRowsCount = 0; + std::span ParsedRows; TStatus Status = TStatus::Success(); }; @@ -327,6 +329,9 @@ class TJsonParser : public TTopicParserBase { using TBase = TTopicParserBase; using TPtr = TIntrusivePtr; + static constexpr ui64 NUMBER_ROWS_LIMIT = 1000; + static_assert(NUMBER_ROWS_LIMIT <= Max()); + public: TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) : TBase(std::move(consumer), __LOCATION__, config.FunctionRegistry, counters) @@ -343,6 +348,10 @@ class TJsonParser : public TTopicParserBase { TStatus InitColumnsParsers() { const auto& consumerColumns = Consumer->GetColumns(); + + ParsedRowsIdxBuffer.resize(consumerColumns.size() * MaxNumberRows); + const std::span parsedRowsIdxSpan(ParsedRowsIdxBuffer); + Columns.resize(consumerColumns.size()); for (ui64 i = 0; i < consumerColumns.size(); ++i) { const auto& name = consumerColumns[i].Name; @@ -352,10 +361,11 @@ class TJsonParser : public TTopicParserBase { return TStatus(typeStatus).AddParentIssue(TStringBuilder() << "Failed to parse column '" << name << "' type " << typeYson); } - if (auto status = Columns[i].InitParser(name, typeYson, MaxNumberRows, typeStatus.DetachResult()); status.IsFail()) { + if (auto status = Columns[i].InitParser(name, typeYson, parsedRowsIdxSpan.subspan(i * MaxNumberRows, MaxNumberRows), typeStatus.DetachResult()); status.IsFail()) { return status.AddParentIssue(TStringBuilder() << "Failed to create parser for column '" << name << "' with type " << typeYson); } } + return TStatus::Success(); } @@ -413,11 +423,11 @@ class TJsonParser : public TTopicParserBase { return Buffer.Offsets; } - TValueStatus*> GetParsedColumn(ui64 columnId) const override { + TValueStatus> GetParsedColumn(ui64 columnId) override { if (auto status = Columns[columnId].GetStatus(); status.IsFail()) { return status; } - return &ParsedValues[columnId]; + return ParsedValues[columnId]; } protected: @@ -445,7 +455,7 @@ class TJsonParser : public TTopicParserBase { return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); } - size_t rowId = 0; + ui16 rowId = 0; for (auto document : documents) { if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) { return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); @@ -486,11 +496,17 @@ class TJsonParser : public TTopicParserBase { void ClearBuffer() override { for (size_t i = 0; i < Columns.size(); ++i) { auto& parsedColumn = ParsedValues[i]; - for (size_t rowId : Columns[i].GetParsedRows()) { - ClearObject(parsedColumn[rowId]); + + auto& column = Columns[i]; + const auto parsedRows = column.GetParsedRows(); + const auto parsedRowsCount = column.GetParsedRowsCount(); + for (ui16 rowId = 0; rowId < parsedRowsCount; ++rowId) { + ClearObject(parsedColumn[parsedRows[rowId]]); } - Columns[i].ClearParsedRows(); + + column.ClearParsedRows(); } + Buffer.Clear(); } @@ -507,27 +523,32 @@ class TJsonParser : public TTopicParserBase { ColumnsIndex.emplace(std::string_view(consumerColumns[i].Name), i); } + ParsedValuesBuffer.resize(consumerColumns.size() * MaxNumberRows); + const std::span valuesBufferSpan(ParsedValuesBuffer); + ParsedValues.resize(consumerColumns.size()); - for (auto& parseBuffer : ParsedValues) { - parseBuffer.resize(MaxNumberRows); + for (ui64 i = 0; i < consumerColumns.size(); ++i) { + ParsedValues[i] = valuesBufferSpan.subspan(i * MaxNumberRows, MaxNumberRows); } } - ui64 CalculateMaxNumberRows() const { - return (Config.BufferCellCount - 1) / Consumer->GetColumns().size() + 1; + ui16 CalculateMaxNumberRows() const { + return std::min((Config.BufferCellCount - 1) / Consumer->GetColumns().size() + 1, NUMBER_ROWS_LIMIT); } private: const TJsonParserConfig Config; - ui64 MaxNumberRows = 0; + ui16 MaxNumberRows = 0; const TString LogPrefix; TVector Columns; + TVector ParsedRowsIdxBuffer; absl::flat_hash_map ColumnsIndex; TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector> ParsedValues; + TVector ParsedValuesBuffer; + TVector> ParsedValues; }; } // anonymous namespace diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h index e078ac43e884..adc4dd474b5f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h @@ -18,7 +18,7 @@ struct TJsonParserConfig { const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; ui64 BatchSize = 1_MB; TDuration LatencyLimit; - ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit, amount memory size is O(BufferCellCount * log(BufferCellCount)) + ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit }; TValueStatus CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h index c2cc1dfe0e0a..253c2c41af7f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h @@ -30,7 +30,7 @@ class ITopicParser : public TThrRefBase, public TNonCopyable { virtual TStatus ChangeConsumer(IParsedDataConsumer::TPtr consumer) = 0; virtual const TVector& GetOffsets() const = 0; - virtual TValueStatus*> GetParsedColumn(ui64 columnId) const = 0; + virtual TValueStatus> GetParsedColumn(ui64 columnId) = 0; virtual void FillStatistics(TFormatHandlerStatistic& statistic) = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp index 0283a3458b8b..076f947df226 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp @@ -83,9 +83,9 @@ class TRawParser : public TTopicParserBase { return Offsets; } - TValueStatus*> GetParsedColumn(ui64 columnId) const override { + TValueStatus> GetParsedColumn(ui64 columnId) override { Y_ENSURE(columnId == 0, "Invalid column id for raw parser"); - return &ParsedColumn; + return std::span(ParsedColumn); } protected: diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 29c3052961b6..3f4b5c8a996f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -172,41 +172,41 @@ class TFilterFixture : public TBaseFixture { Consumer.Reset(); } - void Push(const TVector*>& values, ui64 numberRows = 0) { + void Push(const TVector>& values, ui64 numberRows = 0) { for (const auto& [name, runHandler] : RunHandlers) { - runHandler->ProcessData(values, numberRows ? numberRows : values.front()->size()); + runHandler->ProcessData(values, numberRows ? numberRows : values.front().size()); } if (Consumer) { Consumer->OnBatchFinish(); } } - const TVector* MakeVector(size_t size, std::function valueCreator) { + std::span MakeVector(size_t size, std::function valueCreator) { with_lock (Alloc) { auto& holder = Holders.emplace_front(); for (size_t i = 0; i < size; ++i) { holder.emplace_back(LockObject(valueCreator(i))); } - return &holder; + return holder; } } template - const TVector* MakeVector(const TVector& values, bool optional = false) { + std::span MakeVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod unboxedValue = NYql::NUdf::TUnboxedValuePod(values[i]); return optional ? unboxedValue.MakeOptional() : unboxedValue; }); } - const TVector* MakeStringVector(const TVector& values, bool optional = false) { + std::span MakeStringVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod stringValue = NKikimr::NMiniKQL::MakeString(values[i]); return optional ? stringValue.MakeOptional() : stringValue; }); } - const TVector* MakeEmptyVector(size_t size) { + std::span MakeEmptyVector(size_t size) { return MakeVector(size, [&](size_t) { return NYql::NUdf::TUnboxedValuePod(); }); @@ -310,8 +310,8 @@ class TFilterSetFixture : public TFilterFixture { Consumer.Reset(); } - void ProcessData(const TVector& columnIndex, const TVector*>& values, ui64 numberRows = 0) { - numberRows = numberRows ? numberRows : values.front()->size(); + void ProcessData(const TVector& columnIndex, const TVector>& values, ui64 numberRows = 0) { + numberRows = numberRows ? numberRows : values.front().size(); FiltersSet->ProcessData(columnIndex, TVector(numberRows, std::numeric_limits::max()), values, numberRows); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index 7593b4b44c2b..9dce3d661887 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -11,7 +11,7 @@ class TBaseParserFixture : public TBaseFixture { static constexpr ui64 FIRST_OFFSET = 42; using TBase = TBaseFixture; - using TCallback = std::function*> result)>; + using TCallback = std::function> result)>; class TParsedDataConsumer : public IParsedDataConsumer { public: @@ -60,7 +60,7 @@ class TBaseParserFixture : public TBaseFixture { CurrentOffset++; } - TVector*> result(Columns.size(), nullptr); + TVector> result(Columns.size()); for (ui64 i = 0; i < Columns.size(); ++i) { if (const auto it = ExpectedErrors.find(i); it != ExpectedErrors.end()) { CheckError(Self.Parser->GetParsedColumn(i), it->second.first, it->second.second); @@ -118,11 +118,11 @@ class TBaseParserFixture : public TBaseFixture { } TStatus MakeParser(TVector columnNames, TString columnType) { - return MakeParser(columnNames, columnType, [](ui64, TVector*>) {}); + return MakeParser(columnNames, columnType, [](ui64, TVector>) {}); } TStatus MakeParser(TVector columns) { - return MakeParser(columns, [](ui64, TVector*>) {}); + return MakeParser(columns, [](ui64, TVector>) {}); } void PushToParser(ui64 offset, const TString& data) { @@ -185,41 +185,41 @@ class TRawParserFixture : public TBaseParserFixture { Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(Simple1, TJsonParserFixture) { - CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(101, result[1]->at(0).GetOptionalValue().Get()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); })); PushToParser(FIRST_OFFSET, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); } Y_UNIT_TEST_F(Simple2, TJsonParserFixture) { - CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); })); PushToParser(FIRST_OFFSET, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple3, TJsonParserFixture) { - CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); })); PushToParser(FIRST_OFFSET,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple4, TJsonParserFixture) { - CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); })); PushToParser(FIRST_OFFSET, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } @@ -229,11 +229,11 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(1).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); })); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; @@ -246,12 +246,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(ManyValues, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(3, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); for (size_t i = 0; i < numberRows; ++i) { - UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0]->at(i).AsStringRef()), i); - UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1]->at(i).AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); } })); @@ -265,20 +265,20 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(MissingFields, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(3, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); for (size_t i = 0; i < numberRows; ++i) { if (i == 2) { - UNIT_ASSERT_C(!result[0]->at(i), i); + UNIT_ASSERT_C(!result[0][i], i); } else { - NYql::NUdf::TUnboxedValue value = result[0]->at(i).GetOptionalValue(); + NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); } if (i == 1) { - UNIT_ASSERT_C(!result[1]->at(i), i); + UNIT_ASSERT_C(!result[1][i], i); } else { - UNIT_ASSERT_VALUES_EQUAL_C(101, result[1]->at(i).GetOptionalValue().Get(), i); + UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); } } })); @@ -293,21 +293,21 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(NestedTypes, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"nested", "[OptionalType; [DataType; Json]]"}, {"a1", "[DataType; String]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"nested", "[OptionalType; [DataType; Json]]"}, {"a1", "[DataType; String]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(4, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0]->at(1).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1]->at(1).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0]->at(2).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1]->at(2).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0]->at(3).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1]->at(3).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef())); })); Parser->ParseMessages({ @@ -321,12 +321,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(SimpleBooleans, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); })); Parser->ParseMessages({ @@ -338,12 +338,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(ChangeParserSchema, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); })); Parser->ParseMessages({ @@ -354,14 +354,14 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { CheckSuccess(Parser->ChangeConsumer(MakeIntrusive( *this, TVector{{"a", "[DataType; Bool]"}, {"b", "[DataType; Int64]"}}, - [&](ui64 numberRows, TVector*> result) { + [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0]->at(1).Get()); - UNIT_ASSERT_VALUES_EQUAL(42, result[1]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(84, result[1]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + UNIT_ASSERT_VALUES_EQUAL(42, result[1][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(84, result[1][1].Get()); } ))); @@ -373,12 +373,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { CheckSuccess(Parser->ChangeConsumer(MakeIntrusive( *this, TVector{{"b", "[DataType; Int64]"}}, - [&](ui64 numberRows, TVector*> result) { + [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(42, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(84, result[0]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(42, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(84, result[0][1].Get()); } ))); @@ -393,10 +393,10 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Config.BufferCellCount = 1; const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); })); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; @@ -411,10 +411,10 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Config.BatchSize = 10; const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); })); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; @@ -480,11 +480,11 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_SUITE(TestRawParser) { Y_UNIT_TEST_F(Simple, TRawParserFixture) { - CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - NYql::NUdf::TUnboxedValue value = result[0]->at(0).GetOptionalValue(); + NYql::NUdf::TUnboxedValue value = result[0][0].GetOptionalValue(); UNIT_ASSERT_VALUES_EQUAL(R"({"a1": "hello1__large_str", "a2": 101, "event": "event1"})", TString(value.AsStringRef())); })); PushToParser(FIRST_OFFSET, R"({"a1": "hello1__large_str", "a2": 101, "event": "event1"})"); @@ -499,10 +499,10 @@ Y_UNIT_TEST_SUITE(TestRawParser) { ExpectedBatches = data.size(); int i = 0; - CheckSuccess(MakeParser({"a1"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a1"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(data[i], TString(result[0]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(data[i], TString(result[0][0].AsStringRef())); i++; })); @@ -514,11 +514,11 @@ Y_UNIT_TEST_SUITE(TestRawParser) { } Y_UNIT_TEST_F(ChangeParserSchema, TRawParserFixture) { - CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - NYql::NUdf::TUnboxedValue value = result[0]->at(0).GetOptionalValue(); + NYql::NUdf::TUnboxedValue value = result[0][0].GetOptionalValue(); UNIT_ASSERT_VALUES_EQUAL(R"({"a1": "hello1__large_str", "a2": 101, "event": "event1"})", TString(value.AsStringRef())); })); @@ -528,11 +528,11 @@ Y_UNIT_TEST_SUITE(TestRawParser) { CheckSuccess(Parser->ChangeConsumer(MakeIntrusive( *this, TVector{{"data", "[DataType; String]"}}, - [&](ui64 numberRows, TVector*> result) { + [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - NYql::NUdf::TUnboxedValue value = result[0]->at(0); + NYql::NUdf::TUnboxedValue value = result[0][0]; UNIT_ASSERT_VALUES_EQUAL(R"({"a1": "hello2__large_str", "a2": 101, "event": "event2"})", TString(value.AsStringRef())); } )));