@@ -67,6 +67,7 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
67
67
private static final String timestampColumnsProp = "Timestamp Columns" ;
68
68
private static final String readDelayProp = "Read Delay" ;
69
69
private static final String fullLoadIntervalProp = "Full Load Interval" ;
70
+ private static final String loadIntervalProp = "Incremental Load Interval" ;
70
71
private static final String userNameProp = "User Name" ;
71
72
private static final String passwordProp = "Password" ;
72
73
private static final String keepSourceTypes = "Keep JDBC source types" ;
@@ -83,10 +84,12 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
83
84
new SimplePropertyDescription (timestampColumnsProp , "Comma separated list of timestamp columns to use for loading new rows. The fist non-null value will be used. At least one of the values must not be null for each row" , true ),
84
85
new SimplePropertyDescription (readDelayProp , "How long (in seconds) to wait before reading rows based on their timestamp. This allows waiting for all transactions of a certain timestamp to complete to avoid loading partial data. Default value is 0" , true ),
85
86
new SimplePropertyDescription (fullLoadIntervalProp , "If set the full table will be read every configured interval (in minutes). When this is configured the update time and incrementing columns are not used." , true ),
86
- new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )));
87
+ new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )),
88
+ new SimplePropertyDescription (loadIntervalProp , "Configures how often (in minutes) the data source will poll the database for new changes" , true ));
87
89
88
90
private long readDelay ;
89
91
private long fullLoadIntervalMinutes ;
92
+ private long loadIntervalMinutes ;
90
93
private TableInfo tableInfo ;
91
94
private QueryDialect queryDialect ;
92
95
private long dbTimezoneOffset ;
@@ -103,6 +106,10 @@ private boolean isFullLoad() {
103
106
return fullLoadIntervalMinutes > 0 ;
104
107
}
105
108
109
+ private boolean hasCustomLoadInterval () {
110
+ return loadIntervalMinutes > 1 ;
111
+ }
112
+
106
113
@ Override
107
114
public DataSourceDescription getDataSourceDescription () {
108
115
return new JDBCDataSourceDescription ();
@@ -146,6 +153,7 @@ public void setProperties(Map<String, String> properties) {
146
153
try (Connection con = getConnection ()) {
147
154
readDelay = Long .parseLong (properties .getOrDefault (readDelayProp , "0" ));
148
155
fullLoadIntervalMinutes = Long .parseLong (properties .getOrDefault (fullLoadIntervalProp , "0" ));
156
+ loadIntervalMinutes = Long .parseLong (properties .getOrDefault (loadIntervalProp , "1" ));
149
157
DatabaseMetaData metadata = con .getMetaData ();
150
158
String userProvidedIncColumn = properties .get (incrementingColumnNameProp );
151
159
tableInfo = loadTableInfo (metadata , properties .getOrDefault (schemaPatternProp , null ), properties .get (tableNameProp ));
@@ -403,9 +411,10 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
403
411
ShardDefinition shardDefinition ) {
404
412
var taskCount = completedRanges .size () + wantedRanges .size ();
405
413
var itemsPerTask = (taskInfo .getMetadata ().itemsPerTask (taskCount ));
406
- var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
414
+ var skipAll = hasCustomLoadInterval () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
415
+ var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesFullLoadInterval );
407
416
var noDataToLoad = !isFullLoad () && !tableInfo .hasTimeColumns () && itemsPerTask == 0 ;
408
- if (emptyFullLoad || noDataToLoad ) {
417
+ if (skipAll || emptyFullLoad || noDataToLoad ) {
409
418
List <DataLoader <JDBCTaskMetadata >> result =
410
419
wantedRanges .stream ().map (t -> new NoDataLoader (t , taskInfo .getMetadata ())).collect (Collectors .toList ());
411
420
return CompletableFuture .completedFuture (result .iterator ());
@@ -422,8 +431,16 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
422
431
}
423
432
}
424
433
434
+ private boolean matchesFullLoadInterval (TaskRange x ) {
435
+ return getTimeInMinutes (x .getInclusiveStartTime ()) % fullLoadIntervalMinutes == 0 ;
436
+ }
437
+
425
438
private boolean matchesLoadInterval (TaskRange x ) {
426
- return x .getInclusiveStartTime ().getEpochSecond () / 60 % fullLoadIntervalMinutes == 0 ;
439
+ return getTimeInMinutes (x .getInclusiveStartTime ()) % loadIntervalMinutes == 0 ;
440
+ }
441
+
442
+ private Long getTimeInMinutes (Instant time ) {
443
+ return time .getEpochSecond () / 60L ;
427
444
}
428
445
429
446
private List <JDBCTaskMetadata > getRunMetadatas (TaskInformation <JDBCTaskMetadata > taskInfo ,
@@ -444,10 +461,18 @@ private List<JDBCTaskMetadata> getRunMetadatas(TaskInformation<JDBCTaskMetadata>
444
461
// First task does not have lower bound to ensure we don't skip data from the last point we stopped at
445
462
var startTime =
446
463
firstInBatch ? taskInfo .getMetadata ().getStartTime () : wantedRange .getInclusiveStartTime ().minusSeconds (readDelay );
464
+ var lowerBound = taskInfo .getMetadata ().getStartTime ().getEpochSecond ();
465
+ var truncatedStartTime = Math .max (lowerBound , hasCustomLoadInterval () ?
466
+ (getTimeInMinutes (startTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes )
467
+ : startTime .getEpochSecond ());
468
+ var endTime = wantedRange .getExclusiveEndTime ().minusSeconds (readDelay );
469
+ var truncatedEndTime = Math .max (lowerBound ,hasCustomLoadInterval () ?
470
+ getTimeInMinutes (endTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes
471
+ : endTime .getEpochSecond ());
447
472
var metadata = new JDBCTaskMetadata (taskInfo .getMetadata ().getInclusiveStart (),
448
473
taskInfo .getMetadata ().getExclusiveEnd (),
449
- startTime ,
450
- wantedRange . getExclusiveEndTime (). minusSeconds ( readDelay ));
474
+ Instant . ofEpochSecond ( truncatedStartTime ) ,
475
+ Instant . ofEpochSecond ( truncatedEndTime ));
451
476
result .add (metadata );
452
477
}
453
478
} else {
@@ -485,43 +510,58 @@ private CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> splitData(Result
485
510
final var isLast = i == wantedRanges .size () - 1 ;
486
511
final var taskRange = wantedRanges .get (i );
487
512
final var metadata = runMetadatas .get (i );
488
- var loader = new DataLoader <JDBCTaskMetadata >() {
489
- @ Override
490
- public TaskRange getTaskRange () {
491
- return taskRange ;
492
- }
493
-
494
- private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesLoadInterval (taskRange ));
495
-
496
- @ Override
497
- public Iterator <LoadedData > loadData () {
498
- ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
499
- var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
500
- return Collections .singleton (result ).iterator ();
501
- }
502
-
503
- @ Override
504
- public JDBCTaskMetadata getCompletedMetadata () {
505
- if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
506
- if (rowReader .readValues ()) {
507
- // If some data was successfully read then that's our next start point
508
- lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
509
- lastReadIncValue .set (rowReader .getLastIncValue ());
510
- }
511
- metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
512
- metadata .setEndTime (lastReadTime .get ());
513
+ DataLoader <JDBCTaskMetadata > loader = null ;
514
+ if (matchesLoadInterval (taskRange )){
515
+ loader = getLoader (connection ,
516
+ lastReadIncValue ,
517
+ lastReadTime ,
518
+ valueGetter ,
519
+ isLast ,
520
+ taskRange ,
521
+ metadata );
522
+ } else {
523
+ loader = new NoDataLoader (taskRange , metadata );
513
524
}
514
-
515
- return metadata ;
516
- }
517
- };
518
525
result .add (loader );
519
526
520
527
}
521
528
return CompletableFuture .completedFuture (result .iterator ());
522
529
523
530
}
524
531
532
+ private DataLoader <JDBCTaskMetadata > getLoader (Connection connection , AtomicReference <Long > lastReadIncValue , AtomicReference <Instant > lastReadTime , ResultSetValuesGetter valueGetter , boolean isLast , TaskRange taskRange , JDBCTaskMetadata metadata ) {
533
+ return new DataLoader <JDBCTaskMetadata >() {
534
+ @ Override
535
+ public TaskRange getTaskRange () {
536
+ return taskRange ;
537
+ }
538
+
539
+ private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesFullLoadInterval (taskRange ));
540
+
541
+ @ Override
542
+ public Iterator <LoadedData > loadData () {
543
+ ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
544
+ var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
545
+ return Collections .singleton (result ).iterator ();
546
+ }
547
+
548
+ @ Override
549
+ public JDBCTaskMetadata getCompletedMetadata () {
550
+ if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
551
+ if (rowReader .readValues ()) {
552
+ // If some data was successfully read then that's our next start point
553
+ lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
554
+ lastReadIncValue .set (rowReader .getLastIncValue ());
555
+ }
556
+ metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
557
+ metadata .setEndTime (lastReadTime .get ());
558
+ }
559
+
560
+ return metadata ;
561
+ }
562
+ };
563
+ }
564
+
525
565
526
566
@ Override
527
567
public CompletionStage <TaskInformation <JDBCTaskMetadata >> getTaskInfo (JDBCTaskMetadata previousTaskMetadata ,
0 commit comments