summaryrefslogtreecommitdiff
path: root/spring-jms/src/main
diff options
context:
space:
mode:
authorEmmanuel Bourg <ebourg@apache.org>2016-08-02 11:13:32 +0200
committerEmmanuel Bourg <ebourg@apache.org>2016-08-02 11:13:32 +0200
commitf69f2a4b8ea697b3a631c0dc7a470e3c9793fee3 (patch)
treedb2f25b29aa3e59c463ab41d3f2856f6265bb1a5 /spring-jms/src/main
parent5575b60c30c5a0c308c4ba3a2db93956d8c1746c (diff)
Imported Upstream version 4.2.6
Diffstat (limited to 'spring-jms/src/main')
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java24
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/annotation/JmsBootstrapConfiguration.java3
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java3
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java77
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/annotation/JmsListeners.java44
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java25
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java23
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java2
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java2
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java35
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java119
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java61
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java2
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java2
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/connection/TransactionAwareConnectionFactoryProxy.java6
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java4
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java32
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java10
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java46
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java10
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java47
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java155
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java17
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java11
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/remoting/JmsInvokerClientInterceptor.java89
-rw-r--r--spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java12
-rw-r--r--spring-jms/src/main/resources/META-INF/spring.schemas3
-rw-r--r--spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.2.xsd638
28 files changed, 1329 insertions, 173 deletions
diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java b/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java
index a896e5f4..a8767010 100644
--- a/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java
+++ b/spring-jms/src/main/java/org/springframework/jms/annotation/EnableJms.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.springframework.context.annotation.Import;
* &#064;Configuration
* &#064;EnableJms
* public class AppConfig {
+ *
* &#064;Bean
* public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
* DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
@@ -43,6 +44,7 @@ import org.springframework.context.annotation.Import;
* factory.setConcurrency("5");
* return factory;
* }
+ *
* // other &#064;Bean definitions
* }</pre>
*
@@ -59,6 +61,7 @@ import org.springframework.context.annotation.Import;
* package com.acme.foo;
*
* public class MyService {
+ *
* &#064;JmsListener(containerFactory = "myJmsListenerContainerFactory", destination="myQueue")
* public void process(String msg) {
* // process incoming message
@@ -78,6 +81,7 @@ import org.springframework.context.annotation.Import;
* &#064;Configuration
* &#064;EnableJms
* public class AppConfig {
+ *
* &#064;Bean
* public MyService myService() {
* return new MyService();
@@ -112,10 +116,9 @@ import org.springframework.context.annotation.Import;
* // process incoming message
* }</pre>
*
- * These features are abstracted by the {@link org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
- * MessageHandlerMethodFactory} that is responsible to build the necessary invoker to process
- * the annotated method. By default, {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory
- * DefaultMessageHandlerMethodFactory} is used.
+ * These features are abstracted by the {@link org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory}
+ * that is responsible to build the necessary invoker to process the annotated method. By default,
+ * {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory} is used.
*
* <p>When more control is desired, a {@code @Configuration} class may implement
* {@link JmsListenerConfigurer}. This allows access to the underlying
@@ -127,6 +130,7 @@ import org.springframework.context.annotation.Import;
* &#064;Configuration
* &#064;EnableJms
* public class AppConfig implements JmsListenerConfigurer {
+ *
* &#064;Override
* public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
* registrar.setContainerFactory(myJmsListenerContainerFactory());
@@ -145,16 +149,18 @@ import org.springframework.context.annotation.Import;
*
* For reference, the example above can be compared to the following Spring XML
* configuration:
+ *
* <pre class="code">
* {@code <beans>
+ *
* <jms:annotation-driven container-factory="myJmsListenerContainerFactory"/>
*
- * <bean id="myJmsListenerContainerFactory"
- * class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
+ * <bean id="myJmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
* // factory settings
* </bean>
*
* <bean id="myService" class="com.acme.foo.MyService"/>
+ *
* </beans>
* }</pre>
*
@@ -169,6 +175,7 @@ import org.springframework.context.annotation.Import;
* &#064;Configuration
* &#064;EnableJms
* public class AppConfig implements JmsListenerConfigurer {
+ *
* &#064;Override
* public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
* registrar.setEndpointRegistry(myJmsListenerEndpointRegistry());
@@ -197,6 +204,7 @@ import org.springframework.context.annotation.Import;
* configuration:
* <pre class="code">
* {@code <beans>
+ *
* <jms:annotation-driven registry="myJmsListenerEndpointRegistry"
* handler-method-factory="myJmsHandlerMethodFactory"/&gt;
*
@@ -211,6 +219,7 @@ import org.springframework.context.annotation.Import;
* </bean>
*
* <bean id="myService" class="com.acme.foo.MyService"/>
+ *
* </beans>
* }</pre>
*
@@ -222,6 +231,7 @@ import org.springframework.context.annotation.Import;
* &#064;Configuration
* &#064;EnableJms
* public class AppConfig implements JmsListenerConfigurer {
+ *
* &#064;Override
* public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
* SimpleJmsListenerEndpoint myEndpoint = new SimpleJmsListenerEndpoint();
diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsBootstrapConfiguration.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsBootstrapConfiguration.java
index e42c72e6..3ee0b0c0 100644
--- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsBootstrapConfiguration.java
+++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsBootstrapConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@ import org.springframework.jms.config.JmsListenerEndpointRegistry;
* @see EnableJms
*/
@Configuration
+@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class JmsBootstrapConfiguration {
@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java
index a4617023..bf62c493 100644
--- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java
+++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListener.java
@@ -18,6 +18,7 @@ package org.springframework.jms.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@@ -69,10 +70,12 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
* @since 4.1
* @see EnableJms
* @see JmsListenerAnnotationBeanPostProcessor
+ * @see JmsListeners
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
+@Repeatable(JmsListeners.class)
@MessageMapping
public @interface JmsListener {
diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java
index d50b0848..10b35b7e 100644
--- a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java
+++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListenerAnnotationBeanPostProcessor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2015 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@ package org.springframework.jms.annotation;
import java.lang.reflect.Method;
import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,7 @@ import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.jms.config.JmsListenerConfigUtils;
@@ -48,7 +48,6 @@ import org.springframework.messaging.handler.annotation.support.DefaultMessageHa
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
-import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
@@ -194,18 +193,16 @@ public class JmsListenerAnnotationBeanPostProcessor
@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
- final Set<Method> annotatedMethods = new LinkedHashSet<Method>(1);
Class<?> targetClass = AopUtils.getTargetClass(bean);
- ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
- @Override
- public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
- JmsListener jmsListener = AnnotationUtils.getAnnotation(method, JmsListener.class);
- if (jmsListener != null) {
- processJmsListener(jmsListener, method, bean);
- annotatedMethods.add(method);
- }
- }
- });
+ Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
+ new MethodIntrospector.MetadataLookup<Set<JmsListener>>() {
+ @Override
+ public Set<JmsListener> inspect(Method method) {
+ Set<JmsListener> listenerMethods = AnnotationUtils.getRepeatableAnnotations(
+ method, JmsListener.class, JmsListeners.class);
+ return (!listenerMethods.isEmpty() ? listenerMethods : null);
+ }
+ });
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
if (logger.isTraceEnabled()) {
@@ -214,6 +211,12 @@ public class JmsListenerAnnotationBeanPostProcessor
}
else {
// Non-empty set of methods
+ for (Map.Entry<Method, Set<JmsListener>> entry : annotatedMethods.entrySet()) {
+ Method method = entry.getKey();
+ for (JmsListener listener : entry.getValue()) {
+ processJmsListener(listener, method, bean);
+ }
+ }
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
"': " + annotatedMethods);
@@ -223,30 +226,24 @@ public class JmsListenerAnnotationBeanPostProcessor
return bean;
}
- protected void processJmsListener(JmsListener jmsListener, Method method, Object bean) {
- if (AopUtils.isJdkDynamicProxy(bean)) {
- try {
- // Found a @JmsListener method on the target class for this JDK proxy ->
- // is it also present on the proxy itself?
- method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
- }
- catch (SecurityException ex) {
- ReflectionUtils.handleReflectionException(ex);
- }
- catch (NoSuchMethodException ex) {
- throw new IllegalStateException(String.format(
- "@JmsListener method '%s' found on bean target class '%s', " +
- "but not found in any interface(s) for bean JDK proxy. Either " +
- "pull the method up to an interface or switch to subclass (CGLIB) " +
- "proxies by setting proxy-target-class/proxyTargetClass " +
- "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()));
- }
- }
+ /**
+ * Process the given {@link JmsListener} annotation on the given method,
+ * registering a corresponding endpoint for the given bean instance.
+ * @param jmsListener the annotation to process
+ * @param mostSpecificMethod the annotated method
+ * @param bean the instance to invoke the method on
+ * @see #createMethodJmsListenerEndpoint()
+ * @see JmsListenerEndpointRegistrar#registerEndpoint
+ */
+ protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
+ Method invocableMethod = MethodIntrospector.selectInvocableMethod(mostSpecificMethod, bean.getClass());
MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
endpoint.setBean(bean);
- endpoint.setMethod(method);
+ endpoint.setMethod(invocableMethod);
+ endpoint.setMostSpecificMethod(mostSpecificMethod);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
+ endpoint.setBeanFactory(this.beanFactory);
endpoint.setId(getEndpointId(jmsListener));
endpoint.setDestination(resolve(jmsListener.destination()));
if (StringUtils.hasText(jmsListener.selector())) {
@@ -268,7 +265,7 @@ public class JmsListenerAnnotationBeanPostProcessor
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
- method + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
+ mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
@@ -296,15 +293,9 @@ public class JmsListenerAnnotationBeanPostProcessor
}
}
- /**
- * Resolve the specified value if possible.
- * @see ConfigurableBeanFactory#resolveEmbeddedValue
- */
private String resolve(String value) {
- if (this.beanFactory instanceof ConfigurableBeanFactory) {
- return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
- }
- return value;
+ return (this.beanFactory instanceof ConfigurableBeanFactory ?
+ ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value) : value);
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListeners.java b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListeners.java
new file mode 100644
index 00000000..f3d5266e
--- /dev/null
+++ b/spring-jms/src/main/java/org/springframework/jms/annotation/JmsListeners.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2002-2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.jms.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Container annotation that aggregates several {@link JmsListener} annotations.
+ *
+ * <p>Can be used natively, declaring several nested {@link JmsListener} annotations.
+ * Can also be used in conjunction with Java 8's support for repeatable annotations,
+ * where {@link JmsListener} can simply be declared several times on the same method,
+ * implicitly generating this container annotation.
+ *
+ * @author Stephane Nicoll
+ * @since 4.2
+ * @see JmsListener
+ */
+@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface JmsListeners {
+
+ JmsListener[] value();
+
+}
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java
index cfaf8de1..888f8e6b 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractJmsListenerContainerFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,6 +52,8 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
private Boolean pubSubDomain;
+ private Boolean replyPubSubDomain;
+
private Boolean subscriptionDurable;
private Boolean subscriptionShared;
@@ -60,6 +62,8 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
private Integer phase;
+ private Boolean autoStartup;
+
/**
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
@@ -111,6 +115,13 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
}
/**
+ * @see AbstractMessageListenerContainer#setReplyPubSubDomain(boolean)
+ */
+ public void setReplyPubSubDomain(Boolean replyPubSubDomain) {
+ this.replyPubSubDomain = replyPubSubDomain;
+ }
+
+ /**
* @see AbstractMessageListenerContainer#setSubscriptionDurable(boolean)
*/
public void setSubscriptionDurable(Boolean subscriptionDurable) {
@@ -138,6 +149,12 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
this.phase = phase;
}
+ /**
+ * @see AbstractMessageListenerContainer#setAutoStartup(boolean)
+ */
+ public void setAutoStartup(boolean autoStartup) {
+ this.autoStartup = autoStartup;
+ }
@Override
public C createListenerContainer(JmsListenerEndpoint endpoint) {
@@ -164,6 +181,9 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
if (this.pubSubDomain != null) {
instance.setPubSubDomain(this.pubSubDomain);
}
+ if (this.replyPubSubDomain != null) {
+ instance.setReplyPubSubDomain(this.replyPubSubDomain);
+ }
if (this.subscriptionDurable != null) {
instance.setSubscriptionDurable(this.subscriptionDurable);
}
@@ -176,6 +196,9 @@ public abstract class AbstractJmsListenerContainerFactory<C extends AbstractMess
if (this.phase != null) {
instance.setPhase(this.phase);
}
+ if (this.autoStartup != null) {
+ instance.setAutoStartup(this.autoStartup);
+ }
endpoint.setupListenerContainer(instance);
initializeContainer(instance);
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java b/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java
index e4608b09..f2d33d30 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/AbstractListenerContainerParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,6 +78,8 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
protected static final String DESTINATION_TYPE_SHARED_DURABLE_TOPIC = "sharedDurableTopic";
+ protected static final String RESPONSE_DESTINATION_TYPE_ATTRIBUTE = "response-destination-type";
+
protected static final String CLIENT_ID_ATTRIBUTE = "client-id";
protected static final String ACKNOWLEDGE_ATTRIBUTE = "acknowledge";
@@ -170,7 +172,7 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
if (listenerEle.hasAttribute(RESPONSE_DESTINATION_ATTRIBUTE)) {
String responseDestination = listenerEle.getAttribute(RESPONSE_DESTINATION_ATTRIBUTE);
- Boolean pubSubDomain = (Boolean) commonContainerProperties.getPropertyValue("pubSubDomain").getValue();
+ Boolean pubSubDomain = (Boolean) commonContainerProperties.getPropertyValue("replyPubSubDomain").getValue();
listenerDef.getPropertyValues().add(
pubSubDomain ? "defaultResponseTopicName" : "defaultResponseQueueName", responseDestination);
if (containerDef.getPropertyValues().contains("destinationResolver")) {
@@ -260,6 +262,23 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
properties.add("subscriptionDurable", subscriptionDurable);
properties.add("subscriptionShared", subscriptionShared);
+ boolean replyPubSubDomain = false;
+ String replyDestinationType = containerEle.getAttribute(RESPONSE_DESTINATION_TYPE_ATTRIBUTE);
+ if (DESTINATION_TYPE_TOPIC.equals(replyDestinationType)) {
+ replyPubSubDomain = true;
+ }
+ else if (DESTINATION_TYPE_QUEUE.equals(replyDestinationType)) {
+ replyPubSubDomain = false;
+ }
+ else if (!StringUtils.hasText(replyDestinationType)) {
+ replyPubSubDomain = pubSubDomain; // the default: same value as pubSubDomain
+ }
+ else if (StringUtils.hasText(replyDestinationType)) {
+ parserContext.getReaderContext().error("Invalid listener container 'response-destination-type': only " +
+ "\"queue\", \"topic\" supported.", containerEle);
+ }
+ properties.add("replyPubSubDomain", replyPubSubDomain);
+
if (containerEle.hasAttribute(CLIENT_ID_ATTRIBUTE)) {
String clientId = containerEle.getAttribute(CLIENT_ID_ATTRIBUTE);
if (!StringUtils.hasText(clientId)) {
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java b/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java
index 03feb01e..ef03eda9 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/JcaListenerContainerParser.java
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils;
/**
- * Parser for the JMS {@code &lt;jca-listener-container&gt;} element.
+ * Parser for the JMS {@code <jca-listener-container>} element.
*
* @author Juergen Hoeller
* @author Stephane Nicoll
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java
index e2b45004..0e91e2ef 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerContainerParser.java
@@ -28,7 +28,7 @@ import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils;
/**
- * Parser for the JMS {@code &lt;listener-container&gt;} element.
+ * Parser for the JMS {@code <listener-container>} element.
*
* @author Mark Fisher
* @author Juergen Hoeller
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java
index dc97d9d3..af540e59 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import java.util.List;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.util.Assert;
@@ -50,6 +51,10 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
private final List<JmsListenerEndpointDescriptor> endpointDescriptors =
new ArrayList<JmsListenerEndpointDescriptor>();
+ private boolean startImmediately;
+
+ private Object mutex = endpointDescriptors;
+
/**
* Set the {@link JmsListenerEndpointRegistry} instance to use.
@@ -113,6 +118,10 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
+ if (beanFactory instanceof ConfigurableBeanFactory) {
+ ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
+ this.mutex = cbf.getSingletonMutex();
+ }
}
@@ -122,8 +131,12 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
}
protected void registerAllEndpoints() {
- for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
- this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
+ synchronized (this.mutex) {
+ for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
+ this.endpointRegistry.registerListenerContainer(
+ descriptor.endpoint, resolveContainerFactory(descriptor));
+ }
+ this.startImmediately = true; // trigger immediate startup
}
}
@@ -136,9 +149,10 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
}
else if (this.containerFactoryBeanName != null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
+ // Consider changing this if live change of the factory is required...
this.containerFactory = this.beanFactory.getBean(
this.containerFactoryBeanName, JmsListenerContainerFactory.class);
- return this.containerFactory; // Consider changing this if live change of the factory is required
+ return this.containerFactory;
}
else {
throw new IllegalStateException("Could not resolve the " +
@@ -156,8 +170,19 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
public void registerEndpoint(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
+
// Factory may be null, we defer the resolution right before actually creating the container
- this.endpointDescriptors.add(new JmsListenerEndpointDescriptor(endpoint, factory));
+ JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
+
+ synchronized (this.mutex) {
+ if (this.startImmediately) { // register and start immediately
+ this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
+ resolveContainerFactory(descriptor), true);
+ }
+ else {
+ this.endpointDescriptors.add(descriptor);
+ }
+ }
}
/**
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java
index 994c9f18..4d48fb31 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,8 +18,9 @@ package org.springframework.jms.config;
import java.util.Collection;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -28,7 +29,11 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
+import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.util.Assert;
@@ -52,15 +57,33 @@ import org.springframework.util.Assert;
* @see MessageListenerContainer
* @see JmsListenerContainerFactory
*/
-public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle {
+public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle,
+ ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
protected final Log logger = LogFactory.getLog(getClass());
private final Map<String, MessageListenerContainer> listenerContainers =
- new LinkedHashMap<String, MessageListenerContainer>();
+ new ConcurrentHashMap<String, MessageListenerContainer>();
private int phase = Integer.MAX_VALUE;
+ private ApplicationContext applicationContext;
+
+ private boolean contextRefreshed;
+
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
+ public void onApplicationEvent(ContextRefreshedEvent event) {
+ if (event.getApplicationContext() == this.applicationContext) {
+ this.contextRefreshed = true;
+ }
+ }
+
/**
* Return the {@link MessageListenerContainer} with the specified id or
@@ -68,6 +91,7 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
* @param id the id of the container
* @return the container or {@code null} if no container with that id exists
* @see JmsListenerEndpoint#getId()
+ * @see #getListenerContainerIds()
*/
public MessageListenerContainer getListenerContainer(String id) {
Assert.notNull(id, "Container identifier must not be null");
@@ -75,32 +99,63 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
}
/**
+ * Return the ids of the managed {@link MessageListenerContainer} instance(s).
+ * @see #getListenerContainer(String)
+ * @since 4.2.3
+ */
+ public Set<String> getListenerContainerIds() {
+ return Collections.unmodifiableSet(this.listenerContainers.keySet());
+ }
+
+ /**
* Return the managed {@link MessageListenerContainer} instance(s).
*/
public Collection<MessageListenerContainer> getListenerContainers() {
return Collections.unmodifiableCollection(this.listenerContainers.values());
}
-
/**
* Create a message listener container for the given {@link JmsListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
+ * <p>The {@code startImmediately} flag determines if the container should be
+ * started immediately.
* @param endpoint the endpoint to add
+ * @param factory the listener factory to use
+ * @param startImmediately start the container immediately if necessary
* @see #getListenerContainers()
* @see #getListenerContainer(String)
*/
- public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
+ public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
+ boolean startImmediately) {
+
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.notNull(id, "Endpoint id must not be null");
- Assert.state(!this.listenerContainers.containsKey(id),
- "Another endpoint is already registered with id '" + id + "'");
+ synchronized (this.listenerContainers) {
+ if (this.listenerContainers.containsKey(id)) {
+ throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
+ }
+ MessageListenerContainer container = createListenerContainer(endpoint, factory);
+ this.listenerContainers.put(id, container);
+ if (startImmediately) {
+ startIfNecessary(container);
+ }
+ }
+ }
- MessageListenerContainer container = createListenerContainer(endpoint, factory);
- this.listenerContainers.put(id, container);
+ /**
+ * Create a message listener container for the given {@link JmsListenerEndpoint}.
+ * <p>This create the necessary infrastructure to honor that endpoint
+ * with regards to its configuration.
+ * @param endpoint the endpoint to add
+ * @param factory the listener factory to use
+ * @see #registerListenerContainer(JmsListenerEndpoint, JmsListenerContainerFactory, boolean)
+ */
+ public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
+ registerListenerContainer(endpoint, factory, false);
}
/**
@@ -133,21 +188,6 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
}
- @Override
- public void destroy() {
- for (MessageListenerContainer listenerContainer : getListenerContainers()) {
- if (listenerContainer instanceof DisposableBean) {
- try {
- ((DisposableBean) listenerContainer).destroy();
- }
- catch (Throwable ex) {
- logger.warn("Failed to destroy message listener container", ex);
- }
- }
- }
- }
-
-
// Delegating implementation of SmartLifecycle
@Override
@@ -163,9 +203,7 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
- if (listenerContainer.isAutoStartup()) {
- listenerContainer.start();
- }
+ startIfNecessary(listenerContainer);
}
}
@@ -195,6 +233,31 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
return false;
}
+ /**
+ * Start the specified {@link MessageListenerContainer} if it should be started
+ * on startup or when start is called explicitly after startup.
+ * @see MessageListenerContainer#isAutoStartup()
+ */
+ private void startIfNecessary(MessageListenerContainer listenerContainer) {
+ if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
+ listenerContainer.start();
+ }
+ }
+
+ @Override
+ public void destroy() {
+ for (MessageListenerContainer listenerContainer : getListenerContainers()) {
+ if (listenerContainer instanceof DisposableBean) {
+ try {
+ ((DisposableBean) listenerContainer).destroy();
+ }
+ catch (Throwable ex) {
+ logger.warn("Failed to destroy message listener container", ex);
+ }
+ }
+ }
+ }
+
private static class AggregatingCallback implements Runnable {
diff --git a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java
index 7876c109..8bde24d0 100644
--- a/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java
+++ b/spring-jms/src/main/java/org/springframework/jms/config/MethodJmsListenerEndpoint.java
@@ -21,6 +21,8 @@ import java.util.Arrays;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter;
@@ -37,6 +39,7 @@ import org.springframework.util.StringUtils;
* an incoming message for this endpoint.
*
* @author Stephane Nicoll
+ * @author Juergen Hoeller
* @since 4.1
*/
public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
@@ -45,8 +48,12 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
private Method method;
+ private Method mostSpecificMethod;
+
private MessageHandlerMethodFactory messageHandlerMethodFactory;
+ private BeanFactory beanFactory;
+
/**
* Set the actual bean instance to invoke this endpoint method on.
@@ -71,6 +78,29 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
}
/**
+ * Set the most specific method known for this endpoint's declaration.
+ * <p>In case of a proxy, this will be the method on the target class
+ * (if annotated itself, that is, if not just annotated in an interface).
+ * @since 4.2.3
+ */
+ public void setMostSpecificMethod(Method mostSpecificMethod) {
+ this.mostSpecificMethod = mostSpecificMethod;
+ }
+
+ public Method getMostSpecificMethod() {
+ if (this.mostSpecificMethod != null) {
+ return this.mostSpecificMethod;
+ }
+ else if (AopUtils.isAopProxy(this.bean)) {
+ Class<?> target = AopProxyUtils.ultimateTargetClass(this.bean);
+ return AopUtils.getMostSpecificMethod(getMethod(), target);
+ }
+ else {
+ return getMethod();
+ }
+ }
+
+ /**
* Set the {@link MessageHandlerMethodFactory} to use to build the
* {@link InvocableHandlerMethod} responsible to manage the invocation
* of this endpoint.
@@ -79,6 +109,12 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
this.messageHandlerMethodFactory = messageHandlerMethodFactory;
}
+ /**
+ * Set the {@link BeanFactory} to use to resolve expressions (can be null).
+ */
+ public void setBeanFactory(BeanFactory beanFactory) {
+ this.beanFactory = beanFactory;
+ }
@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
@@ -90,7 +126,7 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
messageListener.setHandlerMethod(invocableHandlerMethod);
String responseDestination = getDefaultResponseDestination();
if (StringUtils.hasText(responseDestination)) {
- if (container.isPubSubDomain()) {
+ if (container.isReplyPubSubDomain()) {
messageListener.setDefaultResponseTopicName(responseDestination);
}
else {
@@ -116,7 +152,10 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
return new MessagingMessageListenerAdapter();
}
- private String getDefaultResponseDestination() {
+ /**
+ * Return the default response destination, if any.
+ */
+ protected String getDefaultResponseDestination() {
Method specificMethod = getMostSpecificMethod();
SendTo ann = AnnotationUtils.getAnnotation(specificMethod, SendTo.class);
if (ann != null) {
@@ -125,21 +164,23 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
throw new IllegalStateException("Invalid @" + SendTo.class.getSimpleName() + " annotation on '" +
specificMethod + "' one destination must be set (got " + Arrays.toString(destinations) + ")");
}
- return (String) destinations[0];
+ return resolve((String) destinations[0]);
}
return null;
}
- private Method getMostSpecificMethod() {
- if (AopUtils.isAopProxy(this.bean)) {
- Class<?> target = AopProxyUtils.ultimateTargetClass(this.bean);
- return AopUtils.getMostSpecificMethod(getMethod(), target);
- }
- else {
- return getMethod();
+ /**
+ * Resolve the specified value if possible.
+ * @see ConfigurableBeanFactory#resolveEmbeddedValue
+ */
+ private String resolve(String value) {
+ if (this.beanFactory instanceof ConfigurableBeanFactory) {
+ return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
}
+ return value;
}
+
@Override
protected StringBuilder getEndpointDescription() {
return super.getEndpointDescription()
diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java b/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java
index 3d7e0fb5..288ab5af 100644
--- a/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java
+++ b/spring-jms/src/main/java/org/springframework/jms/connection/CachedMessageProducer.java
@@ -276,7 +276,7 @@ class CachedMessageProducer implements MessageProducer, QueueSender, TopicPublis
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
if (method.getName().equals("send") && args != null &&
- completionListenerClass.equals(method.getParameterTypes()[args.length - 1])) {
+ completionListenerClass == method.getParameterTypes()[args.length - 1]) {
switch (args.length) {
case 2: // send(message, completionListener)
return sendWithCompletionListenerMethod.invoke(
diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java b/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java
index 2708e0d7..f3ec7bee 100644
--- a/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java
+++ b/spring-jms/src/main/java/org/springframework/jms/connection/CachingConnectionFactory.java
@@ -528,7 +528,7 @@ public class CachingConnectionFactory extends SingleConnectionFactory {
}
protected boolean destinationEquals(DestinationCacheKey otherKey) {
- return (this.destination.getClass().equals(otherKey.destination.getClass()) &&
+ return (this.destination.getClass() == otherKey.destination.getClass() &&
(this.destination.equals(otherKey.destination) ||
getDestinationString().equals(otherKey.getDestinationString())));
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/connection/TransactionAwareConnectionFactoryProxy.java b/spring-jms/src/main/java/org/springframework/jms/connection/TransactionAwareConnectionFactoryProxy.java
index 51154031..50c61409 100644
--- a/spring-jms/src/main/java/org/springframework/jms/connection/TransactionAwareConnectionFactoryProxy.java
+++ b/spring-jms/src/main/java/org/springframework/jms/connection/TransactionAwareConnectionFactoryProxy.java
@@ -234,14 +234,14 @@ public class TransactionAwareConnectionFactoryProxy
// Use hashCode of Connection proxy.
return System.identityHashCode(proxy);
}
- else if (Session.class.equals(method.getReturnType())) {
+ else if (Session.class == method.getReturnType()) {
Session session = ConnectionFactoryUtils.getTransactionalSession(
getTargetConnectionFactory(), this.target, isSynchedLocalTransactionAllowed());
if (session != null) {
return getCloseSuppressingSessionProxy(session);
}
}
- else if (QueueSession.class.equals(method.getReturnType())) {
+ else if (QueueSession.class == method.getReturnType()) {
QueueSession session = ConnectionFactoryUtils.getTransactionalQueueSession(
(QueueConnectionFactory) getTargetConnectionFactory(), (QueueConnection) this.target,
isSynchedLocalTransactionAllowed());
@@ -249,7 +249,7 @@ public class TransactionAwareConnectionFactoryProxy
return getCloseSuppressingSessionProxy(session);
}
}
- else if (TopicSession.class.equals(method.getReturnType())) {
+ else if (TopicSession.class == method.getReturnType()) {
TopicSession session = ConnectionFactoryUtils.getTransactionalTopicSession(
(TopicConnectionFactory) getTargetConnectionFactory(), (TopicConnection) this.target,
isSynchedLocalTransactionAllowed());
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java
index ab67e4bf..0de5f39a 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractJmsListeningContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -315,7 +315,7 @@ public abstract class AbstractJmsListeningContainer extends JmsDestinationAccess
@Override
public void stop(Runnable callback) {
- this.stop();
+ stop();
callback.run();
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java
index 3605debf..ec36bcc0 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java
@@ -166,6 +166,8 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
private String subscriptionName;
+ private Boolean replyPubSubDomain;
+
private boolean pubSubNoLocal = false;
private MessageConverter messageConverter;
@@ -459,6 +461,36 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
}
/**
+ * Configure the reply destination type. By default, the configured {@code pubSubDomain}
+ * value is used (see {@link #isPubSubDomain()}.
+ * <p>This setting primarily indicates what type of destination to resolve if dynamic
+ * destinations are enabled.
+ * @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link Topic Topics}),
+ * "false" for the Point-to-Point domain ({@link Queue Queues})
+ * @since 4.2
+ * @see #setDestinationResolver
+ */
+ public void setReplyPubSubDomain(boolean replyPubSubDomain) {
+ this.replyPubSubDomain = replyPubSubDomain;
+ }
+
+ /**
+ * Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used
+ * for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues})
+ * is used.
+ * @since 4.2
+ */
+ @Override
+ public boolean isReplyPubSubDomain() {
+ if (this.replyPubSubDomain != null) {
+ return replyPubSubDomain;
+ }
+ else {
+ return isPubSubDomain();
+ }
+ }
+
+ /**
* Set the {@link MessageConverter} strategy for converting JMS Messages.
* @since 4.1
*/
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java
index 3f12056a..4c1a6011 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2012 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -168,6 +168,14 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
this.receiveTimeout = receiveTimeout;
}
+ /**
+ * Return the receive timeout (ms) configured for this listener container.
+ * @since 4.2
+ */
+ protected long getReceiveTimeout() {
+ return this.receiveTimeout;
+ }
+
@Override
public void initialize() {
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java
index 03a69069..5fe49eb4 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2015 the original author or authors.
+ * Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -175,7 +175,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private Executor taskExecutor;
- private BackOff backOff = createDefaultBackOff(DEFAULT_RECOVERY_INTERVAL);
+ private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);
private int cacheLevel = CACHE_AUTO;
@@ -197,6 +197,8 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
private volatile boolean recovering = false;
+ private volatile boolean interrupted = false;
+
private Runnable stopCallback;
private Object currentRecoveryMarker = new Object();
@@ -229,6 +231,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* attempt to recover.
* <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored
* when this property is set.
+ * @since 4.1
*/
public void setBackOff(BackOff backOff) {
this.backOff = backOff;
@@ -244,7 +247,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
* @see #handleListenerSetupFailure
*/
public void setRecoveryInterval(long recoveryInterval) {
- this.backOff = createDefaultBackOff(recoveryInterval);
+ this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE);
}
/**
@@ -563,7 +566,13 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
" message listener invokers");
}
- this.lifecycleMonitor.wait();
+ long timeout = getReceiveTimeout();
+ if (timeout > 0) {
+ this.lifecycleMonitor.wait(timeout);
+ }
+ else {
+ this.lifecycleMonitor.wait();
+ }
}
// Clear remaining scheduled invokers, possibly left over as paused tasks...
for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
@@ -605,6 +614,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
@Override
public void stop(Runnable callback) throws JmsException {
synchronized (this.lifecycleMonitor) {
+ if (!isRunning() || this.stopCallback != null) {
+ // Not started, already stopped, or previous stop attempt in progress
+ // -> return immediately, no stop process to control anymore.
+ callback.run();
+ return;
+ }
this.stopCallback = callback;
}
stop();
@@ -886,6 +901,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
finally {
this.recovering = false;
+ this.interrupted = false;
}
}
@@ -935,7 +951,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
StringBuilder msg = new StringBuilder();
msg.append("Stopping container for destination '")
.append(getDestinationDescription())
- .append("' - back off policy does not allow ").append("for further attempts.");
+ .append("': back-off policy does not allow ").append("for further attempts.");
logger.error(msg.toString());
stop();
}
@@ -962,12 +978,17 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
/**
- * Apply the next back off time using the specified {@link BackOffExecution}.
- * <p>Return {@code true} if the back off period has been applied and a new
+ * Apply the next back-off time using the specified {@link BackOffExecution}.
+ * <p>Return {@code true} if the back-off period has been applied and a new
* attempt to recover should be made, {@code false} if no further attempt
* should be made.
+ * @since 4.1
*/
protected boolean applyBackOffTime(BackOffExecution execution) {
+ if (this.recovering && this.interrupted) {
+ // Interrupted right before and still failing... give up.
+ return false;
+ }
long interval = execution.nextBackOff();
if (interval == BackOffExecution.STOP) {
return false;
@@ -979,13 +1000,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
catch (InterruptedException interEx) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
+ if (this.recovering) {
+ this.interrupted = true;
+ }
}
+ return true;
}
- return true;
- }
-
- private FixedBackOff createDefaultBackOff(long interval) {
- return new FixedBackOff(interval, Long.MAX_VALUE);
}
/**
@@ -1199,7 +1219,7 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
/**
- * Apply the back off time once. In a regular scenario, the back off is only applied if we
+ * Apply the back-off time once. In a regular scenario, the back-off is only applied if we
* failed to recover with the broker. This additional sleep period avoids a burst retry
* scenario when the broker is actually up but something else if failing (i.e. listener
* specific).
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java
index 32fe238d..2457c537 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/MessageListenerContainer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,4 +54,12 @@ public interface MessageListenerContainer extends SmartLifecycle {
*/
boolean isPubSubDomain();
+ /**
+ * Return whether the reply destination uses Publish/Subscribe domain
+ * ({@link javax.jms.Topic Topics}). Otherwise, the Point-to-Point domain
+ * ({@link javax.jms.Queue Queues}) is used.
+ * <p>By default, the value is identical to {@link #isPubSubDomain()}.
+ */
+ boolean isReplyPubSubDomain();
+
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java
index 5108d2bb..10ae1cf8 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java
@@ -243,7 +243,7 @@ public abstract class AbstractAdaptableMessageListener
try {
Message response = buildMessage(session, result);
postProcessResponse(request, response);
- Destination destination = getResponseDestination(request, response, session);
+ Destination destination = getResponseDestination(request, response, session, result);
sendResponse(session, destination, response);
}
catch (Exception ex) {
@@ -269,22 +269,21 @@ public abstract class AbstractAdaptableMessageListener
* @see #setMessageConverter
*/
protected Message buildMessage(Session session, Object result) throws JMSException {
+ Object content = (result instanceof JmsResponse ? ((JmsResponse<?>) result).getResponse() : result);
+ if (content instanceof org.springframework.messaging.Message) {
+ return this.messagingMessageConverter.toMessage(content, session);
+ }
+
MessageConverter converter = getMessageConverter();
if (converter != null) {
- if (result instanceof org.springframework.messaging.Message) {
- return this.messagingMessageConverter.toMessage(result, session);
- }
- else {
- return converter.toMessage(result, session);
- }
+ return converter.toMessage(content, session);
}
- else {
- if (!(result instanceof Message)) {
- throw new MessageConversionException(
- "No MessageConverter specified - cannot handle message [" + result + "]");
- }
- return (Message) result;
+
+ if (!(content instanceof Message)) {
+ throw new MessageConversionException(
+ "No MessageConverter specified - cannot handle message [" + content + "]");
}
+ return (Message) content;
}
/**
@@ -305,6 +304,19 @@ public abstract class AbstractAdaptableMessageListener
response.setJMSCorrelationID(correlation);
}
+ private Destination getResponseDestination(Message request, Message response, Session session, Object result)
+ throws JMSException {
+
+ if (result instanceof JmsResponse) {
+ JmsResponse<?> jmsResponse = (JmsResponse) result;
+ Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session);
+ if (destination != null) {
+ return destination;
+ }
+ }
+ return getResponseDestination(request, response, session);
+ }
+
/**
* Determine a response destination for the given message.
* <p>The default implementation first checks the JMS Reply-To
@@ -411,6 +423,15 @@ public abstract class AbstractAdaptableMessageListener
}
return payload;
}
+
+ @Override
+ protected Message createMessageForPayload(Object payload, Session session) throws JMSException {
+ MessageConverter converter = getMessageConverter();
+ if (converter != null) {
+ return converter.toMessage(payload, session);
+ }
+ throw new IllegalStateException("No message converter - cannot handle [" + payload + "]");
+ }
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java
new file mode 100644
index 00000000..0ba0e326
--- /dev/null
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2002-2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.jms.listener.adapter;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.util.Assert;
+
+/**
+ * Return type of any JMS listener method used to indicate the actual response
+ * destination alongside the response itself. Typically used when said destination
+ * needs to be computed at runtime.
+ *
+ * <p>The example below sends a response with the content of the {@code result}
+ * argument to the {@code queueOut Queue}:
+ *
+ * <pre class="code">
+ * package com.acme.foo;
+ *
+ * public class MyService {
+ * &#064;JmsListener
+ * public JmsResponse process(String msg) {
+ * // process incoming message
+ * return JmsResponse.forQueue(result, "queueOut");
+ * }
+ * }</pre>
+ *
+ * If the destination does not need to be computed at runtime,
+ * {@link org.springframework.messaging.handler.annotation.SendTo @SendTo}
+ * is the recommended declarative approach.
+ *
+ * @author Stephane Nicoll
+ * @since 4.2
+ * @see org.springframework.jms.annotation.JmsListener
+ * @see org.springframework.messaging.handler.annotation.SendTo
+ * @param <T> the type of the response
+ */
+public class JmsResponse<T> {
+
+ private final T response;
+
+ private final Object destination;
+
+
+ /**
+ * Create a new instance
+ * @param response the content of the result
+ * @param destination the destination
+ */
+ protected JmsResponse(T response, Object destination) {
+ Assert.notNull(response, "Result must not be null");
+ this.response = response;
+ this.destination = destination;
+ }
+
+
+ /**
+ * Return the content of the response.
+ */
+ public T getResponse() {
+ return this.response;
+ }
+
+ /**
+ * Resolve the {@link Destination} to use for this instance. The {@link DestinationResolver}
+ * and {@link Session} can be used to resolve a destination at runtime.
+ * @param destinationResolver the destination resolver to use if necessary
+ * @param session the session to use, if necessary
+ * @return the {@link Destination} to use
+ * @throws JMSException if the DestinationResolver failed to resolve the destination
+ */
+ public Destination resolveDestination(DestinationResolver destinationResolver, Session session)
+ throws JMSException {
+
+ if (this.destination instanceof Destination) {
+ return (Destination) this.destination;
+ }
+ if (this.destination instanceof DestinationNameHolder) {
+ DestinationNameHolder nameHolder = (DestinationNameHolder) this.destination;
+ return destinationResolver.resolveDestinationName(session,
+ nameHolder.destinationName, nameHolder.pubSubDomain);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "JmsResponse [" + "response=" + this.response + ", destination=" + this.destination + ']';
+ }
+
+
+ /**
+ * Create a {@link JmsResponse} targeting the queue with the specified name.
+ */
+ public static <T> JmsResponse<T> forQueue(T result, String queueName) {
+ Assert.notNull(queueName, "Queue name must not be null");
+ return new JmsResponse<T>(result, new DestinationNameHolder(queueName, false));
+ }
+
+ /**
+ * Create a {@link JmsResponse} targeting the topic with the specified name.
+ */
+ public static <T> JmsResponse<T> forTopic(T result, String topicName) {
+ Assert.notNull(topicName, "Topic name must not be null");
+ return new JmsResponse<T>(result, new DestinationNameHolder(topicName, true));
+ }
+
+ /**
+ * Create a {@link JmsResponse} targeting the specified {@link Destination}.
+ */
+ public static <T> JmsResponse<T> forDestination(T result, Destination destination) {
+ Assert.notNull(destination, "Destination must not be null");
+ return new JmsResponse<T>(result, destination);
+ }
+
+
+ /**
+ * Internal class combining a destination name
+ * and its target destination type (queue or topic).
+ */
+ private static class DestinationNameHolder {
+
+ private final String destinationName;
+
+ private final boolean pubSubDomain;
+
+ public DestinationNameHolder(String destinationName, boolean pubSubDomain) {
+ this.destinationName = destinationName;
+ this.pubSubDomain = pubSubDomain;
+ }
+
+ @Override
+ public String toString() {
+ return this.destinationName + "{" + "pubSubDomain=" + this.pubSubDomain + '}';
+ }
+ }
+
+}
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java
index fc2fdf85..6a6fde3b 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsActivationSpecConfig.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,6 +46,8 @@ public class JmsActivationSpecConfig {
private boolean pubSubDomain = false;
+ private Boolean replyPubSubDomain;
+
private boolean subscriptionDurable = false;
private boolean subscriptionShared = false;
@@ -81,6 +83,19 @@ public class JmsActivationSpecConfig {
return this.pubSubDomain;
}
+ public void setReplyPubSubDomain(boolean replyPubSubDomain) {
+ this.replyPubSubDomain = replyPubSubDomain;
+ }
+
+ public boolean isReplyPubSubDomain() {
+ if (this.replyPubSubDomain != null) {
+ return this.replyPubSubDomain;
+ }
+ else {
+ return isPubSubDomain();
+ }
+ }
+
public void setSubscriptionDurable(boolean subscriptionDurable) {
this.subscriptionDurable = subscriptionDurable;
if (subscriptionDurable) {
diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java
index c5dfe383..8af16f59 100644
--- a/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java
+++ b/spring-jms/src/main/java/org/springframework/jms/listener/endpoint/JmsMessageEndpointManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -208,4 +208,13 @@ public class JmsMessageEndpointManager extends GenericMessageEndpointManager
throw new IllegalStateException("Could not determine pubSubDomain - no activation spec config is set");
}
+ @Override
+ public boolean isReplyPubSubDomain() {
+ JmsActivationSpecConfig config = getActivationSpecConfig();
+ if (config != null) {
+ return config.isReplyPubSubDomain();
+ }
+ throw new IllegalStateException("Could not determine reply pubSubDomain - no activation spec config is set");
+ }
+
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/remoting/JmsInvokerClientInterceptor.java b/spring-jms/src/main/java/org/springframework/jms/remoting/JmsInvokerClientInterceptor.java
index 1de257e1..2cff432a 100644
--- a/spring-jms/src/main/java/org/springframework/jms/remoting/JmsInvokerClientInterceptor.java
+++ b/spring-jms/src/main/java/org/springframework/jms/remoting/JmsInvokerClientInterceptor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@ import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.remoting.RemoteAccessException;
import org.springframework.remoting.RemoteInvocationFailureException;
+import org.springframework.remoting.RemoteTimeoutException;
import org.springframework.remoting.support.DefaultRemoteInvocationFactory;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationFactory;
@@ -61,6 +62,7 @@ import org.springframework.remoting.support.RemoteInvocationResult;
*
* @author Juergen Hoeller
* @author James Strachan
+ * @author Stephane Nicoll
* @since 2.0
* @see #setConnectionFactory
* @see #setQueue
@@ -106,7 +108,7 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
/**
* Set the name of target queue to send invoker requests to.
- * The specified name will be dynamically resolved via the
+ * <p>The specified name will be dynamically resolved via the
* {@link #setDestinationResolver DestinationResolver}.
*/
public void setQueueName(String queueName) {
@@ -116,8 +118,8 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
/**
* Set the DestinationResolver that is to be used to resolve Queue
* references for this accessor.
- * <p>The default resolver is a DynamicDestinationResolver. Specify a
- * JndiDestinationResolver for resolving destination names as JNDI locations.
+ * <p>The default resolver is a {@code DynamicDestinationResolver}. Specify a
+ * {@code JndiDestinationResolver} for resolving destination names as JNDI locations.
* @see org.springframework.jms.support.destination.DynamicDestinationResolver
* @see org.springframework.jms.support.destination.JndiDestinationResolver
*/
@@ -127,8 +129,8 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
}
/**
- * Set the RemoteInvocationFactory to use for this accessor.
- * Default is a {@link org.springframework.remoting.support.DefaultRemoteInvocationFactory}.
+ * Set the {@link RemoteInvocationFactory} to use for this accessor.
+ * <p>Default is a {@link DefaultRemoteInvocationFactory}.
* <p>A custom invocation factory can add further context information
* to the invocation, for example user credentials.
*/
@@ -138,16 +140,16 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
}
/**
- * Specify the MessageConverter to use for turning
+ * Specify the {@link MessageConverter} to use for turning
* {@link org.springframework.remoting.support.RemoteInvocation}
* objects into request messages, as well as response messages into
* {@link org.springframework.remoting.support.RemoteInvocationResult} objects.
- * <p>Default is a {@link org.springframework.jms.support.converter.SimpleMessageConverter},
- * using a standard JMS {@link javax.jms.ObjectMessage} for each invocation /
- * invocation result object.
- * <p>Custom implementations may generally adapt Serializables into
- * special kinds of messages, or might be specifically tailored for
- * translating RemoteInvocation(Result)s into specific kinds of messages.
+ * <p>Default is a {@link SimpleMessageConverter}, using a standard JMS
+ * {@link javax.jms.ObjectMessage} for each invocation / invocation result
+ * object.
+ * <p>Custom implementations may generally adapt {@link java.io.Serializable}
+ * objects into special kinds of messages, or might be specifically tailored for
+ * translating {@code RemoteInvocation(Result)s} into specific kinds of messages.
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = (messageConverter != null ? messageConverter : new SimpleMessageConverter());
@@ -213,11 +215,11 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
}
/**
- * Create a new RemoteInvocation object for the given AOP method invocation.
- * The default implementation delegates to the RemoteInvocationFactory.
- * <p>Can be overridden in subclasses to provide custom RemoteInvocation
+ * Create a new {@code RemoteInvocation} object for the given AOP method invocation.
+ * <p>The default implementation delegates to the {@link RemoteInvocationFactory}.
+ * <p>Can be overridden in subclasses to provide custom {@code RemoteInvocation}
* subclasses, containing additional invocation parameters like user credentials.
- * Note that it is preferable to use a custom RemoteInvocationFactory which
+ * Note that it is preferable to use a custom {@code RemoteInvocationFactory} which
* is a reusable strategy.
* @param methodInvocation the current AOP method invocation
* @return the RemoteInvocation object
@@ -244,7 +246,12 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
Message requestMessage = createRequestMessage(session, invocation);
con.start();
Message responseMessage = doExecuteRequest(session, queueToUse, requestMessage);
- return extractInvocationResult(responseMessage);
+ if (responseMessage != null) {
+ return extractInvocationResult(responseMessage);
+ }
+ else {
+ return onReceiveTimeout(invocation);
+ }
}
finally {
JmsUtils.closeSession(session);
@@ -300,7 +307,7 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
/**
* Create the invoker request message.
- * <p>The default implementation creates a JMS ObjectMessage
+ * <p>The default implementation creates a JMS {@link javax.jms.ObjectMessage}
* for the given RemoteInvocation object.
* @param session the current JMS Session
* @param invocation the remote invocation to send
@@ -346,9 +353,9 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
/**
* Extract the invocation result from the response message.
- * <p>The default implementation expects a JMS ObjectMessage carrying
- * a RemoteInvocationResult object. If an invalid response message is
- * encountered, the {@code onInvalidResponse} callback gets invoked.
+ * <p>The default implementation expects a JMS {@link javax.jms.ObjectMessage}
+ * carrying a {@link RemoteInvocationResult} object. If an invalid response
+ * message is encountered, the {@code onInvalidResponse} callback gets invoked.
* @param responseMessage the response message
* @return the invocation result
* @throws JMSException is thrown if a JMS exception occurs
@@ -363,14 +370,27 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
}
/**
- * Callback that is invoked by {@code extractInvocationResult}
- * when it encounters an invalid response message.
- * <p>The default implementation throws a MessageFormatException.
+ * Callback that is invoked by {@link #executeRequest} when the receive
+ * timeout has expired for the specified {@link RemoteInvocation}.
+ * <p>By default, an {@link RemoteTimeoutException} is thrown. Sub-classes
+ * can choose to either throw a more dedicated exception or even return
+ * a default {@link RemoteInvocationResult} as a fallback.
+ * @param invocation the invocation
+ * @return a default result when the receive timeout has expired
+ */
+ protected RemoteInvocationResult onReceiveTimeout(RemoteInvocation invocation) {
+ throw new RemoteTimeoutException("Receive timeout after " + this.receiveTimeout + " ms for " + invocation);
+ }
+
+ /**
+ * Callback that is invoked by {@link #extractInvocationResult} when
+ * it encounters an invalid response message.
+ * <p>The default implementation throws a {@link MessageFormatException}.
* @param responseMessage the invalid response message
- * @return an alternative invocation result that should be
- * returned to the caller (if desired)
- * @throws JMSException if the invalid response should lead
- * to an infrastructure exception propagated to the caller
+ * @return an alternative invocation result that should be returned to
+ * the caller (if desired)
+ * @throws JMSException if the invalid response should lead to an
+ * infrastructure exception propagated to the caller
* @see #extractInvocationResult
*/
protected RemoteInvocationResult onInvalidResponse(Message responseMessage) throws JMSException {
@@ -378,9 +398,10 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
}
/**
- * Recreate the invocation result contained in the given RemoteInvocationResult
- * object. The default implementation calls the default recreate method.
- * <p>Can be overridden in subclass to provide custom recreation, potentially
+ * Recreate the invocation result contained in the given {@link RemoteInvocationResult}
+ * object.
+ * <p>The default implementation calls the default {@code recreate()} method.
+ * <p>Can be overridden in subclasses to provide custom recreation, potentially
* processing the returned result object.
* @param result the RemoteInvocationResult to recreate
* @return a return value if the invocation result is a successful return
@@ -393,12 +414,12 @@ public class JmsInvokerClientInterceptor implements MethodInterceptor, Initializ
/**
* Convert the given JMS invoker access exception to an appropriate
- * Spring RemoteAccessException.
+ * Spring {@link RemoteAccessException}.
* @param ex the exception to convert
* @return the RemoteAccessException to throw
*/
protected RemoteAccessException convertJmsInvokerAccessException(JMSException ex) {
- throw new RemoteAccessException("Could not access JMS invoker queue [" + this.queue + "]", ex);
+ return new RemoteAccessException("Could not access JMS invoker queue [" + this.queue + "]", ex);
}
}
diff --git a/spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java b/spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java
index aefa4587..a4db7431 100644
--- a/spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java
+++ b/spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2014 the original author or authors.
+ * Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -93,7 +93,7 @@ public class MessagingMessageConverter implements MessageConverter, Initializing
Message.class.getName() + "] is handled by this converter");
}
Message<?> input = (Message<?>) object;
- javax.jms.Message reply = this.payloadConverter.toMessage(input.getPayload(), session);
+ javax.jms.Message reply = createMessageForPayload(input.getPayload(), session);
this.headerMapper.fromHeaders(input.getHeaders(), reply);
return reply;
}
@@ -119,4 +119,12 @@ public class MessagingMessageConverter implements MessageConverter, Initializing
return this.payloadConverter.fromMessage(message);
}
+ /**
+ * Create a JMS message for the specified payload.
+ * @see MessageConverter#toMessage(Object, Session)
+ */
+ protected javax.jms.Message createMessageForPayload(Object payload, Session session) throws JMSException {
+ return this.payloadConverter.toMessage(payload, session);
+ }
+
}
diff --git a/spring-jms/src/main/resources/META-INF/spring.schemas b/spring-jms/src/main/resources/META-INF/spring.schemas
index c7c42d46..a6ee238a 100644
--- a/spring-jms/src/main/resources/META-INF/spring.schemas
+++ b/spring-jms/src/main/resources/META-INF/spring.schemas
@@ -4,4 +4,5 @@ http\://www.springframework.org/schema/jms/spring-jms-3.1.xsd=org/springframewor
http\://www.springframework.org/schema/jms/spring-jms-3.2.xsd=org/springframework/jms/config/spring-jms-3.2.xsd
http\://www.springframework.org/schema/jms/spring-jms-4.0.xsd=org/springframework/jms/config/spring-jms-4.0.xsd
http\://www.springframework.org/schema/jms/spring-jms-4.1.xsd=org/springframework/jms/config/spring-jms-4.1.xsd
-http\://www.springframework.org/schema/jms/spring-jms.xsd=org/springframework/jms/config/spring-jms-4.1.xsd
+http\://www.springframework.org/schema/jms/spring-jms-4.2.xsd=org/springframework/jms/config/spring-jms-4.2.xsd
+http\://www.springframework.org/schema/jms/spring-jms.xsd=org/springframework/jms/config/spring-jms-4.2.xsd
diff --git a/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.2.xsd b/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.2.xsd
new file mode 100644
index 00000000..5122b3c4
--- /dev/null
+++ b/spring-jms/src/main/resources/org/springframework/jms/config/spring-jms-4.2.xsd
@@ -0,0 +1,638 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<xsd:schema xmlns="http://www.springframework.org/schema/jms"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:tool="http://www.springframework.org/schema/tool"
+ targetNamespace="http://www.springframework.org/schema/jms"
+ elementFormDefault="qualified"
+ attributeFormDefault="unqualified">
+
+ <xsd:import namespace="http://www.springframework.org/schema/tool" schemaLocation="http://www.springframework.org/schema/tool/spring-tool-4.2.xsd"/>
+
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Defines the configuration elements for the Spring Framework's JMS support.
+ Allows for configuring JMS listener containers in XML 'shortcut' style as
+ well as through annotation.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+
+ <xsd:element name="annotation-driven">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Enables the detection of @JmsListener annotation on any Spring-managed object. If
+ present, a message listener container will be created to receive the relevant
+ messages and invoke the annotated method accordingly.
+
+ See Javadoc for the org.springframework.jms.annotation.EnableJms annotation for
+ information on code-based alternatives to this XML element.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:attribute name="registry" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Specifies the org.springframework.jms.config.JmsListenerEndpointRegistry instance to
+ use to register annotated jms listener endpoints. If not provided, a default instance
+ will be used by default.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.config.JmsListenerEndpointRegistry"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="container-factory" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Specifies the org.springframework.jms.config.JmsListenerContainerFactory instance to
+ use to create the container for a jms listener endpoint that does not define a specific
+ factory. This permits in practice to omit the "containerFactory" attribute of the JmsListener
+ annotation. This attribute is not required as each endpoint may define the factory to use and,
+ as a convenience, the JmsListenerContainerFactory with name 'jmsListenerContainerFactory' is
+ looked up by default.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.config.JmsListenerContainerFactory"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="handler-method-factory" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Specifies a custom org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
+ instance to use to configure the message listener responsible to serve an endpoint detected by this
+ processor. By default, DefaultMessageHandlerMethodFactory is used and it can be configured
+ further to support additional method arguments or to customize conversion and validation
+ support. See org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory
+ Javadoc for more details.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+
+
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:element name="listener-container">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Each listener child element will be hosted by a container whose configuration
+ is determined by this parent element. This variant builds standard JMS
+ listener containers, operating against a specified JMS ConnectionFactory. When
+ a factory-id attribute is present, the configuration defined by this element is
+ exposed as a org.springframework.jms.config.JmsListenerContainerFactory. It is
+ therefore possible to only define this element without any child to just expose
+ a container factory.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation>
+ <tool:exports type="org.springframework.jms.listener.AbstractMessageListenerContainer"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="listener" type="listenerType" minOccurs="0" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ <xsd:attribute name="factory-id" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Expose the settings defined by this element as a org.springframework.jms.config.JmsListenerContainerFactory
+ so that they can be reused with other endpoints.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="container-type" default="default">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The type of this listener container: "default" or "simple", choosing
+ between DefaultMessageListenerContainer and SimpleMessageListenerContainer.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="default"/>
+ <xsd:enumeration value="simple"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="container-class" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A custom listener container implementation class as fully qualified class name.
+ Default is Spring's standard DefaultMessageListenerContainer or
+ SimpleMessageListenerContainer, according to the "container-type" attribute.
+ Note that a custom container class will typically be a subclass of either of
+ those two Spring-provided standard container classes: Nake sure that the
+ "container-type" attribute matches the actual base type that the custom class
+ derives from ("default" will usually be fine anyway, since most custom classes
+ will derive from DefaultMessageListenerContainer).
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation>
+ <tool:expected-type type="java.lang.Class"/>
+ <tool:assignable-to type="org.springframework.jms.listener.AbstractMessageListenerContainer"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="connection-factory" type="xsd:string" default="connectionFactory">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the JMS ConnectionFactory bean.
+ Default is "connectionFactory".
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="javax.jms.ConnectionFactory"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="task-executor" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to a Spring TaskExecutor (or standard JDK 1.5 Executor) for executing
+ JMS listener invokers. Default is a SimpleAsyncTaskExecutor in case of a
+ DefaultMessageListenerContainer, using internally managed threads. For a
+ SimpleMessageListenerContainer, listeners will always get invoked within the
+ JMS provider's receive thread by default.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="java.util.concurrent.Executor"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="destination-resolver" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the DestinationResolver strategy for resolving destination names.
+ Default is a DynamicDestinationResolver, using the JMS provider's queue/topic
+ name resolution. Alternatively, specify a reference to a JndiDestinationResolver
+ (typically in a J2EE environment).
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.support.destination.DestinationResolver"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="message-converter" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the MessageConverter strategy for converting JMS Messages to
+ listener method arguments. Default is a SimpleMessageConverter.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.support.converter.MessageConverter"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="error-handler" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to an ErrorHandler strategy for handling any uncaught Exceptions
+ that may occur during the execution of the MessageListener.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.util.ErrorHandler"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="destination-type" default="queue">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS destination type for this listener: "queue", "topic", "durableTopic",
+ "sharedTopic", "sharedDurableTopic". This enables potentially the "pubSubDomain",
+ "subscriptionDurable" and "subscriptionShared" properties of the container. The
+ default is "queue" (i.e. disabling those 3 properties).
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="queue"/>
+ <xsd:enumeration value="topic"/>
+ <xsd:enumeration value="durableTopic"/>
+ <xsd:enumeration value="sharedTopic"/>
+ <xsd:enumeration value="sharedDurableTopic"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="response-destination-type" default="queue">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS destination type for responses: "queue", "topic". Default
+ is the value of the "destination-type" attribute.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="queue"/>
+ <xsd:enumeration value="topic"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="client-id" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS client id for this listener container.
+ Needs to be specified when using subscriptions.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="cache" default="auto">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The cache level for JMS resources: "none", "connection", "session", "consumer"
+ or "auto". By default ("auto"), the cache level will effectively be "consumer",
+ unless an external transaction manager has been specified - in which case the
+ effective default will be "none" (assuming J2EE-style transaction management
+ where the given ConnectionFactory is an XA-aware pool).
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="none"/>
+ <xsd:enumeration value="connection"/>
+ <xsd:enumeration value="session"/>
+ <xsd:enumeration value="consumer"/>
+ <xsd:enumeration value="auto"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="acknowledge" default="auto">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The native JMS acknowledge mode: "auto", "client", "dups-ok" or "transacted".
+ A value of "transacted" effectively activates a locally transacted Session;
+ as alternative, specify an external "transaction-manager" via the corresponding
+ attribute. Default is "auto".
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="auto"/>
+ <xsd:enumeration value="client"/>
+ <xsd:enumeration value="dups-ok"/>
+ <xsd:enumeration value="transacted"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="transaction-manager" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to an external PlatformTransactionManager (typically an
+ XA-based transaction coordinator, e.g. Spring's JtaTransactionManager).
+ If not specified, native acknowledging will be used (see "acknowledge" attribute).
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.transaction.PlatformTransactionManager"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="concurrency" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The number of concurrent sessions/consumers to start for each listener.
+ Can either be a simple number indicating the maximum number (e.g. "5")
+ or a range indicating the lower as well as the upper limit (e.g. "3-5").
+ Note that a specified minimum is just a hint and might be ignored at runtime.
+ Default is 1; keep concurrency limited to 1 in case of a topic listener
+ or if message ordering is important; consider raising it for general queues.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="prefetch" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The maximum number of messages to load into a single session.
+ Note that raising this number might lead to starvation of concurrent consumers!
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="receive-timeout" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The timeout to use for receive calls (in milliseconds).
+ The default is 1000 ms (1 sec); -1 indicates no timeout at all.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="back-off" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Specify the BackOff instance to use to compute the interval between recovery
+ attempts. If the BackOff implementation returns "BackOffExecution#STOP", the listener
+ container will not further attempt to recover. The recovery-interval value is
+ ignored when this property is set. The default is a FixedBackOff with an
+ interval of 5000 ms, that is 5 seconds.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.util.backoff.BackOff"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="recovery-interval" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Specify the interval between recovery attempts, in milliseconds. Convenience
+ way to create a FixedBackOff with the specified interval. For more recovery
+ options, consider specifying a BackOff instance instead. The default is
+ 5000 ms, that is 5 seconds.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="phase" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The lifecycle phase within which this container should start and stop. The lower
+ the value the earlier this container will start and the later it will stop. The
+ default is Integer.MAX_VALUE meaning the container will start as late as possible
+ and stop as soon as possible.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:element name="jca-listener-container">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Each listener child element will be hosted by a container whose configuration
+ is determined by this parent element. This variant builds standard JCA-based
+ listener containers, operating against a specified JCA ResourceAdapter
+ (which needs to be provided by the JMS message broker, e.g. ActiveMQ). When
+ a factory-id attribute is present, the configuration defined by this element is
+ exposed as a org.springframework.jms.config.JmsListenerContainerFactory. It is
+ therefore possible to only define this element without any child to just expose
+ a container factory.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation>
+ <tool:exports type="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="listener" type="listenerType" minOccurs="0" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ <xsd:attribute name="factory-id" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Expose the settings defined by this element as a org.springframework.jms.config.JmsListenerContainerFactory
+ so that they can be reused with other endpoints.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="resource-adapter" type="xsd:string" default="resourceAdapter">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the JCA ResourceAdapter bean for the JMS provider.
+ Default is "resourceAdapter".
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="javax.resource.spi.ResourceAdapter"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="activation-spec-factory" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the JmsActivationSpecFactory.
+ Default is to autodetect the JMS provider and its ActivationSpec class
+ (see DefaultJmsActivationSpecFactory).
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.listener.endpoint.JmsActivationSpecFactory"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="destination-resolver" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the DestinationResolver strategy for resolving destination names.
+ Default is to pass in the destination name Strings into the JCA ActivationSpec as-is.
+ Alternatively, specify a reference to a JndiDestinationResolver (typically in a J2EE
+ environment, in particular if the server insists on receiving Destination objects).
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.support.destination.DestinationResolver"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="message-converter" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the MessageConverter strategy for converting JMS Messages to
+ listener method arguments. Default is a SimpleMessageConverter.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.jms.support.converter.MessageConverter"/>
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="destination-type" default="queue">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS destination type for this listener: "queue", "topic" or "durableTopic".
+ Default is "queue".
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="queue"/>
+ <xsd:enumeration value="topic"/>
+ <xsd:enumeration value="durableTopic"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="response-destination-type" default="queue">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS destination type for responses: "queue", "topic". Default
+ is the value of the "destination-type" attribute.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="queue"/>
+ <xsd:enumeration value="topic"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="client-id" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS client id for this listener container.
+ Needs to be specified when using durable subscriptions.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="acknowledge" default="auto">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The native JMS acknowledge mode: "auto", "client", "dups-ok" or "transacted".
+ A value of "transacted" effectively activates a locally transacted Session;
+ as alternative, specify an external "transaction-manager" via the corresponding
+ attribute. Default is "auto".
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="auto"/>
+ <xsd:enumeration value="client"/>
+ <xsd:enumeration value="dups-ok"/>
+ <xsd:enumeration value="transacted"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:attribute>
+ <xsd:attribute name="transaction-manager" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ A reference to the Spring JtaTransactionManager or [javax.transaction.TransactionManager],
+ for kicking off an XA transaction for each incoming message.
+ If not specified, native acknowledging will be used (see "acknowledge" attribute).
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref"/>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="concurrency" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The number of concurrent sessions/consumers to start for each listener.
+ Can either be a simple number indicating the maximum number (e.g. "5")
+ or a range indicating the lower as well as the upper limit (e.g. "3-5").
+ Note that a specified minimum is just a hint and will typically be ignored
+ at runtime when using a JCA listener container. Default is 1.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="prefetch" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The maximum number of messages to load into a single session.
+ Note that raising this number might lead to starvation of concurrent consumers!
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="phase" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The lifecycle phase within which this container should start and stop. The lower
+ the value the earlier this container will start and the later it will stop. The
+ default is Integer.MAX_VALUE meaning the container will start as late as possible
+ and stop as soon as possible.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:complexType>
+ </xsd:element>
+
+ <xsd:complexType name="listenerType">
+ <xsd:attribute name="id" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The unique identifier for a listener.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="destination" type="xsd:string" use="required">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The destination name for this listener, resolved through the
+ container-wide DestinationResolver strategy (if any). Required.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="subscription" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The name for the durable subscription, if any.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="selector" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The JMS message selector for this listener.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="ref" type="xsd:string" use="required">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The bean name of the listener object, implementing
+ the MessageListener/SessionAwareMessageListener interface
+ or defining the specified listener method. Required.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref"/>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="method" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The name of the listener method to invoke. If not specified,
+ the target bean is supposed to implement the MessageListener
+ or SessionAwareMessageListener interface.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="response-destination" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The name of the default response destination to send response messages to.
+ This will be applied in case of a request message that does not carry
+ a "JMSReplyTo" field. The type of this destination will be determined
+ by the listener-container's "response-destination-type" attribute.
+ Note: This only applies to a listener method with a return value,
+ for which each result object will be converted into a response message.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="concurrency" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ The number of concurrent sessions/consumers to start for this listener.
+ Can either be a simple number indicating the maximum number (e.g. "5")
+ or a range indicating the lower as well as the upper limit (e.g. "3-5").
+ Note that a specified minimum is just a hint and might be ignored at runtime.
+ Default is the value provided by the container.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:complexType>
+
+</xsd:schema>