Tags: data-ingestion, ibm-streams

Working with large files with a particular extension using the DirectoryScan operator


I have a file of more than 1 GB arriving in my directory from MQ. The transfer takes some time to complete, but the file appears in the directory before it is complete, so I am afraid my DirectoryScan operator will pick up an incomplete file. I also cannot add an initial delay, because I don't know how long the transfer will take.

PS: I read somewhere that some file transfer protocols handle this by giving the file a different extension until it is complete. For example, if my DirectoryScan operator waits for files with the .txt extension, the transfer protocol would write the file with a .abc extension until the transfer is complete.

How should I handle this?


Solution

  • If you go the regular-expression route, here is an example of invoking the operator so that it reads only files with a certain extension:

    // DirectoryScan operator with an absolute directory argument and a file name pattern
    stream<rstring name> Dir2 = DirectoryScan()
    {
        param
            directory : "/tmp/work";
            pattern   : "\\.txt$";   // only names ending in .txt
    }
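To see why the anchored pattern helps when the sender uses a temporary extension, here is a small Python sketch (not SPL) that applies the same regular expression to some file names. It assumes the operator's `pattern` is applied as a regular-expression search on the file name, as the `"\\.txt$"` example suggests; in Python that SPL string is the raw regex `r"\.txt$"`. The helper `ready_files` and the file names are hypothetical, chosen only for illustration.

```python
import re

# Assumption: DirectoryScan's `pattern` behaves like a regex *search* on the
# file name. re.search is used here only to illustrate the filtering idea.
ANCHORED = re.compile(r"\.txt$")    # same regex as the SPL string "\\.txt$"
UNANCHORED = re.compile(r".*\.txt")  # same regex as the SPL string ".*\\.txt"


def ready_files(names):
    """Keep only names that end in .txt (hypothetical helper)."""
    return [n for n in names if ANCHORED.search(n)]


# Hypothetical directory listing: one finished file, several in-progress ones.
listing = ["orders.txt", "orders2.txt.abc", "orders3.abc", "notes.txt.bak"]

print(ready_files(listing))                           # ['orders.txt']
print(bool(UNANCHORED.search("orders2.txt.abc")))     # True: unanchored form also matches a temp name
```

The anchored form only accepts the final name, so a sender that writes `name.txt.abc` (or `name.abc`) and renames to `name.txt` at the end is never picked up mid-transfer.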
    

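If that doesn't work, can you set up MQ to write the file to a different directory and then move it into your target directory once it is complete? If both directories are on the same file system, the move is an atomic rename, so DirectoryScan only ever sees complete files.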
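The staging-directory idea can be sketched on the producer side as follows. This is plain Python, not MQ configuration: the function `deliver` and the directory names are hypothetical, and the sketch assumes the staging and target directories are on the same file system, which is what makes `os.replace` an atomic rename.

```python
import os
import tempfile


def deliver(data_chunks, staging_dir, target_dir, final_name):
    """Write a file in staging_dir, then publish it into target_dir in one step.

    Assumes staging_dir and target_dir are on the same file system, so
    os.replace is an atomic rename: a scanner watching target_dir sees
    either no file or the complete file, never a partial one.
    """
    staging_path = os.path.join(staging_dir, final_name)
    with open(staging_path, "wb") as f:
        for chunk in data_chunks:
            f.write(chunk)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before publishing
    final_path = os.path.join(target_dir, final_name)
    os.replace(staging_path, final_path)  # the single publishing step
    return final_path


# Usage with throwaway directories (hypothetical names).
root = tempfile.mkdtemp()
staging = os.path.join(root, "staging")
target = os.path.join(root, "work")
os.mkdir(staging)
os.mkdir(target)

path = deliver([b"a,b\n", b"c,d\n"], staging, target, "orders.txt")
print(os.listdir(target))   # ['orders.txt']
```

If the two directories were on different file systems, the move would become a copy followed by a delete, and the atomicity guarantee would no longer hold.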

If you know the size of the file in advance, another option is to use the DirectoryScan Size() output function and ignore the file until it reaches that size. This snippet uses a Custom operator that waits until the file is at least 2000 bytes:

    graph
        stream<rstring filename, uint64 size> DirScanOutput = DirectoryScan()
        {
            param
                directory : "test1";
                sleepTime : 10.0;          // wait 10 s between scans
                pattern   : ".*\\.txt$";   // anchored, so names such as "x.txt.abc" are skipped
            output
                DirScanOutput : size = Size();   // file size in bytes at scan time
        }

        // Forward a file name only once the file has reached the expected size.
        stream<rstring file> FileNameStream = Custom(DirScanOutput as In)
        {
            logic
                onTuple In: {
                    if (size < 2000ul) {
                        printStringLn("Required size not met yet.");
                    } else {
                        printStringLn("Size of file reached.");
                        submit({file = filename}, FileNameStream);
                    }
                }
        }

        // cityData is the tuple type of one CSV record, defined elsewhere in the application.
        stream<cityData> CityDataRecord = FileSource(FileNameStream)
        {
            param
                format : csv;
        }
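If the final size is not known in advance, a common alternative heuristic (a different technique from the fixed-threshold check above) is to treat a file as complete once its size stops changing between two consecutive scans. The sketch below is plain Python, not SPL; `stable_files` and the file names are hypothetical. Porting the idea to a Custom operator would depend on DirectoryScan reporting a file again after its size changes, which is an assumption to check against the operator documentation.

```python
import os
import tempfile


def stable_files(directory, last_sizes, reported):
    """Return files whose size is unchanged since the previous scan.

    A heuristic, not a guarantee: a transfer that stalls for longer than one
    scan interval would be mistaken for a complete file.
    last_sizes maps name -> size from the previous scan (updated in place);
    reported holds names already returned, so each file is emitted once.
    """
    ready = []
    current = {}
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if name in reported or not os.path.isfile(path):
            continue
        size = os.path.getsize(path)
        current[name] = size
        # Skip empty files: a just-created file can sit at 0 bytes for a while.
        if size > 0 and last_sizes.get(name) == size:
            ready.append(name)
            reported.add(name)
    last_sizes.clear()
    last_sizes.update(current)
    return ready


# Simulate a growing file across four scans (no real sleeping needed).
d = tempfile.mkdtemp()
sizes, done = {}, set()
big = os.path.join(d, "big.txt")
with open(big, "wb") as f:
    f.write(b"x" * 100)
r1 = stable_files(d, sizes, done)   # first sighting: []
with open(big, "ab") as f:
    f.write(b"y" * 100)
r2 = stable_files(d, sizes, done)   # size grew: []
r3 = stable_files(d, sizes, done)   # unchanged since last scan: ['big.txt']
r4 = stable_files(d, sizes, done)   # already reported: []
print(r1, r2, r3, r4)               # [] [] ['big.txt'] []
```

The scan interval (the `sleepTime` in the SPL snippet) then becomes the tolerance for pauses in the transfer, so it should be longer than any expected stall.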
    

I hope one of these suggestions works for you.