@@ -67,6 +67,7 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
67
67
private static final String timestampColumnsProp = "Timestamp Columns" ;
68
68
private static final String readDelayProp = "Read Delay" ;
69
69
private static final String fullLoadIntervalProp = "Full Load Interval" ;
70
+ private static final String loadIntervalProp = "Incremental Load Interval" ;
70
71
private static final String userNameProp = "User Name" ;
71
72
private static final String passwordProp = "Password" ;
72
73
private static final String keepSourceTypes = "Keep JDBC source types" ;
@@ -83,10 +84,12 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
83
84
new SimplePropertyDescription (timestampColumnsProp , "Comma separated list of timestamp columns to use for loading new rows. The fist non-null value will be used. At least one of the values must not be null for each row" , true ),
84
85
new SimplePropertyDescription (readDelayProp , "How long (in seconds) to wait before reading rows based on their timestamp. This allows waiting for all transactions of a certain timestamp to complete to avoid loading partial data. Default value is 0" , true ),
85
86
new SimplePropertyDescription (fullLoadIntervalProp , "If set the full table will be read every configured interval (in minutes). When this is configured the update time and incrementing columns are not used." , true ),
86
- new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )));
87
+ new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )),
88
+ new SimplePropertyDescription (loadIntervalProp , "Configures how often (in minutes) the data source will poll the database for new changes" , true ));
87
89
88
90
private long readDelay ;
89
91
private long fullLoadIntervalMinutes ;
92
+ private long loadIntervalMinutes ;
90
93
private TableInfo tableInfo ;
91
94
private QueryDialect queryDialect ;
92
95
private long dbTimezoneOffset ;
@@ -103,6 +106,10 @@ private boolean isFullLoad() {
103
106
return fullLoadIntervalMinutes > 0 ;
104
107
}
105
108
109
+ private boolean hasCustomLoadInterval () {
110
+ return loadIntervalMinutes > 1 ;
111
+ }
112
+
106
113
@ Override
107
114
public DataSourceDescription getDataSourceDescription () {
108
115
return new JDBCDataSourceDescription ();
@@ -146,6 +153,7 @@ public void setProperties(Map<String, String> properties) {
146
153
try (Connection con = getConnection ()) {
147
154
readDelay = Long .parseLong (properties .getOrDefault (readDelayProp , "0" ));
148
155
fullLoadIntervalMinutes = Long .parseLong (properties .getOrDefault (fullLoadIntervalProp , "0" ));
156
+ loadIntervalMinutes = Long .parseLong (properties .getOrDefault (loadIntervalProp , "1" ));
149
157
DatabaseMetaData metadata = con .getMetaData ();
150
158
String userProvidedIncColumn = properties .get (incrementingColumnNameProp );
151
159
tableInfo = loadTableInfo (metadata , properties .getOrDefault (schemaPatternProp , null ), properties .get (tableNameProp ));
@@ -404,9 +412,10 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
404
412
405
413
var taskCount = completedRanges .size () + wantedRanges .size ();
406
414
var itemsPerTask = (taskInfo .getMetadata ().itemsPerTask (taskCount ));
407
- var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
415
+ var skipAll = hasCustomLoadInterval () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
416
+ var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesFullLoadInterval );
408
417
var noDataToLoad = !isFullLoad () && !tableInfo .hasTimeColumns () && itemsPerTask == 0 ;
409
- if (emptyFullLoad || noDataToLoad ) {
418
+ if (skipAll || emptyFullLoad || noDataToLoad ) {
410
419
List <DataLoader <JDBCTaskMetadata >> result =
411
420
wantedRanges .stream ().map (t -> new NoDataLoader (t , taskInfo .getMetadata ())).collect (Collectors .toList ());
412
421
return CompletableFuture .completedFuture (result .iterator ());
@@ -423,8 +432,16 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
423
432
}
424
433
}
425
434
435
+ private boolean matchesFullLoadInterval (TaskRange x ) {
436
+ return getTimeInMinutes (x .getInclusiveStartTime ()) % fullLoadIntervalMinutes == 0 ;
437
+ }
438
+
426
439
private boolean matchesLoadInterval (TaskRange x ) {
427
- return x .getInclusiveStartTime ().getEpochSecond () / 60 % fullLoadIntervalMinutes == 0 ;
440
+ return getTimeInMinutes (x .getInclusiveStartTime ()) % loadIntervalMinutes == 0 ;
441
+ }
442
+
443
+ private Long getTimeInMinutes (Instant time ) {
444
+ return time .getEpochSecond () / 60L ;
428
445
}
429
446
430
447
private List <JDBCTaskMetadata > getRunMetadatas (TaskInformation <JDBCTaskMetadata > taskInfo ,
@@ -445,10 +462,18 @@ private List<JDBCTaskMetadata> getRunMetadatas(TaskInformation<JDBCTaskMetadata>
445
462
// First task does not have lower bound to ensure we don't skip data from the last point we stopped at
446
463
var startTime =
447
464
firstInBatch ? taskInfo .getMetadata ().getStartTime () : wantedRange .getInclusiveStartTime ().minusSeconds (readDelay );
465
+ var lowerBound = taskInfo .getMetadata ().getStartTime ().getEpochSecond ();
466
+ var truncatedStartTime = Math .max (lowerBound , hasCustomLoadInterval () ?
467
+ (getTimeInMinutes (startTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes )
468
+ : startTime .getEpochSecond ());
469
+ var endTime = wantedRange .getExclusiveEndTime ().minusSeconds (readDelay );
470
+ var truncatedEndTime = Math .max (lowerBound ,hasCustomLoadInterval () ?
471
+ getTimeInMinutes (endTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes
472
+ : endTime .getEpochSecond ());
448
473
var metadata = new JDBCTaskMetadata (taskInfo .getMetadata ().getInclusiveStart (),
449
474
taskInfo .getMetadata ().getExclusiveEnd (),
450
- startTime ,
451
- wantedRange . getExclusiveEndTime (). minusSeconds ( readDelay ));
475
+ Instant . ofEpochSecond ( truncatedStartTime ) ,
476
+ Instant . ofEpochSecond ( truncatedEndTime ));
452
477
result .add (metadata );
453
478
}
454
479
} else {
@@ -486,43 +511,58 @@ private CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> splitData(Result
486
511
final var isLast = i == wantedRanges .size () - 1 ;
487
512
final var taskRange = wantedRanges .get (i );
488
513
final var metadata = runMetadatas .get (i );
489
- var loader = new DataLoader <JDBCTaskMetadata >() {
490
- @ Override
491
- public TaskRange getTaskRange () {
492
- return taskRange ;
493
- }
494
-
495
- private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesLoadInterval (taskRange ));
496
-
497
- @ Override
498
- public Iterator <LoadedData > loadData () {
499
- ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
500
- var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
501
- return Collections .singleton (result ).iterator ();
502
- }
503
-
504
- @ Override
505
- public JDBCTaskMetadata getCompletedMetadata () {
506
- if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
507
- if (rowReader .readValues ()) {
508
- // If some data was successfully read then that's our next start point
509
- lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
510
- lastReadIncValue .set (rowReader .getLastIncValue ());
511
- }
512
- metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
513
- metadata .setEndTime (lastReadTime .get ());
514
+ DataLoader <JDBCTaskMetadata > loader = null ;
515
+ if (matchesLoadInterval (taskRange )){
516
+ loader = getLoader (connection ,
517
+ lastReadIncValue ,
518
+ lastReadTime ,
519
+ valueGetter ,
520
+ isLast ,
521
+ taskRange ,
522
+ metadata );
523
+ } else {
524
+ loader = new NoDataLoader (taskRange , metadata );
514
525
}
515
-
516
- return metadata ;
517
- }
518
- };
519
526
result .add (loader );
520
527
521
528
}
522
529
return CompletableFuture .completedFuture (result .iterator ());
523
530
524
531
}
525
532
533
+ private DataLoader <JDBCTaskMetadata > getLoader (Connection connection , AtomicReference <Long > lastReadIncValue , AtomicReference <Instant > lastReadTime , ResultSetValuesGetter valueGetter , boolean isLast , TaskRange taskRange , JDBCTaskMetadata metadata ) {
534
+ return new DataLoader <JDBCTaskMetadata >() {
535
+ @ Override
536
+ public TaskRange getTaskRange () {
537
+ return taskRange ;
538
+ }
539
+
540
+ private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesFullLoadInterval (taskRange ));
541
+
542
+ @ Override
543
+ public Iterator <LoadedData > loadData () {
544
+ ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
545
+ var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
546
+ return Collections .singleton (result ).iterator ();
547
+ }
548
+
549
+ @ Override
550
+ public JDBCTaskMetadata getCompletedMetadata () {
551
+ if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
552
+ if (rowReader .readValues ()) {
553
+ // If some data was successfully read then that's our next start point
554
+ lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
555
+ lastReadIncValue .set (rowReader .getLastIncValue ());
556
+ }
557
+ metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
558
+ metadata .setEndTime (lastReadTime .get ());
559
+ }
560
+
561
+ return metadata ;
562
+ }
563
+ };
564
+ }
565
+
526
566
527
567
@ Override
528
568
public CompletionStage <TaskInformation <JDBCTaskMetadata >> getTaskInfo (JDBCTaskMetadata previousTaskMetadata ,
0 commit comments