tom

Reworked AbstractDistributedStore RemoteEventHandler to allow delegating various…

… events to accommodate cache-specific behaviours.
......@@ -8,6 +8,11 @@ import org.onlab.onos.event.Event;
*/
public interface StoreDelegate<E extends Event> {
/**
* Notifies the delegate via the specified event.
*
* @param event store generated event
*/
void notify(E event);
}
......
......@@ -160,7 +160,7 @@ public class DistributedDeviceManagerTest {
public void deviceDisconnected() {
connectDevice(DID1, SW1);
connectDevice(DID2, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED);
assertTrue("device should be available", service.isAvailable(DID1));
// Disconnect
......@@ -179,7 +179,7 @@ public class DistributedDeviceManagerTest {
@Test
public void deviceUpdated() {
connectDevice(DID1, SW1);
validateEvents(DEVICE_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
connectDevice(DID1, SW2);
validateEvents(DEVICE_UPDATED);
......@@ -199,7 +199,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P2, true));
pds.add(new DefaultPortDescription(P3, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
pds.clear();
pds.add(new DefaultPortDescription(P1, false));
......@@ -215,7 +215,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
validateEvents(PORT_UPDATED);
......@@ -230,7 +230,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
assertEquals("wrong port count", 2, service.getPorts(DID1).size());
Port port = service.getPort(DID1, P1);
......@@ -244,10 +244,10 @@ public class DistributedDeviceManagerTest {
connectDevice(DID2, SW2);
assertEquals("incorrect device count", 2, service.getDeviceCount());
admin.removeDevice(DID1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED, DEVICE_REMOVED);
assertNull("device should not be found", service.getDevice(DID1));
assertNotNull("device should be found", service.getDevice(DID2));
assertEquals("incorrect device count", 1, service.getDeviceCount());
}
protected void validateEvents(Enum... types) {
......
......@@ -7,7 +7,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ISet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -38,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -82,7 +82,7 @@ public class DistributedDeviceStore
= new OptionalCacheLoader<>(storeService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
rawDevices.addEntryListener(new RemoteEventHandler<>(devices), includeValue);
rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
......@@ -92,35 +92,25 @@ public class DistributedDeviceStore
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
rawDevicePorts.addEntryListener(new RemoteEventHandler<>(devicePorts), includeValue);
rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
loadDeviceCache();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public int getDeviceCount() {
// TODO IMap size or cache size?
return rawDevices.size();
return devices.asMap().size();
}
@Override
public Iterable<Device> getDevices() {
// TODO Revisit if we ever need to do this.
// log.info("{}:{}", rawMap.size(), cache.size());
// if (rawMap.size() != cache.size()) {
// for (Entry<byte[], byte[]> e : rawMap.entrySet()) {
// final DeviceId key = deserialize(e.getKey());
// final DefaultDevice val = deserialize(e.getValue());
// cache.put(key, val);
// }
// }
// TODO builder v.s. copyOf. Guava semms to be using copyOf?
Builder<Device> builder = ImmutableSet.builder();
for (Optional<DefaultDevice> e : devices.asMap().values()) {
......@@ -131,6 +121,17 @@ public class DistributedDeviceStore
return builder.build();
}
private void loadDeviceCache() {
log.info("{}:{}", rawDevices.size(), devices.size());
if (rawDevices.size() != devices.size()) {
for (Map.Entry<byte[], byte[]> e : rawDevices.entrySet()) {
final DeviceId key = deserialize(e.getKey());
final DefaultDevice val = deserialize(e.getValue());
devices.put(key, Optional.of(val));
}
}
}
@Override
public Device getDevice(DeviceId deviceId) {
// TODO revisit if ignoring exception is safe.
......@@ -162,7 +163,7 @@ public class DistributedDeviceStore
availableDevices.add(deviceIdBytes);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
return new DeviceEvent(DEVICE_ADDED, device, null);
}
// Updates the device and returns the appropriate event if necessary.
......@@ -343,5 +344,48 @@ public class DistributedDeviceStore
}
}
private class RemoteDeviceEventHandler extends RemoteEventHandler<DeviceId, DefaultDevice> {
public RemoteDeviceEventHandler(LoadingCache<DeviceId, Optional<DefaultDevice>> cache) {
super(cache);
}
@Override
protected void onAdd(DeviceId deviceId, DefaultDevice device) {
delegate.notify(new DeviceEvent(DEVICE_ADDED, device));
}
@Override
protected void onRemove(DeviceId deviceId, DefaultDevice device) {
delegate.notify(new DeviceEvent(DEVICE_REMOVED, device));
}
@Override
protected void onUpdate(DeviceId deviceId, DefaultDevice device) {
delegate.notify(new DeviceEvent(DEVICE_UPDATED, device));
}
}
private class RemotePortEventHandler extends RemoteEventHandler<DeviceId, Map<PortNumber, Port>> {
public RemotePortEventHandler(LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> cache) {
super(cache);
}
@Override
protected void onAdd(DeviceId deviceId, Map<PortNumber, Port> ports) {
// delegate.notify(new DeviceEvent(PORT_ADDED, getDevice(deviceId)));
}
@Override
protected void onRemove(DeviceId deviceId, Map<PortNumber, Port> ports) {
// delegate.notify(new DeviceEvent(PORT_REMOVED, getDevice(deviceId)));
}
@Override
protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> ports) {
// delegate.notify(new DeviceEvent(PORT_UPDATED, getDevice(deviceId)));
}
}
// TODO cache serialized DeviceID if we suffer from serialization cost
}
......
......@@ -6,7 +6,6 @@ import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
......@@ -67,7 +66,7 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public final class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private LoadingCache<K, Optional<V>> cache;
......@@ -86,26 +85,58 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().putIfAbsent(key, newValue);
onAdd(key, newVal);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
K key = storeService.<K>deserialize(event.getKey());
final V oldVal = storeService.<V>deserialize(event.getOldValue());
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
final V newVal = storeService.<V>deserialize(event.getValue());
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().replace(key, oldValue, newValue);
onUpdate(key, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
cache.invalidate(storeService.<K>deserialize(event.getKey()));
K key = deserialize(event.getKey());
V val = deserialize(event.getValue());
cache.invalidate(key);
onRemove(key, val);
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
K key = storeService.<K>deserialize(event.getKey());
final V newVal = storeService.<V>deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().putIfAbsent(key, newValue);
/**
* Cache entry addition hook.
*
* @param key new key
* @param newVal new value
*/
protected void onAdd(K key, V newVal) {
}
/**
* Cache entry update hook.
*
* @param key new key
* @param newVal new value
*/
protected void onUpdate(K key, V newVal) {
}
/**
* Cache entry remove hook.
*
* @param key new key
* @param val old value
*/
protected void onRemove(K key, V val) {
}
}
......