Tags: pandas, apache-spark, pyspark, aws-glue, aws-glue-spark

How to join / concatenate / merge all rows of an RDD in PySpark / AWS Glue into one single long line?


I have a protocol that needs to take in many (read: millions of) records. The protocol (InfluxDB / QuestDB line protocol) requires that all of the data be sent as one newline-delimited payload. Using the InfluxDB client isn't currently an option, so I need to do this via a socket.

I am at the end of my ETL process, and now I just have to take the final RDD I have created, take all of its rows, and transpose them into one single line, but I can't figure out how to do this (and how to do it properly!).

In PySpark / AWS Glue I currently have:

import socket

def write_to_quest(df, measurement, table, timestamp_field, args):
    HOST = args['questdb_host']
    PORT = int(args['questdb_port'])
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        sock.connect((HOST, PORT))
        rows = df.rdd.map(lambda row: row.asDict(True))
        new_rdd = rows.map(lambda row: _row_to_line_protocol(
            row, measurement, table, timestamp_field)).glom()

        # transform new_rdd to single_line_rdd here

        sock.sendall(single_line_rdd.encode())

    except socket.error as e:
        print("Got error: %s" % e)

Called by:

from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql import functions as F

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

allDaily = glueContext.create_dynamic_frame.from_catalog(database=args['db_name'],
                                                         table_name="daily",
                                                         transformation_ctx="allDaily",
                                                         push_down_predicate="(date_str='20040302' and meter_id='NEM1206106')"
                                                         # narrow predicate for faster testing
                                                         )

# TODO: Handle entire DF as a single payload
df = allDaily.toDF()
# to_timestamp expects a Spark (Java-style) datetime pattern, not a strftime one
tdf = df.withColumn('reading_date_time', F.to_timestamp(df['reading_date_time'], "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))
tdf = tdf.drop(*["ingestion_date", "period_start", "period_end", "quality_method",
                 "event", "import_reactive_total", "export_reactive_total"])

write_to_quest(df=tdf, measurement="meter_id", table="daily", timestamp_field="reading_date_time", args=args)

The shape of new_rdd is a list of lists of strings:

RDD[
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
x200
]
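The outer lists come from the glom() call in the snippet above: glom() collapses each partition into a single list, so the outer dimension (~200) is the partition count and the inner dimension (~70) is the rows per partition. A quick local illustration, assuming a plain SparkContext named sc:

sc.parallelize(range(6), 3).glom().collect()
# [[0, 1], [2, 3], [4, 5]]  -- one inner list per partition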


How do I get this so I have a single line that has everything joined by '\n' (newline)?

e.g:

'string here,to,join\nanother string,to,join\n....'

I have so far tried several combinations of foreach like:

foreach(lambda x: ("\n".join(x)))

But to absolutely no avail. I am also concerned about scalability here; for example, I am pretty sure using .collect() on millions of rows is going to kill things.
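For what it's worth, foreach alone can never produce the joined string: it is an action run purely for its side effects and returns nothing to the driver, so the joined strings are thrown away. A transformation plus an action that returns data is needed, e.g. this reduce-based sketch (with the same caveat as .collect(): the full string still ends up on the driver, and row order across partitions is not strictly guaranteed):

single_line = new_rdd.map(lambda x: "\n".join(x)) \
    .reduce(lambda a, b: a + "\n" + b)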

So what is the best way to solve this final step?

Edit after accepted answer

The specific solution from Werner's answer that I implemented is below (I removed the glom() call so there is one string per row, and switched the join to use no separator, as Influx / Quest line protocol is whitespace-sensitive):

def write_to_quest(df, measurement, table, timestamp_field, args):
    """
    Open a socket and write the rows directly into Quest
    :param df:
    :param measurement:
    :param table:
    :param timestamp_field:
    :param args:
    :return:
    """
    HOST = args['questdb_host']
    PORT = int(args['questdb_port'])
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        sock.connect((HOST, PORT))
        rows = df.rdd.map(lambda row: row.asDict(True))
        new_rdd = rows.map(lambda row:
                           _row_to_line_protocol(row, measurement, table, timestamp_field))
        result = new_rdd.map(lambda r: "".join(r) + "\n") \
            .aggregate("", lambda a, b: a + b, lambda a, b: a + b)

        sock.sendall(result.encode())

    except socket.error as e:
        print("Got error: %s" % e)
    finally:
        sock.close()

Solution

  • Each row of the RDD can be mapped into one string using map, and the results of the map call can then be aggregated into one large string:

    result = rdd.map(lambda r: " ".join(r) + "\n")\
        .aggregate("", lambda a,b: a+b, lambda a,b: a+b)
    

    If the goal is to have one large string, all of the data has to be moved to a single place, at least for the final step. Using aggregate here is slightly better than collecting all rows and concatenating the strings on the driver, as aggregate can do most of the work distributed and in parallel. However, enough memory for the whole final string is still required on a single node; if that ever becomes a problem, see the streaming sketch below.
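If holding the final string on one node ever becomes the bottleneck, one way around it (an alternative sketch, not part of the accepted answer) is to skip building a single string entirely and let each executor stream its own partition to QuestDB with foreachPartition; line protocol is line-oriented, so sending the data in several complete-line chunks is equivalent:

def send_partition(lines):
    # Runs on an executor: one socket per partition, so no single node
    # ever holds the full payload. HOST and PORT are captured from the
    # enclosing scope (assumed reachable from the executors).
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((HOST, PORT))
        sock.sendall("".join(line + "\n" for line in lines).encode())
    finally:
        sock.close()

new_rdd.foreachPartition(send_partition)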