apache-spark, pyspark, azure-databricks

Show the data collected in PySpark


The code below runs without any error; I am trying to read data from a JSON file in my storage account. I would like to know how I can see the output (the contents) in the Databricks notebook. The file is quite long, so I just need to verify that the output is what I am looking for, and would therefore like to see the first 10 items. How do we do that?

import re
import json
%pip install azure
import azure
from azure.storage.blob import AppendBlobService

abs = AppendBlobService(account_name="azurestorage", account_key="mykey")
base_path = "resourceId=/SUBSCRIPTIONS/5315MyId/RESOURCEGROUPS/AZURE-DEV/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/AZURE-DEV/y=2022/m=05/d=23/h=13/m=00/PT1H.json"
pattern = base_path + "/*/*/*/*/m=00/*.json"
filter = glob2re(pattern)
df1 = (
    spark.sparkContext.parallelize(
        [
            blob.name
            for blob in abs.list_blobs("insights-logs-kube-audit", prefix=base_path)
            if re.match(filter, blob.name)
        ]
    )
    .map(
        lambda blob_name: abs.get_blob_to_bytes("insights-logs-kube-audit", blob_name)
        .content.decode("utf-8")
        .splitlines()
    )
    .flatMap(lambda lines: [json.loads(l) for l in lines])
    .collect()
)

Solution

  • collect() :- PySpark RDD/DataFrame collect() is an action operation that retrieves all the elements of the dataset (from all nodes) to the driver node. It should only be used on a smaller dataset, usually after filter(), group(), etc.

    take(num) :- returns the first num rows as a list of Row (for an RDD, the first num elements). A short sketch contrasting collect() and take() is included at the end of this answer.

    DataFrame.take(num)

    import re
    import json
    %pip install azure
    import azure
    from azure.storage.blob import AppendBlobService
    
    abs = AppendBlobService(account_name="azurestorage", account_key="mykey")
    base_path = "resourceId=/SUBSCRIPTIONS/5315MyId/RESOURCEGROUPS/AZURE-DEV/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/AZURE-DEV/y=2022/m=05/d=23/h=13/m=00/PT1H.json"
    pattern = base_path + "/*/*/*/*/m=00/*.json"
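    # glob2re converts the glob pattern into a regular expression (helper assumed to be defined elsewhere in the notebook)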
    filter = glob2re(pattern)
    df1 = (
        spark.sparkContext.parallelize(
            [
                blob.name
                for blob in abs.list_blobs("insights-logs-kube-audit", prefix=base_path)
                if re.match(filter, blob.name)
            ]
        )
        .map(
            lambda blob_name: abs.get_blob_to_bytes("insights-logs-kube-audit", blob_name)
            .content.decode("utf-8")
            .splitlines()
        )
        .flatMap(lambda lines: [json.loads(l) for l in lines])
        .take(10)  # return only the first 10 parsed JSON records to the driver
    )
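
    Because take(10) is an action, df1 above ends up as a plain Python list on the driver rather than an RDD, so it can be inspected with ordinary Python. A minimal sketch, assuming each parsed record is a JSON object (dict):

    # df1 is a plain Python list of parsed JSON records (dicts),
    # so standard Python works for inspecting it in the notebook:
    for record in df1:
        print(json.dumps(record, indent=2))

    # Alternatively, Databricks' display() can render the records as a table
    # (assumes the records share a reasonably consistent schema):
    display(spark.createDataFrame(df1))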
    

    Refer - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.take.html
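
    For reference, a toy sketch (independent of the storage-account code above) showing the difference between collect() and take() on a small RDD:

    rdd = spark.sparkContext.parallelize(range(100))

    everything = rdd.collect()  # all 100 elements are pulled back to the driver
    first_ten = rdd.take(10)    # only the first 10 elements are pulled back

    print(len(everything))  # 100
    print(first_ten)        # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]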