I have an application that may need to send hundreds of thousands of messages over SQS on each run. The program takes 1-2 hours per run and I run it 5-10 times a day, so that's roughly 1 million messages/day.
I want to do it fast. Is my best approach to:
My messages are the stdout and stderr of programs that are running in a distributed system. So the problem with #3 above is that I won't get the output of the program until the batching happens. I suppose that I could batch up every 60 seconds.
I'm sure that this has come up for other people. Is there a clever way to do this in the AWS SQS API that I am missing?
Kinesis is not an option in my environment.
We are currently sending the messages from Python programs running on Apache Spark workers (about 2000 cores per cluster, across roughly 5-20 clusters) and from other monitoring systems. The messages will go to a Lambda server. The problem is that some of the nodes send a few thousand messages within the course of 10-20 seconds.
We tried using Spark itself to collect this information, storing it in an RDD, saving that RDD in S3, and so on. The problem with that approach was that we didn't get real-time monitoring, and we added several hours to processing time. (We're not entirely sure why it added so much time, but it's possible that Spark ended up re-computing some RDDs because some stuff would no longer fit in RAM or on the spill disks.)
We solved this problem three ways: first with a batching SQS client (shown below), then by using S3 as a message queue, and finally with a plain HTTP REST endpoint.
The full code is here: https://github.com/uscensusbureau/DAS_2020_Redistricting_Production_Code/blob/5e619a4b719284ad6af91e85e0548077ce3bfed7/source/programs/dashboard.py
The relevant class is below.
#
# We use a worker running in another thread to collect SQS messages
# and send them asynchronously to SQS in batches of 10 (or when WATCHER_TIME expires).
#
# Imports needed by this class. (Singleton, TRY_SQS_SECOND, YES, NO, bcc_https_proxy(),
# das_sqs_url() and send_message_s3() are defined elsewhere in dashboard.py.)
import atexit
import logging
import os
import queue
import threading
import time

import boto3
import botocore.config
import botocore.exceptions

def sqs_queue():
    return boto3.resource('sqs',
                          config=botocore.config.Config(
                              proxies={'https': bcc_https_proxy().replace("https://", "")}
                          )).Queue(das_sqs_url())
SQS_MAX_MESSAGES=10 # SQS allows sending up to 10 messages at a time
WATCHER_TIME=5 # how long to collect SQS messages before sending them
EXIT='exit' # token message to send when program exits
# SQS worker. Collects messages from the local queue and sends them to SQS in batches.
class SQS_Client(metaclass=Singleton):
    """SQS_Client class is a singleton.
    This uses a Python queue to batch up messages that are sent to the AWS queue.
    We batch up to 10 messages at a time, but send every message within 5 seconds.
    """
    def __init__(self):
        """Set up the singleton by:
        - getting a handle to the SQS queue through the BCC proxy.
        - creating the Python queue for batching the requests to the SQS queue.
        - creating a background thread to flush the queue every 5 seconds.
        """
        # Set the default
        if TRY_SQS_SECOND not in os.environ:
            os.environ[TRY_SQS_SECOND] = YES
        self.sqs_queue = sqs_queue()       # the AWS SQS queue we send to
        self.pyqueue   = queue.Queue()     # producer/consumer queue used by dashboard.py
        self.worker    = threading.Thread(target=self.watcher, daemon=True)
        self.worker.start()
        atexit.register(self.terminate)

    def flush(self, timeout=0.0):
        """Flush the pyqueue. Can be called from the main thread or the watcher thread.
        While there are messages in the queue, grab up to 10, then send them to the sqs_queue.
        Returns the last message processed, which may be EXIT.
        The watcher repeatedly calls flush() until it receives an EXIT.
        """
        entries = []
        msg = None
        t0 = time.time()
        while True:
            try:
                msg = self.pyqueue.get(timeout=timeout, block=True)
            except queue.Empty:
                break
            if msg == EXIT:
                break
            msg['Id'] = str(len(entries))
            entries.append(msg)
            if len(entries) == SQS_MAX_MESSAGES:
                break
            if time.time() - t0 > timeout:
                break
        if entries:
            # Send the 1-10 messages.
            # If this fails, just save them in S3.
            try:
                if os.getenv(TRY_SQS_SECOND) == YES:
                    self.sqs_queue.send_messages(Entries=entries)
                    entries = []
            except botocore.exceptions.ClientError:
                logging.warning("Cannot send by SQS; sending by S3")
                os.environ[TRY_SQS_SECOND] = NO
            if entries:
                assert os.getenv(TRY_SQS_SECOND) == NO  # we only get here if SQS failed above
                for entry in entries:
                    send_message_s3(entry['MessageBody'])
        return msg

    def watcher(self):
        """Repeatedly call flush().
        If the flush gets EXIT, it returns EXIT and we exit.
        """
        while True:
            if self.flush(timeout=WATCHER_TIME) == EXIT:
                return

    def queue_message(self, *, MessageBody, **kwargs):
        self.pyqueue.put({'MessageBody': MessageBody})

    def terminate(self):
        """Tell the watcher to exit."""
        self.flush()
        self.pyqueue.put(EXIT)
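For context, a minimal usage sketch (the message format here is illustrative, not what dashboard.py actually sends): any thread calls queue_message(), and the watcher thread batches and ships the messages in the background.

import json

client = SQS_Client()   # Singleton metaclass: always returns the same instance
client.queue_message(MessageBody=json.dumps({"stream": "stdout", "line": "a line of program output"}))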
However, we were still unsatisfied with this, as emptying the SQS queue was also slow, and SQS gives poor visibility into what is sitting in its queues.
So we developed a second system that used S3 as a message queue: producers create objects under a given bucket and prefix followed by a random string, and consumers read and then delete them. Different consumers used different prefixes of the random string, as sketched below.
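Here is a minimal sketch of that pattern with boto3; the bucket and prefix names are made up and the production code differs.

import uuid
import boto3

s3 = boto3.client('s3')
BUCKET = 'monitoring-messages'   # hypothetical bucket
PREFIX = 'mq/'                   # hypothetical key prefix that acts as the "queue"

def send(message_body: str):
    # Producer: each message becomes one object under a random key.
    s3.put_object(Bucket=BUCKET, Key=PREFIX + uuid.uuid4().hex, Body=message_body.encode())

def receive(shard: str):
    # Consumer: each consumer polls a different shard of the random keyspace,
    # e.g. shard='0' only sees keys whose random part starts with '0'.
    resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX + shard)
    for obj in resp.get('Contents', []):
        body = s3.get_object(Bucket=BUCKET, Key=obj['Key'])['Body'].read()
        s3.delete_object(Bucket=BUCKET, Key=obj['Key'])
        yield body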
Finally, we implemented a traditional system with HTTP REST, with the Python server running under mod_wsgi. This was the most performant.
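For illustration only, here is a minimal WSGI application of the kind that can run under mod_wsgi; the endpoint behavior and message handling are assumptions, not our production server.

import json
import logging

def application(environ, start_response):
    # Accept one JSON message per POST; mod_wsgi calls application() for each request.
    if environ['REQUEST_METHOD'] != 'POST':
        start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
        return [b'POST only']
    length = int(environ.get('CONTENT_LENGTH') or 0)
    message = json.loads(environ['wsgi.input'].read(length))
    logging.info("received message: %s", message)    # stand-in for real processing
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'ok']

Clients then POST each message (or batch of messages) directly to this endpoint instead of going through SQS.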