For my 8-month co-op term at Hootsuite, I have been working on the Streaming team. The Streaming infrastructure allows services to receive real-time updates for a large set of users: it is responsible for delivering push notifications, analytics, and interaction history to subscribed Hootsuite users. One example is receiving a push notification on your mobile device when someone retweets or mentions you on Twitter through the Hootsuite application. Streaming is a distributed system, and every complex distributed application needs some form of coordination and orchestration, so the team decided to integrate ZooKeeper into the Streaming infrastructure back in 2011. ZooKeeper is an Apache Software Foundation project that provides an open source distributed configuration service, synchronization service, leader election, and naming registry for large distributed systems.

One of my projects during my co-op term was to fix the previous ZooKeeper implementation, since a number of things were not working properly. Originally, ZooKeeper was embedded within the Streaming application, and that implementation had several issues.

Intro:

The Streaming system spawns an Akka actor for the ZooKeeper logic on each of the servers within the distributed system. These servers are referred to as nodes within ZooKeeper. Each node is responsible for creating the ZooKeeper client and registering itself with the ZooKeeper server.
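
For readers unfamiliar with this shape, here is a minimal sketch of the idea: one per-node actor that owns the ZooKeeper client. The names and the classic Akka API are assumptions for illustration; the post's actual code (shown later) uses an older Akka API and Streaming's own ZooKeeper wrapper.

import akka.actor.{Actor, ActorSystem, Props}

class ZooKeeperRegistration extends Actor {
  // build the ZooKeeper client and register this node's znode when the actor starts
  override def preStart(): Unit = {
    // client and node creation lives here (shown in detail later in the post)
  }

  def receive: Receive = {
    case _ => // react to connection events, leader-election messages, etc.
  }
}

// one such actor is started on every server in the Streaming cluster
val system = ActorSystem("streaming")
system.actorOf(Props(new ZooKeeperRegistration), "zookeeper-registration")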

Network Partitions:

One of the first problems with the implementation was that the system was unable to recover from a network partition. When a network partition occurred (i.e. the ZooKeeper server was unable to reach the client on a given node), the nodes would attempt to reconnect to the ZooKeeper server a maximum of 4 times before shutting down the entire Streaming application! Even if the server became reachable again within those 4 reconnection attempts, only the ZooKeeper client was created; the ZooKeeper server had no record of the recovered node. In other words, the ZooKeeper client was unable to re-register the node.

Duplicate Nodes:

This implementation of ZooKeeper also caused some strange behaviour: duplicate nodes were created within the ZooKeeper server. Because ZooKeeper is used as the source of truth for which nodes are alive and running, duplicates break Streaming, since it expects only one node of a given type.

Handling Exceptions:

Another issue with this implementation was how exceptions thrown by ZooKeeper were handled. When creating a ZooKeeper client:

  • a watcher function is passed to the creation method in the ZooKeeper library.
  • this function is called when the server and client have successfully established a connection. One important note is that ZooKeeper intends a watcher's lifetime to cover the triggering of only one event. Such an event can be a session expiring or the client being disconnected from the server.
  • once an event occurs, an exception is thrown and it is up to the client to handle it accordingly (a sketch of these connection events follows this list).
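
To make the watcher semantics concrete, here is a small sketch using the raw Apache ZooKeeper API rather than Streaming's wrapper; the connection string and timeout are placeholders.

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.KeeperState

// the watcher passed at construction time is notified of connection-state changes
val connectionWatcher = new Watcher {
  override def process(event: WatchedEvent): Unit = event.getState match {
    case KeeperState.SyncConnected => println("session established with the server")
    case KeeperState.Disconnected  => println("lost the connection; the session may still be alive")
    case KeeperState.Expired       => println("session expired; a brand new client is required")
    case _                         => // other states ignored in this sketch
  }
}

// placeholder connection string and a 3000 ms session timeout
val zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 3000, connectionWatcher)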

How this implementation handled these exceptions varied from node to node: it would either attempt to reconnect to the ZooKeeper server a maximum of 4 times, reconnect indefinitely, or shut down the application. This was unacceptable, because no ZooKeeper scenario should be unrecoverable to the point of requiring a shutdown of the entire actor system.

Possible Solution:

One attempt to fix this implementation was to create a ‘ZooKeeper Keeper’: an external process created to watch the ZooKeeper client on the node in the case of a network partition.

[Image: ZooKeeper Keeper]

If there was a network partition, this keeper process could identify whether the Java Streaming process was still running, and therefore avoid shutting down the system and disconnecting active users on open streams. The main problems with this approach were that it did not fix the original reconnect logic and that it added complexity in the form of an external process that required extra effort to maintain.

Fixing ZooKeeper:

The first thing I wanted to do was remove the external process and instead have ZooKeeper embedded within the Streaming project. This meant reverting to the original implementation and fixing the reconnect logic, the exception handling, and the duplicate nodes.

Reconnect Logic:

In order to fix the reconnection logic, I moved all of the path and node creation logic into the watcher function. Because this function is called after a ZooKeeper client has successfully been created and connected, it made sense to create the ZooKeeper node and path only at this point. This ensures that a watcher is always actively watching the node it created, and that the node will be created again after the watcher's lifespan has expired due to a disconnect or session-expiry event.

The following code shows the watcher function as well as the creation of a ZooKeeper client, which takes four arguments: the ZooKeeper hosts, the session timeout, the ZooKeeper root path, and the watcher function.


private val watcher: ZooKeeperClient => Unit = { zookeeper =>
  val children = zookeeper.getChildren(znodeParentPath)
  // create the node and path if the child doesn't already exist in the parent path
  if (!children.contains(znodeName)) {
    // create parent path. If it already exists, nothing happens
    zookeeper.createPath(znodeParentPath)
    try {
      // node creation is different for dispatchers to account for leader election
      if (znodeParentPath.contains("/dispatch/node")) {
        // create a sequential ephemeral node so that the lowest node in the sequence is elected the leader
        zookeeper.create(znodeParentPath + makeNode(ServiceConfiguration.hostname, ServiceConfiguration.port), CreateMode.EPHEMERAL_SEQUENTIAL)
        // need a watch on the children of the dispatch/node path to possibly re-elect the leader if any node changes
        zookeeper.watchChildren("/dispatch/node", children => {
          // send a message to alert that the children have changed
          self ! CandidatesChanged
        })
        // elect a new dispatcher leader
        self ! ClusterLeader
      } else {
        val znodePath = znodeParentPath + "/" + znodeName
        // create an ephemeral node for all other nodes
        zookeeper.create(znodePath, CreateMode.EPHEMERAL)
      }
    } catch {
      case e: Exception => EventHandler.error(e, this, "createPath")
    }
  }
}
val zk = new ZooKeeperClient(ServiceConfiguration.zookeeper.hostsList, 3000, ServiceConfiguration.zookeeper.root, watcher)

The create method, which is used to create a node, takes two arguments: a node path and a CreateMode type. For the first argument, Streaming separates all of our nodes into unique znode paths. For example, our harvester nodes are stored under /streamingBase/harvester/twittersite/host/, whereas our dispatcher nodes are stored under /streamingBase/dispatcher/host/.

The CreateMode value determines how the node is created on ZooKeeper. Ephemeral mode ensures that a node exists only as long as the session that created it is active; when the session ends, the node is deleted. The other CreateMode type in ZooKeeper is persistent mode, which keeps the node in the ZooKeeper server even after the session or connection to the client has ended. One of the main reasons we chose ephemeral mode is that it makes handling exceptions from ZooKeeper much easier, since you never have to worry about manually removing nodes from the server once a session has expired or disconnected.

Also, the reason we handle dispatchers differently from every other node is that they use leader election. It is therefore necessary to add a watchChildren call on these nodes so that if any dispatcher node disconnects or expires, a new leader election is performed.
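
The post does not show the election itself, but the idea with sequential ephemeral nodes is that the znode with the lowest sequence suffix wins. A minimal sketch of that check, with hypothetical helper names, might look like this:

// sequential znodes end with a 10-digit, zero-padded counter, e.g. host1:8080-0000000003
def sequenceNumber(znode: String): Long =
  znode.takeRight(10).toLong

// the node whose znode has the lowest sequence number is the dispatcher leader
def isLeader(myZnodeName: String, children: Seq[String]): Boolean =
  children.nonEmpty && children.minBy(sequenceNumber) == myZnodeName

// usage: re-evaluate leadership whenever a CandidatesChanged message arrives, e.g.
// if (isLeader(myZnodeName, zookeeper.getChildren("/dispatch/node"))) self ! ClusterLeader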

Fixing Exception Handling:

Now that the reconnect logic works, handling the exceptions becomes easy. In the case of a network partition, we want to keep trying to reconnect indefinitely. The reason is that Streaming has fallback logic built in: if a node is unreachable, another node is elected to do the work that was done on the unreachable node. This means it is safe to keep trying to reconnect the disconnected node. If the node does become reachable again, the fallback node simply continues to run and ZooKeeper registers the reconnected node, which then becomes a fallback node in case any other node becomes unreachable. Because we keep trying to reconnect indefinitely, there is no reason to shut down the application.

The following code shows the handling of exceptions thrown from ZooKeeper.


...
// Attempt to reconnect on a ConnectionLoss exception
case e1: org.apache.zookeeper.KeeperException.ConnectionLossException => {
  zkConnectionLostCounter += 1
  EventHandler.warning(this, "Zookeeper ConnectionLossException occurred. Counter is at %d".format(zkConnectionLostCounter))

  // once the counter passes the maximum of 4 attempts, send the actor a NodeKill message
  if (zkConnectionLostCounter > 4) {
    EventHandler.info(this, "connection attempt at %s".format(zkConnectionLostCounter))
    EventHandler.warning(this, "counter reached max. Sending NodeKill to controller.")
    controller ! NodeKill
  } else {
    // do nothing
  }
}
// SessionExpired exception is unrecoverable using the original ephemeral node, so send the actor a NodeKill message
case e2: org.apache.zookeeper.KeeperException.SessionExpiredException => {
  EventHandler.warning(this, "Zookeeper session expired. Sending NodeKill to controller.")
  controller ! NodeKill
}
...

In the case of a ConnectionLossException, the retry logic attempts to reconnect, using the original ephemeral node, a maximum of four times before a NodeKill message is sent to the controller. When the NodeKill message is received, the actor sends itself a PoisonPill. For those not familiar with the Akka actor system, the PoisonPill kills the actor. This ensures that the actor will be respawned and will attempt to create a new ZooKeeper client and node. If it is unable to create a new client, it enters the exception handling again, retrying up to 4 times before respawning itself. A SessionExpiredException, on the other hand, is unrecoverable using the original ephemeral ZooKeeper node, so you are required to create a new ZooKeeper client. This is why that exception is also handled by sending a PoisonPill, so that the actor will be respawned.
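
A rough sketch of that controller behaviour, with hypothetical names and written against the newer classic Akka API (the post's code uses an older Akka API), and assuming the surrounding supervision logic respawns the actor once it stops:

import akka.actor.{Actor, ActorLogging, PoisonPill}

case object NodeKill

class NodeController extends Actor with ActorLogging {
  def receive: Receive = {
    case NodeKill =>
      log.warning("NodeKill received; stopping so the actor can be respawned with a fresh ZooKeeper client")
      // PoisonPill stops the actor once it has processed the messages already in its mailbox
      self ! PoisonPill
  }
}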

Apache Curator Framework:

It should be noted that the Apache Curator framework is a high-level API that greatly simplifies using ZooKeeper. It adds many features on top of ZooKeeper and handles the complexity of managing connections to the ZooKeeper cluster and retrying operations. However, given the complexity of Streaming's existing logic, I decided to retrofit these fixes into the existing codebase rather than migrate to Curator.
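
For comparison, here is a minimal sketch of what the same ephemeral-node registration might look like with Curator, which takes care of connection management and retries itself; the connection string and path below are placeholders, not Streaming's configuration.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

// retry policy: start at 1 second between attempts, back off exponentially, up to 3 retries
val client = CuratorFrameworkFactory.newClient("zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3))
client.start()

// an ephemeral node disappears automatically once the session that created it ends
client.create()
  .creatingParentsIfNeeded()
  .withMode(CreateMode.EPHEMERAL)
  .forPath("/streamingBase/dispatcher/host/host1:8080")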

Fixing Duplicate Nodes:

In the old code, ZooKeeper clients, node paths, and node creation were implemented in a number of different places. It was very difficult to keep track of which actors were actually creating these and, more importantly, why. I wanted to simplify the creation of the client and nodes, so I moved the logic into one abstract Controller class, which all of the actors extend.

[Image: Duplicate nodes]

This ensures that all of the creation logic is centralized into one place for all of the Actors implementing a ZooKeeper Client and node, thereby removing the possibility of duplicate nodes being created.
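
A simplified sketch of that structure (hypothetical names, reusing the same ZooKeeperClient wrapper and ServiceConfiguration as the earlier snippets; this is not the actual Streaming code):

import akka.actor.Actor
import org.apache.zookeeper.CreateMode

// every actor that needs a ZooKeeper presence extends this controller, so client
// and node creation only ever happens in one place
abstract class ZkController extends Actor {
  // the only things a concrete controller has to declare
  protected def znodeParentPath: String
  protected def znodeName: String

  // shared registration routine, called from the watcher once a connection is established
  protected def registerNode(zookeeper: ZooKeeperClient): Unit = {
    zookeeper.createPath(znodeParentPath)
    val children = zookeeper.getChildren(znodeParentPath)
    if (!children.contains(znodeName)) {
      zookeeper.create(znodeParentPath + "/" + znodeName, CreateMode.EPHEMERAL)
    }
  }
}

// a concrete actor only supplies its paths; it cannot accidentally create a duplicate node
class HarvesterController extends ZkController {
  val znodeParentPath = "/streamingBase/harvester/twittersite/host"
  val znodeName = ServiceConfiguration.hostname + ":" + ServiceConfiguration.port

  def receive: Receive = {
    case _ => // harvester-specific behaviour omitted from this sketch
  }
}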

Testing:

In order to test the ZooKeeper logic, I relied heavily on iptables to mimic a network partition. This way I could test how ZooKeeper would respond when certain nodes became unreachable.


# allow port to be open for ssh access
sudo iptables -I INPUT -p tcp --dport 22 -j ACCEPT

# allow port to be open for remote debugger (if using one)
sudo iptables -I INPUT -p tcp -m tcp --dport 8000 -j ACCEPT

# drops all traffic
sudo iptables -I INPUT 3 -j DROP

The DROP rule drops all traffic that is not matched by the rules above it. This is why ‘-I INPUT 3’ is required: the rule is inserted at position three, so the ACCEPT rules in the first two positions are still evaluated first. You can be more specific about what you want to drop, but blocking all remaining traffic was sufficient for this scenario.
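
Once a test run is finished, the partition can be lifted by deleting the DROP rule again (rule number 3 in the INPUT chain, matching the position it was inserted at above):

# remove the DROP rule to restore normal traffic after the test
sudo iptables -D INPUT 3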

I also relied heavily on logging to verify that leader election, network partition recovery, fallback nodes, and retry logic were all working correctly.

Conclusion:

In a distributed environment, monitoring the different parts of the system and ensuring they are working properly is an extremely hard problem. However, if ZooKeeper is implemented correctly, it provides effective distributed monitoring and coordination, which is crucial for a large and vital system such as Streaming.

About the author: Brittany is going into the final year of her undergrad at UBC, where she studies computer science. She is a Scala newbie and a big sports nut. Follow her on Twitter: @brit_roesch