Ayaka Koshibe
Committed by Madan Jampani

Refactored ClusterManager as proper fix for Karaf clean issue (Topic phi-fd-on)

Change-Id: Ibb328d73412855dd2d44ca6b734f738ae2996873
1 -/*
2 - * Copyright 2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -package org.onosproject.store.cluster.impl;
17 -
18 -import static org.onlab.util.Tools.groupedThreads;
19 -import static org.slf4j.LoggerFactory.getLogger;
20 -
21 -import java.io.File;
22 -import java.io.IOException;
23 -import java.net.InetAddress;
24 -import java.net.NetworkInterface;
25 -import java.net.SocketException;
26 -import java.util.Enumeration;
27 -import java.util.Map;
28 -import java.util.Set;
29 -import java.util.concurrent.ExecutorService;
30 -import java.util.concurrent.Executors;
31 -import java.util.concurrent.ScheduledExecutorService;
32 -import java.util.concurrent.TimeUnit;
33 -import java.util.stream.Collectors;
34 -
35 -import org.apache.felix.scr.annotations.Activate;
36 -import org.apache.felix.scr.annotations.Component;
37 -import org.apache.felix.scr.annotations.Deactivate;
38 -import org.apache.felix.scr.annotations.Reference;
39 -import org.apache.felix.scr.annotations.ReferenceCardinality;
40 -import org.apache.felix.scr.annotations.Service;
41 -import org.onlab.netty.Endpoint;
42 -import org.onlab.netty.Message;
43 -import org.onlab.netty.MessageHandler;
44 -import org.onlab.netty.NettyMessagingService;
45 -import org.onlab.packet.IpAddress;
46 -import org.onlab.util.KryoNamespace;
47 -import org.onosproject.cluster.ClusterAdminService;
48 -import org.onosproject.cluster.ClusterEvent;
49 -import org.onosproject.cluster.ClusterEventListener;
50 -import org.onosproject.cluster.ClusterService;
51 -import org.onosproject.cluster.ControllerNode;
52 -import org.onosproject.cluster.ControllerNode.State;
53 -import org.onosproject.cluster.DefaultControllerNode;
54 -import org.onosproject.cluster.NodeId;
55 -import org.onosproject.event.AbstractListenerRegistry;
56 -import org.onosproject.event.EventDeliveryService;
57 -import org.onosproject.store.serializers.KryoNamespaces;
58 -import org.onosproject.store.serializers.KryoSerializer;
59 -import org.slf4j.Logger;
60 -
61 -import com.google.common.collect.ImmutableSet;
62 -import com.google.common.collect.Maps;
63 -import com.hazelcast.util.AddressUtil;
64 -
65 -import static com.google.common.base.Preconditions.checkNotNull;
66 -import static com.google.common.base.Preconditions.checkArgument;
67 -
68 -/**
69 - * ClusterService implementation that employs an accrual failure
70 - * detector to identify cluster member up/down status.
71 - */
72 -@Component(immediate = true, enabled = false)
73 -@Service
74 -public class ClusterManager implements ClusterService, ClusterAdminService {
75 -
76 - private final Logger log = getLogger(getClass());
77 -
78 - protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
79 - listenerRegistry = new AbstractListenerRegistry<>();
80 -
81 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 - protected EventDeliveryService eventDispatcher;
83 -
84 - // TODO: make these configurable.
85 - private static final int HEARTBEAT_FD_PORT = 2419;
86 - private static final int HEARTBEAT_INTERVAL_MS = 100;
87 - private static final int PHI_FAILURE_THRESHOLD = 10;
88 -
89 - private static final String CONFIG_DIR = "../config";
90 - private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
91 -
92 - private ClusterDefinition clusterDefinition;
93 -
94 - private Set<ControllerNode> seedNodes;
95 - private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
96 - private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
97 - private NettyMessagingService messagingService = new NettyMessagingService();
98 - private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
99 - groupedThreads("onos/cluster/membership", "heartbeat-sender"));
100 - private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
101 - groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
102 -
103 - private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
104 -
105 -
106 - private PhiAccrualFailureDetector failureDetector;
107 -
108 - private ControllerNode localNode;
109 -
110 - private static final KryoSerializer SERIALIZER = new KryoSerializer() {
111 - @Override
112 - protected void setupKryoPool() {
113 - serializerPool = KryoNamespace.newBuilder()
114 - .register(KryoNamespaces.API)
115 - .register(HeartbeatMessage.class)
116 - .build()
117 - .populate(1);
118 - }
119 - };
120 -
121 - private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
122 -
123 - @Activate
124 - public void activate() {
125 -
126 - File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
127 -
128 - try {
129 - clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath()).read();
130 - seedNodes = ImmutableSet.copyOf(clusterDefinition.getNodes())
131 - .stream()
132 - .map(nodeInfo -> new DefaultControllerNode(
133 - new NodeId(nodeInfo.getId()),
134 - IpAddress.valueOf(nodeInfo.getIp()),
135 - nodeInfo.getTcpPort()))
136 - .collect(Collectors.toSet());
137 - } catch (IOException e) {
138 - throw new IllegalStateException("Failed to read cluster definition.", e);
139 - }
140 -
141 - seedNodes.forEach(node -> {
142 - allNodes.put(node.id(), node);
143 - nodeStates.put(node.id(), State.INACTIVE);
144 - });
145 -
146 - establishSelfIdentity();
147 -
148 - messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
149 -
150 - try {
151 - messagingService.activate();
152 - } catch (InterruptedException e) {
153 - Thread.currentThread().interrupt();
154 - throw new IllegalStateException("Failed to cleanly initialize membership and"
155 - + " failure detector communication channel.", e);
156 - }
157 - messagingService.registerHandler(
158 - HEARTBEAT_MESSAGE,
159 - new HeartbeatMessageHandler(),
160 - heartBeatMessageHandler);
161 -
162 - eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
163 - failureDetector = new PhiAccrualFailureDetector();
164 -
165 - heartBeatSender.scheduleWithFixedDelay(
166 - this::heartbeat,
167 - 0,
168 - HEARTBEAT_INTERVAL_MS,
169 - TimeUnit.MILLISECONDS);
170 -
171 - log.info("Started");
172 - }
173 -
174 - @Deactivate
175 - public void deactivate() {
176 - try {
177 - messagingService.deactivate();
178 - } catch (Exception e) {
179 - log.trace("Failed to cleanly shutdown cluster membership messaging", e);
180 - }
181 -
182 - heartBeatSender.shutdownNow();
183 - heartBeatMessageHandler.shutdownNow();
184 - eventDispatcher.removeSink(ClusterEvent.class);
185 -
186 - log.info("Stopped");
187 - }
188 -
189 - @Override
190 - public ControllerNode getLocalNode() {
191 - return localNode;
192 - }
193 -
194 - @Override
195 - public Set<ControllerNode> getNodes() {
196 - return ImmutableSet.copyOf(allNodes.values());
197 - }
198 -
199 - @Override
200 - public ControllerNode getNode(NodeId nodeId) {
201 - checkNotNull(nodeId, INSTANCE_ID_NULL);
202 - return allNodes.get(nodeId);
203 - }
204 -
205 - @Override
206 - public State getState(NodeId nodeId) {
207 - checkNotNull(nodeId, INSTANCE_ID_NULL);
208 - return nodeStates.get(nodeId);
209 - }
210 -
211 - @Override
212 - public void addListener(ClusterEventListener listener) {
213 - checkNotNull(listener, "Listener must not be null");
214 - listenerRegistry.addListener(listener);
215 - }
216 -
217 - @Override
218 - public void removeListener(ClusterEventListener listener) {
219 - checkNotNull(listener, "Listener must not be null");
220 - listenerRegistry.removeListener(listener);
221 - }
222 -
223 - @Override
224 - public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
225 - checkNotNull(nodeId, INSTANCE_ID_NULL);
226 - checkNotNull(ip, "IP address must not be null");
227 - checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
228 - ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
229 - allNodes.put(node.id(), node);
230 - nodeStates.put(nodeId, State.INACTIVE);
231 - eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
232 - return node;
233 - }
234 -
235 - @Override
236 - public void removeNode(NodeId nodeId) {
237 - checkNotNull(nodeId, INSTANCE_ID_NULL);
238 - ControllerNode node = allNodes.remove(nodeId);
239 - if (node != null) {
240 - nodeStates.remove(nodeId);
241 - eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
242 - }
243 - }
244 -
245 - private void establishSelfIdentity() {
246 - try {
247 - IpAddress ip = findLocalIp();
248 - localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
249 - allNodes.put(localNode.id(), localNode);
250 - nodeStates.put(localNode.id(), State.ACTIVE);
251 - log.info("Local Node: {}", localNode);
252 - } catch (SocketException e) {
253 - throw new IllegalStateException("Cannot determine local IP", e);
254 - }
255 - }
256 -
257 - private void heartbeat() {
258 - try {
259 - Set<ControllerNode> peers = allNodes.values()
260 - .stream()
261 - .filter(node -> !(node.id().equals(localNode.id())))
262 - .collect(Collectors.toSet());
263 - byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
264 - peers.forEach((node) -> {
265 - heartbeatToPeer(hbMessagePayload, node);
266 - State currentState = nodeStates.get(node.id());
267 - double phi = failureDetector.phi(node.id());
268 - if (phi >= PHI_FAILURE_THRESHOLD) {
269 - if (currentState == State.ACTIVE) {
270 - nodeStates.put(node.id(), State.INACTIVE);
271 - notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
272 - }
273 - } else {
274 - if (currentState == State.INACTIVE) {
275 - nodeStates.put(node.id(), State.ACTIVE);
276 - notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
277 - }
278 - }
279 - });
280 - } catch (Exception e) {
281 - log.debug("Failed to send heartbeat", e);
282 - }
283 - }
284 -
285 - private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
286 - if (newState == State.ACTIVE) {
287 - eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, allNodes.get(nodeId)));
288 - } else {
289 - eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, allNodes.get(nodeId)));
290 - }
291 - }
292 -
293 - private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
294 - Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
295 - try {
296 - messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
297 - } catch (IOException e) {
298 - log.debug("Sending heartbeat to {} failed", remoteEp, e);
299 - }
300 - }
301 -
302 - private class HeartbeatMessageHandler implements MessageHandler {
303 - @Override
304 - public void handle(Message message) throws IOException {
305 - HeartbeatMessage hb = SERIALIZER.decode(message.payload());
306 - failureDetector.report(hb.source().id());
307 - hb.knownPeers().forEach(node -> {
308 - allNodes.put(node.id(), node);
309 - });
310 - }
311 - }
312 -
313 - private static class HeartbeatMessage {
314 - private ControllerNode source;
315 - private Set<ControllerNode> knownPeers;
316 -
317 - public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
318 - this.source = source;
319 - this.knownPeers = ImmutableSet.copyOf(members);
320 - }
321 -
322 - public ControllerNode source() {
323 - return source;
324 - }
325 -
326 - public Set<ControllerNode> knownPeers() {
327 - return knownPeers;
328 - }
329 - }
330 -
331 - private IpAddress findLocalIp() throws SocketException {
332 - Enumeration<NetworkInterface> interfaces =
333 - NetworkInterface.getNetworkInterfaces();
334 - while (interfaces.hasMoreElements()) {
335 - NetworkInterface iface = interfaces.nextElement();
336 - Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
337 - while (inetAddresses.hasMoreElements()) {
338 - IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
339 - if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
340 - return ip;
341 - }
342 - }
343 - }
344 - throw new IllegalStateException("Unable to determine local ip");
345 - }
346 -}
...@@ -15,167 +15,323 @@ ...@@ -15,167 +15,323 @@
15 */ 15 */
16 package org.onosproject.store.cluster.impl; 16 package org.onosproject.store.cluster.impl;
17 17
18 -import com.google.common.base.Optional; 18 +import static com.google.common.base.Preconditions.checkArgument;
19 -import com.google.common.cache.LoadingCache; 19 +import static com.google.common.base.Preconditions.checkNotNull;
20 -import com.google.common.collect.ImmutableSet; 20 +import static org.onlab.util.Tools.groupedThreads;
21 -import com.hazelcast.core.IMap; 21 +import static org.slf4j.LoggerFactory.getLogger;
22 -import com.hazelcast.core.Member; 22 +
23 -import com.hazelcast.core.MemberAttributeEvent; 23 +import java.io.File;
24 -import com.hazelcast.core.MembershipEvent; 24 +import java.io.IOException;
25 -import com.hazelcast.core.MembershipListener; 25 +import java.net.InetAddress;
26 +import java.net.NetworkInterface;
27 +import java.net.SocketException;
28 +import java.util.Enumeration;
29 +import java.util.Map;
30 +import java.util.Set;
31 +import java.util.concurrent.ExecutorService;
32 +import java.util.concurrent.Executors;
33 +import java.util.concurrent.ScheduledExecutorService;
34 +import java.util.concurrent.TimeUnit;
35 +import java.util.stream.Collectors;
26 36
27 import org.apache.felix.scr.annotations.Activate; 37 import org.apache.felix.scr.annotations.Activate;
28 import org.apache.felix.scr.annotations.Component; 38 import org.apache.felix.scr.annotations.Component;
29 import org.apache.felix.scr.annotations.Deactivate; 39 import org.apache.felix.scr.annotations.Deactivate;
30 import org.apache.felix.scr.annotations.Service; 40 import org.apache.felix.scr.annotations.Service;
41 +import org.onlab.netty.Endpoint;
42 +import org.onlab.netty.Message;
43 +import org.onlab.netty.MessageHandler;
44 +import org.onlab.netty.NettyMessagingService;
45 +import org.onlab.packet.IpAddress;
46 +import org.onlab.util.KryoNamespace;
31 import org.onosproject.cluster.ClusterEvent; 47 import org.onosproject.cluster.ClusterEvent;
32 import org.onosproject.cluster.ClusterStore; 48 import org.onosproject.cluster.ClusterStore;
33 import org.onosproject.cluster.ClusterStoreDelegate; 49 import org.onosproject.cluster.ClusterStoreDelegate;
34 import org.onosproject.cluster.ControllerNode; 50 import org.onosproject.cluster.ControllerNode;
35 import org.onosproject.cluster.DefaultControllerNode; 51 import org.onosproject.cluster.DefaultControllerNode;
52 +import org.onosproject.cluster.ControllerNode.State;
36 import org.onosproject.cluster.NodeId; 53 import org.onosproject.cluster.NodeId;
37 -import org.onosproject.store.hz.AbsentInvalidatingLoadingCache; 54 +import org.onosproject.store.AbstractStore;
38 -import org.onosproject.store.hz.AbstractHazelcastStore; 55 +import org.onosproject.store.serializers.KryoNamespaces;
39 -import org.onosproject.store.hz.OptionalCacheLoader; 56 +import org.onosproject.store.serializers.KryoSerializer;
40 -import org.onlab.packet.IpAddress; 57 +import org.slf4j.Logger;
41 -
42 -import java.util.Map;
43 -import java.util.Set;
44 -import java.util.concurrent.ConcurrentHashMap;
45 58
46 -import static com.google.common.cache.CacheBuilder.newBuilder; 59 +import com.google.common.collect.ImmutableSet;
47 -import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED; 60 +import com.google.common.collect.Maps;
48 -import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED; 61 +import com.hazelcast.util.AddressUtil;
49 -import static org.onosproject.cluster.ControllerNode.State;
50 62
51 -/**
52 - * Distributed implementation of the cluster nodes store.
53 - */
54 @Component(immediate = true) 63 @Component(immediate = true)
55 @Service 64 @Service
65 +/**
66 + * Distributed cluster nodes store that employs an accrual failure
67 + * detector to identify cluster member up/down status.
68 + */
56 public class DistributedClusterStore 69 public class DistributedClusterStore
57 - extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate> 70 + extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
58 implements ClusterStore { 71 implements ClusterStore {
59 72
60 - private IMap<byte[], byte[]> rawNodes; 73 + private final Logger log = getLogger(DistributedClusterStore.class);
61 - private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
62 74
63 - private String listenerId; 75 + // TODO: make these configurable.
64 - private final MembershipListener listener = new InternalMembershipListener(); 76 + private static final int HEARTBEAT_FD_PORT = 2419;
65 - private final Map<NodeId, State> states = new ConcurrentHashMap<>(); 77 + private static final int HEARTBEAT_INTERVAL_MS = 100;
78 + private static final int PHI_FAILURE_THRESHOLD = 10;
66 79
67 - private String nodesListenerId; 80 + private static final String CONFIG_DIR = "../config";
81 + private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
82 + private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
68 83
84 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
69 @Override 85 @Override
86 + protected void setupKryoPool() {
87 + serializerPool = KryoNamespace.newBuilder()
88 + .register(KryoNamespaces.API)
89 + .register(HeartbeatMessage.class)
90 + .build()
91 + .populate(1);
92 + }
93 + };
94 +
95 + private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
96 +
97 + private ClusterDefinition clusterDefinition;
98 +
99 + private Set<ControllerNode> seedNodes;
100 + private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
101 + private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
102 + private NettyMessagingService messagingService = new NettyMessagingService();
103 + private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
104 + groupedThreads("onos/cluster/membership", "heartbeat-sender"));
105 + private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
106 + groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
107 +
108 + private PhiAccrualFailureDetector failureDetector;
109 +
110 + private ControllerNode localNode;
111 +
70 @Activate 112 @Activate
71 public void activate() { 113 public void activate() {
72 - super.activate(); 114 + File clusterDefinitionFile = new File(CONFIG_DIR,
73 - listenerId = theInstance.getCluster().addMembershipListener(listener); 115 + CLUSTER_DEFINITION_FILE);
74 116
75 - rawNodes = theInstance.getMap("nodes"); 117 + try {
76 - OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader 118 + clusterDefinition = new ClusterDefinitionStore(
77 - = new OptionalCacheLoader<>(serializer, rawNodes); 119 + clusterDefinitionFile.getPath()).read();
78 - nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); 120 + seedNodes = ImmutableSet
79 - nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true); 121 + .copyOf(clusterDefinition.getNodes())
122 + .stream()
123 + .map(nodeInfo -> new DefaultControllerNode(new NodeId(
124 + nodeInfo.getId()), IpAddress.valueOf(nodeInfo
125 + .getIp()), nodeInfo.getTcpPort()))
126 + .collect(Collectors.toSet());
127 + } catch (IOException e) {
128 + throw new IllegalStateException(
129 + "Failed to read cluster definition.", e);
130 + }
80 131
81 - loadClusterNodes(); 132 + seedNodes.forEach(node -> {
133 + allNodes.put(node.id(), node);
134 + nodeStates.put(node.id(), State.INACTIVE);
135 + });
82 136
83 - log.info("Started"); 137 + establishSelfIdentity();
84 - } 138 +
139 + messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
85 140
86 - // Loads the initial set of cluster nodes 141 + try {
87 - private void loadClusterNodes() { 142 + messagingService.activate();
88 - for (Member member : theInstance.getCluster().getMembers()) { 143 + } catch (InterruptedException e) {
89 - addNode(node(member)); 144 + Thread.currentThread().interrupt();
145 + throw new IllegalStateException(
146 + "Failed to cleanly initialize membership and"
147 + + " failure detector communication channel.", e);
90 } 148 }
149 + messagingService.registerHandler(HEARTBEAT_MESSAGE,
150 + new HeartbeatMessageHandler(), heartBeatMessageHandler);
151 +
152 + failureDetector = new PhiAccrualFailureDetector();
153 +
154 + heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
155 + HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
156 +
157 + log.info("Started");
91 } 158 }
92 159
93 @Deactivate 160 @Deactivate
94 public void deactivate() { 161 public void deactivate() {
95 - rawNodes.removeEntryListener(nodesListenerId); 162 + try {
96 - theInstance.getCluster().removeMembershipListener(listenerId); 163 + messagingService.deactivate();
164 + } catch (Exception e) {
165 + log.trace("Failed to cleanly shutdown cluster membership messaging", e);
166 + }
167 +
168 + heartBeatSender.shutdownNow();
169 + heartBeatMessageHandler.shutdownNow();
170 +
97 log.info("Stopped"); 171 log.info("Stopped");
98 } 172 }
99 173
100 @Override 174 @Override
175 + public void setDelegate(ClusterStoreDelegate delegate) {
176 + checkNotNull(delegate, "Delegate cannot be null");
177 + this.delegate = delegate;
178 + }
179 +
180 + @Override
181 + public void unsetDelegate(ClusterStoreDelegate delegate) {
182 + this.delegate = null;
183 + }
184 +
185 + @Override
186 + public boolean hasDelegate() {
187 + return this.delegate != null;
188 + }
189 +
190 + @Override
101 public ControllerNode getLocalNode() { 191 public ControllerNode getLocalNode() {
102 - return node(theInstance.getCluster().getLocalMember()); 192 + return localNode;
103 } 193 }
104 194
105 @Override 195 @Override
106 public Set<ControllerNode> getNodes() { 196 public Set<ControllerNode> getNodes() {
107 - ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder(); 197 + return ImmutableSet.copyOf(allNodes.values());
108 - for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
109 - builder.add(optional.get());
110 - }
111 - return builder.build();
112 } 198 }
113 199
114 @Override 200 @Override
115 public ControllerNode getNode(NodeId nodeId) { 201 public ControllerNode getNode(NodeId nodeId) {
116 - return nodes.getUnchecked(nodeId).orNull(); 202 + checkNotNull(nodeId, INSTANCE_ID_NULL);
203 + return allNodes.get(nodeId);
117 } 204 }
118 205
119 @Override 206 @Override
120 public State getState(NodeId nodeId) { 207 public State getState(NodeId nodeId) {
121 - State state = states.get(nodeId); 208 + checkNotNull(nodeId, INSTANCE_ID_NULL);
122 - return state == null ? State.INACTIVE : state; 209 + return nodeStates.get(nodeId);
123 } 210 }
124 211
125 @Override 212 @Override
126 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) { 213 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
127 - return addNode(new DefaultControllerNode(nodeId, ip, tcpPort)); 214 + checkNotNull(nodeId, INSTANCE_ID_NULL);
215 + checkNotNull(ip, "IP address must not be null");
216 + checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
217 + ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
218 + allNodes.put(node.id(), node);
219 + nodeStates.put(nodeId, State.INACTIVE);
220 + delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
221 + return node;
128 } 222 }
129 223
130 @Override 224 @Override
131 public void removeNode(NodeId nodeId) { 225 public void removeNode(NodeId nodeId) {
132 - synchronized (this) { 226 + checkNotNull(nodeId, INSTANCE_ID_NULL);
133 - rawNodes.remove(serialize(nodeId)); 227 + ControllerNode node = allNodes.remove(nodeId);
134 - nodes.invalidate(nodeId); 228 + if (node != null) {
229 + nodeStates.remove(nodeId);
230 + delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
135 } 231 }
136 } 232 }
137 233
138 - // Adds a new node based on the specified member 234 + private void establishSelfIdentity() {
139 - private synchronized ControllerNode addNode(DefaultControllerNode node) { 235 + try {
140 - rawNodes.put(serialize(node.id()), serialize(node)); 236 + IpAddress ip = findLocalIp();
141 - nodes.put(node.id(), Optional.of(node)); 237 + localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
142 - states.put(node.id(), State.ACTIVE); 238 + allNodes.put(localNode.id(), localNode);
143 - return node; 239 + nodeStates.put(localNode.id(), State.ACTIVE);
240 + log.info("Local Node: {}", localNode);
241 + } catch (SocketException e) {
242 + throw new IllegalStateException("Cannot determine local IP", e);
243 + }
244 + }
245 +
246 + private void heartbeat() {
247 + try {
248 + Set<ControllerNode> peers = allNodes.values()
249 + .stream()
250 + .filter(node -> !(node.id().equals(localNode.id())))
251 + .collect(Collectors.toSet());
252 + byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
253 + peers.forEach((node) -> {
254 + heartbeatToPeer(hbMessagePayload, node);
255 + State currentState = nodeStates.get(node.id());
256 + double phi = failureDetector.phi(node.id());
257 + if (phi >= PHI_FAILURE_THRESHOLD) {
258 + if (currentState == State.ACTIVE) {
259 + nodeStates.put(node.id(), State.INACTIVE);
260 + notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
261 + }
262 + } else {
263 + if (currentState == State.INACTIVE) {
264 + nodeStates.put(node.id(), State.ACTIVE);
265 + notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
266 + }
267 + }
268 + });
269 + } catch (Exception e) {
270 + log.debug("Failed to send heartbeat", e);
271 + }
144 } 272 }
145 273
146 - // Creates a controller node descriptor from the Hazelcast member. 274 + private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
147 - private DefaultControllerNode node(Member member) { 275 + ControllerNode node = allNodes.get(nodeId);
148 - IpAddress ip = memberAddress(member); 276 + if (newState == State.ACTIVE) {
149 - return new DefaultControllerNode(new NodeId(ip.toString()), ip); 277 + delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
278 + } else {
279 + delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
280 + }
150 } 281 }
151 282
152 - private IpAddress memberAddress(Member member) { 283 + private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
153 - return IpAddress.valueOf(member.getSocketAddress().getAddress()); 284 + Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
285 + try {
286 + messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
287 + } catch (IOException e) {
288 + log.debug("Sending heartbeat to {} failed", remoteEp, e);
289 + }
154 } 290 }
155 291
156 - // Interceptor for membership events. 292 + private IpAddress findLocalIp() throws SocketException {
157 - private class InternalMembershipListener implements MembershipListener { 293 + Enumeration<NetworkInterface> interfaces =
158 - @Override 294 + NetworkInterface.getNetworkInterfaces();
159 - public void memberAdded(MembershipEvent membershipEvent) { 295 + while (interfaces.hasMoreElements()) {
160 - log.info("Member {} added", membershipEvent.getMember()); 296 + NetworkInterface iface = interfaces.nextElement();
161 - ControllerNode node = addNode(node(membershipEvent.getMember())); 297 + Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
162 - notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node)); 298 + while (inetAddresses.hasMoreElements()) {
299 + IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
300 + if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
301 + return ip;
302 + }
303 + }
304 + }
305 + throw new IllegalStateException("Unable to determine local ip");
163 } 306 }
164 307
308 + private class HeartbeatMessageHandler implements MessageHandler {
165 @Override 309 @Override
166 - public void memberRemoved(MembershipEvent membershipEvent) { 310 + public void handle(Message message) throws IOException {
167 - log.info("Member {} removed", membershipEvent.getMember()); 311 + HeartbeatMessage hb = SERIALIZER.decode(message.payload());
168 - NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString()); 312 + failureDetector.report(hb.source().id());
169 - states.put(nodeId, State.INACTIVE); 313 + hb.knownPeers().forEach(node -> {
170 - notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId))); 314 + allNodes.put(node.id(), node);
315 + });
316 + }
171 } 317 }
172 318
173 - @Override 319 + private static class HeartbeatMessage {
174 - public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) { 320 + private ControllerNode source;
175 - log.info("Member {} attribute {} changed to {}", 321 + private Set<ControllerNode> knownPeers;
176 - memberAttributeEvent.getMember(), 322 +
177 - memberAttributeEvent.getKey(), 323 + public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
178 - memberAttributeEvent.getValue()); 324 + this.source = source;
325 + this.knownPeers = ImmutableSet.copyOf(members);
326 + }
327 +
328 + public ControllerNode source() {
329 + return source;
330 + }
331 +
332 + public Set<ControllerNode> knownPeers() {
333 + return knownPeers;
179 } 334 }
180 } 335 }
336 +
181 } 337 }
......
1 +/*
2 + * Copyright 2014 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.cluster.impl;
17 +
18 +import com.google.common.base.Optional;
19 +import com.google.common.cache.LoadingCache;
20 +import com.google.common.collect.ImmutableSet;
21 +import com.hazelcast.core.IMap;
22 +import com.hazelcast.core.Member;
23 +import com.hazelcast.core.MemberAttributeEvent;
24 +import com.hazelcast.core.MembershipEvent;
25 +import com.hazelcast.core.MembershipListener;
26 +
27 +import org.apache.felix.scr.annotations.Activate;
28 +import org.apache.felix.scr.annotations.Component;
29 +import org.apache.felix.scr.annotations.Deactivate;
30 +import org.apache.felix.scr.annotations.Service;
31 +import org.onosproject.cluster.ClusterEvent;
32 +import org.onosproject.cluster.ClusterStore;
33 +import org.onosproject.cluster.ClusterStoreDelegate;
34 +import org.onosproject.cluster.ControllerNode;
35 +import org.onosproject.cluster.DefaultControllerNode;
36 +import org.onosproject.cluster.NodeId;
37 +import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
38 +import org.onosproject.store.hz.AbstractHazelcastStore;
39 +import org.onosproject.store.hz.OptionalCacheLoader;
40 +import org.onlab.packet.IpAddress;
41 +
42 +import java.util.Map;
43 +import java.util.Set;
44 +import java.util.concurrent.ConcurrentHashMap;
45 +
46 +import static com.google.common.cache.CacheBuilder.newBuilder;
47 +import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
48 +import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
49 +import static org.onosproject.cluster.ControllerNode.State;
50 +
51 +/**
52 + * Distributed, Hazelcast-based implementation of the cluster nodes store.
53 + */
54 +@Component(immediate = true, enabled = false)
55 +@Service
56 +public class HazelcastClusterStore
57 + extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
58 + implements ClusterStore {
59 +
60 + private IMap<byte[], byte[]> rawNodes;
61 + private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
62 +
63 + private String listenerId;
64 + private final MembershipListener listener = new InternalMembershipListener();
65 + private final Map<NodeId, State> states = new ConcurrentHashMap<>();
66 +
67 + private String nodesListenerId;
68 +
69 + @Override
70 + @Activate
71 + public void activate() {
72 + super.activate();
73 + listenerId = theInstance.getCluster().addMembershipListener(listener);
74 +
75 + rawNodes = theInstance.getMap("nodes");
76 + OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
77 + = new OptionalCacheLoader<>(serializer, rawNodes);
78 + nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
79 + nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
80 +
81 + loadClusterNodes();
82 +
83 + log.info("Started");
84 + }
85 +
86 + // Loads the initial set of cluster nodes
87 + private void loadClusterNodes() {
88 + for (Member member : theInstance.getCluster().getMembers()) {
89 + addNode(node(member));
90 + }
91 + }
92 +
93 + @Deactivate
94 + public void deactivate() {
95 + rawNodes.removeEntryListener(nodesListenerId);
96 + theInstance.getCluster().removeMembershipListener(listenerId);
97 + log.info("Stopped");
98 + }
99 +
100 + @Override
101 + public ControllerNode getLocalNode() {
102 + return node(theInstance.getCluster().getLocalMember());
103 + }
104 +
105 + @Override
106 + public Set<ControllerNode> getNodes() {
107 + ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
108 + for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
109 + builder.add(optional.get());
110 + }
111 + return builder.build();
112 + }
113 +
114 + @Override
115 + public ControllerNode getNode(NodeId nodeId) {
116 + return nodes.getUnchecked(nodeId).orNull();
117 + }
118 +
119 + @Override
120 + public State getState(NodeId nodeId) {
121 + State state = states.get(nodeId);
122 + return state == null ? State.INACTIVE : state;
123 + }
124 +
125 + @Override
126 + public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
127 + return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
128 + }
129 +
130 + @Override
131 + public void removeNode(NodeId nodeId) {
132 + synchronized (this) {
133 + rawNodes.remove(serialize(nodeId));
134 + nodes.invalidate(nodeId);
135 + }
136 + }
137 +
138 + // Adds a new node based on the specified member
139 + private synchronized ControllerNode addNode(DefaultControllerNode node) {
140 + rawNodes.put(serialize(node.id()), serialize(node));
141 + nodes.put(node.id(), Optional.of(node));
142 + states.put(node.id(), State.ACTIVE);
143 + return node;
144 + }
145 +
146 + // Creates a controller node descriptor from the Hazelcast member.
147 + private DefaultControllerNode node(Member member) {
148 + IpAddress ip = memberAddress(member);
149 + return new DefaultControllerNode(new NodeId(ip.toString()), ip);
150 + }
151 +
152 + private IpAddress memberAddress(Member member) {
153 + return IpAddress.valueOf(member.getSocketAddress().getAddress());
154 + }
155 +
156 + // Interceptor for membership events.
157 + private class InternalMembershipListener implements MembershipListener {
158 + @Override
159 + public void memberAdded(MembershipEvent membershipEvent) {
160 + log.info("Member {} added", membershipEvent.getMember());
161 + ControllerNode node = addNode(node(membershipEvent.getMember()));
162 + notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
163 + }
164 +
165 + @Override
166 + public void memberRemoved(MembershipEvent membershipEvent) {
167 + log.info("Member {} removed", membershipEvent.getMember());
168 + NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
169 + states.put(nodeId, State.INACTIVE);
170 + notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
171 + }
172 +
173 + @Override
174 + public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
175 + log.info("Member {} attribute {} changed to {}",
176 + memberAttributeEvent.getMember(),
177 + memberAttributeEvent.getKey(),
178 + memberAttributeEvent.getValue());
179 + }
180 + }
181 +}