Search code examples
scalaapache-sparkgraphframes

How to import a text file with the following structure into a GraphFrame


I have a file with the following structure. The first column is the nodeID. After the ":" comes a comma-separated list of the nodes that are connected to that nodeID. Each nodeID can have more than one connection.

0: 5305811,
1: 4798401,
2: 7922543,
3: 7195074,
4: 6399935,
5: 5697217,
6: 5357407,
7: 4798401,
8: 629131,5330605,6481451,6280292,6909396,7325128,
...

Which transformations do I need to apply to import this file into a GraphFrame?


Solution

  • import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.LongType
    import org.graphframes.GraphFrame
    import scala.util.Try
    
    
    // Local session with two worker threads, registered under the application name "test".
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .getOrCreate()
    
    // Use the directory configured under "spark.checkpointdir" when present, otherwise fall back to /tmp.
    val checkpointDir = spark.conf.getOption("spark.checkpointdir").getOrElse("/tmp")
    spark.sparkContext.setCheckpointDir(checkpointDir)
    
    import spark.implicits._
    
    /**
     * UDF that converts the raw, comma-split link tokens of one line into numeric node ids.
     *
     * Each token is trimmed and parsed as a Long; tokens that do not parse (for example the
     * empty token left behind by the trailing comma) are dropped. A null input array, which
     * is what a line with nothing after the ':' produces, yields an empty list instead of
     * failing with a NullPointerException.
     *
     * Declared as a `val` so the UDF is built once rather than on every reference.
     */
    val cleanIds = udf { (ids: Seq[String]) =>
      Option(ids)
        .getOrElse(Seq.empty[String])
        .flatMap(token => Try(token.trim.toLong).toOption)
    }
    
    // Read every "id: n1,n2,..." line as a headerless two-column CSV split on ':'.
    val rawLines = spark.read
      .options(Map("mode" -> "PERMISSIVE", "header" -> "false", "delimiter" -> ":"))
      .csv("src/main/resources/connections.txt")
      .toDF("id", "links")

    // Normalise the types: id becomes a long, links becomes an array of longs (via cleanIds).
    // The result is cached because both the vertex and the edge DataFrames are derived from it.
    val ds = rawLines
      .select(
        $"id".cast(LongType),
        cleanIds(split(trim($"links"), ",")).as("links")
      )
      .cache()
    
    
    // A vertex is any node id that occurs on either side of the ':'.
    // Taking only the left-hand ids would omit nodes that appear solely as a neighbour
    // (e.g. node 7 in "6: 0,7,"), leaving edges whose endpoint has no vertex row.
    val vertices = ds
      .select($"id")
      .union(ds.select(explode($"links").as("id")))
      .distinct()
    
    // One edge row per (node, neighbour) pair: explode() emits a row for each element of links.
    val srcCol = $"id".as("src")
    val dstCol = explode($"links").as("dst")
    val edges = ds.select(srcCol, dstCol)
    
    // Assemble the graph from the vertex and edge DataFrames.
    val graphFrame = GraphFrame(vertices, edges)

    // Run the connected-components algorithm; the result carries a "component" column per vertex.
    val connectedComponents = graphFrame.connectedComponents.run()

    // Print one row per component listing its member vertices, without truncating long cells.
    val membersByComponent = connectedComponents
      .groupBy($"component")
      .agg(collect_list(struct($"id")).as("vertices"))
    membersByComponent.show(truncate = false)
    

    Given input like this:

    0: 5,6,
    1: 4,
    2: 3,4,5,
    3: 2,
    4: 2,1,
    5: 2,0,
    6: 0,7,
    10: 11,13,
    11: 12,14,
    12: 13,14,
    13: 10,12,
    14: 11,12,
    

    This will create a vertex dataframe looking like this:

    +---+
    | id|
    +---+
    |  0|
    |  6|
    |  5|
    |  1|
    | 10|
    |  3|
    | 12|
    | 11|
    |  2|
    |  4|
    | 13|
    | 14|
    +---+
    

    and edges like this (`show()` prints only the first 20 rows, so the last few edges are cut off):

    +---+---+
    |src|dst|
    +---+---+
    |  0|  5|
    |  0|  6|
    |  1|  4|
    |  2|  3|
    |  2|  4|
    |  2|  5|
    |  3|  2|
    |  4|  2|
    |  4|  1|
    |  5|  2|
    |  5|  0|
    |  6|  0|
    |  6|  7|
    | 10| 11|
    | 10| 13|
    | 11| 12|
    | 11| 14|
    | 12| 13|
    | 12| 14|
    | 13| 10|
    +---+---+
    

    and connected components like this:

    +---------+-----------------------------------+
    |component|vertices                           |
    +---------+-----------------------------------+
    |0        |[[0], [6], [5], [1], [3], [2], [4]]|
    |10       |[[10], [12], [11], [13], [14]]     |
    +---------+-----------------------------------+