/**
 * Selects the node responsible for a task using consistent hashing with virtual nodes.
 *
 * <p>Every call builds a hash ring from the supplied nodes: each real node contributes
 * {@link #REPLICA_NUMBER} virtual nodes, and the task is assigned to the owner of the first
 * virtual node whose hash is greater than or equal to the task's hash (wrapping around to the
 * smallest hash when there is none).
 */
public class ConsistentHashUtil {
    private static final Logger log = LoggerFactory.getLogger(ConsistentHashUtil.class);

    /** Number of virtual nodes placed on the ring for every real node. */
    private static final int REPLICA_NUMBER = 500;

    /** Inserted between a node's host and the replica index to form a virtual node name. */
    private static final String VIRTUAL_NODE_POSTFIX = "$$VN";

    /**
     * Chooses the node that should handle the given task.
     *
     * @param nodes  candidate nodes; must contain at least one element
     * @param taskId identifier of the task to dispatch; must not be blank
     * @return the only node when the list has a single element, otherwise the node owning the
     *         virtual node closest to (at or after) the task's hash on the ring
     * @throws IllegalArgumentException if {@code nodes} is null or empty, or {@code taskId} is blank
     */
    public Node doDispatch(List<Node> nodes, String taskId) {
        if (nodes == null || nodes.isEmpty() || StringUtils.isBlank(taskId)) {
            throw new IllegalArgumentException("cluster list and task id can not be null.");
        }
        if (nodes.size() == 1) {
            return nodes.get(0);
        }

        // Ring of virtual nodes, e.g. hash("12.10.92.20:8080$$VN1") -> owning node.
        // The owning Node is stored directly, so the host never has to be parsed back out of
        // the virtual node name (which broke for hosts that themselves contain '$').
        TreeMap<Integer, Node> ring = new TreeMap<>();
        // Replica index is the outer loop so that, on a hash collision or a duplicated host,
        // the entry written last is the same one the previous implementation ended up with.
        for (int i = 0; i < REPLICA_NUMBER; i++) {
            for (Node node : nodes) {
                String virtualNodeName =
                        node.getHost().concat(VIRTUAL_NODE_POSTFIX).concat(String.valueOf(i));
                ring.put(getHashCode(virtualNodeName), node);
            }
        }

        // First virtual node at or after the task's hash; if the task hashes beyond the
        // largest virtual node, wrap around to the first one on the ring.
        Map.Entry<Integer, Node> owner = ring.ceilingEntry(getHashCode(taskId));
        if (owner == null) {
            owner = ring.firstEntry();
        }
        return owner.getValue();
    }

    /**
     * Computes a non-negative 32-bit hash of the string: an FNV-style pass over the characters
     * (32-bit FNV offset basis and prime) followed by extra shift/xor mixing.
     *
     * @param str the string to hash; must not be null
     * @return a hash in the range {@code [0, Integer.MAX_VALUE]}
     */
    private static int getHashCode(String str) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < str.length(); i++) {
            hash = (hash ^ str.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        // Math.abs(Integer.MIN_VALUE) is still negative, so that single value is mapped to 0;
        // every other hash keeps exactly the value it had before.
        if (hash == Integer.MIN_VALUE) {
            return 0;
        }
        return Math.abs(hash);
    }
}