Skip to content

Commit 792c8a5

Browse files
committed
Refactor JDBC DAOs to use JdbcClient where feasible
Closes GH-4804 Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent b0eccd2 commit 792c8a5

File tree

6 files changed

+160
-117
lines changed

6 files changed

+160
-117
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2023 the original author or authors.
2+
* Copyright 2006-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.springframework.beans.factory.InitializingBean;
2222
import org.springframework.jdbc.core.JdbcOperations;
23+
import org.springframework.jdbc.core.simple.JdbcClient;
2324
import org.springframework.util.Assert;
2425
import org.springframework.util.StringUtils;
2526

@@ -29,6 +30,7 @@
2930
*
3031
* @author Robert Kasanicky
3132
* @author Mahmoud Ben Hassine
33+
* @author Yanming Zhou
3234
*/
3335
public abstract class AbstractJdbcBatchMetadataDao implements InitializingBean {
3436

@@ -47,6 +49,8 @@ public abstract class AbstractJdbcBatchMetadataDao implements InitializingBean {
4749

4850
private JdbcOperations jdbcTemplate;
4951

52+
private JdbcClient jdbcClient;
53+
5054
protected String getQuery(String base) {
5155
return StringUtils.replace(base, "%PREFIX%", tablePrefix);
5256
}
@@ -66,12 +70,17 @@ public void setTablePrefix(String tablePrefix) {
6670

6771
public void setJdbcTemplate(JdbcOperations jdbcTemplate) {
6872
this.jdbcTemplate = jdbcTemplate;
73+
this.jdbcClient = JdbcClient.create(jdbcTemplate);
6974
}
7075

7176
protected JdbcOperations getJdbcTemplate() {
7277
return jdbcTemplate;
7378
}
7479

80+
protected JdbcClient getJdbcClient() {
81+
return jdbcClient;
82+
}
83+
7584
public int getClobTypeToUse() {
7685
return clobTypeToUse;
7786
}
@@ -83,6 +92,7 @@ public void setClobTypeToUse(int clobTypeToUse) {
8392
@Override
8493
public void afterPropertiesSet() throws Exception {
8594
Assert.state(jdbcTemplate != null, "JdbcOperations is required");
95+
Assert.state(jdbcClient != null, "JdbcClient is required");
8696
}
8797

8898
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.core.serializer.Serializer;
4444
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
4545
import org.springframework.jdbc.core.RowMapper;
46+
import org.springframework.jdbc.core.simple.JdbcClient;
4647
import org.springframework.lang.NonNull;
4748
import org.springframework.util.Assert;
4849

@@ -65,7 +66,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
6566
private static final String FIND_JOB_EXECUTION_CONTEXT = """
6667
SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT
6768
FROM %PREFIX%JOB_EXECUTION_CONTEXT
68-
WHERE JOB_EXECUTION_ID = ?
69+
WHERE JOB_EXECUTION_ID = :executionId
6970
""";
7071

7172
private static final String INSERT_JOB_EXECUTION_CONTEXT = """
@@ -82,7 +83,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
8283
private static final String FIND_STEP_EXECUTION_CONTEXT = """
8384
SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT
8485
FROM %PREFIX%STEP_EXECUTION_CONTEXT
85-
WHERE STEP_EXECUTION_ID = ?
86+
WHERE STEP_EXECUTION_ID = :executionId
8687
""";
8788

8889
private static final String INSERT_STEP_EXECUTION_CONTEXT = """
@@ -98,12 +99,12 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem
9899

99100
private static final String DELETE_STEP_EXECUTION_CONTEXT = """
100101
DELETE FROM %PREFIX%STEP_EXECUTION_CONTEXT
101-
WHERE STEP_EXECUTION_ID = ?
102+
WHERE STEP_EXECUTION_ID = :executionId
102103
""";
103104

104105
private static final String DELETE_JOB_EXECUTION_CONTEXT = """
105106
DELETE FROM %PREFIX%JOB_EXECUTION_CONTEXT
106-
WHERE JOB_EXECUTION_ID = ?
107+
WHERE JOB_EXECUTION_ID = :executionId
107108
""";
108109

109110
private Charset charset = StandardCharsets.UTF_8;
@@ -154,8 +155,10 @@ public ExecutionContext getExecutionContext(JobExecution jobExecution) {
154155
Long executionId = jobExecution.getId();
155156
Assert.notNull(executionId, "ExecutionId must not be null.");
156157

157-
try (Stream<ExecutionContext> stream = getJdbcTemplate().queryForStream(getQuery(FIND_JOB_EXECUTION_CONTEXT),
158-
new ExecutionContextRowMapper(), executionId)) {
158+
try (Stream<ExecutionContext> stream = getJdbcClient().sql(getQuery(FIND_JOB_EXECUTION_CONTEXT))
159+
.param("executionId", executionId)
160+
.query(new ExecutionContextRowMapper())
161+
.stream()) {
159162
return stream.findFirst().orElseGet(ExecutionContext::new);
160163
}
161164
}
@@ -165,8 +168,10 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) {
165168
Long executionId = stepExecution.getId();
166169
Assert.notNull(executionId, "ExecutionId must not be null.");
167170

168-
try (Stream<ExecutionContext> stream = getJdbcTemplate().queryForStream(getQuery(FIND_STEP_EXECUTION_CONTEXT),
169-
new ExecutionContextRowMapper(), executionId)) {
171+
try (Stream<ExecutionContext> stream = getJdbcClient().sql(getQuery(FIND_STEP_EXECUTION_CONTEXT))
172+
.param("executionId", executionId)
173+
.query(new ExecutionContextRowMapper())
174+
.stream()) {
170175
return stream.findFirst().orElseGet(ExecutionContext::new);
171176
}
172177
}
@@ -248,7 +253,7 @@ public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
248253
*/
249254
@Override
250255
public void deleteExecutionContext(JobExecution jobExecution) {
251-
getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION_CONTEXT), jobExecution.getId());
256+
getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION_CONTEXT)).param("executionId", jobExecution.getId()).update();
252257
}
253258

254259
/**
@@ -257,7 +262,9 @@ public void deleteExecutionContext(JobExecution jobExecution) {
257262
*/
258263
@Override
259264
public void deleteExecutionContext(StepExecution stepExecution) {
260-
getJdbcTemplate().update(getQuery(DELETE_STEP_EXECUTION_CONTEXT), stepExecution.getId());
265+
getJdbcClient().sql(getQuery(DELETE_STEP_EXECUTION_CONTEXT))
266+
.param("executionId", stepExecution.getId())
267+
.update();
261268
}
262269

263270
@Override

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java

Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,13 @@ public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements
9292
private static final String CHECK_JOB_EXECUTION_EXISTS = """
9393
SELECT COUNT(*)
9494
FROM %PREFIX%JOB_EXECUTION
95-
WHERE JOB_EXECUTION_ID = ?
95+
WHERE JOB_EXECUTION_ID = :executionId
9696
""";
9797

9898
private static final String GET_STATUS = """
9999
SELECT STATUS
100100
FROM %PREFIX%JOB_EXECUTION
101-
WHERE JOB_EXECUTION_ID = ?
101+
WHERE JOB_EXECUTION_ID = :executionId
102102
""";
103103

104104
private static final String UPDATE_JOB_EXECUTION = """
@@ -113,29 +113,29 @@ SELECT COUNT(*)
113113
""";
114114

115115
private static final String FIND_JOB_EXECUTIONS = GET_JOB_EXECUTIONS
116-
+ " WHERE JOB_INSTANCE_ID = ? ORDER BY JOB_EXECUTION_ID DESC";
116+
+ " WHERE JOB_INSTANCE_ID = :jobInstanceId ORDER BY JOB_EXECUTION_ID DESC";
117117

118118
private static final String GET_LAST_EXECUTION = GET_JOB_EXECUTIONS
119-
+ " WHERE JOB_INSTANCE_ID = ? AND JOB_EXECUTION_ID IN (SELECT MAX(JOB_EXECUTION_ID) FROM %PREFIX%JOB_EXECUTION E2 WHERE E2.JOB_INSTANCE_ID = ?)";
119+
+ " WHERE JOB_INSTANCE_ID = :jobInstanceId AND JOB_EXECUTION_ID IN (SELECT MAX(JOB_EXECUTION_ID) FROM %PREFIX%JOB_EXECUTION E2 WHERE E2.JOB_INSTANCE_ID = :jobInstanceId)";
120120

121-
private static final String GET_EXECUTION_BY_ID = GET_JOB_EXECUTIONS + " WHERE JOB_EXECUTION_ID = ?";
121+
private static final String GET_EXECUTION_BY_ID = GET_JOB_EXECUTIONS + " WHERE JOB_EXECUTION_ID = :jobExecutionId";
122122

123123
private static final String GET_RUNNING_EXECUTIONS = """
124124
SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, E.JOB_INSTANCE_ID
125125
FROM %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I
126-
WHERE E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID AND I.JOB_NAME=? AND E.STATUS IN ('STARTING', 'STARTED', 'STOPPING')
126+
WHERE E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID AND I.JOB_NAME=:jobName AND E.STATUS IN ('STARTING', 'STARTED', 'STOPPING')
127127
""";
128128

129129
private static final String CURRENT_VERSION_JOB_EXECUTION = """
130130
SELECT VERSION
131131
FROM %PREFIX%JOB_EXECUTION
132-
WHERE JOB_EXECUTION_ID=?
132+
WHERE JOB_EXECUTION_ID= :executionId
133133
""";
134134

135135
private static final String FIND_PARAMS_FROM_ID = """
136136
SELECT JOB_EXECUTION_ID, PARAMETER_NAME, PARAMETER_TYPE, PARAMETER_VALUE, IDENTIFYING
137137
FROM %PREFIX%JOB_EXECUTION_PARAMS
138-
WHERE JOB_EXECUTION_ID = ?
138+
WHERE JOB_EXECUTION_ID = :executionId
139139
""";
140140

141141
private static final String CREATE_JOB_PARAMETERS = """
@@ -145,12 +145,12 @@ SELECT COUNT(*)
145145

146146
private static final String DELETE_JOB_EXECUTION = """
147147
DELETE FROM %PREFIX%JOB_EXECUTION
148-
WHERE JOB_EXECUTION_ID = ? AND VERSION = ?
148+
WHERE JOB_EXECUTION_ID = :executionId AND VERSION = :version
149149
""";
150150

151151
private static final String DELETE_JOB_EXECUTION_PARAMETERS = """
152152
DELETE FROM %PREFIX%JOB_EXECUTION_PARAMS
153-
WHERE JOB_EXECUTION_ID = ?
153+
WHERE JOB_EXECUTION_ID = :executionId
154154
""";
155155

156156
private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
@@ -213,7 +213,10 @@ public List<JobExecution> findJobExecutions(final JobInstance job) {
213213
Assert.notNull(job, "Job cannot be null.");
214214
Assert.notNull(job.getId(), "Job Id cannot be null.");
215215

216-
return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId());
216+
return getJdbcClient().sql(getQuery(FIND_JOB_EXECUTIONS))
217+
.param("jobInstanceId", job.getId())
218+
.query(new JobExecutionRowMapper(job))
219+
.list();
217220
}
218221

219222
/**
@@ -306,8 +309,10 @@ public void updateJobExecution(JobExecution jobExecution) {
306309
// it
307310
// is invalid and
308311
// an exception should be thrown.
309-
if (getJdbcTemplate().queryForObject(getQuery(CHECK_JOB_EXECUTION_EXISTS), Integer.class,
310-
new Object[] { jobExecution.getId() }) != 1) {
312+
if (getJdbcClient().sql(getQuery(CHECK_JOB_EXECUTION_EXISTS))
313+
.param("executionId", jobExecution.getId())
314+
.query(Integer.class)
315+
.single() != 1) {
311316
throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found.");
312317
}
313318

@@ -317,8 +322,10 @@ public void updateJobExecution(JobExecution jobExecution) {
317322

318323
// Avoid concurrent modifications...
319324
if (count == 0) {
320-
int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION),
321-
Integer.class, new Object[] { jobExecution.getId() });
325+
int currentVersion = getJdbcClient().sql(getQuery(CURRENT_VERSION_JOB_EXECUTION))
326+
.param("executionId", jobExecution.getId())
327+
.query(Integer.class)
328+
.single();
322329
throw new OptimisticLockingFailureException(
323330
"Attempt to update job execution id=" + jobExecution.getId() + " with wrong version ("
324331
+ jobExecution.getVersion() + "), where current version is " + currentVersion);
@@ -337,22 +344,22 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
337344

338345
Long id = jobInstance.getId();
339346

340-
try (Stream<JobExecution> stream = getJdbcTemplate().queryForStream(getQuery(GET_LAST_EXECUTION),
341-
new JobExecutionRowMapper(jobInstance), id, id)) {
347+
try (Stream<JobExecution> stream = getJdbcClient().sql(getQuery(GET_LAST_EXECUTION))
348+
.param("jobInstanceId", id)
349+
.query(new JobExecutionRowMapper(jobInstance))
350+
.stream()) {
342351
return stream.findFirst().orElse(null);
343352
}
344353
}
345354

346355
@Override
347356
@Nullable
348357
public JobExecution getJobExecution(Long executionId) {
349-
try {
350-
return getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID), new JobExecutionRowMapper(),
351-
executionId);
352-
}
353-
catch (EmptyResultDataAccessException e) {
354-
return null;
355-
}
358+
return getJdbcClient().sql(getQuery(GET_EXECUTION_BY_ID))
359+
.param("jobExecutionId", executionId)
360+
.query(new JobExecutionRowMapper())
361+
.optional()
362+
.orElse(null);
356363
}
357364

358365
@Override
@@ -363,18 +370,23 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
363370
JobExecutionRowMapper mapper = new JobExecutionRowMapper();
364371
result.add(mapper.mapRow(rs, 0));
365372
};
366-
getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), handler, jobName);
373+
getJdbcClient().sql(getQuery(GET_RUNNING_EXECUTIONS)).param("jobName", jobName).query(handler);
367374

368375
return result;
369376
}
370377

371378
@Override
372379
public void synchronizeStatus(JobExecution jobExecution) {
373-
int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), Integer.class,
374-
jobExecution.getId());
380+
int currentVersion = getJdbcClient().sql(getQuery(CURRENT_VERSION_JOB_EXECUTION))
381+
.param("executionId", jobExecution.getId())
382+
.query(Integer.class)
383+
.single();
375384

376385
if (currentVersion != jobExecution.getVersion()) {
377-
String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId());
386+
String status = getJdbcClient().sql(getQuery(GET_STATUS))
387+
.param("executionId", jobExecution.getId())
388+
.query(String.class)
389+
.single();
378390
jobExecution.upgradeStatus(BatchStatus.valueOf(status));
379391
jobExecution.setVersion(currentVersion);
380392
}
@@ -386,8 +398,10 @@ public void synchronizeStatus(JobExecution jobExecution) {
386398
*/
387399
@Override
388400
public void deleteJobExecution(JobExecution jobExecution) {
389-
int count = getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION), jobExecution.getId(),
390-
jobExecution.getVersion());
401+
int count = getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION))
402+
.param("executionId", jobExecution.getId())
403+
.param("version", jobExecution.getVersion())
404+
.update();
391405

392406
if (count == 0) {
393407
throw new OptimisticLockingFailureException("Attempt to delete job execution id=" + jobExecution.getId()
@@ -401,7 +415,9 @@ public void deleteJobExecution(JobExecution jobExecution) {
401415
*/
402416
@Override
403417
public void deleteJobExecutionParameters(JobExecution jobExecution) {
404-
getJdbcTemplate().update(getQuery(DELETE_JOB_EXECUTION_PARAMETERS), jobExecution.getId());
418+
getJdbcClient().sql(getQuery(DELETE_JOB_EXECUTION_PARAMETERS))
419+
.param("executionId", jobExecution.getId())
420+
.update();
405421
}
406422

407423
/**
@@ -468,7 +484,7 @@ protected JobParameters getJobParameters(Long executionId) {
468484
map.put(parameterName, jobParameter);
469485
};
470486

471-
getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), handler, executionId);
487+
getJdbcClient().sql(getQuery(FIND_PARAMS_FROM_ID)).param("executionId", executionId).query(handler);
472488

473489
return new JobParameters(map);
474490
}

0 commit comments

Comments
 (0)