Search code examples
java multicast jgroups

How to multicast large files in jgroups


Let's say I have a relatively big file (about 100 MB) that I want to multicast to all members of a cluster. How can I send the file in chunks using JGroups (preferably with a code demonstration)? The file should be read in chunks on the receiver's side. Also, how can I ensure that the order of the chunks is preserved on the receiver's side?

EDIT 1: Here is what I have tried so far. I just send the file as a whole and, on the receiver's side, write its contents to a temporary file:

    /**
     * Minimal JGroups example that sends one file as a single message to the
     * "FileCluster" cluster and, on reception, writes the payload to a
     * temporary file.
     *
     * The whole file is held in memory on both sides, so this approach is only
     * reasonable for small files.
     */
    public class SimpleFileTransfer extends ReceiverAdapter {

        JChannel channel;

        /**
         * Creates the channel, joins the cluster, sends the file once and closes
         * the channel again.
         *
         * @throws Exception if the channel cannot be created or connected, or if
         *                   the file cannot be read
         */
        private void start() throws Exception {
            channel = new JChannel();
            channel.setReceiver(this);
            channel.connect("FileCluster");
    //        channel.getState(null, 10000);
            File file = new File("/res/test.txt"); //the file to be sent
            try {
                eventLoop(file);
            }
            finally {
                // Close the channel even when reading the file fails.
                channel.close();
            }
        }

        /**
         * Reads the complete file and sends its bytes as one message with a
         * {@code null} destination.
         *
         * @param file the file whose contents are sent
         * @throws IOException if the file cannot be read
         */
        private void eventLoop(File file) throws IOException {
            // The payload must be the file's bytes. A reader or stream object is
            // only a handle that is meaningful inside this JVM; it does not carry
            // the file contents to the receivers.
            byte[] contents = java.nio.file.Files.readAllBytes(file.toPath());
            try {
                Message msg = new Message(null, null, contents);
                channel.send(msg);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Writes the payload of a received message to /res/temp.txt, replacing
         * any previous contents of that file.
         *
         * @param msg the received message; messages without a payload are ignored
         */
        public void receive(Message msg) {
            byte[] payload = msg.getBuffer();
            if (payload == null) {
                return; // nothing to write
            }
            try {
                File temp = new File("/res/temp.txt");
                // Files.write opens, writes and closes the file, so nothing is
                // left buffered or leaked. Writing bytes (rather than going
                // through a character writer) keeps binary content intact.
                java.nio.file.Files.write(temp.toPath(), payload);
            }
            catch (IOException ie) {
                ie.printStackTrace();
            }
        }

    }

Solution

  • Below is a better version, which splits large files into chunks of roughly 8 KB (8096 bytes each). A file X is written to /tmp/X on each receiver. Note that the /home/bela/fast.xml config path has to be changed to point to your own configuration, of course:

    /**
     * Sends a file to all members of the "FileCluster" cluster in fixed-size
     * chunks and writes every received file to /tmp/&lt;simple file name&gt;.
     *
     * Each chunk travels in its own message tagged with a {@link FileHeader}
     * that carries the sender-side file name and an end-of-file flag. A final
     * message with the flag set tells receivers to close the output file.
     */
    public class SimpleFileTransfer extends ReceiverAdapter {
        protected String   filename;
        protected JChannel channel;
        /** Open output streams, keyed by the file name carried in the header. */
        protected Map<String,OutputStream> files=new ConcurrentHashMap<>();
        /** ID under which {@link FileHeader} is registered and attached to messages. */
        protected static final short ID=3500;
        /** Number of bytes read from the file per message. */
        protected static final int CHUNK_SIZE=8096;

        /**
         * Registers the header class, creates and connects the channel, then
         * enters the interactive send loop.
         *
         * @param name     logical name given to the channel
         * @param filename path of the file to send
         * @throws Exception if the channel cannot be created or connected, or if
         *                   sending fails
         */
        private void start(String name, String filename) throws Exception {
            // Use the ID constant so registration and putHeader()/getHeader() agree.
            ClassConfigurator.add(ID, FileHeader.class);
            this.filename=filename;
            channel=new JChannel("/home/bela/fast.xml").name(name);
            channel.setReceiver(this);
            channel.connect("FileCluster");
            eventLoop();
        }

        /**
         * Waits for a key press and sends the file, forever.
         *
         * @throws Exception if the file cannot be opened or a message cannot be sent
         */
        private void eventLoop() throws Exception {
            while(true) {
                Util.keyPress(String.format("<enter to send %s>\n", filename));
                sendFile();
            }
        }

        /**
         * Reads the file in chunks of {@link #CHUNK_SIZE} bytes and sends each
         * chunk as one message, followed by an end-of-file message.
         *
         * The end-of-file message is sent even if reading or sending a chunk
         * fails, so receivers always close their output file. If the file cannot
         * be opened, nothing is sent and the exception propagates.
         *
         * @throws Exception if the file cannot be opened or the end-of-file
         *                   message cannot be sent
         */
        protected void sendFile() throws Exception {
            // try-with-resources closes the input file; this method runs in an
            // endless loop, so an unclosed stream would leak one handle per send.
            try(FileInputStream in=new FileInputStream(filename)) {
                try {
                    for(;;) {
                        // NOTE(review): a new array is allocated per chunk, presumably
                        // because the message keeps a reference to it after send()
                        // returns - confirm before reusing a single buffer.
                        byte[] buf=new byte[CHUNK_SIZE];
                        int bytes=in.read(buf);
                        if(bytes == -1)
                            break;
                        sendMessage(buf, 0, bytes, false);
                    }
                }
                catch(Exception e) {
                    e.printStackTrace();
                }
                finally {
                    sendMessage(null, 0, 0, true);
                }
            }
        }


        /**
         * Appends the payload of a chunk message to the matching output file, or
         * closes that file when the end-of-file flag is set. Messages without a
         * {@link FileHeader} are ignored.
         *
         * The output file is /tmp/ plus the last path element of the header's
         * file name; it is created on the first message seen for that name.
         *
         * NOTE(review): streams are keyed by file name only, so two members
         * sending a file with the same name at the same time would write to the
         * same stream - confirm whether that can happen in your deployment.
         *
         * @param msg the received message
         */
        public void receive(Message msg) {
            FileHeader hdr=(FileHeader)msg.getHeader(ID);
            if(hdr == null)
                return;
            OutputStream out=files.get(hdr.filename);
            try {
                if(out == null) {
                    // Only the simple name is used, so the sender's directory
                    // structure is not reproduced under /tmp.
                    File tmp=new File(hdr.filename);
                    String fname=tmp.getName();
                    fname="/tmp/" + fname;
                    out=new FileOutputStream(fname);
                    files.put(hdr.filename, out);
                }
                if(hdr.eof) {
                    Util.close(files.remove(hdr.filename));
                }
                else {
                    out.write(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
                }
            }
            catch(Throwable t) {
                System.err.println(t);
            }
        }


        /**
         * Sends one message with a {@code null} destination, carrying the given
         * byte range and a {@link FileHeader} for the current file.
         *
         * @param buf    array holding the chunk, or {@code null} for the end-of-file message
         * @param offset index of the first payload byte in {@code buf}
         * @param length number of payload bytes
         * @param eof    {@code true} for the final message of a file
         * @throws Exception if the channel fails to send the message
         */
        protected void sendMessage(byte[] buf, int offset, int length, boolean eof) throws Exception {
            Message msg=new Message(null, buf, offset, length).putHeader(ID, new FileHeader(filename, eof));
            // set this if the sender doesn't want to receive the file
            // msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
            channel.send(msg);
        }

        /**
         * Header attached to every chunk: the sender-side file name and a flag
         * marking the last message of the file.
         */
        protected static class FileHeader extends Header {
            protected String  filename;
            protected boolean eof;

            public FileHeader() {} // for de-serialization

            public FileHeader(String filename, boolean eof) {
                this.filename=filename;
                this.eof=eof;
            }

            // NOTE(review): this should equal the number of bytes writeTo()
            // produces - confirm that Util.size(String) matches Util.writeObject().
            public int size() {
                return Util.size(filename) + Global.BYTE_SIZE;
            }

            /** Writes the file name followed by the end-of-file flag. */
            public void writeTo(DataOutput out) throws Exception {
                Util.writeObject(filename, out);
                out.writeBoolean(eof);
            }

            /** Reads the fields in the order writeTo() wrote them. */
            public void readFrom(DataInput in) throws Exception {
                filename=(String)Util.readObject(in);
                eof=in.readBoolean();
            }
        }

        /**
         * Entry point. Expects two arguments: the channel name and the file to send.
         */
        public static void main(String[] args) throws Exception {
            if(args.length != 2) {
                System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName());
                return;
            }
            new SimpleFileTransfer().start(args[0], args[1]); // name and file
        }


    }