De-Coder’s Ring

Consumable Security and Technology


Announcing: Metrobus – A framework to simplify message bus/Kafka based microservices

I’ve been wanting to write this code for a while.  If I were as efficient at writing Java, I would have done it there, but I’m slower, so I wrote it in Python.  It’s not very pythonic, but, whatever.

Metrobus is a framework that lets you focus on your microservice application logic, not the logic around pulling from and pushing to Kafka.  Kafka for now; others to come soon.  All the details you could ever want are on the GitHub page.  I’ll be adding tickets to track some to-do items.


Proof of concept and example for smart routing on a dumb bus.

This is a small project focused on my blog posts around routing on a message bus that’s dumb. Like Kafka.

Stateless and dumb:

Fast Cache:

I use some of the caching ideas in here for a few of my data lookups.

The concept for the ‘test’ example application is fairly simple. The ‘pusher’ generates records, as if from a client or application. These records are simple JSON structures. For our example, we get cool things like an account number (FAKE!). The ‘pusher’ sends the message to the ‘Source’ topic on Kafka. Consider this your public entry point for upstream clients.
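To make that concrete, here’s a minimal sketch of what a ‘pusher’ like this could look like. The field names and the kafka-python wiring are my assumptions for illustration, not Metrobus’s actual code:

```python
import json
import random

def build_record():
    """Build a fake client record, like the ones 'pusher' generates."""
    return {
        "account_number": str(random.randint(10**9, 10**10 - 1)),  # FAKE!
        "type": "purchase",
    }

def push(producer, topic="Source"):
    """Serialize one record and send it to the public entry-point topic."""
    record = build_record()
    producer.send(topic, json.dumps(record).encode("utf-8"))
    return record

# Hypothetical wiring with kafka-python:
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:9092")
# push(producer)
```

Anything that can serialize JSON and call `send` works here, which is what keeps the entry point dumb.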


KNOWN TO-DO ITEMS (’Cause you know, SHIP IT!)

  • Simplify the logic or break up the code for the main handling function
  • Determine how best to handle different situations, like:
    • Send to Error log
    • Send to dead letter queue
    • Dropped on purpose
  • Is it too simple?  What am I missing?


Routing Messages through Kafka

I’m going through a major project with a client regarding migrating to a Kafka-based streaming message platform.  This is a heckuva lot of fun.  Previously, I’ve written about how Kafka made the message bus dumb, on purpose: find it here.

Building out a new streaming platform is an interesting challenge.  There are so many ways to handle the logic.  I’ll provide more details as time goes on, but there are at least three ways of dealing with a stream of data.

The Hard Wired Bus

A message bus needs various publishers and subscribers.  You can very tightly couple each service by having each be aware of what’s upstream and what’s downstream.  Upstream is where the message came from, downstream is where it goes next.  When each component is aware of the route a message must take, the system becomes brittle and hard to change over time.  Imagine spaghetti.

The Smart Conductor

A conductor travels on the train, determining the best route while moving along the tracks.  The conductor can inspect every message after every service to determine which service is next in line.  This cleans up the function of each service along the line, but makes the conductor pretty brittle too.  The more complex the system gets, the more complex the conductor gets.  A rules engine would be a great component to add to the conductor if you choose this path.

The Map Maker

A map maker plots a route for a hiker to follow.  In our case, the map maker is a component that sits at the very beginning of every stream.  When the event comes to the map maker’s topic (in Kafka), the map maker determines the best route for the message to take.  Using metadata or embedded data in the event, the map maker can send the route down the chain with the event itself.  Each service can use a common library to read from its configured topic, allow the custom service to do some work, and then use the wrapper again to pass the message downstream.  The biggest advantage here is that each service doesn’t care where the message comes from, or where it goes next.  This works great for streams that are static, where the route can be determined up front.  If there are decisions downstream, then it may need a ‘switch’ service that is allowed to update the route.
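A rough sketch of the idea, with the route embedded in the event itself. The field names and the routing rule here are hypothetical:

```python
def plan_route(event):
    """Map maker: decide the full route up front from data in the event."""
    route = ["enrich", "score"]
    if event.get("account_number"):   # hypothetical routing rule
        route.append("billing")
    route.append("sink")
    event["_route"] = route           # route travels with the event
    return event

def next_topic(event):
    """Common wrapper: each service pops its successor off the route."""
    route = event.get("_route", [])
    return route.pop(0) if route else None
```

Each service only ever calls `next_topic` on the event it received, so no service needs to know the topology; a ‘switch’ service would simply rewrite `_route` mid-stream.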

What’s the best path for your application?    Have something better that I haven’t thought of yet?






SQS Cost Optimization: Save $1684 per month

I made a bonehead move.  Yes, I admit it.

Amazon SQS has always been talked about as ‘free’.  In terms of passing messages for an application, it’s supposed to be freaking cheap as can be.

I was blown away when my July 2017 SQS bill was $1720!!

What?  How’s that FREE?!

Digging into my SQS reports, I saw I had made 4.3 billion (with a B) SQS calls.  Billed at $0.0000004 per call, that adds up to $1,720!

Well, my architecture would only be scaling up from there.. I had to do something about it.

I moved to Kafka.

… but that’s not the point of this post.  I realized later that I could have gotten close to free, and optimized a ton of my downstream pipeline.

SQS requests are billed in 64 KB chunks of data.  My messages were averaging 1,300 bytes (1.3 KB).  Doing some quick math, I could have been batching up to 49 ‘messages’ at a time per SQS call.  This would save my producer, and my consumer, a ton of API calls to SQS.

If I can batch 49 ‘messages’ per API call, then my 4.3 billion calls become about 87.8 million SQS calls.

87.8 million SQS calls becomes $35.10

Too late for this implementation (Kafka is better in the long run, in my opinion), but if the goal were a serverless/infrastructure-less implementation, then shoot, I could have saved $1,684 per month.
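The batching itself is just greedy packing: keep appending messages to one payload until the next one would push it past the 64 KB billing chunk. A rough sketch (`pack` is a hypothetical helper; the consumer side would JSON-decode each payload and fan the records back out):

```python
import json

CHUNK = 64 * 1024  # SQS bills per 64 KB chunk of request payload

def pack(messages, limit=CHUNK):
    """Greedily pack small JSON messages into payloads under one 64 KB chunk."""
    batches, current, size = [], [], 2  # 2 bytes for the enclosing "[]"
    for msg in messages:
        body = json.dumps(msg)
        if current and size + len(body) + 1 > limit:
            batches.append("[" + ",".join(current) + "]")
            current, size = [], 2
        current.append(body)
        size += len(body) + 1  # +1 for the separating comma
    if current:
        batches.append("[" + ",".join(current) + "]")
    return batches
```

Each resulting string would be the body of one `send_message` call, so ~49 records ride on a single billed request instead of 49 of them.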

Information on SQS pricing can be found here:


Batch your data before pushing to SQS, save moneys…. #profit

Two (or Three?) Tips to Improve AWS SQS Throughput

I have software that is processing approximately 200k records per 5 minutes from SQS.   That’s about 40k/ minute, or 666 records per second.  Not bad!

The only problem is, I had to run 25 instances of my Python program to get that throughput.  Ummm.. that’s not cool.. it should be WAY faster than that.  Shoot, my back end is Elasticsearch, which I KNOW can EASILY support tens of thousands of inserts per second.

My code isn’t terribly slow either, so I was definitely bound up on the ingest from SQS.

Even batching, reading 10 messages at a time, didn’t help.. well, it did.. reading one message at a time was abysmal.. but it still wasn’t enough.

I kept reading about ‘infinite’ throughput with SQS, and figured out how to get it.  Well, computers aren’t infinite, so that’s not exactly right, but I was able to linearly increase throughput by having multiple consumer threads per processing thread in my code.  Since I then output to another SQS queue, I have multiple OUTPUT threads too.  Now, the code I wrote is a multi-threaded beast.

  1. Multi-thread your input: use something like Python’s threading and Queue modules, or any other threading library with 0MQ
  2. Multi-thread your output.. same thing.
  3. .. bonus tip: be nice to AWS

Think of your code as a pipeline, because that’s really what it is.  Input -> Processing -> Output

Pipeline that mess

If you have multiple threads reading from SQS, they can each block on IO all day long.  That type of thread should be super duper light-weight: essentially, read from the SQS queue and shove the record into your thread-safe work queue (0MQ or a Python Queue).  I’m a huge fan of Python since I can code and get something working fast and efficiently.  Sure, I could GoLang it, but Python makes me happy.

Here’s a quick code snippet of an example SQS reading thread:
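Something along these lines, assuming boto3’s SQS Queue resource (the exact names here are a sketch, not production code):

```python
import threading
from queue import Queue

class ReceiveMessageThread(threading.Thread):
    """Light-weight reader: block on SQS, shove bodies into a shared Queue."""

    def __init__(self, sqs_queue, outbound_queue):
        super().__init__(daemon=True)
        self.sqs_queue = sqs_queue            # a boto3 SQS Queue resource
        self.outbound_queue = outbound_queue  # thread-safe Python Queue

    def run(self):
        while True:
            # Long poll: wait up to 5s instead of hammering Empty Receives
            messages = self.sqs_queue.receive_messages(
                MaxNumberOfMessages=10, WaitTimeSeconds=5)
            for message in messages:
                self.outbound_queue.put(message.body)
                message.delete()

# Hypothetical wiring with boto3:
# import boto3
# sqs_queue = boto3.resource("sqs").get_queue_by_name(QueueName="my-input")
# outbound_queue = Queue()
# ReceiveMessageThread(sqs_queue, outbound_queue).start()
```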


Like I said, super simple.  That “outbound_queue” object is a Python Queue, a thread-safe FIFO queue that can be shared.  In my example, I have a single consume thread that reads the objects that my ReceiveMessageThread puts in.  Once they’re read and processed, I have the same construct for down-streaming messages to another SQS queue.

Oh.. here’s the second (bonus?) tip, I apparently can’t count today.

See that “WaitTimeSeconds=5”?  My first attempt didn’t have that wait.  What would happen then would be a lot of “Empty Receives”: that ‘messages’ array would be empty.  No big deal, code can handle it.. and I’m sure AWS doesn’t mind too much if you spam them, but I figured I’d try not to DDoS them…. check out the graph…. this is the difference between not having a WaitTimeSeconds and having one.

Less Empty – Full?


See that HUGE drop?  Yeah, I stopped DDoS-ing AWS.  I’m sure that’ll save me some money too, or something like that.  OH.. Dang.. I just read the pricing for SQS.  Yep.  That’ll save me cashola.  Each API call (receive_messages) counts as an action.  Don’t tell my boss that I didn’t do the timeout.. ha!

After these updates, I’m able to process the same 200k records per 5 minutes, but now I only need 3 instances of my code running.  That’ll free up a TON of compute in my Elastic Container Service cluster.


Elasticsearch Maintenance with Jenkins


Maintaining production systems is one of those unfortunate tasks that we need to deal with…  I mean, why can’t they just run themselves?   I get tired of daily tasks extremely quickly.   Now that I have a few ongoing Elasticsearch clusters to deal with, I had to come up with a way to keep them singing.

As a developer, I usually don’t have to deal with these kinds of things, but in startup world, I get to do it all: maintenance, monitoring, development, etc.

Jenkins makes this kind of stuff super easy.  With a slew of Python programs that use parameters/environment variables to connect to the right Elasticsearch cluster, I’m able to perform the following tasks, in order (order is key):

  1. Create Snapshot
  2. Monitor Snapshot until it’s done
  3. Delete Old Data (this is especially interesting in our use case; we have a lot of intentional False Positive data for connectivity testing)
  4. Force Merge Indices
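As an example of the Delete Old Data step, a small helper can pick which daily indices are past retention. The index prefix, date format, and retention window here are assumptions, and the commented elasticsearch-py calls only sketch how each Jenkins job would use the client:

```python
from datetime import datetime, timedelta

def old_indices(names, days=30, prefix="nsm-", fmt="%Y.%m.%d", today=None):
    """Pick daily indices (e.g. 'nsm-2018.01.02') older than the retention window."""
    today = today or datetime.utcnow()
    cutoff = today - timedelta(days=days)
    stale = []
    for name in names:
        if not name.startswith(prefix):
            continue
        try:
            stamp = datetime.strptime(name[len(prefix):], fmt)
        except ValueError:
            continue  # not a dated daily index
        if stamp < cutoff:
            stale.append(name)
    return stale

# Hypothetical wiring with elasticsearch-py, one Jenkins job per step:
# from elasticsearch import Elasticsearch
# es = Elasticsearch(os.environ["ES_URL"])
# es.snapshot.create(repository="s3-snapshots", snapshot=snap_name)  # 1. snapshot
# es.snapshot.status(repository="s3-snapshots", snapshot=snap_name)  # 2. poll until done
# for name in old_indices(es.indices.get_alias("*")):                # 3. delete old data
#     es.indices.delete(index=name)
# es.indices.forcemerge(index="nsm-*", max_num_segments=1)           # 4. force merge
```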

I have Jenkins set up to trigger the downstream jobs after the prior one completes.

I could do a cool Jenkins Pipeline…. in my spare time.


Daily snapshots are critical in case of cluster failure.  With a four-node cluster, I’m running in a fairly safe setup, but if something goes catastrophically bad, I can always restore from a snapshot.  My setup has my snapshots going to AWS S3 buckets.

Delete Old Data:

When dealing with network monitoring, network sensors, and storing NSM data (see Suricata NSM Fields), we have determined one easy way to test end-to-end integration is by inserting some obviously fake False Positives into our system.  We have stood up a Threat Intelligence Platform (Soltra Edge) to serve some fake Indicators/Observables, etc.  They show up in everyone’s networks if there is user traffic.  Now, this is great for determining connectivity, but long term that becomes LOTS of traffic that I really don’t need to store…. so, they get deleted.

Force Merge Indices

There is a lot of magic that happens in Elasticsearch.  That’s fantastic.  Force merging allows ES to shrink the number of segments in a shard, thereby increasing performance when querying it.  This is really only useful for indices that are no longer receiving data; in our use case, that’s historical data.  I delete the old data, then force merge the remaining indices.


A day in the life.. of Jenkins.






© 2018 De-Coder’s Ring
