summaryrefslogtreecommitdiff
path: root/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp
diff options
context:
space:
mode:
Diffstat (limited to 'spring-messaging/src/test/java/org/springframework/messaging/simp/stomp')
-rw-r--r--spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java73
-rw-r--r--spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java22
-rw-r--r--spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java49
-rw-r--r--spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java3
4 files changed, 77 insertions, 70 deletions
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java
index ae7fd9d1..eb838dcf 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2015 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,11 +16,6 @@
package org.springframework.messaging.simp.stomp;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Map;
@@ -51,6 +46,14 @@ import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.SettableListenableFuture;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.notNull;
+import static org.mockito.Mockito.same;
+
/**
* Unit tests for {@link DefaultStompSession}.
*
@@ -80,7 +83,6 @@ public class DefaultStompSessionTests {
@Before
public void setUp() throws Exception {
-
MockitoAnnotations.initMocks(this);
this.sessionHandler = mock(StompSessionHandler.class);
@@ -96,7 +98,6 @@ public class DefaultStompSessionTests {
@Test
public void afterConnected() throws Exception {
-
assertFalse(this.session.isConnected());
this.connectHeaders.setHost("my-host");
this.connectHeaders.setHeartbeat(new long[] {11, 12});
@@ -122,7 +123,6 @@ public class DefaultStompSessionTests {
@Test
public void handleConnectedFrame() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -141,7 +141,6 @@ public class DefaultStompSessionTests {
@Test
public void heartbeatValues() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -164,7 +163,6 @@ public class DefaultStompSessionTests {
@Test
public void heartbeatNotSupportedByServer() throws Exception {
-
this.session.afterConnected(this.connection);
verify(this.connection).send(any());
@@ -181,7 +179,6 @@ public class DefaultStompSessionTests {
@Test
public void heartbeatTasks() throws Exception {
-
this.session.afterConnected(this.connection);
verify(this.connection).send(any());
@@ -217,7 +214,6 @@ public class DefaultStompSessionTests {
@Test
public void handleErrorFrame() throws Exception {
-
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setContentType(new MimeType("text", "plain", UTF_8));
accessor.addNativeHeader("foo", "bar");
@@ -236,7 +232,6 @@ public class DefaultStompSessionTests {
@Test
public void handleErrorFrameWithEmptyPayload() throws Exception {
-
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.addNativeHeader("foo", "bar");
accessor.setLeaveMutable(true);
@@ -249,7 +244,6 @@ public class DefaultStompSessionTests {
@Test
public void handleErrorFrameWithConversionException() throws Exception {
-
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
accessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
accessor.addNativeHeader("foo", "bar");
@@ -269,7 +263,6 @@ public class DefaultStompSessionTests {
@Test
public void handleMessageFrame() throws Exception {
-
this.session.afterConnected(this.connection);
StompFrameHandler frameHandler = mock(StompFrameHandler.class);
@@ -296,7 +289,6 @@ public class DefaultStompSessionTests {
@Test
public void handleMessageFrameWithConversionException() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -344,7 +336,6 @@ public class DefaultStompSessionTests {
@Test
public void send() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -361,13 +352,12 @@ public class DefaultStompSessionTests {
assertEquals(destination, stompHeaders.getDestination());
assertEquals(new MimeType("text", "plain", UTF_8), stompHeaders.getContentType());
- assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved
+ assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved
assertEquals(payload, new String(message.getPayload(), UTF_8));
}
@Test
public void sendWithReceipt() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -391,7 +381,6 @@ public class DefaultStompSessionTests {
@Test
public void sendWithConversionException() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -407,7 +396,6 @@ public class DefaultStompSessionTests {
@Test
public void sendWithExecutionException() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -426,7 +414,6 @@ public class DefaultStompSessionTests {
@Test
public void subscribe() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -446,7 +433,6 @@ public class DefaultStompSessionTests {
@Test
public void subscribeWithHeaders() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -473,7 +459,6 @@ public class DefaultStompSessionTests {
@Test
public void unsubscribe() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
@@ -492,8 +477,41 @@ public class DefaultStompSessionTests {
}
@Test
- public void receiptReceived() throws Exception {
+ public void ack() throws Exception {
+ this.session.afterConnected(this.connection);
+ assertTrue(this.session.isConnected());
+
+ String messageId = "123";
+ this.session.acknowledge(messageId, true);
+
+ Message<byte[]> message = this.messageCaptor.getValue();
+ StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
+ assertEquals(StompCommand.ACK, accessor.getCommand());
+
+ StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
+ assertEquals(stompHeaders.toString(), 1, stompHeaders.size());
+ assertEquals(messageId, stompHeaders.getId());
+ }
+
+ @Test
+ public void nack() throws Exception {
+ this.session.afterConnected(this.connection);
+ assertTrue(this.session.isConnected());
+
+ String messageId = "123";
+ this.session.acknowledge(messageId, false);
+
+ Message<byte[]> message = this.messageCaptor.getValue();
+ StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
+ assertEquals(StompCommand.NACK, accessor.getCommand());
+
+ StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
+ assertEquals(stompHeaders.toString(), 1, stompHeaders.size());
+ assertEquals(messageId, stompHeaders.getId());
+ }
+ @Test
+ public void receiptReceived() throws Exception {
this.session.afterConnected(this.connection);
this.session.setTaskScheduler(mock(TaskScheduler.class));
@@ -518,7 +536,6 @@ public class DefaultStompSessionTests {
@Test
public void receiptReceivedBeforeTaskAdded() throws Exception {
-
this.session.afterConnected(this.connection);
this.session.setTaskScheduler(mock(TaskScheduler.class));
@@ -543,7 +560,6 @@ public class DefaultStompSessionTests {
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void receiptNotReceived() throws Exception {
-
TaskScheduler taskScheduler = mock(TaskScheduler.class);
this.session.afterConnected(this.connection);
@@ -575,7 +591,6 @@ public class DefaultStompSessionTests {
@Test
public void disconnect() throws Exception {
-
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java
index 9d8a82e7..94bfce4b 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2015 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,10 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.springframework.messaging.simp.stomp;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
+package org.springframework.messaging.simp.stomp;
import java.lang.reflect.Type;
import java.util.ArrayList;
@@ -41,6 +39,9 @@ import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;
import org.springframework.util.concurrent.ListenableFuture;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+
/**
* Integration tests for {@link Reactor2TcpStompClient}.
*
@@ -86,7 +87,8 @@ public class Reactor2TcpStompClientTests {
public void tearDown() throws Exception {
try {
this.client.shutdown();
- } catch (Throwable ex) {
+ }
+ catch (Throwable ex) {
logger.error("Failed to shut client", ex);
}
final CountDownLatch latch = new CountDownLatch(1);
@@ -100,7 +102,6 @@ public class Reactor2TcpStompClientTests {
@Test
public void publishSubscribe() throws Exception {
-
String destination = "/topic/foo";
ConsumingHandler consumingHandler1 = new ConsumingHandler(destination);
ListenableFuture<StompSession> consumerFuture1 = this.client.connect(consumingHandler1);
@@ -146,9 +147,9 @@ public class Reactor2TcpStompClientTests {
public void handleTransportError(StompSession session, Throwable exception) {
logger.error(exception);
}
-
}
+
private static class ConsumingHandler extends LoggingSessionHandler {
private final List<String> topics;
@@ -157,14 +158,12 @@ public class Reactor2TcpStompClientTests {
private final List<String> received = new ArrayList<>();
-
public ConsumingHandler(String... topics) {
Assert.notEmpty(topics);
this.topics = Arrays.asList(topics);
this.subscriptionLatch = new CountDownLatch(this.topics.size());
}
-
public List<String> getReceived() {
return this.received;
}
@@ -208,16 +207,15 @@ public class Reactor2TcpStompClientTests {
}
return true;
}
-
}
+
private static class ProducingHandler extends LoggingSessionHandler {
private final List<String> topics = new ArrayList<>();
private final List<Object> payloads = new ArrayList<>();
-
public ProducingHandler addToSend(String topic, Object payload) {
this.topics.add(topic);
this.payloads.add(payload);
@@ -226,7 +224,7 @@ public class Reactor2TcpStompClientTests {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
- for (int i=0; i < this.topics.size(); i++) {
+ for (int i = 0; i < this.topics.size(); i++) {
session.send(this.topics.get(i), this.payloads.get(i));
}
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
index c84dc71d..62e0407c 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2015 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.Message;
@@ -48,9 +49,7 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Integration tests for {@link StompBrokerRelayMessageHandler} running against ActiveMQ.
@@ -59,13 +58,14 @@ import static org.junit.Assert.assertTrue;
*/
public class StompBrokerRelayMessageHandlerIntegrationTests {
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
@Rule
public final TestName testName = new TestName();
private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class);
- private static final Charset UTF_8 = Charset.forName("UTF-8");
-
private StompBrokerRelayMessageHandler relay;
private BrokerService activeMQBroker;
@@ -142,9 +142,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
logger.debug("Broker stopped");
}
+
@Test
public void publishSubscribe() throws Exception {
-
logger.debug("Starting test publishSubscribe()");
String sess1 = "sess1";
@@ -167,7 +167,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
this.responseHandler.expectMessages(send);
}
- @Test(expected=MessageDeliveryException.class)
+ @Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");
@@ -181,7 +181,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
-
logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()");
String sess1 = "sess1";
@@ -197,7 +196,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test
public void brokerAvailabilityEventWhenStopped() throws Exception {
-
logger.debug("Starting test brokerAvailabilityEventWhenStopped()");
stopActiveMqBrokerAndAwait();
@@ -206,7 +204,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test
public void relayReconnectsIfBrokerComesBackUp() throws Exception {
-
logger.debug("Starting test relayReconnectsIfBrokerComesBackUp()");
String sess1 = "sess1";
@@ -232,7 +229,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test
public void disconnectWithReceipt() throws Exception {
-
logger.debug("Starting test disconnectWithReceipt()");
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
@@ -270,6 +266,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
+
private static class TestMessageHandler implements MessageHandler {
private final BlockingQueue<Message<?>> queue = new LinkedBlockingQueue<>();
@@ -283,17 +280,13 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException {
-
List<MessageExchange> expectedMessages =
new ArrayList<MessageExchange>(Arrays.<MessageExchange>asList(messageExchanges));
-
while (expectedMessages.size() > 0) {
Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS);
assertNotNull("Timed out waiting for messages, expected [" + expectedMessages + "]", message);
-
MessageExchange match = findMatch(expectedMessages, message);
assertNotNull("Unexpected message=" + message + ", expected [" + expectedMessages + "]", match);
-
expectedMessages.remove(match);
}
}
@@ -308,6 +301,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
+
/**
* Holds a message as well as expected and actual messages matched against expectations.
*/
@@ -326,7 +320,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
public boolean matchMessage(Message<?> message) {
- for (int i=0 ; i < this.expected.length; i++) {
+ for (int i = 0 ; i < this.expected.length; i++) {
if (this.expected[i].match(message)) {
this.actual[i] = message;
return true;
@@ -343,6 +337,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
+
private static class MessageExchangeBuilder {
private final Message<?> message;
@@ -351,8 +346,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
private final List<MessageMatcher> expected = new ArrayList<>();
-
- private MessageExchangeBuilder(Message<?> message) {
+ public MessageExchangeBuilder(Message<?> message) {
this.message = message;
this.headers = StompHeaderAccessor.wrap(message);
}
@@ -442,25 +436,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
- private static interface MessageMatcher {
- boolean match(Message<?> message);
+ private interface MessageMatcher {
+ boolean match(Message<?> message);
}
+
private static class StompFrameMessageMatcher implements MessageMatcher {
private final StompCommand command;
private final String sessionId;
-
public StompFrameMessageMatcher(StompCommand command, String sessionId) {
this.command = command;
this.sessionId = sessionId;
}
-
@Override
public final boolean match(Message<?> message) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
@@ -480,6 +473,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
+
private static class StompReceiptFrameMessageMatcher extends StompFrameMessageMatcher {
private final String receiptId;
@@ -500,6 +494,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
}
+
private static class StompMessageFrameMessageMatcher extends StompFrameMessageMatcher {
private final String subscriptionId;
@@ -508,7 +503,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
private final Object payload;
-
public StompMessageFrameMessageMatcher(String sessionId, String subscriptionId, String destination, Object payload) {
super(StompCommand.MESSAGE, sessionId);
this.subscriptionId = subscriptionId;
@@ -536,18 +530,17 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
protected String getPayloadAsText() {
- return (this.payload instanceof byte[])
- ? new String((byte[]) this.payload, UTF_8) : payload.toString();
+ return (this.payload instanceof byte[]) ?
+ new String((byte[]) this.payload, UTF_8) : this.payload.toString();
}
}
- private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher {
+ private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher {
public StompConnectedFrameMessageMatcher(String sessionId) {
super(StompCommand.CONNECTED, sessionId);
}
-
}
}
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java
index 3da0df98..b6afa9a7 100644
--- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java
+++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java
@@ -325,7 +325,8 @@ public class StompCodecTests {
this.decoder.apply(buffer);
if (consumer.arguments.isEmpty()) {
return null;
- } else {
+ }
+ else {
return consumer.arguments.get(0);
}
}