pankaj

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 20 changed files with 550 additions and 120 deletions
1 package org.onlab.onos.ifwd; 1 package org.onlab.onos.ifwd;
2 2
3 +import static org.slf4j.LoggerFactory.getLogger;
4 +
3 import org.apache.felix.scr.annotations.Activate; 5 import org.apache.felix.scr.annotations.Activate;
4 import org.apache.felix.scr.annotations.Component; 6 import org.apache.felix.scr.annotations.Component;
5 import org.apache.felix.scr.annotations.Deactivate; 7 import org.apache.felix.scr.annotations.Deactivate;
...@@ -26,8 +28,6 @@ import org.onlab.onos.net.topology.TopologyService; ...@@ -26,8 +28,6 @@ import org.onlab.onos.net.topology.TopologyService;
26 import org.onlab.packet.Ethernet; 28 import org.onlab.packet.Ethernet;
27 import org.slf4j.Logger; 29 import org.slf4j.Logger;
28 30
29 -import static org.slf4j.LoggerFactory.getLogger;
30 -
31 /** 31 /**
32 * WORK-IN-PROGRESS: Sample reactive forwarding application using intent framework. 32 * WORK-IN-PROGRESS: Sample reactive forwarding application using intent framework.
33 */ 33 */
...@@ -50,7 +50,7 @@ public class IntentReactiveForwarding { ...@@ -50,7 +50,7 @@ public class IntentReactiveForwarding {
50 50
51 private ReactivePacketProcessor processor = new ReactivePacketProcessor(); 51 private ReactivePacketProcessor processor = new ReactivePacketProcessor();
52 52
53 - private static long intentId = 1; 53 + private static long intentId = 0x123000;
54 54
55 @Activate 55 @Activate
56 public void activate() { 56 public void activate() {
......
...@@ -36,6 +36,12 @@ ...@@ -36,6 +36,12 @@
36 <scope>test</scope> 36 <scope>test</scope>
37 </dependency> 37 </dependency>
38 38
39 + <dependency>
40 + <groupId>org.easymock</groupId>
41 + <artifactId>easymock</artifactId>
42 + <scope>test</scope>
43 + </dependency>
44 +
39 <!-- TODO Consider removing store dependency. 45 <!-- TODO Consider removing store dependency.
40 Currently required for DistributedDeviceManagerTest. --> 46 Currently required for DistributedDeviceManagerTest. -->
41 <dependency> 47 <dependency>
......
...@@ -60,14 +60,15 @@ public class HostMonitor implements TimerTask { ...@@ -60,14 +60,15 @@ public class HostMonitor implements TimerTask {
60 * 60 *
61 * @param deviceService device service used to find edge ports 61 * @param deviceService device service used to find edge ports
62 * @param packetService packet service used to send packets on the data plane 62 * @param packetService packet service used to send packets on the data plane
63 - * @param hostService host service used to look up host information 63 + * @param hostManager host manager used to look up host information and
64 + * probe existing hosts
64 */ 65 */
65 public HostMonitor(DeviceService deviceService, PacketService packetService, 66 public HostMonitor(DeviceService deviceService, PacketService packetService,
66 - HostManager hostService) { 67 + HostManager hostManager) {
67 68
68 this.deviceService = deviceService; 69 this.deviceService = deviceService;
69 this.packetService = packetService; 70 this.packetService = packetService;
70 - this.hostManager = hostService; 71 + this.hostManager = hostManager;
71 72
72 monitoredAddresses = Collections.newSetFromMap( 73 monitoredAddresses = Collections.newSetFromMap(
73 new ConcurrentHashMap<IpAddress, Boolean>()); 74 new ConcurrentHashMap<IpAddress, Boolean>());
......
1 +package org.onlab.onos.net.host.impl;
2 +
3 +import static org.easymock.EasyMock.createMock;
4 +import static org.easymock.EasyMock.expect;
5 +import static org.easymock.EasyMock.expectLastCall;
6 +import static org.easymock.EasyMock.replay;
7 +import static org.easymock.EasyMock.verify;
8 +import static org.junit.Assert.assertTrue;
9 +
10 +import java.util.ArrayList;
11 +import java.util.Arrays;
12 +import java.util.Collections;
13 +import java.util.List;
14 +import java.util.Set;
15 +
16 +import org.junit.Test;
17 +import org.onlab.onos.net.ConnectPoint;
18 +import org.onlab.onos.net.Device;
19 +import org.onlab.onos.net.DeviceId;
20 +import org.onlab.onos.net.Host;
21 +import org.onlab.onos.net.MastershipRole;
22 +import org.onlab.onos.net.Port;
23 +import org.onlab.onos.net.PortNumber;
24 +import org.onlab.onos.net.device.DeviceListener;
25 +import org.onlab.onos.net.device.DeviceService;
26 +import org.onlab.onos.net.flow.instructions.Instruction;
27 +import org.onlab.onos.net.flow.instructions.Instructions.OutputInstruction;
28 +import org.onlab.onos.net.host.HostProvider;
29 +import org.onlab.onos.net.host.PortAddresses;
30 +import org.onlab.onos.net.packet.OutboundPacket;
31 +import org.onlab.onos.net.packet.PacketProcessor;
32 +import org.onlab.onos.net.packet.PacketService;
33 +import org.onlab.onos.net.provider.ProviderId;
34 +import org.onlab.packet.ARP;
35 +import org.onlab.packet.Ethernet;
36 +import org.onlab.packet.IpAddress;
37 +import org.onlab.packet.IpPrefix;
38 +import org.onlab.packet.MacAddress;
39 +
40 +import com.google.common.collect.HashMultimap;
41 +import com.google.common.collect.Lists;
42 +import com.google.common.collect.Multimap;
43 +
44 +public class HostMonitorTest {
45 +
46 + private IpAddress targetIpAddress = IpAddress.valueOf("10.0.0.1");
47 + private IpPrefix targetIpPrefix = IpPrefix.valueOf(targetIpAddress.toOctets());
48 +
49 + private IpPrefix sourcePrefix = IpPrefix.valueOf("10.0.0.99/24");
50 + private MacAddress sourceMac = MacAddress.valueOf(1L);
51 +
52 + private HostMonitor hostMonitor;
53 +
54 + @Test
55 + public void testMonitorHostExists() throws Exception {
56 + ProviderId id = new ProviderId("fake://", "id");
57 +
58 + Host host = createMock(Host.class);
59 + expect(host.providerId()).andReturn(id);
60 + replay(host);
61 +
62 + HostManager hostManager = createMock(HostManager.class);
63 + expect(hostManager.getHostsByIp(targetIpPrefix))
64 + .andReturn(Collections.singleton(host));
65 + replay(hostManager);
66 +
67 + HostProvider hostProvider = createMock(HostProvider.class);
68 + expect(hostProvider.id()).andReturn(id).anyTimes();
69 + hostProvider.triggerProbe(host);
70 + expectLastCall().once();
71 + replay(hostProvider);
72 +
73 + hostMonitor = new HostMonitor(null, null, hostManager);
74 +
75 + hostMonitor.registerHostProvider(hostProvider);
76 + hostMonitor.addMonitoringFor(targetIpAddress);
77 +
78 + hostMonitor.run(null);
79 +
80 + verify(hostProvider);
81 + }
82 +
83 + @Test
84 + public void testMonitorHostDoesNotExist() throws Exception {
85 + HostManager hostManager = createMock(HostManager.class);
86 +
87 + DeviceId devId = DeviceId.deviceId("fake");
88 +
89 + Device device = createMock(Device.class);
90 + expect(device.id()).andReturn(devId).anyTimes();
91 + replay(device);
92 +
93 + PortNumber portNum = PortNumber.portNumber(1L);
94 +
95 + Port port = createMock(Port.class);
96 + expect(port.number()).andReturn(portNum).anyTimes();
97 + replay(port);
98 +
99 + TestDeviceService deviceService = new TestDeviceService();
100 + deviceService.addDevice(device, Collections.singleton(port));
101 +
102 + ConnectPoint cp = new ConnectPoint(devId, portNum);
103 + PortAddresses pa = new PortAddresses(cp, Collections.singleton(sourcePrefix),
104 + sourceMac);
105 +
106 + expect(hostManager.getHostsByIp(targetIpPrefix))
107 + .andReturn(Collections.<Host>emptySet()).anyTimes();
108 + expect(hostManager.getAddressBindingsForPort(cp))
109 + .andReturn(pa).anyTimes();
110 + replay(hostManager);
111 +
112 + TestPacketService packetService = new TestPacketService();
113 +
114 +
115 + // Run the test
116 + hostMonitor = new HostMonitor(deviceService, packetService, hostManager);
117 +
118 + hostMonitor.addMonitoringFor(targetIpAddress);
119 + hostMonitor.run(null);
120 +
121 +
122 + // Check that a packet was sent to our PacketService and that it has
123 + // the properties we expect
124 + assertTrue(packetService.packets.size() == 1);
125 + OutboundPacket packet = packetService.packets.get(0);
126 +
127 + // Check the output port is correct
128 + assertTrue(packet.treatment().instructions().size() == 1);
129 + Instruction instruction = packet.treatment().instructions().get(0);
130 + assertTrue(instruction instanceof OutputInstruction);
131 + OutputInstruction oi = (OutputInstruction) instruction;
132 + assertTrue(oi.port().equals(portNum));
133 +
134 + // Check the output packet is correct (well the important bits anyway)
135 + Ethernet eth = new Ethernet();
136 + eth.deserialize(packet.data().array(), 0, packet.data().array().length);
137 + ARP arp = (ARP) eth.getPayload();
138 + assertTrue(Arrays.equals(arp.getSenderProtocolAddress(), sourcePrefix.toOctets()));
139 + assertTrue(Arrays.equals(arp.getSenderHardwareAddress(), sourceMac.toBytes()));
140 + assertTrue(Arrays.equals(arp.getTargetProtocolAddress(), targetIpPrefix.toOctets()));
141 + }
142 +
143 + class TestPacketService implements PacketService {
144 +
145 + List<OutboundPacket> packets = new ArrayList<>();
146 +
147 + @Override
148 + public void addProcessor(PacketProcessor processor, int priority) {
149 + }
150 +
151 + @Override
152 + public void removeProcessor(PacketProcessor processor) {
153 + }
154 +
155 + @Override
156 + public void emit(OutboundPacket packet) {
157 + packets.add(packet);
158 + }
159 + }
160 +
161 + class TestDeviceService implements DeviceService {
162 +
163 + List<Device> devices = Lists.newArrayList();
164 + Multimap<DeviceId, Port> devicePorts = HashMultimap.create();
165 +
166 + void addDevice(Device device, Set<Port> ports) {
167 + devices.add(device);
168 + for (Port p : ports) {
169 + devicePorts.put(device.id(), p);
170 + }
171 + }
172 +
173 + @Override
174 + public int getDeviceCount() {
175 + return 0;
176 + }
177 +
178 + @Override
179 + public Iterable<Device> getDevices() {
180 + return devices;
181 + }
182 +
183 + @Override
184 + public Device getDevice(DeviceId deviceId) {
185 + return null;
186 + }
187 +
188 + @Override
189 + public MastershipRole getRole(DeviceId deviceId) {
190 + return null;
191 + }
192 +
193 + @Override
194 + public List<Port> getPorts(DeviceId deviceId) {
195 + List<Port> ports = Lists.newArrayList();
196 + for (Port p : devicePorts.get(deviceId)) {
197 + ports.add(p);
198 + }
199 + return ports;
200 + }
201 +
202 + @Override
203 + public Port getPort(DeviceId deviceId, PortNumber portNumber) {
204 + return null;
205 + }
206 +
207 + @Override
208 + public boolean isAvailable(DeviceId deviceId) {
209 + return false;
210 + }
211 +
212 + @Override
213 + public void addListener(DeviceListener listener) {
214 + }
215 +
216 + @Override
217 + public void removeListener(DeviceListener listener) {
218 + }
219 + }
220 +}
...@@ -3,6 +3,8 @@ package org.onlab.onos.store.cluster.messaging; ...@@ -3,6 +3,8 @@ package org.onlab.onos.store.cluster.messaging;
3 import org.onlab.onos.cluster.ControllerNode; 3 import org.onlab.onos.cluster.ControllerNode;
4 import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate; 4 import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
5 5
6 +// TODO: This service interface can be removed, once we properly start
7 +// using ClusterService
6 /** 8 /**
7 * Service for administering communications manager. 9 * Service for administering communications manager.
8 */ 10 */
......
...@@ -2,6 +2,8 @@ package org.onlab.onos.store.cluster.messaging; ...@@ -2,6 +2,8 @@ package org.onlab.onos.store.cluster.messaging;
2 2
3 import org.onlab.onos.cluster.NodeId; 3 import org.onlab.onos.cluster.NodeId;
4 4
5 +// TODO: ClusterMessage should be aware about how to serialize the payload
6 +// TODO: Should payload type be made generic?
5 /** 7 /**
6 * Base message for cluster-wide communications. 8 * Base message for cluster-wide communications.
7 */ 9 */
......
1 package org.onlab.onos.store.cluster.messaging; 1 package org.onlab.onos.store.cluster.messaging;
2 2
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import java.util.Objects;
6 +
3 /** 7 /**
4 * Representation of a message subject. 8 * Representation of a message subject.
5 * Cluster messages have associated subjects that dictate how they get handled 9 * Cluster messages have associated subjects that dictate how they get handled
...@@ -10,7 +14,7 @@ public class MessageSubject { ...@@ -10,7 +14,7 @@ public class MessageSubject {
10 private final String value; 14 private final String value;
11 15
12 public MessageSubject(String value) { 16 public MessageSubject(String value) {
13 - this.value = value; 17 + this.value = checkNotNull(value);
14 } 18 }
15 19
16 public String value() { 20 public String value() {
...@@ -21,4 +25,24 @@ public class MessageSubject { ...@@ -21,4 +25,24 @@ public class MessageSubject {
21 public String toString() { 25 public String toString() {
22 return value; 26 return value;
23 } 27 }
28 +
29 + @Override
30 + public int hashCode() {
31 + return value.hashCode();
32 + }
33 +
34 + @Override
35 + public boolean equals(Object obj) {
36 + if (this == obj) {
37 + return true;
38 + }
39 + if (obj == null) {
40 + return false;
41 + }
42 + if (getClass() != obj.getClass()) {
43 + return false;
44 + }
45 + MessageSubject that = (MessageSubject) obj;
46 + return Objects.equals(this.value, that.value);
47 + }
24 } 48 }
......
...@@ -39,6 +39,7 @@ public class ClusterCommunicationManager ...@@ -39,6 +39,7 @@ public class ClusterCommunicationManager
39 39
40 private ControllerNode localNode; 40 private ControllerNode localNode;
41 private ClusterNodesDelegate nodesDelegate; 41 private ClusterNodesDelegate nodesDelegate;
42 + // FIXME: `members` should go away and should be using ClusterService
42 private Map<NodeId, ControllerNode> members = new HashMap<>(); 43 private Map<NodeId, ControllerNode> members = new HashMap<>();
43 private final Timer timer = new Timer("onos-controller-heatbeats"); 44 private final Timer timer = new Timer("onos-controller-heatbeats");
44 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L; 45 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
......
...@@ -3,6 +3,8 @@ package org.onlab.onos.store.cluster.messaging.impl; ...@@ -3,6 +3,8 @@ package org.onlab.onos.store.cluster.messaging.impl;
3 import org.onlab.onos.store.cluster.messaging.MessageSubject; 3 import org.onlab.onos.store.cluster.messaging.MessageSubject;
4 4
5 public final class ClusterMessageSubjects { 5 public final class ClusterMessageSubjects {
6 + // avoid instantiation
6 private ClusterMessageSubjects() {} 7 private ClusterMessageSubjects() {}
8 +
7 public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"); 9 public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
8 } 10 }
......
...@@ -6,6 +6,8 @@ import java.util.Objects; ...@@ -6,6 +6,8 @@ import java.util.Objects;
6 6
7 import org.onlab.onos.store.Timestamp; 7 import org.onlab.onos.store.Timestamp;
8 8
9 +import com.google.common.base.MoreObjects;
10 +
9 /** 11 /**
10 * Wrapper class to store Timestamped value. 12 * Wrapper class to store Timestamped value.
11 * @param <T> 13 * @param <T>
...@@ -70,6 +72,14 @@ public final class Timestamped<T> { ...@@ -70,6 +72,14 @@ public final class Timestamped<T> {
70 return Objects.equals(this.timestamp, that.timestamp); 72 return Objects.equals(this.timestamp, that.timestamp);
71 } 73 }
72 74
75 + @Override
76 + public String toString() {
77 + return MoreObjects.toStringHelper(getClass())
78 + .add("timestamp", timestamp)
79 + .add("value", value)
80 + .toString();
81 + }
82 +
73 // Default constructor for serialization 83 // Default constructor for serialization
74 @Deprecated 84 @Deprecated
75 protected Timestamped() { 85 protected Timestamped() {
......
...@@ -2,7 +2,8 @@ package org.onlab.onos.store.device.impl; ...@@ -2,7 +2,8 @@ package org.onlab.onos.store.device.impl;
2 2
3 import com.google.common.collect.FluentIterable; 3 import com.google.common.collect.FluentIterable;
4 import com.google.common.collect.ImmutableList; 4 import com.google.common.collect.ImmutableList;
5 - 5 +import com.google.common.collect.Maps;
6 +import com.google.common.collect.Sets;
6 import org.apache.commons.lang3.concurrent.ConcurrentException; 7 import org.apache.commons.lang3.concurrent.ConcurrentException;
7 import org.apache.commons.lang3.concurrent.ConcurrentInitializer; 8 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
8 import org.apache.felix.scr.annotations.Activate; 9 import org.apache.felix.scr.annotations.Activate;
...@@ -59,7 +60,7 @@ import static org.onlab.onos.net.DefaultAnnotations.merge; ...@@ -59,7 +60,7 @@ import static org.onlab.onos.net.DefaultAnnotations.merge;
59 import static org.onlab.onos.net.DefaultAnnotations.union; 60 import static org.onlab.onos.net.DefaultAnnotations.union;
60 import static com.google.common.base.Verify.verify; 61 import static com.google.common.base.Verify.verify;
61 62
62 -// TODO: implement remove event handling and call *Internal 63 +// TODO: give me a better name
63 /** 64 /**
64 * Manages inventory of infrastructure devices using gossip protocol to distribute 65 * Manages inventory of infrastructure devices using gossip protocol to distribute
65 * information. 66 * information.
...@@ -79,14 +80,18 @@ public class GossipDeviceStore ...@@ -79,14 +80,18 @@ public class GossipDeviceStore
79 // collection of Description given from various providers 80 // collection of Description given from various providers
80 private final ConcurrentMap<DeviceId, 81 private final ConcurrentMap<DeviceId,
81 ConcurrentMap<ProviderId, DeviceDescriptions>> 82 ConcurrentMap<ProviderId, DeviceDescriptions>>
82 - deviceDescs = new ConcurrentHashMap<>(); 83 + deviceDescs = Maps.newConcurrentMap();
83 84
84 // cache of Device and Ports generated by compositing descriptions from providers 85 // cache of Device and Ports generated by compositing descriptions from providers
85 - private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>(); 86 + private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
86 - private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>(); 87 + private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
88 +
89 + // to be updated under Device lock
90 + private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
91 + private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
87 92
88 // available(=UP) devices 93 // available(=UP) devices
89 - private final Set<DeviceId> availableDevices = new HashSet<>(); 94 + private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
90 95
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected ClockService clockService; 97 protected ClockService clockService;
...@@ -121,7 +126,8 @@ public class GossipDeviceStore ...@@ -121,7 +126,8 @@ public class GossipDeviceStore
121 } 126 }
122 127
123 @Override 128 @Override
124 - public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, 129 + public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
130 + DeviceId deviceId,
125 DeviceDescription deviceDescription) { 131 DeviceDescription deviceDescription) {
126 Timestamp newTimestamp = clockService.getTimestamp(deviceId); 132 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
127 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); 133 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
...@@ -133,22 +139,26 @@ public class GossipDeviceStore ...@@ -133,22 +139,26 @@ public class GossipDeviceStore
133 return event; 139 return event;
134 } 140 }
135 141
136 - private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId, 142 + private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
143 + DeviceId deviceId,
137 Timestamped<DeviceDescription> deltaDesc) { 144 Timestamped<DeviceDescription> deltaDesc) {
138 145
139 // Collection of DeviceDescriptions for a Device 146 // Collection of DeviceDescriptions for a Device
140 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs 147 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
141 = getDeviceDescriptions(deviceId); 148 = getDeviceDescriptions(deviceId);
142 149
150 + synchronized (providerDescs) {
151 + // locking per device
152 +
153 + if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
154 + log.debug("Ignoring outdated event: {}", deltaDesc);
155 + return null;
156 + }
143 157
144 DeviceDescriptions descs 158 DeviceDescriptions descs
145 = createIfAbsentUnchecked(providerDescs, providerId, 159 = createIfAbsentUnchecked(providerDescs, providerId,
146 new InitDeviceDescs(deltaDesc)); 160 new InitDeviceDescs(deltaDesc));
147 161
148 - // update description
149 - synchronized (providerDescs) {
150 - // locking per device
151 -
152 final Device oldDevice = devices.get(deviceId); 162 final Device oldDevice = devices.get(deviceId);
153 final Device newDevice; 163 final Device newDevice;
154 164
...@@ -163,18 +173,18 @@ public class GossipDeviceStore ...@@ -163,18 +173,18 @@ public class GossipDeviceStore
163 } 173 }
164 if (oldDevice == null) { 174 if (oldDevice == null) {
165 // ADD 175 // ADD
166 - return createDevice(providerId, newDevice); 176 + return createDevice(providerId, newDevice, deltaDesc.timestamp());
167 } else { 177 } else {
168 // UPDATE or ignore (no change or stale) 178 // UPDATE or ignore (no change or stale)
169 - return updateDevice(providerId, oldDevice, newDevice); 179 + return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
170 } 180 }
171 } 181 }
172 } 182 }
173 183
174 // Creates the device and returns the appropriate event if necessary. 184 // Creates the device and returns the appropriate event if necessary.
175 - // Guarded by deviceDescs value (=locking Device) 185 + // Guarded by deviceDescs value (=Device lock)
176 private DeviceEvent createDevice(ProviderId providerId, 186 private DeviceEvent createDevice(ProviderId providerId,
177 - Device newDevice) { 187 + Device newDevice, Timestamp timestamp) {
178 188
179 // update composed device cache 189 // update composed device cache
180 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice); 190 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
...@@ -183,16 +193,17 @@ public class GossipDeviceStore ...@@ -183,16 +193,17 @@ public class GossipDeviceStore
183 providerId, oldDevice, newDevice); 193 providerId, oldDevice, newDevice);
184 194
185 if (!providerId.isAncillary()) { 195 if (!providerId.isAncillary()) {
186 - availableDevices.add(newDevice.id()); 196 + markOnline(newDevice.id(), timestamp);
187 } 197 }
188 198
189 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); 199 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
190 } 200 }
191 201
192 // Updates the device and returns the appropriate event if necessary. 202 // Updates the device and returns the appropriate event if necessary.
193 - // Guarded by deviceDescs value (=locking Device) 203 + // Guarded by deviceDescs value (=Device lock)
194 private DeviceEvent updateDevice(ProviderId providerId, 204 private DeviceEvent updateDevice(ProviderId providerId,
195 - Device oldDevice, Device newDevice) { 205 + Device oldDevice,
206 + Device newDevice, Timestamp newTimestamp) {
196 207
197 // We allow only certain attributes to trigger update 208 // We allow only certain attributes to trigger update
198 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || 209 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
...@@ -207,14 +218,14 @@ public class GossipDeviceStore ...@@ -207,14 +218,14 @@ public class GossipDeviceStore
207 , newDevice); 218 , newDevice);
208 } 219 }
209 if (!providerId.isAncillary()) { 220 if (!providerId.isAncillary()) {
210 - availableDevices.add(newDevice.id()); 221 + markOnline(newDevice.id(), newTimestamp);
211 } 222 }
212 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); 223 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
213 } 224 }
214 225
215 // Otherwise merely attempt to change availability if primary provider 226 // Otherwise merely attempt to change availability if primary provider
216 if (!providerId.isAncillary()) { 227 if (!providerId.isAncillary()) {
217 - boolean added = availableDevices.add(newDevice.id()); 228 + boolean added = markOnline(newDevice.id(), newTimestamp);
218 return !added ? null : 229 return !added ? null :
219 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); 230 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
220 } 231 }
...@@ -223,11 +234,29 @@ public class GossipDeviceStore ...@@ -223,11 +234,29 @@ public class GossipDeviceStore
223 234
224 @Override 235 @Override
225 public DeviceEvent markOffline(DeviceId deviceId) { 236 public DeviceEvent markOffline(DeviceId deviceId) {
226 - ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs 237 + Timestamp timestamp = clockService.getTimestamp(deviceId);
238 + return markOfflineInternal(deviceId, timestamp);
239 + }
240 +
241 + private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
242 +
243 + Map<ProviderId, DeviceDescriptions> providerDescs
227 = getDeviceDescriptions(deviceId); 244 = getDeviceDescriptions(deviceId);
228 245
229 // locking device 246 // locking device
230 synchronized (providerDescs) { 247 synchronized (providerDescs) {
248 +
249 + // accept off-line if given timestamp is newer than
250 + // the latest Timestamp from Primary provider
251 + DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
252 + Timestamp lastTimestamp = primDescs.getLatestTimestamp();
253 + if (timestamp.compareTo(lastTimestamp) <= 0) {
254 + // outdated event ignore
255 + return null;
256 + }
257 +
258 + offline.put(deviceId, timestamp);
259 +
231 Device device = devices.get(deviceId); 260 Device device = devices.get(deviceId);
232 if (device == null) { 261 if (device == null) {
233 return null; 262 return null;
...@@ -236,14 +265,36 @@ public class GossipDeviceStore ...@@ -236,14 +265,36 @@ public class GossipDeviceStore
236 if (removed) { 265 if (removed) {
237 // TODO: broadcast ... DOWN only? 266 // TODO: broadcast ... DOWN only?
238 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); 267 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
239 -
240 } 268 }
241 return null; 269 return null;
242 } 270 }
243 } 271 }
244 272
273 + /**
274 + * Marks the device as available if the given timestamp is not outdated,
275 + * compared to the time the device has been marked offline.
276 + *
277 + * @param deviceId identifier of the device
278 + * @param timestamp of the event triggering this change.
279 + * @return true if availability change request was accepted and changed the state
280 + */
281 + // Guarded by deviceDescs value (=Device lock)
282 + private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
283 + // accept on-line if given timestamp is newer than
284 + // the latest offline request Timestamp
285 + Timestamp offlineTimestamp = offline.get(deviceId);
286 + if (offlineTimestamp == null ||
287 + offlineTimestamp.compareTo(timestamp) < 0) {
288 +
289 + offline.remove(deviceId);
290 + return availableDevices.add(deviceId);
291 + }
292 + return false;
293 + }
294 +
245 @Override 295 @Override
246 - public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId, 296 + public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
297 + DeviceId deviceId,
247 List<PortDescription> portDescriptions) { 298 List<PortDescription> portDescriptions) {
248 Timestamp newTimestamp = clockService.getTimestamp(deviceId); 299 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
249 300
...@@ -252,7 +303,8 @@ public class GossipDeviceStore ...@@ -252,7 +303,8 @@ public class GossipDeviceStore
252 deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp)); 303 deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
253 } 304 }
254 305
255 - List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs); 306 + List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
307 + new Timestamped<>(portDescriptions, newTimestamp));
256 if (!events.isEmpty()) { 308 if (!events.isEmpty()) {
257 // FIXME: broadcast deltaDesc, UP 309 // FIXME: broadcast deltaDesc, UP
258 log.debug("broadcast deltaDesc"); 310 log.debug("broadcast deltaDesc");
...@@ -261,8 +313,9 @@ public class GossipDeviceStore ...@@ -261,8 +313,9 @@ public class GossipDeviceStore
261 313
262 } 314 }
263 315
264 - private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId, 316 + private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
265 - List<Timestamped<PortDescription>> deltaDescs) { 317 + DeviceId deviceId,
318 + Timestamped<List<PortDescription>> portDescriptions) {
266 319
267 Device device = devices.get(deviceId); 320 Device device = devices.get(deviceId);
268 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 321 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
...@@ -270,30 +323,41 @@ public class GossipDeviceStore ...@@ -270,30 +323,41 @@ public class GossipDeviceStore
270 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); 323 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
271 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); 324 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
272 325
326 + List<DeviceEvent> events = new ArrayList<>();
327 + synchronized (descsMap) {
328 +
329 + if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
330 + log.debug("Ignoring outdated events: {}", portDescriptions);
331 + return null;
332 + }
333 +
273 DeviceDescriptions descs = descsMap.get(providerId); 334 DeviceDescriptions descs = descsMap.get(providerId);
274 // every provider must provide DeviceDescription. 335 // every provider must provide DeviceDescription.
275 checkArgument(descs != null, 336 checkArgument(descs != null,
276 "Device description for Device ID %s from Provider %s was not found", 337 "Device description for Device ID %s from Provider %s was not found",
277 deviceId, providerId); 338 deviceId, providerId);
278 339
279 - List<DeviceEvent> events = new ArrayList<>();
280 - synchronized (descsMap) {
281 Map<PortNumber, Port> ports = getPortMap(deviceId); 340 Map<PortNumber, Port> ports = getPortMap(deviceId);
282 341
342 + final Timestamp newTimestamp = portDescriptions.timestamp();
343 +
283 // Add new ports 344 // Add new ports
284 Set<PortNumber> processed = new HashSet<>(); 345 Set<PortNumber> processed = new HashSet<>();
285 - for (Timestamped<PortDescription> deltaDesc : deltaDescs) { 346 + for (PortDescription portDescription : portDescriptions.value()) {
286 - final PortNumber number = deltaDesc.value().portNumber(); 347 + final PortNumber number = portDescription.portNumber();
348 + processed.add(number);
349 +
287 final Port oldPort = ports.get(number); 350 final Port oldPort = ports.get(number);
288 final Port newPort; 351 final Port newPort;
289 352
353 +
290 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number); 354 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
291 if (existingPortDesc == null || 355 if (existingPortDesc == null ||
292 - deltaDesc == existingPortDesc || 356 + newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
293 - deltaDesc.isNewer(existingPortDesc)) {
294 // on new port or valid update 357 // on new port or valid update
295 // update description 358 // update description
296 - descs.putPortDesc(deltaDesc); 359 + descs.putPortDesc(new Timestamped<>(portDescription,
360 + portDescriptions.timestamp()));
297 newPort = composePort(device, number, descsMap); 361 newPort = composePort(device, number, descsMap);
298 } else { 362 } else {
299 // outdated event, ignored. 363 // outdated event, ignored.
...@@ -303,7 +367,6 @@ public class GossipDeviceStore ...@@ -303,7 +367,6 @@ public class GossipDeviceStore
303 events.add(oldPort == null ? 367 events.add(oldPort == null ?
304 createPort(device, newPort, ports) : 368 createPort(device, newPort, ports) :
305 updatePort(device, oldPort, newPort, ports)); 369 updatePort(device, oldPort, newPort, ports));
306 - processed.add(number);
307 } 370 }
308 371
309 events.addAll(pruneOldPorts(device, ports, processed)); 372 events.addAll(pruneOldPorts(device, ports, processed));
...@@ -313,7 +376,7 @@ public class GossipDeviceStore ...@@ -313,7 +376,7 @@ public class GossipDeviceStore
313 376
314 // Creates a new port based on the port description adds it to the map and 377 // Creates a new port based on the port description adds it to the map and
315 // Returns corresponding event. 378 // Returns corresponding event.
316 - // Guarded by deviceDescs value (=locking Device) 379 + // Guarded by deviceDescs value (=Device lock)
317 private DeviceEvent createPort(Device device, Port newPort, 380 private DeviceEvent createPort(Device device, Port newPort,
318 Map<PortNumber, Port> ports) { 381 Map<PortNumber, Port> ports) {
319 ports.put(newPort.number(), newPort); 382 ports.put(newPort.number(), newPort);
...@@ -322,7 +385,7 @@ public class GossipDeviceStore ...@@ -322,7 +385,7 @@ public class GossipDeviceStore
322 385
323 // Checks if the specified port requires update and if so, it replaces the 386 // Checks if the specified port requires update and if so, it replaces the
324 // existing entry in the map and returns corresponding event. 387 // existing entry in the map and returns corresponding event.
325 - // Guarded by deviceDescs value (=locking Device) 388 + // Guarded by deviceDescs value (=Device lock)
326 private DeviceEvent updatePort(Device device, Port oldPort, 389 private DeviceEvent updatePort(Device device, Port oldPort,
327 Port newPort, 390 Port newPort,
328 Map<PortNumber, Port> ports) { 391 Map<PortNumber, Port> ports) {
...@@ -337,7 +400,7 @@ public class GossipDeviceStore ...@@ -337,7 +400,7 @@ public class GossipDeviceStore
337 400
338 // Prunes the specified list of ports based on which ports are in the 401 // Prunes the specified list of ports based on which ports are in the
339 // processed list and returns list of corresponding events. 402 // processed list and returns list of corresponding events.
340 - // Guarded by deviceDescs value (=locking Device) 403 + // Guarded by deviceDescs value (=Device lock)
341 private List<DeviceEvent> pruneOldPorts(Device device, 404 private List<DeviceEvent> pruneOldPorts(Device device,
342 Map<PortNumber, Port> ports, 405 Map<PortNumber, Port> ports,
343 Set<PortNumber> processed) { 406 Set<PortNumber> processed) {
...@@ -389,13 +452,19 @@ public class GossipDeviceStore ...@@ -389,13 +452,19 @@ public class GossipDeviceStore
389 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); 452 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
390 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); 453 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
391 454
455 + synchronized (descsMap) {
456 +
457 + if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
458 + log.debug("Ignoring outdated event: {}", deltaDesc);
459 + return null;
460 + }
461 +
392 DeviceDescriptions descs = descsMap.get(providerId); 462 DeviceDescriptions descs = descsMap.get(providerId);
393 // assuming all providers must to give DeviceDescription 463 // assuming all providers must to give DeviceDescription
394 checkArgument(descs != null, 464 checkArgument(descs != null,
395 "Device description for Device ID %s from Provider %s was not found", 465 "Device description for Device ID %s from Provider %s was not found",
396 deviceId, providerId); 466 deviceId, providerId);
397 467
398 - synchronized (descsMap) {
399 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); 468 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
400 final PortNumber number = deltaDesc.value().portNumber(); 469 final PortNumber number = deltaDesc.value().portNumber();
401 final Port oldPort = ports.get(number); 470 final Port oldPort = ports.get(number);
...@@ -443,19 +512,51 @@ public class GossipDeviceStore ...@@ -443,19 +512,51 @@ public class GossipDeviceStore
443 } 512 }
444 513
445 @Override 514 @Override
446 - public DeviceEvent removeDevice(DeviceId deviceId) { 515 + public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
447 - ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId); 516 + Timestamp timestamp = clockService.getTimestamp(deviceId);
517 + DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
518 + // TODO: broadcast removal event
519 + return event;
520 + }
521 +
522 + private DeviceEvent removeDeviceInternal(DeviceId deviceId,
523 + Timestamp timestamp) {
524 +
525 + Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
448 synchronized (descs) { 526 synchronized (descs) {
527 + // accept removal request if given timestamp is newer than
528 + // the latest Timestamp from Primary provider
529 + DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
530 + Timestamp lastTimestamp = primDescs.getLatestTimestamp();
531 + if (timestamp.compareTo(lastTimestamp) <= 0) {
532 + // outdated event ignore
533 + return null;
534 + }
535 + removalRequest.put(deviceId, timestamp);
536 +
449 Device device = devices.remove(deviceId); 537 Device device = devices.remove(deviceId);
450 // should DEVICE_REMOVED carry removed ports? 538 // should DEVICE_REMOVED carry removed ports?
451 - devicePorts.get(deviceId).clear(); 539 + Map<PortNumber, Port> ports = devicePorts.get(deviceId);
452 - availableDevices.remove(deviceId); 540 + if (ports != null) {
541 + ports.clear();
542 + }
543 + markOfflineInternal(deviceId, timestamp);
453 descs.clear(); 544 descs.clear();
454 return device == null ? null : 545 return device == null ? null :
455 new DeviceEvent(DEVICE_REMOVED, device, null); 546 new DeviceEvent(DEVICE_REMOVED, device, null);
456 } 547 }
457 } 548 }
458 549
550 + private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
551 + Timestamp removalTimestamp = removalRequest.get(deviceId);
552 + if (removalTimestamp != null &&
553 + removalTimestamp.compareTo(timestampToCheck) >= 0) {
554 + // removalRequest is more recent
555 + return true;
556 + }
557 + return false;
558 + }
559 +
459 /** 560 /**
460 * Returns a Device, merging description given from multiple Providers. 561 * Returns a Device, merging description given from multiple Providers.
461 * 562 *
...@@ -472,7 +573,7 @@ public class GossipDeviceStore ...@@ -472,7 +573,7 @@ public class GossipDeviceStore
472 573
473 DeviceDescriptions desc = providerDescs.get(primary); 574 DeviceDescriptions desc = providerDescs.get(primary);
474 575
475 - DeviceDescription base = desc.getDeviceDesc().value(); 576 + final DeviceDescription base = desc.getDeviceDesc().value();
476 Type type = base.type(); 577 Type type = base.type();
477 String manufacturer = base.manufacturer(); 578 String manufacturer = base.manufacturer();
478 String hwVersion = base.hwVersion(); 579 String hwVersion = base.hwVersion();
...@@ -545,7 +646,7 @@ public class GossipDeviceStore ...@@ -545,7 +646,7 @@ public class GossipDeviceStore
545 * @return primary ProviderID, or randomly chosen one if none exists 646 * @return primary ProviderID, or randomly chosen one if none exists
546 */ 647 */
547 private ProviderId pickPrimaryPID( 648 private ProviderId pickPrimaryPID(
548 - ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) { 649 + Map<ProviderId, DeviceDescriptions> providerDescs) {
549 ProviderId fallBackPrimary = null; 650 ProviderId fallBackPrimary = null;
550 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { 651 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
551 if (!e.getKey().isAncillary()) { 652 if (!e.getKey().isAncillary()) {
...@@ -558,6 +659,12 @@ public class GossipDeviceStore ...@@ -558,6 +659,12 @@ public class GossipDeviceStore
558 return fallBackPrimary; 659 return fallBackPrimary;
559 } 660 }
560 661
662 + private DeviceDescriptions getPrimaryDescriptions(
663 + Map<ProviderId, DeviceDescriptions> providerDescs) {
664 + ProviderId pid = pickPrimaryPID(providerDescs);
665 + return providerDescs.get(pid);
666 + }
667 +
561 public static final class InitDeviceDescs 668 public static final class InitDeviceDescs
562 implements ConcurrentInitializer<DeviceDescriptions> { 669 implements ConcurrentInitializer<DeviceDescriptions> {
563 670
...@@ -586,6 +693,16 @@ public class GossipDeviceStore ...@@ -586,6 +693,16 @@ public class GossipDeviceStore
586 this.portDescs = new ConcurrentHashMap<>(); 693 this.portDescs = new ConcurrentHashMap<>();
587 } 694 }
588 695
696 + Timestamp getLatestTimestamp() {
697 + Timestamp latest = deviceDesc.get().timestamp();
698 + for (Timestamped<PortDescription> desc : portDescs.values()) {
699 + if (desc.timestamp().compareTo(latest) > 0) {
700 + latest = desc.timestamp();
701 + }
702 + }
703 + return latest;
704 + }
705 +
589 public Timestamped<DeviceDescription> getDeviceDesc() { 706 public Timestamped<DeviceDescription> getDeviceDesc() {
590 return deviceDesc.get(); 707 return deviceDesc.get();
591 } 708 }
......
...@@ -2,6 +2,8 @@ package org.onlab.onos.store.trivial.impl; ...@@ -2,6 +2,8 @@ package org.onlab.onos.store.trivial.impl;
2 2
3 import com.google.common.collect.FluentIterable; 3 import com.google.common.collect.FluentIterable;
4 import com.google.common.collect.ImmutableList; 4 import com.google.common.collect.ImmutableList;
5 +import com.google.common.collect.Maps;
6 +import com.google.common.collect.Sets;
5 7
6 import org.apache.commons.lang3.concurrent.ConcurrentException; 8 import org.apache.commons.lang3.concurrent.ConcurrentException;
7 import org.apache.commons.lang3.concurrent.ConcurrentInitializer; 9 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
...@@ -32,7 +34,6 @@ import org.onlab.util.NewConcurrentHashMap; ...@@ -32,7 +34,6 @@ import org.onlab.util.NewConcurrentHashMap;
32 import org.slf4j.Logger; 34 import org.slf4j.Logger;
33 35
34 import java.util.ArrayList; 36 import java.util.ArrayList;
35 -import java.util.Collection;
36 import java.util.Collections; 37 import java.util.Collections;
37 import java.util.HashSet; 38 import java.util.HashSet;
38 import java.util.Iterator; 39 import java.util.Iterator;
...@@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference; ...@@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
48 import static com.google.common.base.Preconditions.checkArgument; 49 import static com.google.common.base.Preconditions.checkArgument;
49 import static com.google.common.base.Preconditions.checkNotNull; 50 import static com.google.common.base.Preconditions.checkNotNull;
50 import static com.google.common.base.Predicates.notNull; 51 import static com.google.common.base.Predicates.notNull;
52 +import static com.google.common.base.Verify.verify;
51 import static org.onlab.onos.net.device.DeviceEvent.Type.*; 53 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
52 import static org.slf4j.LoggerFactory.getLogger; 54 import static org.slf4j.LoggerFactory.getLogger;
53 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; 55 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
...@@ -71,14 +73,14 @@ public class SimpleDeviceStore ...@@ -71,14 +73,14 @@ public class SimpleDeviceStore
71 // collection of Description given from various providers 73 // collection of Description given from various providers
72 private final ConcurrentMap<DeviceId, 74 private final ConcurrentMap<DeviceId,
73 ConcurrentMap<ProviderId, DeviceDescriptions>> 75 ConcurrentMap<ProviderId, DeviceDescriptions>>
74 - deviceDescs = new ConcurrentHashMap<>(); 76 + deviceDescs = Maps.newConcurrentMap();
75 77
76 // cache of Device and Ports generated by compositing descriptions from providers 78 // cache of Device and Ports generated by compositing descriptions from providers
77 - private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>(); 79 + private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
78 - private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>(); 80 + private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
79 81
80 // available(=UP) devices 82 // available(=UP) devices
81 - private final Set<DeviceId> availableDevices = new HashSet<>(); 83 + private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
82 84
83 85
84 @Activate 86 @Activate
...@@ -88,6 +90,10 @@ public class SimpleDeviceStore ...@@ -88,6 +90,10 @@ public class SimpleDeviceStore
88 90
89 @Deactivate 91 @Deactivate
90 public void deactivate() { 92 public void deactivate() {
93 + deviceDescs.clear();
94 + devices.clear();
95 + devicePorts.clear();
96 + availableDevices.clear();
91 log.info("Stopped"); 97 log.info("Stopped");
92 } 98 }
93 99
...@@ -107,17 +113,21 @@ public class SimpleDeviceStore ...@@ -107,17 +113,21 @@ public class SimpleDeviceStore
107 } 113 }
108 114
109 @Override 115 @Override
110 - public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, 116 + public DeviceEvent createOrUpdateDevice(ProviderId providerId,
117 + DeviceId deviceId,
111 DeviceDescription deviceDescription) { 118 DeviceDescription deviceDescription) {
119 +
112 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs 120 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
113 = getDeviceDescriptions(deviceId); 121 = getDeviceDescriptions(deviceId);
114 122
115 - Device oldDevice = devices.get(deviceId); 123 + synchronized (providerDescs) {
124 + // locking per device
116 125
117 DeviceDescriptions descs 126 DeviceDescriptions descs
118 = createIfAbsentUnchecked(providerDescs, providerId, 127 = createIfAbsentUnchecked(providerDescs, providerId,
119 new InitDeviceDescs(deviceDescription)); 128 new InitDeviceDescs(deviceDescription));
120 129
130 + Device oldDevice = devices.get(deviceId);
121 // update description 131 // update description
122 descs.putDeviceDesc(deviceDescription); 132 descs.putDeviceDesc(deviceDescription);
123 Device newDevice = composeDevice(deviceId, providerDescs); 133 Device newDevice = composeDevice(deviceId, providerDescs);
...@@ -130,22 +140,27 @@ public class SimpleDeviceStore ...@@ -130,22 +140,27 @@ public class SimpleDeviceStore
130 return updateDevice(providerId, oldDevice, newDevice); 140 return updateDevice(providerId, oldDevice, newDevice);
131 } 141 }
132 } 142 }
143 + }
133 144
134 // Creates the device and returns the appropriate event if necessary. 145 // Creates the device and returns the appropriate event if necessary.
146 + // Guarded by deviceDescs value (=Device lock)
135 private DeviceEvent createDevice(ProviderId providerId, Device newDevice) { 147 private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
136 148
137 // update composed device cache 149 // update composed device cache
138 - synchronized (this) { 150 + Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
139 - devices.putIfAbsent(newDevice.id(), newDevice); 151 + verify(oldDevice == null,
152 + "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
153 + providerId, oldDevice, newDevice);
154 +
140 if (!providerId.isAncillary()) { 155 if (!providerId.isAncillary()) {
141 availableDevices.add(newDevice.id()); 156 availableDevices.add(newDevice.id());
142 } 157 }
143 - }
144 158
145 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null); 159 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
146 } 160 }
147 161
148 // Updates the device and returns the appropriate event if necessary. 162 // Updates the device and returns the appropriate event if necessary.
163 + // Guarded by deviceDescs value (=Device lock)
149 private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) { 164 private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
150 165
151 // We allow only certain attributes to trigger update 166 // We allow only certain attributes to trigger update
...@@ -153,70 +168,87 @@ public class SimpleDeviceStore ...@@ -153,70 +168,87 @@ public class SimpleDeviceStore
153 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || 168 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
154 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) { 169 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
155 170
156 - synchronized (this) { 171 + boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
157 - devices.replace(newDevice.id(), oldDevice, newDevice); 172 + if (!replaced) {
173 + verify(replaced,
174 + "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
175 + providerId, oldDevice, devices.get(newDevice.id())
176 + , newDevice);
177 + }
158 if (!providerId.isAncillary()) { 178 if (!providerId.isAncillary()) {
159 availableDevices.add(newDevice.id()); 179 availableDevices.add(newDevice.id());
160 } 180 }
161 - }
162 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null); 181 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
163 } 182 }
164 183
165 // Otherwise merely attempt to change availability if primary provider 184 // Otherwise merely attempt to change availability if primary provider
166 if (!providerId.isAncillary()) { 185 if (!providerId.isAncillary()) {
167 - synchronized (this) {
168 boolean added = availableDevices.add(newDevice.id()); 186 boolean added = availableDevices.add(newDevice.id());
169 return !added ? null : 187 return !added ? null :
170 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null); 188 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
171 } 189 }
172 - }
173 return null; 190 return null;
174 } 191 }
175 192
176 @Override 193 @Override
177 public DeviceEvent markOffline(DeviceId deviceId) { 194 public DeviceEvent markOffline(DeviceId deviceId) {
178 - synchronized (this) { 195 + ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
196 + = getDeviceDescriptions(deviceId);
197 +
198 + // locking device
199 + synchronized (providerDescs) {
179 Device device = devices.get(deviceId); 200 Device device = devices.get(deviceId);
180 - boolean removed = (device != null) && availableDevices.remove(deviceId); 201 + if (device == null) {
181 - return !removed ? null : 202 + return null;
182 - new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); 203 + }
204 + boolean removed = availableDevices.remove(deviceId);
205 + if (removed) {
206 + // TODO: broadcast ... DOWN only?
207 + return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
208 + }
209 + return null;
183 } 210 }
184 } 211 }
185 212
186 @Override 213 @Override
187 - public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId, 214 + public List<DeviceEvent> updatePorts(ProviderId providerId,
215 + DeviceId deviceId,
188 List<PortDescription> portDescriptions) { 216 List<PortDescription> portDescriptions) {
189 217
190 - // TODO: implement multi-provider
191 Device device = devices.get(deviceId); 218 Device device = devices.get(deviceId);
192 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 219 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
193 220
194 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); 221 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
195 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); 222 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
196 223
224 + List<DeviceEvent> events = new ArrayList<>();
225 + synchronized (descsMap) {
197 DeviceDescriptions descs = descsMap.get(providerId); 226 DeviceDescriptions descs = descsMap.get(providerId);
227 + // every provider must provide DeviceDescription.
198 checkArgument(descs != null, 228 checkArgument(descs != null,
199 "Device description for Device ID %s from Provider %s was not found", 229 "Device description for Device ID %s from Provider %s was not found",
200 deviceId, providerId); 230 deviceId, providerId);
201 231
202 - 232 + Map<PortNumber, Port> ports = getPortMap(deviceId);
203 - List<DeviceEvent> events = new ArrayList<>();
204 - synchronized (this) {
205 - ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
206 233
207 // Add new ports 234 // Add new ports
208 Set<PortNumber> processed = new HashSet<>(); 235 Set<PortNumber> processed = new HashSet<>();
209 for (PortDescription portDescription : portDescriptions) { 236 for (PortDescription portDescription : portDescriptions) {
210 - PortNumber number = portDescription.portNumber(); 237 + final PortNumber number = portDescription.portNumber();
211 - Port oldPort = ports.get(number); 238 + processed.add(portDescription.portNumber());
239 +
240 + final Port oldPort = ports.get(number);
241 + final Port newPort;
242 +
243 +// event suppression hook?
244 +
212 // update description 245 // update description
213 descs.putPortDesc(portDescription); 246 descs.putPortDesc(portDescription);
214 - Port newPort = composePort(device, number, descsMap); 247 + newPort = composePort(device, number, descsMap);
215 248
216 events.add(oldPort == null ? 249 events.add(oldPort == null ?
217 createPort(device, newPort, ports) : 250 createPort(device, newPort, ports) :
218 updatePort(device, oldPort, newPort, ports)); 251 updatePort(device, oldPort, newPort, ports));
219 - processed.add(portDescription.portNumber());
220 } 252 }
221 253
222 events.addAll(pruneOldPorts(device, ports, processed)); 254 events.addAll(pruneOldPorts(device, ports, processed));
...@@ -226,17 +258,19 @@ public class SimpleDeviceStore ...@@ -226,17 +258,19 @@ public class SimpleDeviceStore
226 258
227 // Creates a new port based on the port description adds it to the map and 259 // Creates a new port based on the port description adds it to the map and
228 // Returns corresponding event. 260 // Returns corresponding event.
261 + // Guarded by deviceDescs value (=Device lock)
229 private DeviceEvent createPort(Device device, Port newPort, 262 private DeviceEvent createPort(Device device, Port newPort,
230 - ConcurrentMap<PortNumber, Port> ports) { 263 + Map<PortNumber, Port> ports) {
231 ports.put(newPort.number(), newPort); 264 ports.put(newPort.number(), newPort);
232 return new DeviceEvent(PORT_ADDED, device, newPort); 265 return new DeviceEvent(PORT_ADDED, device, newPort);
233 } 266 }
234 267
235 // Checks if the specified port requires update and if so, it replaces the 268 // Checks if the specified port requires update and if so, it replaces the
236 // existing entry in the map and returns corresponding event. 269 // existing entry in the map and returns corresponding event.
270 + // Guarded by deviceDescs value (=Device lock)
237 private DeviceEvent updatePort(Device device, Port oldPort, 271 private DeviceEvent updatePort(Device device, Port oldPort,
238 Port newPort, 272 Port newPort,
239 - ConcurrentMap<PortNumber, Port> ports) { 273 + Map<PortNumber, Port> ports) {
240 if (oldPort.isEnabled() != newPort.isEnabled() || 274 if (oldPort.isEnabled() != newPort.isEnabled() ||
241 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) { 275 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
242 276
...@@ -248,6 +282,7 @@ public class SimpleDeviceStore ...@@ -248,6 +282,7 @@ public class SimpleDeviceStore
248 282
249 // Prunes the specified list of ports based on which ports are in the 283 // Prunes the specified list of ports based on which ports are in the
250 // processed list and returns list of corresponding events. 284 // processed list and returns list of corresponding events.
285 + // Guarded by deviceDescs value (=Device lock)
251 private List<DeviceEvent> pruneOldPorts(Device device, 286 private List<DeviceEvent> pruneOldPorts(Device device,
252 Map<PortNumber, Port> ports, 287 Map<PortNumber, Port> ports,
253 Set<PortNumber> processed) { 288 Set<PortNumber> processed) {
...@@ -264,12 +299,6 @@ public class SimpleDeviceStore ...@@ -264,12 +299,6 @@ public class SimpleDeviceStore
264 return events; 299 return events;
265 } 300 }
266 301
267 - private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
268 - DeviceId deviceId) {
269 - return createIfAbsentUnchecked(deviceDescs, deviceId,
270 - NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
271 - }
272 -
273 // Gets the map of ports for the specified device; if one does not already 302 // Gets the map of ports for the specified device; if one does not already
274 // exist, it creates and registers a new one. 303 // exist, it creates and registers a new one.
275 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { 304 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
...@@ -277,8 +306,14 @@ public class SimpleDeviceStore ...@@ -277,8 +306,14 @@ public class SimpleDeviceStore
277 NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); 306 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
278 } 307 }
279 308
309 + private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
310 + DeviceId deviceId) {
311 + return createIfAbsentUnchecked(deviceDescs, deviceId,
312 + NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
313 + }
314 +
280 @Override 315 @Override
281 - public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId, 316 + public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
282 PortDescription portDescription) { 317 PortDescription portDescription) {
283 Device device = devices.get(deviceId); 318 Device device = devices.get(deviceId);
284 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 319 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
...@@ -286,19 +321,22 @@ public class SimpleDeviceStore ...@@ -286,19 +321,22 @@ public class SimpleDeviceStore
286 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); 321 ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
287 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); 322 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
288 323
324 + synchronized (descsMap) {
289 DeviceDescriptions descs = descsMap.get(providerId); 325 DeviceDescriptions descs = descsMap.get(providerId);
290 // assuming all providers must to give DeviceDescription 326 // assuming all providers must to give DeviceDescription
291 checkArgument(descs != null, 327 checkArgument(descs != null,
292 "Device description for Device ID %s from Provider %s was not found", 328 "Device description for Device ID %s from Provider %s was not found",
293 deviceId, providerId); 329 deviceId, providerId);
294 330
295 - synchronized (this) {
296 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId); 331 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
297 final PortNumber number = portDescription.portNumber(); 332 final PortNumber number = portDescription.portNumber();
298 - Port oldPort = ports.get(number); 333 + final Port oldPort = ports.get(number);
334 + final Port newPort;
335 +
299 // update description 336 // update description
300 descs.putPortDesc(portDescription); 337 descs.putPortDesc(portDescription);
301 - Port newPort = composePort(device, number, descsMap); 338 + newPort = composePort(device, number, descsMap);
339 +
302 if (oldPort == null) { 340 if (oldPort == null) {
303 return createPort(device, newPort, ports); 341 return createPort(device, newPort, ports);
304 } else { 342 } else {
...@@ -333,7 +371,7 @@ public class SimpleDeviceStore ...@@ -333,7 +371,7 @@ public class SimpleDeviceStore
333 synchronized (descs) { 371 synchronized (descs) {
334 Device device = devices.remove(deviceId); 372 Device device = devices.remove(deviceId);
335 // should DEVICE_REMOVED carry removed ports? 373 // should DEVICE_REMOVED carry removed ports?
336 - ConcurrentMap<PortNumber, Port> ports = devicePorts.get(deviceId); 374 + Map<PortNumber, Port> ports = devicePorts.get(deviceId);
337 if (ports != null) { 375 if (ports != null) {
338 ports.clear(); 376 ports.clear();
339 } 377 }
...@@ -360,14 +398,14 @@ public class SimpleDeviceStore ...@@ -360,14 +398,14 @@ public class SimpleDeviceStore
360 398
361 DeviceDescriptions desc = providerDescs.get(primary); 399 DeviceDescriptions desc = providerDescs.get(primary);
362 400
363 - // base 401 + final DeviceDescription base = desc.getDeviceDesc();
364 - Type type = desc.getDeviceDesc().type(); 402 + Type type = base.type();
365 - String manufacturer = desc.getDeviceDesc().manufacturer(); 403 + String manufacturer = base.manufacturer();
366 - String hwVersion = desc.getDeviceDesc().hwVersion(); 404 + String hwVersion = base.hwVersion();
367 - String swVersion = desc.getDeviceDesc().swVersion(); 405 + String swVersion = base.swVersion();
368 - String serialNumber = desc.getDeviceDesc().serialNumber(); 406 + String serialNumber = base.serialNumber();
369 DefaultAnnotations annotations = DefaultAnnotations.builder().build(); 407 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
370 - annotations = merge(annotations, desc.getDeviceDesc().annotations()); 408 + annotations = merge(annotations, base.annotations());
371 409
372 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { 410 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
373 if (e.getKey().equals(primary)) { 411 if (e.getKey().equals(primary)) {
...@@ -386,7 +424,14 @@ public class SimpleDeviceStore ...@@ -386,7 +424,14 @@ public class SimpleDeviceStore
386 hwVersion, swVersion, serialNumber, annotations); 424 hwVersion, swVersion, serialNumber, annotations);
387 } 425 }
388 426
389 - // probably want composePort"s" also 427 + /**
428 + * Returns a Port, merging description given from multiple Providers.
429 + *
430 + * @param device device the port is on
431 + * @param number port number
432 + * @param providerDescs Collection of Descriptions from multiple providers
433 + * @return Port instance
434 + */
390 private Port composePort(Device device, PortNumber number, 435 private Port composePort(Device device, PortNumber number,
391 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) { 436 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
392 437
...@@ -441,7 +486,9 @@ public class SimpleDeviceStore ...@@ -441,7 +486,9 @@ public class SimpleDeviceStore
441 486
442 public static final class InitDeviceDescs 487 public static final class InitDeviceDescs
443 implements ConcurrentInitializer<DeviceDescriptions> { 488 implements ConcurrentInitializer<DeviceDescriptions> {
489 +
444 private final DeviceDescription deviceDesc; 490 private final DeviceDescription deviceDesc;
491 +
445 public InitDeviceDescs(DeviceDescription deviceDesc) { 492 public InitDeviceDescs(DeviceDescription deviceDesc) {
446 this.deviceDesc = checkNotNull(deviceDesc); 493 this.deviceDesc = checkNotNull(deviceDesc);
447 } 494 }
...@@ -456,8 +503,6 @@ public class SimpleDeviceStore ...@@ -456,8 +503,6 @@ public class SimpleDeviceStore
456 * Collection of Description of a Device and it's Ports given from a Provider. 503 * Collection of Description of a Device and it's Ports given from a Provider.
457 */ 504 */
458 private static class DeviceDescriptions { 505 private static class DeviceDescriptions {
459 - // private final DeviceId id;
460 - // private final ProviderId pid;
461 506
462 private final AtomicReference<DeviceDescription> deviceDesc; 507 private final AtomicReference<DeviceDescription> deviceDesc;
463 private final ConcurrentMap<PortNumber, PortDescription> portDescs; 508 private final ConcurrentMap<PortNumber, PortDescription> portDescs;
...@@ -475,10 +520,6 @@ public class SimpleDeviceStore ...@@ -475,10 +520,6 @@ public class SimpleDeviceStore
475 return portDescs.get(number); 520 return portDescs.get(number);
476 } 521 }
477 522
478 - public Collection<PortDescription> getPortDescs() {
479 - return Collections.unmodifiableCollection(portDescs.values());
480 - }
481 -
482 /** 523 /**
483 * Puts DeviceDescription, merging annotations as necessary. 524 * Puts DeviceDescription, merging annotations as necessary.
484 * 525 *
......
...@@ -129,6 +129,12 @@ ...@@ -129,6 +129,12 @@
129 <version>1.9.13</version> 129 <version>1.9.13</version>
130 </dependency> 130 </dependency>
131 131
132 + <dependency>
133 + <groupId>org.easymock</groupId>
134 + <artifactId>easymock</artifactId>
135 + <version>3.2</version>
136 + <scope>test</scope>
137 + </dependency>
132 138
133 <!-- Web related --> 139 <!-- Web related -->
134 <dependency> 140 <dependency>
......
...@@ -45,12 +45,12 @@ public class KryoSerializer implements Serializer { ...@@ -45,12 +45,12 @@ public class KryoSerializer implements Serializer {
45 } 45 }
46 46
47 @Override 47 @Override
48 - public <T> T deserialize(ByteBuffer buffer) { 48 + public <T> T decode(ByteBuffer buffer) {
49 return serializerPool.deserialize(buffer); 49 return serializerPool.deserialize(buffer);
50 } 50 }
51 51
52 @Override 52 @Override
53 - public void serialize(Object obj, ByteBuffer buffer) { 53 + public void encode(Object obj, ByteBuffer buffer) {
54 serializerPool.serialize(obj, buffer); 54 serializerPool.serialize(obj, buffer);
55 } 55 }
56 } 56 }
......
...@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
48 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version"); 48 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
49 checkpoint(DecoderState.READ_CONTENT); 49 checkpoint(DecoderState.READ_CONTENT);
50 case READ_CONTENT: 50 case READ_CONTENT:
51 - InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer()); 51 + InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
52 message.setMessagingService(messagingService); 52 message.setMessagingService(messagingService);
53 out.add(message); 53 out.add(message);
54 checkpoint(DecoderState.READ_HEADER_VERSION); 54 checkpoint(DecoderState.READ_HEADER_VERSION);
......
...@@ -24,20 +24,18 @@ public interface Serializer { ...@@ -24,20 +24,18 @@ public interface Serializer {
24 public byte[] encode(Object data); 24 public byte[] encode(Object data);
25 25
26 /** 26 /**
27 - * Serializes the specified object into bytes using one of the 27 + * Encodes the specified POJO into a byte buffer.
28 - * pre-registered serializers.
29 * 28 *
30 - * @param obj object to be serialized 29 + * @param data POJO to be encoded
31 * @param buffer to write serialized bytes 30 * @param buffer to write serialized bytes
32 */ 31 */
33 - public void serialize(final Object obj, ByteBuffer buffer); 32 + public void encode(final Object data, ByteBuffer buffer);
34 33
35 /** 34 /**
36 - * Deserializes the specified bytes into an object using one of the 35 + * Decodes the specified byte buffer to a POJO.
37 - * pre-registered serializers.
38 * 36 *
39 - * @param buffer bytes to be deserialized 37 + * @param buffer bytes to be decoded
40 - * @return deserialized object 38 + * @return POJO
41 */ 39 */
42 - public <T> T deserialize(final ByteBuffer buffer); 40 + public <T> T decode(final ByteBuffer buffer);
43 } 41 }
......