Madan Jampani
Committed by Gerrit Code Review

Moving group store to consistent map

Change-Id: Id3c23c0cd9d7c713bceffc7bd9125aa3de99c45e
......@@ -56,11 +56,12 @@ import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.ArrayList;
......@@ -70,6 +71,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
......@@ -79,7 +81,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
......@@ -113,12 +114,12 @@ public class DistributedGroupStore
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
private EventuallyConsistentMap<GroupStoreKeyMapKey,
private ConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> groupStoreEntriesByKey = null;
// Per device group table with (device id + group id) as key
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
groupEntriesById = new ConcurrentHashMap<>();
private EventuallyConsistentMap<GroupStoreKeyMapKey,
private ConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
......@@ -131,8 +132,6 @@ public class DistributedGroupStore
private KryoNamespace.Builder kryoBuilder = null;
private final AtomicLong sequenceNumber = new AtomicLong(0);
private KryoNamespace clusterMsgSerializer;
@Activate
......@@ -169,29 +168,21 @@ public class DistributedGroupStore
this::process,
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
log.debug("Creating Consistent map onos-group-store-keymap");
groupStoreEntriesByKey = keyMapBuilder
.withName("groupstorekeymap")
.withSerializer(kryoBuilder)
.withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement()))
groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
.withName("onos-group-store-keymap")
.withSerializer(Serializer.using(kryoBuilder.build()))
.build();
groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
log.debug("Current size of groupstorekeymap:{}",
groupStoreEntriesByKey.size());
log.debug("Creating EC map pendinggroupkeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
log.debug("Creating Consistent map pendinggroupkeymap");
auditPendingReqQueue = auditMapBuilder
.withName("pendinggroupkeymap")
.withSerializer(kryoBuilder)
.withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement()))
auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
.withName("onos-pending-group-keymap")
.withSerializer(Serializer.using(kryoBuilder.build()))
.build();
log.debug("Current size of pendinggroupkeymap:{}",
auditPendingReqQueue.size());
......@@ -222,9 +213,9 @@ public class DistributedGroupStore
*
* @return Map representing group key table.
*/
private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
private Map<GroupStoreKeyMapKey, StoredGroupEntry>
getGroupStoreKeyMap() {
return groupStoreEntriesByKey;
return groupStoreEntriesByKey.asJavaMap();
}
/**
......@@ -243,9 +234,9 @@ public class DistributedGroupStore
*
* @return Map representing group key table.
*/
private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
private Map<GroupStoreKeyMapKey, StoredGroupEntry>
getPendingGroupKeyTable() {
return auditPendingReqQueue;
return auditPendingReqQueue.asJavaMap();
}
/**
......@@ -445,7 +436,7 @@ public class DistributedGroupStore
groupDesc.deviceId());
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
getPendingGroupKeyTable();
pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
groupDesc.appCookie()),
......@@ -851,7 +842,7 @@ public class DistributedGroupStore
Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
new HashSet<>();
groupStoreEntriesByKey.entrySet().stream()
getGroupStoreKeyMap().entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.forEach(entryPendingRemove::add);
......@@ -987,14 +978,13 @@ public class DistributedGroupStore
* Map handler to receive any events when the group key map is updated.
*/
private class GroupStoreKeyMapListener implements
EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
@Override
public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
StoredGroupEntry> mapEvent) {
public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
GroupStoreKeyMapKey key = mapEvent.key();
StoredGroupEntry group = mapEvent.value();
StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
if ((key == null) && (group == null)) {
log.error("GroupStoreKeyMapListener: Received "
+ "event {} with null entry", mapEvent.type());
......@@ -1014,25 +1004,24 @@ public class DistributedGroupStore
mapEvent.type(),
group.id(),
key.deviceId());
if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
// Update the group ID table
getGroupIdTable(group.deviceId()).put(group.id(), group);
if (mapEvent.value().state() == Group.GroupState.ADDED) {
if (mapEvent.value().isGroupStateAddedFirstTime()) {
groupEvent = new GroupEvent(Type.GROUP_ADDED,
mapEvent.value());
StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
if (value.state() == Group.GroupState.ADDED) {
if (value.isGroupStateAddedFirstTime()) {
groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
group.id(),
group.deviceId());
} else {
groupEvent = new GroupEvent(Type.GROUP_UPDATED,
mapEvent.value());
groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
log.trace("Received following GROUP_ADDED state update for id {} in device {}",
group.id(),
group.deviceId());
}
}
} else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
} else if (mapEvent.type() == MapEvent.Type.REMOVE) {
groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
// Remove the entry from the group ID table
getGroupIdTable(group.deviceId()).remove(group.id(), group);
......
......@@ -46,7 +46,7 @@ import org.onosproject.net.group.GroupOperation;
import org.onosproject.net.group.GroupStore;
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.TestStorageService;
import com.google.common.collect.ImmutableList;
......@@ -109,7 +109,7 @@ public class DistributedGroupStoreTest {
DistributedGroupStore groupStoreImpl;
GroupStore groupStore;
EventuallyConsistentMap auditPendingReqQueue;
ConsistentMap auditPendingReqQueue;
static class MasterOfAll extends MastershipServiceAdapter {
@Override
......