I have software that is processing approximately 200k records per 5 minutes from SQS. That’s about 40k/ minute, or 666 records per second. Not bad!
The only problem is, I had to run 25 instances of my python program to get that through put. Ummm.. that’s not cool.. it should be WAY faster than that. Shoot, my back end is elasticsearch, which I KNOW can EASILY support dozens of thousands of inserts per second.
My code isn’t terribly so either, so I was definitely bound up on the ingest from SQS.
Even batching, reading 10 messages at a time, didn’t help.. well, it did.. reading one message at a time was abysmal.
I kept reading about ‘infinite’ throughput with SQS, and figured out how to do it. Well, computers aren’t infinite, so that’s not exactly right, but, I was able to linearly increase throughput by having multiple consumer threads per processing thread in my code… since I then output to another SQS queue, I have multiple OUTPUT threads. Now, the code I wrote is a multi-threaded beast.
- Multi thread your input, Use something like python Threading and Queues, or any other threading library with 0mq
- Multi thread your output.. same thing.
- .. bonus tip, be nice to AWS
Think of your code as a pipeline, because that’s really what it is. Input -> Processing -> Output
If you have multiple threads reading from SQS, they can each block on IO all day long. That type of thread should be super duper light-weight. Essentially, read from the SQS Queue and shove the record into your Thread safe work queue (0MQ or a python Queue). I’m a huge fan of Python since I can code and get something working fast and efficiently. Sure, I could GoLang it, but Python makes me happy.
Here’s a quick code snippet of an example SQS reading thread:
class ReceiveMessagesThread(Thread): def __init__(self, thread_id, inbound_queue_name, outbound_queue): Thread.__init__(self) self.thread_id = thread_id self.outbound_queue = outbound_queue sqs = boto3.resource('sqs') LOGGER.debug("Starting up %s and looking at inbound queueue: %s", self.thread_id, inbound_queue_name) self.inbound_queue = sqs.get_queue_by_name(QueueName=inbound_queue_name) def run(self): while True: messages = self.inbound_queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=5) for message in messages: while self.queue.full(): time.sleep(0.01) self.outbound_queue.put( message )
Like I said, super simple. That “outbound_queue” object is a Python Queue. This is a thread safe FIFO Queue that can be shared. In my example, I have a single consume thread that reads the objects that my ReceiveMessageThread puts in. Once they’re read and processed, then, I have the same construct for down-streaming messages to another SQS Queue.
Oh.. here’s the second (bonus?) tip, I apparently can’t count today.
See that “WaitTimeSeconds=5”? My first attempt didn’t have that Wait. What would happen then, would be a lot of “Empty Receives”. that ‘messages’ array would be empty. No big deal. Code can handle it.. and I’m sure AWS doesn’t mind too much if you spam them, but I figured I’d try not to DDOS them…. check out the graph…. this is the difference between not having a WaitTimeSeconds and having one.
See that HUGE drop? yeah, I stopped DDOS-ing AWS. I’m sure that’ll save me some money to, or something like that. OH.. Dang.. I just read the pricing for SQS. Yep. That’ll save me cashola. Each API call (receive_messages) counts as an action. Don’t tell my boss that I didn’t do the timeout.. ha!
After these updates, I’m able to process the same 200k records per 5 minutes, but now, I only need 3 instances of my code running. That’ll free up a TON of compute in my Elastic Container Services cluster.