Skip to content

Commit 3433b02

Browse files
committed
Add a new Step implementation for the chunk-oriented processing model
This commit also adapts the ChunkListener interface as well as corresponding annotations to the new implementation. Moreover, this commit deprecates SkipWrapper and its usage in the Chunk API as they are not used in the new implementation. Resolves #3950
1 parent 3bcc525 commit 3433b02

File tree

19 files changed

+1372
-22
lines changed

19 files changed

+1372
-22
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/annotation/AfterChunk.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@
1616
package org.springframework.batch.core.annotation;
1717

1818
import org.springframework.batch.core.listener.ChunkListener;
19-
import org.springframework.batch.core.scope.context.ChunkContext;
19+
import org.springframework.batch.item.Chunk;
2020

2121
import java.lang.annotation.ElementType;
2222
import java.lang.annotation.Retention;
2323
import java.lang.annotation.RetentionPolicy;
2424
import java.lang.annotation.Target;
2525

2626
/**
27-
* Marks a method to be called after a chunk is executed.<br>
28-
* <br>
29-
* Expected signature: void afterChunk(ChunkContext context)
27+
* Marks a method to be called after a chunk is processed. <br>
28+
* Expected signature: void afterChunk(Chunk)
3029
*
3130
* @author Lucas Ward
31+
* @author Mahmoud Ben Hassine
3232
* @since 2.0
33-
* @see ChunkListener#afterChunk(ChunkContext context)
33+
* @see ChunkListener#afterChunk(Chunk)
3434
*/
3535
@Retention(RetentionPolicy.RUNTIME)
3636
@Target({ ElementType.METHOD })

spring-batch-core/src/main/java/org/springframework/batch/core/annotation/BeforeChunk.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@
2222

2323
import org.springframework.batch.core.listener.ChunkListener;
2424
import org.springframework.batch.core.scope.context.ChunkContext;
25+
import org.springframework.batch.item.Chunk;
2526

2627
/**
2728
* Marks a method to be called before a chunk is executed. <br>
2829
* <br>
29-
* Expected signature: void beforeChunk(ChunkContext context)
30+
* Expected signature: void beforeChunk(Chunk)
3031
*
3132
* @author Lucas Ward
33+
* @author Mahmoud Ben Hassine
3234
* @since 2.0
33-
* @see ChunkListener#beforeChunk(ChunkContext context)
35+
* @see ChunkListener#beforeChunk(Chunk)
3436
*/
3537
@Retention(RetentionPolicy.RUNTIME)
3638
@Target({ ElementType.METHOD })
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.core.annotation;
17+
18+
import java.lang.annotation.ElementType;
19+
import java.lang.annotation.Retention;
20+
import java.lang.annotation.RetentionPolicy;
21+
import java.lang.annotation.Target;
22+
23+
import org.springframework.batch.core.listener.ChunkListener;
24+
import org.springframework.batch.item.Chunk;
25+
26+
/**
27+
* Marks a method to be called after a chunk has failed. <br>
28+
* Expected signature: void onChunkError(Exception, Chunk)
29+
*
30+
* @author Mahmoud Ben Hassine
31+
* @since 6.0
32+
* @see ChunkListener#onChunkError(Exception, Chunk)
33+
*/
34+
@Retention(RetentionPolicy.RUNTIME)
35+
@Target({ ElementType.METHOD })
36+
public @interface OnChunkError {
37+
38+
}

spring-batch-core/src/main/java/org/springframework/batch/core/listener/ChunkListener.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.batch.core.listener;
1717

1818
import org.springframework.batch.core.scope.context.ChunkContext;
19+
import org.springframework.batch.item.Chunk;
1920

2021
/**
2122
* Listener interface for the lifecycle of a chunk. A chunk can be thought of as a
@@ -30,24 +31,32 @@
3031
* @author Parikshit Dutta
3132
* @author Injae Kim
3233
*/
33-
public interface ChunkListener extends StepListener {
34+
public interface ChunkListener<I, O> extends StepListener {
3435

3536
/**
3637
* The key for retrieving the rollback exception.
38+
* @deprecated since 6.0 with no replacement. Scheduled for removal in 6.2 or later.
3739
*/
40+
@Deprecated(since = "6.0", forRemoval = true)
3841
String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";
3942

4043
/**
4144
* Callback before the chunk is executed, but inside the transaction.
4245
* @param context The current {@link ChunkContext}
46+
* @deprecated since 6.0, use {@link #beforeChunk(Chunk)} instead. Scheduled for
47+
* removal in 6.2 or later.
4348
*/
49+
@Deprecated(since = "6.0", forRemoval = true)
4450
default void beforeChunk(ChunkContext context) {
4551
}
4652

4753
/**
4854
* Callback after the chunk is executed, outside the transaction.
4955
* @param context The current {@link ChunkContext}
56+
* @deprecated since 6.0, use {@link #afterChunk(Chunk)} instead. Scheduled for
57+
* removal in 6.2 or later.
5058
*/
59+
@Deprecated(since = "6.0", forRemoval = true)
5160
default void afterChunk(ChunkContext context) {
5261
}
5362

@@ -61,8 +70,37 @@ default void afterChunk(ChunkContext context) {
6170
* from here.</em>
6271
* @param context the chunk context containing the exception that caused the
6372
* underlying rollback.
73+
* @deprecated since 6.0, use {@link #onChunkError(Exception,Chunk)} instead.
74+
* Scheduled for removal in 6.2 or later.
6475
*/
76+
@Deprecated(since = "6.0", forRemoval = true)
6577
default void afterChunkError(ChunkContext context) {
6678
}
6779

80+
/**
81+
* Callback before the chunk is processed, inside the transaction.
82+
* @since 6.0
83+
*/
84+
default void beforeChunk(Chunk<I> chunk) {
85+
}
86+
87+
/**
88+
* Callback after the chunk is written, inside the transaction.
89+
* @since 6.0
90+
*/
91+
default void afterChunk(Chunk<O> chunk) {
92+
}
93+
94+
/**
95+
* Callback if an exception occurs while processing or writing a chunk, inside the
96+
* transaction, which is about to be rolled back. <em>As a result, you should use
97+
* {@code PROPAGATION_REQUIRES_NEW} for any transactional operation that is called
98+
* here.</em>
99+
* @param exception the exception that caused the underlying rollback.
100+
* @param chunk the processed chunk
101+
* @since 6.0
102+
*/
103+
default void onChunkError(Exception exception, Chunk<O> chunk) {
104+
}
105+
68106
}

spring-batch-core/src/main/java/org/springframework/batch/core/listener/CompositeChunkListener.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
import java.util.List;
2121

2222
import org.springframework.batch.core.scope.context.ChunkContext;
23+
import org.springframework.batch.item.Chunk;
2324
import org.springframework.core.Ordered;
2425

2526
/**
2627
* @author Lucas Ward
2728
* @author Mahmoud Ben Hassine
2829
*
2930
*/
30-
public class CompositeChunkListener implements ChunkListener {
31+
public class CompositeChunkListener<I, O> implements ChunkListener<I, O> {
3132

3233
private final OrderedComposite<ChunkListener> listeners = new OrderedComposite<>();
3334

@@ -74,7 +75,10 @@ public void register(ChunkListener chunkListener) {
7475
* Call the registered listeners in reverse order.
7576
*
7677
* @see ChunkListener#afterChunk(ChunkContext context)
78+
* @deprecated since 6.0, use {@link #afterChunk(Chunk)} instead. Scheduled for
79+
* removal in 6.2 or later.
7780
*/
81+
@Deprecated(since = "6.0", forRemoval = true)
7882
@Override
7983
public void afterChunk(ChunkContext context) {
8084
for (Iterator<ChunkListener> iterator = listeners.reverse(); iterator.hasNext();) {
@@ -83,12 +87,28 @@ public void afterChunk(ChunkContext context) {
8387
}
8488
}
8589

90+
/**
91+
* Call the registered listeners in reverse order.
92+
*
93+
* @see ChunkListener#afterChunk(Chunk)
94+
*/
95+
@Override
96+
public void afterChunk(Chunk chunk) {
97+
for (Iterator<ChunkListener> iterator = listeners.reverse(); iterator.hasNext();) {
98+
ChunkListener listener = iterator.next();
99+
listener.afterChunk(chunk);
100+
}
101+
}
102+
86103
/**
87104
* Call the registered listeners in order, respecting and prioritizing those that
88105
* implement {@link Ordered}.
89106
*
90107
* @see ChunkListener#beforeChunk(ChunkContext context)
108+
* @deprecated since 6.0, use {@link #beforeChunk(Chunk)} instead. Scheduled for
109+
* removal in 6.2 or later.
91110
*/
111+
@Deprecated(since = "6.0", forRemoval = true)
92112
@Override
93113
public void beforeChunk(ChunkContext context) {
94114
for (Iterator<ChunkListener> iterator = listeners.iterator(); iterator.hasNext();) {
@@ -97,11 +117,28 @@ public void beforeChunk(ChunkContext context) {
97117
}
98118
}
99119

120+
/**
121+
* Call the registered listeners in order, respecting and prioritizing those that
122+
* implement {@link Ordered}.
123+
*
124+
* @see ChunkListener#beforeChunk(Chunk chunk)
125+
*/
126+
@Override
127+
public void beforeChunk(Chunk chunk) {
128+
for (Iterator<ChunkListener> iterator = listeners.iterator(); iterator.hasNext();) {
129+
ChunkListener listener = iterator.next();
130+
listener.beforeChunk(chunk);
131+
}
132+
}
133+
100134
/**
101135
* Call the registered listeners in reverse order.
102136
*
103137
* @see ChunkListener#afterChunkError(ChunkContext context)
138+
* @deprecated since 6.0, use {@link #onChunkError(Exception,Chunk)} instead.
139+
* Scheduled for removal in 6.2 or later.
104140
*/
141+
@Deprecated(since = "6.0", forRemoval = true)
105142
@Override
106143
public void afterChunkError(ChunkContext context) {
107144
for (Iterator<ChunkListener> iterator = listeners.reverse(); iterator.hasNext();) {
@@ -110,4 +147,17 @@ public void afterChunkError(ChunkContext context) {
110147
}
111148
}
112149

150+
/**
151+
* Call the registered listeners in reverse order.
152+
*
153+
* @see ChunkListener#onChunkError(Exception, Chunk)
154+
*/
155+
@Override
156+
public void onChunkError(Exception exception, Chunk chunk) {
157+
for (Iterator<ChunkListener> iterator = listeners.reverse(); iterator.hasNext();) {
158+
ChunkListener listener = iterator.next();
159+
listener.onChunkError(exception, chunk);
160+
}
161+
}
162+
113163
}

0 commit comments

Comments
 (0)