Search code examples
Tags: apache-spark, .net-spark

Load fixed position file with multiple sections using .net-spark


I'm trying to load a fixed-position file with multiple sections in Spark using .NET for Apache Spark. Here is an example of the file:

01Nikola Tesla                  [email protected]                                                       +5521981181569
02Creations                                       
03Alternating current                              
03Tesla coil                                 
01Thomas Edison                 [email protected]                                                      +5521981181569
02Creations                                        
03Lamp                                         
03Phonograph                                      
03General Eletric                                 
03Cinema 

So basically we have a header with the transaction owner, a sub-header stating that the transactions follow, and finally the transactions section. The transaction lines do not contain any reference to the owner, which is what makes this tricky.

As suggested by @EdElliott, here is how the data should look in the RDD (showing only the first line, but the goal is to read the whole file):

inventor email phone creations
Nikola Tesla [email protected] +5511999999999 Alternating current

I guess this is not a very common file format nowadays, but it is still pretty common in big Brazilian banks.

I found this example for Java, but it does not handle the sections. I believe I could achieve this using a UDF, but again, I don't know where to start. I'd appreciate any insight here.

Thanks


Solution

  • I got it working. I'm not proud of the solution, mainly because I'm using ToLocalIterator() and because of the way I'm passing arguments to the normalizedTransaction UDF. Thanks @EdElliott, your blog helped me a lot.

    Anyway, here it goes:

    /// <summary>
        /// Spark UDF predicate: true when a raw line is a header record (starts with "01").
        /// Null lines and lines shorter than two characters (e.g. a blank trailing line)
        /// yield false instead of throwing from Substring.
        /// </summary>
        static Func<Column, Column> OnlyHeaders = Udf<string, bool>(
            line => line != null && line.StartsWith("01", StringComparison.Ordinal)
        );

        /// <summary>
        /// Spark UDF predicate: true when a raw line is a transaction record (starts with "03").
        /// Null lines and lines shorter than two characters yield false instead of throwing.
        /// </summary>
        static Func<Column, Column> OnlyTransactions = Udf<string, bool>(
            line => line != null && line.StartsWith("03", StringComparison.Ordinal)
        );
    
        /// <summary>
        /// Spark UDF that turns one raw header line into a string array of its
        /// fixed-width fields, delegating the slicing to GetHeader.
        /// </summary>
        static Func<Column, Column> breakHeader = Udf<string, string[]>(rawLine => GetHeader(rawLine));

        /// <summary>
        /// Spark UDF that turns one raw transaction line into a string array of its
        /// fixed-width fields, delegating the slicing to GetTransaction.
        /// </summary>
        static Func<Column, Column> breakTransaction = Udf<string, string[]>(rawLine => GetTransaction(rawLine));
    
        /// <summary>
        /// Entry point: reads resources/input.txt line by line, extracts header ("01") and
        /// transaction ("03") rows, assigns each transaction to the nearest preceding header
        /// by row id, and shows the joined result.
        /// </summary>
        static void Main(string[] args)
        {
            // Create a Spark session
            var spark = SparkSession
                .Builder()
                .AppName("FixedLenghtWithSectionsApp")
                .GetOrCreate();

            // One row per line, tagged with an increasing row id. MonotonicallyIncreasingId
            // returns 64-bit ids that are not consecutive and exceed int range once the input
            // spans more than one partition, so every id below is handled as long.
            var rawDf = spark.Read().Schema("rawLine STRING").Text("resources/input.txt");
            rawDf = rawDf.WithColumn("rowNumber", Functions.MonotonicallyIncreasingId());
            rawDf.CreateOrReplaceTempView("rawdata");

            var headersDf = GetHeadersDf(rawDf);
            headersDf.CreateOrReplaceTempView("headers");
            headersDf.Show();

            var transactionsDf = GetTransactionsDf(rawDf);
            transactionsDf.CreateOrReplaceTempView("transactions");
            transactionsDf.Show();

            // Bring only the header row ids to the driver and hand them to the UDF
            // as a comma-separated literal column.
            var headerIds = headersDf
                .Select("rowNumber")
                .Collect()
                .Select(r => r.Get("rowNumber").ToString());
            var column = Functions.Lit(string.Join(",", headerIds)).Alias("ids");

            // Maps a transaction row id to the largest header id strictly below it,
            // or -1 when no header precedes it.
            Func<Column, Column, Column> normalizedTransaction = Udf<string, string, long>((line, hLines) =>
            {
                long rowId = long.Parse(line);
                long owner = -1;
                foreach (var token in hLines.Split(",", StringSplitOptions.RemoveEmptyEntries))
                {
                    long headerId = long.Parse(token);
                    if (headerId < rowId && headerId > owner)
                    {
                        owner = headerId;
                    }
                }
                return owner;
            });

            var inventionsDf = transactionsDf.Alias("one")
                .Select(
                    Functions.Col("one.rowNumber"),
                    Functions.Col("invention"),
                    normalizedTransaction(Functions.Col("one.rowNumber").Cast("string"), column).Alias("id"));

            // Attach the owning header's fields; transactions whose id is -1 find no match
            // and are dropped by the inner join.
            inventionsDf = inventionsDf.Alias("one")
                .Join(
                    headersDf.Alias("two"),
                    Functions.Col("one.id") == Functions.Col("two.rowNumber"),
                    "inner");

            inventionsDf.Show();

            spark.Stop();
        }
    
    
    
        /// <summary>
        /// Builds a (rowNumber, invention) DataFrame from the transaction ("03") lines of rawDf.
        /// </summary>
        /// <param name="rawDf">Raw input with "rawLine" and "rowNumber" columns.</param>
        /// <returns>One row per transaction line with its row id and the invention text.</returns>
        private static DataFrame GetTransactionsDf(DataFrame rawDf)
        {
            // Filter before projecting so the fixed-width parsing UDF is applied to
            // transaction lines rather than to every raw line ("01"/"02" included).
            var transactionsRawsDf = rawDf
                .Where(OnlyTransactions(rawDf["rawLine"]))
                .Select(rawDf["rowNumber"], breakTransaction(rawDf["rawLine"]).Alias("value"));

            return transactionsRawsDf.Select(
                transactionsRawsDf["rowNumber"],
                transactionsRawsDf.Col("value").GetItem(0))
                .ToDF("rowNumber", "invention");
        }
    
        /// <summary>
        /// Builds a (rowNumber, inventor, email, phone) DataFrame from the header ("01") lines of rawDf.
        /// </summary>
        /// <param name="rawDf">Raw input with "rawLine" and "rowNumber" columns.</param>
        /// <returns>One row per header line with its row id and parsed fields.</returns>
        private static DataFrame GetHeadersDf(DataFrame rawDf)
        {
            // Filter before projecting so the fixed-width parsing UDF is applied to
            // header lines rather than to every raw line ("02"/"03" included).
            var headerRawsDf = rawDf
                .Where(OnlyHeaders(rawDf["rawLine"]))
                .Select(rawDf["rowNumber"], breakHeader(rawDf["rawLine"]).Alias("value"));

            return headerRawsDf.Select(
                headerRawsDf["rowNumber"],
                headerRawsDf.Col("value").GetItem(0),
                headerRawsDf.Col("value").GetItem(1),
                headerRawsDf.Col("value").GetItem(2))
                .ToDF("rowNumber", "inventor", "email", "phone");
        }
    
        /// <summary>
        /// Splits a fixed-width header line into its fields.
        /// Layout (0-based offsets): inventor [2, 32), email [32, 102), phone [102, 116).
        /// </summary>
        /// <param name="line">A raw header line.</param>
        /// <returns>
        /// [inventor, email, phone], untrimmed. A field is shortened or empty when the
        /// line ends before that field does (e.g. trailing padding was stripped), instead
        /// of throwing ArgumentOutOfRangeException.
        /// </returns>
        private static string[] GetHeader(string line)
        {
            return new[]
            {
                Slice(line, 2, 30),
                Slice(line, 32, 70),
                Slice(line, 102, 14),
            };

            // Substring that clamps to the available characters rather than throwing.
            string Slice(string s, int start, int length)
            {
                if (s == null || start >= s.Length)
                {
                    return string.Empty;
                }
                return s.Substring(start, Math.Min(length, s.Length - start));
            }
        }
    
         /// <summary>
         /// Splits a fixed-width transaction line into its fields.
         /// Layout (0-based offsets): invention [2, 50).
         /// </summary>
         /// <param name="line">A raw transaction line.</param>
         /// <returns>
         /// [invention], untrimmed. The field is shortened or empty when the line ends early
         /// (e.g. "03Cinema"), instead of throwing ArgumentOutOfRangeException.
         /// </returns>
         private static string[] GetTransaction(string line)
        {
            if (line == null || line.Length <= 2)
            {
                return new[] { string.Empty };
            }
            return new[] { line.Substring(2, Math.Min(48, line.Length - 2)) };
        }