You can use WSO2 ESB to process large files (several GB in size) with the VFS transport. A Smooks mediator placed inside the VFS listener proxy reads records from the file and routes them to a destination queue, database, or flat file for further processing, rather than loading the full file content into the proxy payload.
The example below shows how to process a large file containing millions of CSV records and route each record to a JMS queue for further processing. The steps copy the files directly into the deployment folders; you can also create the same configuration from the ESB console instead.
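The idea of handling one record at a time, instead of holding the whole file in memory, can be sketched outside the ESB. The following is a minimal Python illustration, not the Smooks implementation; the names `iter_records` and `route_all` and the `send` callback (standing in for the JMS queue) are hypothetical.

```python
import csv
from typing import Callable, Dict, Iterator, TextIO

# Field names taken from the csv:reader configuration used in this example.
FIELDS = ["firstname", "lastname", "gender", "age", "country"]


def iter_records(stream: TextIO, skip_lines: int = 0) -> Iterator[Dict[str, str]]:
    """Yield one record at a time; only the current row is held in memory."""
    reader = csv.reader(stream, delimiter=",", quotechar="'")
    for index, row in enumerate(reader):
        if index < skip_lines or not row:
            continue  # skip leading lines and blank lines
        yield dict(zip(FIELDS, row))


def route_all(stream: TextIO, send: Callable[[Dict[str, str]], None],
              skip_lines: int = 0) -> int:
    """Pass every record to `send` (a stand-in for the queue) and count them."""
    count = 0
    for record in iter_records(stream, skip_lines):
        send(record)
        count += 1
    return count
```

Because `iter_records` is a generator, memory use stays roughly constant regardless of file size, which is the property the Smooks-based setup is meant to provide.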
Step 1: Download and install WSO2 ESB
- Get the latest release from http://wso2.com/more-downloads/esb
- Follow the documentation at https://docs.wso2.org/dashboard.action
Step 2: Enable VFS transport on ESB and enable JMS transport.
- Uncomment the following two elements in <WSO2_ESB>/repository/conf/axis2/axis2.xml
<transportSender name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
<transportReceiver name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>
- Enable the JMS transport and create a sample queue to receive the forwarded records.
- Install ActiveMQ (http://activemq.apache.org/initial-configuration.html).
- Configure the JMS transport in the ESB using the steps at the link below, or search the documentation for your ESB version. https://docs.wso2.org/display/ESB480/Configure+with+ActiveMQ
Step 3: Add the smooks configuration
- Copy the smooks-config-mapping.xml file to <WSO2_ESB>/repository/resources/smooks/
smooks-config-mapping.xml
<smooks-resource-list
    xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
    xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
    xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd"
    xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd">
  <params>
    <param name="stream.filter.type">SAX</param>
    <param name="stream.filter.readerPoolSize">100</param>
  </params>
  <csv:reader fields="firstname,lastname,gender,age,country"
              separator="," quote="'" skipLines="1"/>
  <resource-config selector="csv-record">
    <resource>org.milyn.delivery.DomModelCreator</resource>
  </resource-config>
  <ftl:freemarker applyOnElement="csv-record">
    <ftl:template>/repository/resources/smooks/csv_record_as_xml.ftl</ftl:template>
    <ftl:use>
      <ftl:bindTo id="csv_record_as_xml"/>
    </ftl:use>
  </ftl:freemarker>
  <jms:router routeOnElement="csv-record" beanId="csv_record_as_xml"
              destination="TestQueue">
    <jms:message>
      <!-- Need to use special FreeMarker variable ".vars" -->
      <jms:correlationIdPattern>${.vars["csv-record"].age}</jms:correlationIdPattern>
    </jms:message>
    <jms:jndi properties="/repository/resources/smooks/activemq.sr.jndi.properties"/>
    <jms:highWaterMark mark="10000000"/>
  </jms:router>
</smooks-resource-list>
- To add the config key as a local entry, copy smook_config.xml to <WSO2_ESB>/repository/deployment/server/synapse-configs/default/local-entries
smook_config.xml
<?xml version="1.0" encoding="UTF-8"?>
<localEntry xmlns="http://ws.apache.org/ns/synapse"
            key="smook_config"
            src="file:repository/resources/smooks/smooks-config-mapping.xml">
  <description/>
</localEntry>
Step 4: Add the JMS configuration and the FTL template that the Smooks mediator uses to split the message and produce the output format.
- Add the two files activemq.sr.jndi.properties and csv_record_as_xml.ftl to <WSO2_ESB>/repository/resources/smooks.
activemq.sr.jndi.properties
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.TestQueue = TestQueue
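In the ActiveMQ JNDI properties format, an entry of the form `queue.<jndiName> = <physicalName>` binds a JNDI name to a physical queue, so the router's `destination="TestQueue"` needs a matching `queue.TestQueue` entry. A small Python check along these lines can catch a mismatch before deployment; it is only a sketch, and `parse_properties` and `queue_bindings` are hypothetical helper names, not part of any WSO2 or ActiveMQ tooling.

```python
from typing import Dict

SAMPLE = """\
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.TestQueue = TestQueue
"""


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple `key = value` lines, ignoring blanks and comments."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def queue_bindings(props: Dict[str, str]) -> Dict[str, str]:
    """Map each JNDI queue name to its physical queue name."""
    prefix = "queue."
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


bindings = queue_bindings(parse_properties(SAMPLE))
```

Here `bindings` should contain an entry for every destination name used in the Smooks configuration.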
csv_record_as_xml.ftl
<#assign csvrec = .vars["csv-record"]> <#-- special assignment because csv-record has a hyphen -->
<Customer>
  <firstname>${csvrec.firstname}</firstname>
  <lastname>${csvrec.lastname}</lastname>
  <gender>${csvrec.gender}</gender>
  <age>${csvrec.age}</age>
  <country>${csvrec.country}</country>
</Customer>
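To make the expected message format concrete, the template's effect on one record can be imitated in Python. This is a rough sketch of the substitution only: it uses plain string formatting rather than FreeMarker, does no XML escaping, and the function name `render_customer` is hypothetical.

```python
from typing import Dict

# Mirrors the element layout of csv_record_as_xml.ftl (whitespace omitted).
TEMPLATE = (
    "<Customer>"
    "<firstname>{firstname}</firstname>"
    "<lastname>{lastname}</lastname>"
    "<gender>{gender}</gender>"
    "<age>{age}</age>"
    "<country>{country}</country>"
    "</Customer>"
)


def render_customer(record: Dict[str, str]) -> str:
    """Substitute the record's fields into the Customer layout."""
    return TEMPLATE.format(**record)


example = render_customer({
    "firstname": "name1",
    "lastname": "lastname1",
    "gender": "Male",
    "age": "30",
    "country": "country1",
})
```

Each CSV record should therefore arrive on the queue as one `<Customer>` element of this shape.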
Step 5: Create your VFS listener proxy to read the file and pass it to the smooks mediator
- Now copy the TestSmookRecVFS.xml proxy into the folder <WSO2_ESB>/repository/deployment/server/synapse-configs/default/proxy-services
TestSmookRecVFS.xml
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="TestSmookRecVFS"
       transports="vfs"
       startOnLoad="true"
       trace="disable">
  <description/>
  <target>
    <inSequence>
      <log>
        <property name="JK*********" value="INSIDE VFS******"/>
      </log>
      <property name="DISABLE_SMOOKS_RESULT_PAYLOAD" value="true"/>
      <smooks config-key="smook_config">
        <input type="text"/>
        <output type="xml"/>
      </smooks>
    </inSequence>
  </target>
  <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
  <parameter name="transport.PollInterval">10</parameter>
  <parameter name="transport.vfs.MoveAfterProcess">file:///home/jayalal/Work/SmookRes/out</parameter>
  <parameter name="transport.vfs.FileURI">file:///home/jayalal/Work/SmookRes/in</parameter>
  <parameter name="transport.vfs.MoveAfterFailure">file:///home/jayalal/Work/SmookRes/failed</parameter>
  <parameter name="transport.vfs.FileNamePattern">.*\.txt</parameter>
  <parameter name="transport.vfs.ContentType">text/plain</parameter>
  <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>
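The transport.vfs.FileNamePattern value is a regular expression, so an unescaped dot matches any character. The Python sketch below compares an unescaped and an escaped pattern, assuming the transport applies the pattern against the whole file name; the helper name `matches` is hypothetical.

```python
import re


def matches(pattern: str, filename: str) -> bool:
    """Return True when the pattern matches the entire file name."""
    return re.fullmatch(pattern, filename) is not None


# Unescaped dot: "any character followed by txt", so "datatxt" is accepted.
loose_hit = matches(r".*.txt", "datatxt")

# Escaped dot: only names that really end in ".txt" are accepted.
strict_hit = matches(r".*\.txt", "datatxt")
strict_ok = matches(r".*\.txt", "Input.txt")
```

Both forms pick up Input.txt; the escaped form simply avoids picking up unintended files in the input folder.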
Step 6: Test and verify
Put your flat file containing CSV records into the “transport.vfs.FileURI” location. The VFS listener will pick up the file and transfer each record to the JMS queue.
Input.txt
name1,lastname1,Male,30,country1
name2,lastname2,Female,40,country2
name3,lastname3,Female,40,country3
name4,lastname4,Female,30,country4
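For a realistic test you need a file with many more rows than the sample. The Python sketch below writes rows in the same five-column layout; `write_test_csv`, the value ranges, and the seed are all hypothetical choices. It also writes a header line first, on the assumption that skipLines="1" in the csv:reader configuration causes the first line of the file to be skipped.

```python
import random
from typing import TextIO

FIELDS = ["firstname", "lastname", "gender", "age", "country"]


def write_test_csv(handle: TextIO, rows: int, seed: int = 0) -> None:
    """Write a header line followed by `rows` synthetic CSV records."""
    rng = random.Random(seed)  # fixed seed keeps the output reproducible
    handle.write(",".join(FIELDS) + "\n")
    for i in range(1, rows + 1):
        gender = rng.choice(["Male", "Female"])
        age = rng.randint(18, 90)
        handle.write(f"name{i},lastname{i},{gender},{age},country{i}\n")
```

For example, `with open("Input.txt", "w") as f: write_test_csv(f, 1_000_000)` produces a million-record file that can then be placed in the input folder.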
Please note that, by default, ActiveMQ will not hold more than 200 messages, so it is better to have a listener reading from the other end (as explained in the next step). That way you can verify that all of your millions of records are successfully queued into the JMS queue.
Step 7: Verify with a JMS listener
You can read the messages arriving on “TestQueue” with a JMS listener proxy inside the ESB and process them further.
Add the JMS listener to process the messages from the queue:
- Put the proxy service file JKTestJMSListner.xml into <WSO2_ESB>/repository/deployment/server/synapse-configs/default/proxy-services
JKTestJMSListner.xml
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="JKTestJMSListner"
       transports="jms"
       startOnLoad="true"
       trace="disable">
  <description/>
  <target>
    <inSequence>
      <log/>
    </inSequence>
  </target>
  <parameter name="transport.jms.ContentType">
    <rules>
      <jmsProperty>contentType</jmsProperty>
      <default>application/xml</default>
    </rules>
  </parameter>
  <parameter name="transport.jms.ConcurrentConsumers">1</parameter>
  <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
  <parameter name="transport.jms.SessionTransacted">false</parameter>
  <parameter name="transport.jms.Destination">TestQueue</parameter>
  <parameter name="transport.jms.CacheLevel">consumer</parameter>
  <parameter name="transport.jms.MaxConcurrentConsumers">1</parameter>
</proxy>
NOTES:
We added the property below so that the large file content is not placed into the payload, since the file content has already been consumed and routed to the JMS queue.
<property name="DISABLE_SMOOKS_RESULT_PAYLOAD" value="true"/>
If this property is set to false (the default), the file content is loaded into the message context and may cause an out-of-memory error like the one below. The property is available from ESB 4.9 onward; for older ESB versions you need the patch for https://wso2.org/jira/browse/ESBJAVA-3031.
You will get the exception below if you have not set the DISABLE_SMOOKS_RESULT_PAYLOAD property and the JVM runs out of memory.
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /home/jayalal/Software/wso2esb-4.7.0/repository/logs/heap-dump.hprof ...
Heap dump file created [2015812579 bytes in 11.531 secs]
[2014-03-13 06:23:42,973] ERROR - NativeWorkerPool Uncaught exception
java.lang.OutOfMemoryError: Java heap space
    at org.apache.axiom.om.impl.llom.factory.OMLinkedListImplFactory.createOMText(OMLinkedListImplFactory.java:192)
    at org.apache.axiom.om.impl.builder.StAXBuilder.createOMText(StAXBuilder.java:294)
    at org.apache.axiom.om.impl.builder.StAXBuilder.createOMText(StAXBuilder.java:250)
    at org.apache.axiom.om.impl.builder.StAXOMBuilder.next(StAXOMBuilder.java:252)
    at org.apache.axiom.om.impl.llom.OMSerializableImpl.build(OMSerializableImpl.java:78)
    at org.apache.axiom.om.impl.llom.OMElementImpl.build(OMElementImpl.java:722)
    at org.apache.axiom.om.impl.llom.OMElementImpl.detach(OMElementImpl.java:700)
    at org.apache.axiom.om.impl.llom.OMNodeImpl.setParent(OMNodeImpl.java:105)
    at org.apache.axiom.om.impl.llom.OMElementImpl.addChild(OMElementImpl.java:296)
    at org.apache.axiom.om.impl.llom.OMElementImpl.addChild(OMElementImpl.java:212)
    at org.apache.axiom.soap.impl.llom.SOAPBodyImpl.addChild(SOAPBodyImpl.java:231)
    at org.wso2.carbon.mediator.transform.Output.setXMLPayload(Output.java:228)
    at org.wso2.carbon.mediator.transform.Output.process(Output.java:95)
    at org.wso2.carbon.mediator.transform.SmooksMediator.mediate(SmooksMediator.java:125)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
    at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:114)
    at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:162)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.axis2.transport.base.AbstractTransportListener.handleIncomingMessage(AbstractTransportListener.java:328)
    at org.apache.synapse.transport.vfs.VFSTransportListener.processFile(VFSTransportListener.java:590)
    at org.apache.synapse.transport.vfs.VFSTransportListener.scanFileOrDirectory(VFSTransportListener.java:324)
    at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:158)
    at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:107)
    at org.apache.axis2.transport.base.AbstractPollingTransportListener$1$1.run(AbstractPollingTransportListener.java:67)
    at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
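The stack trace shows the failure occurring while the Smooks result is being set as the message payload (Output.setXMLPayload). The trade-off behind the DISABLE_SMOOKS_RESULT_PAYLOAD property can be pictured with a small Python model. This is a conceptual sketch only and not the mediator's actual implementation; `mediate`, `route`, and `disable_result_payload` are hypothetical names.

```python
from typing import Callable, Iterable, List, Optional


def mediate(records: Iterable[str],
            route: Callable[[str], None],
            disable_result_payload: bool) -> Optional[List[str]]:
    """Route every record; optionally also accumulate them as a result payload."""
    payload: Optional[List[str]] = None if disable_result_payload else []
    for record in records:
        route(record)  # stand-in for sending the record to the queue
        if payload is not None:
            # Accumulating grows with the file size; this is the memory risk.
            payload.append(record)
    return payload
```

With the flag enabled, nothing is retained after a record is routed, so memory use does not grow with the number of records.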
Hi,
Have you tried using the "DISABLE_SMOOKS_RESULT_PAYLOAD" property as explained in this document?
Also, if you do not have the latest builds for 4.8.1, you may have to use 4.9.
Thanks,
Jayalal
Hi,
Instead of using a queue, can we put the records into a database inside Smooks?