diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java index 7ea5258..c3bac3f 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java @@ -382,6 +382,7 @@ protected jakarta.jms.Message convertToJMSMessage(Message message) throws JMSExc } jmsMessage.setJMSTimestamp(getJMSTimestamp(message)); + jmsMessage.setJMSDeliveryTime(getApproximateFirstReceiveTimestamp(message)); return jmsMessage; } @@ -395,6 +396,16 @@ private long getJMSTimestamp(Message message) { } } + private long getApproximateFirstReceiveTimestamp(Message message) { + Map systemAttributes = message.attributesAsStrings(); + String timestamp = systemAttributes.get(SQSMessagingClientConstants.APPROXIMATE_FIRST_RECEIVE_TIMESTAMP); + if (timestamp != null) { + return Long.parseLong(timestamp); + } else { + return 0L; + } + } + protected void nackQueueMessages() { // Also nack messages already in the messageQueue synchronized (stateLock) { diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java index a2cd748..9a46abc 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessagingClientConstants.java @@ -80,6 +80,8 @@ public class SQSMessagingClientConstants { public static final String SEQUENCE_NUMBER = "SequenceNumber"; + public static final String APPROXIMATE_FIRST_RECEIVE_TIMESTAMP = "ApproximateFirstReceiveTimestamp"; + static final String APPENDED_USER_AGENT_HEADER_VERSION; static { try { diff --git a/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java b/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java index 3aeca56..16fe7ee 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java +++ b/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java @@ -104,6 +104,7 @@ public class SQSMessage implements Message { private String type; private SQSQueueDestination replyTo; private Destination destination; + private long deliveryTime; private final Map properties = new HashMap<>(); @@ -422,13 +423,12 @@ public void setJMSExpiration(long expiration) throws JMSException { @Override public long getJMSDeliveryTime() throws JMSException { - // FIXME - return 0; + return deliveryTime; } @Override public void setJMSDeliveryTime(long deliveryTime) throws JMSException { - // FIXME + this.deliveryTime = deliveryTime; } @Override diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java index 4d52c70..3f4195c 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetchTest.java @@ -997,7 +997,8 @@ public void testConvertToJMSMessageTextTypeAttribute(int numberOfMessagesToPrefe long now = System.currentTimeMillis(); Map mapAttributes = Map.of( SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1", - SQSMessagingClientConstants.SENT_TIMESTAMP, Long.toString(now)); + SQSMessagingClientConstants.SENT_TIMESTAMP, Long.toString(now), + SQSMessagingClientConstants.APPROXIMATE_FIRST_RECEIVE_TIMESTAMP, Long.toString(now)); // Return message attributes with message type 'TEXT' Message message = Message.builder() @@ -1017,6 +1018,7 @@ public void testConvertToJMSMessageTextTypeAttribute(int numberOfMessagesToPrefe assertTrue(jsmMessage instanceof SQSTextMessage); assertEquals(message.body(), "MessageBody"); assertEquals(jsmMessage.getJMSTimestamp(), now); + assertEquals(jsmMessage.getJMSDeliveryTime(), now); } /** @@ -1830,7 +1832,7 @@ public void testRequestedMessageTracking(int numberOfMessagesToPrefetch) throws // Wait to make sure the received calls have gotten far enough to // wait on the message queue - allReceivesWaiting.await(); + allReceivesWaiting.await(1000, TimeUnit.MILLISECONDS); assertEquals(concurrentReceives, consumerPrefetch.messagesRequested);