I have a JMS producer that generates many messages per second. The messages are sent to a persistent ActiveMQ queue and consumed by a single consumer, which needs to process them sequentially. The producer seems to be much faster than the consumer, and I am having performance and memory problems. Messages are fetched very slowly, and consumption seems to happen at intervals (the consumer "asks" for messages in a polling fashion, which is strange?!).
Basically everything happens with Spring Integration. Here is the configuration on the producer side. Stake messages first arrive on stakesInMemoryChannel, are filtered into filteredStakesChannel, and from there go to the JMS queue (the channel uses an executor, so sending happens on a separate thread).
<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>

<int:channel id="stakesInMemoryChannel" />
<int:channel id="filteredStakesChannel">
    <int:dispatcher task-executor="taskExecutor"/>
</int:channel>

<bean id="stakeFilterService" class="cayetano.games.stake.StakeFilterService"/>

<int:filter
    input-channel="stakesInMemoryChannel"
    output-channel="filteredStakesChannel"
    throw-exception-on-rejection="false"
    expression="true"/>

<jms:outbound-channel-adapter
    channel="filteredStakesChannel"
    destination="stakesQueue"
    delivery-persistent="true"
    explicit-qos-enabled="true" />

<task:executor id="taskExecutor" pool-size="100" />
The other application consumes the messages like this. Messages arrive on stakesInputChannel from the JMS stakesQueue and are then routed to two separate channels: one persists the message and the other does some further work; let's call it "processing".
<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="${jms.stakes.queue.name}" />
</bean>

<jms:message-driven-channel-adapter
    channel="stakesInputChannel"
    destination="stakesQueue"
    acknowledge="auto"
    concurrent-consumers="1"
    max-concurrent-consumers="1"
    />

<int:publish-subscribe-channel id="stakesInputChannel" />
<int:channel id="persistStakesChannel" />
<int:channel id="processStakesChannel" />

<int:recipient-list-router
    id="customRouter"
    input-channel="stakesInputChannel"
    timeout="3000"
    ignore-send-failures="true"
    apply-sequence="true"
    >
    <int:recipient channel="persistStakesChannel"/>
    <int:recipient channel="processStakesChannel"/>
</int:recipient-list-router>

<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="${jms.broker.prefetch.policy}" />
</bean>

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${jms.broker.url}" />
            <property name="prefetchPolicy" ref="prefetchPolicy" />
            <property name="optimizeAcknowledge" value="true" />
            <property name="useAsyncSend" value="true" />
        </bean>
    </property>
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
</bean>