diff options
Diffstat (limited to 'spring-messaging/src/test/java/org/springframework/messaging/simp/stomp')
4 files changed, 77 insertions, 70 deletions
diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java index ae7fd9d1..eb838dcf 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,6 @@ package org.springframework.messaging.simp.stomp; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; - import java.nio.charset.Charset; import java.util.Date; import java.util.Map; @@ -51,6 +46,14 @@ import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.concurrent.SettableListenableFuture; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.notNull; +import static org.mockito.Mockito.same; + /** * Unit tests for {@link DefaultStompSession}. * @@ -80,7 +83,6 @@ public class DefaultStompSessionTests { @Before public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); this.sessionHandler = mock(StompSessionHandler.class); @@ -96,7 +98,6 @@ public class DefaultStompSessionTests { @Test public void afterConnected() throws Exception { - assertFalse(this.session.isConnected()); this.connectHeaders.setHost("my-host"); this.connectHeaders.setHeartbeat(new long[] {11, 12}); @@ -122,7 +123,6 @@ public class DefaultStompSessionTests { @Test public void handleConnectedFrame() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -141,7 +141,6 @@ public class DefaultStompSessionTests { @Test public void heartbeatValues() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -164,7 +163,6 @@ public class DefaultStompSessionTests { @Test public void heartbeatNotSupportedByServer() throws Exception { - this.session.afterConnected(this.connection); verify(this.connection).send(any()); @@ -181,7 +179,6 @@ public class DefaultStompSessionTests { @Test public void heartbeatTasks() throws Exception { - this.session.afterConnected(this.connection); verify(this.connection).send(any()); @@ -217,7 +214,6 @@ public class DefaultStompSessionTests { @Test public void handleErrorFrame() throws Exception { - StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.setContentType(new MimeType("text", "plain", UTF_8)); accessor.addNativeHeader("foo", "bar"); @@ -236,7 +232,6 @@ public class DefaultStompSessionTests { @Test public void handleErrorFrameWithEmptyPayload() throws Exception { - StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.addNativeHeader("foo", "bar"); accessor.setLeaveMutable(true); @@ -249,7 +244,6 @@ public class DefaultStompSessionTests { @Test public void handleErrorFrameWithConversionException() throws Exception { - StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR); accessor.setContentType(MimeTypeUtils.APPLICATION_JSON); accessor.addNativeHeader("foo", "bar"); @@ -269,7 +263,6 @@ public class DefaultStompSessionTests { @Test public void handleMessageFrame() throws Exception { - this.session.afterConnected(this.connection); StompFrameHandler frameHandler = mock(StompFrameHandler.class); @@ -296,7 +289,6 @@ public class DefaultStompSessionTests { @Test public void handleMessageFrameWithConversionException() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -344,7 +336,6 @@ public class DefaultStompSessionTests { @Test public void send() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -361,13 +352,12 @@ public class DefaultStompSessionTests { assertEquals(destination, stompHeaders.getDestination()); assertEquals(new MimeType("text", "plain", UTF_8), stompHeaders.getContentType()); - assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved + assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved assertEquals(payload, new String(message.getPayload(), UTF_8)); } @Test public void sendWithReceipt() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -391,7 +381,6 @@ public class DefaultStompSessionTests { @Test public void sendWithConversionException() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -407,7 +396,6 @@ public class DefaultStompSessionTests { @Test public void sendWithExecutionException() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -426,7 +414,6 @@ public class DefaultStompSessionTests { @Test public void subscribe() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -446,7 +433,6 @@ public class DefaultStompSessionTests { @Test public void subscribeWithHeaders() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -473,7 +459,6 @@ public class DefaultStompSessionTests { @Test public void unsubscribe() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); @@ -492,8 +477,41 @@ public class DefaultStompSessionTests { } @Test - public void receiptReceived() throws Exception { + public void ack() throws Exception { + this.session.afterConnected(this.connection); + assertTrue(this.session.isConnected()); + + String messageId = "123"; + this.session.acknowledge(messageId, true); + + Message<byte[]> message = this.messageCaptor.getValue(); + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + assertEquals(StompCommand.ACK, accessor.getCommand()); + + StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); + assertEquals(stompHeaders.toString(), 1, stompHeaders.size()); + assertEquals(messageId, stompHeaders.getId()); + } + + @Test + public void nack() throws Exception { + this.session.afterConnected(this.connection); + assertTrue(this.session.isConnected()); + + String messageId = "123"; + this.session.acknowledge(messageId, false); + + Message<byte[]> message = this.messageCaptor.getValue(); + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + assertEquals(StompCommand.NACK, accessor.getCommand()); + + StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); + assertEquals(stompHeaders.toString(), 1, stompHeaders.size()); + assertEquals(messageId, stompHeaders.getId()); + } + @Test + public void receiptReceived() throws Exception { this.session.afterConnected(this.connection); this.session.setTaskScheduler(mock(TaskScheduler.class)); @@ -518,7 +536,6 @@ public class DefaultStompSessionTests { @Test public void receiptReceivedBeforeTaskAdded() throws Exception { - this.session.afterConnected(this.connection); this.session.setTaskScheduler(mock(TaskScheduler.class)); @@ -543,7 +560,6 @@ public class DefaultStompSessionTests { @Test @SuppressWarnings({ "unchecked", "rawtypes" }) public void receiptNotReceived() throws Exception { - TaskScheduler taskScheduler = mock(TaskScheduler.class); this.session.afterConnected(this.connection); @@ -575,7 +591,6 @@ public class DefaultStompSessionTests { @Test public void disconnect() throws Exception { - this.session.afterConnected(this.connection); assertTrue(this.session.isConnected()); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java index 9d8a82e7..94bfce4b 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,10 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.messaging.simp.stomp; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +package org.springframework.messaging.simp.stomp; import java.lang.reflect.Type; import java.util.ArrayList; @@ -41,6 +39,9 @@ import org.springframework.util.Assert; import org.springframework.util.SocketUtils; import org.springframework.util.concurrent.ListenableFuture; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + /** * Integration tests for {@link Reactor2TcpStompClient}. * @@ -86,7 +87,8 @@ public class Reactor2TcpStompClientTests { public void tearDown() throws Exception { try { this.client.shutdown(); - } catch (Throwable ex) { + } + catch (Throwable ex) { logger.error("Failed to shut client", ex); } final CountDownLatch latch = new CountDownLatch(1); @@ -100,7 +102,6 @@ public class Reactor2TcpStompClientTests { @Test public void publishSubscribe() throws Exception { - String destination = "/topic/foo"; ConsumingHandler consumingHandler1 = new ConsumingHandler(destination); ListenableFuture<StompSession> consumerFuture1 = this.client.connect(consumingHandler1); @@ -146,9 +147,9 @@ public class Reactor2TcpStompClientTests { public void handleTransportError(StompSession session, Throwable exception) { logger.error(exception); } - } + private static class ConsumingHandler extends LoggingSessionHandler { private final List<String> topics; @@ -157,14 +158,12 @@ public class Reactor2TcpStompClientTests { private final List<String> received = new ArrayList<>(); - public ConsumingHandler(String... topics) { Assert.notEmpty(topics); this.topics = Arrays.asList(topics); this.subscriptionLatch = new CountDownLatch(this.topics.size()); } - public List<String> getReceived() { return this.received; } @@ -208,16 +207,15 @@ public class Reactor2TcpStompClientTests { } return true; } - } + private static class ProducingHandler extends LoggingSessionHandler { private final List<String> topics = new ArrayList<>(); private final List<Object> payloads = new ArrayList<>(); - public ProducingHandler addToSend(String topic, Object payload) { this.topics.add(topic); this.payloads.add(payload); @@ -226,7 +224,7 @@ public class Reactor2TcpStompClientTests { @Override public void afterConnected(StompSession session, StompHeaders connectedHeaders) { - for (int i=0; i < this.topics.size(); i++) { + for (int i = 0; i < this.topics.size(); i++) { session.send(this.topics.get(i), this.payloads.get(i)); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index c84dc71d..62e0407c 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; + import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.messaging.Message; @@ -48,9 +49,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.SocketUtils; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Integration tests for {@link StompBrokerRelayMessageHandler} running against ActiveMQ. @@ -59,13 +58,14 @@ import static org.junit.Assert.assertTrue; */ public class StompBrokerRelayMessageHandlerIntegrationTests { + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + @Rule public final TestName testName = new TestName(); private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class); - private static final Charset UTF_8 = Charset.forName("UTF-8"); - private StompBrokerRelayMessageHandler relay; private BrokerService activeMQBroker; @@ -142,9 +142,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { logger.debug("Broker stopped"); } + @Test public void publishSubscribe() throws Exception { - logger.debug("Starting test publishSubscribe()"); String sess1 = "sess1"; @@ -167,7 +167,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.responseHandler.expectMessages(send); } - @Test(expected=MessageDeliveryException.class) + @Test(expected = MessageDeliveryException.class) public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception { logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()"); @@ -181,7 +181,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception { - logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()"); String sess1 = "sess1"; @@ -197,7 +196,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void brokerAvailabilityEventWhenStopped() throws Exception { - logger.debug("Starting test brokerAvailabilityEventWhenStopped()"); stopActiveMqBrokerAndAwait(); @@ -206,7 +204,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void relayReconnectsIfBrokerComesBackUp() throws Exception { - logger.debug("Starting test relayReconnectsIfBrokerComesBackUp()"); String sess1 = "sess1"; @@ -232,7 +229,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void disconnectWithReceipt() throws Exception { - logger.debug("Starting test disconnectWithReceipt()"); MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); @@ -270,6 +266,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } + private static class TestMessageHandler implements MessageHandler { private final BlockingQueue<Message<?>> queue = new LinkedBlockingQueue<>(); @@ -283,17 +280,13 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException { - List<MessageExchange> expectedMessages = new ArrayList<MessageExchange>(Arrays.<MessageExchange>asList(messageExchanges)); - while (expectedMessages.size() > 0) { Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS); assertNotNull("Timed out waiting for messages, expected [" + expectedMessages + "]", message); - MessageExchange match = findMatch(expectedMessages, message); assertNotNull("Unexpected message=" + message + ", expected [" + expectedMessages + "]", match); - expectedMessages.remove(match); } } @@ -308,6 +301,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } + /** * Holds a message as well as expected and actual messages matched against expectations. */ @@ -326,7 +320,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } public boolean matchMessage(Message<?> message) { - for (int i=0 ; i < this.expected.length; i++) { + for (int i = 0 ; i < this.expected.length; i++) { if (this.expected[i].match(message)) { this.actual[i] = message; return true; @@ -343,6 +337,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } + private static class MessageExchangeBuilder { private final Message<?> message; @@ -351,8 +346,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private final List<MessageMatcher> expected = new ArrayList<>(); - - private MessageExchangeBuilder(Message<?> message) { + public MessageExchangeBuilder(Message<?> message) { this.message = message; this.headers = StompHeaderAccessor.wrap(message); } @@ -442,25 +436,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } - private static interface MessageMatcher { - boolean match(Message<?> message); + private interface MessageMatcher { + boolean match(Message<?> message); } + private static class StompFrameMessageMatcher implements MessageMatcher { private final StompCommand command; private final String sessionId; - public StompFrameMessageMatcher(StompCommand command, String sessionId) { this.command = command; this.sessionId = sessionId; } - @Override public final boolean match(Message<?> message) { StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); @@ -480,6 +473,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } + private static class StompReceiptFrameMessageMatcher extends StompFrameMessageMatcher { private final String receiptId; @@ -500,6 +494,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } } + private static class StompMessageFrameMessageMatcher extends StompFrameMessageMatcher { private final String subscriptionId; @@ -508,7 +503,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private final Object payload; - public StompMessageFrameMessageMatcher(String sessionId, String subscriptionId, String destination, Object payload) { super(StompCommand.MESSAGE, sessionId); this.subscriptionId = subscriptionId; @@ -536,18 +530,17 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } protected String getPayloadAsText() { - return (this.payload instanceof byte[]) - ? new String((byte[]) this.payload, UTF_8) : payload.toString(); + return (this.payload instanceof byte[]) ? + new String((byte[]) this.payload, UTF_8) : this.payload.toString(); } } - private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher { + private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher { public StompConnectedFrameMessageMatcher(String sessionId) { super(StompCommand.CONNECTED, sessionId); } - } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java index 3da0df98..b6afa9a7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java @@ -325,7 +325,8 @@ public class StompCodecTests { this.decoder.apply(buffer); if (consumer.arguments.isEmpty()) { return null; - } else { + } + else { return consumer.arguments.get(0); } } |