Distributed Tracing

Distributed tracing means different things to different people, depending on whom you ask.

In this post, we will look at distributed tracing in the context of debugging web application errors using logs in a microservices environment. We will also figure out how to implement distributed tracing in the least disruptive manner. The aim is to leverage components that are already part of your application stack without introducing anything new for the sake of distributed tracing.

There are many closed as well as open-source products to get distributed tracing off the ground. There is nothing wrong with using these, but in many cases the investment may not be worth it.

The idea of distributed tracing is straightforward. Every request that lands on your application should have a trace id – a unique random alphanumeric string. The trace id is usually called a request id. From then on, the request id should be part of whichever code path the request invokes within the application, as well as any external calls it makes. The request id should also be part of the logs generated along all these paths. If a code path invokes external services, those calls should carry the request id in a header, and the application serving those calls should follow the same pattern discussed here.

There you go, the simplest possible distributed tracing implementation.
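
To make the pattern concrete, here is a minimal Python sketch; the header handling and the downstream URL (payments.internal) are illustrative assumptions, not part of any particular framework:

import logging
import urllib.request
import uuid

def handle_request(incoming_headers):
    # Reuse the request id set upstream, or mint one if it is missing.
    request_id = incoming_headers.get("X-Request-Id") or uuid.uuid4().hex

    # Every log line carries the request id.
    logging.info("[%s] processing request", request_id)

    # Every outgoing call carries the same request id in a header.
    call = urllib.request.Request(
        "http://payments.internal/charge",  # hypothetical downstream service
        headers={"X-Request-Id": request_id},
    )
    return urllib.request.urlopen(call)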

One decision to make is where to generate the request id.

If you are serving a user-initiated action, generate the request id at the entry point of the request. After that, all the subsequent code paths and dependent services will use the same request id.

The best place to generate the request id is the web server fronting the application server. All web servers have a way to add a custom header to an incoming request.

Below is how you would generate a request id in Nginx:


location / {
  proxy_pass http://upstream;
  proxy_set_header X-Request-Id $request_id;
}

Nginx doc – http://nginx.org/en/docs/http/ngx_http_core_module.html#var_request_id

We should generate a request id only if the request does not already have one. This sort of conditional generation can be achieved using the below configuration:


# Use the incoming X-Request-Id header if the client already sent one;
# fall back to Nginx's generated $request_id otherwise.
map $http_x_request_id $reqid {
  default $http_x_request_id;
  "" $request_id;
}

location @proxy_to_app {
  proxy_set_header X-Request-ID $reqid;
  proxy_pass http://backend;
}

Helpful links:

https://stackoverflow.com/questions/17748735/setting-a-trace-id-in-nginx-load-balancer

https://stackoverflow.com/questions/13583501/nginx-how-to-add-header-if-it-is-not-set/44761645

Also, make the request id part of the web server's access log.

Now that we have guaranteed that all incoming requests have an id, we need to include the request id in the application logs.

Most logging/application frameworks have a way to pass a thread-local custom context between layers. You can inject the request id in this context.

If you are on Java using Logback, MDC can help you to achieve this – https://logback.qos.ch/manual/mdc.html
If you are on Python and Django – https://github.com/dabapps/django-log-request-id
If you are on Go and Gin – https://github.com/atarantini/ginrequestid
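
If you are on plain Python, a minimal sketch of the same idea using contextvars and a logging filter could look like the below; the names are illustrative, not from any of the libraries linked above.

import contextvars
import logging
import uuid

# Holds the request id for the current request; context variables work with
# both threads and asyncio tasks.
request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    # Copy the current request id into every log record.
    def filter(self, record):
        record.request_id = request_id_var.get()
        return True

logging.basicConfig(
    format="%(asctime)s %(levelname)s [%(request_id)s] %(message)s",
    level=logging.INFO,
)
for handler in logging.getLogger().handlers:
    handler.addFilter(RequestIdFilter())

def handle_request(headers):
    # Use the id set by the web server, or generate one as a fallback.
    request_id_var.set(headers.get("X-Request-Id", uuid.uuid4().hex))
    logging.info("handling request")  # this line now carries the request id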

Now that the access log, the application log, and any external calls made all carry the request id, you can trace a request's entire journey through the application, debug errors, and pinpoint their cause.

Even if you are not trying to tie microservice calls together, having a request id helps within a single application too; you can trace a request as it traverses the different layers of the application and debug issues faster.

Now that the strategy and tactics are clear, let us look at an enhancement; this is the right time to talk about span ids.

Let us say you have service A calling service B. Service A generates and sends a trace id as part of the call to service B. In addition to logging A's trace id, service B should create an id of its own and log that too; this is the concept of a span id – an id owned by each service.

You can expand the span concept to a unit of work within a single service. For example, if your application has three different logical units, you can generate a span id for each of them too; it depends on your use case and what you are trying to achieve.
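
As a rough sketch in Python (the helper names here are made up for illustration), a service can log the trace id it received alongside span ids of its own:

import logging
import uuid

logging.basicConfig(level=logging.INFO)

def new_span_id():
    # A span id only needs to be unique within a trace.
    return uuid.uuid4().hex[:16]

def handle_call(incoming_headers):
    trace_id = incoming_headers.get("X-Request-Id") or uuid.uuid4().hex
    span_id = new_span_id()  # the id owned by this service

    logging.info("[trace=%s span=%s] validating input", trace_id, span_id)

    # A logical unit of work inside the service can get its own span id too.
    db_span_id = new_span_id()
    logging.info("[trace=%s span=%s parent=%s] querying database",
                 trace_id, db_span_id, span_id)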

Having distributed tracing in place is a huge productivity boost. It is a low-investment, high-ROI activity.

Kafka Is Not A Queue

How many times have you been part of a conversation where someone goes – “Let us use a queue like Kafka.”

And you die a little inside.

Kafka is a distributed stream processor. There is a gulf of difference between a queue and a distributed stream processor. Kafka happens to be the most popular distributed stream processor; there are others too, for example, Kinesis from AWS. Even though you can use a distributed stream processor as a queue, it is a good idea not to. You need to be very clear about the difference between the two; you have to approach and model them differently.

For the rest of this post, we will not talk specifically about Kafka. We will build a mental model of a distributed stream processor, just as we did with a distributed key-value store in one of the earlier posts.

Imagine that there is a huge file which your applications can write to. Others can read from this file. Let us refer to this file as “Topic.” The applications which write to the file are called “Producers.” Applications which read from the file are called “Consumers.” Each line in this file is called an “Event/Message.”

There you go, we have clarified much of the distributed stream processing parlance.

The lines in a file have a sequence based on the order in which they are written. The same goes for messages in a distributed stream processor; they are ordered. Also, each message has a unique incrementing sequence number to determine the order.

Like how a file can have multiple writers and readers, a distributed stream processor can have many producers and consumers.

A distributed stream processor is stateless as far as its consumers are concerned. All it cares about is ensuring the file is always available. The consumers are responsible for maintaining their own state. The consumers keep track of the last line they read so that they know where to start reading from again. Some distributed stream processors may help you in maintaining this state, but that is not their primary concern.
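
To make the mental model concrete, here is a toy Python sketch (not a real stream processor) where a topic is an append-only list, producers append to it, and each consumer tracks its own offset:

class Topic:
    # A toy "file": an append-only list of messages with sequence numbers.
    def __init__(self):
        self.messages = []

    def append(self, message):
        self.messages.append(message)
        return len(self.messages) - 1   # sequence number of the message

class Consumer:
    # Each consumer remembers its own position; reading never deletes.
    def __init__(self, topic):
        self.topic = topic
        self.offset = 0                 # state lives with the consumer

    def poll(self):
        batch = self.topic.messages[self.offset:]
        self.offset = len(self.topic.messages)
        return batch

    def seek(self, offset):
        self.offset = offset            # consumers can rewind and re-read

topic = Topic()
topic.append("order placed")
topic.append("order shipped")

consumer = Consumer(topic)
print(consumer.poll())   # ['order placed', 'order shipped']
consumer.seek(0)
print(consumer.poll())   # the same messages again; nothing was deleted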

While reading a file, even though you may be reading from the 100th line, you can always reset and start reading from another line. Reading a line from a file does not delete it from the file. The same is true for consumers of a distributed stream processor; they can go back and forth between messages as they wish.

If we keep writing to a file without ever deleting, it will soon grow large and eat up the entire storage. Distributed stream processors have retention periods to counter this problem. The retention period is configurable based on how much storage capacity you have. Messages past the retention period are deleted.

If multiple processes write to a file, you will soon hit performance limits. The same goes for a topic in a distributed stream processor; shards/partitions are used to overcome this. Instead of writing to one file, imagine that the file is split into multiple smaller files and producers write to these, thus distributing the writes.

How does a producer decide which file to write to?
The partition key aids this decision.

A data point in the message is used to determine which partition to write to. Usually, this data point is hashed, and then modulo arithmetic is used to determine the target partition. There are other schemes too for deciding the target partition.
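
A minimal sketch of that decision, assuming a simple hash-and-modulo scheme (real stream processors use their own hash functions):

import hashlib

def choose_partition(partition_key, num_partitions):
    # Hash the key so that keys spread out evenly, then wrap the hash
    # into the available partitions with modulo arithmetic.
    digest = hashlib.md5(partition_key.encode()).hexdigest()
    return int(digest, 16) % num_partitions

print(choose_partition("user-42", 8))   # the same key always lands on the same partition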

A queue has a straightforward mental model: first in, first out. Once you fetch a message from a queue, it is deleted; it will not be available again for processing. If a queue has multiple consumers, a message will be available to only one; this is in stark contrast to the consumers of a distributed stream processor. In a distributed stream processor, since we are reading lines from a file, a message can be processed multiple times by the same or different consumers; reading does not delete it.

Multiple machines back a distributed stream processor so that it is resilient to machine failures; this should be obvious since it is a “Distributed” stream processor.

Distributed systems are hard; tons of engineering goes into making anything distributed; same goes for a distributed stream processor.

I have written along similar lines before.

Distributed System Fundamentals With An Imaginary Key-Value Store

The CAP theorem says that in the event of a network partition, a distributed system can be either consistent or available, not both.

Let us first define some of the terms in the CAP theorem.

A distributed system is a group of independent computers coordinating with each other to solve a problem. The group of computers is called a cluster.

A network connects the computers so that they can communicate and coordinate. Whenever a network is involved, there are bound to be delays and outages. Also, the individual computers themselves may go down due to hardware failure. An event which leads to some of the machines in the cluster not being reachable is called a network partition.

Now that we have defined some of the terms, we will build a hypothetical distributed key-value store and witness some of the distributed computing concepts and trade-offs in action.

A key-value store is a dictionary data structure with persistence. A distributed key-value store uses multiple machines for storage.

The user sees and interacts with the key-value store as a single unit through a client. The multiple computers backing the store are abstracted from the user.

Let us say that we have three machines – m0, m1, and m2 – backing our key-value store. m0 is the coordinator, and m1 and m2 are the followers.

The coordinator is the one which handles the reads and the writes. The client only interacts with the coordinator. The role of the followers will become clear as you read on.

Let us say that the client writes key-value k0 to the store. The coordinator persists it and then asks the followers to persist it as well. m0, m1, and m2 now have the key-value pair k0. Along with the key-value pair, a version number is also stored; in this case, the version number is zero.

How does a read occur?
The read goes to the coordinator m0. m0 reads its own value and also fetches the values from m1 and m2. It compares the value it has with those fetched from m1 and m2 and sees that all of them are in sync, i.e., they are consistent. It responds to the client with the value.
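
Here is a toy sketch of this write and read flow in Python, with the coordinator and followers as plain in-memory dictionaries; it ignores failures for now and is only meant to mirror the description above:

class Node:
    # One machine in the cluster; stores (value, version) per key.
    def __init__(self, name):
        self.name = name
        self.data = {}

class Cluster:
    def __init__(self, coordinator, followers):
        self.coordinator = coordinator
        self.followers = followers

    def write(self, key, value):
        # The new version is the previous version plus one (starting at zero).
        _, version = self.coordinator.data.get(key, (None, -1))
        record = (value, version + 1)
        self.coordinator.data[key] = record
        for node in self.followers:
            node.data[key] = record      # replicate to the followers

    def read(self, key):
        copies = ([self.coordinator.data.get(key)]
                  + [node.data.get(key) for node in self.followers])
        # All copies agree, i.e., the cluster is consistent for this key.
        assert len(set(copies)) == 1, "cluster is inconsistent"
        return copies[0][0]

m0, m1, m2 = Node("m0"), Node("m1"), Node("m2")
store = Cluster(m0, [m1, m2])
store.write("k0", "v0")
print(store.read("k0"))   # 'v0', version zero on all three machines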

Now the client updates k0 with a new value. The same sequence of steps follows, and the version number is updated to one. While this update is in transit, m2 is not reachable from m0 due to network congestion, i.e., a network partition occurs.

Recalling the CAP theorem – Our system can be either available or consistent in the event of a network partition.

If we want to design our key-value store to be consistent, we should reject the write and throw an error to the client.

If we want to design our key-value store for availability, we should accept the write even though we cannot reliably store it in all the machines in the cluster, i.e., we cannot maintain the consistency of the value.

Every distributed system has to make this trade-off.

Let our key-value store trade-off consistency for availability.

The update proceeds, and the state now is: m0 and m1 have version one of the key-value, while m2 is still at version zero. Now the coordinator m0 goes down due to a hardware fault, again a network partition; our cluster now has only m1 and m2.

We have to figure out a new coordinator for our cluster. Let m1 become the new coordinator and m2 its follower. We will ignore how and why m1 becomes the new coordinator; we will tackle that in another post.

If you are a keen reader, you will see that the cluster is in an inconsistent state now as m1 has version one of the key-value whereas m2 has version zero.

Now the client tries to read k0. The same sequence of events as earlier occurs for the read, but the coordinator realizes that the cluster is in an inconsistent state.

How to resolve this inconsistency?

Distributed key-value stores make trade-offs to resolve this.

One option is to let the last write win. m1 sees that the value it has is the latest and updates that value in m2 too and then returns the value to the client.

Last write wins is prone to the notorious clock problem in distributed systems: deciding which write is the last relies on timestamps, and clocks on different machines drift.

Another option is to let the client decide which value to keep. In the event of inconsistency, all the conflicting values are sent to the client, and the client is free to determine which value to retain. Once the client decides, the chosen value is updated in the cluster.

In our scenario, with the version numbers, m1 could have taken the call that it has the latest value and updated it in m2 too before responding to the client; this is what real distributed key-value stores do. But it is not possible to do this in complicated scenarios involving highly diverged versions; vector/Lamport clocks and version numbers aid this process.
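
A sketch of that version-based resolution, continuing the toy store above (last write wins by version number; resolving genuinely diverged histories would need vector clocks, which we are glossing over):

def resolve_by_version(copies):
    # copies: list of (node_name, value, version) tuples, one per machine.
    # Pick the copy with the highest version and repair the stale machines.
    winner = max(copies, key=lambda copy: copy[2])
    _, value, version = winner
    stale_nodes = [name for name, _, v in copies if v < version]
    return value, stale_nodes

copies = [("m1", "v1", 1), ("m2", "v0", 0)]   # m1 is ahead of m2
value, stale_nodes = resolve_by_version(copies)
print(value, stale_nodes)   # 'v1' ['m2']  ->  update m2, then reply to the client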

At this stage, it should be apparent as to why we store copies of the key-value in multiple machines: to avoid catastrophic loss of data in case of machine failure.

All machines in the cluster keep sending heartbeat messages to each other so that they are aware of non-reachable machines in the cluster. Gossip protocol is one of the ways to achieve this. This cluster membership information is propagated to the client too.

We have glossed over a lot of details and simplified things in the interest of readability. A real key-value store is much more complicated and can involve thousands of machines coordinating with each other for storage. After reading this, you should be able to make some sense of the many seminal papers in distributed systems.

Dynamo paper, Aerospike paper, Bigtable paper.