Madan Jampani
Committed by Brian O'Connor

Using ClusterCommunicationService instead of ITopic for notifying cluster member…

…s of leadership events.

Change-Id: I164f30da436f3e4f65c4e938c25bb2aa2faa16c3
......@@ -18,9 +18,6 @@ package org.onosproject.store.cluster.impl;
import com.google.common.collect.Maps;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -36,6 +33,10 @@ import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
......@@ -71,8 +72,7 @@ import static org.onlab.util.Tools.namedThreads;
*/
@Component(immediate = true)
@Service
public class HazelcastLeadershipService implements LeadershipService,
MessageListener<byte[]> {
public class HazelcastLeadershipService implements LeadershipService {
private static final Logger log =
LoggerFactory.getLogger(HazelcastLeadershipService.class);
......@@ -94,6 +94,9 @@ public class HazelcastLeadershipService implements LeadershipService,
private static final long NO_TERM = 0;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -107,8 +110,8 @@ public class HazelcastLeadershipService implements LeadershipService,
private final Map<String, Topic> topics = Maps.newConcurrentMap();
private NodeId localNodeId;
private ITopic<byte[]> leaderTopic;
private String leaderTopicRegistrationId;
private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
new MessageSubject("hz-leadership-events");
@Activate
protected void activate() {
......@@ -120,8 +123,8 @@ public class HazelcastLeadershipService implements LeadershipService,
topicConfig.setGlobalOrderingEnabled(true);
topicConfig.setName(TOPIC_HZ_ID);
storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
log.info("Hazelcast Leadership Service started");
}
......@@ -129,7 +132,7 @@ public class HazelcastLeadershipService implements LeadershipService,
@Deactivate
protected void deactivate() {
eventDispatcher.removeSink(LeadershipEvent.class);
leaderTopic.removeMessageListener(leaderTopicRegistrationId);
clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
for (Topic topic : topics.values()) {
topic.stop();
......@@ -194,35 +197,6 @@ public class HazelcastLeadershipService implements LeadershipService,
listenerRegistry.removeListener(listener);
}
@Override
public void onMessage(Message<byte[]> message) {
LeadershipEvent leadershipEvent =
SERIALIZER.decode(message.getMessageObject());
log.trace("Leadership Event: time = {} type = {} event = {}",
leadershipEvent.time(), leadershipEvent.type(),
leadershipEvent);
//
// If there is no entry for the topic, then create a new one to
// keep track of the leadership, but don't run for leadership itself.
//
String topicName = leadershipEvent.subject().topic();
Topic topic = topics.get(topicName);
if (topic == null) {
topic = new Topic(topicName);
Topic oldTopic = topics.putIfAbsent(topicName, topic);
if (oldTopic == null) {
// encountered new topic, start periodic processing
topic.start();
} else {
topic = oldTopic;
}
}
topic.receivedLeadershipEvent(leadershipEvent);
eventDispatcher.post(leadershipEvent);
}
/**
* Class for keeping per-topic information.
*/
......@@ -406,7 +380,12 @@ public class HazelcastLeadershipService implements LeadershipService,
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
// Dispatch to all instances
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER.encode(leadershipEvent)));
} else {
//
// Test if time to expire a stale leader
......@@ -473,7 +452,11 @@ public class HazelcastLeadershipService implements LeadershipService,
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER.encode(leadershipEvent)));
}
// Sleep forever until interrupted
......@@ -497,7 +480,11 @@ public class HazelcastLeadershipService implements LeadershipService,
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
clusterCommunicator.broadcastIncludeSelf(
new ClusterMessage(
clusterService.getLocalNode().id(),
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER.encode(leadershipEvent)));
leaderLock.unlock();
}
}
......@@ -515,4 +502,35 @@ public class HazelcastLeadershipService implements LeadershipService,
oldTerm, newTerm);
}
}
private class InternalLeadershipEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
LeadershipEvent leadershipEvent =
SERIALIZER.decode(message.payload());
log.trace("Leadership Event: time = {} type = {} event = {}",
leadershipEvent.time(), leadershipEvent.type(),
leadershipEvent);
//
// If there is no entry for the topic, then create a new one to
// keep track of the leadership, but don't run for leadership itself.
//
String topicName = leadershipEvent.subject().topic();
Topic topic = topics.get(topicName);
if (topic == null) {
topic = new Topic(topicName);
Topic oldTopic = topics.putIfAbsent(topicName, topic);
if (oldTopic == null) {
// encountered new topic, start periodic processing
topic.start();
} else {
topic = oldTopic;
}
}
topic.receivedLeadershipEvent(leadershipEvent);
eventDispatcher.post(leadershipEvent);
}
}
}
......