Search code examples
javamultithreadingiso8583jpos

Multiple ISO8583 request parallel on a JPOS mux


I have a working JPOS ISO8583 server that able to handle request asynchronously, then i need to develop a test client to stress test it.

I want to know, is a mux able to handle multiple request at once and process it parallely? See this diagram. I'm using ISOMUX (not QMUX).

enter image description here

if the answer is Yes, then how to do it?

I tried with this code:

Customizer.java

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.jpos.iso.ISODate;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOMUX;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.channel.ASCIIChannel;
import org.jpos.iso.packager.ISO87APackager;

public class Customizer {

    private ISO87APackager packager;
    private ASCIIChannel channel;
    private ISOMUX mux;

    public Customizer() {
        try {
            this.packager = new ISO87APackager();
            this.channel = new ASCIIChannel(packager);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static String C_HOST = "127.0.0.1";
    static int C_PORT = 2000;
    static int C_TIMEOUT = 60000;
    public int stan= 0;

    public void connect() throws Exception{
        this.channel.setHost(C_HOST);
        this.channel.setPort(C_PORT);
        this.channel.setTimeout(C_TIMEOUT);
        this.channel.connect();

        this.mux = new ISOMUX(channel);
        Thread muxThread = new Thread(mux);
        muxThread.start();

        System.out.println("Connected with " + channel.getHost() + ":" + channel.getPort() + " ? " + mux.isConnected());
        System.out.println(channel.getSocket());
    }

    private void completeMsg(ISOMsg msg) {
        if (msg == null) {
            return;
        }
        Date now = new Date();
        try {
            msg.set(7, ISODate.getDateTime(now));
            msg.set(11, String.format("%06d", stan++));
            msg.set(12, ISODate.getTime(now));
            msg.set(13, ISODate.getDate(now));
            msg.set(15, ISODate.getDate(now));
        } catch (ISOException e) {
        }
    }

    public void signIn() throws ISOException {
        ISOMsg reqMsg = new ISOMsg();
        reqMsg.setPackager(packager);
        reqMsg.setDirection(ISOMsg.OUTGOING);
        reqMsg.setMTI("0800");
        reqMsg.set(70, "001");

        completeMsg(reqMsg);

        if (mux.isConnected()) {
            try {
                ISOMsg respMsg;
                respMsg = mux.request(reqMsg, C_TIMEOUT);
                System.out.println("Success");
            } catch (ISOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Error sending: not connected to server");
        }
    }

    static int NUM_THREADS = 3;
    public static void main(String[] args) {
        Customizer tr = new Customizer();

        try {
            System.out.println("--STRESS TEST--");
            System.out.println(">> CONNECTION");
            tr.connect();

            System.out.println(">> SIGN ON");
            tr.signIn();

            ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
            List<Callable<String>> calList = new ArrayList<Callable<String>>();

            long start = System.currentTimeMillis();

            System.out.println(">> REQUEST WITH "+NUM_THREADS+" THREADS");
            for (int i=0; i<NUM_THREADS; i++) {
                // the XML_TEST is a xml string, the xml later converted to ISOMsg 
                Callable<String> callable = new RequestSender(tr.packager, tr.mux, ConstantData.XML_TEST, i);
                calList.add(callable);
            }

            List<Future<String>> futList = executor.invokeAll(calList);

            System.out.println(">> FINISHED");
            System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
            for (Future<String> fut : futList) {
                try {
                    System.out.println(new Date()+ " | msgId: "+fut.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            executor.shutdown();    
            System.exit(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

RequestSender.java

import org.jpos.iso.ISODate;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOMUX;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.packager.ISO87APackager;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Callable;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

public class RequestSender implements Callable<String> {

    static int C_TIMEOUT = 25000; //changed
    private ISO87APackager packager;
    private ISOMUX mux;
    private String xmlReq;
    private int loop;
    private int stan = 1;

    public RequestSender(ISO87APackager packager, ISOMUX mux, String xmlReq, int loop) {
        super();
        this.packager = packager;
        this.mux = mux;
        this.xmlReq = xmlReq;
        this.loop = loop;
    }

    private ISOMsg isoBuilder(String xml){
        ISOMsg msg = new ISOMsg();
        try {
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder;
            dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(new ByteArrayInputStream(xml.getBytes()));
            doc.getDocumentElement().normalize();
            NodeList nList = doc.getElementsByTagName("field");
            for (int temp = 0; temp < nList.getLength(); temp++) {
                Node nNode = nList.item(temp);
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;
                    msg.set(eElement.getAttribute("id"), eElement.getAttribute("value"));
                }
            }
            msg.set("37", "0000000009"+loop);
            completeMsg(msg);
        } catch (ParserConfigurationException e) {
            e.printStackTrace();
        } catch (SAXException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ISOException e) {
            e.printStackTrace();
        }
        return msg;
    }

    private void completeMsg(ISOMsg msg) {
        if (msg == null) {
            return;
        }
        Date now = new Date();
        try {
            msg.set(7, ISODate.getDateTime(now));
            msg.set(11, String.format("%06d", loop));  //changed 
            msg.set(12, ISODate.getTime(now));
            msg.set(13, ISODate.getDate(now));
            msg.set(15, ISODate.getDate(now));
        } catch (ISOException e) {
        }
    }

    private synchronized String testConfirmPayment() throws IOException {
        if (mux.isConnected()) {
            ISOMsg reqMsg = isoBuilder(xmlReq);
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())+" | REQ"+loop+"= "+reqMsg.toString().trim()+" BIT 37 : "+reqMsg.getString("37"));
            try {
                ISOMsg respMsg = mux.request(reqMsg, C_TIMEOUT);
                if (respMsg != null){
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())+" | RES"+loop+"= "+respMsg.getString(48));
                    return respMsg.getString(48);
                } else {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())+" | RES"+loop+"=  NO RESPONSE");
                    return "NO RESPONSE ";
                }
            } catch (ISOException e) {
                e.printStackTrace();
            }
        }
        return "NOT CONNECTED";
    }

    public String call() throws Exception {
        return testConfirmPayment();
    }

}

This request is successfully replied by server, but only on last request.

Running Output

--STRESS TEST--
>> CONNECTION
Connected with 127.0.0.1:2000 ? true
Socket[addr=/127.0.0.1,port=2000,localport=51086]
>> SIGN ON
Success
>> REQUEST WITH 3 THREADS
2018-01-23 09:12:06.355 | REQ0= 0200 000001 00000003 BIT 37 : 00000000090
2018-01-23 09:12:06.355 | REQ1= 0200 000001 00000003 BIT 37 : 00000000091
2018-01-23 09:12:06.355 | REQ2= 0200 000001 00000003 BIT 37 : 00000000092
2018-01-23 09:12:06.453 | RES2= 10004100108XXXXX  JOHNSON SIMONE XXX                  
2018-01-23 09:12:11.450 | RES1= NO RESPONSE
2018-01-23 09:12:26.357 | RES0= NO RESPONSE
>> FINISHED
elapsed ms: 20045

So how to solve this? if i tracked down i see the server is responding but unable to get in to the MUX especially on

ISOMsg respMsg = mux.request(reqMsg, C_TIMEOUT);

Any reply and suggestion is apppreciated.

Thankyou in advance

Update

The problem has been solved, The code above is fully working now..


Solution

  • Make sure you increment the STAN (serial trace audit number, data element 11). That's the reason you're not getting responses, your client is sending duplicate messages.