On Italian Restaurants, Mafia Bosses & Web Sockets (Part 2)

This is the second and final part of a two-part article; Part 1 is here

The first part focused on the need for a sequential, multi-queue Task Scheduler to solve the producer/consumer needs of ordered downstream messages on protocols such as Web Sockets.

This part is not going to be long :)

  1. Code for the Web Socket server (built on top of the scheduler), along with the unit tests, is published here: https://github.com/khenidak/WebSocketsServer
  2. In brief: the web socket server is designed to work as a standalone server with Owin (including self-hosting) and to work on top of Service Fabric.
  3. The web socket server uses a "Session Manager" to allow the hosting process to interact with actively connected web sockets (such as finding a socket, or sending a message downstream).
  4. The web sockets themselves are strongly typed; for example, a Customer socket type, an Order socket type, and so on.
  5. Each web socket supports Close (async), Abort, and Drain-and-Close (async).

Please go through the readme.md for more details and usage samples.

 

till next time

@khnidk

On Italian Restaurants, Mafia Bosses & Web Sockets (Part 1)

**Part 2 published here**

Prelog

Consider your 1950s-movie-style Italian restaurant with one of those comically angry chefs. The restaurant kitchen operation is really simple. He – the chef – can cook many different menu items concurrently, but he can only cook one – and only one – plate/meal of each menu item at a time. He is a fair chef; orders are cooked first-ordered, first-served. He cannot interrupt the cooking of menu item A for customer C1 to cook the same menu item A for customer C2: C1's meal would never be nicely cooked, and as a food-respecting chef he will never allow that to happen. Now enter a mafia boss who walks in – cue the Godfather theme music – and the chef has to place whatever the mafia boss orders in front of the previously queued orders (come on, it is his life on the line). Because the boss is also a food-respecting person he will never ask the chef to drop currently cooking meals for his own; he just asks for a place at the front of the queue.

Theatrics aside, the above analogy is commonly found in client/server communication (even when it is full duplex), including Web Sockets. Messages flow in order (per each session/client), with the ability to place one message at the front of the queue (think of it as a high-priority message). For example, consider a very simple banking scenario where the client sends the following (the starting account balance is $100):

  1. Withdraw 50 (Account Balance: $50).
  2. Deposit 100 (Account Balance: $150).
  3. Withdraw 150 (Account Balance: $0).

If messages 2 and 3 arrive out of order, the final operation will fail even though, logically, from the customer's point of view, he has the funds to execute it. Worse, if you stop one operation – in the middle of execution – for another, you will end up with an account in an "unknown state".

Detailing the Problem

In the upcoming second part of this post I will focus more on the Web Sockets server implementation (and yes, you guessed it right, it will be Azure Service Fabric friendly). For now, let us focus on solving the above problem.

The traditional route to solving this problem is to create a producer/consumer, queue-like data structure: low-priority messages are placed at the end of the queue, high-priority ones at the front, with a loop as a consumer picking up messages, as in the following:

[Figure: the traditional single-queue producer/consumer design (click to enlarge)]
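
A minimal sketch of that traditional approach (illustrative only; Message and Send are placeholder names, and the 50 ms sleep stands in for whatever idle wait you choose):

 private readonly LinkedList<Message> m_queue = new LinkedList<Message>();
 private readonly object m_lock = new object();

 public void Enqueue(Message m, bool isHighPriority)
 {
     lock (m_lock)
     {
         if (isHighPriority)
             m_queue.AddFirst(m); // high priority jumps to the front
         else
             m_queue.AddLast(m);
     }
 }

 private void ConsumerLoop()
 {
     while (true)
     {
         Message m = null;
         lock (m_lock)
         {
             if (m_queue.Count > 0)
             {
                 m = m_queue.First.Value;
                 m_queue.RemoveFirst();
             }
         }

         if (null == m)
             Thread.Sleep(50); // burns CPU/latency even when idle: the "wait time" problem below
         else
             Send(m);          // one chatty session can monopolize the loop: the "fairness" problem below
     }
 }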

While this approach will work, it has the following problems (you can hack your code to solve them, but your code base will become too complex to maintain):

  1. Ensure Fairness: one messaging/web socket session might overload the queue with a large number of messages; the loop will be forced to stream them all down, which means other sessions will have to wait. You can solve this by having multiple queues + multiple loops.
  2. Sent Notification: most messaging systems maintain an after-"send" journaling or logging operation. You can make this log part of your loop (but then your other sessions will wait for send + logging).
  3. Wait Time: your loop implements a pull > found new messages > execute, or pull > not found > sleep sequence (or a similar paradigm). This means the loop uses CPU even when there are no messages to process, and the minimum wait time for a message (on an idle queue) equals your sleep time + de-queue time.

The Solution

Use a .NET custom Task Scheduler (details here), as follows:

Note: the terms message and task are used interchangeably below.

  • The scheduler sits on top of N * 2 queues (where N is the number of active messaging sessions; each session has a high-priority queue and a low/normal-priority queue).
  • The scheduler ensures that there is only one active thread working per messaging session, and that this thread executes one task at a time, while allowing multiple threads to work on different sessions concurrently. The worker thread de-queues high-priority messages first, then low-priority messages.
  • The scheduler ensures that threads yield execution to other queues after a configurable number of messages. This ensures fairness among queues even under high pressure, when all threads of the thread pool are in use. This only happens if we have multiple active messaging sessions; if we have only one active session, the scheduler drains it in one go.
  • The scheduler does not have a loop; it en-queues work items in the .NET thread pool only when there is work to do (solving the wait/sleep time discussed above).
  • The scheduler uses derivatives of .NET's Task class, which means you can use the C# await construct, task continuation, and task chaining just as with regular tasks (elegantly solving the after-send processing/notification problem discussed above; see the sketch after this list).
  • The scheduler has a "remove queue" method, allowing you to remove a queue from the sessions (it drains, then removes, the queue).
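
For example, the after-send journaling scenario could look like the following sketch (assuming scheduler is an instance of the scheduler class shown below, and SendDownstream/LogSend are placeholders for your own send and journaling logic):

 public async Task SendWithJournalAsync(string sessionId, byte[] message)
 {
     var sendTask = new LeveledTask(() => SendDownstream(sessionId, message))
     {
         QueueId = sessionId,   // ordered per messaging session
         IsHighPriority = false
     };
     sendTask.Start(scheduler);

     // the journaling continuation runs on the default scheduler after the
     // ordered send completes, so other sessions' queues never wait on it.
     await sendTask.ContinueWith(t => LogSend(sessionId, message, t.Status));
 }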

The implementation

First we need a "LeveledTask" class that derives from the Task class with 2 additions:

  1. Queue Id: used to help the scheduler identify which queue this task should be en-queued to.
  2. Is High Priority: a flag used to identify whether the task should go in front of every other existing task in the queue (except the currently executing task).
 public class LeveledTask : Task
 {
     public string QueueId = string.Empty;
     public bool IsHighPriority = false;

     // pass-through constructors for the base Task class
     public LeveledTask(Action action) : base(action) { }
     public LeveledTask(Action action, CancellationToken cancellationToken)
         : base(action, cancellationToken) { }
 }

Then we need the scheduler itself, which goes like this:

   public class SequentialLeveledTaskScheduler : TaskScheduler
    {

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_HiQueues
            = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>> m_LowQueues
        = new ConcurrentDictionary<string, ConcurrentQueue<LeveledTask>>();

        private ConcurrentDictionary<string, object> MasterQueueLocks
            = new ConcurrentDictionary<string, object>();


        public const int DefaultMinTaskBeforeYield = 10;
        public const int DefaultMaxTaskBeforeYield = 50;

        private int m_MaxTaskBeforeYield = DefaultMinTaskBeforeYield;

        public int MaxTaskBeforeYield
        {
            // the value is clamped between the allowed min and max
            set
            {
                if (value > DefaultMaxTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaultMaxTaskBeforeYield;
                    return;
                }
                if (value < DefaultMinTaskBeforeYield)
                {
                    m_MaxTaskBeforeYield = DefaultMinTaskBeforeYield;
                    return;
                }

                m_MaxTaskBeforeYield = value;
            }
            get { return m_MaxTaskBeforeYield; }
        }

        public void AddQueue(string QueueId)
        {
            var bHighPriority = true;
            var bAddIfNotExist = true;

            GetOrAddQueue(QueueId, bHighPriority, bAddIfNotExist);
            GetOrAddQueue(QueueId, !bHighPriority, bAddIfNotExist);
        }



        public  IEnumerable<Task> GetScheduledTasks(string QueueId)
        {
            var hQ = GetOrAddQueue(QueueId, true, false);
            var lQ = GetOrAddQueue(QueueId, false, false);

            if (null == hQ || null == lQ)
                return null;

            var masterList = new List<Task>();

            masterList.AddRange(hQ);
            masterList.AddRange(lQ);
            

            return masterList;
        }


        private ConcurrentQueue<LeveledTask> GetOrAddQueue(string QueueId, bool isHighPriority, bool addIfNotExist = true)
        {
            if (addIfNotExist)
            {
                var hiQueue = m_HiQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());
                var lowQueue = m_LowQueues.GetOrAdd(QueueId, new ConcurrentQueue<LeveledTask>());

                return (isHighPriority) ? hiQueue : lowQueue;
            }
            else
            {
                var qList = (isHighPriority) ? m_HiQueues : m_LowQueues;

                // TryGetValue returns null (instead of throwing) if the queue was removed
                ConcurrentQueue<LeveledTask> q;
                qList.TryGetValue(QueueId, out q);
                return q;
            }
        }

        private object GetOrAddLock(string QueueId, bool AddIfNotExist = true)
        {
            if (AddIfNotExist)
            {
                object oNewLock = new object();
                var o = MasterQueueLocks.GetOrAdd(QueueId, oNewLock);
                return o;
            }
            else
            {
                return MasterQueueLocks[QueueId];
            }
            
        }

        public void RemoveQueue(string QueueId)
        {
            LeveledTask lt = new LeveledTask(() =>
            {
                // this removes the queues from the queue lists but does not null them,
                // so an in-flight execution will just keep on going. Because the queue
                // lists and lock list no longer hold a reference to the high/low queues
                // and lock, they will eventually be collected.

                Trace.WriteLine(string.Format("queue {0} will be removed", QueueId), "info");

                ConcurrentQueue<LeveledTask> q;
                object oLock;

                m_HiQueues.TryRemove(QueueId, out q);
                m_LowQueues.TryRemove(QueueId, out q);

                MasterQueueLocks.TryRemove(QueueId, out oLock);
            });

            // en-queued as a normal-priority task: existing tasks drain first,
            // then the removal runs.
            lt.IsHighPriority = false;
            lt.QueueId = QueueId;
            lt.Start(this);

        }

        

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            var masterList = new List<Task>();

            foreach (var hqueue in m_HiQueues.Values)
                masterList.AddRange(hqueue);

            foreach (var lqueue in m_LowQueues.Values)
                masterList.AddRange(lqueue);
            return masterList;
        }

        protected override void QueueTask(Task task)
        {
            var leveledtask = task as LeveledTask;
            if (null == leveledtask)
                throw new InvalidOperationException("this leveled sequential scheduler shouldn't be used with regular Task objects"); // bang!

            if (string.IsNullOrEmpty(leveledtask.QueueId))
                throw new InvalidOperationException("Task scheduler received a task that does not have a queue assigned to it"); // bang!

            var Q = GetOrAddQueue(leveledtask.QueueId, leveledtask.IsHighPriority);
            Q.Enqueue(leveledtask);

            ProcessWork(leveledtask.QueueId);   
        }

        private void ProcessWork(string QueueId)
        {
            ThreadPool.UnsafeQueueUserWorkItem(w =>
            {
                var oLock = GetOrAddLock(QueueId);

                bool bGotLock = false;
                Monitor.TryEnter(oLock, ref bGotLock);

                if (!bGotLock)
                {
                    // a thread from the pool is already looping on this queue.
                    // at any point in time ONLY one thread is allowed to de-queue
                    // from a session's queues, to keep the tasks ordered.
                    return;
                }

                var hQ = GetOrAddQueue(QueueId, true);
                var lQ = GetOrAddQueue(QueueId, false);

                if (0 == hQ.Count && 0 == lQ.Count)
                {
                    Monitor.Exit(oLock); // the work was already completed; release the lock.
                    return;
                }


                var ExecutedTasks = 0;

                while (
                            // keep going until it is time to yield...
                            ExecutedTasks <= m_MaxTaskBeforeYield ||
                            // ...unless this queue has been removed: in that case don't
                            // yield, drain it before dropping the reference.
                            (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                      )
                {
                    if (ExecutedTasks > m_MaxTaskBeforeYield && (!m_HiQueues.ContainsKey(QueueId) && !m_LowQueues.ContainsKey(QueueId)))
                        Trace.WriteLine(string.Format("Queue {0} has been removed. Draining.. (remaining {1} tasks)", QueueId, lQ.Count + hQ.Count), "info");


                    LeveledTask leveledTask = null;

                    // try the high priority queue first...
                    var bFound = hQ.TryDequeue(out leveledTask);

                    // ...then the low priority queue
                    if (!bFound)
                        bFound = lQ.TryDequeue(out leveledTask);

                    if (!bFound) // nothing left to work on
                        break;

                    try
                    {
                        base.TryExecuteTask(leveledTask);
                    }
                    catch (Exception e)
                    {
                        
                        //TODO: check if we need to call unobserved exceptions
                        Trace.WriteLine(string.Format("Task Executer: Error Executing Task {0} {1}", e.Message, e.StackTrace), "error");
                    }

                    ExecutedTasks++;

                }

                if (0 == ExecutedTasks) // we were unsuccessful picking up tasks
                    Trace.WriteLine(string.Format("Scheduler attempted to execute on queue {0} with count {1} and found 0 tasks", QueueId, lQ.Count + hQ.Count), "info");

                Monitor.Exit(oLock);
             

                

                if ((ExecutedTasks > m_MaxTaskBeforeYield && hQ.Count + lQ.Count > 0))
                {

                    // current thread is about to be released back to the pool (and we still have more as we yielded).
                    // call it back to ensure that remaining tasks will be executed (even if no more tasks are sent to the scheduler). 
                    Trace.WriteLine(string.Format("Queue {0} yielded thread {1} after {2} tasks",
                                                   QueueId,
                                                   Thread.CurrentThread.ManagedThreadId,
                                                   ExecutedTasks - 1));

                    Task.Run(() => { ProcessWork(QueueId);} );
                }



            }, null);
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // if a task got inlined it would lose its position in the queue,
            // so we cannot afford to inline tasks here
            return false;
        }
    }

The magic happens in the following methods:

QueueTask(Task task)

Here the task is validated, the destination queue (per session and per priority) is identified, then the task is en-queued and the ProcessWork method is called.

ProcessWork(string QueueId)

This method en-queues a delegate in the thread pool as a work item. When the delegate executes, it checks whether there is a currently active worker on the queue; if there is one, it returns. Otherwise it keeps pulling messages (high priority then low priority) until it has to yield (with the special condition for draining removed queues discussed above). If it has yielded execution, it creates a new task that is just another call to ProcessWork(). This ensures that execution keeps going even when QueueTask is not being called.

A couple of additional notes: the scheduler does not support inline execution (because this would make us lose the order of tasks). The rest of the class implementation is maintenance and management of the in-memory structures used by the scheduler instance.

From the client's point of view (the client here could be your messaging server; in my case it is a Web Sockets server) a message is sent this way:

 LeveledTask lt = new LeveledTask(() =>
 {
     // the magic goes here (your send logic)
 });

 lt.QueueId = q;                     // queue id is my messaging session id
 lt.IsHighPriority = isHighPriority;
 lt.Start(scheduler);                // scheduler is an instance of the scheduler class.

 // I can now use lt with await, or ContinueWith, just like the Task class.
 // Note: unobserved exceptions will go to TaskScheduler.UnobservedTaskException as usual.

Epilog

This was part 1 of a two-part post covering a Web Sockets server implementation. This part focused on the scheduler (which in a lot of ways is a standalone component). Part 2 will cover the server implementation. I should be able to post part 2 next week; once I do, I will upload the code along with unit tests to GitHub.

till next time!

Questions/Comments
@khnidk

Azure Service Fabric: On State Management

Prelog

Making the right design decision is a mix of experience, knowledge, and a bit of art. The phrase "right design decision" in itself requires the understanding that design is not an isolated abstract; it has a context that spans requirements, the environment, and the people who are actually about to write the software.

Service Fabric employs a complex set of procedures that ensure your data is consistent and resilient (e.g. when a cluster node goes down).

Understanding (knowing) what goes on under the hood of your application platform is key to making better design decisions. Today I want to attempt to describe how state is managed in Service Fabric, the different choices you have, and their implications on the performance and availability of your Service Fabric application.

I strongly recommend going through Service Fabric: Partitions before reading this one.

Replication Process & Copy Process

Service Fabric employs two distinct processes to ensure consistency across a replica set (the replicas of a given partition of a stateful service):

Replication Process: used when writing state data in your service.

Copy Process: used to build a new replica when an existing Active Secondary fails; also used to recover an Active Secondary that was down for a while.

Both processes are described below.

Replication Process

So your Primary goes online, starts listening to requests, and is about to write data using one of the reliable collection classes. Here is a breakdown of events:

  1. The Primary wraps the write operation (which can be an add, update, or delete) and puts it in a queue. This operation is synchronized with a transaction. At the other end of the queue your Active Secondary(s) are pumping out the operations, as described in the next step. No data has been written yet to the Primary data store.
  2. Active Secondary(s) pick up the operation(s), apply them locally, and acknowledge the Replicator one by one.
  3. The Replicator delivers a callback to the Primary when the write has succeeded.
  4. The Primary then applies the data locally and returns to the original request caller.

Diving Even Deeper

  • The Replicator will not acknowledge a successful write to the Primary until the write is "Quorum Written". A quorum is floor(n/2)+1, where n is the # of replicas in the set; in other words, more than half of the replicas in the set have successfully applied it.
  • Each operation is logically sequenced. The replication process is out of band (from the perspective of the Primary); at any point in time the Primary maintains an internal list indicating operations waiting to be sent to replicators, operations sent to replicators and not yet Acked, and operations sent to replicators and Acked.
  • There is no relation between the data structure semantics (queue vs dictionary) and the actual storage/replication. All data is replicated and stored the same way.
  • Given the following pseudo code:

Create transaction TX
Update dictionary D (key = K to value of "1")
Commit TX
Create another transaction TX1
Read dictionary D (key = K)

What value should you expect when you read the dictionary the second time? Most probably whatever value K had before updating it; remember, changes are not applied locally until replicated to the quorum.
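
In reliable-collections code, the pseudo code above looks roughly like the following sketch (inside a stateful service; error handling omitted):

 var d = await StateManager.GetOrAddAsync<IReliableDictionary<string, string>>("D");

 using (var tx = StateManager.CreateTransaction())
 {
     await d.SetAsync(tx, "K", "1");
     await tx.CommitAsync(); // completes only after the quorum acknowledges the write
 }

 using (var tx1 = StateManager.CreateTransaction())
 {
     // if this read races the still-in-flight replication of the write above,
     // it can observe the old value of K: changes are not applied locally
     // until they have been quorum-replicated.
     var value = await d.TryGetValueAsync(tx1, "K");
 }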

Let us bend our minds even more

  • Active Secondary(s) keep pumping data out of the replication queue. They don't depend on the Replicator to notify them of new data, so a number of Active Secondary(s) can be at different stages of replication. In the perfect scenario a quorum is aligned with the Primary. This affects how you do your reads, as we will describe later.
  • Service Fabric maintains a sort of configuration version for each replica set. This version changes as the formation of the replica set changes (i.e. which replica is currently the primary). This information is communicated to the replicas.
  • You can run into a situation where an Active Secondary is ahead of the Primary: for example, a Primary failed to apply changes locally before dying and did not notify the caller of a successful write. Or worse, Service Fabric lost the write quorum (for example, many nodes of the cluster failed at the same time) and the surviving replicas (of which one will be elected primary) are the slow ones that were not catching up. The details of how this is recovered and communicated to your code (and the control you will have over recovery) are part of what the Service Fabric team is working on. Rolling back, loss acceptance, and restore from backup are all techniques that can be used for this.

What About My Reads?

The default behavior is to direct reads to the Primary replica to ensure consistent reads (i.e. reads from the latest, most trusted version of the data). However, you can enable listening on your secondary(s) and redirect reads to them (more on how to here).
As you can tell from the above discussion, reads from Secondary(s) can be out-of-sync reads, or, a better expression, ghost data. The current service interfaces do not allow you to know how far ahead or behind the current replica is relative to the Primary.

The Copy Process

The scenarios above discuss what happens when writes flow from the Primary to the Active Secondary(s). But what happens when Service Fabric needs to bring a Secondary up to speed (i.e. synchronize it)? The Secondary in question can be new (i.e. Service Fabric replacing a dead one on a dead node, or moving an existing one to a new node due to resource rebalancing). Obviously, using the replication mechanism for this is overkill. Instead, Service Fabric allows the Secondary to communicate its marker (the last sequence it applied internally) to the Primary; the Primary can then send back the delta operations for the Secondary to apply. Once the Secondary is brought up to speed it can follow the normal replication process.

The Fabric Replicator

Up until this moment we have talked about a "Replicator" without further details. Service Fabric comes with the "Fabric Replicator" as the default replicator. This is surfaced to you partially in the "StateManager" member of your stateful service, which holds a reference to the replicator. If you tinker inside the code you will notice that there is a process that couples your service to a state provider/manager (in the IStatefulServiceReplica interface), meaning that in theory you can build your own replicator, which, as you can tell, would be quite complex.

So What Does This Mean For Me?

Below are various discussion points on multiple design aspects when it comes to replicas/replica sets.

How Fast are My Writes?

You have to understand that the fastest possible write is S/A + time-on-slowest-network (R) + D/A + Ack, where:
  • S/A is the time of serialization & local calls to the replicator at the Primary.
  • Time-on-slowest-network (R) is the network time to the slowest replica that counts toward the quorum (the slowest of the fastest responders, which happen to be the most up-to-date secondary(s)).
  • D/A is the time of deserialization & calls to the replicator at the secondary.
  • Ack is the time of the acknowledgement on the way back.

Large # of Replicas vs Small # of Replicas

It is really tempting to go with a large # of replicas, but keep in mind the "How fast are my writes?" discussion above and the fact that the write quorum is floor(r/2)+1. The larger your replica set, the larger your quorum, and the slower your writes will be (a replica set of 3 needs 2 acknowledgements per write; a set of 7 needs 4). I would go with the smallest possible replica set per partition.

But Really, When Should I Go With a Larger # of Replicas in My Replica Set?

If you have a system where reads far outnumber writes, and the impact of ghost reads is negligible to your business, then it might make sense to go with a large # of replicas (and to allow reads on secondary(s)).
As a side note, the replica listening address resolution mechanism (more here) does not keep track of your calls, so you probably want to do simple load balancing yourself (for example, round-robin across all secondary replicas in a partition).
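
A trivial round-robin picker is enough (a sketch; obtaining the resolved secondary endpoints via the resolution mechanism above is not shown):

 internal sealed class RoundRobinPicker
 {
     private readonly string[] m_endpoints; // resolved secondary listening addresses
     private int m_next = -1;

     public RoundRobinPicker(string[] endpoints)
     {
         m_endpoints = endpoints;
     }

     public string Next()
     {
         // Interlocked keeps the counter safe under concurrent callers;
         // the unsigned cast keeps the index positive if the counter wraps.
         var i = unchecked((uint)Interlocked.Increment(ref m_next));
         return m_endpoints[i % (uint)m_endpoints.Length];
     }
 }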

Large Objects (Data Fragment) vs. Small Objects

How big should your objects (those entries in reliable dictionaries and queues) be? Consider the following: the sequential nature of replication, plus the fact that replicas live on different machines (network overhead), and you will end up with "smaller is better".
A data fragment that is 500 KB is not just slow to replicate to the quorum; it also slows down the line of pending replications to replicas (trying to catch up). On top of that it slows the Copy Process, hence slowing down the recovery and resource-rebalancing processes.
Consider techniques for breaking your data down into smaller chunks (writes) and writing them in one transaction, as sketched below.
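
For example, a large payload can be written as smaller chunks that still commit atomically (a sketch, inside a stateful service; splitting the payload into the chunks list is not shown):

 var blobs = await StateManager.GetOrAddAsync<IReliableDictionary<string, byte[]>>("blobs");

 using (var tx = StateManager.CreateTransaction())
 {
     // chunks: the payload pre-split into small segments (say 64 KB each)
     for (var i = 0; i < chunks.Count; i++)
         await blobs.SetAsync(tx, string.Format("doc1/{0}", i), chunks[i]);

     await tx.CommitAsync(); // all chunks replicate and commit as one unit
 }
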
Also note that reliable collections are kept in memory (for the Primary and Active Secondary(s)) as well; that is how the data structure semantics (queue vs dictionary) are maintained on top of the replicator. This means your entire state lives in the memory of your cluster nodes.

Epilog

As you can tell, the mechanics of Service Fabric internals are quite complex, yet an understanding of how it works internally is important to building high-performance Service Fabric applications.

till next time

@khnidk

Writing a Non-Locking Throttling Component

Prelog

Throttling components, also known as flood gates, latches, or just gates, have a variety of use cases. Off the top of my head, I can think of:

  1. SaaS software that charges customers per call/quota of calls to a certain component.
  2. Protecting a resource, for example an old system that can handle up to X calls in a certain period.
  3. Use in systems such as DoS detection: if (X calls to resource Y from caller Z) > threshold, then flag, then throttle.

If you have requirements for a throttling component, I would recommend checking whether Azure API Management fits your needs before deciding to write it yourself.

In all cases, throttling components are fun to write and are a good brain exercise :)

The Requirements

What we are trying to achieve with a throttling component is to "control the number of calls to a certain resource." The "why" of this requirement (such as SaaS scenarios, avoiding noisy neighbors, or quota-ed calls) is not part of the core requirements.

The Design

Note: the code samples uploaded with this post (check the end of the post) have multiple implementations using locking and non-locking approaches. However, I am focusing here only on non-locking, since it is the most interesting.

The idea is to use a circular buffer with a size equal to the maximum allowed requests in a period. Each entry of the buffer holds the ticks of the time the corresponding request was received. There are 2 pointers: Head (starting position = 0) and Tail (starting position = -1), as the following:

[Figure: the circular buffer with the Head and Tail pointers]

  • Tail represents the first request in a period. So if you want to allow a max of 100 requests every 10 seconds, Tail will represent the first request received in the last 10 seconds. As you receive requests, Tail races forward and stops at the first request in the period.
  • For each request: if progressing Head forward 1 step will not make it catch up with Tail (i.e. head != tail), then accept the request and progress Head forward 1 step. If progressing Head 1 step forward will make it catch up with Tail, then return an error and don't progress Head. In essence, stop accepting requests every time Head and Tail meet.
  • As Tail progresses forward it may not find a first request in the period (i.e. we didn't receive any requests in a span longer than our target period); in that case set Tail = Head - 1 (i.e. reset and start over).
  • Calculating "Retry After" (i.e. when the caller has already maxed out the requests-per-period quota): the "retry after" time span is the time of Tail + Period (which will be a point in the future) minus the time of now.

In a lot of ways, think of this as two cars racing on a track. The faster car (requests) starts just ahead of the slower car (period start time); every time the faster car catches up with the slower car (a full circle), it stops, giving the slower car a chance to advance.

The Code

using System;
using System.Diagnostics;
using System.Threading;

namespace Throttling
{
    internal class MultiNoLock
    {
        // these values are driven from config
        private readonly static TimeSpan Period           = TimeSpan.FromSeconds(5);
        private readonly static int maxReqAllowedInPeriod = 100;
        private readonly static long ticksInPeriod        = Period.Ticks;
        private readonly static int totalRequestsToSend   = 1000 * 1000;
        private readonly static int numOfThreads          = 10; // think of it as # of concurrent Requests

        // these values are per every throttling component
        private long[] allTicks = new long[maxReqAllowedInPeriod];
        private int head = 0; // head
        private int tail = -1; // tail

        // wait for all threads
        private CountdownEvent cde = new CountdownEvent(numOfThreads);

        // this will slow the console.write down.
        // but will have no effect on the actual values
        // if you want you can remove the console.write and write to stdout or stderr
        // and work with the values
        private AutoResetEvent _ConsoleLock = new AutoResetEvent(true);

        private long totalProcessingTicks = 0;
        private int  totalSuccessfulReq   = 0;

        private void runThread(object oThreadNum)
        {
            // all the console.SetCursorPosition are meant for clear output. 

            int threadNum = (int)oThreadNum;
            Stopwatch sp = new Stopwatch();
            var startingLine = threadNum * 6;

            var reqForThisThread = totalRequestsToSend / numOfThreads;

            _ConsoleLock.WaitOne();
            Console.SetCursorPosition(1, 0 + startingLine); // aka first
            Console.WriteLine(string.Format("Thread # {0}:", threadNum));
            _ConsoleLock.Set();

            for (var i = 0; i <= reqForThisThread; i++)
            {
                _ConsoleLock.WaitOne();
                    Console.SetCursorPosition(1, 1 + startingLine);
                    Console.WriteLine(string.Format("head/tail: {0}/{1}   ", head,tail));

                    Console.SetCursorPosition(1, 2 + startingLine);
                    Console.WriteLine(string.Format("Total Calls: {0}   ", i));
                _ConsoleLock.Set();

                long TryAfterTicks = 0;

                sp.Start();
                    var bShouldDoIt = ShouldDoIt(ref TryAfterTicks);
                var ElabsedTicks = sp.ElapsedTicks;

                Interlocked.Add(ref totalProcessingTicks, ElabsedTicks);

                if (bShouldDoIt)
                    Interlocked.Increment(ref totalSuccessfulReq);

                sp.Stop();
                sp.Reset();

                _ConsoleLock.WaitOne();
                Console.SetCursorPosition(1, 5 + startingLine);
                Console.WriteLine(string.Format("Process Ticks: {0}  ", ElabsedTicks));
                Debug.WriteLine(string.Format("Process Ticks: {0}  ", ElabsedTicks));
                _ConsoleLock.Set();

                _ConsoleLock.WaitOne();
                Console.SetCursorPosition(1, 3 + startingLine);
                if (bShouldDoIt)
                    Console.WriteLine(string.Format("Should do it: {0} \t\t\t\t\t\t ", bShouldDoIt));
                else
                    Console.WriteLine(string.Format("Should do it: {0}  Try After {1} Milliseconds", bShouldDoIt, TryAfterTicks / TimeSpan.TicksPerMillisecond));

                Console.SetCursorPosition(1, 4 + startingLine);
                Console.WriteLine(string.Format("current minute/secound {0}/{1}", DateTime.Now.Minute, DateTime.Now.Second));
                _ConsoleLock.Set();

                //random sleep, so we wouldn't get uniform values (in total ticks & counts).
                // max of 100 ms wait

                Thread.Sleep(TimeSpan.FromMilliseconds((new Random()).Next(1, 10) * 10));
            }
            cde.Signal();
        }
        public void Run()
        {
            Thread[] threads = new Thread[numOfThreads];
            for (var i = 0; i < numOfThreads; i++)
            {
                threads[i] = new Thread(this.runThread);
                threads[i].Start(i);
            }
            cde.Wait();

            // although I am printing the average, you should look for the max.
            // some requests took too long to reach a go/no-go decision (e.g. 10ms)
            // (with 50 threads) (time spent waiting for a lock release).
            // if you are committing to an SLA (or similar) you will be in trouble.
            Debug.WriteLine("Threads {0} - Average Processing is @ {1} ms per request",
                threads.Length,
                totalProcessingTicks / TimeSpan.TicksPerMillisecond / totalSuccessfulReq);
        }

        private void setTail(long nowTicks)
        {
            var newTTail = 0;

            // only first call
            if (-1 == tail)
            {
                Interlocked.Exchange(ref tail, newTTail);
                Interlocked.Exchange(ref allTicks[newTTail], nowTicks);
                return;
            }

            long ticksAfter = (nowTicks - ticksInPeriod); // we are not really interested
                                                          // in anything older than the period
            var newtail = tail;
            var currenttail = newtail;

            // no need to trim if the current tail is within the period
            if (allTicks[newtail] >= ticksAfter)
                return; // tail is good

            while (true)
            {
                if (newtail == maxReqAllowedInPeriod - 1)
                    newtail = 0;
                else
                    newtail++;

                // all entries are really old, and a previous thread didn't just set the value
                if (newtail == currenttail && allTicks[newtail] < ticksAfter)
                {
                    // reset: place the tail just one place before the head.
                    newtail = (head == 0) ? maxReqAllowedInPeriod - 1 : head - 1;
                    Interlocked.Exchange(ref tail, newtail);
                    Interlocked.Exchange(ref allTicks[newtail], nowTicks);
                    return;
                }

                // if there are two threads competing for the tail, by definition one of
                // them will be following the other in the loop; the check below
                // guarantees that one will set the value for the other.

                // is this a good tail?
                if (allTicks[newtail] >= ticksAfter)
                {
                    Interlocked.Exchange(ref tail, newtail);
                    return;
                }
            }
        }

        private bool _ShouldDoIt(long nowTicks)
        {
            /*
            as a rule of thumb: if your variable assignment is 64 bits or smaller on a 64-bit
            machine, then using the = operator should be fine as an atomic operation.

            hence long = long on a 64-bit machine is an atomic operation and does not require Interlocked.
            however, if you are doing a long-to-long assignment on a 32-bit machine then this is
            not an atomic assignment and will require Interlocked.

            the below uses Interlocked everywhere to avoid confusion
            */
            setTail(nowTicks);

            var currentHead = 0;
            Interlocked.Exchange(ref currentHead, head);

            if (
            // meeting somewhere not at the beginning of the track.
                ((currentHead < maxReqAllowedInPeriod - 1) && currentHead + 1 == tail)
                ||
            // meeting at the beginning of the track
                (0 == tail && (currentHead == maxReqAllowedInPeriod - 1))
               )
                return false;
            // there is a slim possibility that head has been incremented by another thread;
            // in that case the increment will be lost. it is a very slim chance but possible.
            // in the majority of cases this should be acceptable

            if (currentHead < maxReqAllowedInPeriod - 1)
            {
                currentHead++;
                Interlocked.Exchange(ref head, currentHead);
            }
            else
            {
                currentHead = 0;
                Interlocked.Exchange(ref head, 0);
            }

            Interlocked.Exchange(ref allTicks[currentHead], nowTicks);

            return true;
        }
        public bool ShouldDoIt(ref long TryAfterTicks)
        {
            long nowTicks = DateTime.Now.Ticks;
            bool bOk = _ShouldDoIt(nowTicks);
            if (!bOk) // you might get skewed results for TryAfter, since tail might be adjusted
                TryAfterTicks = (allTicks[tail] + ticksInPeriod) - nowTicks;
            return bOk;
        }
    }
}

Notes on the Code

  • Language: The code is written in C#, but I have tried to stay as agnostic as possible; the same code can be written in C++ or another language on Windows (the Interlocked.XXX functions are in fact Win32 functions, check here). For the Linux family of OSs, look for how these are surfaced in your SDK/language.
  • Memory Consumption: The code consumes 8 bytes for every request tracked (ticks are longs, an 8-byte datatype). This can be significant if you are working with a low-memory system such as Windows on Raspberry Pi 2, or if you are tracking maximum requests per caller. For example, tracking a max of 10,000 requests for a max of 1,000 concurrent callers will require 10,000 * 1,000 * 8 bytes = ~76 MB just for this component to operate. You can cut this in half by making the ticks relative to the last hour (or a shorter period), hence using 4-byte ints instead of 8-byte longs.
  • Limitation: You cannot dynamically change the max # of allowed requests in a period without locking (or creating a new circular buffer with the new size, then cleverly jumping the pointers to the right position in the new buffer).
  • Accuracy: There is a slight chance of not counting requests between calculating Head and setting it (it shouldn't be many requests, since it happens between consecutive lines of code). Please check the code comments for more.

Why not Locking

Locking in itself is not bad: it protects resources and allows you to do fascinating stuff. In low-concurrency scenarios locking is absolutely fine, because each request will wait for only one or two other requests to finish adjusting the in-memory data structure representing the requests. As a matter of fact, if you try locking and non-locking implementations under low concurrency, they will perform about the same.

But if you are dealing with high-concurrency scenarios, locking is bad and will lead to requests waiting a prolonged amount of time just to adjust an in-memory data structure (in addition to whatever I/O you need to do to complete the request successfully).

Multi Server Environment

Unless you are still somewhere in the 1980s, you will need to run this code on multiple servers (for load balancing and availability). For the sake of discussion, let us imagine you need to run this on a load-balanced web node. The scheme of choice will depend on some trade-offs.

High Accuracy && Low Resilience && No Session Affinity

For servers that use round-robin load distribution, you can assume max allowed requests per node = (total allowed requests / # of nodes) for requests not tracked per caller. If you are tracking per caller, then you will have to RPC into the other servers to get the total # of requests processed for the caller, then update internal counters before validating. In this scenario, resetting a node resets its counters.

High Accuracy && High Resilience && No Session Affinity

This scenario is similar to the previous one, except all servers maintain a single state in a backing store; hence there is no need to RPC into the other servers, just into the backing store.

Load Distribution with Session Affinity

Session affinity is bad, and you should use it only when you must. I don't generally favor absolutes, but this is one of the few I do. If you have to do session affinity, then state will live on a single server; whether to save it to backing storage or not depends on whether you need it to survive server restarts.

Notes on Selecting Backing Storage

Favor storage components that can:

  1. Perform read, modify, write in a single operation (there are few of those; the list will surprise you). This should make your life easier when trying to figure out whether Head and Tail have moved while you were reading them.
  2. Be partitioned: since state is saved per caller, there is no point in having lock contention on storage shared across callers.

Relaxed Accuracy Approach

Some systems can tolerate relaxed accuracy. This is done by continuously and asynchronously analyzing the logs (written for each request), then deciding on and maintaining an in-memory gate lock. These systems tend to have longer periods (usually longer than a few minutes). Because processing a request and using it to calculate whether the gate should be locked is not immediate, these systems typically don't have high accuracy (they are typically used for noisy-neighbor avoidance scenarios).

Epilog

The code is published here: https://github.com/khenidak/throttling. Have fun with it.

@khnidk

Azure Service Fabric: Learning Materials & Literature

Bookmark this page; I will keep it updated with hand-picked Service Fabric learning materials & literature.

Added on August 4th 2015

 

@khnidk

Service Fabric: Hosting, Activation & Isolation

Prelog

If you try to map Service Fabric to your old-style Application Server platform, you will probably end up – after stripping it down to bare metal – defining Service Fabric as an App Server that runs on a multi-server cluster and provides:

  1. Process Management: including availability, partitioning, health and recovery.
  2. Application Management: including isolation, version management and updates.
  3. State Management: including resilience, transaction support and partitioning.

The above informal definition (for a formal definition check here) acts as a basis for interesting patterns such as Actor Models and Micro Services. Understanding how your code runs on top of a Service Fabric cluster is key to efficient solution design and architecture.

This post focuses on #1 Process Management/Model & #2 Application Management.

Before we begin: I consciously decided not to include the following in the discussion below, to keep it focused. We will be able to cover them in later posts:

  1. Actor Framework.
  2. Service Groups.
  3. Application/Service Package Resources, Configuration or Secret Stores (only focused on Service Package Code).
  4. Host Setup

Note: While I mostly use PowerShell below, the .NET API can also be used, unless stated otherwise.

Overview Of Service Fabric Related Concepts

We start by understanding the concepts and how they relate to each other. Instead of trying to describe them in the abstract, I put together an entity-model-like diagram that puts them in perspective.

[Figure: entity model of the hosting & activation concepts (click to enlarge)]

Let us attempt to describe the monstrous diagram above.

On The Design Time Side

  • An Application Package is described by ApplicationManifest.xml, which defines 1) a Type Name, an identifier used to allow you to create instances of applications, and 2) a Version. For example, the first line of ApplicationManifest.xml declares "HostingActivationApp" as the Application Type Name & "1.0.0.0" as the version:
    <ApplicationManifest ApplicationTypeName="HostingActivationApp" ApplicationTypeVersion="1.0.0.0" xmlns="http://schemas.microsoft.com/2011/01/fabric" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    
  • An Application Package can import multiple Service Packages, each described by a ServiceManifest.xml that declares 1) a Name and 2) a Version, as the following:
    <ServiceManifest Name="HostingActivationPkg"
                     Version="1.0.0.0"
                     xmlns="http://schemas.microsoft.com/2011/01/fabric"
                     xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    
  • Each Service Package defines:
    • Service Types (not to be confused with the ".NET class name"). This name allows you to reference a service for activation irrespective of the actual class name that implements it. The type also defines whether it is a Stateful or a Stateless service.
    • Code Packages (with Name & Version) define where your service will run. Think of it as a host definition, a container definition, or a "worker process definition". This host can be explicit, as in an EXE program (typically a console application created by VS.NET for you), or implicit, where Service Fabric runs your code in a worker process created for you. For this post we will focus only on explicit hosts.
    • A Service Package does not define how the service will be partitioned, nor the number of instances per service, nor how many times the service will be activated in an application.
    • The below example defines 2 service types (a stateful service named "ComputeSvc" and a stateless service named "GatewaySvc"). It also defines one Code Package with an EXE host named "HostingActivationSvcs.exe":
      <ServiceTypes>
          <StatefulServiceType ServiceTypeName="ComputeSvc" HasPersistedState="true"  />
      
          <!-- added another service type to my package-->
          <StatelessServiceType ServiceTypeName="GatewaySvc" />
        </ServiceTypes>
      
        <!-- Code package is your service executable. -->
        <CodePackage Name="Gw.Code" Version="1.0.0.0">
          <EntryPoint>
            <ExeHost>
              <!--
                  this is the host where replicas will be activated,
                  typically Service Fabric runs one instance per node per application instance
               -->
              <Program>HostingActivationSvcs.exe </Program>
            </ExeHost>
          </EntryPoint>
        </CodePackage>
      
  • Back to the Application Package. After importing one or more Service Packages, it can now declare:
    • Default Services: each defines a Service Name (used in resolution by Uri, for example fabric:/<App Instance Name>/<Service Name>), a "Service Type", and a Partitioning Scheme (or instance count for stateless services). Default Services are activated automatically once an application instance is created.
    • Service Templates: similar to Default Services, but not activated when an application instance is created. Instead you have to activate them manually via API or PowerShell; because of this, a template does not have a Service Name (one has to be provided during activation).
    • The following example of an Application Manifest imports "HostingActivationPkg" with Version & Name, and defines:
      • 1 Service Template (unnamed).
      • 3 Default Services (named Worker1, Worker2 & Gw1) which will be activated once an application instance is created.
        
         <ServiceManifestImport>
            <ServiceManifestRef ServiceManifestName="HostingActivationPkg" ServiceManifestVersion="1.0.0.0" />
            <ConfigOverrides />
          </ServiceManifestImport>
        
            <!-- -->
          <ServiceTemplates>
        
            <StatelessService   ServiceTypeName="GatewaySvc" InstanceCount="4">
              <UniformInt64Partition PartitionCount="3" LowKey="-9223372036854775808" HighKey="9223372036854775807"/>
            </StatelessService>
          </ServiceTemplates>
        
            <DefaultServices>
              <!-- this is the initial backend worker staticly defined-->
            <Service Name="Worker1">
              <StatefulService ServiceTypeName="ComputeSvc" TargetReplicaSetSize="3" MinReplicaSetSize="3">
                <UniformInt64Partition PartitionCount="3" LowKey="-9223372036854775808" HighKey="9223372036854775807" />
              </StatefulService>
            </Service>
        
            <!-- this is an additional worker, initially we expect to offload here,
                 notice I am using a different partitioning scheme-->
            <Service Name="worker2">
        
              <StatefulService ServiceTypeName="ComputeSvc" TargetReplicaSetSize="3" MinReplicaSetSize="3">
                <NamedPartition>
                  <Partition Name="London"/>
                  <Partition Name="Paris"/>
                  <Partition Name="Newyork"/>
                </NamedPartition>
              </StatefulService>
            </Service>
        
            <!-- Staticly defining another statless service for the gateway-->
            <Service Name="Gw1">
              <StatelessService ServiceTypeName="GatewaySvc" InstanceCount="-1">
                <SingletonPartition/>
              </StatelessService>
            </Service>
        
          </DefaultServices>
        

On The Runtime Side

  • Application Packages are copied to the cluster using the Copy-ServiceFabricApplicationPackage cmdlet. Interestingly enough, you don't need to provide a name or version, just a path for the Image Store service (more on this in later posts). Meaning, you can copy the package multiple times (which would be a mistake; that is why in the diagram I have it as a 1:1 relation).
  • Then you have to register your Application Type, once and only once, using the Register-ServiceFabricApplicationType cmdlet, passing in the location where you copied the Application Package on the cluster.
  • Once an Application Type is registered, I can create multiple Application Instances. Application Instances are created using the New-ServiceFabricApplication cmdlet, passing in:
    • Application Instance Name, in the form "fabric:/<arbitrary name>"
    • Application Type Name
    • Application Type Version
  • Application Instances are the unit of management within a Service Fabric cluster. Each:
    • Can be upgraded separately.
    • Will have its own hosts (as discussed in Activation & Isolation below). Application instances (even of the same Application Type) do not share hosts.

Note:

When you Deploy your Service Fabric VS.NET project, VS.NET runs the "Package" msbuild target on *.sfproj and produces a package saved in <Solution>/<sfproj>/pkg/<build type>/, then runs through the above cmdlets via the scripts available in <solution>/<sfproj>/scripts/Create-FabricApplication.ps1 & Deploy-FabricApplication.ps1. VS.NET creates the first Application Instance with Instance Name = "Application Type Name", which tends to be confusing.

Activation & Isolation

For the sake of the next discussion, we will assume that Fault Domain, Update Domain & Node are the same thing; we will just refer to them as Node or Cluster Node. We will also completely ignore Placement Constraints and the "TargetReplicaSize" & "MinReplicaSize" values.

In previous posts I referred to your code running in a process. Let us attempt to detail this now. Imagine having an Application Type T registered in a cluster of 3 nodes, with one Service Type S, defined in the Default Services section as S1, which has Code Package C with 1 Named Partition and 3 Replicas. If I create a new Application Instance I1, my cluster will look like this:

 

[Figure: cluster view after creating application instance I1]

Service Fabric does not like to host replicas of the same partition on the same node. Because I have 3 replicas of S1, it will decide to create 3 hosts (one per node). This is typically done by running your code (your EXE process).

Now remember, I can choose to activate services in an Application Instance (either using Service Templates or using Service Types). My new services can follow a different partitioning scheme. The following example activates S2 with 3 Uniform Partitions & 3 Replicas each, which will result in a different view of the cluster:

New-ServiceFabricService -Stateful `
                         -PartitionSchemeUniformInt64 `
                         -LowKey 1 `
                         -HighKey 3 `
                         -PartitionCount 3  `
                         -ApplicationName "fabric:/I1" `
                         -ServiceName "fabric:/I1/S2" `
                         -ServiceTypeName "S" `
                         -HasPersistedState `
                         -TargetReplicaSetSize 3 `
                         -MinReplicaSetSize 3

Now my cluster will look like this:

[Figure: cluster view after activating S2 inside application instance I1]

Because S2 is activated within an existing Application Instance, Service Fabric will not create new hosts; instead, S2 will be activated within the same host processes already running. That means multiple services within the same Application Instance, and their replicas, will share the same Windows Server process: they share the memory, handle table, address space, and so on. In this Service Fabric preview version they also share the AppDomain for CLR-based services.

If I have other Service Types that run in the same Code Package (similar to my sample above), they will be activated within the same existing hosts. Service Types can explicitly define a different Code Package (for example, using different Service Packages); in this case, different host instances running the different Code Packages (processes) will be instantiated on the cluster nodes to run instances of those Service Types within the same Application Instance.

How will my cluster look if I create a new instance I2 of my Application Type T (for example, using the below PowerShell)?

New-ServiceFabricApplication -ApplicationName "fabric:/I2" `
                             -ApplicationTypeName "T" `
                             -ApplicationTypeVersion "1.0.0.0"

[Figure: cluster view after creating a second application instance I2]

Because Service Fabric treats Application Instances as isolation boundaries, it will create a new instance of your Code Package on the cluster nodes. Remember S2, which was activated manually on I1 above (not via the Default Services section)? It will not be activated in the I2 application instance.

Similarly, I can change the Application Type Version in my package, then copy and register the same Application Type with a different Application Type Version on the cluster. This allows me to:

  1. Create Application Instances (isolated at the Code Package instance level) from the new Application Type Version.
  2. Upgrade existing Application Instances to the new Application Type Version.

A Few "Gotchas" to Think About

  1. You will not be able to create partitions with # of replicas > # of cluster nodes, as Service Fabric really doesn’t like 2 replicas of the same partition on the same node.
  2. For the same reason, for stateless services you will not be able to set # of instances > # of cluster nodes.
  3. If a replica fails, the entire Windows process instance is scrapped and recreated (possibly on a different node; if the process contained a Primary, an Active Secondary on a different node will be promoted to Primary) along with all replicas/instances running inside it. In essence, un-handled exceptions are truly evil for your replicas.
  4. Don’t assume that certain replicas will exist together in the same Windows process instance. It is good practice not to share process-wide resources among replicas (such as thread pools, static variables etc.) unless you really understand the implications.
  5. Because all replicas share the process, GC runs for all of them at once, so certain principles of optimized CLR memory management apply heavily here. It is possible for a replica to hog the process resources and affect the performance of other replicas running in the same process.
  6. Although I did not talk about it, you can remove activated services (check the Remove-ServiceFabricService cmdlet) from an Application Instance. This means replicas can be taken down from a process; hence the practice of not sharing anything process-wide.
  7. Service Fabric Explorer is an excellent tool. However, it groups the display according to the Partition, not the node, which might be confusing if you are trying to learn about these concepts. A combination of SysInternals Process Explorer & Debug View will be needed if you want to drill into this topic. My sample code throws a few OutputDebugString Win32 API calls (via Debug.WriteLine) to clarify the relationship between Windows processes & replicas.

Making The Right Isolation Decision

  • For applications that require “Data Level” isolation, Partitions work really well. However, keep in mind that your code might run in the same process. As an example, the same Application Instance might support multiple departments – within the same organization – using partitions. Keep in mind that data is still stored on the same disk (as separate files). If you are dealing with an undefined # of departments you can activate services within the same application as needed.
  • For applications that require “Data Level” and “Process Level” isolation, use different Application Instances. An example of this is a SaaS application where each customer gets a dedicated application instance; each instance will have its own data and processes. As far as I can tell, this model is used by Azure Event Hub and Azure DocumentDB.

Factory Based Activation

If you noticed, there is nothing in the manifest files that ties “Service Type” as a concept to the actual .NET class that implements it. Service Fabric expects your Code Package to register “Service Types” with “.NET Types” inside your code. This happens as the following (usually in your Main() method):

using (FabricRuntime fabricRuntime = FabricRuntime.Create())
{
    // static registration for both service types
    fabricRuntime.RegisterServiceType("ComputeSvc", typeof(ComputeSvc));
    fabricRuntime.RegisterServiceType("GatewaySvc", typeof(GatewaySvc));

    // keep the host process alive; the registered types are served from this process
    Thread.Sleep(Timeout.Infinite);
}

The above registers Service Types named “ComputeSvc” and “GatewaySvc” with their respective .NET types (which happen to share the same names).
For those of you who are using Dependency Injection (or abusing it – I may need to create a post just ranting about that) and want to inject dependencies into your service .NET type instances, you can use factories as the following:

// factories for both Stateful and Stateless services
class StatefulSvcFactory : IStatefulServiceFactory
{
    public IStatefulServiceReplica CreateReplica(string serviceTypeName, Uri serviceName, byte[] initializationData, Guid partitionId, long replicaId)
    {
        Debug.WriteLine("****** Stateful Factory");

        // this factory works with only one type of service.
        // the uri gives you the hosting app, which you can use in SaaS models
        // where the app name maps to data location, resource pool, user identities etc.
        ComputeSvc svc = new ComputeSvc();

        // if I have a super duper DI container I can assign it here (or use the constructor).
        // svc.MySuperDuperDI = CreateDI();

        return svc;
    }
}

class StatelessSvcFactory : IStatelessServiceFactory
{
    public IStatelessServiceInstance CreateInstance(string serviceTypeName, Uri serviceName, byte[] initializationData, Guid partitionId, long instanceId)
    {
        // just like above, only stateless
        Debug.WriteLine("****** Stateless Factory");
        return new GatewaySvc();
    }
}

// inside my Main() method

using (FabricRuntime fabricRuntime = FabricRuntime.Create())
{
    fabricRuntime.RegisterStatefulServiceFactory("ComputeSvc", new StatefulSvcFactory());
    fabricRuntime.RegisterStatelessServiceFactory("GatewaySvc", new StatelessSvcFactory());

    // as before, keep the host process alive
    Thread.Sleep(Timeout.Infinite);
}

Instead of explicitly specifying a .NET type, I am passing in a factory which can create an instance of the service (as a Replica or as an Instance). It also gives me a chance to learn which Application Instance is used (this can be extended in highly isolated environments such as SaaS).

Running It Step By Step

I have published the sample code used in the discussion above here: https://github.com/khenidak/ServiceFabricSeries/tree/master/HostingActivation. You can run the walk-through using do-it.ps1; if you want to keep your dev machine clean, run un-do-it.ps1 after you are done.

Extras! – after you run do-it.ps1, try to play around with the Application Type Version and use the cmdlets to copy, register and create a new application instance using the new Application Type Version.

Epilog

I would strongly recommend playing around with the concepts above, with Process Explorer & Debug View at hand. While most of the time you do not have to worry about the underlying platform details, understanding its internal dynamics is key to a good solution architecture. I hope the above clarified some of the Service Fabric concepts.

Till Next Time!

Questions / comments

@khnidk

Service Fabric: Stateless Services

Prelog

I admit, initially everything about stateful services was counter-intuitive. Storing data with the compute node contradicts what we have been hardwired to do for the last couple of decades, especially on Microsoft-powered platforms. We are trained to isolate the persisted data away from compute in highly available persisted storage, either at the block level by SAN-backed clusters such as Sql Servers, or at the byte level such as Azure storage, be it tables or blobs. Service Fabric currently – as far as I can tell – is the only platform that works in this way (data is stored with the compute processes).

Stateless services, however, are what we have been used to. They are code we build, activated one or multiple times in process instances on one or more server machines. Because there is no persisted state, once the process stops or crashes all the process memory is scrapped and no data is stored anywhere.

Service Fabric Powered Applications Categorized by Lifetime

If you think about Service Fabric powered applications from the perspective of who controls their lifetime, you will end up with the following:

  1. Applications whose lifetime you, as the developer, control. They usually start once deployed and keep running until un-deployed, or are moved around on the cluster as its fluid state changes (in response to node availability changes, load re-balancing etc.). Those are Service Fabric Services, either Stateful or Stateless.
  2. Applications whose lifetime the caller/client controls. They are started (the correct term is: Activated) when the first call arrives, and terminated when no more calls are coming (after a grace period). Those are Service Fabric Actors, either Stateful or Stateless.

We have talked about Stateful Services (here) with a focus on Partitions; today we will talk about Stateless Services, and later we will cover Actors.

Creating Stateless Services

To create a stateless service you inherit from the StatelessService base class, which gives you:

  1. A means to create a communication listener (via overriding the CreateCommunicationListener method) that works exactly as in a stateful service, as described here, and allows you to listen to incoming requests to your service.
    
            protected override ICommunicationListener CreateCommunicationListener()
    
  2. A single call issued to the following object method:
    protected override async Task RunAsync(CancellationToken cancellationToken)
    

You are expected to keep processing, by means of a loop, until the token is signaled. Again, similar to a stateful service, returning from this method will not terminate the replica.
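A minimal sketch of that loop (DoWorkAsync is a hypothetical helper standing in for your actual processing):

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        // process one unit of background work per iteration
        await DoWorkAsync(); // hypothetical helper

        // yield between iterations; cancelling the token ends the delay promptly
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    }
}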

Stateless Services Use Cases

Stateless Services, in addition to supporting partitions (one primary per partition, as there is no need for secondaries), support “Instances” as well. Instead of describing how partitioning/instances differ for stateless services, I decided to use architecture use cases, for the following reasons:

  1. The architecture use cases published with the documentation and samples, while really good, sometimes work unintentionally as an anchor: we template-ize our architecture based on them, missing all the other great possibilities the platform can offer.
  2. To introduce a few other possibilities as baby steps toward taking the platform beyond boiler-plate designs.
  3. It is more fun this way.

The following are a few use cases of how stateless services can be used (most also apply to stateful services as well).

The Gateway / Web Site

We have discussed the gateway in the previous post (here); I have borrowed the diagram again below.

GW Architecture Overview

I will not add more to it, yet it is worth mentioning that the gateway pattern serves well in on-ramping your system bit by bit onto Service Fabric: your gateway might be talking to services already built on different types of systems. In order to enable the gateway pattern you will need a set of instances, typically one per each node of your cluster. For this, modify ApplicationManifest.xml as the following:


<!-- default services XML element -->
<Service Name="StatelessSvc01">
  <StatelessService ServiceTypeName="StatelessSvc01Type" InstanceCount="-1">
    <SingletonPartition />
  </StatelessService>
</Service>

With an instance count of -1, the Fabric runtime will ensure one instance is activated per node, for every node of the cluster. Because all your services are listening on the same URL it is easy to configure a load balancer on top of them that surfaces the services as a single URL to your external clients.

You can use a smaller # of instances (as in, less than the total number of cluster nodes). In this case your load balancer should proactively update its internal routing table so that it won’t route to nodes that don’t host services.

If you use # of instances > # of nodes, service activation will fail after it activates # of instances = # of nodes. Singleton partitions are not meant to live with each other inside the same node (as a placement constraint); hence Service Fabric will fail after reaching # of instances = # of nodes, since there are no other locations that meet the singleton’s special placement constraint. More on this in later posts.

The current version of the WordCount sample uses this approach.

Backend Worker

Imagine a typical workflow-based backend process that is characterized as the following:

  1. Highly sequential.
  2. Each step is long-running; it either succeeds from beginning to end, retries from beginning to end, or fails. State only needs to be persisted before and after each step.
  3. Coordination is needed to ensure no step is started before another.

We can lay out a system like that on Service Fabric as the following

Background Worker

 

The process described above is bread and butter for financial institutions, used in fiscal period closures, producing statements, updating credit reports and so on.

A system like that will consist of a Stateful Service X with a # of partitions. I might decide to partition by operation type (example: Account Statements and Credit Reports). Each account statement process instance requires steps, which I can call externally into a highly partitioned stateless service. The system only needs to persist state before and after each step (by the stateful service or an external store); the stateful service is essentially a book keeper for the processes. I can choose to partition the stateless services by step, operation type and so on. A system like that is typically designed to fan out and max out H/W resources to ensure the biggest bang for the buck. My partitioning schema (for the stateless services) will be like this:


<!-- default services element -->
<Service Name="StatelessSvc01">
  <StatelessService ServiceTypeName="StatelessSvc01Type">
    <UniformInt64Partition HighKey="100" LowKey="1" PartitionCount="100" />
  </StatelessService>
</Service>

I have 100 partitions on a uniform key range (we talked about this here). I can partition according to step type and so on, or I can treat them all as equal and use my partition resolution (while calling them from my stateful service) to route calls to them on a round-robin basis, as in the sketch below.
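A minimal sketch of that round-robin key selection, matching the 100-partition uniform scheme above (the class and names are illustrative, not from the sample code):

class StepRouter
{
    // matches PartitionCount/LowKey/HighKey in the manifest snippet above
    private const long PartitionCount = 100;
    private long _counter = 0;

    // returns the next partition key in 1..100; Interlocked keeps the
    // counter safe when multiple requests fan out concurrently
    public long NextPartitionKey()
    {
        long n = System.Threading.Interlocked.Increment(ref _counter);
        return ((n - 1) % PartitionCount) + 1;
    }
}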

I can also use actors instead of the stateless services, if I can ensure that each instance will handle one call at a time (more on actors in later posts).

In addition to the throughput gains from a design like that, I can now upgrade my stateless services (which represent parts of my logic) without having to upgrade my book keepers (the stateful services) as well.

Singleton Monitor

Systems, especially complex systems, often require a singleton of some sort: a single instance of a process that monitors events and reacts accordingly. For example:

  1. A SaaS-based system will have a monitor application that scales the system up and down based on the load.
  2. A log collector application that periodically moves logs from all servers to unified storage for analysis and audit.

Service Fabric will ensure that a singleton service always has one and only one instance on the cluster. To enable this you will need to change your ApplicationManifest.xml as the following:

<!-- Default services element -->
<Service Name="StatelessSvc01">
  <StatelessService ServiceTypeName="StatelessSvc01Type">
    <SingletonPartition />
  </StatelessService>
</Service>

Epilog

This part of the journey was focused on Stateless services.

If you noticed, I have been using the terms “Activation” and “Instance” a lot in this post. I am preparing a post on how Instance/Service/Application/Host are related to each other. Stay tuned!

Till next time!

@khnidk

Service Fabric: Partitions

Prelog

Tackling a computing problem by partitioning it into smaller pieces is not new. In fact it goes back to the 1970’s (check the LPAR references on Wikipedia here https://en.wikipedia.org/wiki/Partition). This approach gained even more traction in our field as the data sets we deal with got bigger. Enter “Sharding” (check here: https://en.wikipedia.org/wiki/Shard_(database_architecture)). The key premise: if we can split a large data set into smaller buckets, each served by a server, then by definition time T1 to find record R in data set S of size Z1 is longer than time T2 to find the same record R in data set S1 of size Z2, where Z2 is roughly Z1/(number of shards). This premise obviously works and is embraced widely.

Now imagine your solution consists of two parts, compute and storage, with storage sharded across multiple buckets. Your compute receives the requests, each of which has to do with either reading or altering data. The key decision to make, before anything else, is to identify which shard the data lives in. This typically leads to one of two approaches (generally speaking; there are more):

  1. Create a list of shards keyed by a data element. For example the US state the user lives in, which means you will have 50 shards. For each request, before processing, match the user’s state to a shard and perform the action accordingly.
  2. Create a list of shards with a sequence of arbitrary keys (for example an integer sequence). For each request, use a hash function against a key data element (for example the user email), which will yield a shard key.

Once you have the key to the shard, you can identify the address or location that serves that shard. The sketch below illustrates the second approach.
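Here is a minimal sketch of approach #2, with purely hypothetical shard addresses (nothing Service Fabric specific yet):

using System;
using System.Collections.Generic;

static class ShardMap
{
    // hypothetical shard key -> server address table
    static readonly Dictionary<int, string> shardAddresses = new Dictionary<int, string>
    {
        { 0, "shard0.contoso.local" },
        { 1, "shard1.contoso.local" },
        { 2, "shard2.contoso.local" },
    };

    // hash a key data element (the user email) into a shard key, then
    // look up the server that owns that shard
    public static string ResolveShard(string userEmail)
    {
        // a stable, case-insensitive hash so the same email always lands
        // on the same shard across processes and restarts
        int hash = 0;
        foreach (char c in userEmail.ToLowerInvariant())
            hash = unchecked(hash * 31 + c);

        int shardKey = Math.Abs(hash % shardAddresses.Count);
        return shardAddresses[shardKey];
    }
}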

Service Fabric Partitions

The easiest way to tie the above to Service Fabric is to introduce a few Service Fabric concepts.

  • Service Fabric enables you to store data locally to the service itself without the need to go to an external store such as Azure Storage or Sql Server. This data is referred to as the Service State. Services that maintain state locally are referred to as Stateful Services. There are a lot of performance advantages to this approach, as your “compute” operations don’t need to perform out-of-band network calls to other resources to acquire the data needed to process a request.

Service Fabric also supports services which don’t save state locally; they are called “Stateless Services” and we will talk about them in later posts.

  • Because this state is important and persisted between requests, it needs to be highly resilient to failures. Service Fabric ensures that for each partition the state is saved to a quorum of 3 (this is not a fixed # and we will talk about it in detail in later posts). This 3 is the number of replicas you specify for your service. Service Fabric also ensures that replicas are deployed to multiple servers by employing mechanisms similar to Azure PaaS, called fault domains & update domains. Typically you will have one Primary replica responsible for writing and reading data, and a number of Active Secondaries which synchronously get a copy of every write operation performed on the Primary. There will be a lot of talk on Service Fabric replicas in later posts.
  • Now you can split your computing problem into multiple smaller pieces (each highly resilient to failure); in this case we name them Service Fabric Partitions. Each partition is responsible for part of your “state” and, by definition, your “compute”.

Service Fabric Partitions in Relation to Service Life Cycle

When you deploy a new service to a cluster, the Service Fabric runtime deploys a new instance of your code for each replica in your partitions. For example, if you have 26 partitions with 3 replicas each, then the # of instances deployed will be 26 x 3 = 78. The Fabric runtime will not consider the service healthy until the # of instances deployed is equal to that number.

In case of failures (node down or similar events), Service Fabric will re-initialize the partitions served by this node on other healthy nodes, keeping in mind fault domains, update domains, resource balancing & placement constraints. We will talk about all of that in detail in later posts.

Creating Service Fabric Partitions

There is a big implicit assumption here: the number of partitions in your service is fixed, and predetermined during the design phase. Hence the amount of effort you should put into defining the correct # of partitions and how they are partitioned.

Remember from an earlier post (http://henidak.com/2015/07/blog/) that ApplicationManifest.xml defines how the service will be deployed on the cluster and the # of partitions. Configuring partitioning is done by modifying ApplicationManifest.xml.

Service Fabric supports the following partition types.

Uniform Partitions

This is where your partitions are identified by a key spread over the int64 value range. For each incoming request a hash function is used to identify the key. Each partition is assigned a low key and a high key – in essence, a range of keys. The key you compute from the hash is matched to the partition whose range it falls within.

Service Fabric does not force a certain distribution of load and data across the partitions, nor is it concerned with how you identify the key; hence the importance of your hashing function. It can be as simple as matching the first letter of a word (check the word count sample here https://github.com/Azure/servicefabric-samples/tree/master/samples/Services/VS2015/WordCount), or as involved as the FNV hashing functions (check: http://www.isthe.com/chongo/tech/comp/fnv/). A sketch of an FNV-based key function follows below.
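For illustration, here is a minimal FNV-1a (64-bit) sketch of my own – not from the samples – folded onto a uniform key range; the constants are the standard FNV-1a offset basis and prime:

using System.Text;

static class PartitionHash
{
    public static long GetPartitionKey(string input, long lowKey, long highKey)
    {
        const ulong fnvOffsetBasis = 14695981039346656037;
        const ulong fnvPrime = 1099511628211;

        ulong hash = fnvOffsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(input))
        {
            hash ^= b;
            hash *= fnvPrime; // ulong arithmetic wraps, which is what FNV expects
        }

        // fold the hash onto the partition key range LowKey..HighKey
        ulong range = (ulong)(highKey - lowKey + 1);
        return lowKey + (long)(hash % range);
    }
}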

 

<DefaultServices>
    <Service Name="ReverseCompute">
      <StatefulService ServiceTypeName="ReverseComputeType" TargetReplicaSetSize="3" MinReplicaSetSize="2">
        <UniformInt64Partition PartitionCount="26" LowKey="1" HighKey="26" />
      </StatefulService>
    </Service>
  </DefaultServices>

The above is a snippet for a service (ApplicationManifest.xml, DefaultServices section) that reverses the strings it receives. It is partitioned into 26 partitions (one per English letter), and my keys are spread over the values 1 to 26. My hashing function in this case is as simple as:

public static long getPartitionKeyForWord(string sWord)
{
    // 'A' is 65, so this maps A..Z (case-insensitive) to keys 1..26
    return ((long)char.ToUpper(sWord[0])) - 64;
}

Named Partitions

A named partition is a predefined list of partitions, each identified by a name. As an example:

<DefaultServices>

    <Service Name="PartitionedService">
      <StatefulService ServiceTypeName="PartitionedServiceType" TargetReplicaSetSize="3" MinReplicaSetSize="2">
        <NamedPartition>
          <Partition Name="WestUS" />
          <Partition Name="CentralUS" />
          <Partition Name="EastUS" />
        </NamedPartition>
      </StatefulService>
    </Service>
  </DefaultServices>

The above is an arbitrary service split over 3 named partitions, one per US region. In this case I don’t need to employ a hashing function; I can just use the user’s address to identify which partition should handle the request, as in the sketch below.
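A minimal sketch of that address-based selection (the state-to-region table is illustrative and intentionally incomplete):

static class RegionMap
{
    // map the user's US state to one of the named partitions above
    public static string GetPartitionName(string usState)
    {
        switch (usState)
        {
            case "WA":
            case "CA":
                return "WestUS";
            case "TX":
            case "IL":
                return "CentralUS";
            default:
                return "EastUS";
        }
    }
}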

Singleton Partition

As the name implies, a singleton partition is a single partition that serves all your data and compute requirements. In essence this is you saying “please don’t split my computing problem into smaller pieces”.

<DefaultServices>
    <Service Name="PartitionedService">
      <StatefulService ServiceTypeName="PartitionedServiceType" TargetReplicaSetSize="3" MinReplicaSetSize="2">
        <SingletonPartition/>
      </StatefulService>
    </Service>
  </DefaultServices>

The above is the same arbitrary service configured to use a singleton partition.

Partitions & Resources

If you think about it, your stateful service can be categorized as one of the following:

  1. Services that do not interact with the external world, as in they don’t listen on an external network address. This can be a background service that pulls data from an external store and performs staged compute, with data saved locally, until a final stage where the service spits the data out. An example of this is indexing services such as those used by a search engine.
  2. Services that do interact with the external world. These services typically listen to network traffic (for example Web API or TCP).

Listening to the External World

Services are running on the cluster as replicas, and the cluster state itself is fluid. Changes in cluster state (# of nodes, node availability and health, resource re-balancing, to name a few) may require changes to where your replicas are running (on which node); hence the address your services are listening on is subject to change. The cluster itself is designed to support high-density micro-service deployment, so fixed addresses would cause port collisions, address conflicts and so on. To solve this problem Service Fabric allows you to define the network resources needed by your service in the form of endpoints. If you read the previous post you will notice that services are described in ServiceManifest.xml, and this is where we describe our endpoints. Usually these endpoints are in fact network ports (not addresses). These ports are handed over to your service, which constructs the listening address based on them.

It is important to understand that Service Fabric does not enforce a certain communication type nor addressing scheme.

Describing Your End Points (Ports)

Service Fabric allows you to describe endpoints as the following.

Unnamed Ports Approach

<Resources>
  <Endpoints>
    <Endpoint Name="ServiceEndpoint" />
    <Endpoint Name="ReplicatorEndpoint" />
  </Endpoints>
</Resources>

The snippet above is from my ServiceManifest.xml and states 2 endpoints, each of which is a port. Since no fixed port is stated, Service Fabric will assign a dynamic one – one for each of your partitions (not replicas; this is important, as we will see below). This approach is common for Uniform and Named Partitions, and in cases where you expect multiple primaries of the same service to be served by the same cluster node.

Service Fabric uses the port range stated in ClusterManifest.xml (section NodeTypes/NodeType/ApplicationEndPoints) to pick a port from. It also ensures that the same port is not used twice on the same cluster node. The endpoint name serves as a key that allows you to locate your port (more on this below).

Note: the ReplicatorEndpoint is used by the mechanism that ensures stateful service writes are committed against a quorum before being considered successful, to ensure resilience against failure.

Named End Points (Ports)

<Resources>
    <Endpoints>
      <Endpoint Name="ServiceEndpoint" Port="9000" />
      <Endpoint Name="ReplicatorEndpoint" />
    </Endpoints>
  </Resources>

The above snippet forces your “ServiceEndpoint” to always use port 9000. In this case the port doesn’t have to reside within the range in ClusterManifest.xml. This is typically used if:

  1. Your primaries will never collide, and no more than one primary will be hosted by a cluster node.
  2. You are using a singleton partition, in which case only one partition with one primary replica will ever exist on the cluster.
  3. This is a stateless service and you need to know the exact port (and the listening address) to register it with an external load balancer (more on this in later posts).

Note: you can also specify the protocol and type of your endpoint, but this has no impact on the port assignment.

Note: the same mechanism – describing resources – is also used to describe other resources, which we will talk about later.

Listening & Constructing Listening Addresses

Your stateful service inherits from the StatefulService class; VS.NET tooling ensures this once you select the Stateful Service template. The StatefulService base class defines:

  protected override ICommunicationListener CreateCommunicationListener();

This method is called to return an object that implements the ICommunicationListener interface; you typically override it and return your concrete ICommunicationListener implementation. ICommunicationListener is one of those badly named interfaces: it doesn’t define a “listener”, it rather defines the lifetime of a typical network listener, as the following:

namespace Microsoft.ServiceFabric.Services
{
    public interface ICommunicationListener
    {
        void Abort();
        Task CloseAsync(CancellationToken cancellationToken);
        void Initialize(ServiceInitializationParameters serviceInitializationParameters);
        Task<string> OpenAsync(CancellationToken cancellationToken);
    }
}
  • Abort: called when your replica is being aborted. This is when Service Fabric is responding to a permanent failure in the replica (that owns the ICommunicationListener) and needs to re-initialize it. If you noticed, this is a void method that runs synchronously; you shouldn’t hog the thread calling you during your clean-up.
  • CloseAsync: called when your replica is being closed, during un-deployment or an upgrade. The CancellationToken is sent to you to attach your work to an overall cascading cancellation with the calling thread.
  • Initialize: called before OpenAsync with a set of parameters (cast them to StatefulServiceInitializationParameters) containing:
    • Partition ID
    • Replica ID
    • ActivationContext – to reach packages (check earlier posts on configuration and data packages).
    • Service Name / Service Type Name
  • OpenAsync: called when the service is ready to listen. The string returned is the address of your listening endpoint; it is reported to clients (check Reaching Your Partitions below) and on Service Fabric Explorer. For example, once I select a partition of my service I get this info (among it is the string I returned from the OpenAsync method).

ReplicaAddress

Keep in mind that you typically listen on one address and report a different one; for example you listen on http://+/ and return http://<ServerName>/. This ensures you listen on all IPs assigned to the machine, while telling the outside world that you are listening on a particular machine name.

So your job during initialization is to define the addresses, internal & external; then, in OpenAsync, start the actual listening and report the external address. As an example, here are my Initialize and OpenAsync methods.

void ICommunicationListener.Initialize(ServiceInitializationParameters serviceInitializationParameters)
{
    // get the endpoint description
    EndpointResourceDescription serviceEndpoint = serviceInitializationParameters.CodePackageActivationContext.GetEndpoint("ServiceEndpoint");
    int port = serviceEndpoint.Port;

    if (serviceInitializationParameters is StatefulServiceInitializationParameters)
    {
        // this could be +:port but that would require pre-setup for each port
        // allocated for the service
        statefulInitParams = (StatefulServiceInitializationParameters)serviceInitializationParameters;

        this.ListeningAddress = String.Format(
            CultureInfo.InvariantCulture,
            "http://+:{0}/{1}/{2}/",
            port,
            statefulInitParams.PartitionId,
            statefulInitParams.ReplicaId);
    }
    else
    {
        // our service is stateful only; it can not run as stateless
        throw new InvalidOperationException();
    }

    this.PublishAddress = this.ListeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);
    this.PublishAddress = this.PublishAddress.Replace("http://", "ws://");
    Debug.WriteLine(string.Format("New replica listening on: {0}", this.ListeningAddress));
}

async Task<string> ICommunicationListener.OpenAsync(CancellationToken cancellationToken)
{
    try
    {
        await this.startServer();
    }
    catch (Exception e)
    {
        ServiceEventSource.Current.ErrorMessage("Replica:{0} Partition:{1} - Failed to start web server on {2}",
                                                this.statefulInitParams.ReplicaId,
                                                this.statefulInitParams.PartitionId,
                                                this.ListeningAddress);

        Debug.WriteLine("Start Web Server Failed: " + e.Message);

        //TODO: report health
        throw; // rethrow without resetting the stack trace
    }

    return this.PublishAddress;
}

Here I defined 2 member variables: one represents the internal listening address, named ListeningAddress, and one reported to the external world, named PublishAddress. My listening address is http://+/<PartitionId>/<ReplicaId> to avoid collisions if my replicas are hosted on the same node (which is typical for a dev box where everything is hosted on a single machine). My public address is ws://<machine name>/<PartitionId>/<ReplicaId>. The differences are:

  1. The internal address uses all the IPs of the machine via http://+/, while the external one uses the actual machine name http://<Machine Name>, to allow clients to perform cross-network calls.
  2. The internal address uses http:// because I am using the HttpListener .NET class with OWIN, which requires addresses in the http:// scheme. The external address is ws:// because this is a web socket server and most clients require addresses in that scheme.

Note: HttpListener enables port sharing by defining different listening addresses, so I can have multiple replicas using this long address format listening on the same port.

Listening on Your Active Secondaries

By default the above behavior applies only to Primary replicas. So out of – in my example – 78 replicas (26 partitions x 3 replicas each), only the 26 Primary replicas will be allowed to listen to incoming network calls. This is based on the assumption that processing every request requires both a read and a write operation; write operations are only allowed on the Primary replica.

If this is not the situation you are dealing with (where some of the requests can be processed with read-only operations), then it is possible to enable listening on Active Secondaries to distribute the load across multiple processes, given that:

  1. You can differentiate incoming requests as read-only (R) vs. read/write (R/W) operations.
  2. You can route the requests to the correct replica based on the type (R vs. R/W).
  3. You perform only reads on your secondaries. Any write operation on your secondaries will result in an exception.

To enable this behavior, just override the EnableCommunicationListenerOnSecondary property in your service class and return true.
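A minimal sketch, assuming the preview SDK exposes this as a virtual bool property on the StatefulService base class:

protected override bool EnableCommunicationListenerOnSecondary
{
    // opt this service's Active Secondaries into listening
    get { return true; }
}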

Additional Notes on StatefulService & ICommunicationListener

It is important to draw a distinction between what Service Fabric needs to run your services correctly, and the helper classes the Service Fabric team created to make our job of creating services easier. At minimum, Service Fabric needs your service to implement the IStatefulServiceReplica interface. A quick look at it and you will see that communication & listening life-cycle management is not really part of it. You can choose to write your own concrete implementation of IStatefulServiceReplica, but if you do you will have to implement a lot of state-management details (especially replicating state across multiple replicas), in addition to the communication & listening life-cycle management.

It is also important to note that these helper classes are part of the SDK and are/will be supported by the Service Fabric team, so you are better off fitting within these classes than implementing your own.

The details discussed above are part of the helper classes Service Fabric created and are not enforced by the runtime. You are free to choose a different implementation approach; we will be covering this in later posts.

Reaching Your Partitions

Everything we talked about so far was about services, partitions and replicas. This section is about the clients that want to connect to these services’ replicas. Since the state of the cluster is fluid and replicas move between machines accordingly, you will need to resolve the correct partition & replica and get its address. Choosing the pattern depends highly on the following factors:

  • Clients that are not based on .NET and can be aware of the existence of the Service Fabric cluster and the above facts. An example of this could be your favorite NodeJS server application calling into services hosted by a Service Fabric cluster via REST or Web Sockets. These clients can use the Service Fabric REST interface to resolve partitions and replicas. The REST API is hosted on port 19007 (with the port changing for every node) for the public preview version. This is expected to change as we approach the release date, to one port for the entire cluster, isolating clients from the location of the REST API service (currently called HttpGateway); for more details on how it is hosted check your ClusterManifest.xml.

Clients that are based on .NET can still use this option. These clients might want to avoid the TCP/IP channel (as used by the FabricClient class, described below) because it is not enabled on a corporate firewall, or for other reasons.

  • Clients that are based on .NET and can be aware of the existence of the Service Fabric cluster can use the FabricClient class, part of the System.Fabric namespace, to resolve partitions (described below). An example of this is either a service (acting as a client for other services on the cluster) or a .NET server application, such as OWIN or others, using a service hosted on the cluster.
  • Clients that are based on .NET or others and can not be aware of the existence of the cluster. The solution for this problem falls within “add another abstraction layer”: in essence you add a gateway stateless service, using a named port, exposed via a load balancer; the gateway performs the resolution on behalf of the caller and connects to the correct partition. Since this is a gateway, other interesting behaviors can be added, such as:
    • Security termination at the gateway level.
    • Protocol transition. For example your gateway can be a REST API, to support the maximum # of client types, while your back-end partitioned stateful services use sockets.

Eventually your architecture (logical view) will look something like this:

GW Architecture Overview

 

In the above architecture, when you generate the address you will have to make sure that the external address (returned by OpenAsync) points to the load balancer, not the node name or cluster address.

Note: in the below discussion we are not covering how FabricClient authenticates to the cluster.

Using Fabric Client To Resolve Partitions

Irrespective of your architecture, if you have to process a request via services on a cluster, you will eventually have to resolve which partition (and replica) should respond to this particular request. FabricClient’s “ServiceManagementClient” object (exposed as a member variable named “ServiceManager”) can be used as in the following sequence.

The below is referred to as Complaint-Based Service Resolution:

  1. Resolve the current partition key using your hashing function.
  2. Use the FabricClient.ServiceManager.ResolveServicePartitionAsync method to resolve the partition as a ResolvedServicePartition object.
  3. Locate the endpoints used by the replica you are looking for (Primary or other).

var serviceUri = new Uri("fabric:/ReverseApp/ReverseCompute");
var client = new FabricClient(); // or new FabricClient(params host endpoints as string) if you are running this outside the cluster.
var pKey = await ResolvePartitionKey(RequestObject); // returns either a string or long (Named Partitions || Uniform Partitions).

var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, pKey);

// get the endpoints
var endpoints = ResolvedPartition.Endpoints;
foreach (ResolvedServiceEndpoint ep in endpoints)
{
    if (ep.Role == ServiceEndpointRole.StatefulPrimary)
        _serviceAddress = ep.Address; // _serviceAddress is a member variable

    Debug.WriteLine("Address" + ep.Address); // address is whatever you returned in ICommunicationListener.OpenAsync
    Debug.WriteLine("Role" + ep.Role.ToString()); // StatefulPrimary/Secondary/Stateless or Invalid
}

The above is executed on demand, typically when you have a new request. Remember, between one request and another the cluster state might change, and the address you have might change with it. You can rerun the same code with a slight change, as below:


// PresolvedPartition is the previously resolved partition.
var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, PresolvedPartition);
// get the endpoints and the rest of the actions as before

You can also listen to a notification when a partition’s location changes. This is typical when you expect your clients to send multiple messages targeting the same partition. The below is referred to as Notification-Based Service Resolution:

private void PartitionChangeHandler(FabricClient source, long handlerId, ServicePartitionResolutionChange args)
{
    // every time a change occurs this will be called "after" the change

    // check whether there are exceptions via
    // args.HasException

    var endpoints = args.Result.Endpoints;
    foreach (ResolvedServiceEndpoint ep in endpoints)
    {
        if (ep.Role == ServiceEndpointRole.StatefulPrimary)
            _serviceAddress = ep.Address; // _serviceAddress is a member variable

        Debug.WriteLine("Address" + ep.Address); // address is whatever you returned in ICommunicationListener.OpenAsync
        Debug.WriteLine("Role" + ep.Role.ToString()); // StatefulPrimary/Secondary/Stateless or Invalid
    }
}

// in another method
var serviceUri = new Uri("fabric:/ReverseApp/ReverseCompute");
var client = new FabricClient(); // or new FabricClient(params host endpoints as string) if you are running this outside the cluster.
var pKey = await ResolvePartitionKey(RequestObject); // returns either a string or long (Named Partitions || Uniform Partitions).

var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, pKey);
// use ResolvedPartition to get the endpoints as described before.

var handler = client.ServiceManager.RegisterServicePartitionResolutionChangeHandler(serviceUri, pKey, PartitionChangeHandler);

// once you are done use the following to un-register
client.ServiceManager.UnregisterServicePartitionResolutionChangeHandler(handler);

You can also listen to changes in any service partition’s state by using the RegisterServicePartitionResolutionChangeHandler overload that doesn’t require a partition key, as in the sketch below.
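A minimal sketch of that overload, reusing the handler above (the exact overload set may differ between SDK versions):

// fires for location changes on any partition of the service
var allPartitionsHandler = client.ServiceManager.RegisterServicePartitionResolutionChangeHandler(serviceUri, PartitionChangeHandler);

// un-register exactly as with the keyed registration
client.ServiceManager.UnregisterServicePartitionResolutionChangeHandler(allPartitionsHandler);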

As you noticed, the partition key is a centerpiece of resolution. You can get a view of all partitions of a certain service – without the need for a partition key – using FabricClient.QueryManager (of type QueryClient) as the following:

var serviceUri = new Uri("fabric:/ReverseApp/ReverseCompute");
var client = new FabricClient(); // or new FabricClient(params host endpoints as string) if you are running this outside the cluster.

var partitions = await client.QueryManager.GetPartitionListAsync(serviceUri);

// resolve all partitions as usual - in my case I am using Uniform Partitions.
foreach (Partition p in partitions)
{
    var partitionInfo = p.PartitionInformation as Int64RangePartitionInformation;
    Debug.WriteLine(partitionInfo.LowKey + " " + partitionInfo.HighKey);

    var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, partitionInfo.LowKey);
    var endpoints = ResolvedPartition.Endpoints;
    foreach (ResolvedServiceEndpoint ep in endpoints)
    {
        Debug.WriteLine("Address" + ep.Address);
        Debug.WriteLine("Role" + ep.Role.ToString());
    }

    Debug.WriteLine(" ");
}

 

How Does FabricClient Really Work

FabricClient’s “XXXManager” members (as of the preview version) sit on top of native MTA COM objects (apparently COM is still loved). For example, ServiceManager (ServiceManagementClient) sits on top of a COM object with a UUID of “8180db27-7d0b-43b0-82e0-4a8e022fc238”, and almost all calls are funneled through it. There is a serious wrapping effort in the FabricClient code to wrap MTA COM with call-backs into .NET async/await pairs. This results in the following sequence (when you perform resolution):

  1. The resolution call is wrapped into 2 delegates: one for the outgoing call and one for call-backs.
  2. The call is interop-ed to the COM runtime.
  3. The native COM object serializes and performs the network call over TCP/IP (on port 19000). The COM object also maintains the connection between client and server.
  4. The call is routed to the server, de-serialized, and eventually routed to the Naming Service, which stores the updated list of services/partitions/replicas and their addresses.
  5. The server responds, serializes the result, and puts it on network packets back to the client (the COM object).
  6. The COM object de-serializes the result and bubbles it up to your CLR FabricClient via interop, then call-backs.

As you can tell, this is a pretty expensive call, with network and COM interop sitting between your code and the information you need. This doesn’t change in the case of Notification-Based Service Resolution; at the least it begins at step #4. Also, registering for all partitions is just a waste of resources (imagine having a G/W service with 10 instances, each subscribing to changes on a compute service with 100 partitions).

There are ways to work around that, or at least to optimize our resolution mechanisms; we will have another post dedicated to just this topic. One such optimization is sketched below.
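As a teaser, here is a hedged sketch of one such optimization – my own, not from the SDK: cache resolved partitions per key and only go back to the Naming Service, complaint style, after a call against the cached address fails. It assumes the usual usings and lives inside a class holding the cache:

private readonly Dictionary<long, ResolvedServicePartition> _cache =
    new Dictionary<long, ResolvedServicePartition>();

// resolve from the cache first; only the first request per key pays the full trip
private async Task<ResolvedServicePartition> ResolveAsync(FabricClient client, Uri serviceUri, long pKey)
{
    ResolvedServicePartition rsp;
    if (!_cache.TryGetValue(pKey, out rsp))
    {
        rsp = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, pKey);
        _cache[pKey] = rsp;
    }
    return rsp;
}

// after a failed call against the cached address, "complain" with the stale
// result to force a fresher view, then update the cache
private async Task<ResolvedServicePartition> RefreshAsync(FabricClient client, Uri serviceUri, long pKey, ResolvedServicePartition stale)
{
    var rsp = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, stale);
    _cache[pKey] = rsp;
    return rsp;
}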

 

Epilog

I admit, when I set out to put this post together I didn’t expect it to be that long. But my aim has always been to provide a cohesive set of information in one location, allowing the reader to create advanced scenarios on top of Service Fabric. I have deferred some deeper dives into sub-topics to follow-up posts, so stay tuned.

till next time!

@khnidk

Service Fabric: The Anatomy of a Service Fabric Application Package

Prelog

Before we begin, I believe a quick introduction to Service Fabric is due. Check https://azure.microsoft.com/en-us/documentation/articles/service-fabric-overview/ for an architecture overview and https://azure.microsoft.com/en-us/documentation/articles/service-fabric-technical-overview/ for a technical overview.

So what is a Service Fabric application package anyway? A Service Fabric application package is the smallest deployable unit on a Service Fabric cluster. The contents of an application package describe a service, or a set of services, that will be deployed on a Service Fabric cluster. The content and structure of the package are important to understand in the context of creating & updating your micro-services; hence a dedicated post on this topic.

Service Fabric Application Package Contents

Package Directory Physical Structure

The following is what VS.NET actually produces when you Right Click -> Package on your Service Fabric Application project (go to %solution directory%/%Service Fabric Application project%/pkg/Debug to see it), which is what eventually gets deployed to a Service Fabric cluster using the PowerShell cmdlets.

  • One ApplicationManifest.xml file, containing the description of:
    1. What services are included in the package (with references to Service Manifest files).
    2. How they are deployed to the cluster, such as whether they are stateful or stateless, and how each service is partitioned and the number of replicas per partition.
  • A number of directories – one for each service within the application package. Each will contain the following:
    • One directory named “Code” where your .NET assembly & referenced assemblies are copied to.
    • One or more directories mapped to configuration and data packages (more on this below).
    • One ServiceManifest.xml containing a description of:
      1. What services are in your assembly and their .NET types.
      2. What the code, configuration & data packages are.
      3. Resources used by your services (more on that in later posts).

folderStructure

 

For example, the above is the contents of my application package. It contains one service called “SFPkg.StatefulSvcPkg”, which contains a “Code” directory, a configuration package named “Config” and a data package named “SvcData”.

What are “Configuration” & “Data” Packages?

There is nothing to say about them beyond the naming. Configuration is whatever you use in your services, such as API addresses, throttling values and so forth. Data is whatever static data your service uses; for instance the example I have in my repo right now uses “States” and “Zones” as static data. As you probably figured by now, configuration & data are shared between the replicas of a service, not with other services, even those that are members of the same Application Package.

Notes on Configuration & Data Packages Versioning (& Updates)

Everything in the application package is versioned. These versions allow you to update the package on the cluster using a Rolling Update: a mechanism that allows you to perform live updates on the application without taking the services down.

And yes, you guessed it right: you can update each component of your service without affecting the rest. As in, update code only, configuration only, data only, or a combination of them. We will be covering Rolling Updates in later posts.

How Service Fabric Application Package Relates to Your Visual Studio Solution

1-VSToPackage

//You know the drill, Click to enlarge.

 

The above is a screenshot of the Visual Studio solution (the same one that produced the directory structure presented earlier).

Logical View of a Single Service Fabric Application Package Contents

  1. Application Manifest
  2. Services (for each):
    1. Service Manifest: describes how your service should run on top of a Service Fabric cluster.
    2. Code Packages: one or more .NET assemblies.
    3. Data Packages: one or more data packages. Each data package is a directory in your Visual Studio project; a directory represents one data package and can contain multiple data files. Service Fabric treats data as opaque binary files and avails them to your services.
    4. Configuration Packages: one or more configuration packages. Each configuration package is a directory in your Visual Studio project; a directory represents one configuration package and can contain multiple files. Except for Settings.xml, Service Fabric treats configuration files as opaque files and does not process them.

There are two types of configuration associated with your service:

    1. Settings.xml: added by default to your Service Fabric service project by the Visual Studio tooling, and accessed using
      var configPkg = ServiceInitializationParameters.CodePackageActivationContext.GetConfigurationPackageObject("Config");

      // working with the standard configuration file settings.xml
      // the settings file has to be XML as it is surfaced in the Settings member below
      // (fabric takes care of loading it for you)
      var customSection = configPkg.Settings.Sections["<Section name here>"];

      Then iterate the parameters collection; each parameter has a name and a value (both strings).

    2. Custom Configuration: (discussed below)

Configuration & Data Packages

Service Fabric enables you to deploy configuration and data in a non-specific format (except for Settings.xml, as described below) along with your service code. The configuration and data can later be updated independently from the service code; updating them will not trigger a tear-down/recreation of replicas. Note: App.Config is part of the service code, and updating it will require a code update.

Data packages work exactly like configuration packages. Data packages are opaque to Service Fabric and can be in any format. They can also be updated independently from service code without triggering a tear-down/recreation of replicas. They are read-only objects; all replicas will get the same copy of the data packages.

Typically, when designing a micro-service, you have 3 types of data:

  1. Old data with very low volatility, typically generated from request processing or OLTP-like operations. This is externalized to external storage such as Azure Tables/Blobs or SQL.
  2. Current, highly volatile data, typically used and generated by request processing or OLTP-like operations. This is partitioned and kept with your services using reliable queues and reliable dictionaries.
  3. Lookup data that is almost static and used during request processing or OLTP-like operations. This is your Data Packages.

For both types of packages, Service Fabric allows you to subscribe to “Add”, “Modified” & “Deleted” events in order to update the internal state of your service accordingly. These events are typically fired during Rolling Updates; a sketch follows below.
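A hedged sketch of such a subscription; the event and argument names reflect my reading of the SDK and may differ between versions, and the two Reload* helpers are hypothetical:

var ctx = ServiceInitializationParameters.CodePackageActivationContext;

// fired during a rolling update when the "Config" package version changes
ctx.ConfigurationPackageModifiedEvent += (sender, e) =>
{
    // you are not told which file changed, so reload the whole package
    ReloadConfiguration(e.NewPackage.Path); // hypothetical helper
};

// same idea for data packages
ctx.DataPackageModifiedEvent += (sender, e) =>
{
    ReloadStaticData(e.NewPackage.Path); // hypothetical helper
};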

Adding and Using Configuration Packages to Your Application Package

Using Default Settings.xml

Settings.xml is added by default to the ServiceManifest.xml by Visual Studio (and is used by the classes your service class is inheriting from; more on this later). Visual Studio adds it as the following:

<!--
Config package is the contents of the Config directory under PackageRoot that contains an independently-updateable and versioned set of custom configuration settings for your service.
-->
<ConfigPackage Name="Config" Version="1.0.0.0" />

You can add custom sections to Settings.xml as the following

<Section Name="customSettingsSection">
  <Parameter Name="p1" Value="v1" />
</Section>

To use custom configuration sections in settings.xml in service code:

var configPkg = ServiceInitializationParameters.CodePackageActivationContext.GetConfigurationPackageObject("Config");

// working with the standard configuration file settings.xml
// the settings file has to be XML as it is surfaced in the Settings member below
// (fabric takes care of loading it for you)
var customSection = configPkg.Settings.Sections["customSettingsSection"];

foreach (var p in customSection.Parameters)
{
    Debug.WriteLine(string.Format("*** Custom Settings Section Param:{0} -> {1}", p.Name, p.Value));
}

Adding Custom Configuration (New file or New Package)

If Settings.xml is not enough for your needs, you have the following options:

  1. Add a custom file to the “Config” package by using the following steps:
    1. Add a file to the “Config” directory and name it, for example, customConfig.json.
    2. Reference the file in your code as the following:
      // working with a non-standard configuration file other than settings.xml
      // these are treated as blobs by fabric so we can use JSON or any other format of choice
      var customConfigFilePath = configPkg.Path + @"\CustomConfig.json";

      // ConfigMaster is a POCO object where we are keeping the configuration
      ConfigMaster configMaster = JsonConvert.DeserializeObject<ConfigMaster>(File.ReadAllText(customConfigFilePath));
      Debug.WriteLine(string.Format("*** Custom Config File ConfigKey -> {0}", configMaster.ConfigKey));

  2. Add an entirely new configuration package using the following steps:
    1. Create a directory under “PackageRoot”. For example name it “APIEndPoints”.
    2. Add your configuration files under it. For example one file can be named PurchaseEndPoints.json.
    3. Use it as the following:

      var ApiConfig = ServiceInitializationParameters.CodePackageActivationContext.GetConfigurationPackageObject("APIEndPoints");

      // fabric doesn't load the data, it just manages it for you; the data is opaque to Fabric
      var ApiConfigFilePath = ApiConfig.Path + @"\PurchaseEndPoints.json";

      // read the content & use your favorite json tool for this.
      File.ReadAllText(ApiConfigFilePath);

Adding and Using Data Packages

To add a Data Package use the following steps:

  1. Create a directory under the “PackageRoot” directory. For example name it “SvcData”.
  2. Add your data file, in your format of choice, to the directory. For this example I have used json.
  3. In your service manifest, add a data package reference to your directory as the following:
    <!-- the name has to match your directory name -->
    <DataPackage Name="SvcData" Version="1.0.0.0"/>

  4. To use the data in your service code:
    // get the data package
    var DataPkg = ServiceInitializationParameters.CodePackageActivationContext.GetDataPackageObject("SvcData");

    // fabric doesn't load the data, it just manages it for you; the data is opaque to Fabric
    var customDataFilePath = DataPkg.Path + @"\StaticDataMaster.json";

    // StaticDataMaster is a POCO object where we keep the static data
    StaticDataMaster dataMaster = JsonConvert.DeserializeObject<StaticDataMaster>(File.ReadAllText(customDataFilePath));

Structuring Your Configuration & Data Packages

When thinking about packages keep the following in mind:

  1. When performing an update, Service Fabric updates the entire package, not a single file. Packages are updated using the versions (more on this in later posts).
  2. Updating a package requires a copy to the cluster, which might be hosted remotely, such as Azure Service Fabric. Application packages are managed by the “ImageStore” service on the cluster (part of the “System” application). And yes, they follow the same semantics of committing to a quorum before returning a successful write; as in, your upload is actually multiple writes to cluster nodes, hence uploading large data (gigabytes, for example) might not be the best approach.
  3. You will get a notification in your service code when a package is updated, but you will not be able to identify which file has been updated. Accordingly, you will have to reload the entire content of the package to reflect the change in the internal service state.

In essence, break your packages up according to volatility and size. Keep in mind that if two or more files have to be updated together then they have to belong to the same package; this avoids a situation where the internal state of your service is updated to reflect a partial change (yielding a broken internal state).

Epilog

This article is the first of a mini-series where I talk about Service Fabric; expect more to come. Once we have a few, I will create a master index of all Service Fabric articles in one place for your quick reference.

The code for this article is hosted on https://github.com/khenidak/ServiceFabricSeries

Till next time

@khnidk