Madan Jampani
Committed by Gerrit Code Review

ClusterService implementation that relies on accrual failure detector for determ…

…ining node up/down status.
Initially off by default, until futher testing is done.

Change-Id: I0ac8850d76af717e7804d4503bedb227d5894a0a
...@@ -32,6 +32,13 @@ ...@@ -32,6 +32,13 @@
32 <description>ONOS Gossip based distributed store subsystems</description> 32 <description>ONOS Gossip based distributed store subsystems</description>
33 33
34 <dependencies> 34 <dependencies>
35 +
36 + <dependency>
37 + <groupId>org.apache.commons</groupId>
38 + <artifactId>commons-math3</artifactId>
39 + <version>3.2</version>
40 + </dependency>
41 +
35 <dependency> 42 <dependency>
36 <groupId>org.onosproject</groupId> 43 <groupId>org.onosproject</groupId>
37 <artifactId>onos-core-serializers</artifactId> 44 <artifactId>onos-core-serializers</artifactId>
......
1 +package org.onosproject.store.cluster.impl;
2 +
3 +import static org.onlab.util.Tools.groupedThreads;
4 +import static org.slf4j.LoggerFactory.getLogger;
5 +
6 +import java.io.File;
7 +import java.io.IOException;
8 +import java.net.InetAddress;
9 +import java.net.NetworkInterface;
10 +import java.net.SocketException;
11 +import java.util.Enumeration;
12 +import java.util.Map;
13 +import java.util.Set;
14 +import java.util.concurrent.ExecutorService;
15 +import java.util.concurrent.Executors;
16 +import java.util.concurrent.ScheduledExecutorService;
17 +import java.util.concurrent.TimeUnit;
18 +import java.util.stream.Collectors;
19 +
20 +import org.apache.felix.scr.annotations.Activate;
21 +import org.apache.felix.scr.annotations.Component;
22 +import org.apache.felix.scr.annotations.Deactivate;
23 +import org.apache.felix.scr.annotations.Reference;
24 +import org.apache.felix.scr.annotations.ReferenceCardinality;
25 +import org.apache.felix.scr.annotations.Service;
26 +import org.onlab.netty.Endpoint;
27 +import org.onlab.netty.Message;
28 +import org.onlab.netty.MessageHandler;
29 +import org.onlab.netty.NettyMessagingService;
30 +import org.onlab.packet.IpAddress;
31 +import org.onlab.util.KryoNamespace;
32 +import org.onosproject.cluster.ClusterAdminService;
33 +import org.onosproject.cluster.ClusterEvent;
34 +import org.onosproject.cluster.ClusterEventListener;
35 +import org.onosproject.cluster.ClusterService;
36 +import org.onosproject.cluster.ControllerNode;
37 +import org.onosproject.cluster.ControllerNode.State;
38 +import org.onosproject.cluster.DefaultControllerNode;
39 +import org.onosproject.cluster.NodeId;
40 +import org.onosproject.event.AbstractListenerRegistry;
41 +import org.onosproject.event.EventDeliveryService;
42 +import org.onosproject.store.serializers.KryoNamespaces;
43 +import org.onosproject.store.serializers.KryoSerializer;
44 +import org.slf4j.Logger;
45 +
46 +import com.google.common.collect.ImmutableSet;
47 +import com.google.common.collect.Maps;
48 +
49 +import static com.google.common.base.Preconditions.checkNotNull;
50 +import static com.google.common.base.Preconditions.checkArgument;
51 +
52 +/**
53 + * ClusterService implementation that employs an accrual failure
54 + * detector to identify cluster member up/down status.
55 + */
56 +@Component(immediate = true, enabled = false)
57 +@Service
58 +public class ClusterManager implements ClusterService, ClusterAdminService {
59 +
60 + private final Logger log = getLogger(getClass());
61 +
62 + protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
63 + listenerRegistry = new AbstractListenerRegistry<>();
64 +
65 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 + protected EventDeliveryService eventDispatcher;
67 +
68 + // TODO: make these configurable.
69 + private static final int HEARTBEAT_FD_PORT = 2419;
70 + private static final int HEARTBEAT_INTERVAL_MS = 100;
71 + private static final int PHI_FAILURE_THRESHOLD = 10;
72 +
73 + private static final String CONFIG_DIR = "../config";
74 + private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
75 +
76 + private ClusterDefinitionStore clusterDefinition;
77 +
78 + private Set<ControllerNode> seedNodes;
79 + private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
80 + private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
81 + private NettyMessagingService messagingService = new NettyMessagingService();
82 + private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
83 + groupedThreads("onos/cluster/membership", "heartbeat-sender"));
84 + private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
85 + groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
86 +
87 + private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
88 +
89 +
90 + private PhiAccrualFailureDetector failureDetector;
91 +
92 + private ControllerNode localNode;
93 +
94 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
95 + @Override
96 + protected void setupKryoPool() {
97 + serializerPool = KryoNamespace.newBuilder()
98 + .register(KryoNamespaces.API)
99 + .register(HeartbeatMessage.class)
100 + .build()
101 + .populate(1);
102 + }
103 + };
104 +
105 + private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
106 +
107 + @Activate
108 + public void activate() {
109 +
110 + File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
111 + clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath());
112 + try {
113 + seedNodes = ImmutableSet.copyOf(clusterDefinition.read());
114 + } catch (IOException e) {
115 + log.warn("Failed to read cluster definition.", e);
116 + }
117 +
118 + seedNodes.forEach(node -> {
119 + allNodes.put(node.id(), node);
120 + nodeStates.put(node.id(), State.INACTIVE);
121 + });
122 +
123 + establishSelfIdentity();
124 +
125 + messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
126 +
127 + try {
128 + messagingService.activate();
129 + } catch (InterruptedException e) {
130 + Thread.currentThread().interrupt();
131 + log.warn("Failed to cleanly initialize membership and"
132 + + " failure detector communication channel.", e);
133 + }
134 + messagingService.registerHandler(
135 + HEARTBEAT_MESSAGE,
136 + new HeartbeatMessageHandler(),
137 + heartBeatMessageHandler);
138 +
139 + eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
140 + failureDetector = new PhiAccrualFailureDetector();
141 +
142 + heartBeatSender.scheduleWithFixedDelay(
143 + this::heartbeat,
144 + 0,
145 + HEARTBEAT_INTERVAL_MS,
146 + TimeUnit.MILLISECONDS);
147 +
148 + log.info("Started");
149 + }
150 +
151 + @Deactivate
152 + public void deactivate() {
153 + try {
154 + messagingService.deactivate();
155 + } catch (Exception e) {
156 + log.trace("Failed to cleanly shutdown cluster membership messaging", e);
157 + }
158 +
159 + heartBeatSender.shutdown();
160 + heartBeatMessageHandler.shutdown();
161 + eventDispatcher.removeSink(ClusterEvent.class);
162 +
163 + log.info("Stopped");
164 + }
165 +
166 + @Override
167 + public ControllerNode getLocalNode() {
168 + return localNode;
169 + }
170 +
171 + @Override
172 + public Set<ControllerNode> getNodes() {
173 + return ImmutableSet.copyOf(allNodes.values());
174 + }
175 +
176 + @Override
177 + public ControllerNode getNode(NodeId nodeId) {
178 + checkNotNull(nodeId, INSTANCE_ID_NULL);
179 + return allNodes.get(nodeId);
180 + }
181 +
182 + @Override
183 + public State getState(NodeId nodeId) {
184 + checkNotNull(nodeId, INSTANCE_ID_NULL);
185 + return nodeStates.get(nodeId);
186 + }
187 +
188 + @Override
189 + public void addListener(ClusterEventListener listener) {
190 + checkNotNull(listener, "Listener must not be null");
191 + listenerRegistry.addListener(listener);
192 + }
193 +
194 + @Override
195 + public void removeListener(ClusterEventListener listener) {
196 + checkNotNull(listener, "Listener must not be null");
197 + listenerRegistry.removeListener(listener);
198 + }
199 +
200 + @Override
201 + public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
202 + checkNotNull(nodeId, INSTANCE_ID_NULL);
203 + checkNotNull(ip, "IP address must not be null");
204 + checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
205 + ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
206 + allNodes.put(node.id(), node);
207 + nodeStates.put(nodeId, State.INACTIVE);
208 + eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
209 + return node;
210 + }
211 +
212 + @Override
213 + public void removeNode(NodeId nodeId) {
214 + checkNotNull(nodeId, INSTANCE_ID_NULL);
215 + ControllerNode node = allNodes.remove(nodeId);
216 + if (node != null) {
217 + nodeStates.remove(nodeId);
218 + eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
219 + }
220 + }
221 +
222 + private void establishSelfIdentity() {
223 + try {
224 + IpAddress ip = findLocalIp();
225 + localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
226 + allNodes.put(localNode.id(), localNode);
227 + nodeStates.put(localNode.id(), State.ACTIVE);
228 + log.info("Local Node: {}", localNode);
229 + } catch (SocketException e) {
230 + throw new IllegalStateException("Cannot determine local IP", e);
231 + }
232 + }
233 +
234 + private void heartbeat() {
235 + try {
236 + Set<ControllerNode> peers = allNodes.values()
237 + .stream()
238 + .filter(node -> !(node.id().equals(localNode.id())))
239 + .collect(Collectors.toSet());
240 + byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
241 + peers.forEach((node) -> {
242 + heartbeatToPeer(hbMessagePayload, node);
243 + State currentState = nodeStates.get(node.id());
244 + double phi = failureDetector.phi(node.id());
245 + if (phi >= PHI_FAILURE_THRESHOLD) {
246 + if (currentState == State.ACTIVE) {
247 + nodeStates.put(node.id(), State.INACTIVE);
248 + notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
249 + }
250 + } else {
251 + if (currentState == State.INACTIVE) {
252 + nodeStates.put(node.id(), State.ACTIVE);
253 + notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
254 + }
255 + }
256 + });
257 + } catch (Exception e) {
258 + log.trace("Failed to send heartbeat", e);
259 + }
260 + }
261 +
262 + private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
263 + if (newState == State.ACTIVE) {
264 + eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, allNodes.get(nodeId)));
265 + } else {
266 + eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, allNodes.get(nodeId)));
267 + }
268 + }
269 +
270 + private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
271 + Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
272 + try {
273 + messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
274 + } catch (IOException e) {
275 + log.trace("Sending heartbeat to {} failed", remoteEp, e);
276 + }
277 + }
278 +
279 + private class HeartbeatMessageHandler implements MessageHandler {
280 + @Override
281 + public void handle(Message message) throws IOException {
282 + HeartbeatMessage hb = SERIALIZER.decode(message.payload());
283 + failureDetector.report(hb.source().id());
284 + hb.knownPeers().forEach(node -> {
285 + allNodes.put(node.id(), node);
286 + });
287 + }
288 + }
289 +
290 + private class HeartbeatMessage {
291 + private ControllerNode source;
292 + private Set<ControllerNode> knownPeers;
293 +
294 + public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
295 + this.source = source;
296 + this.knownPeers = ImmutableSet.copyOf(members);
297 + }
298 +
299 + public ControllerNode source() {
300 + return source;
301 + }
302 +
303 + public Set<ControllerNode> knownPeers() {
304 + return knownPeers;
305 + }
306 + }
307 +
308 + private IpAddress findLocalIp() throws SocketException {
309 + NetworkInterface ni = NetworkInterface.getByName("eth0");
310 + Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
311 +
312 + while (inetAddresses.hasMoreElements()) {
313 + InetAddress ia = inetAddresses.nextElement();
314 + if (!ia.isLinkLocalAddress()) {
315 + return IpAddress.valueOf(ia);
316 + }
317 + }
318 + throw new IllegalStateException("Unable to determine local ip");
319 + }
320 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onosproject.store.cluster.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkArgument;
4 +import static com.google.common.base.Preconditions.checkNotNull;
5 +
6 +import java.util.Map;
7 +
8 +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
9 +import org.onosproject.cluster.NodeId;
10 +
11 +import com.google.common.collect.Maps;
12 +
13 +/**
14 + * Phi Accrual failure detector.
15 + * <p>
16 + * Based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
17 + */
18 +public class PhiAccrualFailureDetector {
19 + private final Map<NodeId, History> states = Maps.newConcurrentMap();
20 +
21 + // TODO: make these configurable.
22 + private static final int WINDOW_SIZE = 250;
23 + private static final int MIN_SAMPLES = 25;
24 +
25 + // If a node does not have any heartbeats, this is the phi
26 + // value to report. Indicates the node is inactive (from the
27 + // detectors perspective.
28 + private static final double BOOTSTRAP_PHI_VALUE = 100.0;
29 +
30 + /**
31 + * Report a new heart beat for the specified node id.
32 + * @param nodeId node id
33 + */
34 + public void report(NodeId nodeId) {
35 + report(nodeId, System.currentTimeMillis());
36 + }
37 +
38 + /**
39 + * Report a new heart beat for the specified node id.
40 + * @param nodeId node id
41 + * @param arrivalTime arrival time
42 + */
43 + public void report(NodeId nodeId, long arrivalTime) {
44 + checkNotNull(nodeId, "NodeId must not be null");
45 + checkArgument(arrivalTime >= 0, "arrivalTime must not be negative");
46 + History nodeState =
47 + states.computeIfAbsent(nodeId, key -> new History());
48 + synchronized (nodeState) {
49 + long latestHeartbeat = nodeState.latestHeartbeatTime();
50 + if (latestHeartbeat != -1) {
51 + nodeState.samples().addValue(arrivalTime - latestHeartbeat);
52 + }
53 + nodeState.setLatestHeartbeatTime(arrivalTime);
54 + }
55 + }
56 +
57 + /**
58 + * Compute phi for the specified node id.
59 + * @param nodeId node id
60 + * @return phi value
61 + */
62 + public Double phi(NodeId nodeId) {
63 + if (!states.containsKey(nodeId)) {
64 + return BOOTSTRAP_PHI_VALUE;
65 + }
66 + checkNotNull(nodeId, "NodeId must not be null");
67 + History nodeState = states.get(nodeId);
68 + synchronized (nodeState) {
69 + long latestHeartbeat = nodeState.latestHeartbeatTime();
70 + DescriptiveStatistics samples = nodeState.samples();
71 + if (latestHeartbeat == -1 || samples.getN() < MIN_SAMPLES) {
72 + return 0.0;
73 + }
74 + return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
75 + }
76 + }
77 +
78 + private double computePhi(DescriptiveStatistics samples, long tLast, long tNow) {
79 + long size = samples.getN();
80 + long t = tNow - tLast;
81 + return (size > 0)
82 + ? (1.0 / Math.log(10.0)) * t / samples.getMean()
83 + : BOOTSTRAP_PHI_VALUE;
84 + }
85 +
86 + private static class History {
87 + DescriptiveStatistics samples =
88 + new DescriptiveStatistics(WINDOW_SIZE);
89 + long lastHeartbeatTime = -1;
90 +
91 + public DescriptiveStatistics samples() {
92 + return samples;
93 + }
94 +
95 + public long latestHeartbeatTime() {
96 + return lastHeartbeatTime;
97 + }
98 +
99 + public void setLatestHeartbeatTime(long value) {
100 + lastHeartbeatTime = value;
101 + }
102 + }
103 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
33 <bundle>mvn:io.netty/netty-codec/4.0.23.Final</bundle> 33 <bundle>mvn:io.netty/netty-codec/4.0.23.Final</bundle>
34 <bundle>mvn:io.netty/netty-transport-native-epoll/4.0.23.Final</bundle> 34 <bundle>mvn:io.netty/netty-transport-native-epoll/4.0.23.Final</bundle>
35 <bundle>mvn:commons-pool/commons-pool/1.6</bundle> 35 <bundle>mvn:commons-pool/commons-pool/1.6</bundle>
36 + <bundle>mvn:org.apache.commons/commons-math3/3.2</bundle>
36 37
37 <bundle>mvn:joda-time/joda-time/2.5</bundle> 38 <bundle>mvn:joda-time/joda-time/2.5</bundle>
38 39
......