Tags: mysql, apache-camel, fuseesb

Fuse IDE: how to define a database table endpoint


I have heard a lot of integration success stories involving Apache Camel with Fuse. So I am just starting to explore the Fuse IDE, with a simple task in mind that I would like to achieve:

  1. Read a fixed-length file
  2. Parse the fixed-length file
  3. Persist it to a MySQL database table

I am only able to get as far as:

  1. Read the fixed-length file (with the endpoint "file:src/data/Japan?noop=true")
  2. Define a Marshal with Bindy and define a POJO package model with the @FixedLengthRecord annotation
  3. Then I am stuck: how do I persist the POJO into a MySQL database table? I can see some JDBC, iBATIS and JPA endpoints, but how do I accomplish that in Fuse IDE?

CamelContext.xml

My POJO package:
package com.mbww.model;

import org.apache.camel.dataformat.bindy.annotation.DataField;
import org.apache.camel.dataformat.bindy.annotation.FixedLengthRecord;

@FixedLengthRecord(length=91)
public class Japan {

@DataField(pos=1, length=10)
private String TNR; 

@DataField(pos=11, length=10)
private String ATR;

@DataField(pos=21, length=70)
private String STR; 
}

Solution

  • You can use any of the following components to read from and write to the database:

    1. JDBC
    2. IBATIS
    3. MyBATIS
    4. SPRING-JDBC
    5. SQL
    6. Custom Processor

    I am going to show you how to use a custom processor to insert the rows into a table. The main reason is that you will get to work with the messages and the exchange, which will give you more insight into Camel. All of the other components can be used by following the documentation on the Camel site.

    So let's review what you have. You are reading the file and converting the body to a Bindy object, so for each line in your text file Camel will send an object of class com.mbww.model.Japan to the next endpoint. This next endpoint needs to talk to the database. There is one problem I can spot immediately: you are using a marshal where you should be using an unmarshal.

    The documentation clearly states: "If you receive a message from one of the Camel Components such as File, HTTP or JMS you often want to unmarshal the payload into some bean so that you can process it using some Bean Integration or perform Predicate evaluation and so forth. To do this use the unmarshal word in the DSL in Java or the Xml Configuration."
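    To make the marshal/unmarshal distinction concrete, here is a minimal plain-Java sketch with no Camel or Bindy involved. The names FixedLengthSketch and JapanRecord are hypothetical, and this is not how Bindy is implemented; it only mirrors the field layout from the annotations (positions 1-10, 11-20 and 21-90). It assumes values are right-padded with spaces and that surrounding whitespace is stripped on read. The three fields cover 90 characters, so the extra character implied by length=91 is simply ignored here.

```java
/** Illustrative only: mimics a fixed-length (un)marshal step without Camel or Bindy. */
final class FixedLengthSketch {

    private static final int RECORD_WIDTH = 90; // 10 + 10 + 70, the sum of the three fields

    /** Plain stand-in for the Japan POJO (hypothetical, not the Bindy-annotated class). */
    static final class JapanRecord {
        final String tnr;
        final String atr;
        final String str;

        JapanRecord(String tnr, String atr, String str) {
            this.tnr = tnr;
            this.atr = atr;
            this.str = str;
        }
    }

    /** Unmarshal: wire format (one fixed-length line) to object. */
    static JapanRecord unmarshal(String line) {
        if (line.length() < RECORD_WIDTH) {
            throw new IllegalArgumentException("Line shorter than " + RECORD_WIDTH + " characters");
        }
        // pos is 1-based in the annotations, so pos=1,length=10 maps to substring(0, 10).
        return new JapanRecord(
                line.substring(0, 10).trim(),
                line.substring(10, 20).trim(),
                line.substring(20, 90).trim());
    }

    /** Marshal: object to wire format, right-padding each field with spaces. */
    static String marshal(JapanRecord record) {
        return pad(record.tnr, 10) + pad(record.atr, 10) + pad(record.str, 70);
    }

    private static String pad(String value, int width) {
        if (value.length() > width) {
            throw new IllegalArgumentException("Value longer than " + width + " characters");
        }
        return String.format("%-" + width + "s", value);
    }
}
```

    Reading a file and turning each line into an object is the unmarshal direction; marshal goes the other way and would be what you use when writing such a file.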

    Your Bindy class looks good, but it is missing getters and setters. Modify the class to look like this:

    package com.mbww.model;
    import org.apache.camel.dataformat.bindy.annotation.DataField;
    import org.apache.camel.dataformat.bindy.annotation.FixedLengthRecord;
    
    @FixedLengthRecord(length=91)
    public class Japan {
    
      @DataField(pos=1, length=10)
      private String TNR; 
    
      @DataField(pos=11, length=10)
      private String ATR;
    
      @DataField(pos=21, length=70)
      private String STR;
    
      public String getTNR() {
        return TNR;
      }
    
      public void setTNR(String tNR) {
        TNR = tNR;
      }
    
      public String getATR() {
        return ATR;
      }
    
      public void setATR(String aTR) {
        ATR = aTR;
      }
    
      public String getSTR() {
        return STR;
      }
    
      public void setSTR(String sTR) {
        STR = sTR;
      } 
    }
    

    Next you need a way to connect to your database from the route. Start by adding the MySQL driver JAR to your Maven dependencies: open your pom.xml file and add the following dependency to it.

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <!-- use this version of the driver or a later version of the driver -->
        <version>5.1.25</version>
    </dependency>
    

    Now we need to declare a custom processor for the route that will use this driver and insert the received body into a table.

    So let's create a new class in Fuse IDE called PersistToDatabase, with the code below:

    package com.mbww.JapanData;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Map;

    import org.apache.camel.Body;
    import org.apache.camel.Exchange;
    import org.apache.camel.Handler;
    import org.apache.camel.Headers;

    import com.mbww.model.Japan;

    public class PersistToDatabase {

        // Placeholder connection settings: replace with your own database, user and password.
        private static final String URL = "jdbc:mysql://localhost:3306/databasename";
        private static final String USER = "root";
        private static final String PASSWORD = "password";

        @Handler
        public void PersistRecord(
                @Body Japan msgBody,
                @Headers Map<String, Object> hdr,
                Exchange exch) throws Exception {

            // Load the MySQL driver class. A missing driver fails the exchange
            // instead of silently dropping the record.
            Class.forName("com.mysql.jdbc.Driver");

            // try-with-resources closes the statement and the connection on every path.
            try (Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
                 PreparedStatement stmt = connection.prepareStatement(
                         "INSERT INTO JapanDate(TNR,ATR,STR) VALUES(?,?,?)")) {
                // Each placeholder has its own 1-based index.
                stmt.setString(1, msgBody.getTNR());
                stmt.setString(2, msgBody.getATR());
                stmt.setString(3, msgBody.getSTR());
                int rows = stmt.executeUpdate();
                System.out.println("Number of rows inserted: " + rows);
            } catch (SQLException e) {
                System.out.println("Error in executing sql statement: " + e.getMessage());
                throw e; // rethrow so Camel's error handling sees the failure
            }
        }
    }
    

    This class is a plain POJO, nothing fancy except the @Handler annotation on PersistRecord. This annotation tells Camel that the PersistRecord method will handle the message exchange. You will also notice that PersistRecord has a parameter of type Japan. As mentioned earlier, the unmarshal step in your Camel route translates each line into a Japan object and passes it along the route.

    The rest of the code just handles the JDBC connection and executes an insert statement.
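    The JDBC part can also be written so that it does not open its own connection. The sketch below is a hypothetical helper (JapanInserter is not part of the answer's code) that receives a java.sql.Connection from the caller, which makes it easier to reuse connections and to test the insert logic without a running database. The table and column names are copied from the INSERT statement used in this answer.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper: inserts one record using a connection supplied by the caller. */
final class JapanInserter {

    // Table and column names are taken from the INSERT statement in the answer.
    private static final String INSERT_SQL =
            "INSERT INTO JapanDate(TNR,ATR,STR) VALUES(?,?,?)";

    /** Returns the number of rows inserted. The caller owns and closes the connection. */
    static int insert(Connection connection, String tnr, String atr, String str)
            throws SQLException {
        // try-with-resources closes the statement even if executeUpdate() throws.
        try (PreparedStatement stmt = connection.prepareStatement(INSERT_SQL)) {
            stmt.setString(1, tnr); // JDBC parameter indexes start at 1
            stmt.setString(2, atr);
            stmt.setString(3, str);
            return stmt.executeUpdate();
        }
    }
}
```

    A handler method could then obtain a connection however it likes and call JapanInserter.insert(connection, msgBody.getTNR(), msgBody.getATR(), msgBody.getSTR()).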

    We are almost done, with just one last thing to do: we need to declare this class in our Camel route XML. This file will typically be called camel-route.xml or blueprint.xml, depending on your archetype. Open the source tab and add the line <bean id="JapanPersist" class="com.mbww.JapanData.PersistToDatabase"/> before the <camelContext> tag.

    This declares a new bean called JapanPersist, backed by the class we just created. You can now reference this bean inside your Camel route.

    Thus the final route xml file should look something like this:

    <?xml version="1.0" encoding="UTF-8"?>
    <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/blueprint"
       xsi:schemaLocation="
       http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
       http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
    
    <bean id="JapanPersist" class="com.mbww.JapanData.PersistToDatabase"/>
    
    <camelContext trace="false" id="blueprintContext" xmlns="http://camel.apache.org/schema/blueprint">
        <route id="JapanDataFromFileToDB">
            <from uri="file:src/data/japan"/>
            <!-- ref must match the id of the Bindy data format you defined for com.mbww.model -->
            <unmarshal ref="Japan"/>
            <bean ref="JapanPersist"/>
        </route>
    </camelContext>
    
    </blueprint>
    

    Once you understand this technique, you can start scaling the solution by using a splitter, connection pooling and threading to perform a large number of concurrent inserts.
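    As a rough idea of what the threading part could look like outside of Camel, here is a minimal sketch using a fixed-size thread pool from java.util.concurrent. ConcurrentInsertSketch and RecordWriter are hypothetical names introduced for illustration; in a real setup the writer would perform the JDBC insert, and each task would normally use its own connection (for example one borrowed from a pool) rather than sharing a single one across threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: runs one insert task per line on a fixed-size thread pool. */
final class ConcurrentInsertSketch {

    /** Hypothetical abstraction over "insert one line"; returns the rows inserted. */
    interface RecordWriter {
        int write(String line) throws Exception;
    }

    /** Submits one task per line and returns the total number of rows inserted. */
    static int insertAll(List<String> lines, RecordWriter writer, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (String line : lines) {
                Callable<Integer> task = () -> writer.write(line);
                results.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                // get() waits for the task; a failed insert surfaces as an ExecutionException.
                total += result.get();
            }
            return total;
        } finally {
            pool.shutdown(); // stop accepting new tasks once all work has been submitted
        }
    }
}
```

    Within a Camel route you would more likely lean on the splitter and Camel's own threading options than manage a pool by hand; the sketch only shows the underlying idea.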

    Using the technique above, you learned how to inject your own beans into a Camel route, which gives you the ability to work with the messages directly in code.

    I have not tested the code, so there will probably be a bug or two, but the idea should be clear.