Madan Jampani
Committed by Gerrit Code Review

FlowRuleStore: Consider errors when updating state of all current backups

Change-Id: I3bf4d20d79dc37c7040648ec6379794b8c93aad2
......@@ -290,7 +290,7 @@ public class NewDistributedFlowRuleStore
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
clusterCommunicator.addSubscriber(
FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
}
private void unregisterMessageHandlers() {
......@@ -644,21 +644,33 @@ public class NewDistributedFlowRuleStore
private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Maps.newConcurrentMap();
flowEntries.forEach((key, value) -> {
if (deviceIds.contains(key)) {
deviceFlowEntries.put(key, value);
}
});
clusterCommunicator.unicast(deviceFlowEntries,
FLOW_TABLE_BACKUP,
SERIALIZER::encode,
nodeId);
deviceIds.forEach(id -> {
lastBackupTimes.put(id, System.currentTimeMillis());
lastBackupNodes.put(id, nodeId);
});
clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
deviceFlowEntries,
FLOW_TABLE_BACKUP,
SERIALIZER::encode,
SERIALIZER::decode,
nodeId)
.whenComplete((backedupDevices, error) -> {
Set<DeviceId> devicesNotBackedup = error != null ?
deviceFlowEntries.keySet() :
Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
if (devicesNotBackedup.size() > 0) {
log.warn("Failed to backup devices: {}", devicesNotBackedup, error);
}
if (backedupDevices != null) {
backedupDevices.forEach(id -> {
lastBackupTimes.put(id, System.currentTimeMillis());
lastBackupNodes.put(id, nodeId);
});
}
});
}
/**
......@@ -751,16 +763,23 @@ public class NewDistributedFlowRuleStore
}
}
private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
// Only process those devices are that not managed by the local node.
Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
Set<DeviceId> backedupDevices = Sets.newHashSet();
try {
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
// Only process those devices are that not managed by the local node.
Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
.forEach((deviceId, flowTable) -> {
Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
deviceFlowTable.clear();
deviceFlowTable.putAll(flowTable);
backedupDevices.add(deviceId);
});
} catch (Exception e) {
log.warn("Failure processing backup request", e);
}
return backedupDevices;
}
}
}
......