On Italian Restaurants, Mafia Bosses & Web Sockets (Part 1)

**Part 2 published here**

Prelog

Consider your 50’s movie style Italian restaurant with one of those comically usually angry chefs. The restaurant kitchen operation is really simple. He – the chef – can cook many different menu items concurrently. But he can only cook one – and only one – plate/meal of each menu item at a time. He is a fair chef, orders cooked in first ordered, first served style. He cannot interrupt the cooking of menu item A for customer C1 for the same menu item A for customer C2. C1’s meal will never be nicely cooked and as a food respecting chef he will never allow that to happen. Now enter a mafia boss , who walks in – queue in the God Father theme music – the chef has to place whatever the mafia boss orders in front of the previously queued orders (come on it is his life on the line). Because the boss is also a food respecting person he will never ask the chef to drop currently cooking meals for his. He just asks for a place in front of the queue.

Aside from the theatrics the above analogy is commonly found in client/server communication (even when they are full duplex) including Web Sockets. Messages flow in order (per each session/client) with the ability to place one message in front of the queue (think of it as a high priority message). For example consider a very simple banking scenario where client sends the following (starting account balance is a 100$):

  1. Withdraw 50 (Account Balance: 50$).
  2. Deposit 100 (Account Balance: 150$).
  3. Withdraw 150 (Account Balance: 0$).

If message 2 and 3 came out of order the operation will fail although logically from the customer point he has the fund to execute the operation. Worse if you stopped one operation – in the middle of execution – for another you will end up with an account in an “unknown state”.

Detailing the Problem

The upcoming second part of this post I will focus more on the Web Sockets server implementation (and yes you guess it right it will be Azure Service Fabric friendly). But for now let us focus on solving the above problem.

The traditional route to solve this problem is to create producer/consumer queue-like data structure. Low priority messages placed at the end of the queue, high priority ones are placed in front of the queue.  Having a loop as a consumer picking up messages. As the following

SingleQueue

 

//Click to Enlarge

While this approach will work it will have the following problems (though you can hack your code to solve them, your code base will be a bit too complex to maintain).

  1. Ensure Fairness: One messaging/web socket session might overload the queue with large number of messages, the loop will be forced to stream them all down which means other sessions will have to wait. You can solve this by having multiple queues + multiple loops.
  2. Sent Notification: Most of messaging systems maintain an-after “send” journal-ing or logging operation. You can have this log as part of your loop (but then your other sessions will wait for send + logging).
  3. Wait Time: Your loop will implement pull > found new messages > execute or pull > not found sleep sequence (or a similar paradigm). Which means the loop uses CPU even when there are no messages to process and the minimum wait time for a message (in a free queue) is equal to your sleep time + de-queue time.

The Solution

Using a .NET Custom Task Scheduler (details here) as the following :

Note: the term message and task used below in an inter exchangeable fashion.

  • The Scheduler sets on top of N * 2 queues (where N is the number of active messaging sessions, each session will have a high priority queue and low/normal priority queue).
  • The scheduler ensures that there is only one active thread working per messaging session and this thread executes one task at a time. While it will allow multiple threads working on different sessions concurrently. The worker thread de-queues high priority messages first then low priory messages.
  • The scheduler ensures that threads yields execution to other queues after a configurable number of messages. This ensures fairness among queues even under high pressure where all threads of the thread pool are in use. This only happens if we have multiple active messaging sessions. If we have only one active session then the scheduler will ensure it is drained in one go.
  • The Scheduler does not have a loop, it en queue work items in .NET thread pool only when there is work to do (to solve the wait/sleep time discussed above).
  • The scheduler uses .NET’s Task Class derivatives which means you can use c# await construct, task continuation, task chaining just as regular tasks (to elegantly solve the after-send processing/notification problem discussed above).
  • The scheduler has a “remove queue” method, allow you to remove a queue from the sessions (it drains then remove the queue).

The implementation

First we need to have a “Leveled Class” that is derived from Task class with 2 additions:

  1. Queue Id: Used to help the scheduler to identify which queue this task should be en-queued.
  2. Is High Priority: Flag Used to identify if the task should go in front of every other exiting task in the queue (except the currently executing task).
 public class LeveledTask : Task
    {
        public string QueueId = string.Empty;
        public bool IsHighPriority = false;
       // ... CTORs for base class 
     }

Then we need the scheduler itself which goes like that

   public class SequentialLeveledTaskScheduler : TaskScheduler
    {

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_HiQueues
            = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_LowQueues
        = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, object> MasterQueueLocks
            = new ConcurrentDictionary<string, object>();


        public const int DefaulltMinTaskBeforeYield = 10;
        public const int DefaultMaxTaskBeforeYield  = 50;

        private int m_MaxTaskBeforeYield = DefaulltMinTaskBeforeYield;

        public int MaxTaskBeforeYield
            {
            set {
                if (value > DefaultMaxTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaultMaxTaskBeforeYield;
                    return;
                }
                if (value < DefaulltMinTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaulltMinTaskBeforeYield;
                    return;
                }

                m_MaxTaskBeforeYield = value;
            }
                get { return m_MaxTaskBeforeYield; }
            }

        public void AddQueue(string QueueId)
        {
            var bHighPrority = true;
            var bAddIfNotExist = true;

            GetOrAddQueue(QueueId, bHighPrority, bAddIfNotExist);
            GetOrAddQueue(QueueId, !bHighPrority, bAddIfNotExist);
        }



        public  IEnumerable<Task> GetScheduledTasks(string QueueId)
        {
            var hQ = GetOrAddQueue(QueueId, true, false);
            var lQ = GetOrAddQueue(QueueId, false, false);

            if (null == hQ || null == lQ)
                return null;

            var masterList = new List<Task>();

            masterList.AddRange(hQ);
            masterList.AddRange(lQ);
            

            return masterList;
        }


        private ConcurrentQueue<LeveledTask> GetOrAddQueue(string QueueId, bool isHighPriority, bool addIfNotExist = true)
        {
            if (addIfNotExist)
            {
                var hiQueue = m_HiQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());
                var lowQueue = m_LowQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());

                return (isHighPriority) ? hiQueue : lowQueue;
            }
            else
            {
                var qList = (isHighPriority) ? m_HiQueues : m_LowQueues;
                return qList[QueueId];
            }
        }

        private object GetOrAddLock(string QueueId, bool AddIfNotExist = true)
        {
            if (AddIfNotExist)
            {
                object oNewLock = new object();
                var o = MasterQueueLocks.GetOrAdd(QueueId, oNewLock);
                return o;
            }
            else
            {
                return MasterQueueLocks[QueueId];
            }
            
        }

        public void RemoveQueue(string QueueId)
        {
            LeveledTask lt = new LeveledTask(() =>
            {

                // this will remove the Q from the list of Q
                // but will not null it so if we have an going exection it will just keep on going. 
                // because the list of Q and locks no longer maintain a reference to lQ & HQ and lock
                // it will evantually be null

                Trace.WriteLine(string.Format("queue {0} will be removed", QueueId), "info");

                ConcurrentQueue<LeveledTask> q;
                object oLock;
                
                
                m_HiQueues.TryRemove(QueueId, out q);
                m_HiQueues.TryRemove(QueueId, out q);

                MasterQueueLocks.TryRemove(QueueId, out oLock);

            });

            lt.IsHighPriority = false;
            lt.QueueId = QueueId;
            lt.Start(this);

        }

        

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var masterList = new List<Task>();

            foreach (var hqueue in m_HiQueues.Values)
                masterList.AddRange(hqueue);

            foreach (var lqueue in m_LowQueues.Values)
                masterList.AddRange(lqueue);
            return masterList;
        }

        protected override void QueueTask(Task task)
        {
            var leveledtask = task  as LeveledTask;
            if (null == leveledtask)
                throw new InvalidOperationException("this leveled sequential scheduler shouldn't be used with regular Task objects"); // bang!


            if (leveledtask.QueueId == null ||
                leveledtask.QueueId == string.Empty)
                throw new InvalidOperationException("Task scheduler received a task that does not have a queue assigned to it"); // bang!

            var Q = GetOrAddQueue(leveledtask.QueueId, leveledtask.IsHighPriority);
            Q.Enqueue(leveledtask);

            ProcessWork(leveledtask.QueueId);   
        }

        private void ProcessWork(string QueueId)
        {

          

            ThreadPool.UnsafeQueueUserWorkItem(w => {



                var oLock = GetOrAddLock(QueueId);

                bool bGotLock = false;
                Monitor.TryEnter(oLock, ref bGotLock);

                var hQ = GetOrAddQueue(QueueId, true);
                var lQ = GetOrAddQueue(QueueId, false);

                if (0 == hQ.Count && 0 == lQ.Count)
                    return; // was already completed.

                if (!bGotLock) // a thread from the thread pool is already looping on the Q. 
                {
                    //Trace.WriteLine(string.Format("Scheduler attempt to acquire lock on {0} and failed", QueueId), "info");
                    return;    // at any point of time ONLY one thread is allwoed to dequeue on the Q to enable order tasks
                }


                var ExecutedTasks = 0;
                
                
                while (
                            // should yield
                            ExecutedTasks <= m_MaxTaskBeforeYield || // don't yeild if we have only one queue. (ExecutedTasks > m_MaxTaskBeforeYield  && m_HiQueues.Count + m_LowQueues.Count == 2)  || 
                            // don't yeild if this queue has been removed, drain it before dropping the reference. 
                            (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId) ) ) 
                      ) 
                             
                {


                    if (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                        Trace.WriteLine(string.Format("Queue {0} has been removed. Draining.. (remaining {1} tasks)", QueueId, lQ.Count + hQ.Count), "info");


                    LeveledTask leveledTask = null;
                    var bFound = false;

                    // found in High Priority Queue
                    bFound = hQ.TryDequeue(out leveledTask);

                    // found in Low Priority Queue
                    if (!bFound && null == leveledTask)
                        lQ.TryDequeue(out leveledTask);

                    if (!bFound && null == leveledTask) //nothing here to work on
                        break;
                    //{
                    //Trace.WriteLine(string.Format("faild! count {0}/{1} queue {2}", hQ.Count, lQ.Count, QueueId), "info");
                    ///break;
                    //}

                    try
                    {
                        base.TryExecuteTask(leveledTask);
                    }
                    catch (Exception e)
                    {
                        
                        //TODO: check if we need to call unobserved exceptions
                        Trace.WriteLine(string.Format("Task Executer: Error Executing Task {0} {1}", e.Message, e.StackTrace), "error");
                    }

                    ExecutedTasks++;

                }

                if (0 == ExecutedTasks) // we were unsucessfull picking up tasks 
                    Trace.WriteLine(string.Format("Scheduler attempted to execute on queue {0} with count {1} and found 0 tasks", QueueId, lQ.Count + hQ.Count), "info");


                Monitor.Exit(oLock);
             

                

                if ((ExecutedTasks > m_MaxTaskBeforeYield && hQ.Count + lQ.Count > 0))
                {

                    // current thread is about to be released back to the pool (and we still have more as we yielded).
                    // call it back to ensure that remaining tasks will be executed (even if no more tasks are sent to the scheduler). 
                    Trace.WriteLine(string.Format("Queue {0} yielded thread {1} after {2} tasks",
                                                   QueueId,
                                                   Thread.CurrentThread.ManagedThreadId,
                                                   ExecutedTasks - 1));

                    Task.Run(() => { ProcessWork(QueueId);} );
                }



            }, null);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // if tasks got inlined it will lose this position in the queue.
            // so we can not afford inline tasks here
            return false;
        }
    }

The magic happens in the following methods

QueueTask(Task task)

Where the task is validated, destination queue (per session and per priority) is identified then the task is en-queued and ProcessWork method is called.

ProcessWork(string QueueId)

This method en-queues a delegate in the thread pool as a work item. When the delegates executes it checks if there is a currently active worker on the queue if there is one then it returns. Then it keeps pulling messages (high priority then low priority) until it has to yield (With some special condition regarding draining the queue as discussed above). If it has yielded the execution, it creates a new task of just another call to ProcessWork(). This ensure that execution will keep going even when QueueTask is not being called.

A couple of additional notes, The scheduler does not support inline execution (because this will make us lose the order of tasks). The rest of class implementation is maintenance and management to the memory structures used by the scheduler instance.

From the client point of view (this could be your messaging server, in my case it is a Web Sockets server) a message is sent this way

 LeveledTask lt = new LeveledTask(() =&gt;
            {
             // the magic goes here (your send logic)
            });


            lt.QueueId = q; // queue is my messaging session id
            lt.IsHighPriority = IsHighPrority; 
            lt.Start(scheduler); // scheduler is an instance of the scheduler class. 
// I can now use lt to do await, or ContinueWith just like Task class. 
// Note: un observed exceptions will go to TaskScheduler.UnobservedTaskException as usual.

Epilog

This was part 1 of two parts post covering Web Sockets Server Implementation. This part is focused on the scheduler (which in a lot of ways is a stand alone component). Part 2 will cover the server implementation. I should be able to post part 2 next week. Once I do, I will upload the code along with unit tests to git hub.

till next time!

Questions/Comments
@khnidk