Search code examples
apache-sparkpysparkapache-spark-sqlrdd

pyspark.createDataFrame(rdd, schema) returns just null values


My RDD (From ElasticSearch) looks like this.

[
    ('rty456ui', {'@timestamp': '2022-10-10T24:56:10.000259+0000', 'host': {'id': 'test-host-id-1'}, 'watchlists': {'ioc': {'summary': '127.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '127.0.0.1'}, 'event': {'created': '2022-10-10T13:56:10+00:00', 'id': 'rty456ui'}, 'tags': ('Mon',)}),
    ('cxs980qw', {'@timestamp': '2022-10-10T13:56:10.000259+0000', 'host': {'id': 'test-host-id-2'}, 'watchlists': {'ioc': {'summary': '0.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '0.0.0.1'}, 'event': {'created': '2022-10-10T24:56:10+00:00', 'id': 'cxs980qw'}, 'tags': ('Mon', 'Tue')})
]

(What I find interesting is Lists in ES are converted to Tuples in RDD)

I am trying to convert it into something like this.

+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
|host.id        |event.id   |source.ip  |event.created              |watchlists.ioc.summary |watchlists.ioc.tags    |tags           |
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+
|test-host-id-1 |rty456ui   |127.0.0.1  |2022-10-10T13:56:10+00:00  |127.0.0.1              |[Dummy Tag]            |[Mon]          |
|test-host-id-2 |cxs980qw   |0.0.0.1    |2022-10-10T24:56:10+00:00  |127.0.0.1              |[Dummy Tag]            |[Mon, Tue]     |
+---------------+-----------+-----------+---------------------------+-----------------------+-----------------------+---------------+

However, getting this.

+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
|host.id|event.id|source.ip|event.created|watchlists.ioc.summary|watchlists.ioc.tags|tags                           |
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+
|null   |null    |null     |null         |null                  |null               |[Ljava.lang.Object;@6c704e6e   |
|null   |null    |null     |null         |null                  |null               |[Ljava.lang.Object;@701ea4c8   |
+-------+--------+---------+-------------+----------------------+-------------------+-------------------------------+

Code

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([ 
    StructField("host.id",StringType(), True), 
    StructField("event.id",StringType(), True), 
    StructField("source.ip",StringType(), True), 
    StructField("event.created", StringType(), True), 
    StructField("watchlists.ioc.summary", StringType(), True), 
    StructField("watchlists.ioc.tags", StringType(), True),
    StructField("tags", StringType(), True)
  ])


df = spark.createDataFrame(es_rdd.map(lambda x: x[1]),schema)
df.show(truncate=False)

I'm trying to convert an rdd into Dataframe. Additionally, I want to define the schema for it. However, pyspark.createDataFrame(rdd, schema) returns just null values, even though the rdd has data. Further, I get [Ljava.lang.Object;@701ea4c8 in the output too. So what am I missing here?


Solution

  • Your post cover 2 questions:

    1. Why all columns will be null even I declare the schema when I transform the RDD to dataframe: In your schema, you use StructTypeColumn.StructFiedColumn (eg host.id) to get the value in RDD. However, this type of selection statement could only work when you use Spark SQL select statement and I think there is no such parsing here. To achieve your goal, you might have to update your lambda function inside map function to extract the exact element like rdd_trans = rdd.map(lambda x: (x[1]['host']['id'], x[1]['event']['id'], ))

    2. Why the output of tag column is not shown as expected: It's because when you declare your tag column, you declare it as a string column, you should use ArrayType instead.