Service Fabric: Partitions

Prelog

Tackling a computing problem by partitioning it into a smaller pieces is not new. In fact it goes back to 1970’s (check LPAR references on Wikipedia here https://en.wikipedia.org/wiki/Partition). This particular solution or approach got even higher traction in our field as the data sets we are dealing with got even bigger. Enter “Sharding” (check here: https://en.wikipedia.org/wiki/Shard_(database_architecture) the key premise is if we are can split a large data set into a smaller buckets each can be served by a server then by definition time T1 to find record R in data set S of size Z1 is longer than time T2 to find same record R in data set S1 of size Z2 which is equal to roughly Z1/(number of shards). This premise obviously works and is embraced widely.

Now imagine your solution which consists of two parts compute and storage which is Shard-ed across multiple buckets. Your compute receives the requests that has to do with either reading or alter data. the key decision to make before anything to identify which shard should it modify the data in. this typically led to one of two approaches (generally speaking, there are more).

  1. Create a list of shards according to key data element. For example the US State of the user lives in. which means you will have 52 shard. For each request before processing match a user State to a shard and perform the action accordingly.
  2. Create a list of shards with a sequence of arbitrary keys (example integer sequence). For each  request use a hash function against a key data element (for example user email) which will yield into a shard key.

Once you have the key to the shard you can identify the address or location that serves that shard.

Service Fabric Partitions

The easiest way to tie the above to Service Fabric is to introduce few Service Fabric Concepts.

  • Service Fabric enables you to store data locally to the service itself without the need to go to external store such as Azure Storage or Sql Server. This data is referred to as a Service State. Services that maintain state locally are referred to as Stateful Services. There is a lot of performance advantages to this approach as your “compute” operations doesn’t need to perform out of band (network calls) to other resources to acquire the needed data to process the request.

Service Fabric also supports stateless services. Which doesn’t require to save state locally, they are called “Stateless Services” we will take about them in later posts.

  • Because this state is important and persisted between requests this state need to be highly resilient to failures. Service Fabric ensures that for each partition the state is saved to quorum of 3 (this is not a fixed # and we will talk about this in details in later posts). This 3 is the number of replicas you identify for your services. Service Fabric also ensures that replicas are deployed to multiple servers by employing mechanisms similar to Azure PaaS called fault domains & update domains. Typically you will have one primary replica responsible for writing and read data. and a number Active Secondaries which gets a copy of the write for every write operation performed on the Primary synchronously. There will be a lot of talk on Service Fabric Replicas in later posts.
  • Now you can split your computing problem using multiple smaller pieces (each will be highly resilient to failure), in this case we name them Service Fabric Partitions. Each partition is responsible for part of your “state” and by definition your “compute“.

Service Fabric Partitions in Relation to Service Life Cycle

When you deploy a new service to a cluster. Service Fabric run time deploys a new instance of your code for each replica in your partitions. For example if you have 26 partitions with 3 replica then the # of instances deployed will be 26 X 3 = 78. the Fabric run time will not consider the service healthy until the # of instances deployed is equal to that number.

In case of failures (node down or similar event). Service Fabric will re-initialize partitions served by this node on other healthy nodes keeping in mind fault domains, update domains, resource balancing & Placement Constraints. We will talk about all of that in details in later posts.

Creating Service Fabric Partitions

There is a big implicit assumption here. The number of partitions in your service is fixed. And predetermined during the design phase. Hence the amount of efforts you should put in defining the correct # of partitions and how they are partitioned.

Remember from an earlier post (http://henidak.com/2015/07/blog/). ApplicationManifest.xml defines how the service will be deployed on the cluster and # of partitions. Configuring partitioning is done by modification to ApplicaitonManifest.xml

Service Fabric Supports the following partitions Types

Uniform Partitions

This is where your partitions are identified by a key spread over int64 value. For each incoming request a hash function is used to identify the key.  For each partition Service Fabric will assign a low key and high key. In essence every partition is assigned a range of keys. The key you identify based on the hash is used to match the partition (key falling with this range).

Service Fabric does not force a certain distribution of load and data across the partition nor is concerned with how you identify the key. Hence the importance of your hashing function. It can be as simple as matching the first of a word (check word count sample here https://github.com/Azure/servicefabric-samples/tree/master/samples/Services/VS2015/WordCount).  Or it can be as complex as FNV hashing functions (check: http://www.isthe.com/chongo/tech/comp/fnv/).

 

<DefaultServices>
    <Service Name="ReverseCompute">
      <StatefulService ServiceTypeName="ReverseComputeType" TargetReplicaSetSize="3" MinReplicaSetSize="2">
        <UniformInt64Partition PartitionCount="26" LowKey="1" HighKey="26" />
      </StatefulService>
    </Service>
  </DefaultServices>

The above is a snippet for a service (ApplicationMainfest.xml DefaultServices section) that performs a reverse on strings it receives. it is partitioned into a 26 partition (for English alphabetical letters). And my keys are spread over values from 1 to 26. my hashing function in this case is a simple as

 public static long getPartitionKeyForWord(string sWord)
        {
            return ((long)char.ToUpper(sWord[0])) - 64;
        }

Named Partitions

Named partition is a predefined list of partitioned where each is identified by name. as an example

<DefaultServices>

    <Service Name="PartitionedService">
      <StatefulService ServiceTypeName="PartitionedServiceType" TargetReplicaSetSize="3" MinReplicaSetSize="2">
        <NamedPartition>
          <Partition Name="WestUS" />
          <Partition Name="CentralUS" />
          <Partition Name="EastUS" />
        </NamedPartition>
      </StatefulService>
    </Service>
  </DefaultServices>

The above is an arbitrary service split over 3 named partitions for each US zone. In this case I don’t need to employ a hashing function I can just use user address to identify which partition should handle the request.

Singleton Partition

As the name implies. Singleton partition is a single partition that serves all your data and compute requirements. In essence this is you saying “please don’t split my computing problem into a smaller pieces”

<DefaultServices>
    <Service Name="PartitionedService">
      <StatefulService ServiceTypeName="PartitionedServiceType" TargetReplicaSetSize="3" MinReplicaSetSize="2">
        <SingletonPartition/>
      </StatefulService>
    </Service>
  </DefaultServices>

The above is the same arbitrary service configured to use a singleton partition.

Partitions & Resources

If you think about it. Your stateful service can be categorized as the following:

  1. Service that does not interact of with the external world. As in it doesn’t listen to external calls/network address. This can be a background service that pulls data from external store performs certain staged compute with data saved locally until final stage where the service spits the data out. An example for this as index services such are those used by search engine.
  2. Services that does interacts with external world. This service typically listens network traffic (as an example Web API or TCP).

Listening to the External World

Services are running on the cluster as replicas. The cluster state itself is fluid. And changes in cluster state (# of nodes, nodes availability and health, resource re-balancing to name a few) May require changes to where your replicas are running (on which node). Hence the address which your services are listening is subject to change. The cluster itself is designed to support high density micro service deployment. So fixed address will cause ports collision/ address conflicts and so on.  To solve this problem Service Fabric allows you to define the network resources needed in your service in form of endpoints. If you read the previous post you will notice that Services are described in the ServiceMainfest.xml and this is where we can describe our endpoints. Usually these end points are in fact network ports (not the address).  These ports are handed over to your service to construct the listening address based on them.

It is important to understand that Service Fabric does not enforce a certain communication type nor addressing scheme.

Describing Your End Points (Ports)

Service Fabric allows to describe end points as the following

Unnamed Ports Approach

<Resources>
    <Endpoints>
      <Endpoint Name="ServiceEndpoint" />
      <Endpoint Name="ReplicatorEndpoint" />
    </Endpoints>
  </Resources>

The snippet above is from my ServiceManifest.xml which states 2 endpoints.  Each is a port. Since it is unnamed Service Fabric will assign dynamic one. One for each of your partitions (not replicas, this is important as we will see below). this approach is common for Uniform and Named Partition and in cases where you expect multiple primary partition of the same service to be served by the same cluster node.

Service Fabric uses the port range stated in the ClusterManifest.xml (section NodeTypes/NodeType/ApplicationEndPoints) to pick a port from. It also ensures that the same port is not used on the same cluster node twice. The endpoint name serves as a key that allow you to locate your port (more on this below).

Note: the ReplicatorEndPoint is used by the mechanism that ensures that stateful service writes are committed against a quorum before considered successful to ensure resilience against failure.  

Named End Points (Ports)

<Resources>
    <Endpoints>
      <Endpoint Name="ServiceEndpoint" Port="9000" />
      <Endpoint Name="ReplicatorEndpoint" />
    </Endpoints>
  </Resources>

The above snipped forces your “ServiceEndPoint” to always use port 9000. In this case this port doesn’t have to reside within the range in the ClusterManifest.xml. this is used typically if:

  1. Your primaries will never collide and no more than one primary will be hosted by a cluster node.
  2. You are using singleton partition in this case only one partition with one primary replica will ever exist on the cluster.
  3. This is a stateless service in this case you need to know the exact port (and the listening address) to register it with an external load balance-er (more on this in later posts).

Note: you can also identify the protocol and type of your end point but this have no impact of the port assignment. 

Note: the same mechanism – describing resources – is also used to describe other resources, which we will talk about later

Listening & Constructing Listening Addresses

Your stateful service inherit from StatefulService class. VS.NET tooling ensures this once you select Stateful service template. StatefulService base class defines a

  protected override ICommunicationListener CreateCommunicationListener();

This method is called to return an object that implements ICommunicationListener interface. You typically override this method and return your concrete ICommunicationListener implementation. ICommunicationListener is one of those badly named interfaces. The ICommunicationListener itself doesn’t define “listener” it rather defines the lifetime of a typical network listener as the following:

namespace Microsoft.ServiceFabric.Services
{
    public interface ICommunicationListener
    {
        void Abort();
        Task CloseAsync(CancellationToken cancellationToken);
        void Initialize(ServiceInitializationParameters serviceInitializationParameters);
        Task<string> OpenAsync(CancellationToken cancellationToken);
    }
}
  • Abort: called when your replica is being aborted. This is when Service Fabric is responding to a permanent failure in the replica (that owns ICommunicationListener) and it needs to re-initialize it. if you noticed this is a void method and runs synchronously you shouldn’t hog the thread calling you during your clean up.
  • CloseAsync: called when your replica is being closed, during un-deployment or an upgrade. The CancellationToken is sent to you to attach your work to an overall cascading cancellation with the calling thread.
  • Initialize: Called before OpenAsync with a set of parameters (when you cast them to StatefulServiceInitializationParameters containing
    • Partition ID
    • Replica ID
    • ActivationContext – to reach packages (check earlier posts on configuration and data packages).
    • Service Name / Service Type Name
  • OpenAsync: Called when the service is ready to listen. The string returned is the address of your listening endpoint and is reported to clients (check Reaching Your Partitions below) and on Service Fabric Explorer. For example once I select a partition of my service I get this info (among them is the string i returned by OpenAsync method).

ReplicaAddress

Keep in mind that you typically listen to an address and report a different one. for example you listen to http://+/ and return http://<ServerName>/ . This to ensure perform listening on all IPs assigned to the machine but you tell the outside world that you listening to a particular machine name.

So your job during initialization is to define the addresses; Internal & External. and in OpenAsync start the actual listening and report the external address. as an example here is my Initialize and OpenAsync methods.

void ICommunicationListener.Initialize(ServiceInitializationParameters serviceInitializationParameters)
        {
            // get the end point description
            EndpointResourceDescription serviceEndpoint = serviceInitializationParameters.CodePackageActivationContext.GetEndpoint("ServiceEndpoint");
            int port = serviceEndpoint.Port;

            if (serviceInitializationParameters is StatefulServiceInitializationParameters)
            {
                // this should be +:port but will require presetup for each port
                // allocted for the service
                statefulInitParams = (StatefulServiceInitializationParameters)serviceInitializationParameters;

                this.ListeningAddress = String.Format(
                    CultureInfo.InvariantCulture,
                    "http://+:{0}/{1}/{2}/",
                    port,
                    statefulInitParams.PartitionId,
                    statefulInitParams.ReplicaId);
            }
            else
            {
                // our service is only stateful. can not run as stateless
                throw new InvalidOperationException();
            }
            this.PublishAddress = this.ListeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);
            this.PublishAddress = this.PublishAddress.Replace("http://", "ws://");
            Debug.WriteLine(string.Format("New replica listening on: {0}", this.ListeningAddress));

        }

        async Task<string> ICommunicationListener.OpenAsync(CancellationToken cancellationToken)
        {

            try
            {
                await this.startServer();
            }
            catch(Exception e)
            {
                ServiceEventSource.Current.ErrorMessage("Replica:{0} Partition:{1} - Failed to start web server on {2}",
                                                        this.statefulInitParams.ReplicaId,
                                                        this.statefulInitParams.PartitionId,
                                                        this.ListeningAddress);

                Debug.WriteLine("Start Web Server Failed: " + e.Message);

                //TODO: report health
                throw (e);

            }

            return this.PublishAddress;
        }

In here i defined 2 local member variables one represents internal listening address named ListeningAddress and one reported to external world named PublishAddress. My listening address is http://+/<ParttitionId>/<ReplicaId> to avoid collision if my replicas were hosted on the same node (which is typical for a dev box where everything is hosted on a single machine). My public address is ws://<machine name>/<ParttitionId>/<ReplicaId>. The differences are:

  1. Internal address uses all the IPs of the machine by http://+/ while external is actual machine name http://<Machine Name> to allow clients to perform cross network calls.
  2. Internal address uses http:// because I am using HttpListener .NET classe with OWIN which require address in that http:// scheme. External address are ws:// because this is a web socket server and most clients require addresses in that scheme.

Note: HttpListener enables port sharing by defining different listening addresses. So i can have multiple replicas using this long address format listening on the same port.  

Listening on Your Active Secondaries

By default the above behavior applies only on Primary Replicas. So out of – in my example – 78 replica (26 partition X 3 Replica each) only 26 Primary Replicas will be allowed to listen to incoming network calls. This is based on the assumption that request processing (for every request) you will need to perform a read and write operation. Write operations are only allowed on Primary Replica.

If this is not the situation you are dealing with (where some of the requests) can be processed with read only operations then it is possible to enable listening on Active Secondaries to distribute the load across multiple processes. given that

  1. You can differentiate incoming requests based on (read only R or R/W) operations.
  2. You can route the requests to the correct replica based on the type (R vs. R/W).
  3. You perform read only in your secondaries. Any write operation on your secondaries will result in an exception.

To enable this behavior just override EnableCommunicationListenerOnSecondary property in your service class and return true value.

Additional Notes on StatefulService & ICommunicationListener

It is important to draw a distinction between what Service Fabric needs to run your services correctly. And the helper classes that the Service Fabric team created to make our job creating services easier. At minimum Service Fabric needs your service to implement IStatefulServiceReplica interface. A quick look at it you will see the communication & listening life cycle management is not really a part of it. You can choose to implement your own concrete implementation of IStatefulServiceReplica. But if choose to do so you will have to implement a lot of state management details (specially replicating it across multiple replicas) in addition to communication & listening life cycle management.

It is also important to note that these helper classes are part of the SDK and are/will be supported by Service Fabric team. So you are better off fitting within these classes than implementing your own.

The details discussed above is part of the helper classes Service Fabric created and are not enforced by the run time. You are free to choose a different implementation approach we will be covering this in later posts.

Reaching Your Partitions

Everything we talked about so far was about services, partition and replica. This section is about clients that wants to connect to these services’ replicas. Since the state of the cluster is fluid, replicas move between machines accordingly you will need to to resolve the correct Partition & Replica and get its address. Choosing the pattern highly depends on the following factors:

  • Clients that are not based on .NET and can be aware of the existence of Service Fabric cluster and the above facts. An example of this could be your favorite NodeJS server application calling into services hosted by Service Fabric cluster via REST or Web Sockets. These clients can use Service Fabric REST interface to resolve the partition and replicas.  The Rest API is hosted on port 19007 (with port changing for every node) for the public preview version. This expected to change as we approach release date to one port for the entire cluster isolating client from the location of the REST API service (currently called HttpGateway) for more details on how it is hosted check the your ClusterManifest.xml.

Clients that are based .NET can still use this option. These clients might want to avoid using TCP/IP (as used by FabricClient class, described below) as it is not enabled on corporate firewall or other reasons.

  • Clients that are based on .NET and can be aware of the existence of Service Fabric cluster can use FabricClient class part of System.Fabric namespace to resolve partitions (described below). An example for this is a either a service (acts as a client for other services on the cluster)  Or a .NET server application such as OWIN or others using a service hosted on the cluster.
  • Clients that are based on .NET or others and can not be aware of the existence of the cluster. The solution for this problem falls within “Add another abstraction layer”. In essence you add a gateway stateless service using named port exposed via a load-balancer the service will perform the resolution on behalf of the caller and connect to the correct partition. Since this is a gateway other interesting behaviors can be added such as:
    • Security Termination at the gateway level.
    • Protocol Transition. for example your gateway can be REST API to support the maximum # of client types but your back-end partitioned stateful services can use sockets.

Evantually your architecture will (Logical View) look something like this

GW Architecture Overview

 

In the above architecture when you generate the address you will have to make sure that the external address (returned by OpenAsync) points to the load balancer not the node name or cluster address.

Note: on the below discussion we are not covering how Fabric Client authenticates to the cluster.

Using Fabric Client To Resolve Partitions

Irrespective of your architecture if you have to process a request via services on a cluster you will eventually have to resolve which partition (and replica) should respond to this particular request. Service Fabric using “ServiceManagementClient” object (as member variable named “ServiceManager”)  can be used as in the following sequence

The below is referred to Complaint based Service Resolution:

  1. Resolve the current partition key using you hashing function.
  2. Use FabricClient.ServiceManager.ResolvePartitionAsync method to resolve the partiton as ResolvedServicePartition object.
  3. Locate the endpoints used by the replica you are looking for (Primary or other).

 var serviceUri = new Uri("fabric:/ReverseApp/ReverseCompute");
            var client = new FabricClient(); // or new FabricClient(params host endpoints as string) if you are running this outside the cluster.
            var pKey = await ResolvePartitionKey(RequestObject); // returns either a string or long (Named Partitions || Uniform Partitions).

            var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, pKey);

            // get end points
            var endpoints = ResolvedPartition.Endpoints;
            foreach (ResolvedServiceEndpoint ep in endpoints)
            {
                if (ep.Role == ServiceEndpointRole.StatefulPrimary)
                    _serviceAddress = ep.Address; // service Address is a member variable

                Debug.WriteLine("Address" + ep.Address); // address is whatever returned in your ICommnuicationListener.OpenAsync
                Debug.WriteLine("Role" + ep.Role.ToString()); // StatefulPrimary/Secondry/Stateless or invalid
            }

The above is executed on demand, which is typically when you have a new request. Remember between one request and another the cluster state might change and the address you have might change. You can rerun the same code with a slight change as the below:


// PresolvedPartition is previously resolved partition.
   var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, PresolvedPartition);
// get end points and the rest of actions

You can listen to notification one when a partition location changes. This is typical when you expect your clients will send mutiple messages targeting the same partition. The below is referred to as Notification Based Service Resolution

  private void PartitionChangeHandler(FabricClient source, long handlerId, ServicePartitionResolutionChange args)
        {
            // every time a change occured this will be called "After" the change

            // check there are exceptions by
            //args.HasException

            var endpoints = args.Result.Endpoints;
            foreach (ResolvedServiceEndpoint ep in endpoints)
            {
                if (ep.Role == ServiceEndpointRole.StatefulPrimary)
                    _serviceAddress = ep.Address; // service Address is a member variable

                Debug.WriteLine("Address" + ep.Address); // address is whatever returned in your ICommnuicationListener.OpenAsync
                Debug.WriteLine("Role" + ep.Role.ToString()); // StatefulPrimary/Secondry/Stateless or invalid
            }
        }

// other method
 var serviceUri = new Uri("fabric:/ReverseApp/ReverseCompute");
            var client = new FabricClient(); // or new FabricClient(params host endpoints as string) if you are running this outside the cluster.
            var pKey = await ResolvePartitionKey(RequestObject); // returns either a string or long (Named Partitions || Uniform Partitions).

            var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, ResolvedPartition);
            // use ResolvedPartition to get the endpoint as described before.

            var handler = client.ServiceManager.RegisterServicePartitionResolutionChangeHandler(serviceUri, pKey, PartitionChangeHandler);

            // once you are done use the following to un-register
            client.ServiceManager.UnregisterServicePartitionResolutionChangeHandler(handler);

You can also listen to any change in any service partition state by using RegisterServicePartitionResolutionChangeHandler overload that doesn’t require a partition key.

As you noticed partition key is a center piece to resolution. You can get a view of all partitions – without the need for a partition key – of a certain service using FabricClient.QueryManager (type of QueryClient) as the following

var serviceUri = new Uri("fabric:/ReverseApp/ReverseCompute");
var client = new FabricClient(); // or new FabricClient(params host endpoints as string) if you are running this outside the cluster. 

            var partitions = client.QueryManager.GetPartitionListAsync(serviceUri).Result;

 // resolve all partitions as usual - in my case i am using Uniform Partitions. 

            foreach (Partition p in partitions)
            {
                var partitionInfo = p.PartitionInformation as Int64RangePartitionInformation;
                Debug.WriteLine(partitionInfo.LowKey +  " " + partitionInfo.HighKey);
                var ResolvedPartition = await client.ServiceManager.ResolveServicePartitionAsync(serviceUri, partitionInfo.LowKey);
                var endpoints = ResolvedPartition.Endpoints;
                foreach (ResolvedServiceEndpoint ep in endpoints)
                {

                    Debug.WriteLine("Address" + ep.Address);
                    Debug.WriteLine("Role" + ep.Role.ToString());
                }

                Debug.WriteLine(" ");

            }

 

How Does FabricClient Really Work

FabricClient “XXXManager” members (as of preview version) sets on top of a native MTA COM object (apparently COM is still love). almost each of them sets on top of a MTA COM object.  for example ServiceManager (ServiceManagementClient) sets on top of COM with UUID of “8180db27-7d0b-43b0-82e0-4a8e022fc238”. Almost all calls are funneled through this COM object. There is a serious wrapping efforts in FabricClient code to wrap COM MTA with call backs into .NET async/await pairs. this result into the following sequence (when you perform resolution):

  1. Resolution Call  is wrapped into 2 delegates one for on going call and one for call backs.
  2. Call is interop-ed to COM runtime.
  3. The native COM serializes and performs the network call over TCP/IP (on port 1900). The COM object also maintains the connection between client and server.
  4. Call is routed to server, de-serialized and eventually routed Naming Service which stores updated list of services/partitions/replicas and addresses.
  5. Server responds, serializes the result, puts on network packets to the client (COM Object).
  6. COM serializes and bubbles it to your CLR FabricClient via interop then call backs.

As you can tell this is pretty expensive call with network and COM interop in the middle between your code and needed information. This doesn’t change in case of Notification Based Service Resolution. At least it begins with step #4 Also registering for all partitions is just a waste of resources (imagine having a G/W service with 10 instances, each subscribing changes on a compute service with 100 partitions).

There are ways to work around that or at least optimize our resolution mechanisms, we will have another post dedicated just for this topic. 

 

Epilog

I admit when I set out to put this post together I didn’t expect it to be that long. But my aim has always been providing a cohesive set of information in one location that allows the reader to create advanced scenarios on top of Service Fabric. I have deferred some deeper dives in sub topics to follow up posts, so stay tuned.

till next time!

@khnidk

One thought on “Service Fabric: Partitions”

Leave a Reply

Your email address will not be published. Required fields are marked *