On Application Routers/Gateways

Prelog

The discussion below is about routing & application gateways. It is about a component I wrote the code for, published here https://github.com/khenidak/Router along with the documentation on its design & how to use it. The below is a rant on how things in this component are made the way they are.

“Application Gateways are hard to build, but should not be hard to use”

Why Build an application Gateway or a Router

First: Why Application Routers/Gateways?

Your application will have requests coming to your external (or even internal) end points. These requests are not meant to be fulfilled by the application that received them but rather by a backend of some sort. Most large-scale applications (and especially those built for hyper-scale) have this requirement. Take the following examples:

  1. A SaaS application where users’ requests are routed to specific tenants. These requests might be web pages, Web Sockets, REST API Calls or even TCP/IP packets.
  2. An application that distributes the load/request types among multiple backend systems.
  3. An application that went through a massive transformation (take M&A scenarios, for example) where the external APIs have changed; you will need a gateway in the middle to upgrade or bridge the protocol without disrupting your existing external clients.
  4. Application Zoning/Containment/Partitioning/Segregation, where connections from the external world are authenticated, authorized then terminated at the gateway. The gateway then establishes connections to the backend systems.
  5. A microservices application deployed on top of a cluster managed by a cluster management platform such as Apache Mesos or Azure Service Fabric will need a gateway. Some examples use 2 tiers (a Web UX deployed on all nodes routing to compute in the backend, such as Azure Service Fabric’s WordCount sample: https://github.com/Azure/servicefabric-samples/tree/master/samples/Services/VS2015/WordCount), but this approach does not deal with situations where the Web UX itself might be subject to per-request routing (take a SaaS application, A/B testing, or an application that provisions Mesos jobs or Azure Service Fabric app instances to scale to meet load or isolation requirements).
  6. Specific platform requirements, such as offering the Azure Service Fabric Actor framework to applications external to the cluster (where Actor clients cannot be used).

And many others.

Is Routing a New Thing?

No. As a matter of fact, every time you use a web server, internal routing happens in the OS – typically by a kernel component – to route requests to the different processes responsible for different addresses. At a higher level, frameworks such as ASP.NET Web API or WCF perform per-request routing (dispatching in the case of WCF) to the target controller/method (service/interface in the case of WCF).

But This Is at the Platform Level; What about the Application Level? Is This Requirement New? Is It a Cloud Thing?

No & No. Application gateways has been a thing since forever. I remember building an Http application gateway 1998. And there are 10+ years old products out that perform various parts of Application Gateway logic. The Cloud came in to fulfill “hyper-scale” requirements. Where the application itself can be provisioned multiple times to support load or isolation (hence the stress on routing).

The Problem With Routing

If your requirements are about routing from point A to point B and both have the same semantics (say Web API) with fairly static URLs, then you don’t need a custom gateway and I would strongly recommend looking at existing solutions such as Azure API Management (https://azure.microsoft.com/en-us/documentation/articles/api-management-get-started/), or building a single-purpose gateway. The problem is that most requirements come in the following forms:

  • Routing in multitenant solutions

If request is on https://<some-tenante>.<hist>.com/crm route it to http://node:port/<some-tenante>/crmapp (node is backend server) and add custom authorization header, then get the response and put in the original downstream payload while adding a custom header.

  • Routing in multi version and or A/B testing scenarios

If request is on https://www.<host>.com/api/customer and authorization header claim contains usergroup = “default” or usertype != “admin” then route it to http://node:port/api-v2/customer else route to http://node:port/api /customer

  • Routing in microservices like environments

if request is on https://www.<host>.com/api/customer then resolve microservices address list and perform round robin load balancing between them however if the request type is Post or Put or Delete then route only to primary partition (in case of Service Fabric).

  • Routing in protocol bridges

If the request came on the Web Sockets address ws://www.host.com/sockets/customer, then route it to http://node:port/<some-tenant>/api/customer and set the HTTP method based on messagePayload.Type (MT): if MT = "add" then Method = POST, if MT = "update" then Method = PUT, etc.

Sounds complex enough? Those are typical requirements for an application gateway.

The problem is scary enough but relatively easy to solve if you split representing the logic from actually executing it.

Routing Logic, Simplifying the Complex

You can easily represent the logic as a linked list where each node represents a condition and/or an action and is only executed if node.next executed successfully. In my code I called these nodes Matchers (not the sexiest name, I know). Consider representing them as the following:

 

// pseudo code

// If the request is on bing then route it as GET to http://www.microsoft.com and add a custom header "CUSTOM_HEADER" with value "Hello, World!"

var head = new SetMethod("Get");

head.chain(new SetAddress("http://www.microsoft.com"), new AddHeader("CUSTOM_HEADER", "Hello, World!"), new MatchAddress("bing", MatchType.MatchHostNameOnly));

// for more concrete samples and implementation check https://github.com/khenidak/Router

The above code describes routing and processing logic in an easy-to-understand fashion, and more importantly in an easy-to-extend framework. You can extend it to add whatever matcher types you want. You will end up with something that looks like this (all images below are from the repo):

[Image: matching-frx – the matching framework (see the repo)]

But what about my ANDs and ORs? These can also be represented by matchers; consider the following:


// pseudo code

// If the request is on bing and the user type is "dev" then route it as GET to http://www.msdn.com, else to http://www.microsoft.com, and add a custom header "CUSTOM_HEADER" with value "Hello, World!"

var head = new SetMethod("Get");

var msdn = new IsUserType("Dev");
msdn.chain(new SetAddress("http://www.msdn.com"));

head.chain(
   new OrMatcher(
              msdn
              ,
              new SetAddress("http://www.microsoft.com")
             ),
   new AddHeader("CUSTOM_HEADER", "Hello, World!"),
   new MatchAddress("bing", MatchType.MatchHostNameOnly));

// for more concrete samples and implementation check https://github.com/khenidak/Router

Because of this type of branching, the matchers are represented in memory as a tree, not just a linked list.

[Image: matching-frx-tree – matchers represented as a tree (see the repo)]

 

Because Matcher is a .NET type, you can subclass it into new types of matching that suit your application (the code published here https://github.com/khenidak/Router contains most of the common stuff), or you can extend the existing ones with new capabilities specific to your application.
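For illustration only, here is a rough sketch of what such a subclass could look like. The exact base class shape (an Execute override taking a routing context) is an assumption on my part; the real contract lives in the repo:

// hypothetical custom matcher; assumes a Matcher base class exposing a virtual Execute(RoutingContext)
public class MatchApiVersionHeader : Matcher
{
    private readonly string m_Version;

    public MatchApiVersionHeader(string version)
    {
        m_Version = version;
    }

    public override bool Execute(RoutingContext context)
    {
        // match only requests that carry the expected version header
        return m_Version == context.RequestHeaders["X-API-VERSION"];
    }
}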

Executing the logic becomes a matter of mechanics, as described in the documentation here https://github.com/khenidak/Router (the basic idea is that a “Routing Context” is pushed through the linked list).
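To make that push concrete, here is a minimal sketch, assuming the Matcher shape from the sketch above (a Next reference set by chain() and an Execute(RoutingContext) method); the real mechanics in the repo are more involved:

// simplified sketch only: a node executes only after node.Next executed successfully
public static bool ExecuteChain(Matcher node, RoutingContext context)
{
    if (node == null)
        return true;                       // end of the chain

    if (!ExecuteChain(node.Next, context)) // e.g. a failing MatchAddress at the tail stops the branch
        return false;

    return node.Execute(context);          // SetAddress/AddHeader/SetMethod mutate the routing context
}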

 

Epilog

I have chosen to use this space to describe what application routers/gateways are and why we need them (the what and how, along with source code, is published on GitHub). I have also chosen to cover just one aspect of the complexity that usually comes with building them. Check the documentation to get an idea about the rest of the problems that a typical application router/gateway has to solve and how they were solved.

till next time

@khnidk

 

The Case on Latency, Fairness & Throughput for Connected Clients

Prelog

The rant below is about some of the challenges and possible solutions that you will encounter when developing a server-side application that offers services to connected clients. While I will talk mostly about stateful/session connections (as in TCP/IP sockets and the like), the below also applies to a high degree to session-less connections (such as REST/Web API types of implementation).

A couple of things we need to note before going forward. Performance & scale are different things. Performance is how fast you can respond to a request. Scale is how many of those you can respond to concurrently. They are related because, typically, at a large number of concurrent connections & requests your response times tend to increase.

For the sake of discussion, we will assume that we are building a game server that powers an action game such as Halo or the likes.

What is Under the Hood? Threads, Thread Pools and .NET’s TPL

So clients’ connections come in, and in your code you need to assign compute resources to respond to each connection’s requests. Irrespective of Windows, Linux or other systems, there will be a message loop in place for each client that picks fragments (typically byte arrays) out of the wire buffer into your code. Each client gets a copy of that loop (or one loop goes through all the clients). Fragments are picked, routed into your code, some execution happens, then a response is sent to the original client.

    • Threads: Each client connection gets a thread; the thread will perform the message pump for that particular client and you are done. As the client connects the thread is created; as the client disconnects the thread is destroyed. Easy? Yes. So easy that it is questionable. Here is why:

Threads are an expensive compute resource; you don’t have a lot of them per process (it obviously depends on your CPU/memory and how big your kernel memory is, since it carries the handle table, including thread handles). The maximum number of connected clients will be tied to the maximum number of threads you can create in-process (and this usually tends to be a small number).

The other problem is that your clients will come in all shapes and colors: some will be active, some will be not very active/idle. Not very active clients will have their idle threads (which execute just the message loop against an empty buffer) eating away at your compute resources.

It cannot be all that wrong, right? Right. Threads are really good if you are trying to achieve the fastest possible response times for an expected - better yet, fixed - number of clients. Example: 2 servers sharing data (backup servers, warm standbys, etc.). But they are problematic if you want to support the maximum number of connected clients, fairly (more on this later).

The best way to think about it: if your topology is more like a snowflake (few clients connected to each server, each server connected to one or more servers) then threads are better; if you are doing a star-like topology they are not.

  • Thread Pools (and .NET TPL): Because .NET TPL is built on top of the thread pool we will just group the 2 approaches into one discussion. Here is how it works: as connections come in, you create the connection object and then string the pulls (a.k.a. BCL’s/.NET Socket.Receive or the Win32 recv function) into continuous calls, where one call upon finishing queues the next call in the thread pool (via a direct call or via the await construct inside some loop). This scales well. Resources are distributed well across connected clients (and are not mapped or dedicated to them).
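As a minimal sketch of that style (using NetworkStream.ReadAsync here instead of raw Socket.Receive; the names are illustrative), each await hands the thread back to the pool and the continuation is queued only when data arrives:

async Task ReceiveLoopAsync(NetworkStream stream, CancellationToken ct)
{
    var buffer = new byte[4096];
    while (!ct.IsCancellationRequested)
    {
        // the thread goes back to the pool here; the continuation runs when bytes arrive
        int read = await stream.ReadAsync(buffer, 0, buffer.Length, ct);
        if (read == 0)
            break; // remote side closed the connection

        var frame = new byte[read];
        Buffer.BlockCopy(buffer, 0, frame, 0, read);
        await ExecuteRequestAsync(frame); // route the fragment into your code
    }
}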

On Fairness Across Connected Clients

Are We Fair?

Fair is when all connected clients’ requests are treated equally, irrespective of how active they are.

The thread-per-connected-client approach is fairer because each client gets a dedicated thread. The OS kernel will ensure equal time is scheduled per thread. It does not scale well, but it is fair. The thread pool approach, while scalable, is not fair and will yield skewed performance numbers: some clients will get faster responses, others won’t.

Here is why (hint: The keyword here is “queue”):

Thread pools offer a queue (one and only one) where all your work items are queued; threads on the other end of the queue pick them up one by one and execute them, with a callback to notify completion (or the .NET await construct in TPL). Active clients will be served faster than *not very* active clients because they will have more work items in that queue. You might have heard of the ”the harder you hit it, the faster it responds across all your calls” behavior in some cloud PaaS services in AWS or Azure; it is largely attributed to this. Everything is placed in the queue irrespective of fairness to connected clients.

Additionally, it has a phantom lock/release behavior. Consider this: one active client with 20 not very active clients on a 20-thread thread pool. There will be a condition where all threads in the thread pool are busy executing reads from inactive clients while we have one client ready with a request waiting at the end of the queue. You will see this as idle CPU + longer response times, then busy CPU with shorter response times, even when the total number of requests did not change.

Typically, we solve these problems by using a very short timeout on the receive call, or by using smaller I/O frames. Those solutions will not stop the condition above from happening, but they will resolve it much faster than a longer timeout on the receive calls would.
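To make the mitigation concrete, here is a small sketch of the short-timeout variant, assuming a synchronous Socket.Receive loop (the 50 ms value is arbitrary):

socket.ReceiveTimeout = 50; // milliseconds; an idle client gives the thread back quickly

try
{
    int read = socket.Receive(buffer);
    // route the received fragment into your code
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.TimedOut)
{
    // nothing arrived within the slice; re-queue this client's read and move on to the next one
}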

Do I Need Fairness Across Connected Clients?

If the situation above applies to the server-side application you are developing, then start by asking yourself: is it really worth it? The answer really is in what you promised your clients; if the maximum response time (for the most unfairly treated connected client) is within your SLA, then you don’t need fairness. Ensuring that you are within SLA is a bit of art and a bit of science (also known as performance testing). As you can tell, you will have to test multiple load patterns (not just stress the system until it fails).

As an analogy, this is a lot like waiting a few minutes (the minimum standard wait time) to get your espresso because the shop is full of other customers (who are not ordering anything). Some applications with very tight latency requirements just cannot afford that; gaming is one of them.

Where Can I Apply Fairness?

If you have read this far I assume you still need fairness and you are looking for possible solutions. Let us start. A typical application in this context performs 3 major things:

  • Receive Requests
  • Execute Requests (i.e. route data fragments into your code)
  • Send Responses.

Each of these areas can/will need fairness, depending on your requirements. I am obviously assuming that we will go the thread pool route, not the thread-per-connected-client route.

Using Time Slices

Each execution unit (thread pool work item) will get a specified amount of time that it is allowed to execute in; if it does not finish, it should time out. It is important to note that this is not a razor-sharp time allocation; it will vary, as we will discuss. This is fairly simple to implement; the complexity is in leaving in-memory objects in a non-corrupt state. Some APIs are easier than others when it comes to that. For example, Socket.Receive can be called with a timeout; when it returns, copy the received bytes into an array outside the about-to-be-terminated task’s scope. Some will require additional work; for example, consider ExecuteRequestAsync below (as a sample of #2 above):

async Task ExecuteRequestAsync(byte[] frame)
{
    // do something that takes a long time.
}

Such an opaque method is very hard to put a timeout on, because if you terminate the call you might be in a situation where your data structures are in a corrupt state.

A better approach is:

Task ExecuteRequestAsync(byte[] frame, CancellationToken ct)
{
    // do step 0: execute any previously uncompleted work.

    if (ct.IsCancellationRequested)
    {
        // retain state for the incomplete work (maybe in an external queue)
        return Task.CompletedTask; // don't throw; from the perspective of the caller, the call did succeed.
    }

    // do step 1

    if (ct.IsCancellationRequested)
    {
        // retain state for the incomplete work (maybe in an external queue)
        return Task.CompletedTask;
    }

    // do step 2

    if (ct.IsCancellationRequested)
    {
        // retain state for the incomplete work (maybe in an external queue)
        return Task.CompletedTask;
    }

    // do step 3

    return Task.CompletedTask;
}

And here is how I can call it

ExecuteRequestAsync(data, new CancellationTokenSource(timeoutInMilliseconds).Token); // the token is created to be canceled after the timeout

This way the method chooses when and how it can return (leaving the in-memory objects in a non-corrupt state). Most executions will use a little more time than the allocated timeout, yet overall it is easier to implement in a safe fashion. As you do your checks for IsCancellationRequested you can sign off the rest of the method to another Task (or use Task.ContinueWith calling another async method) which the thread pool will put at the end of the queue (keep in mind that this does not mean in-order execution). The most difficult challenge is to ensure that ExecuteRequestAsync is called accumulatively (i.e. all state changes are applied in an accumulative, in-order fashion).
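As a hedged illustration of that hand-off (RetainState and ExecuteRemainingStepsAsync are hypothetical helpers, not part of any framework), the cancellation check can push the leftover steps back into the thread pool:

if (ct.IsCancellationRequested)
{
    var remaining = RetainState(); // capture the uncompleted steps (maybe in an external queue)

    // goes to the back of the thread pool queue; in-order application of state changes is still the caller's job
    Task.Run(() => ExecuteRemainingStepsAsync(remaining));

    return Task.CompletedTask;     // the caller still sees a successful call
}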

An alternative to this is the deferred execution approach. A deferred execution is represented by an object that has a queue and an execution loop; instead of calling ExecuteRequestAsync directly, you en-queue the call to the deferred execution. Each connected client is represented by a deferred execution instance; a timeout can still be used, with retry (assuming that ExecuteRequestAsync is idempotent).
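A minimal sketch of such a per-client deferred execution object could look like the following (illustrative only; a production version must re-check the queue after releasing the running flag, to avoid missing a frame en-queued in between):

class DeferredExecutor
{
    private readonly ConcurrentQueue<byte[]> m_Frames = new ConcurrentQueue<byte[]>();
    private int m_Running = 0; // 0 = idle, 1 = a drain loop is active

    public void Enqueue(byte[] frame)
    {
        m_Frames.Enqueue(frame);

        // only one drain loop per connected client at any point in time
        if (Interlocked.CompareExchange(ref m_Running, 1, 0) == 0)
            Task.Run(() => DrainAsync());
    }

    private async Task DrainAsync()
    {
        byte[] frame;
        while (m_Frames.TryDequeue(out frame))
        {
            // timeout + retry still applies, assuming ExecuteRequestAsync is idempotent
            await ExecuteRequestAsync(frame, new CancellationTokenSource(100).Token);
        }

        Interlocked.Exchange(ref m_Running, 0);
    }
}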

Using A Quantum

So far we have referred to work items and execution as roughly the same thing. This approach depends on separating them. A work item is a task that a connected client needs to execute (i.e. one of the receive, execute, or send tasks). A quantum represents the number of work items that you will execute for a certain connected client before moving to the next one. This assumes that all work items execute in roughly the same time. This fairness model depends on having a queue per connected client (outside the queue which the thread pool controls). The easiest way to do this in the .NET world is to implement your own TaskScheduler. For native code you will have to implement the queues and the scheduler from scratch. I have implemented the same pattern in .NET for a web socket server (one that ensures fairness on the send side only); I also rant about this in the Web Sockets posts below.

Epilog – Fixed Time Slice and Fixed Quantum are Bad

In a typical solution like the above you will start with a value (for either the time slice or the quantum) and then improve on it. If you see too many timeouts reported on your tasks (or, for the second approach, the number of remaining tasks in a queue is too high) then you need to increase your value. Remember that a timeout (or quantum expiry) is somewhat like a context switch: it requires dropping/acquiring locks and unwinding stacks. You don’t want too many of these happening in your process; they consume CPU and do not directly contribute to responding to connected clients. If you are feeling brave you can use a watchdog that dynamically adjusts the value at runtime and/or per connected client, as sketched below.
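For example, a very rough sketch of such a watchdog (all names and thresholds here are made up for illustration):

void AdjustTimeSlice()
{
    double expiryRate = (double)m_ExpiredWorkItems / Math.Max(1, m_TotalWorkItems);

    if (expiryRate > 0.10)
        m_TimeSliceMs = Math.Min(m_TimeSliceMs * 2, MaxSliceMs); // too many expiries: give work items more room
    else if (expiryRate < 0.01)
        m_TimeSliceMs = Math.Max(m_TimeSliceMs / 2, MinSliceMs); // lots of headroom: tighten the slice for fairness

    // reset the counters for the next observation window
    m_ExpiredWorkItems = 0;
    m_TotalWorkItems = 0;
}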

till next time @khnidk

On Italian Restaurants, Mafia Bosses & Web Sockets (Part 2)

This is the second and final part of a 2-part article; Part 1 is here.

The first part focused on the need to create a sequential multi-queue Task Scheduler to solve the producer/consumer needs of ordered downstream messages on protocols such as Web Sockets.

This part is not going to be long 🙂

  1. Code for the Web Socket server (on top of the scheduler), along with the unit tests, is published here: https://github.com/khenidak/WebSocketsServer
  2. In brief: the web socket server is designed to work as a standalone server with Owin (including self-hosting) and to work on top of Service Fabric.
  3. The web socket server uses a “Session Manager” to allow the hosting process to interact with active connected web sockets (such as finding a socket, or sending a message downstream).
  4. The web sockets themselves are strong types, for example a Customer socket type, an Order socket type and so on.
  5. The web socket supports Close (async), Abort, and Drain-and-Close (async).

Please go through the readme.md for more details and usage samples.

 

till next time

@khnidk

On Italian Restaurants, Mafia Bosses & Web Sockets (Part 1)

**Part 2 published here**

Prelog

Consider your 50’s-movie-style Italian restaurant with one of those comically, usually angry chefs. The restaurant kitchen operation is really simple. He – the chef – can cook many different menu items concurrently, but he can only cook one – and only one – plate/meal of each menu item at a time. He is a fair chef; orders are cooked in first-ordered, first-served style. He cannot interrupt the cooking of menu item A for customer C1 for the same menu item A for customer C2: C1’s meal would never be nicely cooked, and as a food-respecting chef he will never allow that to happen. Now enter a mafia boss, who walks in – cue the Godfather theme music – and the chef has to place whatever the mafia boss orders in front of the previously queued orders (come on, it is his life on the line). Because the boss is also a food-respecting person he will never ask the chef to drop currently cooking meals for his; he just asks for a place at the front of the queue.

Aside from the theatrics, the above analogy is commonly found in client/server communication (even when it is full duplex), including Web Sockets. Messages flow in order (per session/client) with the ability to place a message at the front of the queue (think of it as a high-priority message). For example, consider a very simple banking scenario where the client sends the following (the starting account balance is $100):

  1. Withdraw 50 (Account Balance: $50).
  2. Deposit 100 (Account Balance: $150).
  3. Withdraw 150 (Account Balance: $0).

If messages 2 and 3 come out of order, the operation will fail although, logically, from the customer’s point of view he has the funds to execute the operation. Worse, if you stopped one operation – in the middle of execution – for another, you would end up with an account in an “unknown state”.

Detailing the Problem

In the upcoming second part of this post I will focus more on the Web Sockets server implementation (and yes, you guessed it right, it will be Azure Service Fabric friendly). But for now let us focus on solving the above problem.

The traditional route to solve this problem is to create a producer/consumer queue-like data structure. Low-priority messages are placed at the end of the queue; high-priority ones are placed at the front of the queue, with a loop as a consumer picking up messages, as in the following:

[Image: SingleQueue – the traditional single-queue producer/consumer]

 


While this approach will work, it has the following problems (you can hack your code to solve them, but your code base will become a bit too complex to maintain):

  1. Ensure Fairness: One messaging/web socket session might overload the queue with a large number of messages; the loop will be forced to stream them all down, which means other sessions will have to wait. You can solve this by having multiple queues + multiple loops.
  2. Sent Notification: Most messaging systems maintain an after-“send” journaling or logging operation. You can have this logging as part of your loop (but then your other sessions will wait for send + logging).
  3. Wait Time: Your loop will implement a pull > found new messages > execute, or pull > not found > sleep sequence (or a similar paradigm). This means the loop uses CPU even when there are no messages to process, and the minimum wait time for a message (in a free queue) is equal to your sleep time + de-queue time. A sketch of this naive loop is shown below.
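For reference, a minimal sketch of that naive loop (Message and SendAsync are illustrative placeholders; high-priority messages would need a second queue checked first, since ConcurrentQueue has no head insertion):

async Task ConsumerLoopAsync(ConcurrentQueue<Message> queue)
{
    while (true)
    {
        Message m;
        if (queue.TryDequeue(out m))
            await SendAsync(m);     // every session waits behind this send (+ any logging)
        else
            await Task.Delay(10);   // the sleep burns wake-up CPU and adds latency to the next message
    }
}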

The Solution

Using a .NET custom Task Scheduler (details here), as the following:

Note: the terms message and task are used below interchangeably.

  • The scheduler sits on top of N * 2 queues (where N is the number of active messaging sessions; each session will have a high-priority queue and a low/normal-priority queue).
  • The scheduler ensures that there is only one active thread working per messaging session and that this thread executes one task at a time, while allowing multiple threads to work on different sessions concurrently. The worker thread de-queues high-priority messages first, then low-priority messages.
  • The scheduler ensures that threads yield execution to other queues after a configurable number of messages. This ensures fairness among queues even under high pressure, where all threads of the thread pool are in use. This only happens if we have multiple active messaging sessions; if we have only one active session then the scheduler will ensure it is drained in one go.
  • The scheduler does not have a loop; it en-queues work items in the .NET thread pool only when there is work to do (to solve the wait/sleep time problem discussed above).
  • The scheduler uses a .NET Task class derivative, which means you can use the C# await construct, task continuation, and task chaining just as with regular tasks (to elegantly solve the after-send processing/notification problem discussed above).
  • The scheduler has a “remove queue” method, allowing you to remove a queue from the sessions (it drains and then removes the queue).

The implementation

First we need to have a “LeveledTask” class that is derived from the Task class, with 2 additions:

  1. Queue Id: used to help the scheduler identify which queue this task should be en-queued in.
  2. Is High Priority: a flag used to identify if the task should go in front of every other existing task in the queue (except the currently executing task).
public class LeveledTask : Task
{
    public string QueueId = string.Empty;
    public bool IsHighPriority = false;

    // ... CTORs for base class
}
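For completeness, the elided constructors are just pass-throughs to the Task base class; a typical shape (not copied verbatim from the repo) would be:

public LeveledTask(Action action) : base(action) { }
public LeveledTask(Action action, CancellationToken cancellationToken) : base(action, cancellationToken) { }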

Then we need the scheduler itself, which goes like this:

   public class SequentialLeveledTaskScheduler : TaskScheduler
    {

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_HiQueues
            = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_LowQueues
        = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, object> MasterQueueLocks
            = new ConcurrentDictionary<string, object>();


        public const int DefaulltMinTaskBeforeYield = 10;
        public const int DefaultMaxTaskBeforeYield  = 50;

        private int m_MaxTaskBeforeYield = DefaulltMinTaskBeforeYield;

        public int MaxTaskBeforeYield
            {
            set {
                if (value > DefaultMaxTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaultMaxTaskBeforeYield;
                    return;
                }
                if (value < DefaulltMinTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaulltMinTaskBeforeYield;
                    return;
                }

                m_MaxTaskBeforeYield = value;
            }
                get { return m_MaxTaskBeforeYield; }
            }

        public void AddQueue(string QueueId)
        {
            var bHighPrority = true;
            var bAddIfNotExist = true;

            GetOrAddQueue(QueueId, bHighPrority, bAddIfNotExist);
            GetOrAddQueue(QueueId, !bHighPrority, bAddIfNotExist);
        }



        public  IEnumerable<Task> GetScheduledTasks(string QueueId)
        {
            var hQ = GetOrAddQueue(QueueId, true, false);
            var lQ = GetOrAddQueue(QueueId, false, false);

            if (null == hQ || null == lQ)
                return null;

            var masterList = new List<Task>();

            masterList.AddRange(hQ);
            masterList.AddRange(lQ);
            

            return masterList;
        }


        private ConcurrentQueue<LeveledTask> GetOrAddQueue(string QueueId, bool isHighPriority, bool addIfNotExist = true)
        {
            if (addIfNotExist)
            {
                var hiQueue = m_HiQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());
                var lowQueue = m_LowQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());

                return (isHighPriority) ? hiQueue : lowQueue;
            }
            else
            {
                var qList = (isHighPriority) ? m_HiQueues : m_LowQueues;
                return qList[QueueId];
            }
        }

        private object GetOrAddLock(string QueueId, bool AddIfNotExist = true)
        {
            if (AddIfNotExist)
            {
                object oNewLock = new object();
                var o = MasterQueueLocks.GetOrAdd(QueueId, oNewLock);
                return o;
            }
            else
            {
                return MasterQueueLocks[QueueId];
            }
            
        }

        public void RemoveQueue(string QueueId)
        {
            LeveledTask lt = new LeveledTask(() =>
            {

                // this will remove the queues from the lists of queues
                // but will not null them, so if we have an ongoing execution it will just keep on going.
                // because the lists of queues and locks no longer maintain a reference to lQ & hQ and the lock,
                // they will eventually be collected.

                Trace.WriteLine(string.Format("queue {0} will be removed", QueueId), "info");

                ConcurrentQueue<LeveledTask> q;
                object oLock;
                
                
                m_HiQueues.TryRemove(QueueId, out q);
                m_LowQueues.TryRemove(QueueId, out q);

                MasterQueueLocks.TryRemove(QueueId, out oLock);

            });

            lt.IsHighPriority = false;
            lt.QueueId = QueueId;
            lt.Start(this);

        }

        

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var masterList = new List<Task>();

            foreach (var hqueue in m_HiQueues.Values)
                masterList.AddRange(hqueue);

            foreach (var lqueue in m_LowQueues.Values)
                masterList.AddRange(lqueue);
            return masterList;
        }

        protected override void QueueTask(Task task)
        {
            var leveledtask = task  as LeveledTask;
            if (null == leveledtask)
                throw new InvalidOperationException("this leveled sequential scheduler shouldn't be used with regular Task objects"); // bang!


            if (leveledtask.QueueId == null ||
                leveledtask.QueueId == string.Empty)
                throw new InvalidOperationException("Task scheduler received a task that does not have a queue assigned to it"); // bang!

            var Q = GetOrAddQueue(leveledtask.QueueId, leveledtask.IsHighPriority);
            Q.Enqueue(leveledtask);

            ProcessWork(leveledtask.QueueId);   
        }

        private void ProcessWork(string QueueId)
        {

          

            ThreadPool.UnsafeQueueUserWorkItem(w => {



                var oLock = GetOrAddLock(QueueId);

                bool bGotLock = false;
                Monitor.TryEnter(oLock, ref bGotLock);

                if (!bGotLock) // a thread from the thread pool is already looping on the Q.
                {
                    //Trace.WriteLine(string.Format("Scheduler attempt to acquire lock on {0} and failed", QueueId), "info");
                    return;    // at any point in time ONLY one thread is allowed to dequeue on the Q, to keep tasks ordered
                }

                var hQ = GetOrAddQueue(QueueId, true);
                var lQ = GetOrAddQueue(QueueId, false);

                if (0 == hQ.Count && 0 == lQ.Count)
                {
                    Monitor.Exit(oLock); // nothing to do; release the lock before returning
                    return;              // was already completed.
                }


                var ExecutedTasks = 0;
                
                
                while (
                            // keep executing until we have to yield
                            ExecutedTasks <= m_MaxTaskBeforeYield ||
                            // (alternative: don't yield if we have only one queue:
                            //  ExecutedTasks > m_MaxTaskBeforeYield && m_HiQueues.Count + m_LowQueues.Count == 2) ||
                            // don't yield if this queue has been removed; drain it before dropping the reference.
                            (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                      )
                             
                {


                    if (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                        Trace.WriteLine(string.Format("Queue {0} has been removed. Draining.. (remaining {1} tasks)", QueueId, lQ.Count + hQ.Count), "info");


                    LeveledTask leveledTask = null;
                    var bFound = false;

                    // try the high-priority queue first
                    bFound = hQ.TryDequeue(out leveledTask);

                    // then the low-priority queue
                    if (!bFound && null == leveledTask)
                        bFound = lQ.TryDequeue(out leveledTask);

                    if (!bFound && null == leveledTask) // nothing here to work on
                        break;

                    try
                    {
                        base.TryExecuteTask(leveledTask);
                    }
                    catch (Exception e)
                    {
                        
                        //TODO: check if we need to call unobserved exceptions
                        Trace.WriteLine(string.Format("Task Executer: Error Executing Task {0} {1}", e.Message, e.StackTrace), "error");
                    }

                    ExecutedTasks++;

                }

                if (0 == ExecutedTasks) // we were unsuccessful picking up tasks
                    Trace.WriteLine(string.Format("Scheduler attempted to execute on queue {0} with count {1} and found 0 tasks", QueueId, lQ.Count + hQ.Count), "info");


                Monitor.Exit(oLock);
             

                

                if ((ExecutedTasks > m_MaxTaskBeforeYield && hQ.Count + lQ.Count > 0))
                {

                    // current thread is about to be released back to the pool (and we still have more as we yielded).
                    // call it back to ensure that remaining tasks will be executed (even if no more tasks are sent to the scheduler). 
                    Trace.WriteLine(string.Format("Queue {0} yielded thread {1} after {2} tasks",
                                                   QueueId,
                                                   Thread.CurrentThread.ManagedThreadId,
                                                   ExecutedTasks - 1));

                    Task.Run(() => { ProcessWork(QueueId);} );
                }



            }, null);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // if a task got inlined it would lose its position in the queue,
            // so we cannot afford to inline tasks here
            return false;
        }
    }

The magic happens in the following methods

QueueTask(Task task)

Here the task is validated, the destination queue (per session and per priority) is identified, then the task is en-queued and the ProcessWork method is called.

ProcessWork(string QueueId)

This method en-queues a delegate in the thread pool as a work item. When the delegate executes, it checks if there is a currently active worker on the queue; if there is one, then it returns. Otherwise it keeps pulling messages (high priority then low priority) until it has to yield (with a special condition regarding draining a removed queue, as discussed above). If it has yielded execution, it creates a new task that is just another call to ProcessWork(). This ensures that execution will keep going even when QueueTask is not being called.

A couple of additional notes: the scheduler does not support inline execution (because this would make us lose the order of tasks). The rest of the class implementation is maintenance and management of the in-memory structures used by the scheduler instance.

From the client’s point of view (the client here could be your messaging server; in my case it is a Web Sockets server), a message is sent this way:

LeveledTask lt = new LeveledTask(() =>
{
    // the magic goes here (your send logic)
});

lt.QueueId = q;                    // the queue id is my messaging session id
lt.IsHighPriority = isHighPriority;
lt.Start(scheduler);               // scheduler is an instance of the scheduler class.
// I can now use lt to await, or ContinueWith, just like the Task class.
// Note: unobserved exceptions will go to TaskScheduler.UnobservedTaskException as usual.

Epilog

This was part 1 of a two-part post covering the Web Sockets server implementation. This part focused on the scheduler (which in a lot of ways is a standalone component). Part 2 will cover the server implementation. I should be able to post part 2 next week. Once I do, I will upload the code along with unit tests to GitHub.

till next time!

Questions/Comments
@khnidk