summaryrefslogtreecommitdiff
path: root/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java')
-rw-r--r--spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java28
1 files changed, 26 insertions, 2 deletions
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java
index 71160780..44cb5f8e 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2015 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -67,7 +67,6 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private static final Message<byte[]> HEARTBEAT;
-
static {
StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
HEARTBEAT = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
@@ -93,6 +92,8 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private volatile TcpConnection<byte[]> connection;
+ private volatile String version;
+
private final AtomicInteger subscriptionIndex = new AtomicInteger();
private final Map<String, DefaultSubscription> subscriptions = new ConcurrentHashMap<String, DefaultSubscription>(4);
@@ -310,6 +311,28 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return subscription;
}
+ @Override
+ public Receiptable acknowledge(String messageId, boolean consumed) {
+ StompHeaders stompHeaders = new StompHeaders();
+ if ("1.1".equals(this.version)) {
+ stompHeaders.setMessageId(messageId);
+ }
+ else {
+ stompHeaders.setId(messageId);
+ }
+
+ String receiptId = checkOrAddReceipt(stompHeaders);
+ Receiptable receiptable = new ReceiptHandler(receiptId);
+
+ StompCommand command = (consumed ? StompCommand.ACK : StompCommand.NACK);
+ StompHeaderAccessor accessor = createHeaderAccessor(command);
+ accessor.addNativeHeaders(stompHeaders);
+ Message<byte[]> message = createMessage(accessor, null);
+ execute(message);
+
+ return receiptable;
+ }
+
private void unsubscribe(String id) {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
accessor.setSubscriptionId(id);
@@ -390,6 +413,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
}
else if (StompCommand.CONNECTED.equals(command)) {
initHeartbeatTasks(stompHeaders);
+ this.version = stompHeaders.getFirst("version");
this.sessionFuture.set(this);
this.sessionHandler.afterConnected(this, stompHeaders);
}