On Italian Restaurants, Mafia Bosses & Web Sockets (Part 2)

This is the second and final part of a two-part article. Part 1 is here.

The first part focused on the need for a sequential, multi-queue Task Scheduler to solve the producer/consumer needs of ordered downstream messages on protocols such as Web Sockets.

This part is not going to be long 🙂

  1. Code for the Web Socket server (on top of the scheduler), along with the unit tests, is published here: https://github.com/khenidak/WebSocketsServer
  2. In brief: the web socket server is designed to work as a stand-alone server with Owin (including self-hosting) and to work on top of Service Fabric.
  3. The server uses a “Session Manager” to allow the hosting process to interact with actively connected web sockets (for example, find a socket or send a message downstream).
  4. The web sockets themselves are strong types, for example a Customer socket type, an Order socket type, and so on.
  5. The web socket supports Close (async), Abort, and Drain-and-Close (async).

Please go through the readme.md for more details and usage samples.
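To give a feel for the shape of the API, below is a rough sketch of how a hosting process might use the session manager to find a strongly typed socket and push a message downstream. All names here (SessionManager, GetSocket, SendAsync, DrainAndCloseAsync, CustomerSocket) are illustrative placeholders; the real surface and signatures are in the repo's readme.md.

// Illustrative sketch only - refer to the repo readme for the actual API surface.
public async Task NotifyCustomerAsync(SessionManager sessionManager, string sessionId)
{
    // find an active, strongly typed socket by its session id
    CustomerSocket socket = sessionManager.GetSocket<CustomerSocket>(sessionId);

    if (socket == null)
        return; // no active connection for this session

    // push a message downstream to the connected client
    await socket.SendAsync("{ \"customerId\": 42, \"event\": \"updated\" }");

    // the post mentions Close (async), Abort, and Drain-and-Close (async);
    // a graceful shutdown might look like this:
    await socket.DrainAndCloseAsync();
}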

 

till next time

@khnidk

On Italian Restaurants, Mafia Bosses & Web Sockets (Part 1)

**Part 2 published here**

Prolog

Consider your 1950s-movie-style Italian restaurant with one of those comically, usually angry chefs. The restaurant kitchen operation is really simple. He – the chef – can cook many different menu items concurrently, but he can only cook one – and only one – plate/meal of each menu item at a time. He is a fair chef; orders are cooked in first-ordered, first-served style. He cannot interrupt the cooking of menu item A for customer C1 to cook the same menu item A for customer C2. C1’s meal would never be nicely cooked, and as a food-respecting chef he will never allow that to happen. Now enter a mafia boss who walks in – cue the Godfather theme music – and the chef has to place whatever the mafia boss orders in front of the previously queued orders (come on, it is his life on the line). Because the boss is also a food-respecting person, he will never ask the chef to drop currently cooking meals for his. He just asks for a place at the front of the queue.

Theatrics aside, the above analogy is commonly found in client/server communication (even when it is full duplex), including Web Sockets. Messages flow in order (per session/client) with the ability to place one message at the front of the queue (think of it as a high-priority message). For example, consider a very simple banking scenario where the client sends the following (starting account balance is $100):

  1. Withdraw 50 (account balance: $50).
  2. Deposit 100 (account balance: $150).
  3. Withdraw 150 (account balance: $0).

If messages 2 and 3 arrive out of order, the operation will fail even though, logically, from the customer's point of view he has the funds to execute it. Worse, if you stop one operation – in the middle of execution – for another, you will end up with an account in an “unknown state”.

Detailing the Problem

In the upcoming second part of this post I will focus more on the Web Sockets server implementation (and yes, you guessed it right, it will be Azure Service Fabric friendly). But for now let us focus on solving the above problem.

The traditional route to solve this problem is to create a producer/consumer queue-like data structure. Low-priority messages are placed at the end of the queue, high-priority ones at the front, and a loop acts as the consumer picking up messages, as shown below.

[Figure: SingleQueue – a single producer/consumer queue drained by one consumer loop]
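For illustration, a minimal sketch of that traditional approach might look like the following (a single ConcurrentQueue drained by one loop; the message type, send delegate, and sleep interval are arbitrary placeholders):

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Naive single-queue producer/consumer: one queue and one loop shared by all sessions.
// This is the approach critiqued below, not the final design.
public class SingleQueueSender
{
    private readonly ConcurrentQueue<string> m_queue = new ConcurrentQueue<string>();

    // producer side: messages go to the back; a plain ConcurrentQueue has no clean
    // way to express "place this one in front of the others".
    public void Enqueue(string message)
    {
        m_queue.Enqueue(message);
    }

    // consumer side: a single loop drains everything, for every session.
    public async Task ConsumerLoopAsync(Func<string, Task> sendDownStream, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            string message;
            if (m_queue.TryDequeue(out message))
                await sendDownStream(message);   // send + any post-send logging happen inline
            else
                await Task.Delay(50, ct);        // sleep when idle: burns cycles and adds latency
        }
    }
}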

While this approach will work, it has the following problems (you can hack your code to solve them, but your code base will become too complex to maintain).

  1. Fairness: one messaging/web socket session might overload the queue with a large number of messages; the loop will be forced to stream them all down, which means other sessions will have to wait. You can solve this by having multiple queues + multiple loops.
  2. Send notification: most messaging systems perform an after-“send” journaling or logging operation. You can do this logging as part of your loop (but then your other sessions will wait for send + logging).
  3. Wait time: your loop will implement a pull > found new messages > execute, or pull > not found > sleep sequence (or a similar paradigm). This means the loop uses CPU even when there are no messages to process, and the minimum wait time for a message (in an idle queue) is equal to your sleep time + de-queue time.

The Solution

Use a .NET custom Task Scheduler (details here) as follows:

Note: the terms message and task are used interchangeably below.

  • The scheduler sits on top of N * 2 queues (where N is the number of active messaging sessions; each session has a high-priority queue and a low/normal-priority queue).
  • The scheduler ensures that there is only one active thread working per messaging session, and this thread executes one task at a time, while allowing multiple threads to work on different sessions concurrently. The worker thread de-queues high-priority messages first, then low-priority messages.
  • The scheduler ensures that a thread yields execution to other queues after a configurable number of messages. This ensures fairness among queues even under high pressure, when all threads of the thread pool are in use. Yielding only happens if we have multiple active messaging sessions; if we have only one active session, the scheduler will drain it in one go.
  • The scheduler does not have a loop; it enqueues work items in the .NET thread pool only when there is work to do (solving the wait/sleep time problem discussed above).
  • The scheduler uses a derivative of .NET’s Task class, which means you can use the C# await construct, task continuations, and task chaining just as with regular tasks (elegantly solving the after-send processing/notification problem discussed above).
  • The scheduler has a “remove queue” method, allowing you to remove a session's queues (it drains, then removes, the queue).

The implementation

First we need a “LeveledTask” class that derives from the Task class with 2 additions:

  1. Queue Id: used to help the scheduler identify which queue this task should be enqueued in.
  2. Is High Priority: flag used to identify whether the task should go in front of every other existing task in the queue (except the currently executing task).
    public class LeveledTask : Task
    {
        public string QueueId = string.Empty;
        public bool IsHighPriority = false;

        // forward constructors to the base Task class; the Action overload is the
        // one used throughout this post (forward others as needed)
        public LeveledTask(Action action) : base(action) { }
    }

Then we need the scheduler itself, which goes like this:

   public class SequentialLeveledTaskScheduler : TaskScheduler
    {

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_HiQueues
            = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_LowQueues
        = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, object> MasterQueueLocks
            = new ConcurrentDictionary<string, object>();


        public const int DefaulltMinTaskBeforeYield = 10;
        public const int DefaultMaxTaskBeforeYield  = 50;

        private int m_MaxTaskBeforeYield = DefaulltMinTaskBeforeYield;

        public int MaxTaskBeforeYield
            {
            set {
                if (value > DefaultMaxTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaultMaxTaskBeforeYield;
                    return;
                }
                if (value < DefaulltMinTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaulltMinTaskBeforeYield;
                    return;
                }

                m_MaxTaskBeforeYield = value;
            }
                get { return m_MaxTaskBeforeYield; }
            }

        public void AddQueue(string QueueId)
        {
            var bHighPrority = true;
            var bAddIfNotExist = true;

            GetOrAddQueue(QueueId, bHighPrority, bAddIfNotExist);
            GetOrAddQueue(QueueId, !bHighPrority, bAddIfNotExist);
        }



        public  IEnumerable<Task> GetScheduledTasks(string QueueId)
        {
            var hQ = GetOrAddQueue(QueueId, true, false);
            var lQ = GetOrAddQueue(QueueId, false, false);

            if (null == hQ || null == lQ)
                return null;

            var masterList = new List<Task>();

            masterList.AddRange(hQ);
            masterList.AddRange(lQ);
            

            return masterList;
        }


        private ConcurrentQueue<LeveledTask> GetOrAddQueue(string QueueId, bool isHighPriority, bool addIfNotExist = true)
        {
            if (addIfNotExist)
            {
                var hiQueue = m_HiQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());
                var lowQueue = m_LowQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());

                return (isHighPriority) ? hiQueue : lowQueue;
            }
            else
            {
                var qList = (isHighPriority) ? m_HiQueues : m_LowQueues;

                ConcurrentQueue<LeveledTask> queue;
                qList.TryGetValue(QueueId, out queue);
                return queue; // null if the queue no longer exists (callers check for null)
            }
        }

        private object GetOrAddLock(string QueueId, bool AddIfNotExist = true)
        {
            if (AddIfNotExist)
            {
                object oNewLock = new object();
                var o = MasterQueueLocks.GetOrAdd(QueueId, oNewLock);
                return o;
            }
            else
            {
                return MasterQueueLocks[QueueId];
            }
            
        }

        public void RemoveQueue(string QueueId)
        {
            LeveledTask lt = new LeveledTask(() =>
            {

                // this will remove the Q from the list of queues
                // but will not null it, so if we have an ongoing execution it will just keep on going.
                // because the lists of queues and locks no longer maintain a reference to lQ, hQ and the lock,
                // they will eventually be collected

                Trace.WriteLine(string.Format("queue {0} will be removed", QueueId), "info");

                ConcurrentQueue<LeveledTask> q;
                object oLock;

                m_HiQueues.TryRemove(QueueId, out q);
                m_LowQueues.TryRemove(QueueId, out q);

                MasterQueueLocks.TryRemove(QueueId, out oLock);

            });

            lt.IsHighPriority = false;
            lt.QueueId = QueueId;
            lt.Start(this);

        }

        

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var masterList = new List<Task>();

            foreach (var hqueue in m_HiQueues.Values)
                masterList.AddRange(hqueue);

            foreach (var lqueue in m_LowQueues.Values)
                masterList.AddRange(lqueue);
            return masterList;
        }

        protected override void QueueTask(Task task)
        {
            var leveledtask = task  as LeveledTask;
            if (null == leveledtask)
                throw new InvalidOperationException("this leveled sequential scheduler shouldn't be used with regular Task objects"); // bang!


            if (leveledtask.QueueId == null ||
                leveledtask.QueueId == string.Empty)
                throw new InvalidOperationException("Task scheduler received a task that does not have a queue assigned to it"); // bang!

            var Q = GetOrAddQueue(leveledtask.QueueId, leveledtask.IsHighPriority);
            Q.Enqueue(leveledtask);

            ProcessWork(leveledtask.QueueId);   
        }

        private void ProcessWork(string QueueId)
        {

          

            ThreadPool.UnsafeQueueUserWorkItem(w => {



                var oLock = GetOrAddLock(QueueId);

                bool bGotLock = false;
                Monitor.TryEnter(oLock, ref bGotLock);

                if (!bGotLock) // a thread from the thread pool is already looping on the Q.
                {
                    //Trace.WriteLine(string.Format("Scheduler attempt to acquire lock on {0} and failed", QueueId), "info");
                    return;    // at any point in time ONLY one thread is allowed to dequeue on the Q, to keep tasks ordered
                }

                var hQ = GetOrAddQueue(QueueId, true);
                var lQ = GetOrAddQueue(QueueId, false);

                if (0 == hQ.Count && 0 == lQ.Count)
                {
                    Monitor.Exit(oLock); // release the lock before returning; the queues were already drained.
                    return;
                }


                var ExecutedTasks = 0;
                
                
                while (
                            // keep executing until we hit the yield threshold
                            ExecutedTasks <= m_MaxTaskBeforeYield ||
                            // don't yield if this queue has been removed; drain it before dropping the reference.
                            (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                      )
                             
                {


                    if (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                        Trace.WriteLine(string.Format("Queue {0} has been removed. Draining.. (remaining {1} tasks)", QueueId, lQ.Count + hQ.Count), "info");


                    LeveledTask leveledTask = null;
                    var bFound = false;

                    // try the high-priority queue first
                    bFound = hQ.TryDequeue(out leveledTask);

                    // then the low-priority queue
                    if (!bFound)
                        bFound = lQ.TryDequeue(out leveledTask);

                    if (!bFound) // nothing here to work on
                        break;

                    try
                    {
                        base.TryExecuteTask(leveledTask);
                    }
                    catch (Exception e)
                    {
                        
                        //TODO: check if we need to call unobserved exceptions
                        Trace.WriteLine(string.Format("Task Executer: Error Executing Task {0} {1}", e.Message, e.StackTrace), "error");
                    }

                    ExecutedTasks++;

                }

                if (0 == ExecutedTasks) // we were unsuccessful picking up tasks
                    Trace.WriteLine(string.Format("Scheduler attempted to execute on queue {0} with count {1} and found 0 tasks", QueueId, lQ.Count + hQ.Count), "info");


                Monitor.Exit(oLock);
             

                

                if ((ExecutedTasks > m_MaxTaskBeforeYield && hQ.Count + lQ.Count > 0))
                {

                    // current thread is about to be released back to the pool (and we still have more as we yielded).
                    // call it back to ensure that remaining tasks will be executed (even if no more tasks are sent to the scheduler). 
                    Trace.WriteLine(string.Format("Queue {0} yielded thread {1} after {2} tasks",
                                                   QueueId,
                                                   Thread.CurrentThread.ManagedThreadId,
                                                   ExecutedTasks - 1));

                    Task.Run(() => { ProcessWork(QueueId);} );
                }



            }, null);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // if a task were inlined it would lose its position in the queue,
            // so we cannot afford to inline tasks here
            return false;
        }
    }

The magic happens in the following methods

QueueTask(Task task)

Here the task is validated, the destination queue (per session and per priority) is identified, then the task is enqueued and the ProcessWork method is called.

ProcessWork(string QueueId)

This method enqueues a delegate in the thread pool as a work item. When the delegate executes, it checks whether there is a currently active worker on the queue; if there is one, it returns. Otherwise it keeps pulling messages (high priority then low priority) until it has to yield (with a special condition for draining a removed queue, as discussed above). If it has yielded execution, it creates a new task that is just another call to ProcessWork(). This ensures that execution keeps going even when QueueTask is not being called.

A couple of additional notes: the scheduler does not support inline execution (because that would make us lose the order of tasks). The rest of the class implementation is maintenance and management of the in-memory structures used by the scheduler instance.

From the client's point of view (this could be your messaging server; in my case it is a Web Sockets server), a message is sent this way:

LeveledTask lt = new LeveledTask(() =>
{
    // the magic goes here (your send logic)
});

lt.QueueId = q;                    // queue id is my messaging session id
lt.IsHighPriority = IsHighPrority;
lt.Start(scheduler);               // scheduler is an instance of the scheduler class.

// I can now use lt to await, or ContinueWith, just like the Task class.
// Note: unobserved exceptions will go to TaskScheduler.UnobservedTaskException as usual.

Epilog

This was part 1 of a two-part post covering a Web Sockets server implementation. This part focused on the scheduler (which in a lot of ways is a stand-alone component). Part 2 will cover the server implementation. I should be able to post part 2 next week. Once I do, I will upload the code along with unit tests to GitHub.

till next time!

Questions/Comments
@khnidk

Azure Service Fabric: On State Management

Prolog

Making the right design decision is a mix of experience, knowledge, and a bit of art. The phrase “right design decision” in itself requires the understanding that design is not an isolated abstract; it has context that spans requirements, environment, and the people who are actually about to write the software.

Service Fabric employs a complex set of procedures that ensure your data is consistent and resilient (for example, if a cluster node goes down).

Understanding (knowing) what goes on under the hood of your application platform is key to making better design decisions. Today I want to attempt to describe how state is managed in Service Fabric, the different choices you have, and their implications for the performance and availability of your Service Fabric application.

I strongly recommend going through Service Fabric: Partitions before reading this one.

Replication Process & Copy Process

Service Fabric employs two distinct processes to ensure consistency across a replica set (the set of replicas in a given partition of a stateful service).

Replication Process: used during writing state data in your service.

Copy Process: used to build a new replica when an existing Active Secondary fails. Also used to recover an Active Secondary that was down for a while.

Both processes are described below.

Replication Process

So your Primary goes online, starts listening to requests, and is about to write data using one of the reliable collection classes. Here is a breakdown of events:

  1. The Primary wraps the write operation (which can be an add, update, or delete) and puts it in a queue. This operation is synchronized with a transaction. At the other end of the queue your Active Secondary(s) are pumping out the operations, as described in the next step. No data has been written yet to the Primary's data store.
  2. Active Secondary(s) pick up the operation(s), apply them locally, and acknowledge the Replicator one by one.
  3. The Replicator delivers a callback to the Primary when the writes were successful.
  4. The Primary can then apply the data locally and return to the original caller.

Diving Even Deeper

  • The replicator will not acknowledge the primary with a successful write until it has been “quorum written”. A quorum is floor(n/2)+1, where n is the # of replicas in a set; in other words, more than half of the replicas in the set have successfully applied the write. For example, with n = 5 replicas the quorum is floor(5/2)+1 = 3.
  • Each operation is logically sequenced. The replication process is out of band (from the perspective of the Primary). At any point in time the primary maintains an internal list that indicates operations waiting to be sent to the replicator, operations sent to the replicator and not yet acked, and operations sent to the replicator and acked.
  • There is no relation between the data structure semantics (queue vs. dictionary) and the actual storage/replication. All data is replicated and stored the same way.
  • Given the following pseudo code

Create transaction TX
Update dictionary D (key = K to value of “1”)
Commit TX
Create another transaction TX1
Read dictionary D (key = K)

What value should you expect when you read the dictionary the second time? Most probably whatever value K had before updating it; remember, changes are not applied locally until they are replicated to the quorum.
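Expressed against the Reliable Collections API, the pseudo code above maps roughly to the sketch below (assuming it runs inside a stateful service with the usual Microsoft.ServiceFabric.Data usings; the dictionary name and key are just the D and K from the pseudo code):

// Sketch: the pseudo code above, expressed against a reliable dictionary.
// Assumes this runs inside a stateful service where this.StateManager is available.
var dictionaryD = await this.StateManager
    .GetOrAddAsync<IReliableDictionary<string, string>>("D");

using (var tx = this.StateManager.CreateTransaction())
{
    await dictionaryD.SetAsync(tx, "K", "1");
    await tx.CommitAsync();
}

using (var tx1 = this.StateManager.CreateTransaction())
{
    // read K in a second transaction
    var value = await dictionaryD.TryGetValueAsync(tx1, "K");
}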

Let us bend our minds even more

  • Active Secondary(s) keep pumping data out of the replication queue. They don’t depend on the replicator to notify them of new data, so different Active Secondary(s) can be at different stages of replication. In the best case a quorum is aligned with the Primary. This affects how you do your reads, as described later.
  • Service Fabric maintains – sort of – a configuration version for each replica set. This version changes as the formation of the replica set changes (i.e. which replica is currently the primary). This particular information is communicated to the replicas.
  • You can run into a situation where an Active Secondary is ahead of the Primary: for example, a Primary failed to apply changes locally before dying and never notified the caller of successful writes. Or worse, Service Fabric lost the write quorum (for example, many nodes of the cluster failed at the same time) and the surviving replicas (of which one will be elected primary) are the slow ones that were not catching up. The details of how this is recovered and communicated to your code (and the control you will have over recovery) are something the Service Fabric team is working on. Rolling back, loss acceptance, and restore from backup are all techniques that can be used for this.

What About My Reads?

The default behavior is to direct reads to the Primary replica to ensure consistent reads (i.e. reads from the latest, most trusted version of the data). However, you can enable listening on your secondary(s) and redirect reads to them (more on how to here).
As you can tell from the above discussion, reads from Secondary(s) can be out-of-sync reads or, to use a better expression, ghost data. The current service interfaces do not allow you to know how far ahead or behind the current replica is from the Primary.
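Based on the Reliable Services API, opting in to listening on secondaries looks roughly like the sketch below; it belongs inside your StatefulService class, and MyCommunicationListener is a placeholder for whatever ICommunicationListener implementation you actually use:

// Sketch: allow Active Secondary replicas to open a listener as well.
// MyCommunicationListener stands in for your ICommunicationListener implementation.
protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
{
    return new[]
    {
        new ServiceReplicaListener(
            context => new MyCommunicationListener(context),
            name: "ReadEndpoint",
            listenOnSecondary: true)   // secondaries also listen; reads can be routed to them
    };
}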

The Copy Process

The scenarios above discuss what happens when writes flow from the Primary to Active Secondary(s). But what happens when Service Fabric needs to bring a Secondary up to speed (i.e. synchronize it)? The Secondary in question can be new (i.e. Service Fabric replacing a dead one on a dead node, or moving an existing one to a new node due to resource rebalancing). Obviously using the replication mechanism would be overkill. Instead, Service Fabric allows the Secondary to communicate to the Primary its marker (the last sequence it applied internally); the Primary can then send back the delta operations for the Secondary to apply. Once the Secondary is brought up to speed it can follow the normal replication process.

The Fabric Replicator

Up until this moment we have talked about a “Replicator” without further details. Service Fabric comes with the “Fabric Replicator” as the default replicator; this is surfaced to you partially in the “StateManager” member of your stateful service, which holds a reference to the replicator. If you tinker inside the code you will notice that there is a process that couples your service to a state provider/manager (in the IStatefulServiceReplica interface), meaning that in theory you can build your own replicator, which as you can tell is quite complex.

So What Does This Mean For Me?

Below are various discussion points on design aspects related to replicas and replica sets.

How Fast are My Writes?

You have to understand that the fastest a write can complete is roughly S/A + time-on-slowest-network + D/A + Ack, where:
  • S/A is the time of serialization & local calls to the replicator at the Primary.
  • Time-on-slowest-network is the network time to the slowest replica in the quorum (or the slowest of the fastest responders that happen to be the most up-to-date secondary(s)).
  • D/A is the time of deserialization & calls to the replicator at the secondary.
  • Ack is the acknowledgment time (the way back).

Large # of Replicas vs Small # of Replicas

It is really tempting to go with a large # of replicas. But keep in mind the “How fast are my writes?” discussion above and the fact that the write quorum is floor(n/2) + 1. The larger your replica set, the larger your quorum, and the slower your writes will be (a replica set of 3 needs 2 acknowledgments; a set of 7 needs 4). I would go with the smallest possible replica set per partition.

But Really, When Should I Go With a Larger # of Replicas in my Replica Set?

If you have a system where reads far outnumber writes, and the impact of ghost reads is negligible to your business, then it might make sense to go with a large # of replicas (and allow reads on secondary(s)).
As a side note, the replica listening-address resolution mechanism (more here) does not keep track of your calls, so you probably want to do simple load balancing (for example round-robin across all secondary replicas in a partition).

Large Objects (Data Fragment) vs. Small Objects

How big should your objects (those entries in reliable dictionaries and queues) be? Consider the following: the sequential nature of replication, and the fact that replicas live on different machines (network overhead). You will end up with “smaller is better”.
A data fragment that is 500K big is not only slow to replicate to the quorum; it will also slow the line of pending replications to replicas (trying to catch up). On top of that, it will slow the Copy Process, hence slowing down the recovery and resource rebalancing processes.
Consider techniques for breaking your data down into smaller chunks (writes) and writing them in one transaction, as sketched below.
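A sketch of that idea, assuming a stateful service; the key scheme, orderId, and the byte[] chunks are illustrative placeholders:

// Sketch: persist a large payload as several small entries committed atomically.
var blobs = await this.StateManager
    .GetOrAddAsync<IReliableDictionary<string, byte[]>>("order-blobs");

using (var tx = this.StateManager.CreateTransaction())
{
    for (var i = 0; i < chunks.Count; i++)
    {
        // each entry stays small, so each replicated operation stays small
        await blobs.SetAsync(tx, string.Format("{0}/{1}", orderId, i), chunks[i]);
    }

    await tx.CommitAsync(); // all chunks become visible together
}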
Also note that reliable collections are kept in memory (on the primary and Active Secondary(s)) as well (that is how the data structure semantics, queue vs. dictionary, are maintained on top of the replicator). That means your entire state lives in the memory of your cluster nodes.

Epilog

As you can tell, the mechanics of Service Fabric internals are quite complex, yet an understanding of how it works internally is important to building high-performance Service Fabric applications.

till next time

@khnidk

Writing a Non-Locking Throttling Component

Prolog

Throttling components, also known as flood gates, latches, or just gates, have a variety of use cases. Off the top of my head, I can think of:

  1. SaaS software that charges customers per call (or per quota of calls) to a certain component.
  2. Protecting a resource, for example an old system that can handle up to X calls in a certain period.
  3. Some use it in systems such as DoS detection (if calls to resource Y from caller Z exceed a threshold, then flag, then throttle).

If you have requirements for a throttling component, I would recommend checking whether Azure API Management fits your needs before deciding to write it yourself.

In all cases, throttling components are fun to write and are a good brain exercise 🙂

The Requirements

What we are trying to achieve with a throttling component is to “control the number of calls to a certain resource.” The “why” (such as SaaS scenarios, avoiding noisy neighbors, quota-ed calls) is not part of the core requirement.

The Design

Note: the code samples uploaded with this post (check the end of the post) have multiple implementations using locking and non-locking approaches. However, here I am focusing only on the non-locking one, since it is the most interesting.

The idea is to use a circular buffer with a size equal to the maximum allowed requests per period. Each entry of the buffer holds the tick count of the time the request was received, with 2 pointers: head (starting position = 0) and tail (starting position = -1), as follows:

[Figure: circular buffer with head and tail pointers]

  • Tail represents the first request in a period. So if you want to allow a max of 100 requests every 10 seconds, tail will represent the first request received in the last 10 seconds. As you receive requests, Tail races forward and stops at the first request in the period.
  • For each request: if progressing Head forward 1 step will not make it catch up with Tail (i.e. head != tail), then accept the request and progress Head forward 1 step. If progressing Head 1 step forward would make it catch up with Tail, then return an error and don’t progress Head. In essence, stop accepting requests every time Head and Tail meet.
  • As Tail progresses forward it may not find a first request in the period (i.e. we didn’t receive any requests within a span as long as our target period). In that case set Tail = Head - 1 (i.e. reset and start over).
  • Calculating “Retry After” (i.e. the caller has already maxed out its requests-per-period quota): the “retry after” time span is the difference between the time of Tail + Period (which will be a point in the future) and the time of now. For example, if Tail was received at 12:00:00 and the period is 10 seconds, a request rejected at 12:00:07 gets a retry-after of 3 seconds.

In a lot of ways, think of this as two cars racing on a track: the faster car (requests) starts just ahead of the slower car (period start time), and every time the faster car catches up with the slower car (a full circle) it stops, giving the slower car a chance to advance.

The Code

using System;
using System.Diagnostics;
using System.Threading;

namespace Throttling
{
    internal class MultiNoLock
    {
        // these values are driven from config
        private readonly static TimeSpan Period           = TimeSpan.FromSeconds(5);
        private readonly static int maxReqAllowedInPeriod = 100;
        private readonly static long ticksInPeriod        = Period.Ticks;
        private readonly static int totalRequestsToSend   = 1000 * 1000;
        private readonly static int numOfThreads          = 10; // think of it as # of concurrent Requests

        // these values are per every throttling component
        private long[] allTicks = new long[maxReqAllowedInPeriod];
        private int head = 0; // head
        private int tail = -1; // tail

        // wait for all threads
        private CountdownEvent cde = new CountdownEvent(numOfThreads);

        // this will slow the console.write down.
        // but will have no effect on the actual values
        // if you want you can remove the console.write and write to stdout or stderr
        // and work with the values
        private AutoResetEvent _ConsoleLock = new AutoResetEvent(true);

        private long totalProcessingTicks = 0;
        private int  totalSucessfulReq    = 0;

        private void runThread(object oThreadNum)
        {
            // all the console.SetCursorPosition are meant for clear output. 

            int threadNum = (int)oThreadNum;
            Stopwatch sp = new Stopwatch();
            var startingLine = threadNum * 6;

            var reqForThisThread = totalRequestsToSend / numOfThreads;

            _ConsoleLock.WaitOne();
            Console.SetCursorPosition(1, 0 + startingLine); // aka first
            Console.WriteLine(string.Format("Thread # {0}:", threadNum));
            _ConsoleLock.Set();

            for (var i = 0; i <= reqForThisThread; i++)
            {
                _ConsoleLock.WaitOne();
                    Console.SetCursorPosition(1, 1 + startingLine);
                    Console.WriteLine(string.Format("head/tail: {0}/{1}   ", head,tail));

                    Console.SetCursorPosition(1, 2 + startingLine);
                    Console.WriteLine(string.Format("Total Calls: {0}   ", i));
                _ConsoleLock.Set();

                long TryAfterTicks = 0;

                sp.Start();
                    var bShouldDoIt = ShouldDoIt(ref TryAfterTicks);
                var ElabsedTicks = sp.ElapsedTicks;

                Interlocked.Add(ref totalProcessingTicks, ElabsedTicks);

                if (bShouldDoIt)
                    Interlocked.Increment(ref totalSucessfulReq);

                sp.Stop();
                sp.Reset();

                _ConsoleLock.WaitOne();
                Console.SetCursorPosition(1, 5 + startingLine);
                Console.WriteLine(string.Format("Process Ticks: {0}  ", ElabsedTicks));
                Debug.WriteLine(string.Format("Process Ticks: {0}  ", ElabsedTicks));
                _ConsoleLock.Set();

                _ConsoleLock.WaitOne();
                Console.SetCursorPosition(1, 3 + startingLine);
                if (bShouldDoIt)
                    Console.WriteLine(string.Format("Should do it: {0} \t\t\t\t\t\t ", bShouldDoIt));
                else
                    Console.WriteLine(string.Format("Should do it: {0}  Try After {1} Milliseconds", bShouldDoIt, TryAfterTicks / TimeSpan.TicksPerMillisecond));

                Console.SetCursorPosition(1, 4 + startingLine);
                Console.WriteLine(string.Format("current minute/secound {0}/{1}", DateTime.Now.Minute, DateTime.Now.Second));
                _ConsoleLock.Set();

                //random sleep, so we wouldn't get uniform values (in total ticks & counts).
                // max of 100 ms wait

                Thread.Sleep(TimeSpan.FromMilliseconds((new Random()).Next(1, 10) * 10));
            }
            cde.Signal();
        }
        public void Run()
        {
            Thread[] threads = new Thread[numOfThreads];
            for (var i = 0; i < numOfThreads; i++)
            {
                threads[i] = new Thread(this.runThread);
                threads[i].Start(i);
            }

            cde.Wait();

            // although I am printing the average, you should look for the max.
            // some requests take too long to reach a go/no-go decision (i.e. 10ms, with 50 threads, time waiting for lock release)
            // if you are doing SLA (or a similar commitment) you will be in trouble
            Debug.WriteLine("Threads {0} - Average Processing is @ {1} ms per request",
                            threads.Length,
                            totalProcessingTicks / TimeSpan.TicksPerMillisecond / totalSucessfulReq);
        }

        private void setTail(long nowTicks)
        {
            var newTTail = 0;

            // only first call
            if (-1 == tail)
            {
                Interlocked.Exchange(ref tail, newTTail);
                Interlocked.Exchange(ref allTicks[newTTail], nowTicks);
                return;
            }

            long ticksAfter = (nowTicks - ticksInPeriod); // we are not really interested
                                                          // in anything older than the period
            var newtail = tail;
            var currenttail = newtail;

            // no need to trim if current tail is within period
            if (allTicks[newtail] >= ticksAfter)
                return; // tail is good

            while (true)
            {
                if (newtail == maxReqAllowedInPeriod - 1)
                    newtail = 0;
                else
                    newtail++;

                // all entries are really old, and a previous thread didn't just set the value
                if (newtail == currenttail && allTicks[newtail] < ticksAfter)
                {
                    // reset: place the tail just one place before the head.
                    newtail = (head == 0) ? maxReqAllowedInPeriod - 1 : head - 1;
                    Interlocked.Exchange(ref tail, newtail);
                    Interlocked.Exchange(ref allTicks[newtail], nowTicks);
                    return;
                }

                // if there are two threads competing for the tail,
                // by definition one of them will be following the other in the loop;
                // the check below guarantees that one will set the value for the other

                // is this a good tail?
                if (allTicks[newtail] >= ticksAfter)
                {
                    Interlocked.Exchange(ref tail, newtail);
                    return;
                }
            }
        }

        private bool _ShouldDoIt(long nowTicks)
        {
            /*
            as a rule of thumb if your variables assignment is less than 64bit on a 64bit
            machine, then using the = operator should be fine for atomic operation

            hence long = long on a 64 machine is an atomic operation and does not require Interlocked.
            however if you are doing long to long assignment on a 32bit machine then this is
            not an atomic assignment and will require interlocked. 

            the below uses interlocked everywhere to avoid confusion
            */
            setTail(nowTicks);

            var currentHead = 0;
            Interlocked.Exchange(ref currentHead, head);

            if (
            // meeting somewhere not at the beginning of the track.
                ((currentHead < maxReqAllowedInPeriod - 1) && currentHead + 1 == tail)
                ||
            // meeting at the beginning of the track
                (0 == tail && (currentHead == maxReqAllowedInPeriod - 1))
               )
                return false;
            // there is a slim possibility that head has incremented in the meantime;
            // in that case the increment will be lost. it is a very slim chance but possible.
            // in the majority of cases this should be acceptable

            if (currentHead < maxReqAllowedInPeriod - 1)
            {
                currentHead++;
                Interlocked.Exchange(ref head, currentHead);
            }
            else
            {
                currentHead = 0;
                Interlocked.Exchange(ref head, 0);
            }

            Interlocked.Exchange(ref allTicks[currentHead], nowTicks);

            return true;
        }
        public bool ShouldDoIt(ref long TryAfterTicks)
        {
            long nowTicks = DateTime.Now.Ticks;
            bool bOk = _ShouldDoIt(nowTicks);
            if (!bOk) // you might get skewed results for TryAfter, since tail might be adjusted
                TryAfterTicks = (allTicks[tail] + ticksInPeriod) - nowTicks;
            return bOk;
        }
    }
}
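As a usage sketch, a request handler could consult the gate before doing any real work; the surrounding web pieces (the HttpListenerResponse, the 429 handling) are illustrative only and not part of the published sample:

// Sketch: consuming the gate from a request handler (using System; using System.Net;).
// MultiNoLock is the class above; one instance is shared across requests.
private static readonly MultiNoLock s_gate = new MultiNoLock();

public void HandleRequest(HttpListenerResponse response)
{
    long tryAfterTicks = 0;

    if (!s_gate.ShouldDoIt(ref tryAfterTicks))
    {
        // over quota: tell the caller when to come back
        var retryAfterSeconds = (int)TimeSpan.FromTicks(tryAfterTicks).TotalSeconds;
        response.StatusCode = 429; // Too Many Requests
        response.AddHeader("Retry-After", retryAfterSeconds.ToString());
        return;
    }

    // within quota: process the request normally
}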

Notes on the Code

  • Language: the code is written in C#, but I have tried to stay as agnostic as possible. The same code can be written in C++ or another language on Windows (the Interlocked.XXX functions are in fact Win32 functions; check here). For the Linux family of OSs, look for how these are surfaced in your SDK/language.
  • Memory consumption: the code consumes 8 bytes for every request tracked (ticks are longs, an 8-byte datatype). This can be significant if you are working on a low-memory system such as Windows on Raspberry Pi 2, or if you are tracking maximum requests per caller. For example, tracking a max of 10,000 requests for a max of 1,000 concurrent callers will result in 10,000 * 1,000 * 8 bytes = roughly 76 MB just for this component to operate. You can cut this in half if you rebase ticks to cover only the last hour or a shorter period (hence using 4-byte ints instead of 8-byte longs).
  • Limitation: you cannot dynamically change the max # of allowed requests per period without locking (or creating a new circular buffer with the new size and then cleverly jumping the pointers to the right position in the new buffer).
  • Accuracy: there is a slight chance of not counting requests (it shouldn't be many, since the window is between consecutive lines of code) between calculating head and setting it. Please check the code comments for more.

Why not Locking

Locking in itself is not bad. It protects resources and allows you to do fascinating stuff. In low-concurrency scenarios locking is absolutely fine, because each request will wait for only one or two requests to finish adjusting the in-memory data structure representing the requests. As a matter of fact, if you tried the locking and non-locking implementations with low concurrency, they would perform about the same.

But if you are dealing with high-concurrency scenarios, locking is bad and will lead to requests waiting a prolonged amount of time just to adjust an in-memory data structure (in addition to whatever I/O you need to do to complete the request successfully).

Multi Server Environment

Unless you are still somewhere in the 1980s, you will need to run this code on multiple servers (for load balancing and availability). For the sake of discussion, let us imagine you need to run this on load-balanced web nodes. The scenario of choice will depend on some trade-offs.

High Accuracy &&  Low Resilience && No Session Affinity

For servers that use round-robin load distribution you can assume max allowed requests per node = (total allowed requests / # of nodes) for requests not tracked per caller (for example, 1,000 total allowed requests across 4 nodes = 250 per node). If you are tracking per caller, then you will have to RPC into the other servers to get the total # of requests processed for the caller, then update internal counters before validation. In this scenario, resetting a node will reset its counters.

High Accuracy &&  High Resilience && No Session Affinity

This scenario is similar to the previous one, except all servers maintain a single state in backing storage. Hence there will be no need to RPC into other servers, just the backing store.

Load Distribution with Session Affinity

Session affinity is bad, and you should use it only when you must. I don’t generally favor absolutes, but this is one of the few I do. If you have to do session affinity, then state will live on a single server; whether to save it in backing storage or not will depend on whether you need it to survive server restarts.

Notes on Selecting Backing Storage

Favor storage components that can:

  1. Perform read, modify, write in a single operation (there are a few of those; the list will surprise you). This should make your life easier when trying to figure out whether head and tail have moved while you were reading them.
  2. Be partitioned: since state is saved per caller, there is no point in having lock contention on storage shared across callers.

Relaxed Accuracy Approach

Some systems can have relaxed accuracy. This is done by continuously analyzing logs asynchronously (written for each request), then deciding on and maintaining an in-memory gate lock. These systems tend to have longer periods (usually longer than a few minutes). Because processing a request and using it to calculate whether the gate should be locked is not immediate, these systems typically don’t have high accuracy (they are typically used for noisy-neighbor avoidance scenarios).

Epilog

The code is published here: https://github.com/khenidak/throttling – have fun with it.

@khnidk

Azure Service Fabric: Learning Materials & Literature

Bookmark this page; I will keep it updated with hand-picked Service Fabric learning materials & literature.

Added on August 4th 2015

 

@khnidk