/L8Again

Musings about distributed data infrastructure, cloud, high performance computing, and a bit of philosophy.

I am interested in high performance computing in general, and more specifically in distributed systems and cloud services (admittedly biased towards AWS). I work on the data infrastructure team of a social media company that handles user-generated content (UGC) for major retailer sites.

Deleting hints in a Cassandra node

This is primarily a reminder to myself of a useful command for clearing hints on a Cassandra node when faced with a "stop the world" kind of scenario. To be clear, always repair the ring before clearing hints on any node. Also, this should be the exception, not the norm. With those disclaimers out of the way, here goes:

The "hints" column family is where the hints are stored in Cassandra 1.2.x and onwards. Each row key is an endpoint with pending hints. 

cassandra-cli -h <host>

use system;
del hints['a095ce27-c240-4937-8b06-f744510c528f'];
row removed.
Elapsed time: 17 msec(s).

list hints;
Using default limit of 100
Using default cell limit of 100
-------------------
RowKey: a095ce27-c240-4937-8b06-f744510c528f

1 Row Returned.
Elapsed time: 1707 msec(s).

Notice that Cassandra still shows the row key that we just deleted. This is because the row is marked with a tombstone, so we still see the key but no columns are returned. Also, HintedHandoffManager's listEndpointsPendingHints() will keep reporting that endpoint until the hints column family is compacted. To force a major compaction, run the following:

nodetool flush system hints && nodetool compact system hints

Maven-Process-Plugin: Run External Processes for your Maven Integration Tests

Maven-Process-Plugin is a simple Maven plugin I developed that starts multiple external processes before your integration tests run (pre-integration-test phase) and terminates them after the integration tests finish (post-integration-test phase). My company, Bazaarvoice, has graciously open sourced this plugin.

Simply put, this plugin lets you improve end-to-end testing as part of your Maven build: it starts multiple processes, in the order you want, during the pre-integration-test phase, and stops them all after the integration tests have run. Just add the following plugin to your POM file:

<plugin>
  <groupId>com.bazaarvoice.maven.plugins</groupId>
  <artifactId>process-exec-maven-plugin</artifactId>
  <version>0.4</version>
  <executions>
    <!-- Start process -->
    <execution>
      <id>start-jar</id>
      <phase>pre-integration-test</phase>
      <goals><goal>start</goal></goals>
      <configuration>
        <workingDir>app</workingDir>
        <arguments>
          <argument>java</argument>
          <argument>-jar</argument>
          <argument>app.jar</argument>
        </arguments>
      </configuration>
    </execution>
    <!-- Stop process -->
    <execution>
      <id>stop-jar-process</id>
      <phase>post-integration-test</phase>
      <goals><goal>stop-all</goal></goals>
    </execution>
  </executions>
</plugin>
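Once the plugin is configured, no extra commands are needed: a regular mvn verify (or mvn install) build runs the pre-integration-test, integration-test, and post-integration-test phases in order, so your processes are started and torn down automatically as part of the build.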

I will let you look at the GitHub README for details. Here, I would like to talk about some of the most common applications of this plugin.

  1. Distributed Systems Testing: Anyone dealing with distributed systems knows the challenge of testing and debugging such systems. This plugin lets us write integration tests that are closer to the production environment: it makes it really easy to spin up several distinct hosts of our application locally and then run integration tests specifically designed to exercise the distributed algorithms in the code. Equipped with this plugin, we can build small hooks into our services for testing fault tolerance, introducing lag, and so on. Although these tests may still not be deterministic in some cases, they bring us much closer to controlling quality and building confidence in complicated distributed algorithms.
     
  2. End-to-End Testing: In a service-oriented architecture, services have several external dependencies. Testing usually requires us to mock these dependencies and hard-code their responses. Using maven-process-plugin, you can eliminate the reliance on mocks and actually start the dependencies locally, allowing you to write richer tests. Of course, depending on your use case, your mileage may vary.
     
  3. Testing client-side SDKs: Very often, the service you are creating needs to be consumed by other services; hence the need for client-side SDKs (sometimes in multiple languages). In this case, not only does the plugin help you write watertight test cases for the client SDK, but your consumers also get a running example, in the form of your integration tests, of how to use your service.

As always, let me know your comments, suggestions, and feedback should you get a chance to use this plugin.

Immutable Updates - part 1

Immutability is the cornerstone of our internal data store, which is backed by Cassandra. I would highly recommend going over the immutability slides of Marz's presentation to get quick context on immutable updates and their immediate advantages.

In our data store, a row is basically a sequence of immutable updates, or deltas. A delta can be an update to an existing property (or column), an addition of a new property, or even a delete of the entire row or a property. For example, a document is stored in the following way in Cassandra, where each delta is a column of the row.

Δ1 { "rating": 4,
"text": "I like it."}
Δ2 { .., "status": "APPROVED" }
Δ3 { .., "status": if "PENDING" then "REJECTED" end }

Now, when a document is requested, the reader (internal system reader process, not the end user) resolves all the above deltas (Δ1, Δ2, Δ3) in that order, and produces the following document:

{ "rating": 4,
"text": "I like it.",
"status": "APPROVED" }

Notice that Δ3 is a conditional delta that says "only change the status field if its existing value is PENDING". This results in a no-op in our case, since the status has already been changed to APPROVED. We will go into conditional deltas and their uses in Part 2 of this blog. The point to note here is that even though Δ3 has no effect on the resulting document (in other words, Δ1 + Δ2 = Δ1 + Δ2 + Δ3), it is still stored in the row and serves an important purpose in conflict resolution, as we will see soon. It is also easy to see that storing a row in this format makes it easy to watch for events and trigger applications, so they can get the latest state of the row and behave accordingly.
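To make the read path concrete, here is a minimal sketch of such a resolver in plain Java. The Delta record and its helpers are made-up stand-ins for illustration (our actual storage format and resolver differ), and ordering by timeUUID is simplified here to "the order of the list":

import java.util.*;

// Simplified stand-ins for a delta and a conditional delta (illustration only).
record Delta(Map<String, Object> changes, String conditionField, Object requiredValue) {
    static Delta set(Map<String, Object> changes) {
        return new Delta(changes, null, null);
    }
    static Delta setIf(String field, Object requiredValue, Object newValue) {
        return new Delta(Map.of(field, newValue), field, requiredValue);
    }
}

class DeltaResolver {
    // Replays deltas in timeUUID order on top of an empty document.
    static Map<String, Object> resolve(List<Delta> deltasInOrder) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Delta delta : deltasInOrder) {
            boolean conditionHolds = delta.conditionField() == null
                    || Objects.equals(doc.get(delta.conditionField()), delta.requiredValue());
            if (conditionHolds) {
                doc.putAll(delta.changes());
            }
            // If the condition fails, the delta is a no-op for the resolved view,
            // but it still sits in the row as an immutable record.
        }
        return doc;
    }

    public static void main(String[] args) {
        List<Delta> row = List.of(
                Delta.set(Map.of("rating", 4, "text", "I like it.")),   // Δ1
                Delta.set(Map.of("status", "APPROVED")),                // Δ2
                Delta.setIf("status", "PENDING", "REJECTED"));          // Δ3: no-op here
        // Prints the resolved document with rating=4, text="I like it.", status=APPROVED
        System.out.println(resolve(row));
    }
}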

Designing such a system in a scalable way, with multiple data centers across different global regions, comes with its fair share of challenges.

Immutability, eventual consistency, and conflict resolution

Immutability and eventual consistency go hand in hand. Even if you receive deltas out of order, as long as the deltas are resolved in a consistent order, you can rest assured that the document will eventually reach a consistent state.


Let's take an example of a document's life cycle that depicts an out-of-order delta scenario. Check out the topology below: the right-hand side is our data center in the EU, and on the left-hand side we have the US data center.

Consistent State

There are some applications in each region that listen on the databus to watch for changes in documents. The "Display" application in each region shows the current state of a document. Assume that we moderate reviews for a client based in Europe (EU), while our moderators operate in the US. The "Client Portal" is used by clients, who can also moderate their own reviews, and the "CMS" is an app used by moderators in the US.

Consider the time sequence below:

T1: Both regions have a consistent state of the review, where status is "PENDING"
T2: A client in the EU writes a new "delta" (Δ2) that sets the status to "REJECTED_CLIENT"
T3: A moderator in the US updates the document with a "conditional" delta (Δ3) that sets status = "APPROVED" if and only if status = "PENDING"

Let's say T2 and T3 are very close together, and the US doesn't see Δ2 until after it writes Δ3. This is depicted in the figure below:

Inconsistent State

Notice that in the absence of Δ2, the review gets approved and published in the US (not good, but it's OK, as we will see). However, when Δ2 arrives, the document reaches a consistent state and the review is placed in a rejected state. Since all deltas are ordered by timeUUID, we know that the conditional delta Δ3 fails in the presence of Δ2. So, for a brief period our document was inconsistent, but it eventually achieved full consistency.
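To make the ordering concrete, here is a rough sketch in the delta notation used earlier (only the status field is shown):

US, before Δ2 arrives: resolve(existing deltas, Δ3) → { .., "status": "APPROVED" }
Either region, once Δ2 has arrived: resolve(existing deltas, Δ2, Δ3) → { .., "status": "REJECTED_CLIENT" }

In the second case Δ2 sorts before Δ3 by timeUUID, so Δ3's condition (status = PENDING) no longer holds and it resolves to a no-op.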

Eventually consistent state

The point to note is that eventual consistency is truly achieved with the help of immutable updates. Also note that we never had to "modify" any of our deltas. The above scenario would typically call for conflict resolution, but by virtue of immutable updates, no cross-data-center synchronization was needed and no conflict resolution was required. Sure, the review may have been published for a short while, but then it was taken off the website automatically.

Compaction

What if rows get updated too often? Even if they don't, over time they accrue a lot of updates, causing the row to become really wide. Writes will still be fine, but read latency becomes very high as the system tries to consolidate all those deltas into one document. A row can become prohibitively wide, to the point where it cannot be resolved at read time at all. This is where compaction helps. As the name suggests, compaction resolves several deltas and replaces them with one "compacted" delta. The next time a read happens, it sees the compaction record instead of the long tail of deltas, and the read latency issue is resolved.

Great. But there is a major challenge that comes with compaction in a multi-datacenter cluster. When is the best time to compact rows on a local node in a data center? More importantly, what if an older delta arrives after we are done compacting? If we arbitrarily decide to compact rows every five minutes, then we run the risk of losing deltas that may be in flight from a different data center.

The solution is to figure out which deltas are fully consistent on all nodes and compact only those, which is basically to say: "Figure out a time (t) in the past before which all deltas are available on all nodes." This t, the full consistency timestamp, can be calculated as described in my previous blog post. The full consistency timestamp assures us that no delta will ever arrive with a timeUUID before it, so everything older than the full consistency timestamp can be compacted without any fear of data loss. Even in this eventually consistent world, we can now deterministically handle a high update rate by employing continuous partial compactions on our rows, so they never become so wide that read latency suffers.
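As a rough illustration of that idea, here is a minimal sketch in the same spirit as the resolver sketch above. The RowCompactor and Delta types are hypothetical, conditional deltas are omitted for brevity, and plain timestamps stand in for timeUUIDs:

import java.util.*;

// Everything written before the full consistency timestamp is safe to collapse into one delta.
public class RowCompactor {
    record Delta(long writeTime, Map<String, Object> changes) {}

    static List<Delta> compact(List<Delta> rowInTimeOrder, long fullConsistencyTimestamp) {
        Map<String, Object> merged = new LinkedHashMap<>();
        List<Delta> newerDeltas = new ArrayList<>();
        long newestMergedTime = Long.MIN_VALUE;

        for (Delta delta : rowInTimeOrder) {
            if (delta.writeTime() < fullConsistencyTimestamp) {
                merged.putAll(delta.changes());   // old enough: no peer delta can still be in flight
                newestMergedTime = delta.writeTime();
            } else {
                newerDeltas.add(delta);           // too new: peers may still arrive from other DCs
            }
        }

        List<Delta> compactedRow = new ArrayList<>();
        if (!merged.isEmpty()) {
            compactedRow.add(new Delta(newestMergedTime, merged));  // the single "compacted" delta
        }
        compactedRow.addAll(newerDeltas);
        return compactedRow;
    }
}

The key point is that the cutoff is the full consistency timestamp rather than an arbitrary interval, so no in-flight delta can be silently left out.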
 

Full consistency lag for eventually consistent systems

A system is said to be fully consistent when all nodes in the distributed system have the same state of the data. So, if record A is in state S on one node, then we know that it is in the same state on all of its replicas.

Now that we are on the same page, let's answer the following questions.

When is the last time my system was fully consistent?

The correct answer is: maybe never. Maybe your throughput is so massive that there is no point in time at which you can say the system is fully consistent. But perhaps the correct question to ask is this:

When is the last time before which I can be assured that all updates were fully consistent on all nodes?

That is a slightly different question. While my system may not be fully consistent at this very moment, how far back in time do I have to go to find a point before which all updates are consistent? In other words, what is the full consistency lag?

To me, this is a vital metric for an AP system. In practice, there is always a tolerance for how long a particular row can remain inconsistent. Facebook, for example, can tolerate eventually consistent comments on a story for only so long before its users start questioning the wisdom of it all. So, while your buddy may not see the comment you posted instantly, he may see it two seconds after you do, which is fine. Ten minutes later? Maybe that's OK too, but now some people need to be alerted. So the question arises: what are your SLAs, and how can you make sure you keep your promises to your customers, promises such as "your system will be fully consistent within, say, five minutes"?

I was actually surprised when I couldn't find anything like this out of the box. So, here is one way of calculating the full consistency lag on Cassandra:

Tf = the most recent time at which no pending hints were found on any node
rpc_timeout = the maximum timeout Cassandra nodes use when communicating with each other

FCL = Full Consistency Lag

FCL = Tf - rpc_timeout

The concept of hinted handoff was introduced in Amazon's Dynamo paper as a way of handling failure, and it is what Cassandra leverages for fault-tolerant replication. Basically, if a write is destined for a replica node that is down, Cassandra writes a "hint" on the coordinator node and replays it to the replica later.

We exploit this feature of Cassandra to get our full consistency lag. The main idea is to poll all the nodes to see if they have any pending hints for other nodes. The time at which they all report zero (Tf) is when we know there are no failed writes pending, and the only outstanding writes are those still in flight. Subtracting the Cassandra timeout (rpc_timeout) then gives us our full consistency lag.
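Here is a minimal sketch of that bookkeeping, not the exact code we run. It assumes some external poller (for example, over JMX, where listEndpointsPendingHints() is exposed) supplies the pending-hint counts it saw on each node, and it reports the lag as how far the full consistency timestamp (Tf - rpc_timeout) trails the present:

import java.time.Duration;
import java.time.Instant;
import java.util.Map;

public class FullConsistencyLag {
    private Instant lastTimeNoHintsFound = Instant.now();  // Tf, seeded optimistically at startup

    // Call once per polling interval with the latest per-node pending-hint counts.
    public Duration update(Map<String, Long> pendingHintsByNode, Duration rpcTimeout) {
        boolean noHintsAnywhere = pendingHintsByNode.values().stream().allMatch(count -> count == 0);
        if (noHintsAnywhere) {
            lastTimeNoHintsFound = Instant.now();  // Tf: no failed writes are pending anywhere
        }
        // Everything written before (Tf - rpc_timeout) must have reached all replicas,
        // so the lag is how far that point trails the present moment.
        Instant fullConsistencyTimestamp = lastTimeNoHintsFound.minus(rpcTimeout);
        return Duration.between(fullConsistencyTimestamp, Instant.now());
    }
}

Called every five minutes with a padded rpc_timeout, this reproduces the shape of the graph below: a steady baseline, with a spike whenever a poll finds hints.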

Now that we have our full consistency lag, this metric can be used to alert the appropriate people when the cluster is lagging too far behind.

Finally, you would want to graph this metric for monitoring.

(Graph: full consistency lag over time)

Note that in the above graph I artificially add a five-minute lag to our rpc_timeout value, so the ideal value should always sit at 300 seconds. We then poll for the full consistency check every 300 seconds (five minutes); you should tweak this interval according to your needs. With our settings, the expected lag stays steady at five minutes, but you can see it spike to ten minutes. All that really says is that there was one check when we found a few hints; by the next check (five minutes later in our case) all hints had been taken care of. You can now set an alert that wakes people up if this lag violates a given threshold, whatever makes sense for your business.