Search code examples
javamultithreadingpackets

Multiple Thread Processing


I have a few packets of data, as shown below:

00 FF FF 00 00 08 00 64 00 **07** 08 FC 09 90 07 09 

00 FF FF 00 00 08 00 64 00 **04** 08 0B 07 E3 0A 0B 

00 FF FF 00 00 08 00 64 00 **07** 09 25 09 C0 0B C7 

00 FF FF 00 00 08 00 64 00 **04** 08 13 07 E3 0A 0B

00 FF FF 00 00 08 00 64 00 **07** 09 2E 09 C1 0B C6 

00 FF FF 00 00 08 00 64 00 **04** 08 13 07 E3 0A 07 

I have to read each packet byte by byte and store the node_id (the bold values) and the X-value, which correspond to byte 9 and bytes 10 and 11 respectively.

I need to write code that detects a change in the X-value for a particular node_id and, if that change matches my criteria, generates a sound.

I have two different node_ids, 4 and 7, so I need to capture the X-values of node 4 and node 7 separately and detect a change in the values of each.

  • What I have done: read the packet and, if node_id is 4, call thread 1 and run it; if node_id is 7, call thread 2 and run it.

Each thread is processing all the packets one by one instead of only the packets that belong to its own node_id.

How can I write the code so that the packets are processed separately, per node_id, using threads?

import java.io.*;
import java.lang.*;
import java.lang.String;
import java.net.*;
import java.nio.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jfugue.pattern.Pattern;
import org.jfugue.player.Player;

/**
 * Worker that records one X-value sample for one named source and plays a
 * tone when that source's recent values first rise and then fall by more than
 * {@link #TONE_THRESHOLD}.
 *
 * <p>A new instance is created for every sample, so the window of recent
 * values cannot live in instance fields. It is kept in a static table keyed by
 * the worker name (callers in this file pass {@code "R" + nodeId}), which gives
 * every node its own window instead of one window shared by all nodes.
 *
 * <p>Thread-safety: each window is guarded by its own monitor, so samples of
 * different names never block each other and concurrent samples of the same
 * name cannot corrupt the window. The tone is played outside the monitor.
 */
class RunnableDemo implements Runnable {
    public boolean running = false;
    private Thread t;
    private String threadName;
    private int x_val;
    private int nodeid;

    /** Capacity of one window of distinct consecutive rounded values. */
    private static final int UNIQUE_INDEX_COUNT = 10;

    /** Accumulated rise, and then accumulated fall, that must be exceeded to play the tone. */
    private static final int TONE_THRESHOLD = 300;

    /** One window per worker name. */
    private static final Map<String, History> HISTORIES =
            new ConcurrentHashMap<String, History>();

    // Package-visible fields retained so existing references keep compiling.
    // The per-name windows above replace them on the processing path.
    static int indexCount = 0;
    static int[] nodeidarr = new int[UNIQUE_INDEX_COUNT];
    static int nodearrCount = 0;
    static int state = 0;

    /** Window of recent distinct rounded values; guarded by its own monitor. */
    private static final class History
    {
        final int[] values = new int[UNIQUE_INDEX_COUNT];
        int count;
    }

    /**
     * @param name     worker name; also the key of the window this sample is added to
     * @param x_value  raw X-value of the sample
     * @param node_id  id of the node the sample came from
     */
    RunnableDemo(String name, int x_value, int node_id)
    {
        threadName = name;
        this.x_val = x_value;
        nodeid = node_id;
    }

    /** Logs the sample and adds it to the window that belongs to this worker's name. */
    @Override
    public void run()
    {
        System.out.println("Running Thread  = "+threadName);
        System.out.println("x_val = "+x_val);
        System.out.printf("***************************");
        checkForUniqueValue(x_val, threadName);
    }

    /** Runs this worker on a new thread; calling it again on the same instance does nothing. */
    public void start()
    {
        if (t == null)
        {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    /**
     * Truncates a value to a multiple of 100 (toward zero), e.g. 2299 becomes 2200.
     *
     * @param xValue raw X-value
     * @return {@code xValue} with its last two decimal digits cleared
     */
    public static int roundingOffXValue(int xValue)
    {
        return (xValue / 100) * 100;
    }

    /**
     * Checks the window of {@code TName} for a rise of more than
     * {@link #TONE_THRESHOLD} followed by a fall of more than
     * {@link #TONE_THRESHOLD}, and plays the tone when it is found.
     *
     * <p>The entries are read from {@code xValChangeInfo}, which is not
     * modified; the number of valid entries is taken from the window of
     * {@code TName}. The window is emptied when the tone fires, or when it is
     * full and no tone was found.
     *
     * @param xValChangeInfo values to scan, oldest first
     * @param TName          worker name that identifies the window
     */
    public static void checkBoundForTone(int[] xValChangeInfo, String TName)
    {
        History history = historyFor(TName);
        int[] swing;
        synchronized (history)
        {
            swing = evaluate(history, xValChangeInfo);
        }
        if (swing != null)
        {
            announceTone(TName, swing[0], swing[1]);
        }
    }

    /**
     * Empties every window. The array argument is ignored and left untouched.
     *
     * @param xValChangeInfo unused
     */
    public static void resetArray(int[] xValChangeInfo)
    {
        indexCount = 0;
        for (History history : HISTORIES.values())
        {
            synchronized (history)
            {
                history.count = 0;
            }
        }
    }

    /**
     * Resets {@code nodearrCount} to zero. The array argument is ignored.
     *
     * @param xValChangeInfo unused
     */
    public static void resetNodeArray(int[] xValChangeInfo)
    {
        nodearrCount = 0;
    }

    /**
     * Rounds {@code xValue}, appends it to the window of {@code TName} unless
     * it equals the most recent entry, checks the window for the tone
     * condition, and prints the window.
     *
     * <p>The first entry of an empty window is always stored; it is only a
     * baseline, so the tone check starts with the second entry.
     *
     * @param xValue raw X-value of the sample
     * @param TName  worker name that identifies the window
     */
    public static void checkForUniqueValue(int xValue, String TName)
    {
        int rounded = roundingOffXValue(xValue);
        History history = historyFor(TName);
        int[] swing = null;
        synchronized (history)
        {
            boolean empty = history.count == 0;
            if (empty || history.values[history.count - 1] != rounded)
            {
                // count is below capacity here: evaluate() empties a full window.
                history.values[history.count] = rounded;
                history.count++;
                if (!empty)
                {
                    swing = evaluate(history, history.values);
                }
            }
            for (int i = 0; i < history.count; i++)
            {
                System.out.println("...........");
                System.out.printf("%d   ", history.values[i]);
            }
        }
        // Playback blocks for the length of the note, so it happens after the
        // monitor is released.
        if (swing != null)
        {
            announceTone(TName, swing[0], swing[1]);
        }
    }

    /** Returns the window for {@code name}, creating it on first use. */
    private static History historyFor(String name)
    {
        // ConcurrentHashMap rejects null keys; a null name maps to "null".
        String key = String.valueOf(name);
        History history = HISTORIES.get(key);
        if (history == null)
        {
            History created = new History();
            history = HISTORIES.putIfAbsent(key, created);
            if (history == null)
            {
                history = created;
            }
        }
        return history;
    }

    /**
     * Scans {@code values} for the tone condition. The caller must hold the
     * monitor of {@code history}.
     *
     * @return {@code {rise, fall}} when the tone should play, otherwise {@code null}
     */
    private static int[] evaluate(History history, int[] values)
    {
        int limit = (values == null) ? 0 : Math.min(history.count, values.length);
        int rise = 0;
        int fall = 0;
        for (int i = 1; i < limit; i++)
        {
            int previous = values[i - 1];
            int current = values[i];
            if (previous < current)
            {
                rise += current - previous;
            }
            // Falls only count once the accumulated rise has passed the threshold.
            if (rise > TONE_THRESHOLD)
            {
                if (previous > current)
                {
                    fall += previous - current;
                }
                if (fall > TONE_THRESHOLD)
                {
                    history.count = 0;
                    return new int[] { rise, fall };
                }
            }
        }
        // A full window is scanned first and only then emptied, so a swing
        // completed by the last entry is not lost.
        if (history.count == UNIQUE_INDEX_COUNT)
        {
            history.count = 0;
        }
        return null;
    }

    /** Prints the detected swing and plays the note; the player is created only when needed. */
    private static void announceTone(String TName, int a, int b)
    {
        System.out.println("node_id: " +TName);
        System.out.println("");
        System.out.printf("a : %d    b : %d",a, b);
        System.out.println("");
        Player player = new Player();
        Pattern pattern_xup = new Pattern("C");
        player.play(pattern_xup);
    }
}



class ParseClient
{
    private static final int MIN_XVALUE = 1700;
    private static final int MAX_XVALUE = 2400;
    private static final int UNIQUE_INDEX_COUNT = 10;
    //static int[] xValChangeInfo = new int[UNIQUE_INDEX_COUNT];
    //static int indexCount = 0;
    static int[] nodeidarr = new int[UNIQUE_INDEX_COUNT];
    static int nodearrCount =0;
    static int node_id;
    public static void main(String args[]) throws Exception 
    {
        Socket clientSocket = new Socket("sowmya-Lenovo-ideapad-500-15ACZ",6789);
        DataOutputStream outToServer = new DataOutputStream(clientSocket.getOutputStream()); 
        DataInputStream dataInput = new DataInputStream(clientSocket.getInputStream());
        //RunnableDemo R1 = new RunnableDemo( "Thread-1");
        while(true)
        {
            readPacket(dataInput);

        }
    }


    public static void checkforuniquenodes(int nodeid,int xvalue)
    {
        int x_val=xvalue;
        if(nodearrCount ==0 )
        {
             nodeidarr[nodearrCount]=nodeid;
             nodearrCount++;
             RunnableDemo R1 = new RunnableDemo( "R"+nodeid, xvalue,nodeid);
             R1.start();
        }
        else 
        {
            for (int i=0; i<nodearrCount; i++) 
            {
                if (nodeidarr[i] == nodeid)
                {
                  RunnableDemo R1 = new RunnableDemo( "R"+nodeid, xvalue, nodeid);
                          R1.start();
                }
                else
                {
                 nodeidarr[i+1]=nodeid;
                 RunnableDemo R2 = new RunnableDemo("R"+nodeid, xvalue,nodeid);  
                 R2.start();
                }
            }

        }    

      }

    public static void readPacket(DataInputStream dataInput)
    {
        Byte pkt_byte=0;
        int t = 0;
        int X1 = 0, X2 = 0;
        int x_val = 0;
        for (int byte_index = 0; byte_index <= 15; byte_index++)    
        {
            try
            {
                pkt_byte = dataInput.readByte();
            }
            catch (IOException e) 
            {
                // Do something here    
            }
            if(pkt_byte < 0)
            {
                t = pkt_byte & 0xFF;
            }
            else 
            {
                t = pkt_byte.intValue();
            }
            if(byte_index == 9)
            {
                node_id = t;
            }

            if(byte_index == 10)
            {
                X1 = t << 8;
            }
            else if (byte_index == 11)
            {
                X2 = t;
                x_val = X1+X2;
            }
        }

        checkforuniquenodes(node_id, x_val);

    }
}

Solution

  • I think you need a few modules (you may have some of them already).

    1. Reading packet
    2. Detecting the node id (by finding patterns) and sending the packet to the proper thread.
    3. Processing the delivered packet

    In conclusion, you need three threads.

    One, which I call the main thread, handles items 1 and 2 in the list above; the other two each process the packets that correspond to one node id.

    So you need two different types of threads: the main thread and the processing threads.