notification for mastership changes
Change-Id: I191ccd42ff9f8a41e87cfcda07531e4fbdd923c8
Showing
2 changed files
with
34 additions
and
21 deletions
1 | package org.onlab.onos.store.cluster.impl; | 1 | package org.onlab.onos.store.cluster.impl; |
2 | 2 | ||
3 | -import static com.google.common.cache.CacheBuilder.newBuilder; | ||
4 | import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED; | 3 | import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED; |
5 | 4 | ||
6 | import java.util.Map; | 5 | import java.util.Map; |
... | @@ -20,12 +19,8 @@ import org.onlab.onos.cluster.MastershipTerm; | ... | @@ -20,12 +19,8 @@ import org.onlab.onos.cluster.MastershipTerm; |
20 | import org.onlab.onos.cluster.NodeId; | 19 | import org.onlab.onos.cluster.NodeId; |
21 | import org.onlab.onos.net.DeviceId; | 20 | import org.onlab.onos.net.DeviceId; |
22 | import org.onlab.onos.net.MastershipRole; | 21 | import org.onlab.onos.net.MastershipRole; |
23 | -import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache; | ||
24 | import org.onlab.onos.store.common.AbstractHazelcastStore; | 22 | import org.onlab.onos.store.common.AbstractHazelcastStore; |
25 | -import org.onlab.onos.store.common.OptionalCacheLoader; | ||
26 | 23 | ||
27 | -import com.google.common.base.Optional; | ||
28 | -import com.google.common.cache.LoadingCache; | ||
29 | import com.google.common.collect.ImmutableSet; | 24 | import com.google.common.collect.ImmutableSet; |
30 | import com.hazelcast.core.ILock; | 25 | import com.hazelcast.core.ILock; |
31 | import com.hazelcast.core.IMap; | 26 | import com.hazelcast.core.IMap; |
... | @@ -53,9 +48,6 @@ implements MastershipStore { | ... | @@ -53,9 +48,6 @@ implements MastershipStore { |
53 | //collection of nodes. values are ignored, as it's used as a makeshift 'set' | 48 | //collection of nodes. values are ignored, as it's used as a makeshift 'set' |
54 | protected IMap<byte[], Byte> backups; | 49 | protected IMap<byte[], Byte> backups; |
55 | 50 | ||
56 | - //TODO - remove | ||
57 | - //private LoadingCache<DeviceId, Optional<NodeId>> masters; | ||
58 | - | ||
59 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 51 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
60 | protected ClusterService clusterService; | 52 | protected ClusterService clusterService; |
61 | 53 | ||
... | @@ -68,11 +60,7 @@ implements MastershipStore { | ... | @@ -68,11 +60,7 @@ implements MastershipStore { |
68 | rawTerms = theInstance.getMap("terms"); | 60 | rawTerms = theInstance.getMap("terms"); |
69 | backups = theInstance.getMap("backups"); | 61 | backups = theInstance.getMap("backups"); |
70 | 62 | ||
71 | - //TODO: hook up maps to event notification | 63 | + rawMasters.addEntryListener(new RemoteMasterShipEventHandler(), true); |
72 | - //OptionalCacheLoader<DeviceId, NodeId> nodeLoader | ||
73 | - //= new OptionalCacheLoader<>(kryoSerializationService, rawMasters); | ||
74 | - //masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader)); | ||
75 | - //rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true); | ||
76 | 64 | ||
77 | log.info("Started"); | 65 | log.info("Started"); |
78 | } | 66 | } |
... | @@ -253,27 +241,26 @@ implements MastershipStore { | ... | @@ -253,27 +241,26 @@ implements MastershipStore { |
253 | 241 | ||
254 | //adds node to pool(s) of backup | 242 | //adds node to pool(s) of backup |
255 | private void backup(NodeId nodeId, DeviceId deviceId) { | 243 | private void backup(NodeId nodeId, DeviceId deviceId) { |
256 | - //TODO might be useful to isolate out | 244 | + //TODO might be useful to isolate out this function and reelect() if we |
245 | + //get more backup/election schemes | ||
257 | } | 246 | } |
258 | 247 | ||
259 | - private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> { | 248 | + private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> { |
260 | - public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) { | ||
261 | - super(cache); | ||
262 | - } | ||
263 | 249 | ||
264 | @Override | 250 | @Override |
265 | protected void onAdd(DeviceId deviceId, NodeId nodeId) { | 251 | protected void onAdd(DeviceId deviceId, NodeId nodeId) { |
252 | + //only addition indicates a change in mastership | ||
266 | notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId)); | 253 | notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId)); |
267 | } | 254 | } |
268 | 255 | ||
269 | @Override | 256 | @Override |
270 | protected void onRemove(DeviceId deviceId, NodeId nodeId) { | 257 | protected void onRemove(DeviceId deviceId, NodeId nodeId) { |
271 | - notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId)); | 258 | + //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId)); |
272 | } | 259 | } |
273 | 260 | ||
274 | @Override | 261 | @Override |
275 | protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) { | 262 | protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) { |
276 | - notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId)); | 263 | + //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId)); |
277 | } | 264 | } |
278 | } | 265 | } |
279 | 266 | ... | ... |
... | @@ -6,18 +6,23 @@ import static org.junit.Assert.assertTrue; | ... | @@ -6,18 +6,23 @@ import static org.junit.Assert.assertTrue; |
6 | import static org.onlab.onos.net.MastershipRole.*; | 6 | import static org.onlab.onos.net.MastershipRole.*; |
7 | 7 | ||
8 | import java.util.Set; | 8 | import java.util.Set; |
9 | +import java.util.concurrent.CountDownLatch; | ||
10 | +import java.util.concurrent.TimeUnit; | ||
9 | 11 | ||
10 | import org.junit.After; | 12 | import org.junit.After; |
11 | import org.junit.AfterClass; | 13 | import org.junit.AfterClass; |
12 | import org.junit.Before; | 14 | import org.junit.Before; |
13 | import org.junit.BeforeClass; | 15 | import org.junit.BeforeClass; |
16 | +import org.junit.Ignore; | ||
14 | import org.junit.Test; | 17 | import org.junit.Test; |
15 | import org.onlab.onos.cluster.ClusterEventListener; | 18 | import org.onlab.onos.cluster.ClusterEventListener; |
16 | import org.onlab.onos.cluster.ClusterService; | 19 | import org.onlab.onos.cluster.ClusterService; |
17 | import org.onlab.onos.cluster.ControllerNode; | 20 | import org.onlab.onos.cluster.ControllerNode; |
18 | import org.onlab.onos.cluster.ControllerNode.State; | 21 | import org.onlab.onos.cluster.ControllerNode.State; |
19 | import org.onlab.onos.cluster.DefaultControllerNode; | 22 | import org.onlab.onos.cluster.DefaultControllerNode; |
23 | +import org.onlab.onos.cluster.MastershipEvent; | ||
20 | import org.onlab.onos.cluster.MastershipEvent.Type; | 24 | import org.onlab.onos.cluster.MastershipEvent.Type; |
25 | +import org.onlab.onos.cluster.MastershipStoreDelegate; | ||
21 | import org.onlab.onos.cluster.MastershipTerm; | 26 | import org.onlab.onos.cluster.MastershipTerm; |
22 | import org.onlab.onos.cluster.NodeId; | 27 | import org.onlab.onos.cluster.NodeId; |
23 | import org.onlab.onos.net.DeviceId; | 28 | import org.onlab.onos.net.DeviceId; |
... | @@ -40,7 +45,6 @@ public class DistributedMastershipStoreTest { | ... | @@ -40,7 +45,6 @@ public class DistributedMastershipStoreTest { |
40 | private static final DeviceId DID1 = DeviceId.deviceId("of:01"); | 45 | private static final DeviceId DID1 = DeviceId.deviceId("of:01"); |
41 | private static final DeviceId DID2 = DeviceId.deviceId("of:02"); | 46 | private static final DeviceId DID2 = DeviceId.deviceId("of:02"); |
42 | private static final DeviceId DID3 = DeviceId.deviceId("of:03"); | 47 | private static final DeviceId DID3 = DeviceId.deviceId("of:03"); |
43 | - private static final DeviceId DID4 = DeviceId.deviceId("of:04"); | ||
44 | 48 | ||
45 | private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1"); | 49 | private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1"); |
46 | 50 | ||
... | @@ -191,6 +195,28 @@ public class DistributedMastershipStoreTest { | ... | @@ -191,6 +195,28 @@ public class DistributedMastershipStoreTest { |
191 | assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2)); | 195 | assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2)); |
192 | } | 196 | } |
193 | 197 | ||
198 | + @Ignore("Ignore until Delegate spec. is clear.") | ||
199 | + @Test | ||
200 | + public void testEvents() throws InterruptedException { | ||
201 | + //shamelessly copy other distributed store tests | ||
202 | + final CountDownLatch addLatch = new CountDownLatch(1); | ||
203 | + | ||
204 | + MastershipStoreDelegate checkAdd = new MastershipStoreDelegate() { | ||
205 | + @Override | ||
206 | + public void notify(MastershipEvent event) { | ||
207 | + assertEquals("wrong event:", Type.MASTER_CHANGED, event.type()); | ||
208 | + assertEquals("wrong subject", DID1, event.subject()); | ||
209 | + assertEquals("wrong subject", N1, event.master()); | ||
210 | + addLatch.countDown(); | ||
211 | + } | ||
212 | + }; | ||
213 | + | ||
214 | + dms.setDelegate(checkAdd); | ||
215 | + dms.setMaster(N1, DID1); | ||
216 | + //this will fail until we do something about single-instance-ness | ||
217 | + assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS)); | ||
218 | + } | ||
219 | + | ||
194 | private class TestDistributedMastershipStore extends | 220 | private class TestDistributedMastershipStore extends |
195 | DistributedMastershipStore { | 221 | DistributedMastershipStore { |
196 | public TestDistributedMastershipStore(StoreService storeService, | 222 | public TestDistributedMastershipStore(StoreService storeService, | ... | ... |
-
Please register or login to post a comment