Search code examples
amazon-web-servicesapache-sparkpysparkamazon-sqs

sending lots of AWS SQS messages---FAST


I have an application that may need to send hundreds of thousands of messages each run of my program using SQS. The program takes 1-2 hours/run and I run it 5-10 times/day. So that's roughly 1 million messages/day.

I want to do it fast. Is my best approach to:

  1. Send each with its own send-message, but send them in another thread so my main thread doesn't pause?
  2. Use send-message-batch, which lets me send 10 messages at a time?
  3. OMG. Why am I sending so many messages? Why not write them all into a big object, save the object in S3, and then send a pointer to the object with SQS?

My messages are the stdout and stderr of programs that are running in a distributed system. So the problem with #3 above is that I won't get the output of the program until the batching happens. I suppose that I could batch up every 60 seconds.

I'm sure that this has come up for other people. Is there a clever way to do this in the AWS SQS API that I am missing?

Kinesis is not an option in my environment.

We are currently sending the messages from python programs running on Apache Spark workers---about 2000 cores/cluster---and other monitoring systems and about 5-20 clusters. The messages will go to a lambda server. The problem is that some of the nodes send a few thousand messages within the course of 10-20 seconds

We tried using Spark itself to collect this information, storing it in an RDD, saving that RDD in S3, and so on. The problem with that approach was that we didn't get real-time monitoring, and we added several hours to processing time. (We're not entirely sure why it added so much time, but it's possible that Spark ended up re-computing some RDDs because some stuff would no longer fit in RAM or on the spill disks.)


Solution

  • We solved this problem three ways:

    1. We created work queue with a consumer running in a separate queue. The consumer received messages from the worker and sent them off in batches of 10. If no message was received within a few seconds, the queue was flushed.

    The full code is here: https://github.com/uscensusbureau/DAS_2020_Redistricting_Production_Code/blob/5e619a4b719284ad6af91e85e0548077ce3bfed7/source/programs/dashboard.py

    The relevant class is below.

    #
    # We use a worker running in another thread to collect SQS messages
    # and send them asychronously to the SQS in batches of 10 (or when WATCH_TIME expires.)
    #
    
    def sqs_queue():
        return boto3.resource('sqs',
                              config = botocore.config.Config(
                                  proxies={'https':bcc_https_proxy().replace("https://","")}
                              )).Queue(das_sqs_url())
    
    SQS_MAX_MESSAGES=10             # SQS allows sending up to 10 messages at a time
    WATCHER_TIME=5                  # how long to collect SQS messages before sending them
    EXIT='exit'                     # token message to send when program exits
    
    
    # SQS Worker. Collects
    class SQS_Client(metaclass=Singleton):
        """SQS_Client class is a singleton.
        This uses a python queue to batch up messages that are send to the AWS Quwue.
        We batch up to 10 messages a time, but send every message within 5 seconds.
        """
        def __init__(self):
            """Set up the singleton by:
            - getting a handle to the SQS queue through the BCC proxy.
            - Creating the python queue for batching the requests to the SQS queue.
            - Creating a background process to flush the queue every 5 seconds.
            """
            # Set the default
            if TRY_SQS_SECOND not in os.environ:
                os.environ[TRY_SQS_SECOND]=YES
    
            self.sqs_queue = sqs_queue()  # queue to send this to SQS
            self.pyqueue  = queue.Queue() # producer/consumer queue used by dashboard.py
            self.worker = threading.Thread(target=self.watcher, daemon=True)
            self.worker.start()
            atexit.register(self.terminate)
    
        def flush(self, timeout=0.0):
            """Flush the pyqueue. Can be called from the main thread or the watcher thread.
            While there are messages in the queue, grab up to 10, then send them to the sqs_queue.
            Returns last message processed, which may be EXIT.
            The watcher repeatedly calls flush() until it receives an Exit.
            """
            entries = []
            msg     = None
            t0      = time.time()
            while True:
                try:
                    msg = self.pyqueue.get(timeout=timeout, block=True)
                except queue.Empty as e:
                    break
                if msg==EXIT:
                    break
                msg['Id'] = str( len( entries ))
                entries.append(msg)
                if len(entries)==SQS_MAX_MESSAGES:
                    break
                if time.time() - t0 > timeout:
                    break
            if entries:
                # Send the 1-10 messages.
                # If this fails, just save them in S3.
                try:
                    if os.getenv(TRY_SQS_SECOND)==YES:
                        self.sqs_queue.send_messages(Entries=entries)
                        entries = []
                except botocore.exceptions.ClientError as err:
                    logging.warning("Cannot send by SQS; sending by S3")
                    os.environ[TRY_SQS_SECOND]=NO
            if entries:
                assert os.getenv(TRY_SQS_SECOND)==NO # should have only gotten here if we failed above
                for entry in entries:
                    send_message_s3(entry['MessageBody'])
            return msg
    
        def watcher(self):
            """Repeatedly call flush().
            If the flush gets exit, it returns EXIT and we EXIT.
            """
            while True:
                if self.flush(timeout=WATCHER_TIME)==EXIT:
                    return
    
        def queue_message(self, *, MessageBody, **kwargs):
            self.pyqueue.put({'MessageBody':MessageBody})
    
        def terminate(self):
            """Tell the watcher to exit"""
            self.flush()
            self.pyqueue.put(EXIT)
    
    

    However, we were still unsatisfied with this, as emptying the SQS queue was also slow, and there is poor visibility into the queues.

    1. We developed a system that used S3 as a message queue. Create objects with a given bucket and prefix and then a random string, and then remove them in the consumer. Different consumers used different prefixes of the random string.

    2. We implemented a traditional system with HTTP REST and with the python server running under mod_wsgi. This was the most performant.