@@ -79,7 +79,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
79
79
void OnParsedData (ui64 numberRows) override {
80
80
LOG_ROW_DISPATCHER_TRACE (" Got parsed data, number rows: " << numberRows);
81
81
82
- Self.ParsedData .assign (ParerSchema.size (), nullptr );
82
+ Self.ParsedData .assign (ParerSchema.size (), std::span<NYql::NUdf::TUnboxedValue>() );
83
83
for (size_t i = 0 ; i < ParerSchema.size (); ++i) {
84
84
auto columnStatus = Self.Parser ->GetParsedColumn (i);
85
85
if (Y_LIKELY (columnStatus.IsSuccess ())) {
@@ -221,17 +221,31 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
221
221
Client->StartClientSession ();
222
222
}
223
223
224
+ private:
225
+ void OnWatermark (const NYql::NUdf::TUnboxedValue& rowIdValue, const NYql::NUdf::TUnboxedValue& maybeWatermark) {
226
+ if (!maybeWatermark) {
227
+ return ;
228
+ }
229
+ auto rowId = rowIdValue.Get <ui64>();
230
+ Offset = Self.Offsets ->at (rowId);
231
+ auto watermark = TInstant::MicroSeconds (maybeWatermark.Get <ui64>());
232
+ if (Watermark < watermark) {
233
+ Watermark = watermark;
234
+ }
235
+ LOG_ROW_DISPATCHER_TRACE (" OnWatermark, row id: " << rowId << " , watermark: " << watermark);
236
+ }
237
+
238
+ public:
224
239
void OnData (const NYql::NUdf::TUnboxedValue* value) override {
225
240
ui64 rowId;
226
- TMaybe<ui64> watermarkUs;
227
241
if (value->IsEmbedded ()) {
228
242
rowId = value->Get <ui64>();
229
243
} else if (value->IsBoxed ()) {
230
244
if (value->GetListLength () == 1 ) {
231
245
rowId = value->GetElement (0 ).Get <ui64>();
232
246
} else if (value->GetListLength () == 2 ) {
233
- rowId = value->GetElement (0 ). Get <ui64>( );
234
- watermarkUs = value-> GetElement ( 1 ). Get <ui64>() ;
247
+ OnWatermark ( value->GetElement (0 ), value-> GetElement ( 1 ) );
248
+ return ;
235
249
} else {
236
250
Y_ENSURE (false , " Unexpected output schema size" );
237
251
}
@@ -246,23 +260,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
246
260
}
247
261
248
262
FilteredOffsets.insert (Offset);
249
- if (watermarkUs) {
250
- WatermarksUs.push_back (*watermarkUs);
251
-
252
- const auto watermark = WatermarksUs.empty () ? Nothing () : TMaybe<TInstant>{TInstant::MicroSeconds (WatermarksUs.back ())};
253
- LOG_ROW_DISPATCHER_TRACE (" OnData, row id: " << rowId << " , offset: " << Offset << " , watermark: " << watermark);
254
-
255
- return ;
256
- }
257
263
258
264
Y_DEFER {
259
265
// Values allocated on parser allocator and should be released
260
266
FilteredRow.assign (Columns.size (), NYql::NUdf::TUnboxedValue ());
261
267
};
262
268
263
269
for (size_t i = 0 ; const ui64 columnId : ColumnsIds) {
270
+ auto & parsedData = Self.ParsedData [Self.ParserSchemaIndex [columnId]];
271
+ Y_DEBUG_ABORT_UNLESS (parsedData.size () > rowId);
272
+
264
273
// All data was locked in parser, so copy is safe
265
- FilteredRow[i++] = Self. ParsedData [Self. ParserSchemaIndex [columnId]]-> at ( rowId) ;
274
+ FilteredRow[i++] = parsedData[ rowId] ;
266
275
}
267
276
DataPacker->AddWideItem (FilteredRow.data (), FilteredRow.size ());
268
277
@@ -272,7 +281,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
272
281
}
273
282
274
283
void OnBatchFinish () override {
275
- if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && WatermarksUs. empty () ) {
284
+ if (NewNumberRows == NumberRows && NewDataPackerSize == DataPackerSize && !Watermark ) {
276
285
return ;
277
286
}
278
287
if (const auto nextOffset = Client->GetNextMessageOffset (); nextOffset && Offset < *nextOffset) {
@@ -282,11 +291,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
282
291
283
292
const auto numberRows = NewNumberRows - NumberRows;
284
293
const auto rowSize = NewDataPackerSize - DataPackerSize;
285
- const auto watermark = WatermarksUs.empty () ? Nothing () : TMaybe<TInstant>{TInstant::MicroSeconds (WatermarksUs.back ())};
286
294
287
- LOG_ROW_DISPATCHER_TRACE (" OnBatchFinish, offset: " << Offset << " , number rows: " << numberRows << " , row size: " << rowSize << " , watermark: " << watermark );
295
+ LOG_ROW_DISPATCHER_TRACE (" OnBatchFinish, offset: " << Offset << " , number rows: " << numberRows << " , row size: " << rowSize << " , watermark: " << Watermark );
288
296
289
- Client->AddDataToClient (Offset, numberRows, rowSize, watermark );
297
+ Client->AddDataToClient (Offset, numberRows, rowSize, Watermark );
290
298
291
299
NumberRows = NewNumberRows;
292
300
DataPackerSize = NewDataPackerSize;
@@ -315,15 +323,18 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
315
323
}
316
324
317
325
void FinishPacking () {
318
- if (!DataPacker->IsEmpty () || !WatermarksUs. empty ()) {
326
+ if (!DataPacker->IsEmpty () || !Watermark. Empty ()) {
319
327
LOG_ROW_DISPATCHER_TRACE (" FinishPacking, batch size: " << DataPackerSize << " , number rows: " << FilteredOffsets.size ());
320
- ClientData.emplace (NYql::MakeReadOnlyRope (DataPacker->Finish ()), FilteredOffsets, WatermarksUs);
328
+ if (FilteredOffsets.empty ()) {
329
+ FilteredOffsets.emplace (Offset);
330
+ }
331
+ ClientData.emplace (NYql::MakeReadOnlyRope (DataPacker->Finish ()), std::move (FilteredOffsets), Watermark);
321
332
NumberRows = 0 ;
322
333
NewNumberRows = 0 ;
323
334
DataPackerSize = 0 ;
324
335
NewDataPackerSize = 0 ;
325
336
FilteredOffsets.clear ();
326
- WatermarksUs. clear ();
337
+ Watermark. Clear ();
327
338
}
328
339
}
329
340
@@ -345,7 +356,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
345
356
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket
346
357
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true >> DataPacker;
347
358
TSet<ui64> FilteredOffsets; // Offsets of current batch in DataPacker
348
- TVector<ui64> WatermarksUs ;
359
+ TMaybe<TInstant> Watermark ;
349
360
TQueue<TDataBatch> ClientData;
350
361
};
351
362
@@ -653,7 +664,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
653
664
654
665
// Parsed data
655
666
const TVector<ui64>* Offsets;
656
- TVector<const TVector <NYql::NUdf::TUnboxedValue>* > ParsedData;
667
+ TVector<std::span <NYql::NUdf::TUnboxedValue>> ParsedData;
657
668
bool RefreshScheduled = false ;
658
669
659
670
// Metrics
0 commit comments