Elasticsearch has always been good to us at Forter. It’s truly reliable. It’s damn good at searching, and it’s a solid distributed application as well. It balances a cluster relatively quickly, and it is easy to configure and easy to back up and restore. But we were haunted by an issue that had a catastrophic effect on our system.
The Observed Issue – Chain of Events:
- An under-utilized ES cluster needs downscaling;
- De-allocate all shards from the nodes planned for termination via Shard Allocation Filtering;
- Wait for the cluster state to become “green” again;
- Terminate the nodes that are now vacant;
- Issue: All ES operations from the Java clients freeze.
We use Elasticsearch in a Storm topology. Storm is a distributed streaming framework for Java. We use Storm to do real-time fraud prevention, and it’s essential for the business that our Storm application returns an answer in under 1 second. We use the official Elasticsearch Java REST API client (latest, 5.5.3) because we care about forward compatibility. The ES REST client is a wrapper around a version of Apache Async HttpClient (4.1.2). In fact, due to the nature of Java and our application infrastructure, we had to migrate all of our previously blocking HTTP clients to async when we adopted the new ES REST client, so you can say that we are pretty invested.
What do I mean by “ES operations from the Java clients freeze,” you might ask? Well, usually when using an async HTTP client you can start a request and use a Future object to wait for the response. The waiting happens on a different thread from the one where any I/O might happen, and that really comes in handy when you need an accurate timeout mechanism. Client-side timeouts are indeed supported in the ES REST client API via the SyncResponseListener class. In other words, using a SyncResponseListener object that was initialized with a 5 second timeout should guarantee that the request takes no more than 5 seconds, regardless of the cluster state.
However, that is not what we were observing. Instead, we saw all clients freeze completely for long periods of time the minute nodes were terminated. We expected the clients to time out any requests to the terminated nodes after the configured timeout. Having them freeze instead is terrible for our system. It’s the kind of bug we simply can’t afford to guess at in order to fix. It’s also the kind of bug that involves a nuanced interplay between at least three different processes: 2 ES nodes and a Java client.
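For reference, the behavior we expected from a Future-based wait can be sketched in plain Java. This is a minimal illustration and not the ES client's code: the class and method names are hypothetical, and a CompletableFuture that never completes stands in for a request sent to a terminated node.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a caller-side timeout on top of an async response.
class FutureTimeoutSketch {

    /** Waits for the response, but never longer than timeoutMillis. */
    static String awaitWithTimeout(CompletableFuture<String> response, long timeoutMillis) {
        try {
            // The calling thread only waits here; the I/O happens elsewhere.
            return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        // A response that never arrives, like a request to a paused or terminated node.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        System.out.println(awaitWithTimeout(neverCompletes, 100));

        // A response that is already available.
        CompletableFuture<String> ready = CompletableFuture.completedFuture("201 created");
        System.out.println(awaitWithTimeout(ready, 100));
    }
}
```

The point is that the caller's wait is bounded by its own timeout no matter what happens to the remote node, as long as nothing else blocks the caller before it reaches the wait.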
With no errors other than timeouts anywhere to be found, there wasn’t much for us to do short of replicating the issue locally.
Reproducing Issue Locally
To replicate the issue I needed an ES cluster. Two VBox machines running the latest Elasticsearch Docker image were enough. I didn’t just use two Docker containers on my machine because I wanted them to have host-network IPs (VBox bridged network adapter, no NAT involved). For the Java client I used a Kumulus topology (shameless self-promotion: Kumulus is a lightweight Storm implementation) with a single component: an ES client that indexes one document into the cluster every second. The ES REST client was built with sniffing and on-failure hook support. The client construction code looked like this:
Right off the bat I noticed something strange here. The builder pattern used for the initialization of restClient is very common in Java. Its semantics imply that once you call the build() method you get a new “final” object. While obviously syntactically possible, it is a bit confusing to hand the builder an empty instance of SniffOnFailureListener and only call its setSniffer() after build() has already run and the “final” instance has been created. Also, notice how sniffOnFailureListener eventually holds a reference to sniffer, which holds a reference to restClient, while sniffOnFailureListener itself is used in the initialization of that very same restClient. It isn’t hard to imagine a situation in which this kind of reference chain leads to bugs of a circular nature. I had an inkling that a thread synchronization issue was the culprit, so now I needed a thread dump.
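The wiring order described above can be illustrated with stand-in classes. This is only a sketch of the reference chain: Client, Sniffer and FailureListener below are hypothetical stand-ins, not the Elasticsearch classes, and the constructor call plays the role of build().

```java
// Hypothetical stand-ins that mirror the wiring order, not the real ES API.
class CircularWiringSketch {

    static class FailureListener {
        private Sniffer sniffer; // stays null until setSniffer() is called

        void setSniffer(Sniffer sniffer) { this.sniffer = sniffer; }

        boolean isWired() { return sniffer != null; }
    }

    static class Client {
        final FailureListener listener;

        Client(FailureListener listener) { this.listener = listener; }
    }

    static class Sniffer {
        final Client client;

        Sniffer(Client client) { this.client = client; }
    }

    /** Returns {listener wired when the client was built, reference cycle exists afterwards}. */
    static boolean[] wire() {
        FailureListener listener = new FailureListener();
        Client client = new Client(listener);            // the "build()" step
        boolean wiredAtBuildTime = listener.isWired();   // the listener is still empty here

        Sniffer sniffer = new Sniffer(client);
        listener.setSniffer(sniffer);                    // mutation after the client is "final"

        // client -> listener -> sniffer -> client
        boolean cycle = client.listener.sniffer.client == client;
        return new boolean[] { wiredAtBuildTime, cycle };
    }

    public static void main(String[] args) {
        boolean[] result = wire();
        System.out.println("wired at build time: " + result[0]);
        System.out.println("reference cycle: " + result[1]);
    }
}
```

Between the construction of the client and the call to setSniffer(), the listener is in a half-initialized state, which is the part that looked suspicious.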
With my cluster online I started the tester app and produced some output for the valid state:
15:09:39.914 [KumulusThread-26] DEBUG com.forter.es.EsClientSnifferIT - StatusCode: 201, Result: created, SuccessShards: 2, FailedShards: 0
15:09:40.436 [pool-8-thread-1] DEBUG com.forter.utils.elasticsearch.ForterElasticsearchHostsSniffer - Sniffing ES hosts
15:09:41.121 [KumulusThread-27] DEBUG com.forter.es.EsClientSnifferIT - StatusCode: 201, Result: created, SuccessShards: 2, FailedShards: 0
One log line should be produced roughly every 1.1 seconds, since the request timeout is 100 milliseconds and I sleep for 1 second between requests. Next, I needed to terminate a machine (I chose to skip the de-allocation step).
Side note: Another thing that comes in handy with VBox machines is that you can pause them. In that case, all processing stops, but the IP address isn’t returning connection refused errors like when you kill an underlying service. Instead, you are at the mercy of the network in search of a missing IP address. That’s perfect because it is exactly what happens when we terminate our cloud instances in production.
Initially after terminating the slave machine, things go as expected:
15:15:18.427 [KumulusThread-32] ERROR com.forter.es.EsClientSnifferIT - Got exception: TimeoutException
15:15:19.740 [KumulusThread-33] ERROR com.forter.es.EsClientSnifferIT - Got exception: TimeoutException
15:15:21.055 [KumulusThread-34] ERROR com.forter.es.EsClientSnifferIT - Got exception: TimeoutException
Those TimeoutException errors are good. They tell us the request failed, but it did so without blocking for longer than it should. Note that a line is printed every ~1.1 seconds. But that’s only for the first 3 seconds. Once a premature sniffing call is initiated, due to the use of setFailureListener() in building the client, things break. Next on the log is:
15:15:21.957 [pool-7-thread-1] DEBUG com.forter.utils.elasticsearch.ForterElasticsearchHostsSniffer - Sniffing ES hosts
And then, silence. The time is 15:15:21 and the next line on the log appears at 15:15:51. This tells the whole story:
Dec 25, 2017 3:15:51 PM org.elasticsearch.client.sniff.Sniffer sniff
SEVERE: error while sniffing nodes
java.io.IOException: listener timeout after waiting for  ms
    at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:617)
    ...
    at com.forter.utils.elasticsearch.ForterElasticsearchHostsSniffer.sniffHosts(ForterElasticsearchHostsSniffer.java:69)
    ...
    at org.elasticsearch.client.sniff.Sniffer.sniffOnFailure(Sniffer.java:59)
    ...
    at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
    at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:168)
    at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
The thread dump shows a similar stack trace on pool-7-thread-1 (note the call to sniffHosts):
"pool-7-thread-1" #68 prio=5 os_prio=31 tid=0x00007fc37b262800 nid=0x14603 waiting on condition [0x0000700006539000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000007bff88000> (a java.util.concurrent.CountDownLatch$Sync)
    ...
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:616)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212)
    ...
    at com.forter.utils.elasticsearch.ForterElasticsearchHostsSniffer.sniffHosts(ForterElasticsearchHostsSniffer.java:69)
    ...
    at org.elasticsearch.client.sniff.Sniffer.sniffOnFailure(Sniffer.java:59)
    ...
    at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.connectionRequestFailed(AbstractClientExchangeHandler.java:335)
    ...
    at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
    ...
    - locked <0x00000007bff90328> (a org.apache.http.impl.nio.reactor.SessionRequestImpl)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
    ...

"KumulusThread-37" #52 daemon prio=5 os_prio=31 tid=0x00007fc377a7c000 nid=0x7e03 waiting on condition [0x0000700005509000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0000000760661018> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
    ...
    at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
    at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
    at org.apache.http.nio.pool.AbstractNIOConnPool.lease(AbstractNIOConnPool.java:272)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.requestConnection(PoolingNHttpClientConnectionManager.java:266)
    ...
    at com.forter.utils.storm.bolts.elasticsearch.AbstractElasticsearchRestBolt.execute(AbstractElasticsearchRestBolt.java:142)
    ...
So the Kumulus thread (KumulusThread-37), which is supposed to block for a maximum of ~1.1 seconds, is stuck waiting for a lock at AbstractNIOConnPool.lease(AbstractNIOConnPool.java:272). That lock is held by the Apache HTTP client on thread pool-7-thread-1, inside AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561).
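The shape of this freeze can be reproduced without any HTTP code at all. The sketch below is an illustration under stated assumptions, not the Apache client's implementation: a plain ReentrantLock stands in for the connection pool lock, a sleeping thread stands in for the timeout callback that performs the blocking sniff, and the names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a callback that blocks while holding a shared lock
// stalls every other thread that needs that lock.
class PoolLockFreezeSketch {

    /**
     * Returns roughly how long (in ms) a "request" thread waited for the pool lock
     * while a "timeout callback" blocked for callbackBlockMillis while holding it.
     */
    static long measureLeaseWaitMillis(long callbackBlockMillis) {
        ReentrantLock poolLock = new ReentrantLock();
        CountDownLatch callbackHoldsLock = new CountDownLatch(1);

        Thread reactor = new Thread(() -> {
            poolLock.lock(); // stands in for the lock taken around the timeout handling
            try {
                callbackHoldsLock.countDown();
                Thread.sleep(callbackBlockMillis); // stands in for the blocking sniff request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                poolLock.unlock();
            }
        }, "reactor-thread");
        reactor.start();

        try {
            callbackHoldsLock.await();
            long start = System.nanoTime();
            poolLock.lock(); // stands in for lease(): the request timeout has not started yet
            try {
                return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            } finally {
                poolLock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("request thread waited ~" + measureLeaseWaitMillis(300) + " ms for the lock");
    }
}
```

The request thread's own timeout never gets a chance to apply, because the thread is parked on the lock before it ever reaches the timed wait.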
Eureka! (Plus, workarounds fresh out the oven)
Basically, what happened here was that the ES REST client registered a callback on the Apache client’s onTimeout hook, and that callback runs on a computation thread belonging to the Apache async HTTP client. The sniffer opted to perform the sniffing I/O (an API call) on that thread, which is a bug. You are not supposed to block those threads for long, and you are definitely not supposed to do any blocking I/O on them. If you do, as demonstrated in our production environment, all subsequent operations on the same client are blocked until the I/O finishes (the sniffing request either completes or times out).
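One general way to avoid blocking a callback thread is to hand the slow work to a dedicated executor and return immediately. The sketch below illustrates that idea only. It is not the code from the PR and not the ES client's API: the class name, the blockingSniff parameter and the in-flight flag are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a failure listener that never blocks its caller.
class OffloadedSniffSketch {

    // Dedicated daemon thread for the slow work, separate from the HTTP client's threads.
    private final ExecutorService sniffExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sniffer");
        t.setDaemon(true);
        return t;
    });
    private final AtomicBoolean sniffInFlight = new AtomicBoolean(false);
    private final Runnable blockingSniff;

    OffloadedSniffSketch(Runnable blockingSniff) {
        this.blockingSniff = blockingSniff;
    }

    /** Meant to be called on the HTTP client's thread, so it only schedules work. */
    void onFailure() {
        // Skip scheduling if a sniff is already running.
        if (sniffInFlight.compareAndSet(false, true)) {
            sniffExecutor.execute(() -> {
                try {
                    blockingSniff.run();
                } finally {
                    sniffInFlight.set(false);
                }
            });
        }
    }

    /** Returns how long (in ms) two onFailure() calls took when the sniff itself takes sniffMillis. */
    static long timeOnFailureMillis(long sniffMillis) {
        OffloadedSniffSketch listener = new OffloadedSniffSketch(() -> {
            try {
                Thread.sleep(sniffMillis); // stands in for the blocking sniff request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long start = System.nanoTime();
        listener.onFailure();
        listener.onFailure(); // a failure reported while a sniff is in flight schedules nothing
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        listener.sniffExecutor.shutdown(); // lets the running sniff finish, accepts no new work
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("onFailure() returned after ~" + timeOnFailureMillis(500) + " ms");
    }
}
```

With this shape, the thread that reports the failure is released right away, and only the dedicated sniffer thread pays for a slow or timed-out sniff.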
I submitted a PR to the Elastic repository to fix the issue.
In the meantime, if you are encountering this issue, the following workarounds can be applied:
- Not using setFailureListener()
- Killing the ES services on the de-allocated nodes manually before terminating them to produce connection refused errors in clients until they sniff the new cluster state (as speculated by my learned friend Moshe prior to this reproduction).
Conclusions from a day’s work:
- From Apache HC’s perspective, you can say that the fault was with the user. The API was simply misused. See section 3.2.1 of the guide, titled I/O dispatchers. It says clearly you shouldn’t block I/O dispatcher threads. But that’s a pretty easy detail to inadvertently ignore when implementing an interface. Java is missing a lot of fundamental language support for async operations so async implementations in Java tend to be cumbersome and complex (Apache Async HC as a perfect example). I wonder if an HTTP client written using Kotlin coroutines can make that API clearer in the JVM.
- The weirdness of the API, where the reference to the ES REST client was passed into its own member hierarchy, turned out not to be the issue after all. But in some magical way the issue was hiding right around that corner. Coincidence?
- Connection refused is my favorite of all errors. It happens fast and it tells you exactly what the problem is. Why can’t all errors be like that?