Praseed Balakrishnan

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 55 changed files with 1382 additions and 238 deletions
......@@ -23,6 +23,10 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipListener;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
......@@ -50,15 +54,20 @@ public class FooComponent {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final IntentListener intentListener = new InnerIntentListener();
private final MastershipListener mastershipListener = new InnerMastershipListener();
@Activate
public void activate() {
clusterService.addListener(clusterListener);
deviceService.addListener(deviceListener);
intentService.addListener(intentListener);
mastershipService.addListener(mastershipListener);
log.info("Started");
}
......@@ -67,6 +76,7 @@ public class FooComponent {
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
intentService.removeListener(intentListener);
mastershipService.removeListener(mastershipListener);
log.info("Stopped");
}
......@@ -100,6 +110,18 @@ public class FooComponent {
log.info(message, event.subject());
}
}
private class InnerMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
final NodeId myId = clusterService.getLocalNode().id();
if (myId.equals(event.roleInfo().master())) {
log.info("I have control/I wish you luck {}", event);
} else {
log.info("you have control {}", event);
}
}
}
}
......
......@@ -15,14 +15,16 @@
*/
package org.onlab.onos.cli.net;
import java.util.List;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.HostToHostIntent;
import org.onlab.onos.net.intent.IntentService;
......@@ -31,7 +33,7 @@ import org.onlab.onos.net.intent.IntentService;
*/
@Command(scope = "onos", name = "add-host-intent",
description = "Installs host-to-host connectivity intent")
public class AddHostToHostIntentCommand extends AbstractShellCommand {
public class AddHostToHostIntentCommand extends ConnectivityIntentCommand {
@Argument(index = 0, name = "one", description = "One host ID",
required = true, multiValued = false)
......@@ -50,9 +52,11 @@ public class AddHostToHostIntentCommand extends AbstractShellCommand {
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
List<Constraint> constraints = buildConstraints();
HostToHostIntent intent = new HostToHostIntent(appId(), oneId, twoId,
selector, treatment);
selector, treatment,
constraints);
service.submit(intent);
}
......
......@@ -23,11 +23,13 @@ import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.onlab.onos.net.DeviceId.deviceId;
......@@ -69,9 +71,11 @@ public class AddMultiPointToSinglePointIntentCommand extends ConnectivityIntentC
TrafficSelector selector = buildTrafficSelector();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
List<Constraint> constraints = buildConstraints();
Intent intent = new MultiPointToSinglePointIntent(appId(), selector, treatment,
ingressPoints, egress);
ingressPoints, egress,
constraints);
service.submit(intent);
}
......
......@@ -15,6 +15,8 @@
*/
package org.onlab.onos.cli.net;
import java.util.List;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.net.ConnectPoint;
......@@ -22,6 +24,7 @@ import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
......@@ -63,8 +66,10 @@ public class AddPointToPointIntentCommand extends ConnectivityIntentCommand {
TrafficSelector selector = buildTrafficSelector();
TrafficTreatment treatment = builder().build();
List<Constraint> constraints = buildConstraints();
Intent intent = new PointToPointIntent(appId(), selector, treatment,
ingress, egress);
ingress, egress, constraints);
service.submit(intent);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.onlab.onos.cli.net;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.PortNumber.portNumber;
import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
/**
* Installs point-to-point connectivity intents.
*/
@Command(scope = "onos", name = "add-point-intent-bw",
description = "Installs point-to-point connectivity intent with bandwidth constraint")
public class AddPointToPointIntentWithBandwidthConstraintCommand extends ConnectivityIntentCommand {
@Argument(index = 0, name = "ingressDevice",
description = "Ingress Device/Port Description",
required = true, multiValued = false)
String ingressDeviceString = null;
@Argument(index = 1, name = "egressDevice",
description = "Egress Device/Port Description",
required = true, multiValued = false)
String egressDeviceString = null;
@Argument(index = 2, name = "bandwidth",
description = "Bandwidth",
required = true, multiValued = false)
String bandwidthString = null;
@Override
protected void execute() {
IntentService service = get(IntentService.class);
DeviceId ingressDeviceId = deviceId(getDeviceId(ingressDeviceString));
PortNumber ingressPortNumber = portNumber(getPortNumber(ingressDeviceString));
ConnectPoint ingress = new ConnectPoint(ingressDeviceId, ingressPortNumber);
DeviceId egressDeviceId = deviceId(getDeviceId(egressDeviceString));
PortNumber egressPortNumber = portNumber(getPortNumber(egressDeviceString));
ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
long bandwidth = Long.parseLong(bandwidthString);
TrafficSelector selector = buildTrafficSelector();
TrafficTreatment treatment = builder().build();
// FIXME: add bandwitdh constraint
Intent intent = new PointToPointIntent(
appId(), selector, treatment,
ingress, egress);
service.submit(intent);
}
/**
* Extracts the port number portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return port number as a string, empty string if the port is not found
*/
private String getPortNumber(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(slash + 1, deviceString.length());
}
/**
* Extracts the device ID portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return device ID string
*/
private String getDeviceId(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(0, slash);
}
}
......@@ -15,14 +15,21 @@
*/
package org.onlab.onos.cli.net;
import java.util.LinkedList;
import java.util.List;
import org.apache.karaf.shell.commands.Option;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.constraint.BandwidthConstraint;
import org.onlab.onos.net.intent.constraint.LambdaConstraint;
import org.onlab.onos.net.resource.Bandwidth;
import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
import com.google.common.base.Strings;
import static com.google.common.base.Strings.isNullOrEmpty;
/**
* Base class for command line operations for connectivity based intents.
......@@ -41,6 +48,14 @@ public abstract class ConnectivityIntentCommand extends AbstractShellCommand {
required = false, multiValued = false)
private String ethTypeString = "";
@Option(name = "-b", aliases = "--bandwidth", description = "Bandwidth",
required = false, multiValued = false)
private String bandwidthString = "";
@Option(name = "-l", aliases = "--lambda", description = "Lambda",
required = false, multiValued = false)
private boolean lambda = false;
/**
* Constructs a traffic selector based on the command line arguments
* presented to the command.
......@@ -50,21 +65,43 @@ public abstract class ConnectivityIntentCommand extends AbstractShellCommand {
TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder();
Short ethType = Ethernet.TYPE_IPV4;
if (!Strings.isNullOrEmpty(ethTypeString)) {
if (!isNullOrEmpty(ethTypeString)) {
EthType ethTypeParameter = EthType.valueOf(ethTypeString);
ethType = ethTypeParameter.value();
}
selectorBuilder.matchEthType(ethType);
if (!Strings.isNullOrEmpty(srcMacString)) {
if (!isNullOrEmpty(srcMacString)) {
selectorBuilder.matchEthSrc(MacAddress.valueOf(srcMacString));
}
if (!Strings.isNullOrEmpty(dstMacString)) {
if (!isNullOrEmpty(dstMacString)) {
selectorBuilder.matchEthDst(MacAddress.valueOf(dstMacString));
}
return selectorBuilder.build();
}
/**
* Builds the constraint list for this command based on the command line
* parameters.
*
* @return List of constraint objects describing the constraints requested
*/
protected List<Constraint> buildConstraints() {
final List<Constraint> constraints = new LinkedList<>();
// Check for a bandwidth specification
if (!isNullOrEmpty(bandwidthString)) {
final double bandwidthValue = Double.parseDouble(bandwidthString);
constraints.add(new BandwidthConstraint(Bandwidth.valueOf(bandwidthValue)));
}
// Check for a lambda specification
if (lambda) {
constraints.add(new LambdaConstraint(null));
}
return constraints;
}
}
......
......@@ -25,6 +25,7 @@ import org.apache.karaf.shell.commands.Option;
import org.onlab.onos.cli.Comparators;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceService;
import java.util.ArrayList;
......@@ -108,7 +109,7 @@ public class DevicePortsListCommand extends DevicesListCommand {
for (Port port : service.getPorts(device.id())) {
if (isIncluded(port)) {
ports.add(mapper.createObjectNode()
.put("port", port.number().toString())
.put("port", portName(port.number()))
.put("isEnabled", port.isEnabled())
.put("type", port.type().toString().toLowerCase())
.put("portSpeed", port.portSpeed())
......@@ -120,6 +121,10 @@ public class DevicePortsListCommand extends DevicesListCommand {
return result;
}
private String portName(PortNumber port) {
return port.equals(PortNumber.LOCAL) ? "local" : port.toString();
}
// Determines if a port should be included in output.
private boolean isIncluded(Port port) {
return enabled && port.isEnabled() || disabled && !port.isEnabled() ||
......@@ -133,7 +138,8 @@ public class DevicePortsListCommand extends DevicesListCommand {
Collections.sort(ports, Comparators.PORT_COMPARATOR);
for (Port port : ports) {
if (isIncluded(port)) {
print(FMT, port.number(), port.isEnabled() ? "enabled" : "disabled",
print(FMT, portName(port.number()),
port.isEnabled() ? "enabled" : "disabled",
port.type().toString().toLowerCase(), port.portSpeed(),
annotations(port.annotations()));
}
......
......@@ -116,17 +116,6 @@
</optional-completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.AddPointToPointIntentWithBandwidthConstraintCommand"/>
<completers>
<ref component-id="connectPointCompleter"/>
<ref component-id="connectPointCompleter"/>
<null/>
</completers>
<optional-completers>
<entry key="-t" value-ref="ethTypeCompleter"/>
</optional-completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.AddOpticalIntentCommand"/>
<completers>
<ref component-id="connectPointCompleter"/>
......
......@@ -40,6 +40,10 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -105,6 +105,7 @@ public final class HostToHostIntent extends ConnectivityIntent {
.add("appId", appId())
.add("selector", selector())
.add("treatment", treatment())
.add("constraints", constraints())
.add("one", one)
.add("two", two)
.toString();
......
......@@ -22,6 +22,7 @@ import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import java.util.List;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
......@@ -65,6 +66,38 @@ public final class MultiPointToSinglePointIntent extends ConnectivityIntent {
}
/**
* Creates a new multi-to-single point connectivity intent for the specified
* traffic selector and treatment.
*
* @param appId application identifier
* @param selector traffic selector
* @param treatment treatment
* @param ingressPoints set of ports from which ingress traffic originates
* @param egressPoint port to which traffic will egress
* @param constraints constraints to apply to the intent
* @throws NullPointerException if {@code ingressPoints} or
* {@code egressPoint} is null.
* @throws IllegalArgumentException if the size of {@code ingressPoints} is
* not more than 1
*/
public MultiPointToSinglePointIntent(ApplicationId appId,
TrafficSelector selector,
TrafficTreatment treatment,
Set<ConnectPoint> ingressPoints,
ConnectPoint egressPoint,
List<Constraint> constraints) {
super(id(MultiPointToSinglePointIntent.class, selector, treatment,
ingressPoints, egressPoint), appId, null, selector, treatment,
constraints);
checkNotNull(ingressPoints);
checkArgument(!ingressPoints.isEmpty(), "Ingress point set cannot be empty");
this.ingressPoints = Sets.newHashSet(ingressPoints);
this.egressPoint = checkNotNull(egressPoint);
}
/**
* Constructor for serializer.
*/
protected MultiPointToSinglePointIntent() {
......@@ -101,6 +134,7 @@ public final class MultiPointToSinglePointIntent extends ConnectivityIntent {
.add("treatment", treatment())
.add("ingress", ingressPoints())
.add("egress", egressPoint())
.add("constraints", constraints())
.toString();
}
}
......
......@@ -15,6 +15,8 @@
*/
package org.onlab.onos.net.intent;
import java.util.List;
import com.google.common.base.MoreObjects;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.Path;
......@@ -46,6 +48,24 @@ public class PathIntent extends ConnectivityIntent {
}
/**
* Creates a new point-to-point intent with the supplied ingress/egress
* ports and using the specified explicit path.
*
* @param appId application identifier
* @param selector traffic selector
* @param treatment treatment
* @param path traversed links
* @param constraints optional list of constraints
* @throws NullPointerException {@code path} is null
*/
public PathIntent(ApplicationId appId, TrafficSelector selector,
TrafficTreatment treatment, Path path, List<Constraint> constraints) {
super(id(PathIntent.class, selector, treatment, path, constraints), appId,
resources(path.links()), selector, treatment, constraints);
this.path = path;
}
/**
* Constructor for serializer.
*/
protected PathIntent() {
......@@ -75,6 +95,7 @@ public class PathIntent extends ConnectivityIntent {
.add("appId", appId())
.add("selector", selector())
.add("treatment", treatment())
.add("constraints", constraints())
.add("path", path)
.toString();
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.intent.constraint;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import org.onlab.onos.net.ElementId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.resource.LinkResourceService;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Constraint that evaluates elements passed through in order.
*/
public class WaypointConstraint implements Constraint {
private final List<ElementId> waypoints;
/**
* Creates a new waypoint constraint.
*
* @param waypoints waypoints
*/
public WaypointConstraint(ElementId... waypoints) {
checkNotNull(waypoints, "waypoints cannot be null");
checkArgument(waypoints.length > 0, "length of waypoints should be more than 0");
this.waypoints = ImmutableList.copyOf(waypoints);
}
public List<ElementId> waypoints() {
return waypoints;
}
@Override
public double cost(Link link, LinkResourceService resourceService) {
// Always consider the number of hops
return 1;
}
@Override
public boolean validate(Path path, LinkResourceService resourceService) {
LinkedList<ElementId> waypoints = new LinkedList<>(this.waypoints);
ElementId current = waypoints.poll();
// This is safe because Path class ensures the number of links are more than 0
Link firstLink = path.links().get(0);
if (firstLink.src().elementId().equals(current)) {
current = waypoints.poll();
}
for (Link link : path.links()) {
if (link.dst().elementId().equals(current)) {
current = waypoints.poll();
// Empty waypoints means passing through all waypoints in the specified order
if (current == null) {
return true;
}
}
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(waypoints);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof WaypointConstraint)) {
return false;
}
final WaypointConstraint that = (WaypointConstraint) obj;
return Objects.equals(this.waypoints, that.waypoints);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("waypoints", waypoints)
.toString();
}
}
package org.onlab.onos.store.service;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import com.google.common.base.MoreObjects;
/**
......@@ -10,9 +14,21 @@ public class ReadRequest {
private final String tableName;
private final String key;
/**
* Creates a read request,
* which will retrieve the specified key from the table.
*
* @param tableName name of the table
* @param key key in the table
* @return ReadRequest
*/
public static ReadRequest get(String tableName, String key) {
return new ReadRequest(tableName, key);
}
public ReadRequest(String tableName, String key) {
this.tableName = tableName;
this.key = key;
this.tableName = checkNotNull(tableName);
this.key = checkNotNull(key);
}
/**
......@@ -38,4 +54,26 @@ public class ReadRequest {
.add("key", key)
.toString();
}
@Override
public int hashCode() {
return Objects.hash(key, tableName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ReadRequest other = (ReadRequest) obj;
return Objects.equals(this.key, other.key) &&
Objects.equals(this.tableName, other.tableName);
}
}
\ No newline at end of file
......
......@@ -38,6 +38,28 @@ public class VersionedValue {
return version;
}
/**
* Creates a copy of given VersionedValue.
*
* @param original VersionedValue to create a copy
* @return same as original if original or it's value is null,
* otherwise creates a copy.
*/
public static VersionedValue copy(VersionedValue original) {
if (original == null) {
return null;
}
if (original.value == null) {
// immutable, no need to copy
return original;
} else {
return new VersionedValue(
Arrays.copyOf(original.value,
original.value.length),
original.version);
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.intent.constraint;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPath;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.resource.LinkResourceService;
import java.util.Arrays;
import static org.easymock.EasyMock.createMock;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.onlab.onos.net.DefaultLinkTest.cp;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.Link.Type.DIRECT;
/**
* Test for constraint of intermediate elements.
*/
public class WaypointConstraintTest {
public static final DeviceId DID1 = deviceId("of:1");
public static final DeviceId DID2 = deviceId("of:2");
public static final DeviceId DID3 = deviceId("of:3");
public static final DeviceId DID4 = deviceId("of:4");
public static final PortNumber PN1 = PortNumber.portNumber(1);
public static final PortNumber PN2 = PortNumber.portNumber(2);
public static final PortNumber PN3 = PortNumber.portNumber(3);
public static final PortNumber PN4 = PortNumber.portNumber(4);
public static final ProviderId PROVIDER_ID = new ProviderId("of", "foo");
private WaypointConstraint sut;
private LinkResourceService linkResourceService;
private Path path;
private DefaultLink link2;
private DefaultLink link1;
@Before
public void setUp() {
linkResourceService = createMock(LinkResourceService.class);
link1 = new DefaultLink(PROVIDER_ID, cp(DID1, PN1), cp(DID2, PN2), DIRECT);
link2 = new DefaultLink(PROVIDER_ID, cp(DID2, PN3), cp(DID3, PN4), DIRECT);
path = new DefaultPath(PROVIDER_ID, Arrays.asList(link1, link2), 10);
}
/**
* Tests that all of the specified waypoints are included in the specified path in order.
*/
@Test
public void testSatisfyWaypoints() {
sut = new WaypointConstraint(DID1, DID2, DID3);
assertThat(sut.validate(path, linkResourceService), is(true));
}
/**
* Tests that the specified path does not includes the specified waypoint.
*/
@Test
public void testNotSatisfyWaypoint() {
sut = new WaypointConstraint(DID4);
assertThat(sut.validate(path, linkResourceService), is(false));
}
@Test
public void testEquality() {
Constraint c1 = new WaypointConstraint(DID1, DID2);
Constraint c2 = new WaypointConstraint(DID1, DID2);
Constraint c3 = new WaypointConstraint(DID2);
Constraint c4 = new WaypointConstraint(DID3);
new EqualsTester()
.addEqualityGroup(c1, c2)
.addEqualityGroup(c3)
.addEqualityGroup(c4)
.testEquals();
}
}
......@@ -118,13 +118,14 @@ public abstract class ConnectivityIntentCompiler<T extends ConnectivityIntent>
@Override
public double weight(TopologyEdge edge) {
if (constraints == null) {
if (constraints == null || !constraints.iterator().hasNext()) {
return 1.0;
}
// iterate over all constraints in order and return the weight of
// the first one with fast fail over the first failure
Iterator<Constraint> it = constraints.iterator();
double cost = it.next().cost(edge.link(), resourceService);
while (it.hasNext() && cost > 0) {
if (it.next().cost(edge.link(), resourceService) < 0) {
......@@ -132,6 +133,7 @@ public abstract class ConnectivityIntentCompiler<T extends ConnectivityIntent>
}
}
return cost;
}
}
......
......@@ -70,7 +70,8 @@ public class HostToHostIntentCompiler
HostToHostIntent intent) {
TrafficSelector selector = builder(intent.selector())
.matchEthSrc(src.mac()).matchEthDst(dst.mac()).build();
return new PathIntent(intent.appId(), selector, intent.treatment(), path);
return new PathIntent(intent.appId(), selector, intent.treatment(),
path, intent.constraints());
}
}
......
......@@ -77,7 +77,7 @@ import com.google.common.collect.Lists;
@Service
public class IntentManager
implements IntentService, IntentExtensionService {
private final Logger log = getLogger(getClass());
private static final Logger log = getLogger(IntentManager.class);
public static final String INTENT_NULL = "Intent cannot be null";
public static final String INTENT_ID_NULL = "Intent ID cannot be null";
......
......@@ -77,7 +77,8 @@ public class PointToPointIntentCompiler
private Intent createPathIntent(Path path,
PointToPointIntent intent) {
return new PathIntent(intent.appId(),
intent.selector(), intent.treatment(), path);
intent.selector(), intent.treatment(), path,
intent.constraints());
}
}
......
......@@ -28,6 +28,7 @@ import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.mastership.MastershipStore;
import org.onlab.onos.mastership.MastershipTermService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.trivial.impl.SimpleMastershipStore;
......@@ -57,9 +58,9 @@ public class MastershipManagerTest {
public void setUp() {
mgr = new MastershipManager();
service = mgr;
mgr.store = new SimpleMastershipStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.clusterService = new TestClusterService();
mgr.store = new TestSimpleMastershipStore(mgr.clusterService);
mgr.activate();
}
......@@ -74,7 +75,8 @@ public class MastershipManagerTest {
@Test
public void setRole() {
mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
assertEquals("wrong local role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER));
//set to master
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
......@@ -182,4 +184,12 @@ public class MastershipManagerTest {
}
}
private final class TestSimpleMastershipStore extends SimpleMastershipStore
implements MastershipStore {
public TestSimpleMastershipStore(ClusterService clusterService) {
super.clusterService = clusterService;
}
}
}
......
......@@ -64,6 +64,12 @@
-->
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
......
......@@ -159,7 +159,7 @@ public class ClusterCommunicationManager
return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
} catch (IOException e) {
log.error("Failed interaction with remote nodeId: " + toNodeId, e);
log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
throw e;
}
}
......
......@@ -283,16 +283,15 @@ implements MastershipStore {
case MASTER:
NodeId newMaster = reelect(nodeId, deviceId, rv);
rv.reassign(nodeId, NONE, STANDBY);
if (newMaster != null) {
updateTerm(deviceId);
if (newMaster != null) {
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
} else {
// no master candidate
roleMap.put(deviceId, rv);
// FIXME: Should there be new event type?
// or should we issue null Master event?
return null;
// TODO: Should there be new event type for no MASTER?
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
}
case STANDBY:
return null;
......
......@@ -149,12 +149,12 @@ public class ClusterMessagingProtocol
@Activate
public void activate() {
log.info("Started.");
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped.");
log.info("Stopped");
}
@Override
......
......@@ -132,8 +132,8 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
log.warn("Request to {} failed. Will retry "
+ "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
log.warn("{} Request to {} failed. Will retry in {} ms",
message.subject(), remoteNode, RETRY_INTERVAL_MILLIS);
THREAD_POOL.schedule(
this,
RETRY_INTERVAL_MILLIS,
......
......@@ -3,12 +3,17 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
......@@ -57,37 +62,37 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
handler.ping((PingRequest) request).whenComplete((response, error) -> {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to ping request", e);
}
});
handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
handler.poll((PollRequest) request).whenComplete((response, error) -> {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to poll request", e);
}
});
handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
handler.sync((SyncRequest) request).whenComplete((response, error) -> {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to sync request", e);
}
});
handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
} else {
throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
}
}
private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
private final ClusterMessage message;
public PostExecutionTask(ClusterMessage message) {
this.message = message;
}
@Override
public void accept(R response, Throwable t) {
if (t != null) {
log.error("Processing for " + message.subject() + " failed.", t);
} else {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to submit request", e);
log.error("Failed to respond to " + response.getClass().getName(), e);
}
}
});
}
}
}
......
......@@ -5,20 +5,26 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
......@@ -35,8 +41,6 @@ import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Strongly consistent and durable state management service based on
* Copycat implementation of Raft consensus protocol.
......@@ -58,17 +62,34 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
private Copycat copycat;
private DatabaseClient client;
// guarded by synchronized block
private ClusterConfig<TcpMember> clusterConfig;
private CountDownLatch clusterEventLatch;
private ClusterEventListener clusterEventListener;
@Activate
public void activate() {
log.info("Starting.");
// TODO: Not every node can be part of the consensus ring.
// TODO: Not every node should be part of the consensus ring.
final ControllerNode localNode = clusterService.getLocalNode();
TcpMember localMember =
new TcpMember(
clusterService.getLocalNode().ip().toString(),
clusterService.getLocalNode().tcpPort());
List<TcpMember> remoteMembers = Lists.newArrayList();
localNode.ip().toString(),
localNode.tcpPort());
clusterConfig = new TcpClusterConfig();
clusterConfig.setLocalMember(localMember);
List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
clusterService.addListener(clusterEventListener);
// note: from this point beyond, clusterConfig requires synchronization
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
......@@ -77,20 +98,37 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
}
// Configure the cluster.
TcpClusterConfig config = new TcpClusterConfig();
if (remoteMembers.isEmpty()) {
log.info("This node is the only node in the cluster. "
+ "Waiting for others to show up.");
// FIXME: hack trying to relax cases forming multiple consensus rings.
// add seed node configuration to avoid this
// If the node is alone on it's own, wait some time
// hoping other will come up soon
try {
if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
log.info("Starting as single node cluster");
}
} catch (InterruptedException e) {
log.info("Interrupted waiting for others", e);
}
}
config.setLocalMember(localMember);
config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
final TcpCluster cluster;
synchronized (clusterConfig) {
clusterConfig.addRemoteMembers(remoteMembers);
// Create the cluster.
TcpCluster cluster = new TcpCluster(config);
cluster = new TcpCluster(clusterConfig);
}
log.info("Starting cluster: {}", cluster);
StateMachine stateMachine = new DatabaseStateMachine();
ControllerNode thisNode = clusterService.getLocalNode();
// FIXME resolve Chronicle + OSGi issue
//Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Log consensusLog = new InMemoryLog();
Log consensusLog = new KryoRegisteredInMemoryLog();
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
......@@ -102,6 +140,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Deactivate
public void deactivate() {
clusterService.removeListener(clusterEventListener);
copycat.stop();
log.info("Stopped.");
}
......@@ -179,6 +218,53 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
private final class InternalClusterEventListener
implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
// TODO: Not every node should be part of the consensus ring.
final ControllerNode node = event.subject();
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
node.tcpPort());
log.trace("{}", event);
switch (event.type()) {
case INSTANCE_ACTIVATED:
case INSTANCE_ADDED:
log.info("{} was added to the cluster", tcpMember);
synchronized (clusterConfig) {
clusterConfig.addRemoteMember(tcpMember);
}
break;
case INSTANCE_DEACTIVATED:
case INSTANCE_REMOVED:
log.info("{} was removed from the cluster", tcpMember);
synchronized (clusterConfig) {
clusterConfig.removeRemoteMember(tcpMember);
}
break;
default:
break;
}
if (copycat != null) {
log.debug("Current cluster: {}", copycat.cluster());
}
clusterEventLatch.countDown();
}
}
public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
public KryoRegisteredInMemoryLog() {
super();
// required to deserialize object across bundles
super.kryo.register(TcpMember.class, new TcpMemberSerializer());
super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
}
}
private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
private final R result;
......
package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
......@@ -16,7 +16,9 @@ import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
......@@ -28,6 +30,8 @@ import com.google.common.collect.Maps;
*/
public class DatabaseStateMachine implements StateMachine {
private final Logger log = getLogger(getClass());
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
......@@ -59,8 +63,8 @@ public class DatabaseStateMachine implements StateMachine {
}
@Query
public Set<String> listTables() {
return state.getTables().keySet();
public List<String> listTables() {
return ImmutableList.copyOf(state.getTables().keySet());
}
@Query
......@@ -72,7 +76,7 @@ public class DatabaseStateMachine implements StateMachine {
results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
continue;
}
VersionedValue value = table.get(request.key());
VersionedValue value = VersionedValue.copy(table.get(request.key()));
results.add(new InternalReadResult(
InternalReadResult.Status.OK,
new ReadResult(
......@@ -85,6 +89,8 @@ public class DatabaseStateMachine implements StateMachine {
@Command
public List<InternalWriteResult> write(List<WriteRequest> requests) {
// applicability check
boolean abort = false;
List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
for (WriteRequest request : requests) {
......@@ -128,8 +134,13 @@ public class DatabaseStateMachine implements StateMachine {
return results;
}
// apply changes
for (WriteRequest request : requests) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
// FIXME: If this method could be called by multiple thread,
// synchronization scope is wrong.
// Whole function including applicability check needs to be protected.
// Confirm copycat's thread safety requirement for StateMachine
synchronized (table) {
VersionedValue previousValue =
table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
......@@ -161,8 +172,8 @@ public class DatabaseStateMachine implements StateMachine {
try {
return SERIALIZER.encode(state);
} catch (Exception e) {
e.printStackTrace();
return null;
log.error("Failed to take snapshot", e);
throw new SnapshotException(e);
}
}
......@@ -171,7 +182,8 @@ public class DatabaseStateMachine implements StateMachine {
try {
this.state = SERIALIZER.decode(data);
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
}
}
}
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import net.kuujo.copycat.log.Entry;
import net.kuujo.copycat.log.Log;
import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
import org.mapdb.Atomic;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.TxBlock;
import org.mapdb.TxMaker;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.collect.Lists;
/**
* MapDB based log implementation.
*/
public class MapDBLog implements Log {
private final File dbFile;
private TxMaker txMaker;
private final StoreSerializer serializer;
private static final String LOG_NAME = "log";
private static final String SIZE_FIELD_NAME = "size";
public MapDBLog(File dbFile, StoreSerializer serializer) {
this.dbFile = dbFile;
this.serializer = serializer;
}
@Override
public void open() throws IOException {
txMaker = DBMaker
.newFileDB(dbFile)
.makeTxMaker();
}
@Override
public void close() throws IOException {
assertIsOpen();
txMaker.close();
txMaker = null;
}
@Override
public boolean isOpen() {
return txMaker != null;
}
protected void assertIsOpen() {
checkState(isOpen(), "The log is not currently open.");
}
@Override
public long appendEntry(Entry entry) {
checkArgument(entry != null, "expecting non-null entry");
return appendEntries(entry).get(0);
}
@Override
public List<Long> appendEntries(Entry... entries) {
checkArgument(entries != null, "expecting non-null entries");
return appendEntries(Arrays.asList(entries));
}
@Override
public List<Long> appendEntries(List<Entry> entries) {
assertIsOpen();
checkArgument(entries != null, "expecting non-null entries");
final List<Long> indices = Lists.newArrayList();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
for (Entry entry : entries) {
byte[] entryBytes = serializer.encode(entry);
log.put(nextIndex, entryBytes);
size.addAndGet(entryBytes.length);
indices.add(nextIndex);
nextIndex++;
}
}
});
return indices;
}
@Override
public boolean containsEntry(long index) {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.containsKey(index);
} finally {
db.close();
}
}
@Override
public void delete() throws IOException {
assertIsOpen();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
log.clear();
size.set(0);
}
});
}
@Override
public <T extends Entry> T firstEntry() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
} finally {
db.close();
}
}
@Override
public long firstIndex() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? 0 : log.firstKey();
} finally {
db.close();
}
}
@Override
public <T extends Entry> List<T> getEntries(long from, long to) {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
if (log.isEmpty()) {
throw new LogIndexOutOfBoundsException("Log is empty");
} else if (from < log.firstKey()) {
throw new LogIndexOutOfBoundsException("From index out of bounds.");
} else if (to > log.lastKey()) {
throw new LogIndexOutOfBoundsException("To index out of bounds.");
}
List<T> entries = new ArrayList<>((int) (to - from + 1));
for (long i = from; i <= to; i++) {
T entry = serializer.decode(log.get(i));
entries.add(entry);
}
return entries;
} finally {
db.close();
}
}
@Override
public <T extends Entry> T getEntry(long index) {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
byte[] entryBytes = log.get(index);
return entryBytes == null ? null : serializer.decode(entryBytes);
} finally {
db.close();
}
}
@Override
public boolean isEmpty() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty();
} finally {
db.close();
}
}
@Override
public <T extends Entry> T lastEntry() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
} finally {
db.close();
}
}
@Override
public long lastIndex() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
return log.isEmpty() ? 0 : log.lastKey();
} finally {
db.close();
}
}
@Override
public void removeAfter(long index) {
assertIsOpen();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
long startIndex = index + 1;
long endIndex = log.lastKey();
for (long i = startIndex; i <= endIndex; ++i) {
byte[] entryBytes = log.remove(i);
size.addAndGet(-1L * entryBytes.length);
}
}
});
}
@Override
public long size() {
assertIsOpen();
DB db = txMaker.makeTx();
try {
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
return size.get();
} finally {
db.close();
}
}
@Override
public void sync() throws IOException {
assertIsOpen();
}
@Override
public void compact(long index, Entry entry) throws IOException {
assertIsOpen();
txMaker.execute(new TxBlock() {
@Override
public void tx(DB db) {
BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
size.addAndGet(-1 * deletedBytes);
byte[] entryBytes = serializer.encode(entry);
byte[] existingEntry = log.put(index, entryBytes);
size.addAndGet(entryBytes.length - existingEntry.length);
db.compact();
}
});
}
}
\ No newline at end of file
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.DatabaseException;
/**
* Exception that indicates a problem with the state machine snapshotting.
*/
@SuppressWarnings("serial")
public class SnapshotException extends DatabaseException {
public SnapshotException(Throwable t) {
super(t);
}
}
package org.onlab.onos.store.service.impl;
import java.util.Collection;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
@Override
public void write(Kryo kryo, Output output, TcpClusterConfig object) {
kryo.writeClassAndObject(output, object.getLocalMember());
kryo.writeClassAndObject(output, object.getRemoteMembers());
}
@Override
public TcpClusterConfig read(Kryo kryo, Input input,
Class<TcpClusterConfig> type) {
TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
@SuppressWarnings("unchecked")
Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
return new TcpClusterConfig(localMember, remoteMembers);
}
}
package org.onlab.onos.store.service.impl;
import net.kuujo.copycat.cluster.TcpMember;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class TcpMemberSerializer extends Serializer<TcpMember> {
@Override
public void write(Kryo kryo, Output output, TcpMember object) {
output.writeString(object.host());
output.writeInt(object.port());
}
@Override
public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
String host = input.readString();
int port = input.readInt();
return new TcpMember(host, port);
}
}
package org.onlab.onos.store.service.impl;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import net.kuujo.copycat.internal.log.OperationEntry;
import net.kuujo.copycat.log.Entry;
import net.kuujo.copycat.log.Log;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.testing.EqualsTester;
/**
* Test the MapDBLog implementation.
*/
public class MapDBLogTest {
private static final String DB_FILE_NAME = "mapdbTest";
private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");
private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");
private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
Files.deleteIfExists(new File(DB_FILE_NAME).toPath());
Files.deleteIfExists(new File(DB_FILE_NAME + ".t").toPath());
Files.deleteIfExists(new File(DB_FILE_NAME + ".p").toPath());
}
@Test(expected = IllegalStateException.class)
public void testAssertOpen() {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.size();
}
@Test
public void testAppendEntry() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntry(TEST_ENTRY1);
OperationEntry first = log.firstEntry();
OperationEntry last = log.lastEntry();
new EqualsTester()
.addEqualityGroup(first, last, TEST_ENTRY1)
.testEquals();
Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
Assert.assertEquals(1, log.firstIndex());
Assert.assertEquals(1, log.lastIndex());
}
@Test
public void testAppendEntries() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
OperationEntry first = log.firstEntry();
OperationEntry last = log.lastEntry();
new EqualsTester()
.addEqualityGroup(first, TEST_ENTRY1)
.addEqualityGroup(last, TEST_ENTRY3)
.testEquals();
Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE, TEST_ENTRY3_SIZE, log.size());
Assert.assertEquals(1, log.firstIndex());
Assert.assertEquals(3, log.lastIndex());
Assert.assertTrue(log.containsEntry(1));
Assert.assertTrue(log.containsEntry(2));
}
@Test
public void testDelete() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
log.delete();
Assert.assertEquals(0, log.size());
Assert.assertTrue(log.isEmpty());
Assert.assertEquals(0, log.firstIndex());
Assert.assertNull(log.firstEntry());
Assert.assertEquals(0, log.lastIndex());
Assert.assertNull(log.lastEntry());
}
@Test
public void testGetEntries() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
Assert.assertEquals(
TEST_ENTRY1_SIZE +
TEST_ENTRY2_SIZE +
TEST_ENTRY3_SIZE +
TEST_ENTRY4_SIZE, log.size());
List<Entry> entries = log.getEntries(2, 3);
new EqualsTester()
.addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
.addEqualityGroup(entries.get(0), TEST_ENTRY2)
.addEqualityGroup(entries.get(1), TEST_ENTRY3)
.testEquals();
}
@Test
public void testRemoveAfter() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.removeAfter(1);
Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
new EqualsTester()
.addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
.testEquals();
}
@Test
public void testAddAfterRemove() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.removeAfter(1);
log.appendEntry(TEST_ENTRY4);
Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
new EqualsTester()
.addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
.addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
.addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
.testEquals();
}
@Test
public void testClose() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
Assert.assertFalse(log.isOpen());
log.open();
Assert.assertTrue(log.isOpen());
log.close();
Assert.assertFalse(log.isOpen());
}
@Test
public void testReopen() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.close();
log.open();
new EqualsTester()
.addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
.addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
.addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
.addEqualityGroup(log.size(),
TEST_ENTRY1_SIZE +
TEST_ENTRY2_SIZE +
TEST_ENTRY3_SIZE +
TEST_ENTRY4_SIZE)
.testEquals();
}
@Test
public void testCompact() throws IOException {
Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.compact(3, TEST_SNAPSHOT_ENTRY);
new EqualsTester()
.addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
.addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
.addEqualityGroup(log.size(),
TEST_SNAPSHOT_ENTRY_SIZE +
TEST_ENTRY4_SIZE)
.testEquals();
}
}
......@@ -103,6 +103,20 @@ import com.google.common.collect.ImmutableSet;
public final class KryoNamespaces {
public static final KryoNamespace BASIC = KryoNamespace.newBuilder()
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
LinkedList.class,
byte[].class
)
.build();
/**
* KryoNamespace which can serialize ON.lab misc classes.
*/
......@@ -123,19 +137,8 @@ public final class KryoNamespaces {
*/
public static final KryoNamespace API = KryoNamespace.newBuilder()
.register(MISC)
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(BASIC)
.register(
//
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
LinkedList.class,
byte[].class,
//
//
ControllerNode.State.class,
Device.Type.class,
Port.Type.class,
......
......@@ -15,6 +15,8 @@
*/
package org.onlab.onos.store.trivial.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -22,6 +24,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
......@@ -74,6 +77,7 @@ public class SimpleMastershipStoreTest {
assertEquals("wrong role", MASTER, sms.getRole(N2, DID3));
//N2 is master but N1 is only in backups set
put(DID4, N1, false, true);
put(DID4, N2, true, false);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID4));
}
......@@ -127,12 +131,12 @@ public class SimpleMastershipStoreTest {
put(DID1, N1, false, false);
assertEquals("wrong role", MASTER, sms.requestRole(DID1));
//STANDBY without backup - become MASTER
//was STANDBY - become MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", MASTER, sms.requestRole(DID2));
//STANDBY with backup - stay STANDBY
put(DID3, N2, false, true);
//other MASTER - stay STANDBY
put(DID3, N2, true, false);
assertEquals("wrong role", STANDBY, sms.requestRole(DID3));
//local (N1) is MASTER - stay MASTER
......@@ -145,30 +149,34 @@ public class SimpleMastershipStoreTest {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
sms.setStandby(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
assertTrue("not backed up", sms.backups.get(DID1).contains(N1));
int prev = sms.termMap.get(DID1).get();
sms.setStandby(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
assertEquals("term should not change", prev, sms.termMap.get(DID1).get());
//no backup, MASTER
put(DID1, N1, true, true);
assertNull("wrong event", sms.setStandby(N1, DID1));
put(DID1, N1, true, false);
assertNull("expect no MASTER event", sms.setStandby(N1, DID1).roleInfo().master());
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID1, N2, false, true);
put(DID2, N2, true, true);
assertEquals("wrong event", MASTER_CHANGED, sms.setStandby(N1, DID1).type());
MastershipEvent event = sms.setStandby(N1, DID1);
assertEquals("wrong event", MASTER_CHANGED, event.type());
assertEquals("wrong master", N2, event.roleInfo().master());
}
//helper to populate master/backup structures
private void put(DeviceId dev, NodeId node, boolean store, boolean backup) {
if (store) {
private void put(DeviceId dev, NodeId node, boolean master, boolean backup) {
if (master) {
sms.masterMap.put(dev, node);
}
if (backup) {
sms.backups.add(node);
} else if (backup) {
List<NodeId> stbys = sms.backups.getOrDefault(dev, new ArrayList<>());
stbys.add(node);
sms.backups.put(dev, stbys);
}
sms.termMap.put(dev, new AtomicInteger());
}
......
......@@ -56,6 +56,9 @@
<bundle>mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13</bundle>
<bundle>mvn:org.onlab.onos/onlab-thirdparty/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.mapdb/mapdb/1.0.6</bundle>
<!-- FIXME: resolce Chronicle's dependency issue
<bundle>mvn:net.openhft/lang/6.4.6</bundle>
<bundle>mvn:net.openhft/affinity/2.1.1</bundle>
......
This diff is collapsed. Click to expand it.
......@@ -15,7 +15,7 @@
*/
/*
ONOS GUI -- Masthead
ONOS GUI -- Masthead script
Defines the masthead for the UI. Injects logo and title, as well as providing
the placeholder for a set of radio buttons.
......
......@@ -407,7 +407,8 @@
height: this.height,
uid: this.uid,
setRadio: this.setRadio,
setKeys: this.setKeys
setKeys: this.setKeys,
dataLoadError: this.dataLoadError
}
},
......@@ -498,6 +499,16 @@
uid: function (id) {
return makeUid(this, id);
},
// TODO : implement custom dialogs (don't use alerts)
dataLoadError: function (err, url) {
var msg = 'Data Load Error\n\n' +
err.status + ' -- ' + err.statusText + '\n\n' +
'relative-url: "' + url + '"\n\n' +
'complete-url: "' + err.responseURL + '"';
alert(msg);
}
// TODO: consider schedule, clearTimer, etc.
......
......@@ -24,3 +24,6 @@ svg #topo-bg {
opacity: 0.5;
}
svg .node {
fill: #03c;
}
\ No newline at end of file
......
......@@ -25,7 +25,7 @@
// configuration data
var config = {
useLiveData: false,
useLiveData: true,
debugOn: false,
debug: {
showNodeXY: false,
......@@ -56,12 +56,24 @@
opt: 'img/opt.png'
},
force: {
marginLR: 20,
marginTB: 20,
note: 'node.class or link.class is used to differentiate',
linkDistance: {
infra: 200,
host: 40
},
linkStrength: {
infra: 1.0,
host: 1.0
},
charge: {
device: -400,
host: -100
},
pad: 20,
translate: function() {
return 'translate(' +
config.force.marginLR + ',' +
config.force.marginTB + ')';
config.force.pad + ',' +
config.force.pad + ')';
}
}
};
......@@ -94,7 +106,11 @@
// D3 selections
var svg,
bgImg,
topoG;
topoG,
nodeG,
linkG,
node,
link;
// ==============================
// For Debugging / Development
......@@ -175,23 +191,146 @@
// ==============================
// Private functions
// set the size of the given element to that of the view
function setSize(el, view) {
// set the size of the given element to that of the view (reduced if padded)
function setSize(el, view, pad) {
var padding = pad ? pad * 2 : 0;
el.attr({
width: view.width(),
height: view.height()
width: view.width() - padding,
height: view.height() - padding
});
}
function getNetworkData(view) {
var url = getTopoUrl();
// TODO ...
console.log('Fetching JSON: ' + url);
d3.json(url, function(err, data) {
if (err) {
view.dataLoadError(err, url);
} else {
network.data = data;
drawNetwork(view);
}
});
}
function drawNetwork(view) {
preprocessData(view);
updateLayout(view);
}
function preprocessData(view) {
var w = view.width(),
h = view.height(),
hDevice = h * 0.6,
hHost = h * 0.3,
data = network.data,
deviceLayout = computeInitLayout(w, hDevice, data.devices.length),
hostLayout = computeInitLayout(w, hHost, data.hosts.length);
network.lookup = {};
network.nodes = [];
network.links = [];
// we created new arrays, so need to set the refs in the force layout
network.force.nodes(network.nodes);
network.force.links(network.links);
// let's just start with the nodes
// note that both 'devices' and 'hosts' get mapped into the nodes array
function makeNode(d, cls, layout) {
var node = {
id: d.id,
labels: d.labels,
class: cls,
icon: cls,
type: d.type,
x: layout.x(),
y: layout.y()
};
network.lookup[d.id] = node;
network.nodes.push(node);
}
// first the devices...
network.data.devices.forEach(function (d) {
makeNode(d, 'device', deviceLayout);
});
// then the hosts...
network.data.hosts.forEach(function (d) {
makeNode(d, 'host', hostLayout);
});
// TODO: process links
}
function computeInitLayout(w, h, n) {
var maxdw = 60,
compdw, dw, ox, layout;
if (n < 2) {
layout = { ox: w/2, dw: 0 }
} else {
compdw = (0.8 * w) / (n - 1);
dw = Math.min(maxdw, compdw);
ox = w/2 - ((n - 1)/2 * dw);
layout = { ox: ox, dw: dw }
}
layout.i = 0;
layout.x = function () {
var x = layout.ox + layout.i*layout.dw;
layout.i++;
return x;
};
layout.y = function () {
return h;
};
return layout;
}
function linkId(d) {
return d.source.id + '~' + d.target.id;
}
function nodeId(d) {
return d.id;
}
function updateLayout(view) {
link = link.data(network.force.links(), linkId);
link.enter().append('line')
.attr('class', 'link');
link.exit().remove();
node = node.data(network.force.nodes(), nodeId);
node.enter().append('circle')
.attr('id', function (d) { return 'nodeId-' + d.id; })
.attr('class', function (d) { return 'node'; })
.attr('r', 12);
network.force.start();
}
function tick() {
node.attr({
cx: function(d) { return d.x; },
cy: function(d) { return d.y; }
});
link.attr({
x1: function (d) { return d.source.x; },
y1: function (d) { return d.source.y; },
x2: function (d) { return d.target.x; },
y2: function (d) { return d.target.y; }
});
}
// ==============================
// View life-cycle callbacks
......@@ -199,15 +338,15 @@
var w = view.width(),
h = view.height(),
idBg = view.uid('bg'),
showBg = config.options.showBackground ? 'visible' : 'hidden';
showBg = config.options.showBackground ? 'visible' : 'hidden',
fcfg = config.force,
fpad = fcfg.pad,
forceDim = [w - 2*fpad, h - 2*fpad];
// NOTE: view.$div is a D3 selection of the view's div
svg = view.$div.append('svg');
setSize(svg, view);
topoG = svg.append('g')
.attr('transform', config.force.translate());
// load the background image
bgImg = svg.append('svg:image')
.attr({
......@@ -219,6 +358,28 @@
.style({
visibility: showBg
});
// group for the topology
topoG = svg.append('g')
.attr('transform', fcfg.translate());
// subgroups for links and nodes
linkG = topoG.append('g').attr('id', 'links');
nodeG = topoG.append('g').attr('id', 'nodes');
// selection of nodes and links
link = linkG.selectAll('.link');
node = nodeG.selectAll('.node');
// set up the force layout
network.force = d3.layout.force()
.size(forceDim)
.nodes(network.nodes)
.links(network.links)
.charge(function (d) { return fcfg.charge[d.class]; })
.linkDistance(function (d) { return fcfg.linkDistance[d.class]; })
.linkStrength(function (d) { return fcfg.linkStrength[d.class]; })
.on('tick', tick);
}
......