Madan Jampani
Committed by Thomas Vachuska

Using non-static serializers in Flow Rule Store

Change-Id: Ifacd9ca98d8c6d3bbf03b3b9784234f7eab458a5
......@@ -68,8 +68,6 @@ import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
......@@ -183,13 +181,9 @@ public class DistributedFlowRuleStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.build("FlowRuleStore"));
protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MastershipBasedTimestamp.class);
......@@ -223,7 +217,7 @@ public class DistributedFlowRuleStore
deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
.withName("onos-flow-table-stats")
.withSerializer(SERIALIZER_BUILDER)
.withSerializer(serializerBuilder)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.withTombstonesDisabled()
......@@ -331,17 +325,17 @@ public class DistributedFlowRuleStore
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
clusterCommunicator.addSubscriber(
GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
clusterCommunicator.addSubscriber(
GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
......@@ -386,8 +380,8 @@ public class DistributedFlowRuleStore
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
FlowStoreMessageSubjects.GET_FLOW_ENTRY,
SERIALIZER::encode,
SERIALIZER::decode,
serializer::encode,
serializer::decode,
master),
FLOW_RULE_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
......@@ -412,8 +406,8 @@ public class DistributedFlowRuleStore
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
SERIALIZER::encode,
SERIALIZER::decode,
serializer::encode,
serializer::decode,
master),
FLOW_RULE_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
......@@ -460,7 +454,7 @@ public class DistributedFlowRuleStore
clusterCommunicator.unicast(operation,
APPLY_BATCH_FLOWS,
SERIALIZER::encode,
serializer::encode,
master)
.whenComplete((result, error) -> {
if (error != null) {
......@@ -607,8 +601,8 @@ public class DistributedFlowRuleStore
return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
rule,
REMOVE_FLOW_ENTRY,
SERIALIZER::encode,
SERIALIZER::decode,
serializer::encode,
serializer::decode,
master));
}
......@@ -633,7 +627,7 @@ public class DistributedFlowRuleStore
notifyDelegate(event);
} else {
// TODO check unicast return value
clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
//error log: log.warn("Failed to respond to peer for batch operation result");
}
}
......@@ -642,7 +636,7 @@ public class DistributedFlowRuleStore
@Override
public void handle(final ClusterMessage message) {
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
FlowRuleBatchOperation operation = serializer.decode(message.payload());
log.debug("received batch request {}", operation);
final DeviceId deviceId = operation.deviceId();
......@@ -657,7 +651,7 @@ public class DistributedFlowRuleStore
// TODO: we might want to wrap response in envelope
// to distinguish sw programming failure and hand over
// it make sense in the latter case to retry immediately.
message.respond(SERIALIZER.encode(allFailed));
message.respond(serializer.encode(allFailed));
return;
}
......@@ -736,8 +730,8 @@ public class DistributedFlowRuleStore
Set<DeviceId>>
sendAndReceive(deviceFlowEntries,
FLOW_TABLE_BACKUP,
SERIALIZER::encode,
SERIALIZER::decode,
serializer::encode,
serializer::decode,
nodeId)
.whenComplete((backedupDevices, error) -> {
Set<DeviceId> devicesNotBackedup = error != null ?
......@@ -769,12 +763,12 @@ public class DistributedFlowRuleStore
.withSerializer(new Serializer() {
@Override
public <T> byte[] encode(T object) {
return SERIALIZER.encode(object);
return serializer.encode(object);
}
@Override
public <T> T decode(byte[] bytes) {
return SERIALIZER.decode(bytes);
return serializer.decode(bytes);
}
})
.build());
......