Distributed Tracing

Distributed tracing means different things to different people, depending on whom you ask.

In this post, we will look at distributed tracing in the context of debugging web application errors using logs in a microservices environment. We will also figure out how to implement distributed tracing in the least disruptive manner. The aim is to leverage components that are already part of your application stack without introducing anything new just for the sake of distributed tracing.

There are many closed-source as well as open-source products to get distributed tracing off the ground. There is nothing wrong with using them, but in many cases the investment may not be worth it.

The idea of distributed tracing is straightforward. Every request that lands on your application should have a trace id – a random unique alphanumeric string. The trace id is usually called a request id. From there onwards, the request id should be part of whichever code path the request invokes within the application as well as any external calls it makes. The request id should also be part of the logs generated in all these paths. If the code path invokes external services, those calls should have the request id in the header. The application serving the external calls should follow the same pattern as discussed here.

There you go, the simplest possible distributed tracing implementation.



One decision to make is where to generate the request id.

If you are serving a user-initiated action, generate the request id at the entry point of the request. Post that, all the subsequent code paths and dependent services will use the same request id.

The best place to generate the request id is the web server fronting the application server. All web servers have a way to add a custom header to an incoming request.

Below is how you would generate a request id in Nginx:


location / {
  proxy_pass http://upstream;
  # $request_id is a unique hexadecimal id Nginx generates for every request
  proxy_set_header X-Request-Id $request_id;
}

Nginx doc – http://nginx.org/en/docs/http/ngx_http_core_module.html#var_request_id

We should generate a request id only if the request does not already have one. This sort of conditional generation can be achieved using the below configuration:


map $http_x_request_id $reqid {
  # Reuse the X-Request-Id header if the request already carries one ...
  default $http_x_request_id;
  # ... otherwise fall back to a freshly generated id
  "" $request_id;
}

location @proxy_to_app {
  proxy_set_header X-Request-ID $reqid;
  proxy_pass http://backend;
}

Helpful links:

https://stackoverflow.com/questions/17748735/setting-a-trace-id-in-nginx-load-balancer

https://stackoverflow.com/questions/13583501/nginx-how-to-add-header-if-it-is-not-set/44761645

Also, make the request id part of the web server access log.

Now that we have guaranteed all incoming requests have an id, we need to log the request id along with the application logs.

Most logging/application frameworks have a way to pass a thread-local custom context between layers. You can inject the request id in this context.

If you are on Java using Logback, MDC can help you to achieve this – https://logback.qos.ch/manual/mdc.html
If you are on Python and Django – https://github.com/dabapps/django-log-request-id
If you are on Go and Gin – https://github.com/atarantini/ginrequestid
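
If your framework does not have this built in, such a context is simple to hand-roll. Below is a minimal Python sketch, assuming a hypothetical handle_request entry point; it stashes the incoming X-Request-Id in a contextvar, and a logging filter stamps it on every log line.

import contextvars
import logging
import uuid

# Holds the request id for the current request; contextvars work across
# threads as well as asyncio tasks.
request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    def filter(self, record):
        # Stamp the current request id on every log record.
        record.request_id = request_id_var.get()
        return True

logger = logging.getLogger("app")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(request_id)s] %(levelname)s %(message)s"))
handler.addFilter(RequestIdFilter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def handle_request(headers):
    # Hypothetical entry point: reuse the id set by the web server,
    # or generate one if it is somehow missing.
    request_id_var.set(headers.get("X-Request-Id") or uuid.uuid4().hex)
    logger.info("processing request")

handle_request({"X-Request-Id": "abc123"})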

Now that the access log, the application log, and any external calls made all carry the request id, you can trace a request’s entire journey through the application, debug errors, and pinpoint the cause.

Even if you are not trying to tie microservice calls together, having a request id helps in debugging issues faster within a single application too: you can trace a request as it traverses the different layers of the application.

Now that the strategy and tactics are clear, let us talk about an enhancement: span ids.

Let us say you have service A calling service B. Service A generates a trace id and sends it as part of the call to service B. In addition to logging A’s trace id, service B should create an id of its own and log that too; this is the concept of a span id, an id owned by each service.

You can expand the span concept to a unit of work within a single service. For example, if your application has three different logical units, you can generate a span id for each of them too; it depends on your use case and what you are trying to achieve.
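
As a sketch of the span idea, assuming the requests library for the outbound call and a made-up X-Parent-Span-Id header name: the service keeps the incoming trace id, mints a span id of its own, logs both, and sends both downstream.

import logging
import uuid
import requests  # assumed HTTP client, purely for illustration

logger = logging.getLogger("app")

def call_service_b(trace_id):
    # This service's own span id; logged locally and passed downstream
    # so that service B can record who called it.
    span_id = uuid.uuid4().hex
    logger.info("calling service B trace=%s span=%s", trace_id, span_id)
    headers = {"X-Request-Id": trace_id, "X-Parent-Span-Id": span_id}
    return requests.get("http://service-b.internal/api", headers=headers)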

Having distributed tracing in place is a huge productivity boost. It is a low-investment, high-ROI activity.

Kafka Is Not A Queue

How many times have you been part of a conversation where someone goes – “Let us use a queue like Kafka.”



Kafka is a distributed stream processor. There is a gulf of difference between a queue and a distributed stream processor. Kafka happens to be the most popular distributed stream processor; there are others too, for example, Kinesis from AWS. Even though you can use a distributed stream processor like a queue, it is a good idea not to. Also, you need to be very clear about the difference between the two; you have to approach and model them differently.

For the rest of this post, we will not talk specifically about Kafka. We will build a mental model of a distributed stream processor like how we did with a distributed key-value store in one of the earlier posts.

Imagine that there is a huge file which your applications can write to. Others can read from this file. Let us refer to this file as “Topic.” The applications which write to the file are called “Producers.” Applications which read from the file are called “Consumers.” Each line in this file is called an “Event/Message.”

There you go, we have clarified much of the distributed stream processing parlance.


The lines in a file have a sequence based on the order in which they are written. The same goes for messages in a distributed stream processor; they are ordered. Also, each message has a unique incrementing sequence number to determine the order.

Like how a file can have multiple writers and readers, a distributed stream processor can have many producers and consumers.

A distributed stream processor is stateless as far as its consumers are concerned. All it cares about is ensuring the file is always available. The consumers are responsible for maintaining their own state: they keep track of the last line they read so that they know where to start reading again. Some distributed stream processors may help you maintain this state, but that is not their primary concern.

While reading a file, even though you may be reading from the 100th line, you can always reset and start reading from another line. Reading a line from a file does not delete it from the file. The same is true for consumers of a distributed stream processor; they can go back and forth between messages as they wish.
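
To make the mental model concrete, here is a toy Python sketch, purely illustrative and nothing like a real implementation: the topic is an append-only list, producers append, and each consumer tracks its own offset and can rewind at will.

class Topic:
    def __init__(self):
        self.messages = []              # the "file"; reading never deletes anything

    def append(self, message):          # what a producer does
        self.messages.append(message)
        return len(self.messages) - 1   # sequence number of the message

class Consumer:
    def __init__(self, topic):
        self.topic = topic
        self.offset = 0                 # the consumer, not the topic, tracks its position

    def poll(self):
        batch = self.topic.messages[self.offset:]
        self.offset = len(self.topic.messages)
        return batch

    def seek(self, offset):
        self.offset = offset            # rewind or skip ahead at will

topic = Topic()
topic.append("signup user-1")
topic.append("signup user-2")

consumer = Consumer(topic)
print(consumer.poll())                  # ['signup user-1', 'signup user-2']
consumer.seek(0)
print(consumer.poll())                  # the same messages again; nothing was deleted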

If we keep writing to a file without ever deleting, it will soon grow large and eat up the entire storage. Distributed stream processors have retention periods to counter this problem. The retention period is configurable based on how much storage capacity you have. Messages past the retention period are deleted.

If multiple processes write to a file, you will soon hit performance limits. The same goes for a topic in a distributed stream processor; shards/partitions are used to overcome this. Instead of writing to one file, imagine that the file is split into multiple smaller files and producers write to these, thus distributing the writes.

How does a producer decide which file to write to?
The partition key aids this decision.

A data point in the message is used to determine which partition to write to. Usually, this data point is hashed, and then modulo arithmetic is used to determine the target partition. There are other schemes too for deciding the target partition.
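
A hedged sketch of the usual hash-and-modulo scheme, assuming the user id is the partition key and there are eight partitions:

import hashlib

NUM_PARTITIONS = 8

def pick_partition(partition_key):
    # Hash the key, then take it modulo the partition count. The same key
    # always lands on the same partition, which preserves per-key ordering.
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS

print(pick_partition("user-42"))        # always the same partition for user-42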

A queue has a straightforward mental model; first in, first out. Once you fetch a message from a queue, it is deleted; it will not be available again for processing. If a queue has multiple consumers, a message will be available to only one; this is in stark contrast to the consumers of a distributed stream processor. In a distributed stream processor, since we are reading lines from a file, a message can be processed multiple times by the same or different consumer; there is no concept of deletion.

Multiple machines back a distributed stream processor so that it is resilient to machine failures; this should be obvious since it is a “Distributed” stream processor.

Distributed systems are hard; tons of engineering goes into making anything distributed; same goes for a distributed stream processor.

I have written before too on similar lines.

Distributed System Fundamentals With An Imaginary Key-Value Store

The CAP theorem says that in the event of a network partition, a distributed system can be either consistent or available, not both.

Let us first define some of the terms in the CAP theorem.

A distributed system is a group of independent computers coordinating with each other to solve a problem. The group of computers is called a cluster.

A network connects the computers so that they can communicate and coordinate. Whenever a network is involved, there are bound to be delays and outages. Also, the individual computers themselves may go down due to hardware failure. An event which leads to some of the machines in the cluster not being reachable is called a network partition.


Now that we have defined some of the terms, we will build a hypothetical distributed key-value store and witness some of the distributed computing concepts and trade-offs in action.

A key-value store is a dictionary data structure with persistence. A distributed key-value store uses multiple machines for storage.

The user sees and interacts with the key-value store as a single unit through a client. The multiple computers backing the store are abstracted from the user.

Let us say that we have three machines, m0, m1, and m2, backing our key-value store. m0 is the coordinator, and m1 and m2 are the followers.


The coordinator is the one which handles the reads and the writes. The client only interacts with the coordinator. The role of the followers will become clear as you read on.

Let us say that the client writes the key-value pair k0 to the store. The coordinator persists it and then asks the followers to persist it as well. m0, m1, and m2 now have the key-value pair k0. Along with the key-value pair, a version number is also stored; in this case, the version number is zero.
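
A toy sketch of this write path, purely for illustration: the coordinator stores the pair with a version number and fans the write out to the followers.

class Node:
    def __init__(self, name):
        self.name = name
        self.store = {}                 # key -> (version, value)

    def put(self, key, value, version):
        self.store[key] = (version, value)

def write(coordinator, followers, key, value):
    current = coordinator.store.get(key)
    version = 0 if current is None else current[0] + 1
    coordinator.put(key, value, version)
    for follower in followers:          # ask m1 and m2 to persist it as well
        follower.put(key, value, version)
    return version

m0, m1, m2 = Node("m0"), Node("m1"), Node("m2")
write(m0, [m1, m2], "k0", "hello")      # all three machines now hold k0 at version zero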

How does a read occur?
The read goes to the coordinator m0. m0 reads its own value and also fetches the values from m1 and m2. It compares the value it has with those fetched from m1 and m2 and sees that all of them are in sync, i.e., they are consistent. It responds to the client with the value.

Now the client updates k0 with a new value. The same sequence of steps follows, and the version number is incremented to one. While this update is in transit, m2 is not reachable from m0 due to network congestion, i.e., a network partition occurs.


Recalling the CAP theorem – Our system can be either available or consistent in the event of a network partition.

If we want to design our key-value store to be consistent, we should reject the write and throw an error to the client.

If we want to design our key-value store for availability, we should accept the write even though we cannot reliably store it in all the machines in the cluster, i.e., maintain the consistency of the value.

Every distributed system has to make this trade-off.

Let our key-value store trade-off consistency for availability.

The update proceeds, and the state now is: m0 and m1 have version one of the key-value pair while m2 is still at version zero. Now the coordinator m0 goes down due to a hardware fault, again a network partition; our cluster has only m1 and m2 now.

We have to figure out a new coordinator for our cluster. Let m1 become the new coordinator and m2 its follower. We will ignore how and why m1 becomes the new coordinator; we will tackle that in another post.

If you are a keen reader, you will see that the cluster is in an inconsistent state now as m1 has version one of the key-value whereas m2 has version zero.

Now the client tries to read k0. The same sequence of events as earlier occurs for the read, but the coordinator realizes that the cluster is in an inconsistent state.

How to resolve this inconsistency?

Distributed key-value stores make trade-offs to resolve this.

One option is to let the last write win. m1 sees that the value it has is the latest and updates that value in m2 too and then returns the value to the client.

This is prone to the notorious clock synchronization problem in distributed systems.

Another option is to let the client decide which value to keep. In the event of inconsistency, all the conflicting values are sent to the client, and the client is free to determine which value to retain. Once the client decides, the chosen value is updated in the cluster.

In our scenario, with the version numbers, m1 could have taken the call that it has the latest value and updated it in m2 too before responding to the client; this is what real distributed key-value stores do. But it is not possible to do this in complicated scenarios involving highly diverged versions. Vector/Lamport clocks and version numbers aid this process.
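
A minimal sketch of that decision, assuming each replica hands back a (version, value) pair: the coordinator keeps the highest version and notes which replicas need repairing.

def resolve(replica_values):
    # replica_values, e.g. {"m1": (1, "new"), "m2": (0, "old")}
    latest_version, latest_value = max(replica_values.values())
    stale = [name for name, (version, _) in replica_values.items()
             if version < latest_version]
    return latest_value, stale          # value to return, replicas to repair

value, to_repair = resolve({"m1": (1, "new"), "m2": (0, "old")})
print(value, to_repair)                 # new ['m2']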

At this stage, it should be apparent as to why we store copies of the key-value in multiple machines: to avoid catastrophic loss of data in case of machine failure.

All machines in the cluster keep sending heartbeat messages to each other so that they are aware of non-reachable machines in the cluster. The gossip protocol is one way to achieve this. This cluster membership information is propagated to the client too.

We have glossed over a lot of details and simplified things in the interest of readability. A real key-value store is much more complicated and can involve thousands of machines coordinating with each other for storage. After reading this, you should be able to make some sense of the many seminal papers in distributed systems.

Dynamo paper, Aerospike paper, Bigtable paper.

Now You See Me

In the modern software world, where micro-services are de rigueur, observability of systems is paramount. If you do not have a way to observe your application, you are as good as dead.


The first step towards embracing observability is figuring out what to track. Broadly, we can categorize software observability into:
1. Infrastructure metrics.
2. Application metrics.
3. Business metrics.
4. Distributed tracing.
5. Logging.
6. Alerting.

Infrastructure metrics:
Infrastructure metrics boil down to capturing the pulse of the underlying infrastructure the application is running on. Some examples are CPU utilization, memory usage, disk space usage, and network ingress and egress. Infrastructure metrics should give a clear picture of how well the application is utilizing the hardware it runs on. They also aid in capacity planning and scaling.

Application metrics:
Application metrics help in gauging the efficiency of the application: how fast or slow the application responds and where the bottlenecks are. Some examples of application metrics are API response times, the number of times a particular API is called, the processing time of a specific segment of code, and calls to external services and their latency. Application metrics help in weeding out potential bottlenecks as well as in optimizing the application.

Infrastructure metrics give an overall picture whereas application metrics help in drilling down to the specifics. For example, if infrastructure metrics indicate abnormally high CPU utilization, application metrics help in zeroing in on the cause.

Business metrics:
Business metrics are the numbers which are crucial from a functionality point of view. For example, if the piece of code deals with user login and sign-up, some business metrics of interest would be the number of people who sign up, number of people who log in, number of people who log out, the modes of login like social login versus direct. Business metrics help in keeping a pulse on the functionality and diagnosing feature specific breakdowns.
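
Emitting a business metric can be as cheap as a fire-and-forget counter. Below is a hedged sketch that sends StatsD-style counters over UDP; the metric names and the local daemon on port 8125 are assumptions.

import socket

STATSD_ADDR = ("127.0.0.1", 8125)       # assumed local StatsD-compatible daemon
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def incr(metric, value=1):
    # StatsD line protocol: <name>:<value>|c denotes a counter increment.
    sock.sendto(f"{metric}:{value}|c".encode("utf-8"), STATSD_ADDR)

# Sprinkle these at the relevant points in the code.
incr("signup.completed")
incr("login.social.google")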

Business metrics should not be confused with business reports. Business metrics serve a very different purpose; they are not meant to quantify numbers accurately but to gauge trends and detect anomalous behavior.

It helps to think of infrastructure, application, and business metrics as a hierarchy where you zoom in from one to the other while keeping tabs on the health of the system as well as diagnosing problems. Keeping a check on all three ensures you have a hale and hearty application.

Logging:
Logging enables you to pinpoint specific errors. The big challenge with logs is making them easily accessible to everyone in the organization. Business metrics help in tracking the overall trend, and logging helps in zeroing in on the specific details.

Distributed Tracing:
Distributed tracing ties together all the microservices in the ecosystem and helps trace a flow end to end as it moves from one microservice to another. Microservices fail all the time; if distributed tracing is not in place, diagnosing issues which span microservices feels like searching for a needle in a haystack.

Alerts:
If you have infrastructure, application, and business metrics in place, you can create alerts that are triggered when they show abnormal behavior; this pre-empts potential downtime and business loss. One golden rule for alerts: if it is an alert, it should be actionable. If not, alerts lose their significance and meaning.

Both commercial and open-source software is available for building observability. New Relic is one of the primary contenders on the commercial side. StatsD, Prometheus, and their ilk dominate the open-source spectrum. For log management, Splunk is the clear leader in the commercial space, while the ELK stack takes the crown on the open-source front. Zipkin is an open-source reference implementation of distributed tracing. Most metrics-tracking software has alerting capabilities these days.

If you already have microservices or are moving towards that paradigm, you should be investing heavily in observability. Microservices without observability is a fool’s errand.

Poor Man’s Anomaly Detection

You have a feature where if someone signs up on your product, you create a wallet for that person and top it up with complimentary money. Your organization swears by microservices; hence the sign-up logic is in one service while wallet creation and crediting are in another. Once a user signs up, the sign-up service sends a message to the wallet service so that it can create the wallet and credit it. To ensure the sanctity of the system, you have to make sure that the number of sign-ups, wallets created, and credits done match one another. Also, if these go out of sync, alerts need to be in place so that corrective action can be taken.

Since the two are disparate distributed systems, one way to achieve the above is to use an anomaly detector. There are off-the-shelf products for this as well as open-source projects. If you do not have the time, need, and resources to invest in deploying an anomaly detection system, having a reconciliation system is the way to go.


Reconciliation is deeply entrenched in the financial domain, where it is a way of life. The technology world can borrow this and use it as a poor man’s anomaly detector. For the scenario that we started with, we run queries on the data repositories of the sign-up and wallet systems at regular intervals. These queries fetch the counts of sign-ups, wallet creations, and credits that occurred during the period. Once we have the numbers, all we have to do is ensure that they match. One can do this with a simple bash script; it is extremely simple to develop and deploy.
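
Here is a hedged sketch of such a check, in Python rather than bash for readability; the table names, the created_at column, and the alert hook are all assumptions for illustration.

import sqlite3  # stand-in for the real data stores of the two services

def count(conn, table, since, until):
    # Assumes each table has a created_at column to window on.
    return conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE created_at BETWEEN ? AND ?",
        (since, until)).fetchone()[0]

def reconcile(signup_db, wallet_db, since, until):
    signups = count(signup_db, "signups", since, until)
    wallets = count(wallet_db, "wallets", since, until)
    credits = count(wallet_db, "credits", since, until)
    if not (signups == wallets == credits):
        # Hypothetical alert hook: page someone, post to a chat channel, etc.
        print(f"MISMATCH signups={signups} wallets={wallets} credits={credits}")

# Run this from cron every few minutes for the previous window.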

Reconciliation can play a role wherever two-phase commit flows are involved. For example, most payment flows follow a two-phase commit process: you first deduct money from the user’s payment instrument and then fulfill the commitment. There is a good chance that, after the payment debit, your system dies without doing the fulfillment. Having a reconciliation system in place helps you take corrective action in these scenarios.

Reconciliation is a simple way to achieve anomaly detection until you have the need and the resources to invest in a more robust distributed anomaly detector.

Concurrency Models

We can roughly classify concurrency models into:
1. Thread based concurrency.
2. Event based concurrency.

Imagine that you run a store with only one customer service representative. As soon as a customer walks in, the customer service representative greets the customer with a quick hello saying – “If you need any help, give me a shout, and I will help you out.” She then waits for the customer to seek help. She aims to complete each interaction as soon as possible and wait for the next communication. When a customer asks for help, she quickly answers the query and goes back to waiting. If a customer asks where the washroom is, she quickly points in the right direction and reverts to waiting. If a customer asks her for the price of a product, she quickly conveys the price and goes back to waiting. The point to note here is that there is only one customer service representative for the entire store servicing all customers. This model works exceptionally well when the representative is fast and the answers to the queries are quick. Concurrency based on events works like this.

Now consider the situation where you have five customer service representatives in your store. As soon as a customer walks in, a representative is assigned exclusively to that customer. When another customer walks in, one more representative is picked from the pool and assigned to the customer. The critical point to note here is that there is a one to one relationship between the customer service representative and the customer. When one representative is servicing a customer, she does not bother about other customers; she is exclusive to that customer. Since our pool has five representatives, at most, we can serve only five customers at a time. What do we do when the sixth customer walks into the store? We can wait until one of the customers walks out or we can have a rule saying that a representative services a customer for a fixed period after which she will be assigned to another waiting customer. She is reassigned to the original customer once the time elapses. Concurrency based on threads works like this.

Coming back to the scenario where the sixth customer walks in: we either have to ask the sixth customer to wait until a representative is free, or we have to wean a representative away from one of the existing customers and assign her to the new customer. When this happens, the customer who was initially being serviced by this representative has to wait. After the allotted time elapses, we have to assign the representative back to the original customer. When a lot of customers walk in and you have a fixed number of representatives, quite a bit of coordination is needed to service all customers satisfactorily. In a computer, the CPU scheduler takes care of switching between tasks. Switching is a comparatively time-consuming operation and an overhead of the thread based concurrency model when compared to an event based one.

In the single representative scenario, what happens if one of the customers starts a long conversation with the representative? The representative will be stuck with the customer, and if other customers have queries, they will have to wait for the representative to finish the ongoing conversation. Also, what if one of the customers sends a representative on a long-running errand like fetching something from the depot a mile away? Until the representative returns, all other customers have to wait to get their queries resolved. One egregious customer can jeopardize all other customers and hold up the entire store operation.

Hence, when working with event based concurrency, it is essential not to:
1. Carry out CPU intensive tasks akin to having a long-running conversation with the representative.
2. Carry out blocking IO tasks similar to sending the representative to the depot.


NGINX and Redis are probably the most commonly used software that leverage event based concurrency. The workloads that these cater to are quick. Hence event based concurrency makes perfect sense here.
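
A minimal sketch of the single-representative model using Python’s asyncio: one event loop services every client, and it stays responsive only because each individual interaction is quick and never blocks.

import asyncio

async def handle(reader, writer):
    # A quick interaction: read a query, answer it, go back to waiting.
    data = await reader.readline()
    writer.write(b"answer: " + data)
    await writer.drain()
    writer.close()
    # A time.sleep() or a heavy computation here would hold up every other
    # customer, because there is only one "representative" (the event loop).

async def main():
    server = await asyncio.start_server(handle, "127.0.0.1", 8080)
    async with server:
        await server.serve_forever()

asyncio.run(main())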

Taking the case of NGINX used as a reverse proxy, what does it do? It picks a client connection from the listen queue, does some work on it, forwards it to the upstream server, and then waits for the upstream to respond. While waiting for the upstream, NGINX can pick more client connections from the queue and repeat the above. When the upstream sends a response, it relays this back to the client. Since all these are short-lived operations, this fits beautifully into an event based concurrency model. Good old Apache HTTP Server creates a thread/process for each connection to do the same. Apache is constrained by the number of threads it has; if the number of incoming requests is more than the number of threads in its pool, it has to deal with switching and coordination. NGINX does not have this overhead, which makes it comparatively faster than Apache in real-world workloads. All of this is a bit simplistic and hand-wavy but should convey the idea.

Event based concurrency cannot, by itself, leverage the multiple CPU cores which all modern processors have. To get around this, you create one event unit per core, usually called a worker. Also, most software that leverages event based concurrency adopts a hybrid model: event based concurrency for short-lived, quick operations, with long-running tasks off-loaded to a thread/process.

I have glossed over a lot of details and nuances to explain a complex topic like concurrency in simple terms. Treat this as a good starting guide to dig more into this fascinating world.

Ode To Queues

If you have a producer with an uneven rate of production and a consumer which cannot keep pace with the producer at its peak, use a queue.

If you have a workload which need not be addressed synchronously, use a queue.

If your customer-facing application is riddled with workloads which can be deferred, move these to a queue thus making the customer-facing application lean and mean.



Think of a queue as a shock absorber.

There are workloads which need to be processed immediately with sub-millisecond latency, and then there are ones where you have the luxury of taking time. It is advisable not to mix these in an application. The second kind of workload can be addressed by moving it to a queue and having a consumer process them.

For example, consider a scenario where you are consuming messages and persisting them in a data store. These messages come in at a variable rate, and at its peak, the data store cannot handle the load. You have two options: scale the data store to meet the peak load, or slap a queue in between to absorb the shock. A queue solves this problem in a KISS manner.
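
A toy sketch of the shock absorber with Python’s standard library: the producer bursts, the consumer drains at its own steady pace, and the queue in between soaks up the spike.

import queue
import threading
import time

q = queue.Queue()                       # the shock absorber

def producer():
    for i in range(100):                # a sudden burst of 100 messages
        q.put(f"message-{i}")

def consumer():
    while True:
        message = q.get()
        time.sleep(0.05)                # simulate the "slow" data store write
        print("persisted", message)
        q.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()
q.join()                                # wait until the backlog is drained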

Queues enable applications to be highly available while giving enough room to maneuver. As long as the queue is highly available, the chance of message loss is almost nil. Since a queue is durable, you need not perfect your consumer’s high availability; you get leeway to manage.

With applications embracing microservices paradigm, there is a lot of API back and forth. Not all API consumption has to be in real-time. Whatever can be deferred should use a queue as the transport mechanism.

A queue introduces a bit more complexity into an application, but the advantages it brings to the table make it a worthwhile investment.