Tags: pyspark, ibm-cloud, cloudant, apache-spark-sql

Extract value from Cloudant IBM Bluemix NoSQL Database


How can I extract values from documents stored in JSON format in a Cloudant IBM Bluemix NoSQL database?

I tried this code:

def readDataFrameFromCloudant(host, user, pw, database):
    # Read the Cloudant database into a Spark DataFrame
    cloudantdata = spark.read.format("com.cloudant.spark"). \
        option("cloudant.host", host). \
        option("cloudant.username", user). \
        option("cloudant.password", pw). \
        load(database)

    # Register a temporary view, show the contents and return the DataFrame
    cloudantdata.createOrReplaceTempView("washing")
    spark.sql("SELECT * from washing").show()
    return cloudantdata

hostname = ""
user = ""
pw = ""
database = "database"
cloudantdata=readDataFrameFromCloudant(hostname, user, pw, database)

The documents are stored in this format:

{
  "_id": "31c24a382f3e4d333421fc89ada5361e",
  "_rev": "1-8ba1be454fed5b48fa493e9fe97bedae",
  "d": {
    "count": 9,
    "hardness": 72,
    "temperature": 85,
    "flowrate": 11,
    "fluidlevel": "acceptable",
    "ts": 1502677759234
  }
}
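
When a document like this is read into Spark, the fields under "d" are inferred as a nested struct column. As a quick sanity check (a minimal sketch, assuming the cloudantdata DataFrame returned by the code above), the inferred schema can be printed:

# The fields under "d" should show up as a nested struct
# (count, flowrate, fluidlevel, hardness, temperature, ts)
cloudantdata.printSchema()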

I want this result:

[Screenshot: Expected]

[Screenshot: Actual Outcome]


Solution

  • Create a dummy dataset for reproducing the issue:

    cloudantdata = spark.read.json(sc.parallelize(["""
    {
      "_id": "31c24a382f3e4d333421fc89ada5361e",
      "_rev": "1-8ba1be454fed5b48fa493e9fe97bedae",
      "d": {
        "count": 9,
        "hardness": 72,
        "temperature": 85,
        "flowrate": 11,
        "fluidlevel": "acceptable",
        "ts": 1502677759234
      }
    }
    """]))
    cloudantdata.take(1)
    

    Returns:

    [Row(_id='31c24a382f3e4d333421fc89ada5361e', _rev='1-8ba1be454fed5b48fa493e9fe97bedae', d=Row(count=9, flowrate=11, fluidlevel='acceptable', hardness=72, temperature=85, ts=1502677759234))]
    

    Now flatten:

    flat_df = cloudantdata.select("_id", "_rev", "d.*")
    flat_df.take(1)
    

    Returns:

    [Row(_id='31c24a382f3e4d333421fc89ada5361e', _rev='1-8ba1be454fed5b48fa493e9fe97bedae', count=9, flowrate=11, fluidlevel='acceptable', hardness=72, temperature=85, ts=1502677759234)]
    

    I tested this code in an IBM Data Science Experience notebook using Python 3.5 (Experimental) with Spark 2.0.

    This answer is based on: https://stackoverflow.com/a/45694796/1033422
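
    Applied back to the original question, the same flattening works on the DataFrame returned by readDataFrameFromCloudant, and the flat columns can then be queried with SQL. A minimal sketch, assuming the function and the washing database from the question:

    # Load the Cloudant database using the function from the question
    cloudantdata = readDataFrameFromCloudant(hostname, user, pw, database)

    # Flatten the nested "d" struct into top-level columns
    flat_df = cloudantdata.select("_id", "_rev", "d.*")

    # Register a temporary view so the flattened columns can be queried with SQL
    flat_df.createOrReplaceTempView("washing_flat")
    spark.sql("SELECT temperature, flowrate, fluidlevel FROM washing_flat").show()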