Skip to content

Commit 5a53c3d

Browse files
authored
fix #23 (#24)
Fixing #23 DataBuffer need to be released in BrokerFrameDecoder.decode()
1 parent 8d4c432 commit 5a53c3d

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

rsocket-broker-common-spring/src/main/java/io/rsocket/broker/common/spring/BrokerFrameDecoder.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.core.codec.AbstractDecoder;
3737
import org.springframework.core.codec.DecodingException;
3838
import org.springframework.core.io.buffer.DataBuffer;
39+
import org.springframework.core.io.buffer.DataBufferUtils;
3940
import org.springframework.core.io.buffer.NettyDataBuffer;
4041
import org.springframework.util.MimeType;
4142

@@ -60,27 +61,32 @@ public Flux<BrokerFrame> decode(Publisher<DataBuffer> inputStream, ResolvableTyp
6061

6162
@Override
6263
public BrokerFrame decode(DataBuffer buffer, ResolvableType targetType, MimeType mimeType, Map<String, Object> hints) throws DecodingException {
63-
ByteBuf byteBuf = asByteBuf(buffer);
64-
// FIXME hack for ClusterJoinListener.setupRSocket() in broker broker
65-
if (!byteBuf.isReadable()) {
66-
return new BrokerFrame(FrameType.RESERVED, 0) {
67-
};
64+
try {
65+
ByteBuf byteBuf = asByteBuf(buffer);
66+
// FIXME hack for ClusterJoinListener.setupRSocket() in broker broker
67+
if (!byteBuf.isReadable()) {
68+
return new BrokerFrame(FrameType.RESERVED, 0) {
69+
};
70+
}
71+
int flags = FrameHeaderFlyweight.flags(byteBuf);
72+
FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
73+
switch (frameType) {
74+
case ADDRESS:
75+
return Address.from(byteBuf, flags);
76+
case BROKER_INFO:
77+
return BrokerInfo.from(byteBuf);
78+
case ROUTE_JOIN:
79+
return RouteJoin.from(byteBuf);
80+
case ROUTE_REMOVE:
81+
return RouteRemove.from(byteBuf);
82+
case ROUTE_SETUP:
83+
return RouteSetup.from(byteBuf);
84+
}
85+
throw new IllegalArgumentException("Unknown FrameType " + frameType);
6886
}
69-
int flags = FrameHeaderFlyweight.flags(byteBuf);
70-
FrameType frameType = FrameHeaderFlyweight.frameType(byteBuf);
71-
switch (frameType) {
72-
case ADDRESS:
73-
return Address.from(byteBuf, flags);
74-
case BROKER_INFO:
75-
return BrokerInfo.from(byteBuf);
76-
case ROUTE_JOIN:
77-
return RouteJoin.from(byteBuf);
78-
case ROUTE_REMOVE:
79-
return RouteRemove.from(byteBuf);
80-
case ROUTE_SETUP:
81-
return RouteSetup.from(byteBuf);
87+
finally {
88+
DataBufferUtils.release(buffer);
8289
}
83-
throw new IllegalArgumentException("Unknown FrameType " + frameType);
8490
}
8591

8692
private static ByteBuf asByteBuf(DataBuffer buffer) {

0 commit comments

Comments
 (0)