
Spark's socket text stream is empty


I am following Spark's streaming guide. Instead of using `nc -lk 9999`, I created my own simple Python server, shown below. As the code shows, it sends a randomly chosen letter from a through z once per second.

import socketserver
import time
from random import choice

class AlphaTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print('AlphaTCPHandler')
        alphabets = list('abcdefghijklmnopqrstuvwxyz')

        try:
            while True:
                s = f'{choice(alphabets)}'
                b = bytes(s, 'utf-8')
                self.request.sendall(b)
                time.sleep(1)
        except BrokenPipeError:
            print('broken pipe detected')

if __name__ == '__main__':
    host = '0.0.0.0'
    port = 301

    server = socketserver.TCPServer((host, port), AlphaTCPHandler)
    print(f'server starting {host}:{port}')
    server.serve_forever()

I tested this server with client code as follows.

import socket
import sys
import time

HOST, PORT = 'localhost', 301
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
    sock.connect((HOST, PORT))
    print('socket opened')

    while True:    
        received = str(sock.recv(1024), 'utf-8')
        if len(received.strip()) > 0:
            print(f'{received}')
        time.sleep(1)
finally:
    sock.close()
    print('socket closed')

However, my Spark streaming code does not seem to receive any data; it prints nothing. The code is as follows.

from pyspark.streaming import StreamingContext
from time import sleep

# sc is the SparkContext provided by the pyspark shell/notebook
ssc = StreamingContext(sc, 1)
ssc.checkpoint('/tmp')

lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

counts.pprint()

ssc.start()
sleep(5)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

All I see from the output is the repeated pattern below.

-------------------------------------------
Time: 2019-10-31 08:38:22
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 08:38:23
-------------------------------------------

-------------------------------------------
Time: 2019-10-31 08:38:24
-------------------------------------------

Any ideas on what I'm doing wrong?


Solution

  • Your streaming code is working properly. The problem is the server feeding it: there are no line separators after the letters, so Spark sees one ever-growing line and keeps waiting for it to finish, which never happens. Modify your server to append a newline to each letter:

    while True:
        s = f'{choice(alphabets)}\n'  # <-- newline appended here
        b = bytes(s, 'utf-8')
        self.request.sendall(b)
        time.sleep(1)
    

    And the result:

    -------------------------------------------
    Time: 2019-10-31 12:09:26
    -------------------------------------------
    ('t', 1)
    
    -------------------------------------------
    Time: 2019-10-31 12:09:27
    -------------------------------------------
    ('t', 1)
    
    -------------------------------------------
    Time: 2019-10-31 12:09:28
    -------------------------------------------
    ('x', 1)