Writing a Non-Locking Throttling Component

Prologue

Throttling components, also known as flood gates, latches, or simply gates, have a variety of use cases. Off the top of my head, I can think of:

  1. SaaS software that charges customers per call (or per quota of calls) to a certain component.
  2. Protecting a resource, for example an old system that can only handle up to X calls in a certain period.
  3. Security systems such as DoS detection: if the number of calls to resource Y from caller Z crosses a threshold, flag the caller, then throttle.

If you have requirements for a throttling component, I would recommend checking whether Azure API Management fits your needs before deciding to write one yourself.

In any case, throttling components are fun to write and are a good brain exercise 🙂

The Requirements

What we are trying to achieve with a throttling component is to “control the number of calls to a certain resource.” The “why” (such as SaaS scenarios, avoiding noisy neighbors, quota-ed calls) is not part of the core requirement.

The Design

Note: The code samples uploaded with this post (check the end of the post) have multiple implementations using locking and non-locking approaches. Here, however, I focus only on the non-locking one, since it is the most interesting.

The idea is to use a circular buffer whose size equals the maximum number of requests allowed per period. Each entry of the buffer holds the ticks of the time a request was received. Two pointers walk the buffer: Head (starting position = 0) and Tail (starting position = -1), as follows:

(figure: the circular buffer with Head and Tail pointers)

  • Tail represents the first request in a period. So if you want to allow at most 100 requests every 10 seconds, Tail will represent the first request received within the last 10 seconds. As you receive requests, Tail races forward and stops at the first request that is still inside the period.
  • For each request: if advancing Head one step forward will not make it catch up with Tail (i.e. head != tail), then accept the request and advance Head one step. If advancing Head one step forward will make it catch up with Tail, then return an error and don’t advance Head. In essence, we stop accepting requests every time Head and Tail meet.
  • As Tail advances it may not find a first request in the period (i.e. we didn’t receive requests for a span longer than our target period). In that case, set Tail = Head - 1 (i.e. reset and start over).
  • Calculating “Retry After” (for a caller that has already maxed out its requests-per-period quota): the “retry after” time span is (time of Tail + Period), which is a point in the future, minus the time of now (see the worked example below).

In a lot of ways, think of this as two cars racing on a track: the faster car (requests) starts just ahead of the slower car (period start time). Every time the faster car catches up with the slower car (a full circle), it stops, giving the slower car a chance to advance.
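To make the “Retry After” arithmetic concrete, here is a tiny worked example with hypothetical numbers (a 10-second period and a buffer that is already full):

using System;

internal class RetryAfterExample
{
    static void Main()
    {
        TimeSpan period = TimeSpan.FromSeconds(10);

        long tailTicks = DateTime.Now.Ticks;                        // first request in the period
        long nowTicks  = tailTicks + TimeSpan.FromSeconds(7).Ticks; // a rejected request arrives 7s later

        // (time of Tail + Period) - now  =>  a point 3 seconds in the future
        long retryAfterTicks = (tailTicks + period.Ticks) - nowTicks;
        Console.WriteLine(TimeSpan.FromTicks(retryAfterTicks));     // prints 00:00:03
    }
}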

The Code

using System;
using System.Diagnostics;
using System.Threading;

namespace Throttling
{
    internal class MultiNoLock
    {
        // these values would normally be driven by config
        private readonly static TimeSpan Period           = TimeSpan.FromSeconds(5);
        private readonly static int maxReqAllowedInPeriod = 100;
        private readonly static long ticksInPeriod        = Period.Ticks;
        private readonly static int totalRequestsToSend   = 1000 * 1000;
        private readonly static int numOfThreads          = 10; // think of it as # of concurrent Requests

        // these values are per every throttling component
        private long[] allTicks = new long[maxReqAllowedInPeriod];
        private int head = 0;  // index of the most recently accepted request
        private int tail = -1; // index of the first request in the current period
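        // invariant: head chases tail around the circular buffer; when advancing
        // head one more step would land it on tail, the period's quota is exhausted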

        // wait for all threads
        private CountdownEvent cde = new CountdownEvent(numOfThreads);

        // this lock slows the console writes down but has no effect on the actual values.
        // if you want, you can remove the cursor-positioned console writes, dump the
        // values to stdout or stderr instead, and work with them from there
        private AutoResetEvent _ConsoleLock = new AutoResetEvent(true);

        private long totalProcessingTicks = 0;
        private int  totalSuccessfulReq   = 0;

        private void runThread(object oThreadNum)
        {
            // all the Console.SetCursorPosition calls are only there to keep the output readable.

            int threadNum = (int)oThreadNum;
            Stopwatch sp = new Stopwatch();
            var startingLine = threadNum * 6;

            var reqForThisThread = totalRequestsToSend / numOfThreads;

            _ConsoleLock.WaitOne();
            Console.SetCursorPosition(1, 0 + startingLine); // aka first
            Console.WriteLine(string.Format("Thread # {0}:", threadNum));
            _ConsoleLock.Set();

            for (var i = 0; i < reqForThisThread; i++)
            {
                _ConsoleLock.WaitOne();
                    Console.SetCursorPosition(1, 1 + startingLine);
                    Console.WriteLine(string.Format("head/tail: {0}/{1}   ", head,tail));

                    Console.SetCursorPosition(1, 2 + startingLine);
                    Console.WriteLine(string.Format("Total Calls: {0}   ", i));
                _ConsoleLock.Set();

                long TryAfterTicks = 0;

                sp.Start();
                var bShouldDoIt = ShouldDoIt(ref TryAfterTicks);
                sp.Stop();
                var elapsedTicks = sp.ElapsedTicks;
                sp.Reset();

                Interlocked.Add(ref totalProcessingTicks, elapsedTicks);

                if (bShouldDoIt)
                    Interlocked.Increment(ref totalSuccessfulReq);

                _ConsoleLock.WaitOne();
                Console.SetCursorPosition(1, 5 + startingLine);
                Console.WriteLine(string.Format("Process Ticks: {0}  ", elapsedTicks));
                Debug.WriteLine(string.Format("Process Ticks: {0}  ", elapsedTicks));
                _ConsoleLock.Set();

                _ConsoleLock.WaitOne();
                Console.SetCursorPosition(1, 3 + startingLine);
                if (bShouldDoIt)
                    Console.WriteLine(string.Format("Should do it: {0} \t\t\t\t\t\t ", bShouldDoIt));
                else
                    Console.WriteLine(string.Format("Should do it: {0}  Try After {1} Milliseconds", bShouldDoIt, TryAfterTicks / TimeSpan.TicksPerMillisecond));

                Console.SetCursorPosition(1, 4 + startingLine);
                Console.WriteLine(string.Format("current minute/secound {0}/{1}", DateTime.Now.Minute, DateTime.Now.Second));
                _ConsoleLock.Set();

                // random sleep, so we don't get uniform values (in total ticks & counts);
                // Next(1, 10) * 10 yields a wait between 10 and 90 ms
                Thread.Sleep(TimeSpan.FromMilliseconds((new Random()).Next(1, 10) * 10));
            }
            cde.Signal();
        }
        public void Run()
        {
            Thread[] threads = new Thread[numOfThreads];
            for (var i = 0; i < numOfThreads; i++)
            {
                threads[i] = new Thread(this.runThread);
                threads[i].Start(i);
            }

            cde.Wait();

            // although I am printing the average, you should look for the max:
            // some requests take too long (e.g. 10ms with 50 threads, time spent
            // waiting for a lock release) to reach a go/no-go decision. if you are
            // committing to an SLA (or similar) you will be in trouble
            Debug.WriteLine("Threads {0} - Average processing is @ {1} ms per request",
                threads.Length, totalProcessingTicks / TimeSpan.TicksPerMillisecond / totalSuccessfulReq);
        }

        private void setTail(long nowTicks)
        {
            var initialTail = 0;

            // only first call
            if (-1 == tail)
            {
                Interlocked.Exchange(ref tail, initialTail);
                Interlocked.Exchange(ref allTicks[initialTail], nowTicks);
                return;
            }

            long ticksAfter = (nowTicks - ticksInPeriod); // we are not really interested
                                                          // in anything older than the period
            var newtail = tail;
            var currenttail = newtail;

            // no need to trim if current tail is within the period
            if (allTicks[newtail] >= ticksAfter)
                return; // tail is good

            while (true)
            {
                if (newtail == maxReqAllowedInPeriod - 1)
                    newtail = 0;
                else
                    newtail++;

                // all entries are really old, and a previous thread didn't just set the value
                if (newtail == currenttail && allTicks[newtail] < ticksAfter)
                {
                    // reset: place the tail just one position before the head
                    newtail = (head == 0) ? maxReqAllowedInPeriod - 1 : head - 1;
                    Interlocked.Exchange(ref tail, newtail);
                    Interlocked.Exchange(ref allTicks[newtail], nowTicks);
                    return;
                }

                // if two threads are competing for the tail, by definition one of
                // them will be following the other in the loop; the check below
                // guarantees that one will set the value for the other

                // is this a good tail?
                if (allTicks[newtail] >= ticksAfter)
                {
                    Interlocked.Exchange(ref tail, newtail);
                    return;
                }
            }
        }

        private bool _ShouldDoIt(long nowTicks)
        {
            /*
            as a rule of thumb, if your variable assignment is no wider than 64 bits on a
            64-bit machine, then using the = operator should be fine as an atomic operation.

            hence long = long on a 64-bit machine is an atomic operation and does not require
            Interlocked. however, a long-to-long assignment on a 32-bit machine is not atomic
            and will require Interlocked.

            the code below uses Interlocked everywhere to avoid confusion
            */
            setTail(nowTicks);

            var currentHead = 0;
            Interlocked.Exchange(ref currentHead, head);

            if (
            // meeting somewhere not at the beginning of the track
                ((currentHead < maxReqAllowedInPeriod - 1) && currentHead + 1 == tail)
                ||
            // meeting at the beginning of the track
                (0 == tail && (currentHead == maxReqAllowedInPeriod - 1))
               )
                return false;
            // there is a slim possibility that head has been incremented by another thread;
            // in that case this increment will be lost. it is a very slim chance but possible.
            // in the majority of cases this should be acceptable

            if (currentHead < maxReqAllowedInPeriod - 1)
            {
                currentHead++;
                Interlocked.Exchange(ref head, currentHead);
            }
            else
            {
                currentHead = 0;
                Interlocked.Exchange(ref head, 0);
            }

            Interlocked.Exchange(ref allTicks[currentHead], nowTicks);

            return true;
        }
        public bool ShouldDoIt(ref long TryAfterTicks)
        {
            long nowTicks = DateTime.Now.Ticks;
            bool bOk = _ShouldDoIt(nowTicks);
            if (!bOk) // you might get skewed results for TryAfter, since tail might be adjusted
                TryAfterTicks = (allTicks[tail] + ticksInPeriod) - nowTicks;
            return bOk;
        }
    }
}
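For completeness, here is a minimal sketch of how a request handler might consume the gate. The HandleRequest wrapper and the 429 response are my own illustration and not part of the sample code:

using System;

namespace Throttling
{
    internal static class GateUsageExample
    {
        private static readonly MultiNoLock gate = new MultiNoLock();

        // hypothetical request entry point
        public static void HandleRequest()
        {
            long retryAfterTicks = 0;
            if (gate.ShouldDoIt(ref retryAfterTicks))
            {
                // accepted: do the actual work for this request
            }
            else
            {
                // rejected: surface HTTP 429 with a Retry-After hint
                var retryAfter = TimeSpan.FromTicks(Math.Max(0, retryAfterTicks));
                Console.WriteLine("429 Too Many Requests, retry after {0:0.###}s", retryAfter.TotalSeconds);
            }
        }
    }
}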

Notes on the Code

  • Language: The code is written in C#, but I have tried to stay as language-agnostic as possible. The same code can be written in C++ or another language on Windows (the Interlocked.XXX functions are in fact Win32 functions). For the Linux family of OSs, look for how these primitives are surfaced in your SDK/language.
  • Memory Consumption: The code consumes 8 bytes for every request tracked (ticks are longs, an 8-byte datatype). This can be significant if you are working on a low-memory system such as Windows on Raspberry Pi 2, or if you are tracking maximum requests per caller. For example, tracking a max of 10,000 requests for a max of 1,000 concurrent callers will result in 10,000 * 1,000 * 8 bytes = ~76 MB just for this component to operate. You can cut this in half by storing ticks relative to a recent epoch covering the last hour or a shorter period (hence using 4-byte ints instead of 8-byte longs; see the sketch after this list).
  • Limitation: You cannot dynamically change the max number of allowed requests per period without locking (or creating a new circular buffer with the new size, then cleverly jumping the pointers to the right positions in the new buffer).
  • Accuracy: There is a slight chance of not counting a request (it shouldn’t be many requests, since the window is between consecutive lines of code) between reading head and setting it. Please check the code comments for more.
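On the memory consumption point above: here is a sketch of the halving trick, storing each request time as a 32-bit millisecond offset from a base tick instead of a full 64-bit tick value. The type and member names are illustrative only:

using System;

// illustrative only: store request times as uint millisecond offsets from a
// base timestamp, halving the per-entry cost from 8 bytes to 4
internal sealed class CompactTickBuffer
{
    private readonly long baseTicks = DateTime.Now.Ticks;
    private readonly uint[] offsetsMs;

    public CompactTickBuffer(int maxRequests)
    {
        offsetsMs = new uint[maxRequests];
    }

    public void Record(int slot, long nowTicks)
    {
        // uint milliseconds overflow after ~49.7 days, so the base must be
        // refreshed periodically (e.g. hourly); that is the trade-off
        offsetsMs[slot] = (uint)((nowTicks - baseTicks) / TimeSpan.TicksPerMillisecond);
    }

    public long TicksAt(int slot)
    {
        return baseTicks + (long)offsetsMs[slot] * TimeSpan.TicksPerMillisecond;
    }
}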

Why not Locking

Locking in itself is not bad. It protects resources and allows you to do fascinating stuff, and in low-concurrency scenarios locking is absolutely fine, because each request will only wait for one or two other requests to finish adjusting the in-memory data structure representing the requests. As a matter of fact, if you try the locking and non-locking implementations under low concurrency, they will perform about the same.

But if you are dealing with high-concurrency scenarios, locking is bad and will leave requests waiting for a prolonged amount of time just to adjust an in-memory data structure (in addition to whatever I/O you need to do to complete the request successfully).
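For contrast, here is a minimal sketch of what a locking gate can look like. This illustrates the approach; it is not necessarily the exact locking implementation from the published samples:

using System;

namespace Throttling
{
    // every request serializes on a single lock, which is exactly what
    // hurts under high concurrency
    internal class MultiWithLock
    {
        private readonly object _sync = new object();
        private readonly long[] _ticks;
        private readonly long _periodTicks;
        private int _count;  // requests currently tracked in the window
        private int _oldest; // index of the oldest tracked request

        public MultiWithLock(int maxRequests, TimeSpan period)
        {
            _ticks = new long[maxRequests];
            _periodTicks = period.Ticks;
        }

        public bool ShouldDoIt()
        {
            long now = DateTime.Now.Ticks;
            lock (_sync) // concurrent requests queue up here
            {
                // drop requests that fell out of the period
                while (_count > 0 && _ticks[_oldest] < now - _periodTicks)
                {
                    _oldest = (_oldest + 1) % _ticks.Length;
                    _count--;
                }

                if (_count == _ticks.Length)
                    return false; // window is full

                _ticks[(_oldest + _count) % _ticks.Length] = now;
                _count++;
                return true;
            }
        }
    }
}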

Multi Server Environment

Unless you are still somewhere in the 1980s, you will need to run this code on multiple servers (for load balancing and availability). For the sake of discussion, let us imagine you need to run this on load-balanced web nodes. The design of choice will depend on some trade-offs.

High Accuracy &&  Low Resilience && No Session Affinity

For servers that use round-robin load distribution, you can assume max allowed requests per node = (total allowed requests / # of nodes) for requests that are not tracked per caller (e.g. 1,000 total allowed requests across 4 nodes gives 250 per node). If you are tracking per caller, then you will have to RPC into the other servers to get the total number of requests processed for the caller and update internal counters before validating. In this scenario, resetting a node resets its counters.

High Accuracy &&  High Resilience && No Session Affinity

This scenario is similar to the previous one, except all servers maintain a single state in a backing store. Hence there will be no need to RPC into the other servers, just into the backing store.

Load Distribution with Session Affinity

Session affinity is bad, and you should use it only when you absolutely must. I don’t generally favor absolutes, but this is one of the few I do. If you have to use session affinity, then state will live on a single server; whether to save it in backing storage or not depends on whether you need it to survive server restarts.

Notes on Selecting Backing Storage

Favor storage components that can:

  1. Perform read, modify, write in a single atomic operation (there are few of those; the list will surprise you). This should make your life easier when trying to figure out whether head and tail have moved while you were reading them (see the sketch after this list).
  2. Be partitioned. Since state is saved per caller, there is no point in having lock contention on storage shared across callers.
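To illustrate point 1, here is a sketch of how an optimistic, ETag-style conditional write can stand in for a distributed lock. IGateStore and GateState are hypothetical shapes for illustration, not a real storage SDK:

using System;

public sealed class GateState
{
    public long[] Ticks;
    public int Head;
    public int Tail;
}

public interface IGateStore
{
    // returns the caller's state plus a version token (ETag)
    GateState Read(string callerId, out string etag);

    // writes only if the stored ETag still matches; returns false otherwise
    bool TryWrite(string callerId, GateState state, string etag);
}

public static class DistributedGate
{
    // optimistic concurrency loop: read, decide, conditionally write, retry on conflict
    public static bool ShouldDoIt(IGateStore store, string callerId)
    {
        while (true)
        {
            string etag;
            GateState state = store.Read(callerId, out etag);

            // ... run the same head/tail logic as the in-memory version here,
            // mutating state.Head / state.Tail / state.Ticks locally ...
            bool accepted = true; // placeholder for the real decision

            if (store.TryWrite(callerId, state, etag))
                return accepted; // no other writer raced us

            // another node moved head/tail first; re-read and try again
        }
    }
}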

Relaxed Accuracy Approach

Some systems can tolerate relaxed accuracy. This is done by continuously and asynchronously analyzing logs (written for each request), then deciding on and maintaining an in-memory gate lock. These systems tend to have longer periods (usually longer than a few minutes). Because processing a request and using it to decide whether the gate should close is not immediate, these systems typically don’t have high accuracy (they are typically used for noisy-neighbor avoidance scenarios).
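A minimal sketch of that shape, with made-up names: a background log analyzer computes per-caller counts and flips a flag, while the hot path only does a lock-free dictionary lookup:

using System.Collections.Concurrent;

// illustrative only: some log-analysis loop (not shown) calls Update
// periodically; request threads call ShouldDoIt, which never blocks
public sealed class RelaxedGate
{
    private readonly ConcurrentDictionary<string, bool> closedGates =
        new ConcurrentDictionary<string, bool>();

    // hot path: slightly stale by design, but lock-free
    public bool ShouldDoIt(string callerId)
    {
        bool closed;
        return !(closedGates.TryGetValue(callerId, out closed) && closed);
    }

    // background path: called with counts aggregated from request logs
    public void Update(string callerId, long countInPeriod, long maxAllowedInPeriod)
    {
        closedGates[callerId] = countInPeriod > maxAllowedInPeriod;
    }
}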

Epilogue

The code is published here: https://github.com/khenidak/throttling. Have fun with it.

@khnidk