scala, rest, apache-spark

Spark: send a DataFrame as the body of an HTTP POST request


I have a DataFrame that I want to send as the body of an HTTP POST request. What's the idiomatic Spark way to do it?
How can I control the number of HTTP requests? If the number of records grows, is there a way to split the DataFrame across multiple HTTP POST calls?

Let's say my DataFrame looks like this:

+--------------------------------------+------------+------------+------------------+
|               user_id                |    city    | user_name  |   facebook_id    |
+--------------------------------------+------------+------------+------------------+
| 55c3c59d-0163-46a2-b495-bc352a8de883 | Toronto    | username_x | 0123482174440907 |
| e2ddv22d-4132-c211-4425-9933aa8de454 | Washington | username_y | 0432982476780234 |
+--------------------------------------+------------+------------+------------------+

I want the body of each HTTP POST request to contain user_id and facebook_id, sent to this endpoint: localhost:8080/api/spark


Solution

  • You can achieve this with the foreachPartition method. I am assuming here that you want to make one HTTP call per row of the DataFrame. foreachPartition runs your function on every partition of the DataFrame in parallel: partitions are processed concurrently by the executors, while the rows inside one partition are processed one after another. If you want to batch multiple rows into a single HTTP POST call, that is also possible by changing the signature of the makeHttpCall method from Row to Iterator[Row].

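      import org.apache.spark.sql.{DataFrame, Row}
      import play.api.libs.json.Json

      // Keep the helpers in an object so the closure does not capture an enclosing
      // (possibly non-serializable) class instance
      object RowPoster {
        def test(df: DataFrame): Unit = {
          // df.rdd.foreachPartition takes a plain Scala function, which avoids the
          // overload ambiguity some Spark 3 / Scala 2.12 builds report for df.foreachPartition
          df.rdd.foreachPartition(rows => rows.foreach(makeHttpCall))
        }

        // One HTTP POST per row, carrying the two fields the question asks for
        def makeHttpCall(row: Row): Unit = {
          val json = Json.obj(
            "user_id" -> row.getAs[String]("user_id"),
            "facebook_id" -> row.getAs[String]("facebook_id"))
          /**
            * code to POST json to localhost:8080/api/spark
            */
        }
      }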
    

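    For bulk HTTP requests, make makeHttpCall take an Iterator[Row] so that each partition is sent in a single POST. The number of partitions then controls the number of requests, so make sure the DataFrame has enough partitions (for example via df.repartition(n)) that each one is small enough to fit in one HTTP POST body.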

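    import org.apache.spark.sql.{DataFrame, Row}
    import play.api.libs.json.{JsArray, Json}

    object BulkRowPoster {
      def test(df: DataFrame): Unit = {
        // One call per partition, so the number of POSTs is at most the number of partitions
        df.rdd.foreachPartition(rows => makeHttpCall(rows))
      }

      def makeHttpCall(rows: Iterator[Row]): Unit = {
        // Materialize the partition once; an Iterator can only be traversed a single time
        val records = rows.map { r =>
          Json.obj(
            "user_id" -> r.getAs[String]("user_id"),
            "facebook_id" -> r.getAs[String]("facebook_id"))
        }.toVector
        // Skip empty partitions instead of posting an empty array
        if (records.nonEmpty) {
          // A flat JSON array: [{"user_id": ..., "facebook_id": ...}, ...]
          val json = JsArray(records)
          /**
            * code to POST json to localhost:8080/api/spark
            */
        }
      }
    }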
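Both versions leave the actual request as a placeholder. One way to fill it in, sketched here under the assumption that the JDK's own HttpURLConnection is acceptable (no extra HTTP client library), is a small helper that POSTs a JSON string and returns the status code. JsonPoster and postJson are hypothetical names, and the http:// scheme on the question's endpoint is assumed.

```scala
import java.net.{HttpURLConnection, URI}
import java.nio.charset.StandardCharsets

// Hypothetical helper: POST a JSON string using only the JDK's HttpURLConnection.
object JsonPoster {
  // Sends json as the request body and returns the HTTP status code.
  def postJson(endpoint: String, json: String, timeoutMs: Int = 10000): Int = {
    val conn = URI.create(endpoint).toURL.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setConnectTimeout(timeoutMs)
    conn.setReadTimeout(timeoutMs)
    conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8")

    val bytes = json.getBytes(StandardCharsets.UTF_8)
    conn.setFixedLengthStreamingMode(bytes.length)
    val out = conn.getOutputStream
    try out.write(bytes) finally out.close()

    val status = conn.getResponseCode
    // Read and close the response body so the JDK can keep the connection alive for reuse
    val in = if (status >= 400) conn.getErrorStream else conn.getInputStream
    if (in != null) {
      try { while (in.read() != -1) {} } finally { in.close() }
    }
    status
  }
}
```

Inside makeHttpCall you could then call something like JsonPoster.postJson("http://localhost:8080/api/spark", Json.stringify(json)) and decide how to treat non-2xx statuses (retry, log, or fail the task). Because the helper is an object, executors call it directly without serializing any state.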
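To cap how many records go into a single POST, regardless of how Spark happens to size the partitions, each partition can be split further into fixed-size chunks. The sketch below is a hypothetical helper (RequestBatching, partitionsFor and sendInBatches are illustrative names, not Spark APIs): partitionsFor estimates a partition count for a target batch size, and sendInBatches calls a supplied post function once per chunk of at most maxPerRequest rows.

```scala
// Hypothetical helpers for bounding the size and number of HTTP requests.
object RequestBatching {
  // Partition count needed so that, with rows spread evenly, each partition
  // holds at most about rowsPerRequest rows (never fewer than one partition).
  def partitionsFor(totalRows: Long, rowsPerRequest: Int): Int = {
    require(rowsPerRequest > 0, "rowsPerRequest must be positive")
    math.max(1L, (totalRows + rowsPerRequest - 1) / rowsPerRequest).toInt
  }

  // Splits one partition into chunks of at most maxPerRequest rows, calls post
  // once per chunk, and returns how many requests were made.
  def sendInBatches[A](rows: Iterator[A], maxPerRequest: Int)(post: Seq[A] => Unit): Int = {
    require(maxPerRequest > 0, "maxPerRequest must be positive")
    var requests = 0
    rows.grouped(maxPerRequest).foreach { batch =>
      post(batch)
      requests += 1
    }
    requests
  }
}
```

In a Spark job you might compute n = RequestBatching.partitionsFor(df.count(), 500) (500 being an example limit), call df.repartition(n), which spreads rows roughly evenly, and inside foreachPartition call sendInBatches(rows, 500)(batch => makeHttpCall(batch.iterator)) with the Iterator[Row] version of makeHttpCall. Note that df.count() runs an extra Spark job; the grouping alone already bounds every request to 500 rows.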