pankaj

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 102 changed files with 1356 additions and 652 deletions
......@@ -11,6 +11,9 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentService;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -29,13 +32,18 @@ public class FooComponent {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final IntentListener intentListener = new InnerIntentListener();
@Activate
public void activate() {
clusterService.addListener(clusterListener);
deviceService.addListener(deviceListener);
intentService.addListener(intentListener);
log.info("Started");
}
......@@ -43,6 +51,7 @@ public class FooComponent {
public void deactivate() {
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
intentService.removeListener(intentListener);
log.info("Stopped");
}
......@@ -59,6 +68,23 @@ public class FooComponent {
log.info("YEEEEHAAAAW! {}", event);
}
}
private class InnerIntentListener implements IntentListener {
@Override
public void event(IntentEvent event) {
String message;
if (event.type() == IntentEvent.Type.SUBMITTED) {
message = "WOW! It looks like someone has some intentions: {}";
} else if (event.type() == IntentEvent.Type.INSTALLED) {
message = "AWESOME! So far things are going great: {}";
} else if (event.type() == IntentEvent.Type.WITHDRAWN) {
message = "HMMM! Ambitions are fading apparently: {}";
} else {
message = "CRAP!!! Things are not turning out as intended: {}";
}
log.info(message, event.subject());
}
}
}
......
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.topology.Topology;
import org.onlab.onos.net.topology.TopologyService;
/**
* Provides summary of ONOS model.
*/
@Command(scope = "onos", name = "summary",
description = "Provides summary of ONOS model")
public class SummaryCommand extends AbstractShellCommand {
@Override
protected void execute() {
TopologyService topologyService = get(TopologyService.class);
Topology topology = topologyService.currentTopology();
print("nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
get(ClusterService.class).getNodes().size(),
get(DeviceService.class).getDeviceCount(),
get(LinkService.class).getLinkCount(),
get(HostService.class).getHostCount(),
topologyService.getClusters(topology).size(),
topology.pathCount(),
get(FlowRuleService.class).getFlowRuleCount(),
get(IntentService.class).getIntentCount());
}
}
......@@ -35,7 +35,7 @@ public class DevicesListCommand extends AbstractShellCommand {
* @param service device service
* @return sorted device list
*/
protected List<Device> getSortedDevices(DeviceService service) {
protected static List<Device> getSortedDevices(DeviceService service) {
List<Device> devices = newArrayList(service.getDevices());
Collections.sort(devices, Comparators.ELEMENT_COMPARATOR);
return devices;
......
package org.onlab.onos.cli.net;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.onos.cli.net.DevicesListCommand.getSortedDevices;
import java.util.Collections;
import java.util.List;
......@@ -46,7 +47,7 @@ public class FlowsListCommand extends AbstractShellCommand {
DeviceService deviceService = get(DeviceService.class);
FlowRuleService service = get(FlowRuleService.class);
Map<Device, List<FlowEntry>> flows = getSortedFlows(deviceService, service);
for (Device d : flows.keySet()) {
for (Device d : getSortedDevices(deviceService)) {
printFlows(d, flows.get(d));
}
}
......@@ -57,14 +58,15 @@ public class FlowsListCommand extends AbstractShellCommand {
* @param service device service
* @return sorted device list
*/
protected Map<Device, List<FlowEntry>> getSortedFlows(DeviceService deviceService, FlowRuleService service) {
protected Map<Device, List<FlowEntry>> getSortedFlows(DeviceService deviceService,
FlowRuleService service) {
Map<Device, List<FlowEntry>> flows = Maps.newHashMap();
List<FlowEntry> rules;
FlowEntryState s = null;
if (state != null && !state.equals("any")) {
s = FlowEntryState.valueOf(state.toUpperCase());
}
Iterable<Device> devices = uri == null ? deviceService.getDevices() :
Iterable<Device> devices = uri == null ? deviceService.getDevices() :
Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
for (Device d : devices) {
if (s == null) {
......@@ -89,18 +91,16 @@ public class FlowsListCommand extends AbstractShellCommand {
* @param flows the set of flows for that device.
*/
protected void printFlows(Device d, List<FlowEntry> flows) {
print("Device: " + d.id());
if (flows == null | flows.isEmpty()) {
print(" %s", "No flows.");
return;
}
for (FlowEntry f : flows) {
print(FMT, Long.toHexString(f.id().value()), f.state(), f.bytes(),
f.packets(), f.life(), f.priority());
print(SFMT, f.selector().criteria());
print(TFMT, f.treatment().instructions());
boolean empty = flows == null || flows.isEmpty();
print("deviceId=%s, flowRuleCount=%d", d.id(), empty ? 0 : flows.size());
if (!empty) {
for (FlowEntry f : flows) {
print(FMT, Long.toHexString(f.id().value()), f.state(), f.bytes(),
f.packets(), f.life(), f.priority());
print(SFMT, f.selector().criteria());
print(TFMT, f.treatment().instructions());
}
}
}
}
......
......@@ -24,7 +24,7 @@ public class IntentIdCompleter implements Completer {
Iterator<Intent> it = service.getIntents().iterator();
SortedSet<String> strings = delegate.getStrings();
while (it.hasNext()) {
strings.add(it.next().getId().toString());
strings.add(it.next().id().toString());
}
// Now let the completer do the work for figuring out what to offer.
......
......@@ -10,9 +10,9 @@ import org.onlab.onos.net.intent.IntentService;
/**
* Removes host-to-host connectivity intent.
*/
@Command(scope = "onos", name = "remove-host-intent",
description = "Removes host-to-host connectivity intent")
public class RemoveHostToHostIntentCommand extends AbstractShellCommand {
@Command(scope = "onos", name = "remove-intent",
description = "Removes the specified intent")
public class IntentRemoveCommand extends AbstractShellCommand {
@Argument(index = 0, name = "id", description = "Intent ID",
required = true, multiValued = false)
......
......@@ -17,8 +17,8 @@ public class IntentsListCommand extends AbstractShellCommand {
protected void execute() {
IntentService service = get(IntentService.class);
for (Intent intent : service.getIntents()) {
IntentState state = service.getIntentState(intent.getId());
print("%s %s %s", intent.getId(), state, intent);
IntentState state = service.getIntentState(intent.id());
print("%s %s %s", intent.id(), state, intent);
}
}
......
package org.onlab.onos.cli.net;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
......@@ -18,8 +19,22 @@ import org.onlab.onos.net.intent.IntentState;
description = "Wipes-out the entire network information base, i.e. devices, links, hosts")
public class WipeOutCommand extends ClustersListCommand {
private static final String DISCLAIMER = "Yes, I know it will delete everything!";
@Argument(index = 0, name = "disclaimer", description = "Device ID",
required = true, multiValued = false)
String disclaimer = null;
@Override
protected void execute() {
if (!disclaimer.equals(DISCLAIMER)) {
print("I'm afraid I can't do that...");
print("You have to acknowledge by: " + DISCLAIMER);
return;
}
print("Good bye...");
DeviceAdminService deviceAdminService = get(DeviceAdminService.class);
DeviceService deviceService = get(DeviceService.class);
for (Device device : deviceService.getDevices()) {
......@@ -34,7 +49,7 @@ public class WipeOutCommand extends ClustersListCommand {
IntentService intentService = get(IntentService.class);
for (Intent intent : intentService.getIntents()) {
if (intentService.getIntentState(intent.getId()) == IntentState.INSTALLED) {
if (intentService.getIntentState(intent.id()) == IntentState.INSTALLED) {
intentService.withdraw(intent);
}
}
......
......@@ -2,6 +2,9 @@
<command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
<command>
<action class="org.onlab.onos.cli.SummaryCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.NodesListCommand"/>
</command>
<command>
......@@ -61,15 +64,15 @@
<action class="org.onlab.onos.cli.net.IntentsListCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.net.AddHostToHostIntentCommand"/>
<action class="org.onlab.onos.cli.net.IntentRemoveCommand"/>
<completers>
<ref component-id="hostIdCompleter"/>
<ref component-id="intentIdCompleter"/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.RemoveHostToHostIntentCommand"/>
<action class="org.onlab.onos.cli.net.AddHostToHostIntentCommand"/>
<completers>
<ref component-id="intentIdCompleter"/>
<ref component-id="hostIdCompleter"/>
</completers>
</command>
......
......@@ -115,7 +115,7 @@ public class DefaultFlowRule implements FlowRule {
}
public int hash() {
return Objects.hash(deviceId, selector, id);
return Objects.hash(deviceId, selector, treatment);
}
@Override
......@@ -132,7 +132,7 @@ public class DefaultFlowRule implements FlowRule {
if (obj instanceof DefaultFlowRule) {
DefaultFlowRule that = (DefaultFlowRule) obj;
return Objects.equals(deviceId, that.deviceId) &&
//Objects.equals(id, that.id) &&
Objects.equals(id, that.id) &&
Objects.equals(priority, that.priority) &&
Objects.equals(selector, that.selector);
......@@ -143,7 +143,7 @@ public class DefaultFlowRule implements FlowRule {
@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("id", Long.toHexString(id.value()))
.add("deviceId", deviceId)
.add("priority", priority)
.add("selector", selector.criteria())
......@@ -154,7 +154,7 @@ public class DefaultFlowRule implements FlowRule {
@Override
public int timeout() {
return timeout > MAX_TIMEOUT ? MAX_TIMEOUT : this.timeout;
return timeout;
}
}
......
package org.onlab.onos.net.flow;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions;
......@@ -8,12 +15,6 @@ import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Default traffic treatment implementation.
*/
......@@ -44,6 +45,25 @@ public final class DefaultTrafficTreatment implements TrafficTreatment {
return new Builder();
}
//FIXME: Order of instructions may affect hashcode
@Override
public int hashCode() {
return Objects.hash(instructions);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DefaultTrafficTreatment) {
DefaultTrafficTreatment that = (DefaultTrafficTreatment) obj;
return Objects.equals(instructions, that.instructions);
}
return false;
}
/**
* Builds a list of treatments following the following order.
* Modifications -> Group -> Output (including drop)
......@@ -66,6 +86,7 @@ public final class DefaultTrafficTreatment implements TrafficTreatment {
private Builder() {
}
@Override
public Builder add(Instruction instruction) {
if (drop) {
return this;
......
......@@ -13,6 +13,13 @@ import org.onlab.onos.net.DeviceId;
public interface FlowRuleService {
/**
* Returns the number of flow rules in the system.
*
* @return flow rule count
*/
int getFlowRuleCount();
/**
* Returns the collection of flow entries applied on the specified device.
* This will include flow rules which may not yet have been applied to
* the device.
......@@ -59,6 +66,8 @@ public interface FlowRuleService {
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
//Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
/**
* Adds the specified flow rule listener.
*
......@@ -72,7 +81,4 @@ public interface FlowRuleService {
* @param listener flow rule listener
*/
void removeListener(FlowRuleListener listener);
}
......
......@@ -10,7 +10,15 @@ import org.onlab.onos.store.Store;
public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> {
/**
* Returns the number of flow rule in the store.
*
* @return number of flow rules
*/
int getFlowRuleCount();
/**
* Returns the stored flow.
*
* @param rule the rule to look for
* @return a flow rule
*/
......@@ -60,5 +68,4 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
* @return flow_removed event, or null if nothing removed
*/
FlowRuleEvent removeFlowRule(FlowEntry rule);
}
......
......@@ -3,6 +3,8 @@ package org.onlab.onos.net.flow.instructions;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.L2SubType;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction;
......@@ -117,6 +119,24 @@ public final class Instructions {
return toStringHelper(type()).toString();
}
@Override
public int hashCode() {
return Objects.hash(type());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof DropInstruction) {
DropInstruction that = (DropInstruction) obj;
return Objects.equals(type(), that.type());
}
return false;
}
}
......@@ -140,6 +160,26 @@ public final class Instructions {
return toStringHelper(type().toString())
.add("port", port).toString();
}
@Override
public int hashCode() {
return Objects.hash(port, type());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof OutputInstruction) {
OutputInstruction that = (OutputInstruction) obj;
Objects.equals(port, that.port);
}
return false;
}
}
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.net.flow.instructions;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
......@@ -74,6 +76,25 @@ public abstract class L2ModificationInstruction implements Instruction {
.add("mac", mac).toString();
}
@Override
public int hashCode() {
return Objects.hash(mac, subtype);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof ModEtherInstruction) {
ModEtherInstruction that = (ModEtherInstruction) obj;
return Objects.equals(mac, that.mac) &&
Objects.equals(subtype, that.subtype);
}
return false;
}
}
......@@ -103,6 +124,25 @@ public abstract class L2ModificationInstruction implements Instruction {
.add("id", vlanId).toString();
}
@Override
public int hashCode() {
return Objects.hash(vlanId, subtype());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof ModVlanIdInstruction) {
ModVlanIdInstruction that = (ModVlanIdInstruction) obj;
return Objects.equals(vlanId, that.vlanId);
}
return false;
}
}
/**
......@@ -131,6 +171,24 @@ public abstract class L2ModificationInstruction implements Instruction {
.add("pcp", Long.toHexString(vlanPcp)).toString();
}
@Override
public int hashCode() {
return Objects.hash(vlanPcp, subtype());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof ModVlanPcpInstruction) {
ModVlanPcpInstruction that = (ModVlanPcpInstruction) obj;
return Objects.equals(vlanPcp, that.vlanPcp);
}
return false;
}
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.net.flow.instructions;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
import org.onlab.packet.IpPrefix;
/**
......@@ -66,5 +68,23 @@ public abstract class L3ModificationInstruction implements Instruction {
.add("ip", ip).toString();
}
@Override
public int hashCode() {
return Objects.hash(ip, subtype());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof ModIPInstruction) {
ModIPInstruction that = (ModIPInstruction) obj;
return Objects.equals(ip, that.ip);
}
return false;
}
}
}
......
......@@ -24,7 +24,7 @@ public abstract class AbstractIntent implements Intent {
}
@Override
public IntentId getId() {
public IntentId id() {
return id;
}
......
......@@ -53,7 +53,7 @@ public abstract class ConnectivityIntent extends AbstractIntent {
*
* @return traffic match
*/
public TrafficSelector getTrafficSelector() {
public TrafficSelector selector() {
return selector;
}
......@@ -62,7 +62,7 @@ public abstract class ConnectivityIntent extends AbstractIntent {
*
* @return applied action
*/
public TrafficTreatment getTrafficTreatment() {
public TrafficTreatment treatment() {
return treatment;
}
......
......@@ -80,9 +80,9 @@ public class HostToHostIntent extends ConnectivityIntent {
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", getId())
.add("selector", getTrafficSelector())
.add("treatment", getTrafficTreatment())
.add("id", id())
.add("selector", selector())
.add("treatment", treatment())
.add("one", one)
.add("two", two)
.toString();
......
......@@ -11,5 +11,5 @@ public interface Intent extends BatchOperationTarget {
*
* @return intent identifier
*/
IntentId getId();
IntentId id();
}
......
package org.onlab.onos.net.intent;
import com.google.common.base.MoreObjects;
import org.onlab.onos.event.AbstractEvent;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A class to represent an intent related event.
*/
public class IntentEvent extends AbstractEvent<IntentState, Intent> {
// TODO: determine a suitable parent class; if one does not exist, consider
// introducing one
public class IntentEvent extends AbstractEvent<IntentEvent.Type, Intent> {
private final long time;
private final Intent intent;
private final IntentState state;
private final IntentState previous;
public enum Type {
/**
* Signifies that a new intent has been submitted to the system.
*/
SUBMITTED,
/**
* Creates an event describing a state change of an intent.
*
* @param intent subject intent
* @param state new intent state
* @param previous previous intent state
* @param time time the event created in milliseconds since start of epoch
* @throws NullPointerException if the intent or state is null
*/
public IntentEvent(Intent intent, IntentState state, IntentState previous, long time) {
super(state, intent);
this.intent = checkNotNull(intent);
this.state = checkNotNull(state);
this.previous = previous;
this.time = time;
}
/**
* Signifies that an intent has been successfully installed.
*/
INSTALLED,
/**
* Returns the state of the intent which caused the event.
*
* @return the state of the intent
*/
public IntentState getState() {
return state;
}
/**
* Signifies that an intent has failed compilation or installation.
*/
FAILED,
/**
* Returns the previous state of the intent which caused the event.
*
* @return the previous state of the intent
*/
public IntentState getPreviousState() {
return previous;
/**
* Signifies that an intent has been withdrawn from the system.
*/
WITHDRAWN
}
/**
* Returns the intent associated with the event.
* Creates an event of a given type and for the specified intent and the
* current time.
*
* @return the intent
* @param type event type
* @param intent subject intent
* @param time time the event created in milliseconds since start of epoch
*/
public Intent getIntent() {
return intent;
public IntentEvent(Type type, Intent intent, long time) {
super(type, intent, time);
}
/**
* Returns the time at which the event was created.
* Creates an event of a given type and for the specified intent and the
* current time.
*
* @return the time in milliseconds since start of epoch
* @param type event type
* @param intent subject intent
*/
public long getTime() {
return time;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IntentEvent that = (IntentEvent) o;
return Objects.equals(this.intent, that.intent)
&& Objects.equals(this.state, that.state)
&& Objects.equals(this.previous, that.previous)
&& Objects.equals(this.time, that.time);
public IntentEvent(Type type, Intent intent) {
super(type, intent);
}
@Override
public int hashCode() {
return Objects.hash(intent, state, previous, time);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("intent", intent)
.add("state", state)
.add("previous", previous)
.add("time", time)
.toString();
}
}
......
package org.onlab.onos.net.intent;
/**
* This class represents the states of an intent.
*
* <p>
* Note: The state is expressed as enum, but there is possibility
* in the future that we define specific class instead of enum to improve
* the extensibility of state definition.
* </p>
* Representation of the phases an intent may attain during its lifecycle.
*/
public enum IntentState {
// FIXME: requires discussion on State vs. EventType and a solid state-transition diagram
// TODO: consider the impact of conflict detection
// TODO: consider the impact that external events affect an installed intent
/**
* The beginning state.
*
* Signifies that the intent has been submitted and will start compiling
* shortly. However, this compilation may not necessarily occur on the
* local controller instance.
* <p/>
* All intent in the runtime take this state first.
*/
SUBMITTED,
/**
* The intent compilation has been completed.
*
* An intent translation graph (tree) is completely created.
* Leaves of the graph are installable intent type.
* Signifies that the intent is being compiled into installable intents.
* This is a transitional state after which the intent will enter either
* {@link #FAILED} state or {@link #INSTALLING} state.
*/
COMPILING,
/**
* Signifies that the resulting installable intents are being installed
* into the network environment. This is a transitional state after which
* the intent will enter either {@link #INSTALLED} state or
* {@link #RECOMPILING} state.
*/
COMPILED,
INSTALLING,
/**
* The intent has been successfully installed.
* The intent has been successfully installed. This is a state where the
* intent may remain parked until it is withdrawn by the application or
* until the network environment changes in some way to make the original
* set of installable intents untenable.
*/
INSTALLED,
/**
* The intent is being withdrawn.
*
* When {@link IntentService#withdraw(Intent)} is called,
* the intent takes this state first.
* Signifies that the intent is being recompiled into installable intents
* as an attempt to adapt to an anomaly in the network environment.
* This is a transitional state after which the intent will enter either
* {@link #FAILED} state or {@link #INSTALLING} state.
* <p/>
* Exit to the {@link #FAILED} state may be caused by failure to compile
* or by compiling into the same set of installable intents which have
* previously failed to be installed.
*/
RECOMPILING,
/**
* Indicates that the intent is being withdrawn. This is a transitional
* state, triggered by invocation of the
* {@link IntentService#withdraw(Intent)} but one with only one outcome,
* which is the the intent being placed in the {@link #WITHDRAWN} state.
*/
WITHDRAWING,
/**
* The intent has been successfully withdrawn.
* Indicates that the intent has been successfully withdrawn.
*/
WITHDRAWN,
/**
* The intent has failed to be compiled, installed, or withdrawn.
*
* When the intent failed to be withdrawn, it is still, at least partially installed.
* Signifies that the intent has failed compiling, installing or
* recompiling states.
*/
FAILED,
FAILED
}
......
......@@ -10,10 +10,16 @@ import java.util.List;
public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
/**
* Creates a new intent.
* Submits a new intent into the store. If the returned event is not
* null, the manager is expected to dispatch the event and then to kick
* off intent compilation process. Otherwise, another node has been elected
* to perform the compilation process and the node will learn about
* the submittal and results of the intent compilation via the delegate
* mechanism.
*
* @param intent intent
* @return appropriate event or null if no change resulted
* @param intent intent to be submitted
* @return event indicating the intent was submitted or null if no
* change resulted, e.g. duplicate intent
*/
IntentEvent createIntent(Intent intent);
......@@ -68,10 +74,9 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
*
* @param intentId original intent identifier
* @param installableIntents compiled installable intents
* @return compiled state transition event
*/
IntentEvent addInstallableIntents(IntentId intentId,
List<InstallableIntent> installableIntents);
void addInstallableIntents(IntentId intentId,
List<InstallableIntent> installableIntents);
/**
* Returns the list of the installable events associated with the specified
......
package org.onlab.onos.net.intent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import java.util.Set;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Sets;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Sets;
import java.util.Objects;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Abstraction of multiple source to single destination connectivity intent.
*/
public class MultiPointToSinglePointIntent extends ConnectivityIntent {
private final Set<ConnectPoint> ingressPorts;
private final ConnectPoint egressPort;
private final Set<ConnectPoint> ingressPoints;
private final ConnectPoint egressPoint;
/**
* Creates a new multi-to-single point connectivity intent for the specified
......@@ -28,25 +27,25 @@ public class MultiPointToSinglePointIntent extends ConnectivityIntent {
* @param id intent identifier
* @param match traffic match
* @param action action
* @param ingressPorts set of ports from which ingress traffic originates
* @param egressPort port to which traffic will egress
* @throws NullPointerException if {@code ingressPorts} or
* {@code egressPort} is null.
* @throws IllegalArgumentException if the size of {@code ingressPorts} is
* @param ingressPoints set of ports from which ingress traffic originates
* @param egressPoint port to which traffic will egress
* @throws NullPointerException if {@code ingressPoints} or
* {@code egressPoint} is null.
* @throws IllegalArgumentException if the size of {@code ingressPoints} is
* not more than 1
*/
public MultiPointToSinglePointIntent(IntentId id, TrafficSelector match,
TrafficTreatment action,
Set<ConnectPoint> ingressPorts,
ConnectPoint egressPort) {
Set<ConnectPoint> ingressPoints,
ConnectPoint egressPoint) {
super(id, match, action);
checkNotNull(ingressPorts);
checkArgument(!ingressPorts.isEmpty(),
checkNotNull(ingressPoints);
checkArgument(!ingressPoints.isEmpty(),
"there should be at least one ingress port");
this.ingressPorts = Sets.newHashSet(ingressPorts);
this.egressPort = checkNotNull(egressPort);
this.ingressPoints = Sets.newHashSet(ingressPoints);
this.egressPoint = checkNotNull(egressPoint);
}
/**
......@@ -54,8 +53,8 @@ public class MultiPointToSinglePointIntent extends ConnectivityIntent {
*/
protected MultiPointToSinglePointIntent() {
super();
this.ingressPorts = null;
this.egressPort = null;
this.ingressPoints = null;
this.egressPoint = null;
}
/**
......@@ -64,8 +63,8 @@ public class MultiPointToSinglePointIntent extends ConnectivityIntent {
*
* @return set of ingress ports
*/
public Set<ConnectPoint> getIngressPorts() {
return ingressPorts;
public Set<ConnectPoint> ingressPoints() {
return ingressPoints;
}
/**
......@@ -73,8 +72,8 @@ public class MultiPointToSinglePointIntent extends ConnectivityIntent {
*
* @return egress port
*/
public ConnectPoint getEgressPort() {
return egressPort;
public ConnectPoint egressPoint() {
return egressPoint;
}
@Override
......@@ -90,23 +89,23 @@ public class MultiPointToSinglePointIntent extends ConnectivityIntent {
}
MultiPointToSinglePointIntent that = (MultiPointToSinglePointIntent) o;
return Objects.equals(this.ingressPorts, that.ingressPorts)
&& Objects.equals(this.egressPort, that.egressPort);
return Objects.equals(this.ingressPoints, that.ingressPoints)
&& Objects.equals(this.egressPoint, that.egressPoint);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), ingressPorts, egressPort);
return Objects.hash(super.hashCode(), ingressPoints, egressPoint);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", getId())
.add("match", getTrafficSelector())
.add("action", getTrafficTreatment())
.add("ingressPorts", getIngressPorts())
.add("egressPort", getEgressPort())
.add("id", id())
.add("match", selector())
.add("action", treatment())
.add("ingressPoints", ingressPoints())
.add("egressPoint", egressPoint())
.toString();
}
}
......
......@@ -46,7 +46,7 @@ public class PathIntent extends PointToPointIntent implements InstallableIntent
*
* @return traversed links
*/
public Path getPath() {
public Path path() {
return path;
}
......@@ -79,11 +79,11 @@ public class PathIntent extends PointToPointIntent implements InstallableIntent
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", getId())
.add("match", getTrafficSelector())
.add("action", getTrafficTreatment())
.add("ingressPort", getIngressPort())
.add("egressPort", getEgressPort())
.add("id", id())
.add("match", selector())
.add("action", treatment())
.add("ingressPort", ingressPoint())
.add("egressPort", egressPoint())
.add("path", path)
.toString();
}
......
......@@ -14,27 +14,27 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class PointToPointIntent extends ConnectivityIntent {
private final ConnectPoint ingressPort;
private final ConnectPoint egressPort;
private final ConnectPoint ingressPoint;
private final ConnectPoint egressPoint;
/**
* Creates a new point-to-point intent with the supplied ingress/egress
* ports.
*
* @param id intent identifier
* @param selector traffic selector
* @param treatment treatment
* @param ingressPort ingress port
* @param egressPort egress port
* @throws NullPointerException if {@code ingressPort} or {@code egressPort} is null.
* @param id intent identifier
* @param selector traffic selector
* @param treatment treatment
* @param ingressPoint ingress port
* @param egressPoint egress port
* @throws NullPointerException if {@code ingressPoint} or {@code egressPoints} is null.
*/
public PointToPointIntent(IntentId id, TrafficSelector selector,
TrafficTreatment treatment,
ConnectPoint ingressPort,
ConnectPoint egressPort) {
ConnectPoint ingressPoint,
ConnectPoint egressPoint) {
super(id, selector, treatment);
this.ingressPort = checkNotNull(ingressPort);
this.egressPort = checkNotNull(egressPort);
this.ingressPoint = checkNotNull(ingressPoint);
this.egressPoint = checkNotNull(egressPoint);
}
/**
......@@ -42,8 +42,8 @@ public class PointToPointIntent extends ConnectivityIntent {
*/
protected PointToPointIntent() {
super();
this.ingressPort = null;
this.egressPort = null;
this.ingressPoint = null;
this.egressPoint = null;
}
/**
......@@ -52,8 +52,8 @@ public class PointToPointIntent extends ConnectivityIntent {
*
* @return ingress port
*/
public ConnectPoint getIngressPort() {
return ingressPort;
public ConnectPoint ingressPoint() {
return ingressPoint;
}
/**
......@@ -61,8 +61,8 @@ public class PointToPointIntent extends ConnectivityIntent {
*
* @return egress port
*/
public ConnectPoint getEgressPort() {
return egressPort;
public ConnectPoint egressPoint() {
return egressPoint;
}
@Override
......@@ -78,23 +78,23 @@ public class PointToPointIntent extends ConnectivityIntent {
}
PointToPointIntent that = (PointToPointIntent) o;
return Objects.equals(this.ingressPort, that.ingressPort)
&& Objects.equals(this.egressPort, that.egressPort);
return Objects.equals(this.ingressPoint, that.ingressPoint)
&& Objects.equals(this.egressPoint, that.egressPoint);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), ingressPort, egressPort);
return Objects.hash(super.hashCode(), ingressPoint, egressPoint);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", getId())
.add("match", getTrafficSelector())
.add("action", getTrafficTreatment())
.add("ingressPort", ingressPort)
.add("egressPort", egressPort)
.add("id", id())
.add("match", selector())
.add("action", treatment())
.add("ingressPoint", ingressPoint)
.add("egressPoints", egressPoint)
.toString();
}
......
......@@ -17,34 +17,34 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class SinglePointToMultiPointIntent extends ConnectivityIntent {
private final ConnectPoint ingressPort;
private final Set<ConnectPoint> egressPorts;
private final ConnectPoint ingressPoint;
private final Set<ConnectPoint> egressPoints;
/**
* Creates a new single-to-multi point connectivity intent.
*
* @param id intent identifier
* @param selector traffic selector
* @param treatment treatment
* @param ingressPort port on which traffic will ingress
* @param egressPorts set of ports on which traffic will egress
* @throws NullPointerException if {@code ingressPort} or
* {@code egressPorts} is null
* @throws IllegalArgumentException if the size of {@code egressPorts} is
* @param id intent identifier
* @param selector traffic selector
* @param treatment treatment
* @param ingressPoint port on which traffic will ingress
* @param egressPoints set of ports on which traffic will egress
* @throws NullPointerException if {@code ingressPoint} or
* {@code egressPoints} is null
* @throws IllegalArgumentException if the size of {@code egressPoints} is
* not more than 1
*/
public SinglePointToMultiPointIntent(IntentId id, TrafficSelector selector,
TrafficTreatment treatment,
ConnectPoint ingressPort,
Set<ConnectPoint> egressPorts) {
ConnectPoint ingressPoint,
Set<ConnectPoint> egressPoints) {
super(id, selector, treatment);
checkNotNull(egressPorts);
checkArgument(!egressPorts.isEmpty(),
checkNotNull(egressPoints);
checkArgument(!egressPoints.isEmpty(),
"there should be at least one egress port");
this.ingressPort = checkNotNull(ingressPort);
this.egressPorts = Sets.newHashSet(egressPorts);
this.ingressPoint = checkNotNull(ingressPoint);
this.egressPoints = Sets.newHashSet(egressPoints);
}
/**
......@@ -52,8 +52,8 @@ public class SinglePointToMultiPointIntent extends ConnectivityIntent {
*/
protected SinglePointToMultiPointIntent() {
super();
this.ingressPort = null;
this.egressPorts = null;
this.ingressPoint = null;
this.egressPoints = null;
}
/**
......@@ -61,8 +61,8 @@ public class SinglePointToMultiPointIntent extends ConnectivityIntent {
*
* @return ingress port
*/
public ConnectPoint getIngressPort() {
return ingressPort;
public ConnectPoint ingressPoint() {
return ingressPoint;
}
/**
......@@ -70,8 +70,8 @@ public class SinglePointToMultiPointIntent extends ConnectivityIntent {
*
* @return set of egress ports
*/
public Set<ConnectPoint> getEgressPorts() {
return egressPorts;
public Set<ConnectPoint> egressPoints() {
return egressPoints;
}
@Override
......@@ -87,23 +87,23 @@ public class SinglePointToMultiPointIntent extends ConnectivityIntent {
}
SinglePointToMultiPointIntent that = (SinglePointToMultiPointIntent) o;
return Objects.equals(this.ingressPort, that.ingressPort)
&& Objects.equals(this.egressPorts, that.egressPorts);
return Objects.equals(this.ingressPoint, that.ingressPoint)
&& Objects.equals(this.egressPoints, that.egressPoints);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), ingressPort, egressPorts);
return Objects.hash(super.hashCode(), ingressPoint, egressPoints);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", getId())
.add("match", getTrafficSelector())
.add("action", getTrafficTreatment())
.add("ingressPort", ingressPort)
.add("egressPort", egressPorts)
.add("id", id())
.add("match", selector())
.add("action", treatment())
.add("ingressPoint", ingressPoint)
.add("egressPort", egressPoints)
.toString();
}
......
......@@ -4,5 +4,53 @@
* <em>what</em> rather than the <em>how</em>. This makes such instructions
* largely independent of topology and device specifics, thus allowing them to
* survive topology mutations.
* <p/>
* The controller core provides a suite of built-in intents and their compilers
* and installers. However, the intent framework is extensible in that it allows
* additional intents and their compilers or installers to be added
* dynamically at run-time. This allows others to enhance the initial arsenal of
* connectivity and policy-based intents available in base controller software.
* <p/>
* The following diagram depicts the state transition diagram for each top-level intent:<br>
* <img src="doc-files/intent-states.png" alt="ONOS intent states">
* <p/>
* The controller core accepts the intent specifications and translates them, via a
* process referred to as intent compilation, to installable intents, which are
* essentially actionable operations on the network environment.
* These actions are carried out by intent installation process, which results
* in some changes to the environment, e.g. tunnel links being provisioned,
* flow rules being installed on the data-plane, optical lambdas being reserved.
* <p/>
* After an intent is submitted by an application, it will be sent immediately
* (but asynchronously) into a compiling phase, then to installing phase and if
* all goes according to plan into installed state. Once an application decides
* it no longer wishes the intent to hold, it can withdraw it. This describes
* the nominal flow. However, it may happen that some issue is encountered.
* For example, an application may ask for an objective that is not currently
* achievable, e.g. connectivity across to unconnected network segments.
* If this is the case, the compiling phase may fail to produce a set of
* installable intents and instead result in a failed compile. If this occurs,
* only a change in the environment can trigger a transition back to the
* compiling state.
* <p/>
* Similarly, an issue may be encountered during the installation phase in
* which case the framework will attempt to recompile the intent to see if an
* alternate approach is available. If so, the intent will be sent back to
* installing phase. Otherwise, it will be parked in the failed state. Another
* scenario that’s very likely to be encountered is where the intent is
* successfully compiled and installed, but due to some topology event, such
* as a downed or downgraded link, loss of throughput may occur or connectivity
* may be lost altogether, thus impacting the viability of a previously
* satisfied intent. If this occurs, the framework will attempt to recompile
* the intent, and if an alternate approach is available, its installation
* will be attempted. Otherwise, the original top-level intent will be parked
* in the failed state.
* <p/>
* Please note that all *ing states, depicted in orange, are transitional and
* are expected to last only a brief amount of time. The rest of the states
* are parking states where the intent may spent some time; except for the
* submitted state of course. There, the intent may pause, but only briefly,
* while the system determines where to perform the compilation or while it
* performs global recomputation/optimization across all prior intents.
*/
package org.onlab.onos.net.intent;
\ No newline at end of file
......
......@@ -40,8 +40,7 @@ public class FakeIntentManager implements TestableIntentService {
@Override
public void run() {
try {
List<InstallableIntent> installable = compileIntent(intent);
installIntents(intent, installable);
executeCompilingPhase(intent);
} catch (IntentException e) {
exceptions.add(e);
}
......@@ -55,8 +54,8 @@ public class FakeIntentManager implements TestableIntentService {
@Override
public void run() {
try {
List<InstallableIntent> installable = getInstallable(intent.getId());
uninstallIntents(intent, installable);
List<InstallableIntent> installable = getInstallable(intent.id());
executeWithdrawingPhase(intent, installable);
} catch (IntentException e) {
exceptions.add(e);
}
......@@ -84,53 +83,60 @@ public class FakeIntentManager implements TestableIntentService {
return installer;
}
private <T extends Intent> List<InstallableIntent> compileIntent(T intent) {
private <T extends Intent> void executeCompilingPhase(T intent) {
setState(intent, IntentState.COMPILING);
try {
// For the fake, we compile using a single level pass
List<InstallableIntent> installable = new ArrayList<>();
for (Intent compiled : getCompiler(intent).compile(intent)) {
installable.add((InstallableIntent) compiled);
}
setState(intent, IntentState.COMPILED);
return installable;
executeInstallingPhase(intent, installable);
} catch (IntentException e) {
setState(intent, IntentState.FAILED);
throw e;
dispatch(new IntentEvent(IntentEvent.Type.FAILED, intent));
}
}
private void installIntents(Intent intent, List<InstallableIntent> installable) {
private void executeInstallingPhase(Intent intent,
List<InstallableIntent> installable) {
setState(intent, IntentState.INSTALLING);
try {
for (InstallableIntent ii : installable) {
registerSubclassInstallerIfNeeded(ii);
getInstaller(ii).install(ii);
}
setState(intent, IntentState.INSTALLED);
putInstallable(intent.getId(), installable);
putInstallable(intent.id(), installable);
dispatch(new IntentEvent(IntentEvent.Type.INSTALLED, intent));
} catch (IntentException e) {
setState(intent, IntentState.FAILED);
throw e;
dispatch(new IntentEvent(IntentEvent.Type.FAILED, intent));
}
}
private void uninstallIntents(Intent intent, List<InstallableIntent> installable) {
private void executeWithdrawingPhase(Intent intent,
List<InstallableIntent> installable) {
setState(intent, IntentState.WITHDRAWING);
try {
for (InstallableIntent ii : installable) {
getInstaller(ii).uninstall(ii);
}
removeInstallable(intent.id());
setState(intent, IntentState.WITHDRAWN);
removeInstallable(intent.getId());
dispatch(new IntentEvent(IntentEvent.Type.WITHDRAWN, intent));
} catch (IntentException e) {
// FIXME: Rework this to always go from WITHDRAWING to WITHDRAWN!
setState(intent, IntentState.FAILED);
throw e;
dispatch(new IntentEvent(IntentEvent.Type.FAILED, intent));
}
}
// Sets the internal state for the given intent and dispatches an event
private void setState(Intent intent, IntentState state) {
IntentState previous = intentStates.get(intent.getId());
intentStates.put(intent.getId(), state);
dispatch(new IntentEvent(intent, state, previous, System.currentTimeMillis()));
intentStates.put(intent.id(), state);
}
private void putInstallable(IntentId id, List<InstallableIntent> installable) {
......@@ -152,15 +158,15 @@ public class FakeIntentManager implements TestableIntentService {
@Override
public void submit(Intent intent) {
intents.put(intent.getId(), intent);
intents.put(intent.id(), intent);
setState(intent, IntentState.SUBMITTED);
dispatch(new IntentEvent(IntentEvent.Type.SUBMITTED, intent));
executeSubmit(intent);
}
@Override
public void withdraw(Intent intent) {
intents.remove(intent.getId());
setState(intent, IntentState.WITHDRAWING);
intents.remove(intent.id());
executeWithdraw(intent);
}
......
......@@ -10,11 +10,9 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.junit.Assert.*;
import static org.onlab.onos.net.intent.IntentEvent.Type.*;
// TODO: consider make it categorized as integration test when it become
// slow test or fragile test
/**
* Suite of tests for the intent service contract.
*/
......@@ -64,13 +62,13 @@ public class IntentServiceTest {
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
assertEquals("incorrect intent state", INSTALLED,
service.getIntentState(intent.getId()));
assertEquals("incorrect intent state", IntentState.INSTALLED,
service.getIntentState(intent.id()));
}
});
// Make sure that all expected events have been emitted
validateEvents(intent, SUBMITTED, COMPILED, INSTALLED);
validateEvents(intent, SUBMITTED, INSTALLED);
// Make sure there is just one intent (and is ours)
assertEquals("incorrect intent count", 1, service.getIntentCount());
......@@ -85,19 +83,19 @@ public class IntentServiceTest {
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
assertEquals("incorrect intent state", WITHDRAWN,
service.getIntentState(intent.getId()));
assertEquals("incorrect intent state", IntentState.WITHDRAWN,
service.getIntentState(intent.id()));
}
});
// Make sure that all expected events have been emitted
validateEvents(intent, WITHDRAWING, WITHDRAWN);
validateEvents(intent, WITHDRAWN);
// TODO: discuss what is the fate of intents after they have been withdrawn
// Make sure that the intent is no longer in the system
// assertEquals("incorrect intent count", 0, service.getIntents().size());
// assertNull("intent should not be found", service.getIntent(intent.getId()));
// assertNull("intent state should not be found", service.getIntentState(intent.getId()));
// assertNull("intent should not be found", service.getIntent(intent.id()));
// assertNull("intent state should not be found", service.getIntentState(intent.id()));
}
@Test
......@@ -113,8 +111,8 @@ public class IntentServiceTest {
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
assertEquals("incorrect intent state", FAILED,
service.getIntentState(intent.getId()));
assertEquals("incorrect intent state", IntentState.FAILED,
service.getIntentState(intent.id()));
}
});
......@@ -136,13 +134,13 @@ public class IntentServiceTest {
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
assertEquals("incorrect intent state", FAILED,
service.getIntentState(intent.getId()));
assertEquals("incorrect intent state", IntentState.FAILED,
service.getIntentState(intent.id()));
}
});
// Make sure that all expected events have been emitted
validateEvents(intent, SUBMITTED, COMPILED, FAILED);
validateEvents(intent, SUBMITTED, FAILED);
}
/**
......@@ -151,23 +149,23 @@ public class IntentServiceTest {
* considered.
*
* @param intent intent subject
* @param states list of states for which events are expected
* @param types list of event types for which events are expected
*/
protected void validateEvents(Intent intent, IntentState... states) {
protected void validateEvents(Intent intent, IntentEvent.Type... types) {
Iterator<IntentEvent> events = listener.events.iterator();
for (IntentState state : states) {
for (IntentEvent.Type type : types) {
IntentEvent event = events.hasNext() ? events.next() : null;
if (event == null) {
fail("expected event not found: " + state);
} else if (intent.equals(event.getIntent())) {
assertEquals("incorrect state", state, event.getState());
fail("expected event not found: " + type);
} else if (intent.equals(event.subject())) {
assertEquals("incorrect state", type, event.type());
}
}
// Remainder of events should not apply to this intent; make sure.
while (events.hasNext()) {
assertFalse("unexpected event for intent",
intent.equals(events.next().getIntent()));
intent.equals(events.next().subject()));
}
}
......@@ -228,8 +226,8 @@ public class IntentServiceTest {
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
assertEquals("incorrect intent state", INSTALLED,
service.getIntentState(intent.getId()));
assertEquals("incorrect intent state", IntentState.INSTALLED,
service.getIntentState(intent.id()));
}
});
......
......@@ -12,10 +12,10 @@ public class MultiPointToSinglePointIntentTest extends ConnectivityIntentTest {
@Test
public void basics() {
MultiPointToSinglePointIntent intent = createOne();
assertEquals("incorrect id", IID, intent.getId());
assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
assertEquals("incorrect ingress", PS1, intent.getIngressPorts());
assertEquals("incorrect egress", P2, intent.getEgressPort());
assertEquals("incorrect id", IID, intent.id());
assertEquals("incorrect match", MATCH, intent.selector());
assertEquals("incorrect ingress", PS1, intent.ingressPoints());
assertEquals("incorrect egress", P2, intent.egressPoint());
}
@Override
......
......@@ -16,12 +16,12 @@ public class PathIntentTest extends ConnectivityIntentTest {
@Test
public void basics() {
PathIntent intent = createOne();
assertEquals("incorrect id", IID, intent.getId());
assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
assertEquals("incorrect action", NOP, intent.getTrafficTreatment());
assertEquals("incorrect ingress", P1, intent.getIngressPort());
assertEquals("incorrect egress", P2, intent.getEgressPort());
assertEquals("incorrect path", PATH1, intent.getPath());
assertEquals("incorrect id", IID, intent.id());
assertEquals("incorrect match", MATCH, intent.selector());
assertEquals("incorrect action", NOP, intent.treatment());
assertEquals("incorrect ingress", P1, intent.ingressPoint());
assertEquals("incorrect egress", P2, intent.egressPoint());
assertEquals("incorrect path", PATH1, intent.path());
}
@Override
......
......@@ -12,10 +12,10 @@ public class PointToPointIntentTest extends ConnectivityIntentTest {
@Test
public void basics() {
PointToPointIntent intent = createOne();
assertEquals("incorrect id", IID, intent.getId());
assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
assertEquals("incorrect ingress", P1, intent.getIngressPort());
assertEquals("incorrect egress", P2, intent.getEgressPort());
assertEquals("incorrect id", IID, intent.id());
assertEquals("incorrect match", MATCH, intent.selector());
assertEquals("incorrect ingress", P1, intent.ingressPoint());
assertEquals("incorrect egress", P2, intent.egressPoint());
}
@Override
......
......@@ -12,10 +12,10 @@ public class SinglePointToMultiPointIntentTest extends ConnectivityIntentTest {
@Test
public void basics() {
SinglePointToMultiPointIntent intent = createOne();
assertEquals("incorrect id", IID, intent.getId());
assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
assertEquals("incorrect ingress", P1, intent.getIngressPort());
assertEquals("incorrect egress", PS2, intent.getEgressPorts());
assertEquals("incorrect id", IID, intent.id());
assertEquals("incorrect match", MATCH, intent.selector());
assertEquals("incorrect ingress", P1, intent.ingressPoint());
assertEquals("incorrect egress", PS2, intent.egressPoints());
}
@Override
......
......@@ -40,14 +40,14 @@ import com.google.common.collect.Lists;
@Component(immediate = true)
@Service
public class FlowRuleManager
extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
implements FlowRuleService, FlowRuleProviderRegistry {
extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
implements FlowRuleService, FlowRuleProviderRegistry {
public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
private final Logger log = getLogger(getClass());
private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
listenerRegistry = new AbstractListenerRegistry<>();
listenerRegistry = new AbstractListenerRegistry<>();
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
......@@ -75,6 +75,11 @@ implements FlowRuleService, FlowRuleProviderRegistry {
}
@Override
public int getFlowRuleCount() {
return store.getFlowRuleCount();
}
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
return store.getFlowEntries(deviceId);
}
......@@ -98,15 +103,17 @@ implements FlowRuleService, FlowRuleProviderRegistry {
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId());
store.deleteFlowRule(f);
frp.removeFlowRule(f);
if (device != null) {
frp = getProvider(device.providerId());
frp.removeFlowRule(f);
}
}
}
@Override
public void removeFlowRulesById(ApplicationId id) {
Iterable<FlowRule> rules = getFlowRulesById(id);
Iterable<FlowRule> rules = getFlowRulesById(id);
FlowRuleProvider frp;
Device device;
......@@ -140,8 +147,8 @@ implements FlowRuleService, FlowRuleProviderRegistry {
}
private class InternalFlowRuleProviderService
extends AbstractProviderService<FlowRuleProvider>
implements FlowRuleProviderService {
extends AbstractProviderService<FlowRuleProvider>
implements FlowRuleProviderService {
protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
super(provider);
......@@ -160,16 +167,16 @@ implements FlowRuleService, FlowRuleProviderRegistry {
FlowRuleProvider frp = getProvider(device.providerId());
FlowRuleEvent event = null;
switch (stored.state()) {
case ADDED:
case PENDING_ADD:
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(stored);
break;
case PENDING_REMOVE:
case REMOVED:
event = store.removeFlowRule(stored);
break;
default:
break;
break;
case PENDING_REMOVE:
case REMOVED:
event = store.removeFlowRule(stored);
break;
default:
break;
}
if (event != null) {
......@@ -186,17 +193,17 @@ implements FlowRuleService, FlowRuleProviderRegistry {
FlowRuleProvider frp = getProvider(device.providerId());
FlowRuleEvent event = null;
switch (flowRule.state()) {
case PENDING_REMOVE:
case REMOVED:
event = store.removeFlowRule(flowRule);
frp.removeFlowRule(flowRule);
break;
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(flowRule);
break;
default:
log.debug("Flow {} has not been installed.", flowRule);
case PENDING_REMOVE:
case REMOVED:
event = store.removeFlowRule(flowRule);
frp.removeFlowRule(flowRule);
break;
case ADDED:
case PENDING_ADD:
frp.applyFlowRule(flowRule);
break;
default:
log.debug("Flow {} has not been installed.", flowRule);
}
if (event != null) {
......
......@@ -71,11 +71,11 @@ public class HostToHostIntentCompiler
private Intent createPathIntent(Path path, Host src, Host dst,
HostToHostIntent intent) {
TrafficSelector selector = builder(intent.getTrafficSelector())
TrafficSelector selector = builder(intent.selector())
.matchEthSrc(src.mac()).matchEthDst(dst.mac()).build();
return new PathIntent(intentIdGenerator.getNewId(),
selector, intent.getTrafficTreatment(),
selector, intent.treatment(),
path.src(), path.dst(), path);
}
......
......@@ -19,12 +19,15 @@ import org.onlab.onos.net.topology.TopologyService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -34,7 +37,7 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
@Component
@Service
public class FlowTracker implements FlowTrackerService {
public class ObjectiveTracker implements ObjectiveTrackerService {
private final Logger log = getLogger(getClass());
......@@ -110,19 +113,26 @@ public class FlowTracker implements FlowTrackerService {
@Override
public void run() {
if (event.reasons() == null) {
delegate.bumpIntents(intentsByLink.values());
delegate.triggerCompile(new HashSet<IntentId>(), true);
} else {
Set<IntentId> toBeRecompiled = new HashSet<>();
boolean recompileOnly = true;
// Scan through the list of reasons and keep accruing all
// intents that need to be recompiled.
for (Event reason : event.reasons()) {
if (reason instanceof LinkEvent) {
LinkEvent linkEvent = (LinkEvent) reason;
if (linkEvent.type() == LinkEvent.Type.LINK_ADDED ||
linkEvent.type() == LinkEvent.Type.LINK_UPDATED) {
delegate.bumpIntents(intentsByLink.get(new LinkKey(linkEvent.subject())));
} else if (linkEvent.type() == LinkEvent.Type.LINK_REMOVED) {
delegate.failIntents(intentsByLink.get(new LinkKey(linkEvent.subject())));
if (linkEvent.type() == LINK_REMOVED) {
Set<IntentId> intentIds = intentsByLink.get(new LinkKey(linkEvent.subject()));
toBeRecompiled.addAll(intentIds);
}
recompileOnly = recompileOnly && linkEvent.type() == LINK_REMOVED;
}
}
delegate.triggerCompile(toBeRecompiled, !recompileOnly);
}
}
}
......
......@@ -9,7 +9,7 @@ import java.util.Collection;
* Auxiliary service for tracking intent path flows and for notifying the
* intent service of environment changes via topology change delegate.
*/
public interface FlowTrackerService {
public interface ObjectiveTrackerService {
/**
* Sets a topology change delegate.
......
package org.onlab.onos.net.intent.impl;
import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
......@@ -21,6 +22,7 @@ import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
/**
* Installer for {@link PathIntent path connectivity intents}.
......@@ -28,6 +30,8 @@ import org.onlab.onos.net.intent.PathIntent;
@Component(immediate = true)
public class PathIntentInstaller implements IntentInstaller<PathIntent> {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentExtensionService intentManager;
......@@ -49,8 +53,8 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
@Override
public void install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.getTrafficSelector());
Iterator<Link> links = intent.getPath().links().iterator();
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
while (links.hasNext()) {
......@@ -70,8 +74,8 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
@Override
public void uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.getTrafficSelector());
Iterator<Link> links = intent.getPath().links().iterator();
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
while (links.hasNext()) {
......@@ -82,6 +86,7 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
......
......@@ -9,18 +9,14 @@ public interface TopologyChangeDelegate {
/**
* Notifies that topology has changed in such a way that the specified
* intents should be recompiled.
* intents should be recompiled. If the {@code compileAllFailed} parameter
* is true, then all intents in {@link org.onlab.onos.net.intent.IntentState#FAILED}
* state should be compiled as well.
*
* @param intentIds intents that should be recompiled
* @param compileAllFailed true implies full compile of all failed intents
* is required; false for selective recompile only
*/
void bumpIntents(Iterable<IntentId> intentIds);
/**
* Notifies that topology has changed in such a way that the specified
* intents should be marked failed and then recompiled.
*
* @param intentIds intents that should be failed and recompiled
*/
void failIntents(Iterable<IntentId> intentIds);
void triggerCompile(Iterable<IntentId> intentIds, boolean compileAllFailed);
}
......
......@@ -36,8 +36,6 @@ import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
......@@ -95,7 +93,6 @@ public class DistributedDeviceManagerTest {
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
private KryoSerializationManager serializationMgr;
@Before
public void setUp() {
......@@ -111,10 +108,7 @@ public class DistributedDeviceManagerTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
dstore = new TestDistributedDeviceStore(storeManager);
dstore.activate();
mgr.store = dstore;
......@@ -140,7 +134,6 @@ public class DistributedDeviceManagerTest {
mgr.deactivate();
dstore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -306,10 +299,8 @@ public class DistributedDeviceManagerTest {
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
......
package org.onlab.onos.store.cluster.messaging.impl;
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
public final class ClusterMessageSubjects {
public final class ClusterManagementMessageSubjects {
// avoid instantiation
private ClusterMessageSubjects() {}
private ClusterManagementMessageSubjects() {}
public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
}
......
package org.onlab.onos.store.cluster.messaging.impl;
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.ControllerNode;
......
package org.onlab.onos.store.cluster.messaging.impl;
package org.onlab.onos.store.cluster.impl;
public enum ClusterMembershipEventType {
NEW_MEMBER,
......
......@@ -11,14 +11,15 @@ public class ClusterMessage {
private final NodeId sender;
private final MessageSubject subject;
private final Object payload;
private final byte[] payload;
// TODO: add field specifying Serializer for payload
/**
* Creates a cluster message.
*
* @param subject message subject
*/
public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
this.sender = sender;
this.subject = subject;
this.payload = payload;
......@@ -47,7 +48,7 @@ public class ClusterMessage {
*
* @return message payload.
*/
public Object payload() {
public byte[] payload() {
return payload;
}
}
......
package org.onlab.onos.store.cluster.messaging;
/**
* Service for encoding &amp; decoding intra-cluster messages.
* Service for encoding &amp; decoding intra-cluster message payload.
*/
public interface SerializationService {
......@@ -11,7 +11,7 @@ public interface SerializationService {
* @param buffer byte buffer with message(s)
* @return parsed message
*/
Object decode(byte[] data);
<T> T decode(byte[] data);
/**
* Encodes the specified message into the given byte buffer.
......
......@@ -12,17 +12,20 @@ import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
......@@ -44,16 +47,35 @@ public class ClusterCommunicationManager
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
// TODO: This probably should not be a OSGi service.
//@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(ClusterMessage.class)
.register(ClusterMembershipEvent.class)
.build()
.populate(1);
}
};
@Activate
public void activate() {
// TODO: initialize messagingService
// TODO: setPayloadSerializer, which is capable of
// (1) serialize ClusterMessage - ClusterMessage.payload
// (2) serialize ClusterMessage.payload using user specified serializer
// messagingService.setPayloadSerializer(...);
log.info("Started");
}
@Deactivate
public void deactivate() {
// TODO: cleanup messageingService if needed.
log.info("Stopped");
}
......@@ -85,7 +107,7 @@ public class ClusterCommunicationManager
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, message.subject().value(), message);
messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
......@@ -119,7 +141,7 @@ public class ClusterCommunicationManager
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id());
}
......@@ -131,7 +153,7 @@ public class ClusterCommunicationManager
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
}
}
......@@ -140,7 +162,7 @@ public class ClusterCommunicationManager
@Override
public void handle(ClusterMessage message) {
ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
......@@ -165,7 +187,8 @@ public class ClusterCommunicationManager
@Override
public void handle(Message message) {
handler.handle((ClusterMessage) message.payload());
ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
handler.handle(clusterMessage);
}
}
}
......
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -52,7 +52,7 @@ public class MessageSerializer implements SerializationService {
@Override
public Object decode(byte[] data) {
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
......
package org.onlab.onos.store.cluster.messaging;
package org.onlab.onos.store.common.impl;
import java.util.Map;
import java.util.Set;
......
......@@ -8,7 +8,7 @@ import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
......
......@@ -10,7 +10,7 @@ import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
import org.onlab.onos.store.common.impl.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
......
......@@ -4,6 +4,7 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
......@@ -12,6 +13,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
......@@ -33,10 +35,18 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
......@@ -96,8 +106,35 @@ public class GossipDeviceStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
.register(InternalDeviceEvent.class)
.register(InternalPortEvent.class)
.register(InternalPortStatusEvent.class)
.register(Timestamped.class)
.register(MastershipBasedTimestamp.class)
.build()
.populate(1);
}
};
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
clusterCommunicator.addSubscriber(
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
log.info("Started");
}
......@@ -133,8 +170,14 @@ public class GossipDeviceStore
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
if (event != null) {
// FIXME: broadcast deltaDesc, UP
log.debug("broadcast deltaDesc");
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return event;
}
......@@ -298,19 +341,21 @@ public class GossipDeviceStore
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
for (PortDescription e : portDescriptions) {
deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
}
Timestamped<List<PortDescription>> timestampedPortDescriptions =
new Timestamped<>(portDescriptions, newTimestamp);
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
new Timestamped<>(portDescriptions, newTimestamp));
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
if (!events.isEmpty()) {
// FIXME: broadcast deltaDesc, UP
log.debug("broadcast deltaDesc");
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return events;
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
......@@ -437,8 +482,14 @@ public class GossipDeviceStore
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
if (event != null) {
// FIXME: broadcast deltaDesc
log.debug("broadcast deltaDesc");
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
}
}
return event;
}
......@@ -749,4 +800,70 @@ public class GossipDeviceStore
return portDescs.put(newOne.value().portNumber(), newOne);
}
}
private void notifyPeers(InternalDeviceEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.PORT_UPDATE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private void notifyPeers(InternalPortStatusEvent event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
private class InternalDeviceEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received device update event from peer: {}", message.sender());
InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
}
}
private class InternalPortEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received port update event from peer: {}", message.sender());
InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
updatePortsInternal(providerId, deviceId, portDescriptions);
}
}
private class InternalPortStatusEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received port status update event from peer: {}", message.sender());
InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
DeviceId deviceId = event.deviceId();
Timestamped<PortDescription> portDescription = event.portDescription();
updatePortStatusInternal(providerId, deviceId, portDescription);
}
}
}
......
package org.onlab.onos.store.device.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by GossipDeviceStore.
*/
public final class GossipDeviceStoreMessageSubjects {
private GossipDeviceStoreMessageSubjects() {}
public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
/**
* Information published by GossipDeviceStore to notify peers of a device
* change event.
*/
public class InternalDeviceEvent {
private final ProviderId providerId;
private final DeviceId deviceId;
private final Timestamped<DeviceDescription> deviceDescription;
protected InternalDeviceEvent(
ProviderId providerId,
DeviceId deviceId,
Timestamped<DeviceDescription> deviceDescription) {
this.providerId = providerId;
this.deviceId = deviceId;
this.deviceDescription = deviceDescription;
}
public DeviceId deviceId() {
return deviceId;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<DeviceDescription> deviceDescription() {
return deviceDescription;
}
}
package org.onlab.onos.store.device.impl;
import java.util.List;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
/**
* Information published by GossipDeviceStore to notify peers of a port
* change event.
*/
public class InternalPortEvent {
private final ProviderId providerId;
private final DeviceId deviceId;
private final Timestamped<List<PortDescription>> portDescriptions;
protected InternalPortEvent(
ProviderId providerId,
DeviceId deviceId,
Timestamped<List<PortDescription>> portDescriptions) {
this.providerId = providerId;
this.deviceId = deviceId;
this.portDescriptions = portDescriptions;
}
public DeviceId deviceId() {
return deviceId;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<List<PortDescription>> portDescriptions() {
return portDescriptions;
}
}
package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
/**
* Information published by GossipDeviceStore to notify peers of a port
* status change event.
*/
public class InternalPortStatusEvent {
private final ProviderId providerId;
private final DeviceId deviceId;
private final Timestamped<PortDescription> portDescription;
protected InternalPortStatusEvent(
ProviderId providerId,
DeviceId deviceId,
Timestamped<PortDescription> portDescription) {
this.providerId = providerId;
this.deviceId = deviceId;
this.portDescription = portDescription;
}
public DeviceId deviceId() {
return deviceId;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<PortDescription> portDescription() {
return portDescription;
}
}
......@@ -58,6 +58,11 @@ public class DistributedFlowRuleStore
@Override
public int getFlowRuleCount() {
return flowEntries.size();
}
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
public ClusterMessageSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, ClusterMessage message) {
kryo.writeClassAndObject(output, message.sender());
kryo.writeClassAndObject(output, message.subject());
output.writeInt(message.payload().length);
output.writeBytes(message.payload());
}
@Override
public ClusterMessage read(Kryo kryo, Input input,
Class<ClusterMessage> type) {
NodeId sender = (NodeId) kryo.readClassAndObject(input);
MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
int payloadSize = input.readInt();
byte[] payload = input.readBytes(payloadSize);
return new ClusterMessage(sender, subject, payload);
}
}
\ No newline at end of file
......@@ -7,6 +7,7 @@ import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix;
......
......@@ -5,6 +5,7 @@ import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
......@@ -19,6 +20,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Annotations;
......@@ -37,6 +43,11 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
......@@ -105,7 +116,10 @@ public class GossipDeviceStoreTest {
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
gossipDeviceStore = new TestGossipDeviceStore(clockService);
ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
ClusterService clusterService = new TestClusterService();
gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
}
......@@ -541,8 +555,65 @@ public class GossipDeviceStoreTest {
private static final class TestGossipDeviceStore extends GossipDeviceStore {
public TestGossipDeviceStore(ClockService clockService) {
public TestGossipDeviceStore(
ClockService clockService,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
this.clockService = clockService;
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
}
private static final class TestClusterCommunicationService implements ClusterCommunicationService {
@Override
public boolean broadcast(ClusterMessage message) throws IOException { return true; }
@Override
public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
@Override
public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
}
private static final class TestClusterService implements ClusterService {
private static final ControllerNode ONOS1 =
new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
public TestClusterService() {
nodes.put(new NodeId("N1"), ONOS1);
nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
return ONOS1;
}
@Override
public Set<ControllerNode> getNodes() {
return Sets.newHashSet(nodes.values());
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@Override
public State getState(NodeId nodeId) {
return nodeStates.get(nodeId);
}
@Override
public void addListener(ClusterEventListener listener) {
}
@Override
public void removeListener(ClusterEventListener listener) {
}
}
}
......
......@@ -57,7 +57,7 @@ public class DistributedClusterStore
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
= new OptionalCacheLoader<>(serializer, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
......
......@@ -52,7 +52,7 @@ implements MastershipStore {
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
= new OptionalCacheLoader<>(serializer, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
......
......@@ -15,7 +15,8 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.Serializer;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -24,7 +25,7 @@ import static org.slf4j.LoggerFactory.getLogger;
/**
* Abstraction of a distributed store based on Hazelcast.
*/
@Component(componentAbstract = true)
@Component
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
......@@ -33,13 +34,13 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KryoSerializationService kryoSerializationService;
protected Serializer serializer;
protected HazelcastInstance theInstance;
@Activate
public void activate() {
serializer = new KryoSerializer();
theInstance = storeService.getHazelcastInstance();
}
......@@ -50,7 +51,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return serialized object
*/
protected byte[] serialize(Object obj) {
return kryoSerializationService.serialize(obj);
return serializer.encode(obj);
}
/**
......@@ -61,7 +62,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
return kryoSerializationService.deserialize(bytes);
return serializer.decode(bytes);
}
......
......@@ -2,7 +2,7 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.onos.store.serializers.Serializer;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
......@@ -18,28 +18,28 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final KryoSerializationService kryoSerializationService;
private final Serializer serializer;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param kryoSerializationService to use for serialization
* @param serializer to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
this.kryoSerializationService = checkNotNull(kryoSerializationService);
public OptionalCacheLoader(Serializer serializer, IMap<byte[], byte[]> rawMap) {
this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = kryoSerializationService.serialize(key);
byte[] keyBytes = serializer.encode(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = kryoSerializationService.deserialize(valBytes);
V dev = serializer.decode(valBytes);
return Optional.of(dev);
}
}
......
......@@ -88,7 +88,7 @@ public class DistributedDeviceStore
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
= new OptionalCacheLoader<>(serializer, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
......@@ -98,7 +98,7 @@ public class DistributedDeviceStore
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
= new OptionalCacheLoader<>(serializer, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
......
......@@ -58,6 +58,11 @@ public class DistributedFlowRuleStore
@Override
public int getFlowRuleCount() {
return flowEntries.size();
}
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
......
......@@ -71,7 +71,7 @@ public class DistributedLinkStore
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
= new OptionalCacheLoader<>(serializer, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
......
......@@ -36,9 +36,6 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
......@@ -63,7 +60,6 @@ public class DistributedDeviceStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
......@@ -85,10 +81,7 @@ public class DistributedDeviceStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
deviceStore = new TestDistributedDeviceStore(storeManager);
deviceStore.activate();
}
......@@ -96,8 +89,6 @@ public class DistributedDeviceStoreTest {
public void tearDown() throws Exception {
deviceStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -392,10 +383,8 @@ public class DistributedDeviceStoreTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
......@@ -30,9 +30,6 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
......@@ -51,7 +48,6 @@ public class DistributedLinkStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
......@@ -71,17 +67,13 @@ public class DistributedLinkStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
linkStore = new TestDistributedLinkStore(storeManager);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -361,10 +353,8 @@ public class DistributedLinkStoreTest {
class TestDistributedLinkStore extends DistributedLinkStore {
TestDistributedLinkStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
TestDistributedLinkStore(StoreService storeService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.serializers;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import org.onlab.onos.cluster.ControllerNode;
......@@ -21,6 +22,8 @@ import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
......@@ -47,6 +50,7 @@ public final class KryoPoolUtil {
.register(
//
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
//
ControllerNode.State.class,
......@@ -54,8 +58,10 @@ public final class KryoPoolUtil {
DefaultAnnotations.class,
DefaultControllerNode.class,
DefaultDevice.class,
DefaultDeviceDescription.class,
MastershipRole.class,
Port.class,
DefaultPortDescription.class,
Element.class,
Link.Type.class
)
......
package org.onlab.onos.store.serializers;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -11,25 +7,16 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
/**
* Serialization service using Kryo.
* Serializer implementation using Kryo.
*/
@Component(immediate = true)
@Service
public class KryoSerializationManager implements KryoSerializationService {
public class KryoSerializer implements Serializer {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
protected KryoPool serializerPool;
@Activate
public void activate() {
public KryoSerializer() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
......@@ -43,12 +30,12 @@ public class KryoSerializationManager implements KryoSerializationService {
}
@Override
public byte[] serialize(final Object obj) {
public byte[] encode(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
public <T> T decode(final byte[] bytes) {
if (bytes == null) {
return null;
}
......@@ -56,12 +43,12 @@ public class KryoSerializationManager implements KryoSerializationService {
}
@Override
public void serialize(Object obj, ByteBuffer buffer) {
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
@Override
public <T> T deserialize(ByteBuffer buffer) {
public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
......
......@@ -6,41 +6,37 @@ import java.nio.ByteBuffer;
/**
* Service to serialize Objects into byte array.
*/
public interface KryoSerializationService {
public interface Serializer {
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
* Serializes the specified object into bytes.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
public byte[] encode(final Object obj);
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
* Serializes the specified object into bytes.
*
* @param obj object to be serialized
* @param buffer to write serialized bytes
*/
public void serialize(final Object obj, ByteBuffer buffer);
public void encode(final Object obj, ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
* Deserializes the specified bytes into an object.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
public <T> T decode(final byte[] bytes);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
* Deserializes the specified bytes into an object.
*
* @param buffer bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final ByteBuffer buffer);
public <T> T decode(final ByteBuffer buffer);
}
......
......@@ -57,6 +57,11 @@ public class SimpleFlowRuleStore
@Override
public int getFlowRuleCount() {
return flowEntries.size();
}
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
......@@ -98,6 +103,7 @@ public class SimpleFlowRuleStore
public synchronized void deleteFlowRule(FlowRule rule) {
FlowEntry entry = getFlowEntry(rule);
if (entry == null) {
//log.warn("Cannot find rule {}", rule);
return;
}
entry.setState(FlowEntryState.PENDING_REMOVE);
......@@ -120,7 +126,7 @@ public class SimpleFlowRuleStore
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
//flowEntries.put(did, rule);
return null;
}
......
package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.intent.IntentState.COMPILED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -21,13 +15,18 @@ import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class SimpleIntentStore
extends AbstractStore<IntentEvent, IntentStoreDelegate>
implements IntentStore {
extends AbstractStore<IntentEvent, IntentStoreDelegate>
implements IntentStore {
private final Logger log = getLogger(getClass());
private final Map<IntentId, Intent> intents = new HashMap<>();
......@@ -46,7 +45,7 @@ public class SimpleIntentStore
@Override
public IntentEvent createIntent(Intent intent) {
intents.put(intent.getId(), intent);
intents.put(intent.id(), intent);
return this.setState(intent, IntentState.SUBMITTED);
}
......@@ -54,7 +53,7 @@ public class SimpleIntentStore
public IntentEvent removeIntent(IntentId intentId) {
Intent intent = intents.remove(intentId);
installable.remove(intentId);
IntentEvent event = this.setState(intent, IntentState.WITHDRAWN);
IntentEvent event = this.setState(intent, WITHDRAWN);
states.remove(intentId);
return event;
}
......@@ -79,19 +78,21 @@ public class SimpleIntentStore
return states.get(id);
}
// TODO return dispatch event here... replace with state transition methods
@Override
public IntentEvent setState(Intent intent, IntentState newState) {
IntentId id = intent.getId();
IntentState oldState = states.get(id);
states.put(id, newState);
return new IntentEvent(intent, newState, oldState, System.currentTimeMillis());
public IntentEvent setState(Intent intent, IntentState state) {
IntentId id = intent.id();
states.put(id, state);
IntentEvent.Type type = (state == SUBMITTED ? IntentEvent.Type.SUBMITTED :
(state == INSTALLED ? IntentEvent.Type.INSTALLED :
(state == FAILED ? IntentEvent.Type.FAILED :
state == WITHDRAWN ? IntentEvent.Type.WITHDRAWN :
null)));
return type == null ? null : new IntentEvent(type, intent);
}
@Override
public IntentEvent addInstallableIntents(IntentId intentId, List<InstallableIntent> result) {
public void addInstallableIntents(IntentId intentId, List<InstallableIntent> result) {
installable.put(intentId, result);
return this.setState(intents.get(intentId), COMPILED);
}
@Override
......
......@@ -53,6 +53,8 @@
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onlab-netty/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-hazelcast" version="1.0.0"
......
......@@ -33,6 +33,7 @@ alias obs='onos-build-selective'
alias op='onos-package'
alias ot='onos-test'
alias ol='onos-log'
alias pub='onos-push-update-bundle'
# Short-hand for tailing the ONOS (karaf) log
alias tl='$ONOS_ROOT/tools/dev/bin/onos-local-log'
......@@ -89,5 +90,5 @@ function spy {
}
function nuke {
spy | cut -c7-11 | xargs kill
spy "$@" | cut -c7-11 | xargs kill
}
......
......@@ -32,6 +32,10 @@ ssh $remote "
# Remove any previous ON.Lab bits from ~/.m2 repo
rm -fr ~/.m2/repository/org/onlab
# Drop log level for the console
echo "log4j.logger.org.apache.sshd = WARN" >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.ops4j.pax.logging.cfg
"
# Configure the ONOS installation
......
......@@ -15,7 +15,7 @@ name=${2:-onos-1}
ssh $remote "
sudo perl -pi.bak -e \"s/127.0.1.1.*/127.0.1.1 $name/g\" /etc/hosts
sudo perl -pi.bak -e \"local \$/ = ''; s/.*/$name/g\" /etc/hostname
sudo bash -c \"echo $name >/etc/hostname\"
sudo hostname $name
" 2>/dev/null
......
......@@ -9,5 +9,9 @@
remote=$ONOS_USER@${1:-$OCI}
scp -q ~/.ssh/id_rsa.pub $remote:/tmp
ssh $remote "cat /tmp/id_rsa.pub >> ~/.ssh/authorized_keys"
ssh $remote "
cat /tmp/id_rsa.pub >> ~/.ssh/authorized_keys
sort -u ~/.ssh/authorized_keys > ~/.ssh/authorized_keys.bak
mv ~/.ssh/authorized_keys.bak ~/.ssh/authorized_keys
"
ssh -n -o PasswordAuthentication=no $remote true
......
#!/bin/bash
#-------------------------------------------------------------------------------
# Pushes the specified bundle to the remote ONOS cell machines and updates it.
#-------------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
cd ~/.m2/repository
jar=$(find org/onlab -type f -name '*.jar' | grep $1 | grep -v -e -tests | head -n 1)
[ -z "$jar" ] && echo "No bundle $1 found for" && exit 1
bundle=$(echo $(basename $jar .jar) | sed 's/-[0-9].*//g')
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
for node in $nodes; do
scp -q $jar $ONOS_USER@$node:$ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar
ssh $ONOS_USER@$node "ls -l $ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar"
ssh $ONOS_USER@$node "$ONOS_INSTALL_DIR/bin/onos \"bundle:update -f $bundle\"" 2>/dev/null
done
#!/usr/bin/env python
from mininet.cli import CLI
from mininet.net import Mininet
from mininet.node import RemoteController, OVSKernelSwitch
MAC = 12
DPID = 16
def string_to_hex(s, length):
""" Convert a string like 00:00 in to hex 0x0000 format"""
tmp = '{0:#x}'.format(int(s.replace(':', '').lstrip('0'),length))
return tmp
def hex_to_string(h, length):
"""Convert a hex number from 0x0000 to 00:00 format"""
tmp = h.lstrip('0x').zfill(length)
tmp = ':'.join(a+b for a,b in zip(tmp[::2], tmp[1::2]))
return tmp
class Tower(object):
""" Create a tower topology from semi-scratch in Mininet """
def __init__(self, cname='flare', cip='15.255.126.183', k=4, h=6,
proto=None):
"""Create tower topology for mininet
cname: controller name
cip: controller ip
k: number of leaf switches
h: number of hosts perl leaf switch
"""
# We are creating the controller with local-loopback on purpose to avoid
# having the switches connect immediately. Instead, we'll set controller
# explicitly for each switch after configuring it as we want.
self.flare = RemoteController(cname, '127.0.0.1', 6633)
self.net = Mininet(controller=self.flare, switch = OVSKernelSwitch,
build=False)
self.cip = cip
self.spines = []
self.leaves = []
self.hosts = []
self.proto = proto
# Create the two spine switches
self.spines.append(self.net.addSwitch('s1'))
self.spines.append(self.net.addSwitch('s2'))
# Create two links between the spine switches
self.net.addLink(self.spines[0], self.spines[1])
self.net.addLink(self.spines[1], self.spines[0])
# Now create the leaf switches, their hosts and connect them together
i = 1
c = 0
while i <= k:
self.leaves.append(self.net.addSwitch('s1%d' % i))
for spine in self.spines:
self.net.addLink(self.leaves[i-1], spine)
j = 1
while j <= h:
self.hosts.append(self.net.addHost('h%d%d' % (i, j)))
self.net.addLink(self.hosts[c], self.leaves[i-1])
j+=1
c+=1
i+=1
def run(self):
""" Runs the created network topology and launches mininet cli"""
self.run_silent()
CLI(self.net)
self.net.stop()
def run_silent(self):
""" Runs silently - for unit testing """
self.net.build()
# Start the switches, configure them with desired protocols and only
# then set the controller
for sw in self.spines:
sw.start([self.flare])
if self.proto:
sw.cmd('ovs-vsctl set bridge %(sw)s protocols=%(proto)s' % \
{ 'sw': sw.name, 'proto': self.proto})
sw.cmdPrint('ovs-vsctl set-controller %(sw)s tcp:%(ctl)s:6633' % \
{'sw': sw.name, 'ctl': self.cip})
for sw in self.leaves:
sw.start([self.flare])
sw.cmdPrint('ovs-vsctl set-controller %(sw)s tcp:%(ctl)s:6633' % \
{'sw': sw.name, 'ctl': self.cip})
def pingAll(self):
""" PingAll to create flows - for unit testing """
self.net.pingAll()
def stop(self):
"Stops the topology. You should call this after run_silent"
self.net.stop()
......@@ -28,8 +28,9 @@ public final class Timer {
private static synchronized void initTimer() {
if (Timer.timer == null) {
Timer.timer = new HashedWheelTimer();
Timer.timer.start();
HashedWheelTimer hwTimer = new HashedWheelTimer();
hwTimer.start();
Timer.timer = hwTimer;
}
}
......
......@@ -8,16 +8,15 @@ import java.util.concurrent.TimeoutException;
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
* @param <T> type of response.
*/
public class AsyncResponse<T> implements Response<T> {
public class AsyncResponse implements Response {
private T value;
private byte[] value;
private boolean done = false;
private final long start = System.nanoTime();
@Override
public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
timeout = timeUnit.toNanos(timeout);
boolean interrupted = false;
try {
......@@ -43,7 +42,7 @@ public class AsyncResponse<T> implements Response<T> {
}
@Override
public T get() throws InterruptedException {
public byte[] get() throws InterruptedException {
throw new UnsupportedOperationException();
}
......@@ -57,11 +56,10 @@ public class AsyncResponse<T> implements Response<T> {
* available.
* @param data response data.
*/
@SuppressWarnings("unchecked")
public synchronized void setResponse(Object data) {
public synchronized void setResponse(byte[] data) {
if (!done) {
done = true;
value = (T) data;
value = data;
this.notifyAll();
}
}
......
......@@ -5,6 +5,7 @@ import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//FIXME: Should be move out to test or app
/**
* Message handler that echos the message back to the sender.
*/
......
package org.onlab.netty;
import java.util.Objects;
import com.google.common.base.MoreObjects;
/**
* Representation of a TCP/UDP communication end point.
*/
......@@ -32,16 +36,15 @@ public class Endpoint {
@Override
public String toString() {
return "Endpoint [port=" + port + ", host=" + host + "]";
return MoreObjects.toStringHelper(getClass())
.add("port", port)
.add("host", host)
.toString();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
return Objects.hash(host, port);
}
@Override
......@@ -55,17 +58,8 @@ public class Endpoint {
if (getClass() != obj.getClass()) {
return false;
}
Endpoint other = (Endpoint) obj;
if (host == null) {
if (other.host != null) {
return false;
}
} else if (!host.equals(other.host)) {
return false;
}
if (port != other.port) {
return false;
}
return true;
Endpoint that = (Endpoint) obj;
return Objects.equals(this.port, that.port) &&
Objects.equals(this.host, that.host);
}
}
......
......@@ -8,12 +8,13 @@ import java.io.IOException;
*/
public final class InternalMessage implements Message {
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
private long id;
private Endpoint sender;
private String type;
private Object payload;
private byte[] payload;
private transient NettyMessagingService messagingService;
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
// Must be created using the Builder.
private InternalMessage() {}
......@@ -31,7 +32,7 @@ public final class InternalMessage implements Message {
}
@Override
public Object payload() {
public byte[] payload() {
return payload;
}
......@@ -40,7 +41,7 @@ public final class InternalMessage implements Message {
}
@Override
public void respond(Object data) throws IOException {
public void respond(byte[] data) throws IOException {
Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id)
// FIXME: Sender should be messagingService.localEp.
......@@ -55,7 +56,7 @@ public final class InternalMessage implements Message {
/**
* Builder for InternalMessages.
*/
public static class Builder {
public static final class Builder {
private InternalMessage message;
public Builder(NettyMessagingService messagingService) {
......@@ -77,7 +78,7 @@ public final class InternalMessage implements Message {
message.sender = sender;
return this;
}
public Builder withPayload(Object payload) {
public Builder withPayload(byte[] payload) {
message.payload = payload;
return this;
}
......
......@@ -6,10 +6,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
//FIXME: Should be move out to test or app
/**
* Kryo Serializer.
*/
public class KryoSerializer implements Serializer {
public class KryoSerializer {
private KryoPool serializerPool;
......@@ -27,29 +28,26 @@ public class KryoSerializer implements Serializer {
HashMap.class,
ArrayList.class,
InternalMessage.class,
Endpoint.class
Endpoint.class,
byte[].class
)
.build()
.populate(1);
}
@Override
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
@Override
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
@Override
public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
@Override
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
......
......@@ -12,6 +12,6 @@ public class LoggingHandler implements MessageHandler {
@Override
public void handle(Message message) {
log.info("Received message. Payload: " + message.payload());
log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
......
......@@ -12,12 +12,12 @@ public interface Message {
* Returns the payload of this message.
* @return message payload.
*/
public Object payload();
public byte[] payload();
/**
* Sends a reply back to the sender of this messge.
* Sends a reply back to the sender of this message.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
public void respond(Object data) throws IOException;
public void respond(byte[] data) throws IOException;
}
......
......@@ -14,14 +14,14 @@ import java.util.List;
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
private int contentLength;
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
this.serializer = serializer;
}
@Override
......@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
......
......@@ -17,11 +17,7 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
public static final int SERIALIZER_VERSION = 1;
private final Serializer serializer;
public MessageEncoder(Serializer serializer) {
this.serializer = serializer;
}
private static final KryoSerializer SERIALIZER = new KryoSerializer();
@Override
protected void encode(
......@@ -35,12 +31,17 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write preamble
out.writeBytes(PREAMBLE);
byte[] payload = serializer.encode(message);
try {
SERIALIZER.encode(message);
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = SERIALIZER.encode(message);
// write payload length
out.writeInt(payload.length);
// write serializer version
// write payloadSerializer version
out.writeInt(SERIALIZER_VERSION);
// write payload.
......
......@@ -11,10 +11,10 @@ public interface MessagingService {
* The message is specified using the type and payload.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @param payload message payload bytes.
* @throws IOException
*/
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Sends a message synchronously and waits for a response.
......@@ -24,7 +24,7 @@ public interface MessagingService {
* @return a response future
* @throws IOException
*/
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Registers a new message handler for message type.
......@@ -38,4 +38,4 @@ public interface MessagingService {
* @param type message type
*/
public void unregisterHandler(String type);
}
}
\ No newline at end of file
......
......@@ -43,7 +43,7 @@ public class NettyMessagingService implements MessagingService {
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
......@@ -52,8 +52,6 @@ public class NettyMessagingService implements MessagingService {
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
protected Serializer serializer;
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
......@@ -83,7 +81,7 @@ public class NettyMessagingService implements MessagingService {
}
@Override
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong())
.withSender(localEp)
......@@ -108,9 +106,9 @@ public class NettyMessagingService implements MessagingService {
}
@Override
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
AsyncResponse<T> futureResponse = new AsyncResponse<T>();
AsyncResponse futureResponse = new AsyncResponse();
Long messageId = RandomUtils.nextLong();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
......@@ -133,10 +131,6 @@ public class NettyMessagingService implements MessagingService {
handlers.remove(type);
}
public void setSerializer(Serializer serializer) {
this.serializer = serializer;
}
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
......@@ -201,13 +195,13 @@ public class NettyMessagingService implements MessagingService {
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(serializer);
private final ChannelHandler encoder = new MessageEncoder();
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
.addLast("decoder", new MessageDecoder(NettyMessagingService.this))
.addLast("handler", dispatcher);
}
}
......@@ -236,12 +230,13 @@ public class NettyMessagingService implements MessagingService {
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
AsyncResponse<?> futureResponse =
AsyncResponse futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.setResponse(message.payload());
} else {
log.warn("Received a reply. But was unable to locate the request handle");
}
log.warn("Received a reply. But was unable to locate the request handle");
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
}
......
......@@ -7,26 +7,24 @@ import java.util.concurrent.TimeoutException;
* Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response
* to become available.
*
* @param <T> type of response.
*/
public interface Response<T> {
public interface Response {
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
* @return response
* @return response payload
* @throws TimeoutException if the timeout expires before the response arrives.
*/
public T get(long timeout, TimeUnit tu) throws TimeoutException;
public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Gets the response waiting for indefinite timeout period.
* @return response
* @return response payload
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
public T get() throws InterruptedException;
public byte[] get() throws InterruptedException;
/**
* Checks if the response is ready without blocking.
......
package org.onlab.netty;
import java.nio.ByteBuffer;
/**
* Interface for encoding/decoding message payloads.
*/
public interface Serializer {
/**
* Decodes the specified byte array to a POJO.
*
* @param data byte array.
* @return POJO
*/
public <T> T decode(byte[] data);
/**
* Encodes the specified POJO into a byte array.
*
* @param data POJO to be encoded
* @return byte array.
*/
public byte[] encode(Object data);
/**
* Encodes the specified POJO into a byte buffer.
*
* @param data POJO to be encoded
* @param buffer to write serialized bytes
*/
public void encode(final Object data, ByteBuffer buffer);
/**
* Decodes the specified byte buffer to a POJO.
*
* @param buffer bytes to be decoded
* @return POJO
*/
public <T> T decode(final ByteBuffer buffer);
}
......@@ -8,6 +8,7 @@ import org.onlab.metrics.MetricsManager;
import com.codahale.metrics.Timer;
// FIXME: Should be move out to test or app
public final class SimpleClient {
private SimpleClient() {
}
......@@ -23,7 +24,7 @@ public final class SimpleClient {
final int warmup = 100;
for (int i = 0; i < warmup; i++) {
Timer.Context context = sendAsyncTimer.time();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
......@@ -32,10 +33,10 @@ public final class SimpleClient {
final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response<String> response = messaging
Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo",
"Hello World");
System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
"Hello World".getBytes());
System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
......@@ -44,8 +45,6 @@ public final class SimpleClient {
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
}
}
}
......
This diff is collapsed. Click to expand it.