Let's say I have a relatively big file (about 100MB) that I want to multicast to all members of a cluster. How can I send the file in chunks using jgroups(with preferably a code demonstration)? The file should be read in chunks at the receivers side. Also how can I ensure the sequence order of the chunks is maintained at the receivers side.
EDIT 1 Here is what I have tried so far. I just send the file as a whole and write its contents at the receivers' side to temporary file
public class SimpleFileTransfer extends ReceiverAdapter {
JChannel channel;
private void start() throws Exception{
channel = new JChannel();
channel.setReceiver(this);
channel.connect("FileCluster");
// channel.getState(null, 10000);
File file = new File("/res/test.txt"); //the file to be sent
eventLoop(file);
channel.close();
}
private void eventLoop(File file) throws IOException{
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
try {
Message msg = new Message(null, null, in);
channel.send(msg);
}
catch (Exception e){
e.printStackTrace();
}
}
public void receive(Message msg)
{
try {
File temp = new File("/res/temp.txt");
FileWriter writer = new FileWriter(temp);
InputStream in = new ByteArrayInputStream(msg.getBuffer());
int next = in.read();
while (next != -1){
writer.write(next);
next = in.read();
}
}
catch (IOException ie)
{
ie.printStackTrace();
}
}
}
Below's the better version, which chunks up large files into chunks of 8K. A file X is written to /tmp/X. Note that the /home/bela/fast.xml config has to be changed, of course:
public class SimpleFileTransfer extends ReceiverAdapter {
protected String filename;
protected JChannel channel;
protected Map<String,OutputStream> files=new ConcurrentHashMap<>();
protected static final short ID=3500;
private void start(String name, String filename) throws Exception {
ClassConfigurator.add((short)3500, FileHeader.class);
this.filename=filename;
channel=new JChannel("/home/bela/fast.xml").name(name);
channel.setReceiver(this);
channel.connect("FileCluster");
eventLoop();
}
private void eventLoop() throws Exception {
while(true) {
Util.keyPress(String.format("<enter to send %s>\n", filename));
sendFile();
}
}
protected void sendFile() throws Exception {
FileInputStream in=new FileInputStream(filename);
try {
for(;;) {
byte[] buf=new byte[8096];
int bytes=in.read(buf);
if(bytes == -1)
break;
sendMessage(buf, 0, bytes, false);
}
}
catch(Exception e) {
e.printStackTrace();
}
finally {
sendMessage(null, 0, 0, true);
}
}
public void receive(Message msg) {
byte[] buf=msg.getRawBuffer();
FileHeader hdr=(FileHeader)msg.getHeader(ID);
if(hdr == null)
return;
OutputStream out=files.get(hdr.filename);
try {
if(out == null) {
File tmp=new File(hdr.filename);
String fname=tmp.getName();
fname="/tmp/" + fname;
out=new FileOutputStream(fname);
files.put(hdr.filename, out);
}
if(hdr.eof) {
Util.close(files.remove(hdr.filename));
}
else {
out.write(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
}
}
catch(Throwable t) {
System.err.println(t);
}
}
protected void sendMessage(byte[] buf, int offset, int length, boolean eof) throws Exception {
Message msg=new Message(null, buf, offset, length).putHeader(ID, new FileHeader(filename, eof));
// set this if the sender doesn't want to receive the file
// msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
channel.send(msg);
}
protected static class FileHeader extends Header {
protected String filename;
protected boolean eof;
public FileHeader() {} // for de-serialization
public FileHeader(String filename, boolean eof) {
this.filename=filename;
this.eof=eof;
}
public int size() {
return Util.size(filename) + Global.BYTE_SIZE;
}
public void writeTo(DataOutput out) throws Exception {
Util.writeObject(filename, out);
out.writeBoolean(eof);
}
public void readFrom(DataInput in) throws Exception {
filename=(String)Util.readObject(in);
eof=in.readBoolean();
}
}
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.out.printf("%s <name> <filename>\n", SimpleFileTransfer.class.getSimpleName());
return;
}
new SimpleFileTransfer().start(args[0], args[1]); // name and file
}
}