diff options
Diffstat (limited to 'spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java')
-rw-r--r-- | spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 71160780..44cb5f8e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,6 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { private static final Message<byte[]> HEARTBEAT; - static { StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat(); HEARTBEAT = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders()); @@ -93,6 +92,8 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { private volatile TcpConnection<byte[]> connection; + private volatile String version; + private final AtomicInteger subscriptionIndex = new AtomicInteger(); private final Map<String, DefaultSubscription> subscriptions = new ConcurrentHashMap<String, DefaultSubscription>(4); @@ -310,6 +311,28 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { return subscription; } + @Override + public Receiptable acknowledge(String messageId, boolean consumed) { + StompHeaders stompHeaders = new StompHeaders(); + if ("1.1".equals(this.version)) { + stompHeaders.setMessageId(messageId); + } + else { + stompHeaders.setId(messageId); + } + + String receiptId = checkOrAddReceipt(stompHeaders); + Receiptable receiptable = new ReceiptHandler(receiptId); + + StompCommand command = (consumed ? StompCommand.ACK : StompCommand.NACK); + StompHeaderAccessor accessor = createHeaderAccessor(command); + accessor.addNativeHeaders(stompHeaders); + Message<byte[]> message = createMessage(accessor, null); + execute(message); + + return receiptable; + } + private void unsubscribe(String id) { StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE); accessor.setSubscriptionId(id); @@ -390,6 +413,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { } else if (StompCommand.CONNECTED.equals(command)) { initHeartbeatTasks(stompHeaders); + this.version = stompHeaders.getFirst("version"); this.sessionFuture.set(this); this.sessionHandler.afterConnected(this, stompHeaders); } |