This document explains how Apache Curator can be used to implement distributed leader election, with the goal of providing high availability when processing a large dataset.
It will be useful to anyone trying to understand how to implement distributed leader election.
The problem
Currently, we have a dataset divided into partitions that is processed by a single node. To improve the system's availability, we want to introduce a dynamic group of nodes responsible for processing the dataset. Each partition must be processed exclusively: only one node may be assigned to a given partition at any given time.
To handle increased workload, we can add or remove nodes from the pool as needed. Whenever nodes are added or removed, the partition workers must be redistributed, and this redistribution can leave the workload unbalanced. To address this, we introduce the concept of a preferred node for each partition: a partition runs on its preferred node whenever that node is available, which helps keep the workload evenly distributed.
Each node maintains a full set of partition workers, each of which can be in one of two states: active or latent. Only active workers perform actual work, and across all nodes only one copy of each worker can be active at a time. When a new node is added, it initializes its entire partition worker set in the latent state. Any partition workers that have the new node as their preferred node will eventually become active on it and latent on their former node. When a node is removed, all the active partition workers it was hosting become active on other nodes.
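To make this concrete, here is a purely illustrative sketch of a single partition worker's lifecycle (the names are made up for illustration; in the solution below the active/latent distinction will come from holding or not holding ZooKeeper leadership rather than from an explicit flag):
// Purely illustrative: the lifecycle of one partition worker as described above.
enum WorkerState {
    LATENT, // the worker exists on the node but does no work (initial state on every node)
    ACTIVE  // the single copy of this partition's worker that is doing work cluster-wide
}
// Transitions:
//  - node joins:  all of its workers start LATENT; workers whose preferred node it is
//                 eventually become ACTIVE here and LATENT on their former node
//  - node leaves: every worker that was ACTIVE on it becomes ACTIVE on some other node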
The solution
To achieve this we'll be using Apache Curator. For the active/latent status we'll leverage the LeaderSelector recipe, which gives us exclusive access to a partition: the active partition worker is the leader process, while the latent partition workers are the processes waiting to become leaders.
public void start() {
    this.selector = new LeaderSelector(client, "/partitions/" + partition.getID(),
        new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                try {
                    LOGGER.info("Node {} taking leadership of partition {}", node.getID(), partition.getID());
                    job.start(); // blocking call; leadership is held until this returns
                    LOGGER.info("Node {} gracefully finishing leadership of partition {}", node.getID(),
                        partition.getID());
                } catch (Exception e) {
                    LOGGER.error(
                        "Error running worker process in node " + node.getID() + " as leader of partition "
                            + partition.getID(), e);
                    throw e;
                } finally {
                    relinquish();
                }
            }
        });
    client.getCuratorListenable().addListener((c, event) -> {
        if (event.getType() == CuratorEventType.CLOSING) {
            relinquish();
        }
    });
    selector.autoRequeue(); // re-enter the election whenever leadership is lost or given up
    selector.setId(node.getID());
    selector.start();
    LOGGER.info("Node {} started worker for partition {}", node.getID(), partition.getID());
}
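The snippet above relies on a few collaborators that aren't shown in this post. Roughly, something like the following is assumed (the shapes are illustrative; only the methods used in the snippet matter):
// Assumed collaborators of the partition worker (illustrative, not the actual classes).
interface Node {
    String getID();
}

interface Partition {
    String getID();
    Optional<String> getPreferredNodeID(); // empty when the partition has no preferred node
}

interface Job {
    void start() throws Exception; // blocks while processing the partition's data
    void stop();                   // asks a running start() to return
}
Each node creates one of these workers per partition and calls start() on all of them. Because every worker of a given partition registers on the same "/partitions/<id>" path, the LeaderSelector recipe guarantees that at most one of them is the leader, i.e. active, at any time.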
A scheduled process will periodically check, for every partition worker, whether the current node is the preferred node of its partition. If it isn't, and the preferred node is alive, the worker will relinquish its leadership on the current node. To know which nodes are alive we'll use the LeaderSelector participants. The checks look like this (a sketch of the scheduling and relinquishing follows the snippet):
public boolean isLeaderNotPreferred() {
    return isLeader() && isNotPreferred();
}

public boolean isLeader() {
    return selector.hasLeadership();
}

private boolean isNotPreferred() {
    // Not preferred only if the partition has a preferred node, that node isn't this one,
    // and the preferred node is currently alive.
    return partition.getPreferredNodeID()
        .map(preferredNodeID -> !preferredNodeID.equals(this.node.getID()) && isNodeAlive(preferredNodeID))
        .orElse(false);
}

private boolean isNodeAlive(String nodeID) {
    return getAliveNodeIDs().contains(nodeID);
}

public Set<String> getAliveNodeIDs() {
    try {
        // Every node that started a LeaderSelector for this partition shows up as a participant.
        return selector.getParticipants().stream()
            .map(Participant::getId)
            .collect(Collectors.toSet());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
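The relinquish() method isn't shown above. A minimal sketch, assuming the Job interface from before, is to stop the blocking job so that takeLeadership() returns; returning from takeLeadership() releases the partition's leadership, and thanks to autoRequeue() the worker immediately re-joins the election as a latent participant. The periodic check itself can be driven by a plain ScheduledExecutorService:
// Sketch of the periodic preferred-node check (names and the 30-second period are assumptions).
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

public void schedulePreferredNodeCheck(Collection<PartitionWorker> workers) {
    scheduler.scheduleAtFixedRate(() -> {
        for (PartitionWorker worker : workers) {
            if (worker.isLeaderNotPreferred()) {
                // The preferred node is alive and waiting as a latent participant:
                // give up leadership so it can take over.
                worker.relinquish();
            }
        }
    }, 30, 30, TimeUnit.SECONDS);
}

// In the partition worker: stopping the job makes takeLeadership() return, which releases
// leadership; autoRequeue() then re-enters this node in the election as a latent participant.
public void relinquish() {
    job.stop();
    // LeaderSelector#interruptLeadership() achieves the same by interrupting the thread
    // that is running takeLeadership().
}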
This design ensures high availability: the system can react to node failures and scale up or down by adding or removing nodes, with the number of partitions as the upper limit on how many nodes can do useful work. On the other hand, it requires some manual tweaking of each partition's preferred node to keep the load balanced among the nodes.