pankaj

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 22 changed files with 626 additions and 52 deletions
...@@ -56,7 +56,8 @@ public interface MastershipService { ...@@ -56,7 +56,8 @@ public interface MastershipService {
56 Set<DeviceId> getDevicesOf(NodeId nodeId); 56 Set<DeviceId> getDevicesOf(NodeId nodeId);
57 57
58 /** 58 /**
59 - * Returns the mastership term service for getting term information. 59 + * Returns the mastership term service for getting read-only
60 + * term information.
60 * 61 *
61 * @return the MastershipTermService for this mastership manager 62 * @return the MastershipTermService for this mastership manager
62 */ 63 */
......
...@@ -64,4 +64,14 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD ...@@ -64,4 +64,14 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
64 * @return the current master's ID and the term value for device, or null 64 * @return the current master's ID and the term value for device, or null
65 */ 65 */
66 MastershipTerm getTermFor(DeviceId deviceId); 66 MastershipTerm getTermFor(DeviceId deviceId);
67 +
68 + /**
69 + * Revokes a controller instance's mastership over a device and hands
70 + * over mastership to another controller instance.
71 + *
72 + * @param nodeId the controller instance identifier
73 + * @param deviceId device to revoke mastership for
74 + * @return a mastership event
75 + */
76 + MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
67 } 77 }
......
1 +package org.onlab.onos.cluster;
2 +
3 +import static org.junit.Assert.assertEquals;
4 +
5 +import org.junit.Test;
6 +
7 +import com.google.common.testing.EqualsTester;
8 +
9 +public class MastershipTermTest {
10 +
11 + private static final NodeId N1 = new NodeId("foo");
12 + private static final NodeId N2 = new NodeId("bar");
13 +
14 + private static final MastershipTerm TERM1 = MastershipTerm.of(N1, 0);
15 + private static final MastershipTerm TERM2 = MastershipTerm.of(N2, 1);
16 + private static final MastershipTerm TERM3 = MastershipTerm.of(N2, 1);
17 + private static final MastershipTerm TERM4 = MastershipTerm.of(N1, 1);
18 +
19 + @Test
20 + public void basics() {
21 + assertEquals("incorrect term number", 0, TERM1.termNumber());
22 + assertEquals("incorrect master", new NodeId("foo"), TERM1.master());
23 + }
24 +
25 + @Test
26 + public void testEquality() {
27 + new EqualsTester().addEqualityGroup(MastershipTerm.of(N1, 0), TERM1)
28 + .addEqualityGroup(TERM2, TERM3)
29 + .addEqualityGroup(TERM4);
30 + }
31 +
32 +}
...@@ -11,6 +11,8 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -11,6 +11,8 @@ import org.apache.felix.scr.annotations.Deactivate;
11 import org.apache.felix.scr.annotations.Reference; 11 import org.apache.felix.scr.annotations.Reference;
12 import org.apache.felix.scr.annotations.ReferenceCardinality; 12 import org.apache.felix.scr.annotations.ReferenceCardinality;
13 import org.apache.felix.scr.annotations.Service; 13 import org.apache.felix.scr.annotations.Service;
14 +import org.onlab.onos.cluster.ClusterEvent;
15 +import org.onlab.onos.cluster.ClusterEventListener;
14 import org.onlab.onos.cluster.ClusterService; 16 import org.onlab.onos.cluster.ClusterService;
15 import org.onlab.onos.cluster.MastershipAdminService; 17 import org.onlab.onos.cluster.MastershipAdminService;
16 import org.onlab.onos.cluster.MastershipEvent; 18 import org.onlab.onos.cluster.MastershipEvent;
...@@ -52,9 +54,12 @@ implements MastershipService, MastershipAdminService { ...@@ -52,9 +54,12 @@ implements MastershipService, MastershipAdminService {
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 protected ClusterService clusterService; 55 protected ClusterService clusterService;
54 56
57 + private ClusterEventListener clusterListener = new InternalClusterEventListener();
58 +
55 @Activate 59 @Activate
56 public void activate() { 60 public void activate() {
57 eventDispatcher.addSink(MastershipEvent.class, listenerRegistry); 61 eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
62 + clusterService.addListener(clusterListener);
58 store.setDelegate(delegate); 63 store.setDelegate(delegate);
59 log.info("Started"); 64 log.info("Started");
60 } 65 }
...@@ -62,6 +67,7 @@ implements MastershipService, MastershipAdminService { ...@@ -62,6 +67,7 @@ implements MastershipService, MastershipAdminService {
62 @Deactivate 67 @Deactivate
63 public void deactivate() { 68 public void deactivate() {
64 eventDispatcher.removeSink(MastershipEvent.class); 69 eventDispatcher.removeSink(MastershipEvent.class);
70 + clusterService.removeListener(clusterListener);
65 store.unsetDelegate(delegate); 71 store.unsetDelegate(delegate);
66 log.info("Stopped"); 72 log.info("Stopped");
67 } 73 }
...@@ -71,12 +77,16 @@ implements MastershipService, MastershipAdminService { ...@@ -71,12 +77,16 @@ implements MastershipService, MastershipAdminService {
71 checkNotNull(nodeId, NODE_ID_NULL); 77 checkNotNull(nodeId, NODE_ID_NULL);
72 checkNotNull(deviceId, DEVICE_ID_NULL); 78 checkNotNull(deviceId, DEVICE_ID_NULL);
73 checkNotNull(role, ROLE_NULL); 79 checkNotNull(role, ROLE_NULL);
74 - //TODO figure out appropriate action for non-MASTER roles, if we even set those 80 +
81 + MastershipEvent event = null;
75 if (role.equals(MastershipRole.MASTER)) { 82 if (role.equals(MastershipRole.MASTER)) {
76 - MastershipEvent event = store.setMaster(nodeId, deviceId); 83 + event = store.setMaster(nodeId, deviceId);
77 - if (event != null) { 84 + } else {
78 - post(event); 85 + event = store.unsetMaster(nodeId, deviceId);
79 - } 86 + }
87 +
88 + if (event != null) {
89 + post(event);
80 } 90 }
81 } 91 }
82 92
...@@ -88,8 +98,16 @@ implements MastershipService, MastershipAdminService { ...@@ -88,8 +98,16 @@ implements MastershipService, MastershipAdminService {
88 98
89 @Override 99 @Override
90 public void relinquishMastership(DeviceId deviceId) { 100 public void relinquishMastership(DeviceId deviceId) {
91 - checkNotNull(deviceId, DEVICE_ID_NULL); 101 + MastershipRole role = getLocalRole(deviceId);
92 - // FIXME: add method to store to give up mastership and trigger new master selection process 102 + if (!role.equals(MastershipRole.MASTER)) {
103 + return;
104 + }
105 +
106 + MastershipEvent event = store.unsetMaster(
107 + clusterService.getLocalNode().id(), deviceId);
108 + if (event != null) {
109 + post(event);
110 + }
93 } 111 }
94 112
95 @Override 113 @Override
...@@ -146,6 +164,26 @@ implements MastershipService, MastershipAdminService { ...@@ -146,6 +164,26 @@ implements MastershipService, MastershipAdminService {
146 164
147 } 165 }
148 166
167 + //callback for reacting to cluster events
168 + private class InternalClusterEventListener implements ClusterEventListener {
169 +
170 + @Override
171 + public void event(ClusterEvent event) {
172 + switch (event.type()) {
173 + //FIXME: worry about addition when the time comes
174 + case INSTANCE_ADDED:
175 + case INSTANCE_ACTIVATED:
176 + break;
177 + case INSTANCE_REMOVED:
178 + case INSTANCE_DEACTIVATED:
179 + break;
180 + default:
181 + log.warn("unknown cluster event {}", event);
182 + }
183 + }
184 +
185 + }
186 +
149 public class InternalDelegate implements MastershipStoreDelegate { 187 public class InternalDelegate implements MastershipStoreDelegate {
150 188
151 @Override 189 @Override
......
...@@ -16,6 +16,7 @@ import org.onlab.onos.cluster.ClusterService; ...@@ -16,6 +16,7 @@ import org.onlab.onos.cluster.ClusterService;
16 import org.onlab.onos.cluster.MastershipEvent; 16 import org.onlab.onos.cluster.MastershipEvent;
17 import org.onlab.onos.cluster.MastershipListener; 17 import org.onlab.onos.cluster.MastershipListener;
18 import org.onlab.onos.cluster.MastershipService; 18 import org.onlab.onos.cluster.MastershipService;
19 +import org.onlab.onos.cluster.MastershipTermService;
19 import org.onlab.onos.cluster.MastershipTerm; 20 import org.onlab.onos.cluster.MastershipTerm;
20 import org.onlab.onos.event.AbstractListenerRegistry; 21 import org.onlab.onos.event.AbstractListenerRegistry;
21 import org.onlab.onos.event.EventDeliveryService; 22 import org.onlab.onos.event.EventDeliveryService;
...@@ -76,6 +77,8 @@ public class DeviceManager ...@@ -76,6 +77,8 @@ public class DeviceManager
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected MastershipService mastershipService; 78 protected MastershipService mastershipService;
78 79
80 + protected MastershipTermService termService;
81 +
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected ClockService clockService; 83 protected ClockService clockService;
81 84
...@@ -84,6 +87,7 @@ public class DeviceManager ...@@ -84,6 +87,7 @@ public class DeviceManager
84 store.setDelegate(delegate); 87 store.setDelegate(delegate);
85 eventDispatcher.addSink(DeviceEvent.class, listenerRegistry); 88 eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
86 mastershipService.addListener(mastershipListener); 89 mastershipService.addListener(mastershipListener);
90 + termService = mastershipService.requestTermService();
87 log.info("Started"); 91 log.info("Started");
88 } 92 }
89 93
...@@ -198,7 +202,7 @@ public class DeviceManager ...@@ -198,7 +202,7 @@ public class DeviceManager
198 log.info("Device {} connected", deviceId); 202 log.info("Device {} connected", deviceId);
199 mastershipService.requestRoleFor(deviceId); 203 mastershipService.requestRoleFor(deviceId);
200 provider().roleChanged(event.subject(), 204 provider().roleChanged(event.subject(),
201 - mastershipService.getLocalRole(deviceId)); 205 + mastershipService.requestRoleFor(deviceId));
202 post(event); 206 post(event);
203 } 207 }
204 } 208 }
...@@ -208,8 +212,11 @@ public class DeviceManager ...@@ -208,8 +212,11 @@ public class DeviceManager
208 checkNotNull(deviceId, DEVICE_ID_NULL); 212 checkNotNull(deviceId, DEVICE_ID_NULL);
209 checkValidity(); 213 checkValidity();
210 DeviceEvent event = store.markOffline(deviceId); 214 DeviceEvent event = store.markOffline(deviceId);
215 +
216 + //we're no longer capable of mastership.
211 if (event != null) { 217 if (event != null) {
212 log.info("Device {} disconnected", deviceId); 218 log.info("Device {} disconnected", deviceId);
219 + mastershipService.relinquishMastership(deviceId);
213 post(event); 220 post(event);
214 } 221 }
215 } 222 }
......
...@@ -65,8 +65,8 @@ public class DefaultTopologyProvider extends AbstractProvider ...@@ -65,8 +65,8 @@ public class DefaultTopologyProvider extends AbstractProvider
65 private volatile boolean isStarted = false; 65 private volatile boolean isStarted = false;
66 66
67 private TopologyProviderService providerService; 67 private TopologyProviderService providerService;
68 - private DeviceListener deviceListener = new InnerDeviceListener(); 68 + private DeviceListener deviceListener = new InternalDeviceListener();
69 - private LinkListener linkListener = new InnerLinkListener(); 69 + private LinkListener linkListener = new InternalLinkListener();
70 70
71 private EventAccumulator accumulator; 71 private EventAccumulator accumulator;
72 private ExecutorService executor; 72 private ExecutorService executor;
...@@ -132,7 +132,7 @@ public class DefaultTopologyProvider extends AbstractProvider ...@@ -132,7 +132,7 @@ public class DefaultTopologyProvider extends AbstractProvider
132 } 132 }
133 133
134 // Callback for device events 134 // Callback for device events
135 - private class InnerDeviceListener implements DeviceListener { 135 + private class InternalDeviceListener implements DeviceListener {
136 @Override 136 @Override
137 public void event(DeviceEvent event) { 137 public void event(DeviceEvent event) {
138 DeviceEvent.Type type = event.type(); 138 DeviceEvent.Type type = event.type();
...@@ -144,7 +144,7 @@ public class DefaultTopologyProvider extends AbstractProvider ...@@ -144,7 +144,7 @@ public class DefaultTopologyProvider extends AbstractProvider
144 } 144 }
145 145
146 // Callback for link events 146 // Callback for link events
147 - private class InnerLinkListener implements LinkListener { 147 + private class InternalLinkListener implements LinkListener {
148 @Override 148 @Override
149 public void event(LinkEvent event) { 149 public void event(LinkEvent event) {
150 accumulator.add(event); 150 accumulator.add(event);
......
...@@ -19,6 +19,7 @@ import org.onlab.onos.net.trivial.impl.SimpleMastershipStore; ...@@ -19,6 +19,7 @@ import org.onlab.onos.net.trivial.impl.SimpleMastershipStore;
19 import org.onlab.packet.IpPrefix; 19 import org.onlab.packet.IpPrefix;
20 20
21 import static org.junit.Assert.assertEquals; 21 import static org.junit.Assert.assertEquals;
22 +import static org.junit.Assert.assertNull;
22 import static org.onlab.onos.net.MastershipRole.*; 23 import static org.onlab.onos.net.MastershipRole.*;
23 24
24 /** 25 /**
...@@ -65,7 +66,24 @@ public class MastershipManagerTest { ...@@ -65,7 +66,24 @@ public class MastershipManagerTest {
65 66
66 @Test 67 @Test
67 public void relinquishMastership() { 68 public void relinquishMastership() {
68 - //TODO 69 + //no backups - should turn to standby and no master for device
70 + mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
71 + assertEquals("wrong role:", MASTER, mgr.getLocalRole(DEV_MASTER));
72 + mgr.relinquishMastership(DEV_MASTER);
73 + assertNull("wrong master:", mgr.getMasterFor(DEV_OTHER));
74 + assertEquals("wrong role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
75 +
76 + //not master, nothing should happen
77 + mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
78 + mgr.relinquishMastership(DEV_OTHER);
79 + assertNull("wrong role:", mgr.getMasterFor(DEV_OTHER));
80 +
81 + //provide NID_OTHER as backup and relinquish
82 + mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
83 + assertEquals("wrong master:", NID_LOCAL, mgr.getMasterFor(DEV_MASTER));
84 + mgr.setRole(NID_OTHER, DEV_MASTER, STANDBY);
85 + mgr.relinquishMastership(DEV_MASTER);
86 + assertEquals("wrong master:", NID_OTHER, mgr.getMasterFor(DEV_MASTER));
69 } 87 }
70 88
71 @Test 89 @Test
...@@ -95,7 +113,6 @@ public class MastershipManagerTest { ...@@ -95,7 +113,6 @@ public class MastershipManagerTest {
95 mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER); 113 mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
96 mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY); 114 mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
97 assertEquals("should be one device:", 1, mgr.getDevicesOf(NID_LOCAL).size()); 115 assertEquals("should be one device:", 1, mgr.getDevicesOf(NID_LOCAL).size());
98 -
99 //hand both devices to NID_LOCAL 116 //hand both devices to NID_LOCAL
100 mgr.setRole(NID_LOCAL, DEV_OTHER, MASTER); 117 mgr.setRole(NID_LOCAL, DEV_OTHER, MASTER);
101 assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size()); 118 assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size());
......
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
4 +import java.util.Map;
5 +
6 +import org.onlab.onos.cluster.NodeId;
7 +import org.onlab.onos.store.Timestamp;
8 +
9 +import com.google.common.collect.ImmutableMap;
10 +
11 +/**
12 + * Anti-Entropy advertisement message.
13 + *
14 + * @param <ID> ID type
15 + */
16 +public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
17 +
18 + private final NodeId sender;
19 + private final ImmutableMap<ID, Timestamp> advertisement;
20 +
21 + /**
22 + * Creates anti-entropy advertisement message.
23 + *
24 + * @param sender sender of this message
25 + * @param advertisement timestamp information of the data sender holds
26 + */
27 + public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
28 + super(AE_ADVERTISEMENT);
29 + this.sender = sender;
30 + this.advertisement = ImmutableMap.copyOf(advertisement);
31 + }
32 +
33 + public NodeId sender() {
34 + return sender;
35 + }
36 +
37 + public ImmutableMap<ID, Timestamp> advertisement() {
38 + return advertisement;
39 + }
40 +
41 + // Default constructor for serializer
42 + protected AntiEntropyAdvertisement() {
43 + super(AE_ADVERTISEMENT);
44 + this.sender = null;
45 + this.advertisement = null;
46 + }
47 +}
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
4 +
5 +import java.util.Map;
6 +import java.util.Set;
7 +
8 +import org.onlab.onos.cluster.NodeId;
9 +import org.onlab.onos.store.device.impl.VersionedValue;
10 +
11 +import com.google.common.collect.ImmutableMap;
12 +import com.google.common.collect.ImmutableSet;
13 +
14 +public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
15 +
16 + private final NodeId sender;
17 + private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
18 + private final ImmutableSet<ID> request;
19 +
20 + /**
21 + * Creates a reply to anti-entropy message.
22 + *
23 + * @param sender sender of this message
24 + * @param suggestion collection of more recent values, sender had
25 + * @param request Collection of identifiers
26 + */
27 + public AntiEntropyReply(NodeId sender,
28 + Map<ID, VersionedValue<VALUE>> suggestion,
29 + Set<ID> request) {
30 + super(AE_REPLY);
31 + this.sender = sender;
32 + this.suggestion = ImmutableMap.copyOf(suggestion);
33 + this.request = ImmutableSet.copyOf(request);
34 + }
35 +
36 + public NodeId sender() {
37 + return sender;
38 + }
39 +
40 + public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
41 + return suggestion;
42 + }
43 +
44 + public ImmutableSet<ID> request() {
45 + return request;
46 + }
47 +
48 + // Default constructor for serializer
49 + protected AntiEntropyReply() {
50 + super(AE_REPLY);
51 + this.sender = null;
52 + this.suggestion = null;
53 + this.request = null;
54 + }
55 +}
...@@ -15,6 +15,12 @@ public enum MessageSubject { ...@@ -15,6 +15,12 @@ public enum MessageSubject {
15 LEAVING_MEMBER, 15 LEAVING_MEMBER,
16 16
17 /** Signifies a heart-beat message. */ 17 /** Signifies a heart-beat message. */
18 - ECHO 18 + ECHO,
19 +
20 + /** Anti-Entropy advertisement message. */
21 + AE_ADVERTISEMENT,
22 +
23 + /** Anti-Entropy reply message. */
24 + AE_REPLY,
19 25
20 } 26 }
......
...@@ -42,4 +42,12 @@ public class VersionedValue<T> { ...@@ -42,4 +42,12 @@ public class VersionedValue<T> {
42 public Timestamp timestamp() { 42 public Timestamp timestamp() {
43 return timestamp; 43 return timestamp;
44 } 44 }
45 +
46 +
47 + // Default constructor for serializer
48 + protected VersionedValue() {
49 + this.entity = null;
50 + this.isUp = false;
51 + this.timestamp = null;
52 + }
45 } 53 }
......
...@@ -123,6 +123,12 @@ implements MastershipStore { ...@@ -123,6 +123,12 @@ implements MastershipStore {
123 return null; 123 return null;
124 } 124 }
125 125
126 + @Override
127 + public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
128 + // TODO Auto-generated method stub
129 + return null;
130 + }
131 +
126 private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> { 132 private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
127 public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) { 133 public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
128 super(cache); 134 super(cache);
......
1 +package org.onlab.onos.store.serializers;
2 +
3 +import java.util.Collections;
4 +import java.util.HashMap;
5 +import java.util.Map;
6 +
7 +import org.onlab.util.KryoPool.FamilySerializer;
8 +
9 +import com.esotericsoftware.kryo.Kryo;
10 +import com.esotericsoftware.kryo.io.Input;
11 +import com.esotericsoftware.kryo.io.Output;
12 +import com.esotericsoftware.kryo.serializers.MapSerializer;
13 +import com.google.common.collect.ImmutableMap;
14 +
15 +/**
16 +* Kryo Serializer for {@link ImmutableMap}.
17 +*/
18 +public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
19 +
20 + private final MapSerializer mapSerializer = new MapSerializer();
21 +
22 + public ImmutableMapSerializer() {
23 + // non-null, immutable
24 + super(false, true);
25 + }
26 +
27 + @Override
28 + public void write(Kryo kryo, Output output, ImmutableMap<?, ?> object) {
29 + // wrapping with unmodifiableMap proxy
30 + // to avoid Kryo from writing only the reference marker of this instance,
31 + // which will be embedded right before this method call.
32 + kryo.writeObject(output, Collections.unmodifiableMap(object), mapSerializer);
33 + }
34 +
35 + @Override
36 + public ImmutableMap<?, ?> read(Kryo kryo, Input input,
37 + Class<ImmutableMap<?, ?>> type) {
38 + Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
39 + return ImmutableMap.copyOf(map);
40 + }
41 +
42 + @Override
43 + public void registerFamilies(Kryo kryo) {
44 + kryo.register(ImmutableMap.of().getClass(), this);
45 + kryo.register(ImmutableMap.of(1, 2).getClass(), this);
46 + kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
47 + // TODO register required ImmutableMap variants
48 + }
49 +}
1 +package org.onlab.onos.store.serializers;
2 +
3 +import java.util.ArrayList;
4 +import java.util.List;
5 +
6 +import org.onlab.util.KryoPool.FamilySerializer;
7 +
8 +import com.esotericsoftware.kryo.Kryo;
9 +import com.esotericsoftware.kryo.io.Input;
10 +import com.esotericsoftware.kryo.io.Output;
11 +import com.esotericsoftware.kryo.serializers.CollectionSerializer;
12 +import com.google.common.collect.ImmutableSet;
13 +
14 +/**
15 +* Kryo Serializer for {@link ImmutableSet}.
16 +*/
17 +public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
18 +
19 + private final CollectionSerializer serializer = new CollectionSerializer();
20 +
21 + public ImmutableSetSerializer() {
22 + // non-null, immutable
23 + super(false, true);
24 + }
25 +
26 + @Override
27 + public void write(Kryo kryo, Output output, ImmutableSet<?> object) {
28 + kryo.writeObject(output, object.asList(), serializer);
29 + }
30 +
31 + @Override
32 + public ImmutableSet<?> read(Kryo kryo, Input input,
33 + Class<ImmutableSet<?>> type) {
34 + List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
35 + return ImmutableSet.copyOf(elms);
36 + }
37 +
38 + @Override
39 + public void registerFamilies(Kryo kryo) {
40 + kryo.register(ImmutableSet.of().getClass(), this);
41 + kryo.register(ImmutableSet.of(1).getClass(), this);
42 + kryo.register(ImmutableSet.of(1, 2).getClass(), this);
43 + // TODO register required ImmutableSet variants
44 + }
45 +}
1 +package org.onlab.onos.store.serializers;
2 +
3 +import static org.onlab.onos.net.DeviceId.deviceId;
4 +import static org.onlab.onos.net.PortNumber.portNumber;
5 +
6 +import java.net.URI;
7 +import java.nio.ByteBuffer;
8 +import java.util.ArrayList;
9 +import java.util.HashMap;
10 +
11 +import org.junit.After;
12 +import org.junit.Before;
13 +import org.junit.BeforeClass;
14 +import org.junit.Test;
15 +import org.onlab.onos.cluster.NodeId;
16 +import org.onlab.onos.net.ConnectPoint;
17 +import org.onlab.onos.net.DefaultDevice;
18 +import org.onlab.onos.net.DefaultLink;
19 +import org.onlab.onos.net.DefaultPort;
20 +import org.onlab.onos.net.Device;
21 +import org.onlab.onos.net.DeviceId;
22 +import org.onlab.onos.net.Link;
23 +import org.onlab.onos.net.LinkKey;
24 +import org.onlab.onos.net.PortNumber;
25 +import org.onlab.onos.net.provider.ProviderId;
26 +import org.onlab.packet.IpPrefix;
27 +import org.onlab.util.KryoPool;
28 +
29 +import com.google.common.collect.ImmutableMap;
30 +import com.google.common.collect.ImmutableSet;
31 +import com.google.common.testing.EqualsTester;
32 +
33 +import de.javakaffee.kryoserializers.URISerializer;
34 +
35 +public class KryoSerializerTests {
36 + private static final ProviderId PID = new ProviderId("of", "foo");
37 + private static final DeviceId DID1 = deviceId("of:foo");
38 + private static final DeviceId DID2 = deviceId("of:bar");
39 + private static final PortNumber P1 = portNumber(1);
40 + private static final PortNumber P2 = portNumber(2);
41 + private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
42 + private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
43 + private static final String MFR = "whitebox";
44 + private static final String HW = "1.1.x";
45 + private static final String SW1 = "3.8.1";
46 + private static final String SW2 = "3.9.5";
47 + private static final String SN = "43311-12345";
48 + private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
49 +
50 + private static KryoPool kryos;
51 +
52 + @BeforeClass
53 + public static void setUpBeforeClass() throws Exception {
54 + kryos = KryoPool.newBuilder()
55 + .register(
56 + ArrayList.class,
57 + HashMap.class
58 + )
59 + .register(
60 + Device.Type.class,
61 + Link.Type.class
62 +
63 +// ControllerNode.State.class,
64 +// DefaultControllerNode.class,
65 +// MastershipRole.class,
66 +// Port.class,
67 +// Element.class,
68 + )
69 + .register(ConnectPoint.class, new ConnectPointSerializer())
70 + .register(DefaultLink.class, new DefaultLinkSerializer())
71 + .register(DefaultPort.class, new DefaultPortSerializer())
72 + .register(DeviceId.class, new DeviceIdSerializer())
73 + .register(ImmutableMap.class, new ImmutableMapSerializer())
74 + .register(ImmutableSet.class, new ImmutableSetSerializer())
75 + .register(IpPrefix.class, new IpPrefixSerializer())
76 + .register(LinkKey.class, new LinkKeySerializer())
77 + .register(NodeId.class, new NodeIdSerializer())
78 + .register(PortNumber.class, new PortNumberSerializer())
79 + .register(ProviderId.class, new ProviderIdSerializer())
80 +
81 + .register(DefaultDevice.class)
82 +
83 + .register(URI.class, new URISerializer())
84 + .build();
85 + }
86 +
87 + @Before
88 + public void setUp() throws Exception {
89 + }
90 +
91 + @After
92 + public void tearDown() throws Exception {
93 + // removing Kryo instance to use fresh Kryo on each tests
94 + kryos.getKryo();
95 + }
96 +
97 + private static <T> void testSerialized(T original) {
98 + ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
99 + kryos.serialize(original, buffer);
100 + buffer.flip();
101 + T copy = kryos.deserialize(buffer);
102 +
103 + new EqualsTester()
104 + .addEqualityGroup(original, copy)
105 + .testEquals();
106 + }
107 +
108 +
109 + @Test
110 + public final void test() {
111 + testSerialized(new ConnectPoint(DID1, P1));
112 + testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
113 + testSerialized(new DefaultPort(DEV1, P1, true));
114 + testSerialized(DID1);
115 + testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
116 + testSerialized(ImmutableMap.of(DID1, DEV1));
117 + testSerialized(ImmutableMap.of());
118 + testSerialized(ImmutableSet.of(DID1, DID2));
119 + testSerialized(ImmutableSet.of(DID1));
120 + testSerialized(ImmutableSet.of());
121 + testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
122 + testSerialized(new LinkKey(CP1, CP2));
123 + testSerialized(new NodeId("SomeNodeIdentifier"));
124 + testSerialized(P1);
125 + testSerialized(PID);
126 + }
127 +
128 +}
...@@ -7,8 +7,6 @@ import java.util.HashMap; ...@@ -7,8 +7,6 @@ import java.util.HashMap;
7 import java.util.HashSet; 7 import java.util.HashSet;
8 import java.util.Map; 8 import java.util.Map;
9 import java.util.Set; 9 import java.util.Set;
10 -import java.util.concurrent.ConcurrentHashMap;
11 -import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.atomic.AtomicInteger; 10 import java.util.concurrent.atomic.AtomicInteger;
13 11
14 import org.apache.felix.scr.annotations.Activate; 12 import org.apache.felix.scr.annotations.Activate;
...@@ -48,8 +46,10 @@ public class SimpleMastershipStore ...@@ -48,8 +46,10 @@ public class SimpleMastershipStore
48 new DefaultControllerNode(new NodeId("local"), LOCALHOST); 46 new DefaultControllerNode(new NodeId("local"), LOCALHOST);
49 47
50 //devices mapped to their masters, to emulate multiple nodes 48 //devices mapped to their masters, to emulate multiple nodes
51 - protected final ConcurrentMap<DeviceId, NodeId> masterMap = 49 + protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
52 - new ConcurrentHashMap<>(); 50 + //emulate backups with pile of nodes
51 + protected final Set<NodeId> backups = new HashSet<>();
52 + //terms
53 protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>(); 53 protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
54 54
55 @Activate 55 @Activate
...@@ -64,25 +64,29 @@ public class SimpleMastershipStore ...@@ -64,25 +64,29 @@ public class SimpleMastershipStore
64 64
65 @Override 65 @Override
66 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) { 66 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
67 - 67 + MastershipRole role = getRole(nodeId, deviceId);
68 - NodeId node = masterMap.get(deviceId); 68 +
69 - if (node == null) { 69 + synchronized (this) {
70 - synchronized (this) { 70 + switch (role) {
71 - masterMap.put(deviceId, nodeId); 71 + case MASTER:
72 - termMap.put(deviceId, new AtomicInteger()); 72 + return null;
73 + case STANDBY:
74 + masterMap.put(deviceId, nodeId);
75 + termMap.get(deviceId).incrementAndGet();
76 + backups.add(nodeId);
77 + break;
78 + case NONE:
79 + masterMap.put(deviceId, nodeId);
80 + termMap.put(deviceId, new AtomicInteger());
81 + backups.add(nodeId);
82 + break;
83 + default:
84 + log.warn("unknown Mastership Role {}", role);
85 + return null;
73 } 86 }
74 - return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
75 } 87 }
76 88
77 - if (node.equals(nodeId)) { 89 + return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
78 - return null;
79 - } else {
80 - synchronized (this) {
81 - masterMap.put(deviceId, nodeId);
82 - termMap.get(deviceId).incrementAndGet();
83 - return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
84 - }
85 - }
86 } 90 }
87 91
88 @Override 92 @Override
...@@ -103,34 +107,112 @@ public class SimpleMastershipStore ...@@ -103,34 +107,112 @@ public class SimpleMastershipStore
103 107
104 @Override 108 @Override
105 public MastershipRole requestRole(DeviceId deviceId) { 109 public MastershipRole requestRole(DeviceId deviceId) {
106 - return getRole(instance.id(), deviceId); 110 + //query+possible reelection
111 + NodeId node = instance.id();
112 + MastershipRole role = getRole(node, deviceId);
113 +
114 + switch (role) {
115 + case MASTER:
116 + break;
117 + case STANDBY:
118 + synchronized (this) {
119 + //try to "re-elect", since we're really not distributed
120 + NodeId rel = reelect(node);
121 + if (rel == null) {
122 + masterMap.put(deviceId, node);
123 + termMap.put(deviceId, new AtomicInteger());
124 + role = MastershipRole.MASTER;
125 + }
126 + backups.add(node);
127 + }
128 + break;
129 + case NONE:
130 + //first to get to it, say we are master
131 + synchronized (this) {
132 + masterMap.put(deviceId, node);
133 + termMap.put(deviceId, new AtomicInteger());
134 + backups.add(node);
135 + role = MastershipRole.MASTER;
136 + }
137 + break;
138 + default:
139 + log.warn("unknown Mastership Role {}", role);
140 + }
141 + return role;
107 } 142 }
108 143
109 @Override 144 @Override
110 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { 145 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
111 - NodeId node = masterMap.get(deviceId); 146 + //just query
147 + NodeId current = masterMap.get(deviceId);
112 MastershipRole role; 148 MastershipRole role;
113 - if (node != null) { 149 +
114 - if (node.equals(nodeId)) { 150 + if (current == null) {
151 + //degenerate case - only node is its own backup
152 + if (backups.contains(nodeId)) {
153 + role = MastershipRole.STANDBY;
154 + } else {
155 + role = MastershipRole.NONE;
156 + }
157 + } else {
158 + if (current.equals(nodeId)) {
115 role = MastershipRole.MASTER; 159 role = MastershipRole.MASTER;
116 } else { 160 } else {
117 role = MastershipRole.STANDBY; 161 role = MastershipRole.STANDBY;
118 } 162 }
119 - } else {
120 - //masterMap doesn't contain it.
121 - role = MastershipRole.MASTER;
122 - masterMap.put(deviceId, nodeId);
123 } 163 }
124 return role; 164 return role;
125 } 165 }
126 166
127 @Override 167 @Override
128 public MastershipTerm getTermFor(DeviceId deviceId) { 168 public MastershipTerm getTermFor(DeviceId deviceId) {
129 - if (masterMap.get(deviceId) == null) { 169 + if ((masterMap.get(deviceId) == null) ||
170 + (termMap.get(deviceId) == null)) {
130 return null; 171 return null;
131 } 172 }
132 return MastershipTerm.of( 173 return MastershipTerm.of(
133 masterMap.get(deviceId), termMap.get(deviceId).get()); 174 masterMap.get(deviceId), termMap.get(deviceId).get());
134 } 175 }
135 176
177 + @Override
178 + public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
179 + MastershipRole role = getRole(nodeId, deviceId);
180 + synchronized (this) {
181 + switch (role) {
182 + case MASTER:
183 + NodeId backup = reelect(nodeId);
184 + if (backup == null) {
185 + masterMap.remove(deviceId);
186 + } else {
187 + masterMap.put(deviceId, backup);
188 + termMap.get(deviceId).incrementAndGet();
189 + return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
190 + }
191 + case STANDBY:
192 + case NONE:
193 + if (!termMap.containsKey(deviceId)) {
194 + termMap.put(deviceId, new AtomicInteger());
195 + }
196 + backups.add(nodeId);
197 + break;
198 + default:
199 + log.warn("unknown Mastership Role {}", role);
200 + }
201 + }
202 + return null;
203 + }
204 +
205 + //dumbly selects next-available node that's not the current one
206 + //emulate leader election
207 + private NodeId reelect(NodeId nodeId) {
208 + NodeId backup = null;
209 + for (NodeId n : backups) {
210 + if (!n.equals(nodeId)) {
211 + backup = n;
212 + break;
213 + }
214 + }
215 + return backup;
216 + }
217 +
136 } 218 }
......
...@@ -34,6 +34,8 @@ cp -r $ONOS_ROOT/tools/package/etc/* $KARAF_DIST/etc ...@@ -34,6 +34,8 @@ cp -r $ONOS_ROOT/tools/package/etc/* $KARAF_DIST/etc
34 mkdir -p $KARAF_DIST/system/org/onlab 34 mkdir -p $KARAF_DIST/system/org/onlab
35 cp -r $M2_REPO/org/onlab $KARAF_DIST/system/org/ 35 cp -r $M2_REPO/org/onlab $KARAF_DIST/system/org/
36 36
37 +export ONOS_FEATURES="${ONOS_FEATURES:-webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo}"
38 +
37 # Cellar Patching -------------------------------------------------------------- 39 # Cellar Patching --------------------------------------------------------------
38 40
39 # Patch the Apache Karaf distribution file to add Cellar features repository 41 # Patch the Apache Karaf distribution file to add Cellar features repository
...@@ -51,7 +53,7 @@ perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-feature ...@@ -51,7 +53,7 @@ perl -pi.old -e "s|^(featuresRepositories=.*)|\1,mvn:org.onlab.onos/onos-feature
51 $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg 53 $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
52 54
53 # Patch the Apache Karaf distribution file to load ONOS features 55 # Patch the Apache Karaf distribution file to load ONOS features
54 -perl -pi.old -e 's|^(featuresBoot=.*)|\1,webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo|' \ 56 +perl -pi.old -e "s|^(featuresBoot=.*)|\1,$ONOS_FEATURES|" \
55 $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg 57 $ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
56 58
57 # Patch the Apache Karaf distribution with ONOS branding bundle 59 # Patch the Apache Karaf distribution with ONOS branding bundle
......
...@@ -66,6 +66,7 @@ function cell { ...@@ -66,6 +66,7 @@ function cell {
66 env | egrep "OCI" 66 env | egrep "OCI"
67 env | egrep "OC[0-9]+" | sort 67 env | egrep "OC[0-9]+" | sort
68 env | egrep "OCN" 68 env | egrep "OCN"
69 + env | egrep "ONOS_" | egrep -v 'ONOS_ROOT|ONOS_CELL'
69 fi 70 fi
70 } 71 }
71 72
......
1 -unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC 1 +unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC ONOS_FEATURES
......
1 # ProxMox-based cell of ONOS instances 1,2 & ONOS mininet box 1 # ProxMox-based cell of ONOS instances 1,2 & ONOS mininet box
2 . $ONOS_ROOT/tools/test/cells/.reset 2 . $ONOS_ROOT/tools/test/cells/.reset
3 3
4 +export ONOS_FEATURES="webconsole,onos-api,onos-core-trivial,onos-cli,onos-openflow,onos-app-fwd,onos-app-mobility,onos-app-tvue"
5 +
4 export ONOS_NIC="10.128.4.*" 6 export ONOS_NIC="10.128.4.*"
5 7
6 export OC1="10.128.4.60" 8 export OC1="10.128.4.60"
7 -#export OC2="192.168.97.131"
8 -
9 -#export OCN="192.168.97.130"
10 9
......
1 +# Default virtual box ONOS instances 1,2 & ONOS mininet box
2 +
3 +export ONOS_NIC=192.168.56.*
4 +
5 +export ONOS_FEATURES="webconsole,onos-api,onos-core-trivial,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo"
6 +
7 +export OC1="192.168.56.101"
8 +export OC2="192.168.56.102"
9 +export OC3="192.168.56.104"
10 +
11 +export OCN="192.168.56.103"
12 +
...@@ -239,12 +239,41 @@ public final class KryoPool { ...@@ -239,12 +239,41 @@ public final class KryoPool {
239 Kryo kryo = new Kryo(); 239 Kryo kryo = new Kryo();
240 kryo.setRegistrationRequired(registrationRequired); 240 kryo.setRegistrationRequired(registrationRequired);
241 for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) { 241 for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
242 - if (registry.getRight() == null) { 242 + final Serializer<?> serializer = registry.getRight();
243 + if (serializer == null) {
243 kryo.register(registry.getLeft()); 244 kryo.register(registry.getLeft());
244 } else { 245 } else {
245 - kryo.register(registry.getLeft(), registry.getRight()); 246 + kryo.register(registry.getLeft(), serializer);
247 + if (serializer instanceof FamilySerializer) {
248 + FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
249 + fser.registerFamilies(kryo);
250 + }
246 } 251 }
247 } 252 }
248 return kryo; 253 return kryo;
249 } 254 }
255 +
256 + /**
257 + * Serializer implementation, which required registration of family of Classes.
258 + * @param <T> base type of this serializer.
259 + */
260 + public abstract static class FamilySerializer<T> extends Serializer<T> {
261 +
262 +
263 + public FamilySerializer(boolean acceptsNull) {
264 + super(acceptsNull);
265 + }
266 +
267 + public FamilySerializer(boolean acceptsNull, boolean immutable) {
268 + super(acceptsNull, immutable);
269 + }
270 +
271 + /**
272 + * Registers other classes this Serializer supports.
273 + *
274 + * @param kryo instance to register classes to
275 + */
276 + public void registerFamilies(Kryo kryo) {
277 + }
278 + }
250 } 279 }
......