Yuta HIGUCHI

DistributedDeviceStore

Change-Id: I34cf5a787bf0f9d16840bf2e3cc8d0167060f628
...@@ -369,7 +369,7 @@ public class DistributedDeviceStore ...@@ -369,7 +369,7 @@ public class DistributedDeviceStore
369 } 369 }
370 370
371 @Override 371 @Override
372 - protected void onUpdate(DeviceId deviceId, DefaultDevice device) { 372 + protected void onUpdate(DeviceId deviceId, DefaultDevice oldDevice, DefaultDevice device) {
373 notifyDelegate(new DeviceEvent(DEVICE_UPDATED, device)); 373 notifyDelegate(new DeviceEvent(DEVICE_UPDATED, device));
374 } 374 }
375 } 375 }
...@@ -390,7 +390,7 @@ public class DistributedDeviceStore ...@@ -390,7 +390,7 @@ public class DistributedDeviceStore
390 } 390 }
391 391
392 @Override 392 @Override
393 - protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> ports) { 393 + protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> oldPorts, Map<PortNumber, Port> ports) {
394 // notifyDelegate(new DeviceEvent(PORT_UPDATED, getDevice(deviceId))); 394 // notifyDelegate(new DeviceEvent(PORT_UPDATED, getDevice(deviceId)));
395 } 395 }
396 } 396 }
......
...@@ -101,7 +101,7 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD ...@@ -101,7 +101,7 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
101 V newVal = deserialize(event.getValue()); 101 V newVal = deserialize(event.getValue());
102 Optional<V> newValue = Optional.of(newVal); 102 Optional<V> newValue = Optional.of(newVal);
103 cache.asMap().replace(key, oldValue, newValue); 103 cache.asMap().replace(key, oldValue, newValue);
104 - onUpdate(key, newVal); 104 + onUpdate(key, oldVal, newVal);
105 } 105 }
106 106
107 @Override 107 @Override
...@@ -125,9 +125,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD ...@@ -125,9 +125,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
125 * Cache entry update hook. 125 * Cache entry update hook.
126 * 126 *
127 * @param key new key 127 * @param key new key
128 + * @param oldValue old value
128 * @param newVal new value 129 * @param newVal new value
129 */ 130 */
130 - protected void onUpdate(K key, V newVal) { 131 + protected void onUpdate(K key, V oldValue, V newVal) {
131 } 132 }
132 133
133 /** 134 /**
......
...@@ -14,19 +14,26 @@ import org.apache.felix.scr.annotations.Service; ...@@ -14,19 +14,26 @@ import org.apache.felix.scr.annotations.Service;
14 import org.onlab.onos.cluster.ControllerNode; 14 import org.onlab.onos.cluster.ControllerNode;
15 import org.onlab.onos.cluster.DefaultControllerNode; 15 import org.onlab.onos.cluster.DefaultControllerNode;
16 import org.onlab.onos.cluster.NodeId; 16 import org.onlab.onos.cluster.NodeId;
17 +import org.onlab.onos.net.ConnectPoint;
17 import org.onlab.onos.net.DefaultDevice; 18 import org.onlab.onos.net.DefaultDevice;
19 +import org.onlab.onos.net.DefaultLink;
18 import org.onlab.onos.net.DefaultPort; 20 import org.onlab.onos.net.DefaultPort;
19 import org.onlab.onos.net.Device; 21 import org.onlab.onos.net.Device;
20 import org.onlab.onos.net.DeviceId; 22 import org.onlab.onos.net.DeviceId;
21 import org.onlab.onos.net.Element; 23 import org.onlab.onos.net.Element;
24 +import org.onlab.onos.net.Link;
25 +import org.onlab.onos.net.LinkKey;
22 import org.onlab.onos.net.MastershipRole; 26 import org.onlab.onos.net.MastershipRole;
23 import org.onlab.onos.net.Port; 27 import org.onlab.onos.net.Port;
24 import org.onlab.onos.net.PortNumber; 28 import org.onlab.onos.net.PortNumber;
25 import org.onlab.onos.net.provider.ProviderId; 29 import org.onlab.onos.net.provider.ProviderId;
26 import org.onlab.onos.store.common.StoreService; 30 import org.onlab.onos.store.common.StoreService;
31 +import org.onlab.onos.store.serializers.ConnectPointSerializer;
32 +import org.onlab.onos.store.serializers.DefaultLinkSerializer;
27 import org.onlab.onos.store.serializers.DefaultPortSerializer; 33 import org.onlab.onos.store.serializers.DefaultPortSerializer;
28 import org.onlab.onos.store.serializers.DeviceIdSerializer; 34 import org.onlab.onos.store.serializers.DeviceIdSerializer;
29 import org.onlab.onos.store.serializers.IpPrefixSerializer; 35 import org.onlab.onos.store.serializers.IpPrefixSerializer;
36 +import org.onlab.onos.store.serializers.LinkKeySerializer;
30 import org.onlab.onos.store.serializers.NodeIdSerializer; 37 import org.onlab.onos.store.serializers.NodeIdSerializer;
31 import org.onlab.onos.store.serializers.OnosTimestampSerializer; 38 import org.onlab.onos.store.serializers.OnosTimestampSerializer;
32 import org.onlab.onos.store.serializers.PortNumberSerializer; 39 import org.onlab.onos.store.serializers.PortNumberSerializer;
...@@ -84,7 +91,9 @@ public class StoreManager implements StoreService { ...@@ -84,7 +91,9 @@ public class StoreManager implements StoreService {
84 DefaultDevice.class, 91 DefaultDevice.class,
85 MastershipRole.class, 92 MastershipRole.class,
86 Port.class, 93 Port.class,
87 - Element.class 94 + Element.class,
95 +
96 + Link.Type.class
88 ) 97 )
89 .register(IpPrefix.class, new IpPrefixSerializer()) 98 .register(IpPrefix.class, new IpPrefixSerializer())
90 .register(URI.class, new URISerializer()) 99 .register(URI.class, new URISerializer())
...@@ -94,6 +103,9 @@ public class StoreManager implements StoreService { ...@@ -94,6 +103,9 @@ public class StoreManager implements StoreService {
94 .register(PortNumber.class, new PortNumberSerializer()) 103 .register(PortNumber.class, new PortNumberSerializer())
95 .register(DefaultPort.class, new DefaultPortSerializer()) 104 .register(DefaultPort.class, new DefaultPortSerializer())
96 .register(OnosTimestamp.class, new OnosTimestampSerializer()) 105 .register(OnosTimestamp.class, new OnosTimestampSerializer())
106 + .register(LinkKey.class, new LinkKeySerializer())
107 + .register(ConnectPoint.class, new ConnectPointSerializer())
108 + .register(DefaultLink.class, new DefaultLinkSerializer())
97 .build() 109 .build()
98 .populate(10); 110 .populate(10);
99 } 111 }
......
1 +package org.onlab.onos.store.link.impl;
2 +
3 +import static com.google.common.cache.CacheBuilder.newBuilder;
4 +import static org.onlab.onos.net.Link.Type.DIRECT;
5 +import static org.onlab.onos.net.Link.Type.INDIRECT;
6 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
7 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
8 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
9 +import static org.slf4j.LoggerFactory.getLogger;
10 +
11 +import java.util.HashSet;
12 +import java.util.Set;
13 +import org.apache.felix.scr.annotations.Activate;
14 +import org.apache.felix.scr.annotations.Component;
15 +import org.apache.felix.scr.annotations.Deactivate;
16 +import org.apache.felix.scr.annotations.Service;
17 +import org.onlab.onos.net.ConnectPoint;
18 +import org.onlab.onos.net.DefaultLink;
19 +import org.onlab.onos.net.DeviceId;
20 +import org.onlab.onos.net.Link;
21 +import org.onlab.onos.net.LinkKey;
22 +import org.onlab.onos.net.link.LinkDescription;
23 +import org.onlab.onos.net.link.LinkEvent;
24 +import org.onlab.onos.net.link.LinkStore;
25 +import org.onlab.onos.net.link.LinkStoreDelegate;
26 +import org.onlab.onos.net.provider.ProviderId;
27 +import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
28 +import org.onlab.onos.store.impl.AbstractDistributedStore;
29 +import org.onlab.onos.store.impl.OptionalCacheLoader;
30 +import org.slf4j.Logger;
31 +
32 +import com.google.common.base.Optional;
33 +import com.google.common.cache.LoadingCache;
34 +import com.google.common.collect.HashMultimap;
35 +import com.google.common.collect.ImmutableSet;
36 +import com.google.common.collect.Multimap;
37 +import com.google.common.collect.ImmutableSet.Builder;
38 +import com.hazelcast.core.IMap;
39 +
40 +/**
41 + * Manages inventory of infrastructure links using Hazelcast-backed map.
42 + */
43 +@Component(immediate = true)
44 +@Service
45 +public class DistributedLinkStore
46 + extends AbstractDistributedStore<LinkEvent, LinkStoreDelegate>
47 + implements LinkStore {
48 +
49 + private final Logger log = getLogger(getClass());
50 +
51 + // Link inventory
52 + private IMap<byte[], byte[]> rawLinks;
53 + private LoadingCache<LinkKey, Optional<DefaultLink>> links;
54 +
55 + // TODO synchronize?
56 + // Egress and ingress link sets
57 + private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
58 + private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
59 +
60 + @Override
61 + @Activate
62 + public void activate() {
63 + super.activate();
64 +
65 + boolean includeValue = true;
66 +
67 + // TODO decide on Map name scheme to avoid collision
68 + rawLinks = theInstance.getMap("links");
69 + final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
70 + = new OptionalCacheLoader<>(storeService, rawLinks);
71 + links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
72 + // refresh/populate cache based on notification from other instance
73 + rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
74 +
75 + loadLinkCache();
76 +
77 + log.info("Started");
78 + }
79 +
80 + @Deactivate
81 + public void deactivate() {
82 + super.activate();
83 + log.info("Stopped");
84 + }
85 +
86 + private void loadLinkCache() {
87 + for (byte[] keyBytes : rawLinks.keySet()) {
88 + final LinkKey id = deserialize(keyBytes);
89 + links.refresh(id);
90 + }
91 + }
92 +
93 + @Override
94 + public int getLinkCount() {
95 + return links.asMap().size();
96 + }
97 +
98 + @Override
99 + public Iterable<Link> getLinks() {
100 + Builder<Link> builder = ImmutableSet.builder();
101 + for (Optional<DefaultLink> e : links.asMap().values()) {
102 + if (e.isPresent()) {
103 + builder.add(e.get());
104 + }
105 + }
106 + return builder.build();
107 + }
108 +
109 + @Override
110 + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
111 + return ImmutableSet.copyOf(srcLinks.get(deviceId));
112 + }
113 +
114 + @Override
115 + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
116 + return ImmutableSet.copyOf(dstLinks.get(deviceId));
117 + }
118 +
119 + @Override
120 + public Link getLink(ConnectPoint src, ConnectPoint dst) {
121 + return links.getUnchecked(new LinkKey(src, dst)).orNull();
122 + }
123 +
124 + @Override
125 + public Set<Link> getEgressLinks(ConnectPoint src) {
126 + Set<Link> egress = new HashSet<>();
127 + for (Link link : srcLinks.get(src.deviceId())) {
128 + if (link.src().equals(src)) {
129 + egress.add(link);
130 + }
131 + }
132 + return egress;
133 + }
134 +
135 + @Override
136 + public Set<Link> getIngressLinks(ConnectPoint dst) {
137 + Set<Link> ingress = new HashSet<>();
138 + for (Link link : dstLinks.get(dst.deviceId())) {
139 + if (link.dst().equals(dst)) {
140 + ingress.add(link);
141 + }
142 + }
143 + return ingress;
144 + }
145 +
146 + @Override
147 + public LinkEvent createOrUpdateLink(ProviderId providerId,
148 + LinkDescription linkDescription) {
149 + LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
150 + Optional<DefaultLink> link = links.getUnchecked(key);
151 + if (!link.isPresent()) {
152 + return createLink(providerId, key, linkDescription);
153 + }
154 + return updateLink(providerId, link.get(), key, linkDescription);
155 + }
156 +
157 + // Creates and stores the link and returns the appropriate event.
158 + private LinkEvent createLink(ProviderId providerId, LinkKey key,
159 + LinkDescription linkDescription) {
160 + DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
161 + linkDescription.type());
162 + synchronized (this) {
163 + final byte[] keyBytes = serialize(key);
164 + rawLinks.put(keyBytes, serialize(link));
165 + links.asMap().putIfAbsent(key, Optional.of(link));
166 +
167 + addNewLink(link);
168 + }
169 + return new LinkEvent(LINK_ADDED, link);
170 + }
171 +
172 + // update Egress and ingress link sets
173 + private void addNewLink(DefaultLink link) {
174 + synchronized (this) {
175 + srcLinks.put(link.src().deviceId(), link);
176 + dstLinks.put(link.dst().deviceId(), link);
177 + }
178 + }
179 +
180 + // Updates, if necessary the specified link and returns the appropriate event.
181 + private LinkEvent updateLink(ProviderId providerId, DefaultLink link,
182 + LinkKey key, LinkDescription linkDescription) {
183 + // FIXME confirm Link update condition is OK
184 + if (link.type() == INDIRECT && linkDescription.type() == DIRECT) {
185 + synchronized (this) {
186 +
187 + DefaultLink updated =
188 + new DefaultLink(providerId, link.src(), link.dst(),
189 + linkDescription.type());
190 + final byte[] keyBytes = serialize(key);
191 + rawLinks.put(keyBytes, serialize(updated));
192 + links.asMap().replace(key, Optional.of(link), Optional.of(updated));
193 +
194 + replaceLink(link, updated);
195 + return new LinkEvent(LINK_UPDATED, updated);
196 + }
197 + }
198 + return null;
199 + }
200 +
201 + // update Egress and ingress link sets
202 + private void replaceLink(DefaultLink link, DefaultLink updated) {
203 + synchronized (this) {
204 + srcLinks.remove(link.src().deviceId(), link);
205 + dstLinks.remove(link.dst().deviceId(), link);
206 +
207 + srcLinks.put(link.src().deviceId(), updated);
208 + dstLinks.put(link.dst().deviceId(), updated);
209 + }
210 + }
211 +
212 + @Override
213 + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
214 + synchronized (this) {
215 + LinkKey key = new LinkKey(src, dst);
216 + byte[] keyBytes = serialize(key);
217 + Link link = deserialize(rawLinks.remove(keyBytes));
218 + links.invalidate(key);
219 + if (link != null) {
220 + removeLink(link);
221 + return new LinkEvent(LINK_REMOVED, link);
222 + }
223 + return null;
224 + }
225 + }
226 +
227 + // update Egress and ingress link sets
228 + private void removeLink(Link link) {
229 + synchronized (this) {
230 + srcLinks.remove(link.src().deviceId(), link);
231 + dstLinks.remove(link.dst().deviceId(), link);
232 + }
233 + }
234 +
235 + private class RemoteLinkEventHandler extends RemoteEventHandler<LinkKey, DefaultLink> {
236 + public RemoteLinkEventHandler(LoadingCache<LinkKey, Optional<DefaultLink>> cache) {
237 + super(cache);
238 + }
239 +
240 + @Override
241 + protected void onAdd(LinkKey key, DefaultLink newVal) {
242 + addNewLink(newVal);
243 + notifyDelegate(new LinkEvent(LINK_ADDED, newVal));
244 + }
245 +
246 + @Override
247 + protected void onUpdate(LinkKey key, DefaultLink oldVal, DefaultLink newVal) {
248 + replaceLink(oldVal, newVal);
249 + notifyDelegate(new LinkEvent(LINK_UPDATED, newVal));
250 + }
251 +
252 + @Override
253 + protected void onRemove(LinkKey key, DefaultLink val) {
254 + removeLink(val);
255 + notifyDelegate(new LinkEvent(LINK_REMOVED, val));
256 + }
257 + }
258 +}
1 +/**
2 + * Implementation of link store using Hazelcast distributed structures.
3 + */
4 +package org.onlab.onos.store.link.impl;
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.ElementId;
5 +import org.onlab.onos.net.PortNumber;
6 +
7 +import com.esotericsoftware.kryo.Kryo;
8 +import com.esotericsoftware.kryo.Serializer;
9 +import com.esotericsoftware.kryo.io.Input;
10 +import com.esotericsoftware.kryo.io.Output;
11 +
12 +/**
13 + * Kryo Serializer for {@link ConnectPointSerializer}.
14 + */
15 +public class ConnectPointSerializer extends Serializer<ConnectPoint> {
16 +
17 + /**
18 + * Default constructor.
19 + */
20 + public ConnectPointSerializer() {
21 + // non-null, immutable
22 + super(false, true);
23 + }
24 +
25 + @Override
26 + public void write(Kryo kryo, Output output, ConnectPoint object) {
27 + kryo.writeClassAndObject(output, object.elementId());
28 + kryo.writeClassAndObject(output, object.port());
29 + }
30 +
31 + @Override
32 + public ConnectPoint read(Kryo kryo, Input input, Class<ConnectPoint> type) {
33 + ElementId elementId = (ElementId) kryo.readClassAndObject(input);
34 + PortNumber portNumber = (PortNumber) kryo.readClassAndObject(input);
35 + return new ConnectPoint(elementId, portNumber);
36 + }
37 +}
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.DefaultLink;
5 +import org.onlab.onos.net.Link.Type;
6 +import org.onlab.onos.net.provider.ProviderId;
7 +
8 +import com.esotericsoftware.kryo.Kryo;
9 +import com.esotericsoftware.kryo.Serializer;
10 +import com.esotericsoftware.kryo.io.Input;
11 +import com.esotericsoftware.kryo.io.Output;
12 +
13 +/**
14 + * Kryo Serializer for {@link DefaultLink}.
15 + */
16 +public class DefaultLinkSerializer extends Serializer<DefaultLink> {
17 +
18 + /**
19 + * Default constructor.
20 + */
21 + public DefaultLinkSerializer() {
22 + // non-null, immutable
23 + super(false, true);
24 + }
25 +
26 + @Override
27 + public void write(Kryo kryo, Output output, DefaultLink object) {
28 + kryo.writeClassAndObject(output, object.providerId());
29 + kryo.writeClassAndObject(output, object.src());
30 + kryo.writeClassAndObject(output, object.dst());
31 + kryo.writeClassAndObject(output, object.type());
32 + }
33 +
34 + @Override
35 + public DefaultLink read(Kryo kryo, Input input, Class<DefaultLink> type) {
36 + ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
37 + ConnectPoint src = (ConnectPoint) kryo.readClassAndObject(input);
38 + ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
39 + Type linkType = (Type) kryo.readClassAndObject(input);
40 + return new DefaultLink(providerId, src, dst, linkType);
41 + }
42 +}
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.LinkKey;
5 +import com.esotericsoftware.kryo.Kryo;
6 +import com.esotericsoftware.kryo.Serializer;
7 +import com.esotericsoftware.kryo.io.Input;
8 +import com.esotericsoftware.kryo.io.Output;
9 +
10 +/**
11 + * Kryo Serializer for {@link LinkKey}.
12 + */
13 +public class LinkKeySerializer extends Serializer<LinkKey> {
14 +
15 + /**
16 + * Default constructor.
17 + */
18 + public LinkKeySerializer() {
19 + // non-null, immutable
20 + super(false, true);
21 + }
22 +
23 + @Override
24 + public void write(Kryo kryo, Output output, LinkKey object) {
25 + kryo.writeClassAndObject(output, object.src());
26 + kryo.writeClassAndObject(output, object.dst());
27 + }
28 +
29 + @Override
30 + public LinkKey read(Kryo kryo, Input input, Class<LinkKey> type) {
31 + ConnectPoint src = (ConnectPoint) kryo.readClassAndObject(input);
32 + ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
33 + return new LinkKey(src, dst);
34 + }
35 +}