Uploading a file to a primary location while simultaneously copying the same file to multiple secondary locations


I need to achieve the following:

1) File upload to the primary location:

I want to read from a local file and write it to the primary location (a remote file server).

2) File upload to multiple secondary locations:

While the write to the primary location is still running, I want to read chunks of bytes from the primary-location file in parallel and write them to multiple secondary locations.

I have tried the program below for this approach:

BufferedInputStream bin = null;
        ReadableByteChannel channel = null;
        int bufferSize = 1048576;
        int readBufferSize = 1024*4;
        java.nio.ByteBuffer byteBuffer = java.nio.ByteBuffer.allocate(readBufferSize);
        InputStream is = new FileInputStream(new File("D:\\Harisingh\\300MB.txt"));

        bin = new BufferedInputStream(is,bufferSize);
        channel = Channels.newChannel(bin);
        int retryCnt = 0;
        ByteArrayOutputStream baOS = new ByteArrayOutputStream(bufferSize);
        int totalBytes=0;
        int itrCount=0;
        int maxIterateCnt = 1;
        int len;
        //primary location writing
        SmbFile smbFile = new SmbFile("smb://user:[email protected]/data/Harisingh/collab_4_1_4/primary.txt");
        BufferedOutputStream bFout = new BufferedOutputStream(new SmbFileOutputStream(smbFile));

        SmbFileInputStream fis = new SmbFileInputStream("smb://user:[email protected]/data/Harisingh/collab_4_1_4/primary.txt");
        BufferedInputStream binPrimary = new BufferedInputStream(fis);

        SmbFileOutputStream secLocation1= new SmbFileOutputStream(new SmbFile("smb://user:[email protected]/data/Harisingh/collab_4_1_4/Secondary1.txt"));
        SmbFileOutputStream secLocation2 = new SmbFileOutputStream(new SmbFile("smb://user:[email protected]/data/Harisingh/collab_4_1_4/Secondary2.txt"));
        SmbFileOutputStream secLocation3 = new SmbFileOutputStream(new SmbFile("smb://user:[email protected]/data/Harisingh/Secondary/Secondary3.txt"));
        try {
            if(bufferSize > readBufferSize){
                maxIterateCnt = bufferSize/readBufferSize;
            }
            while((len=channel.read(byteBuffer))>=0) 
            {
                itrCount++;
                totalBytes+=len;
                baOS.write(byteBuffer.array(),0,len);
                if(itrCount>=maxIterateCnt)
                {
                    //primary location writing
                    try{
                        bFout.write(baOS.toByteArray(),0,totalBytes);
                    }catch(Exception se)
                    {
                    }

                    // secondary location writing
                    new Thread(){
                           public void run(){
                              System.out.println("Thread Running");
                              try {
                                int count;
                                byte[] readByteArray = new byte[1024*4];
                                while ((count = binPrimary.read(readByteArray)) != -1)
                                    {
                                        secLocation1.write(readByteArray, 0, count);
                                        secLocation2.write(readByteArray, 0, count);
                                        secLocation3.write(readByteArray, 0, count);
                                        readByteArray = new byte[1024*4];
                                        count= 0;
                                    }
                            } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                          }
                      }.start();
                    totalBytes=0;
                    baOS.reset();
                    itrCount=0;
                }
                byteBuffer.clear();
            }

            //primary location writing
            try{
                bFout.write(baOS.toByteArray(),0,totalBytes);
            }catch(Exception se)
            {
            }
            bFout.flush();
            bFout.close();
            int count;
            // secondary location writing
            new Thread(){
                public void run(){
                  System.out.println("Thread Running");
                  try {
                    int count;
                    byte[] readByteArray = new byte[1024*4];
                    while ((count = binPrimary.read(readByteArray)) != -1)
                    {
                            secLocation1.write(readByteArray, 0, count);
                            secLocation2.write(readByteArray, 0, count);
                            secLocation3.write(readByteArray, 0, count);
                            readByteArray = new byte[1024*4];
                            count= 0;
                    }
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                }
              }.start();

With the above program, the main thread writes the file to the primary location and the secondary locations are written in separate threads, but some bytes are missing on some secondary locations because of the multithreading.

FYI

This question is about I/O streams in general and is not specific to JCIFS, so the same program can be reproduced with plain java.io streams instead of the SMB streams. Can you please help me sort this out?


Solution

  • Here is an example that I do not recommend using as is; it is intended as a proof of concept. In this example the primary write is done first, to get the best performance for that phase. The secondaries are then each written in their own thread, in parallel.

    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.net.MalformedURLException;
    import java.net.UnknownHostException;
    
    import jcifs.smb.NtlmPasswordAuthentication;
    import jcifs.smb.SmbException;
    import jcifs.smb.SmbFile;
    import jcifs.smb.SmbFileInputStream;
    import jcifs.smb.SmbFileOutputStream;
    
    public class testSmb {
    
        static boolean append = true;
        static int threadCount = 0;
    
        static int bufferSize = 2048;
    
        static NtlmPasswordAuthentication auth;
    
        static File localFile;
    
        static SmbFile primarySmbFile;
        static BufferedInputStream input;
        static SmbFileOutputStream output;
    
        static SmbFile secondary1SmbFile;
        static BufferedInputStream sec1Input;
        static SmbFileOutputStream sec1Output;
    
        static SmbFile secondary2SmbFile;
        static BufferedInputStream sec2Input;
        static SmbFileOutputStream sec2Output;
    
        static SmbFile secondary3SmbFile;
        static BufferedInputStream sec3Input;
        static SmbFileOutputStream sec3Output;
    
        public static Object lock = new Object();
    
        public static void main(String... args) throws IOException {
            System.out.println("Main thread Started");
            init();
            write(input, output);
            writeInThread(sec1Input, sec1Output);
            writeInThread(sec2Input, sec2Output);
            writeInThread(sec3Input, sec3Output);
    
            System.out.println("Main thread Finished");
        }
    
        public static void init() throws MalformedURLException,
                FileNotFoundException, SmbException, UnknownHostException {
    
            localFile = new File("c:\\temp\\myFile.txt");
            // Use a larger buffer for files over 20 MB
            if (localFile.length() > 20971520L) {
                bufferSize = 131072;
            }
    
            String server = "myServer";
            String username = "myUser";
            String password = "myPass";
            String path = "myPath";
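            // Note: the first constructor argument is the NTLM domain;
            // this example passes the server name in that position.
            auth = new NtlmPasswordAuthentication(server, username, password);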
    
            input = new BufferedInputStream(new FileInputStream(localFile));
            primarySmbFile = new SmbFile("smb://" + server + "/" + path
                    + "/primary.txt", auth, SmbFile.FILE_SHARE_READ
                    | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
            output = new SmbFileOutputStream(primarySmbFile, append);
            if (!primarySmbFile.exists()) {
                primarySmbFile.createNewFile();
            }
    
            sec1Input = new BufferedInputStream(new SmbFileInputStream(new SmbFile(
                    primarySmbFile, primarySmbFile.getName())));
            secondary1SmbFile = new SmbFile("smb://" + server + "/" + path
                    + "/secondary1.txt", auth, SmbFile.FILE_SHARE_READ
                    | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
            sec1Output = new SmbFileOutputStream(secondary1SmbFile, append);
            if (!secondary1SmbFile.exists()) {
                secondary1SmbFile.createNewFile();
            }
    
            sec2Input = new BufferedInputStream(new SmbFileInputStream(new SmbFile(
                    primarySmbFile, primarySmbFile.getName())));
            secondary2SmbFile = new SmbFile("smb://" + server + "/" + path
                    + "/secondary2.txt", auth, SmbFile.FILE_SHARE_READ
                    | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
            sec2Output = new SmbFileOutputStream(secondary2SmbFile, append);
            if (!secondary2SmbFile.exists()) {
                secondary2SmbFile.createNewFile();
            }
    
            sec3Input = new BufferedInputStream(new SmbFileInputStream(new SmbFile(
                    primarySmbFile, primarySmbFile.getName())));
            secondary3SmbFile = new SmbFile("smb://" + server + "/" + path
                    + "/secondary3.txt", auth, SmbFile.FILE_SHARE_READ
                    | SmbFile.FILE_SHARE_WRITE | SmbFile.FILE_SHARE_DELETE);
            sec3Output = new SmbFileOutputStream(secondary3SmbFile, append);
            if (!secondary3SmbFile.exists()) {
                secondary3SmbFile.createNewFile();
            }
    
        }
    
        public static void write(BufferedInputStream bufferedInputStream,
                SmbFileOutputStream smbFileOutputStream) throws IOException {
    
            byte[] buffer = new byte[bufferSize];
            int len = 0;
    
            try {
    
                while ((len = bufferedInputStream.read(buffer)) > 0) {
                    synchronized (lock) {
                        // Log the number of bytes actually read, not the buffer capacity
                        System.out.println("'" + Thread.currentThread().getName()
                                + "' writing " + len + " bytes");
                        smbFileOutputStream.write(buffer, 0, len);
                        smbFileOutputStream.flush();
                    }
                }
    
            } catch (IOException e) {
                throw e;
            } finally {
                try {
                    bufferedInputStream.close();
                } catch (Exception e) {
                }
    
                try {
                    smbFileOutputStream.flush();
                    smbFileOutputStream.close();
                } catch (Exception e) {
                }
            }
    
        }
    
        public static void writeInThread(
                final BufferedInputStream bufferedInputStream,
                final SmbFileOutputStream smbFileOutputStream) {
            threadCount++;
    
            new Thread("Secondary thread " + threadCount) {
                public void run() {
                    System.out.println(Thread.currentThread().getName()
                            + ": started");
                    try {
                        write(bufferedInputStream, smbFileOutputStream);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()
                            + ": finished");
                }
            }.start();
    
        }
    }
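
Since the question says the problem is not specific to JCIFS, the same two-phase idea can be sketched with plain java.nio.file streams. This is only a minimal sketch under stated assumptions, not the code from the answer above: the class name ParallelCopySketch, the methods copy and replicate, the 8192-byte buffer, and the temporary file names in main are all hypothetical. Each secondary thread opens its own input stream on the primary file and uses its own buffer, and the caller joins every thread before returning.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class ParallelCopySketch {

    // Hypothetical chunk size; tune it for the target file server.
    private static final int BUFFER_SIZE = 8192;

    /** Copies all bytes from in to out using a buffer local to this call. */
    static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        int len;
        while ((len = in.read(buffer)) != -1) {
            out.write(buffer, 0, len);
        }
        out.flush();
    }

    /** Writes source to primary, then copies primary to every secondary in parallel. */
    static void replicate(Path source, Path primary, List<Path> secondaries)
            throws IOException, InterruptedException {
        // Phase 1: the primary copy is written completely before any secondary starts.
        try (InputStream in = Files.newInputStream(source);
             OutputStream out = Files.newOutputStream(primary)) {
            copy(in, out);
        }

        // Phase 2: one thread per secondary, each with its own streams.
        List<Thread> workers = new ArrayList<>();
        List<IOException> failures = Collections.synchronizedList(new ArrayList<>());
        for (Path secondary : secondaries) {
            Thread worker = new Thread(() -> {
                try (InputStream in = Files.newInputStream(primary);
                     OutputStream out = Files.newOutputStream(secondary)) {
                    copy(in, out);
                } catch (IOException e) {
                    failures.add(e); // collected and reported after join
                }
            }, "secondary-" + secondary.getFileName());
            workers.add(worker);
            worker.start();
        }

        // Wait for every secondary copy to finish before returning.
        for (Thread worker : workers) {
            worker.join();
        }
        if (!failures.isEmpty()) {
            throw new IOException("A secondary copy failed", failures.get(0));
        }
    }

    public static void main(String[] args) throws Exception {
        // Illustrative usage with temporary local files instead of SMB paths.
        Path dir = Files.createTempDirectory("replicate");
        Path source = dir.resolve("source.bin");
        byte[] data = new byte[100_000];
        new Random(42).nextBytes(data);
        Files.write(source, data);

        List<Path> secondaries = Arrays.asList(
                dir.resolve("secondary1.bin"),
                dir.resolve("secondary2.bin"),
                dir.resolve("secondary3.bin"));
        replicate(source, dir.resolve("primary.bin"), secondaries);

        for (Path p : secondaries) {
            System.out.println(p.getFileName() + " matches source: "
                    + Arrays.equals(data, Files.readAllBytes(p)));
        }
    }
}
```

Because every worker owns its input stream, output stream, and buffer, no lock is needed. In the question's program, by contrast, several threads read from the single shared binPrimary stream and write to the same three output streams, so each thread consumes a different part of the file and the writes can interleave, which is a likely cause of the missing bytes.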