Madan Jampani

Checking in GossipHostStore. The implementation is only partly done and not fully distributed, yet.

1 +package org.onlab.onos.store.host.impl;
2 +
3 +import com.google.common.collect.HashMultimap;
4 +import com.google.common.collect.ImmutableSet;
5 +import com.google.common.collect.Multimap;
6 +import com.google.common.collect.Sets;
7 +
8 +import org.apache.felix.scr.annotations.Activate;
9 +import org.apache.felix.scr.annotations.Component;
10 +import org.apache.felix.scr.annotations.Deactivate;
11 +import org.apache.felix.scr.annotations.Reference;
12 +import org.apache.felix.scr.annotations.ReferenceCardinality;
13 +import org.apache.felix.scr.annotations.Service;
14 +import org.onlab.onos.cluster.ClusterService;
15 +import org.onlab.onos.net.Annotations;
16 +import org.onlab.onos.net.ConnectPoint;
17 +import org.onlab.onos.net.DefaultHost;
18 +import org.onlab.onos.net.DeviceId;
19 +import org.onlab.onos.net.Host;
20 +import org.onlab.onos.net.HostId;
21 +import org.onlab.onos.net.HostLocation;
22 +import org.onlab.onos.net.host.HostClockService;
23 +import org.onlab.onos.net.host.HostDescription;
24 +import org.onlab.onos.net.host.HostEvent;
25 +import org.onlab.onos.net.host.HostStore;
26 +import org.onlab.onos.net.host.HostStoreDelegate;
27 +import org.onlab.onos.net.host.PortAddresses;
28 +import org.onlab.onos.net.provider.ProviderId;
29 +import org.onlab.onos.store.AbstractStore;
30 +import org.onlab.onos.store.Timestamp;
31 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32 +import org.onlab.onos.store.common.impl.Timestamped;
33 +import org.onlab.packet.IpPrefix;
34 +import org.onlab.packet.MacAddress;
35 +import org.onlab.packet.VlanId;
36 +import org.slf4j.Logger;
37 +
38 +import java.util.HashSet;
39 +import java.util.Map;
40 +import java.util.Set;
41 +import java.util.concurrent.ConcurrentHashMap;
42 +
43 +import static org.onlab.onos.net.host.HostEvent.Type.*;
44 +import static org.slf4j.LoggerFactory.getLogger;
45 +
46 +/**
47 + * Manages inventory of end-station hosts in distributed data store
48 + * that uses optimistic replication and gossip based techniques.
49 + */
50 +@Component(immediate = true)
51 +@Service
52 +public class GossipHostStore
53 + extends AbstractStore<HostEvent, HostStoreDelegate>
54 + implements HostStore {
55 +
56 + private final Logger log = getLogger(getClass());
57 +
58 + // Host inventory
59 + private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
60 +
61 + private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
62 +
63 + // Hosts tracked by their location
64 + private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
65 +
66 + private final Map<ConnectPoint, PortAddresses> portAddresses =
67 + new ConcurrentHashMap<>();
68 +
69 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 + protected HostClockService hostClockService;
71 +
72 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 + protected ClusterCommunicationService clusterCommunicator;
74 +
75 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 + protected ClusterService clusterService;
77 +
78 + @Activate
79 + public void activate() {
80 + log.info("Started");
81 + }
82 +
83 + @Deactivate
84 + public void deactivate() {
85 + log.info("Stopped");
86 + }
87 +
88 + @Override
89 + public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
90 + HostDescription hostDescription) {
91 + Timestamp timestamp = hostClockService.getTimestamp(hostId);
92 + return createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
93 + // TODO: tell peers.
94 + }
95 +
96 + private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
97 + HostDescription hostDescription, Timestamp timestamp) {
98 + StoredHost host = hosts.get(hostId);
99 + if (host == null) {
100 + return createHost(providerId, hostId, hostDescription, timestamp);
101 + }
102 + return updateHost(providerId, host, hostDescription, timestamp);
103 + }
104 +
105 + // creates a new host and sends HOST_ADDED
106 + private HostEvent createHost(ProviderId providerId, HostId hostId,
107 + HostDescription descr, Timestamp timestamp) {
108 + synchronized (this) {
109 + // If this host was previously removed, first ensure
110 + // this new request is "newer"
111 + if (removedHosts.containsKey(hostId)) {
112 + if (removedHosts.get(hostId).isNewer(timestamp)) {
113 + return null;
114 + } else {
115 + removedHosts.remove(hostId);
116 + }
117 + }
118 + StoredHost newhost = new StoredHost(providerId, hostId,
119 + descr.hwAddress(),
120 + descr.vlan(),
121 + new Timestamped<>(descr.location(), timestamp),
122 + ImmutableSet.of(descr.ipAddress()));
123 + hosts.put(hostId, newhost);
124 + locations.put(descr.location(), newhost);
125 + return new HostEvent(HOST_ADDED, newhost);
126 + }
127 + }
128 +
129 + // checks for type of update to host, sends appropriate event
130 + private HostEvent updateHost(ProviderId providerId, StoredHost host,
131 + HostDescription descr, Timestamp timestamp) {
132 + HostEvent event;
133 + if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
134 + host.setLocation(new Timestamped<>(descr.location(), timestamp));
135 + return new HostEvent(HOST_MOVED, host);
136 + }
137 +
138 + if (host.ipAddresses().contains(descr.ipAddress())) {
139 + return null;
140 + }
141 +
142 + Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
143 + addresses.add(descr.ipAddress());
144 + StoredHost updated = new StoredHost(providerId, host.id(),
145 + host.mac(), host.vlan(),
146 + host.location, addresses);
147 + event = new HostEvent(HOST_UPDATED, updated);
148 + synchronized (this) {
149 + hosts.put(host.id(), updated);
150 + locations.remove(host.location(), host);
151 + locations.put(updated.location(), updated);
152 + }
153 + return event;
154 + }
155 +
156 + @Override
157 + public HostEvent removeHost(HostId hostId) {
158 + Timestamp timestamp = hostClockService.getTimestamp(hostId);
159 + return removeHostInternal(hostId, timestamp);
160 + // TODO: tell peers
161 + }
162 +
163 + private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
164 + synchronized (this) {
165 + Host host = hosts.remove(hostId);
166 + if (host != null) {
167 + locations.remove((host.location()), host);
168 + removedHosts.put(hostId, new Timestamped<>(host, timestamp));
169 + return new HostEvent(HOST_REMOVED, host);
170 + }
171 + return null;
172 + }
173 + }
174 +
175 + @Override
176 + public int getHostCount() {
177 + return hosts.size();
178 + }
179 +
180 + @Override
181 + public Iterable<Host> getHosts() {
182 + return ImmutableSet.<Host>copyOf(hosts.values());
183 + }
184 +
185 + @Override
186 + public Host getHost(HostId hostId) {
187 + return hosts.get(hostId);
188 + }
189 +
190 + @Override
191 + public Set<Host> getHosts(VlanId vlanId) {
192 + Set<Host> vlanset = new HashSet<>();
193 + for (Host h : hosts.values()) {
194 + if (h.vlan().equals(vlanId)) {
195 + vlanset.add(h);
196 + }
197 + }
198 + return vlanset;
199 + }
200 +
201 + @Override
202 + public Set<Host> getHosts(MacAddress mac) {
203 + Set<Host> macset = new HashSet<>();
204 + for (Host h : hosts.values()) {
205 + if (h.mac().equals(mac)) {
206 + macset.add(h);
207 + }
208 + }
209 + return macset;
210 + }
211 +
212 + @Override
213 + public Set<Host> getHosts(IpPrefix ip) {
214 + Set<Host> ipset = new HashSet<>();
215 + for (Host h : hosts.values()) {
216 + if (h.ipAddresses().contains(ip)) {
217 + ipset.add(h);
218 + }
219 + }
220 + return ipset;
221 + }
222 +
223 + @Override
224 + public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
225 + return ImmutableSet.copyOf(locations.get(connectPoint));
226 + }
227 +
228 + @Override
229 + public Set<Host> getConnectedHosts(DeviceId deviceId) {
230 + Set<Host> hostset = new HashSet<>();
231 + for (ConnectPoint p : locations.keySet()) {
232 + if (p.deviceId().equals(deviceId)) {
233 + hostset.addAll(locations.get(p));
234 + }
235 + }
236 + return hostset;
237 + }
238 +
239 + @Override
240 + public void updateAddressBindings(PortAddresses addresses) {
241 + synchronized (portAddresses) {
242 + PortAddresses existing = portAddresses.get(addresses.connectPoint());
243 + if (existing == null) {
244 + portAddresses.put(addresses.connectPoint(), addresses);
245 + } else {
246 + Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
247 + .immutableCopy();
248 +
249 + MacAddress newMac = (addresses.mac() == null) ? existing.mac()
250 + : addresses.mac();
251 +
252 + PortAddresses newAddresses =
253 + new PortAddresses(addresses.connectPoint(), union, newMac);
254 +
255 + portAddresses.put(newAddresses.connectPoint(), newAddresses);
256 + }
257 + }
258 + }
259 +
260 + @Override
261 + public void removeAddressBindings(PortAddresses addresses) {
262 + synchronized (portAddresses) {
263 + PortAddresses existing = portAddresses.get(addresses.connectPoint());
264 + if (existing != null) {
265 + Set<IpPrefix> difference =
266 + Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
267 +
268 + // If they removed the existing mac, set the new mac to null.
269 + // Otherwise, keep the existing mac.
270 + MacAddress newMac = existing.mac();
271 + if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
272 + newMac = null;
273 + }
274 +
275 + PortAddresses newAddresses =
276 + new PortAddresses(addresses.connectPoint(), difference, newMac);
277 +
278 + portAddresses.put(newAddresses.connectPoint(), newAddresses);
279 + }
280 + }
281 + }
282 +
283 + @Override
284 + public void clearAddressBindings(ConnectPoint connectPoint) {
285 + synchronized (portAddresses) {
286 + portAddresses.remove(connectPoint);
287 + }
288 + }
289 +
290 + @Override
291 + public Set<PortAddresses> getAddressBindings() {
292 + synchronized (portAddresses) {
293 + return new HashSet<>(portAddresses.values());
294 + }
295 + }
296 +
297 + @Override
298 + public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
299 + PortAddresses addresses;
300 +
301 + synchronized (portAddresses) {
302 + addresses = portAddresses.get(connectPoint);
303 + }
304 +
305 + if (addresses == null) {
306 + addresses = new PortAddresses(connectPoint, null, null);
307 + }
308 +
309 + return addresses;
310 + }
311 +
312 + // Auxiliary extension to allow location to mutate.
313 + private class StoredHost extends DefaultHost {
314 + private Timestamped<HostLocation> location;
315 +
316 + /**
317 + * Creates an end-station host using the supplied information.
318 + *
319 + * @param providerId provider identity
320 + * @param id host identifier
321 + * @param mac host MAC address
322 + * @param vlan host VLAN identifier
323 + * @param location host location
324 + * @param ips host IP addresses
325 + * @param annotations optional key/value annotations
326 + */
327 + public StoredHost(ProviderId providerId, HostId id,
328 + MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
329 + Set<IpPrefix> ips, Annotations... annotations) {
330 + super(providerId, id, mac, vlan, location.value(), ips, annotations);
331 + this.location = location;
332 + }
333 +
334 + void setLocation(Timestamped<HostLocation> location) {
335 + this.location = location;
336 + }
337 +
338 + @Override
339 + public HostLocation location() {
340 + return location.value();
341 + }
342 + }
343 +}