I'm trying to load a fixed-position file with multiple sections in spark using .net-spark. Here is an example of the file:
01Nikola Tesla tesla@gmail.com +5521981181569
02Creations
03Alternating current
03Tesla coil
01Thomas Edison edison@gmail.com +5521981181569
02Creations
03Lamp
03Phonograph
03General Eletric
03Cinema
So basically we have a header with the transactions' owner, a sub-header stating that the transactions follow, and then finally the transactions section. The transaction lines do not contain any reference to the owner, so yeah, it is tricky.
As suggested by @EdElliott, here is how we should see data in RDD (showing only the first line, but the purpose is to read all content):
inventor | email | phone | creations |
---|---|---|---|
Nikola Tesla | tesla@gmail.com | +5511999999999 | Alternating current |
I guess this is not a very usual file format nowadays, but it is still pretty common in big Brazilian banks.
Found this example for java, but it does not handle the sections part. I believe I could achieve this using UDF, but again, I don't know where to start. Appreciate any piece of insight here guys.
Thanks
I got through to the other side. I'm not proud of the solution, mainly because I'm using ToLocalIterator() and because of the way I'm passing the argument to the normalizedTransaction UDF. Thanks @EdElliott, your blog helped me a lot.
Anyway, here it goes:
// Row-filter UDF: true when the raw line is a header record, i.e. it starts
// with the record type "01".
// StartsWith is used instead of Substring(0, 2) so that a null, blank or
// one-character line (for example a trailing empty line in the file) yields
// false instead of throwing ArgumentOutOfRangeException inside the UDF.
static Func<Column, Column> OnlyHeaders = Udf<string, bool>(
    line => line != null && line.StartsWith("01", StringComparison.Ordinal)
);
// Row-filter UDF: true when the raw line is a transaction record, i.e. it
// starts with the record type "03".
// StartsWith is used instead of Substring(0, 2) so that a null, blank or
// one-character line (for example a trailing empty line in the file) yields
// false instead of throwing ArgumentOutOfRangeException inside the UDF.
static Func<Column, Column> OnlyTransactions = Udf<string, bool>(
    line => line != null && line.StartsWith("03", StringComparison.Ordinal)
);
// Column UDF that turns a raw header line into its array of fields by
// delegating to GetHeader.
static Func<Column, Column> breakHeader = Udf<string, string[]>(
    rawLine => GetHeader(rawLine)
);
// Column UDF that turns a raw transaction line into its array of fields by
// delegating to GetTransaction.
static Func<Column, Column> breakTransaction = Udf<string, string[]>(
    rawLine => GetTransaction(rawLine)
);
// Entry point. Reads resources/input.txt as raw lines, tags every line with a
// row id, splits the lines into a headers DataFrame and a transactions
// DataFrame, assigns each transaction the id of the closest preceding header
// line, joins both DataFrames on that id and shows the result.
static void Main(string[] args)
{
    // Create a Spark session
    var spark = SparkSession
        .Builder()
        .AppName("FixedLenghtWithSectionsApp")
        .GetOrCreate();

    try
    {
        // Create initial DataFrame: one row per line, plus an increasing row id
        // that is later used to relate a transaction to the header above it.
        var rawDf = spark.Read().Schema("rawLine STRING").Text("resources/input.txt");
        rawDf = rawDf.WithColumn("rowNumber", Functions.MonotonicallyIncreasingId());
        rawDf.CreateOrReplaceTempView("rawdata");

        var headersDf = GetHeadersDf(rawDf);
        headersDf.CreateOrReplaceTempView("headers");
        headersDf.Show();

        var transactionsDf = GetTransactionsDf(rawDf);
        transactionsDf.CreateOrReplaceTempView("transactions");
        transactionsDf.Show();

        // Bring the header row ids to the driver and ship them to the UDF as a
        // single comma-separated literal column.
        var headerIds = headersDf.ToLocalIterator()
            .Select(row => row.Get("rowNumber").ToString());
        var column = Functions.Lit(string.Join(",", headerIds)).Alias("ids");

        // For a transaction row id, returns the largest header row id that is
        // strictly smaller than it, or -1 when no header precedes the row.
        // Row ids come from monotonically_increasing_id, which produces 64-bit
        // values, so they are parsed and returned as long: parsing them as int
        // overflows as soon as an id leaves the 32-bit range.
        Func<Column, Column, Column> normalizedTransaction = Udf<string, string, long>((line, hLines) =>
        {
            var invariant = System.Globalization.CultureInfo.InvariantCulture;
            long rowId = long.Parse(line, invariant);
            long owner = -1;

            // Single pass over the header ids; each one is parsed exactly once.
            foreach (var token in hLines.Split(",", StringSplitOptions.RemoveEmptyEntries))
            {
                long headerId = long.Parse(token, invariant);
                if (headerId < rowId && headerId > owner)
                {
                    owner = headerId;
                }
            }

            return owner;
        });

        var inventionsDf = transactionsDf.Alias("one")
            .Select(
                Functions.Col("one.rowNumber"),
                Functions.Col("invention"),
                normalizedTransaction(Functions.Col("one.rowNumber").Cast("string"), column).Alias("id")
            );

        // Attach the owning header's fields to every transaction.
        inventionsDf = inventionsDf.Alias("one")
            .Join(
                headersDf.Alias("two"),
                Functions.Col("one.id") == Functions.Col("two.rowNumber"),
                "inner"
            );
        inventionsDf.Show();
    }
    finally
    {
        // Stop the session even when one of the steps above fails.
        spark.Stop();
    }
}
/// <summary>
/// Builds the transactions DataFrame from the raw lines.
/// </summary>
/// <param name="rawDf">DataFrame exposing the "rawLine" and "rowNumber" columns.</param>
/// <returns>A DataFrame with the columns "rowNumber" and "invention".</returns>
private static DataFrame GetTransactionsDf(DataFrame rawDf)
{
    // Filter on the record type first, then parse. The filter reads "rawLine",
    // which the projection below drops, and filtering first also keeps the
    // fixed-width parser away from lines of other record types.
    var transactionsRawsDf = rawDf
        .Where(OnlyTransactions(rawDf["rawLine"]))
        .Select(rawDf["rowNumber"], breakTransaction(rawDf["rawLine"]).Alias("value"));

    // Flatten the parsed field array into named columns.
    return transactionsRawsDf
        .Select(
            transactionsRawsDf["rowNumber"],
            transactionsRawsDf.Col("value").GetItem(0))
        .ToDF("rowNumber", "invention");
}
/// <summary>
/// Builds the headers DataFrame from the raw lines.
/// </summary>
/// <param name="rawDf">DataFrame exposing the "rawLine" and "rowNumber" columns.</param>
/// <returns>A DataFrame with the columns "rowNumber", "inventor", "email" and "phone".</returns>
private static DataFrame GetHeadersDf(DataFrame rawDf)
{
    // Filter on the record type first, then parse. The filter reads "rawLine",
    // which the projection below drops, and filtering first also keeps the
    // fixed-width parser away from lines of other record types.
    var headerRawsDf = rawDf
        .Where(OnlyHeaders(rawDf["rawLine"]))
        .Select(rawDf["rowNumber"], breakHeader(rawDf["rawLine"]).Alias("value"));

    // Flatten the parsed field array into named columns.
    return headerRawsDf
        .Select(
            headerRawsDf["rowNumber"],
            headerRawsDf.Col("value").GetItem(0),
            headerRawsDf.Col("value").GetItem(1),
            headerRawsDf.Col("value").GetItem(2))
        .ToDF("rowNumber", "inventor", "email", "phone");
}
/// <summary>
/// Splits a header line into its three fixed-width fields: offset 2 width 30,
/// offset 32 width 70, and offset 102 width 14 (the first two characters are
/// the record type and are skipped).
/// </summary>
/// <param name="line">The raw header line.</param>
/// <returns>
/// Always a three-element array. A field that lies partly or completely beyond
/// the end of <paramref name="line"/> is returned truncated or empty instead of
/// raising <see cref="ArgumentOutOfRangeException"/>; padding is not trimmed.
/// </returns>
private static string[] GetHeader(string line)
{
    // Extracts one field, clipped to the text that is actually available.
    string Field(int start, int width)
    {
        if (line == null || start >= line.Length)
        {
            return string.Empty;
        }

        return line.Substring(start, Math.Min(width, line.Length - start));
    }

    return new[] { Field(2, 30), Field(32, 70), Field(102, 14) };
}
/// <summary>
/// Extracts the single fixed-width field of a transaction line: offset 2,
/// width 48 (the first two characters are the record type and are skipped).
/// </summary>
/// <param name="line">The raw transaction line.</param>
/// <returns>
/// Always a one-element array. When <paramref name="line"/> is shorter than the
/// full field, the available text (possibly empty) is returned instead of
/// raising <see cref="ArgumentOutOfRangeException"/>; padding is not trimmed.
/// </returns>
private static string[] GetTransaction(string line)
{
    const int start = 2;
    const int width = 48;

    if (line == null || line.Length <= start)
    {
        return new[] { string.Empty };
    }

    // Clip the field to the text that is actually available.
    return new[] { line.Substring(start, Math.Min(width, line.Length - start)) };
}