« Back
in stream graphs akka graphdsl throttling actor read.
Journey with GraphDSL Vol.1

Journey with GraphDSL Vol.1.

If you have CS or C.Eng. background probably you know what graph is. In graph theory, there are many states and combinatorial logics used to transform input to output. Each phase wired with vertexes to another. For a long time, we(software engineers) haven't used graph directive domain specific language in production. State machines are already in our lives with washing machines, fridges and including household equipment. They have all based on a Finite State Machine transformations actually. But we haven't had state programming DSL for pipelines until today.

We are processing lots of data every day that comes from millions of requests. Data should be processed quickly and services should be resilient enough for high loads as much as possible. One of the best choices for implementing this was Akka and it has GraphDSL to code your own pipeline. Akka Stream Graphs can process a great amount of data and it is easy to write with a DSL which is built-in. Furthermore, it allows engineers to design a processing pipeline visually too. It supplies batteries-included approach for defining states and state transformations.

Streaming pipelines classified by two:

  • Feedback mechanism based pipelines (contains cyclic graphs and backpressure system)
  • Linear pipelines

In computer science, feedback based pipelines can be called as Mealy machines. Mealy machines transform the output respectively with input and system's current state. In our systems "system's current state" is either giving feedback to the input supplier or merging multiple state's outputs to combine a final output which makes it Mealy machine. Linear state transformed pipelines are called as Moore machines because they are producing output as long as the input is received and transformed.

In our team, we started working on designing the pipeline. There are messages coming in and they should be saved into the database and in a case of error, it should retry the save.

in ~> databaseSave ~> countAllMessages ~> out  

This was my initial implementation, it lacks retry mechanism and many auxiliary strategies including error handling. I really liked the writing of this code while I write flow functions that will process data. For the sake of separation of concerns, I separated flow functions to a different class. This code was a Moore machine and as you can see it was digraph.

We had a problem to solve with our team in our processing pipeline. The problem was calculating success rate to come up with a retry mechanism. Retry strategy should give feedback to the incoming source to reflect choke effect(which may be translated as throttling). First, we need to calculate retry rate. Retry rate should be inside of a sliding window that is calculated through the stream when a specific state transformation happened. This comes with a great burden at first. It may block async processing or even delay message processing. Actually, this was my initial reason to select GraphDSL over bare Scala Streaming API. Just because we don't want to block incoming message processing stages with our retry rate calculation; I want to use GraphDSL to distribute messages to different states like:

case class Message(  
  record: Option[Record],
  isSaved: Boolean = false
)

val bcast = builder.add(Broadcast[Message](2))


in ~> databaseSave ~> bcast ~> countAllMessages ~> out  
                      bcast ~> calculateErrorRate ~> out

Akka Graph's built-in broadcasting system allows us to broadcast messages that are carrying their status to use with a simple filter of failure. Then we implemented a sliding window over it and calculated the failure rate and saved rate to shared variable which will be used across threads.

Cool... This allowed us to continue to processing while calculating failure rate without any side effect(except JVM context switches...) Now we have a multi-stage graph which looks like:

We were thinking that writing code like that is really explanatory for stages and their connections. You don't need to look weird branch jumps and guard clauses etc. It was clear as crystal.

Retry implementations made by my colleague. He made this implementation really clearly and explicitly. The second iteration was implementing the retry strategy. Meanwhile, everything was being processed, the failure rate has been calculated continuously by pipeline and it is used in next stage for retrying failures only.

in ~> databaseSave ~> bcast ~> countAllMessages ~> retrySave ~> out  
                      bcast ~> calculateErrorRate ~> out

By doing this our system had become a feedback based system. Hence, it was adjusting itself and recovering from any failure of database saves.

We need to calculate and see how many times we have retries on database saves. We thought that this secondary error handling actually will be our congestion and backpressure system to system's message source. So we manipulated the system to use yet another rate to hold this retry rate too.

val bcast2 = builder.add(Broadcast[Message](2))

in ~> databaseSave ~> bcast ~> countAllMessages ~> retrySave ~> bcast2 ~> out  
                      bcast ~> calculateErrorRate ~> out        bcast2 ~> retryCalculator ~> out

This can be noted as:

Now it was the time for making the auto-throttling algorithm to suppress incoming messages with choke effect if everything goes wrong. We were going to use retry calculation to prevent the source from pushing so many messages. Even Akka streams have this effect called backpressure, we want to be sure about we are not going to write malformed records or extremely inconsistent records to the database.

Our final state was:

in ~> dynamicThrottler ~> databaseSave ~> bcast ~> countAllMessages ~> retrySave ~> bcast2 ~> out  
                                          bcast ~> calculateErrorRate ~> out        bcast2 ~> retryCalculator ~> out

With our dynamic throttling system, we were able to slow down our message source with an interval of the inversely proportioned rate of retries.

For now, we haven't benchmarked this approach... But for me, this will be fast and resilient enough for a critical service. I enjoyed while writing this code and defining every processing step. While we are implementing this architecture, I think we enjoyed and learned a lot. Thanks to my colleagues and Akka friends in Akka community for their help and fast responses to important questions.

If you have suggestions or questions please add your comments below.

EDIT: Benchmarks made...

Server specs were:

  • 1 Core
  • 4096 MB Memory (Half of it is used... Which is cool also)

Under high load (Millions of messages) results were like:

200K messages per minute and in 7 minutes processed 1.3 Million messages.

Have a nice day...
Here is a 🐟 again.

comments powered by Disqus