I have a JBoss web application that is currently using embedded HornetQ for JMS. We want to switch to an ActiveMQ HA cluster, but I'm running into some strange issues. One of my queues (periodicDerivationQueue) is not behaving as it does with HornetQ. The AMQ console shows that messages are enqueued and dequeued, but they are not making it to my consumer. At first, I assumed that the messages were dequeuing into the DLQ for some reason, but that doesn't seem to be the case. As I understand it, AMQ will not create the DLQ unless it is necessary. When I look at the broker there is no DLQ. How can I figure out where my messages are going?
I'm also having trouble debugging from the app side of the stack due to reflection. I'd like to set a breakpoint on the AMQ side to see whats happening with my messages, but I'm not sure where to put it. Any ideas here?
Could this be a serialization issue? I've heard that sometimes differences in serialization between JMS brokers can lead to strange behavior.
I'm really stuck here and any help would be appreciated. See config info below.
Wildfly 8.2
AMQ 5.13
Consumer (Messages not making it here)
public class PeriodicDerivationExecutionHandlerImpl implements PeriodicDerivationExecutionHandler {
protected DerivationService derivationService;
protected DerivationModelService derivationModelService;
protected Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void executeDerivation(PeriodicDerivation params) throws Exception{
JbpmHibernateUtil.openSession();
Derivation derivation = null;
try{
if (params.isGroup()){
derivation = new GroupDerivation();
GroupQueryParameters qp = new GroupQueryParameters();
qp.setGroupName(params.getItemName());
derivation.setDerivedItem(derivationModelService.getGroup(qp));
}else{
derivation = new DeterminantDerivation();
DeterminantQueryParameters qp = new DeterminantQueryParameters();
qp.setDeterminantName(params.getItemName());
derivation.setDerivedItem(derivationModelService.getDeterminant(qp));
}
logger.info("Executing periodic derivation [" + derivation + "]");
derivation.setModelEffectiveDate(new DateTime());
derivation.setPeriod(params.getPeriod());
derivation.getProcessParameters().add(new DerivationProcessParameter(PeriodicDerivation.PERIODIC_PROCESS_VAR, true));
derivation.setExecutionMode(DerivationExecutionMode.SYNCHRONOUS_LOCAL);
derivationService.executeDerivation(derivation);
JbpmHibernateUtil.closeSession(true);
}catch(Exception e){
logger.error("Periodic derivation execution failed for [" + derivation + "]",e);
JbpmHibernateUtil.closeSession(false);
throw new Exception("Periodic derivation execution failed for [" + derivation + "]",e);
}
}
public DerivationService getDerivationService() {
return derivationService;
}
public void setDerivationService(DerivationService derivationService) {
this.derivationService = derivationService;
}
public DerivationModelService getDerivationModelService() {
return derivationModelService;
}
public void setDerivationModelService(DerivationModelService derivationModelService) {
this.derivationModelService = derivationModelService;
}
}
Consumer XML config
<int:gateway id="periodicDerivationExecutionGateway"
service-interface="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandler">
<int:method name="executeDerivation" request-channel="periodicDerivationChannel" />
</int:gateway>
<bean id="periodicDerivationExecutor"
class="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandlerImpl">
<property name="derivationService" ref="derivationService" />
<property name="derivationModelService" ref="derivationModelService" />
</bean>
<int:service-activator input-channel="periodicDerivationChannel"
ref="periodicDerivationExecutor" method="executeDerivation" />
<int-jms:channel id="periodicDerivationChannel"
queue-name="${jms.destination.name.periodicderivation}" concurrency="${integration.listener.threads.maximum}"
task-executor="periodicDerivationTaskExecutor" />
ActiveMQ Standalone.xml (Jboss)
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
<resource-adapters>
<resource-adapter id="activemq-rar.rar">
<archive>
activemq-rar.rar
</archive>
<transaction-support>XATransaction</transaction-support>
<config-property name="ServerUrl">
tcp://127.0.0.1:61616?jms.rmIdFromConnectionId=true
</config-property>
<config-property name="UserName">
admin
</config-property>
<config-property name="Password">
admin
</config-property>
<connection-definitions>
<connection-definition
class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"
jndi-name="java:/ConnectionFactory"
enabled="true"
pool-name="ConnectionFactory">
<xa-pool>
<min-pool-size>1</min-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>false</prefill>
<is-same-rm-override>false</is-same-rm-override>
</xa-pool>
<recovery>
<recover-credential>
<user-name>admin</user-name>
<password>admin</password>
</recover-credential>
<recover-plugin class-name="org.jboss.jca.core.recovery.ConfigurableRecoveryPlugin">
<config-property name="EnableIsValid">
false
</config-property>
<config-property name="IsValidOverride">
true
</config-property>
<config-property name="EnableClose">
true
</config-property>
</recover-plugin>
</recovery>
</connection-definition>
</connection-definitions>
Queues/Topics
<admin-objects>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"
use-java-context="true"
pool-name="deferredBpmCommandQueue">
<config-property name="PhysicalName">
deferredBpmCommandQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"
use-java-context="true"
pool-name="ActiveMQQueue.asyncActionRequestQueue">
<config-property name="PhysicalName">
asyncActionRequestQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/DLQ"
use-java-context="true"
pool-name="DLQ">
<config-property name="PhysicalName">
DLQ
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue" use-java-context="true" pool-name="ActiveMQQueue.cacheUpdateReplicationQueue">
<config-property name="PhysicalName">
cacheUpdateReplicationQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue" use-java-context="true" pool-name="ActiveMQQueue.periodicDerivationQueue">
<config-property name="PhysicalName">
periodicDerivationQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncServiceSignalQueue">
<config-property name="PhysicalName">
asyncServiceSignalQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/processEventTopic" use-java-context="true" pool-name="ActiveMQTopic.processEventTopic">
<config-property name="PhysicalName">
processEventTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionReplyQueue">
<config-property name="PhysicalName">
asyncActionReplyQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/ExpiryQueue" use-java-context="true" pool-name="ActiveMQQueue.ExpiryQueue">
<config-property name="PhysicalName">
ExpiryQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusRequestTopic">
<config-property name="PhysicalName">
asyncActionServiceStatusRequestTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityRequestTopic">
<config-property name="PhysicalName">
asyncActionAffinityRequestTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue" use-java-context="true" pool-name="ActiveMQQueue.jbpmJobQueue">
<config-property name="PhysicalName">
jbpmJobQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityReplyTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityReplyTopic">
<config-property name="PhysicalName">
asyncActionAffinityReplyTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationEventTopic" use-java-context="true" pool-name="ActiveMQTopic.cacheUpdateReplicationEventTopic">
<config-property name="PhysicalName">
cacheUpdateReplicationEventTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusTopic">
<config-property name="PhysicalName">
asyncActionServiceStatusTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionServiceLogRecordQueue">
<config-property name="PhysicalName">
asyncActionServiceLogRecordQueue
</config-property>
</admin-object>
</admin-objects>
Broker config
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}" persistent="true">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
HornetQ Standalone.xml (Jboss)
<subsystem xmlns="urn:jboss:domain:messaging:2.0">
<hornetq-server>
<persistence-enabled>false</persistence-enabled>
<jmx-management-enabled>true</jmx-management-enabled>
<shared-store>true</shared-store>
<journal-type>ASYNCIO</journal-type>
<journal-file-size>102400</journal-file-size>
<journal-min-files>2</journal-min-files>
<connectors>
<netty-connector name="netty" socket-binding="messaging"/>
<netty-connector name="netty-throughput" socket-binding="messaging-throughput">
<param key="batch-delay" value="50"/>
</netty-connector>
<in-vm-connector name="in-vm" server-id="0"/>
</connectors>
<acceptors>
<netty-acceptor name="netty" socket-binding="messaging"/>
<netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</netty-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="send" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>104857600</max-size-bytes>
<page-size-bytes>10485760</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
<address-full-policy>PAGE</address-full-policy>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
<jms-connection-factories>
<connection-factory name="InVmConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="java:/ConnectionFactory"/>
</entries>
</connection-factory>
<connection-factory name="RemoteConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
</entries>
<client-failure-check-period>30000</client-failure-check-period>
<connection-ttl>300000</connection-ttl>
<retry-interval>2000</retry-interval>
<retry-interval-multiplier>1</retry-interval-multiplier>
<max-retry-interval>2000</max-retry-interval>
<reconnect-attempts>100</reconnect-attempts>
</connection-factory>
</jms-connection-factories>
Queues/Topics
<jms-destinations>
<jms-queue name="asyncActionRequestQueue">
<entry name="queue/bpm/asyncActionRequestQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"/>
</jms-queue>
<jms-queue name="asyncActionReplyQueue">
<entry name="queue/bpm/asyncActionReplyQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue"/>
</jms-queue>
<jms-queue name="asyncServiceSignalQueue">
<entry name="queue/bpm/asyncServiceSignalQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue"/>
</jms-queue>
<jms-queue name="asyncActionServiceLogRecordQueue">
<entry name="queue/bpm/asyncActionServiceLogRecordQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue"/>
</jms-queue>
<jms-queue name="deferredBpmCommandQueue">
<entry name="queue/bpm/deferredBpmCommandQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"/>
</jms-queue>
<jms-queue name="jbpmJobQueue">
<entry name="queue/bpm/jbpmJobQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue"/>
</jms-queue>
<jms-queue name="DLQ">
<entry name="queue/DLQ"/>
<entry name="java:jboss/exported/jms/queue/DLQ"/>
</jms-queue>
<jms-queue name="ExpiryQueue">
<entry name="queue/ExpiryQueue"/>
<entry name="java:jboss/exported/jms/queue/ExpiryQueue"/>
</jms-queue>
<jms-queue name="periodicDerivationQueue">
<entry name="queue/bpm/periodicDerivationQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue"/>
</jms-queue>
<jms-queue name="cacheUpdateReplicationQueue">
<entry name="queue/bpm/cacheUpdateReplicationQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue"/>
</jms-queue>
<jms-topic name="asyncActionServiceStatusTopic">
<entry name="topic/bpm/asyncActionServiceStatusTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusTopic"/>
</jms-topic>
<jms-topic name="asyncActionServiceStatusRequestTopic">
<entry name="topic/bpm/asyncActionServiceStatusRequestTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusRequestTopic"/>
</jms-topic>
<jms-topic name="asyncActionAffinityRequestTopic">
<entry name="topic/bpm/asyncActionAffinityRequestTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityRequestTopic"/>
</jms-topic>
<jms-topic name="asyncActionAffinityReplyTopic">
<entry name="topic/bpm/asyncActionAffinityReplyTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityReplyTopic"/>
</jms-topic>
<jms-topic name="processEventTopic">
<entry name="topic/bpm/processEventTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/processEventTopic"/>
</jms-topic>
<jms-topic name="cacheUpdateReplicationEventTopic">
<entry name="topic/bpm/cacheUpdateReplicationEventTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/cacheUpdateReplicationEventTopic"/>
</jms-topic>
</jms-destinations>
ObjectMessage serialization security was the issue.
ObjectMessage objects depend on Java serialization of marshal/unmarshal object payload. This process is generally considered unsafe as malicious payload can exploit the host system. That's why starting with versions 5.12.2 and 5.13.0, ActiveMQ enforces users to explicitly whitelist packages that can be exchanged using ObjectMessages.
I saw this a few days ago and added a white list but it didn't fix the issue. I also tried running against AMQ 5.11.3 and it didn't work. Apparently they added the security feature to 5.11.3 too. Anyway, I added this (-Dorg.apache.activemq.SERIALIZABLE_PACKAGES="*") to the client side and AMQ vm arguments and now everything is working as it should.
Keep in mind that the command line option I used is a security vulnerability that I've explicitly opened in my broker, which can allow a malicious user to execute code on my system. The right way to use that flag is to explicitly list the classes you allow to be deserialized, or at most use package wildcards to avoid explicitly listing individual classes and subpackages in a trusted parent package.