pankaj

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 38 changed files with 815 additions and 116 deletions
1 package org.onlab.onos.cli; 1 package org.onlab.onos.cli;
2 2
3 import org.apache.karaf.shell.commands.Command; 3 import org.apache.karaf.shell.commands.Command;
4 +import org.onlab.onos.CoreService;
4 import org.onlab.onos.cluster.ClusterService; 5 import org.onlab.onos.cluster.ClusterService;
5 import org.onlab.onos.net.device.DeviceService; 6 import org.onlab.onos.net.device.DeviceService;
6 import org.onlab.onos.net.flow.FlowRuleService; 7 import org.onlab.onos.net.flow.FlowRuleService;
...@@ -21,7 +22,8 @@ public class SummaryCommand extends AbstractShellCommand { ...@@ -21,7 +22,8 @@ public class SummaryCommand extends AbstractShellCommand {
21 protected void execute() { 22 protected void execute() {
22 TopologyService topologyService = get(TopologyService.class); 23 TopologyService topologyService = get(TopologyService.class);
23 Topology topology = topologyService.currentTopology(); 24 Topology topology = topologyService.currentTopology();
24 - print("nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d", 25 + print("version=%s, nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
26 + get(CoreService.class).version().toString(),
25 get(ClusterService.class).getNodes().size(), 27 get(ClusterService.class).getNodes().size(),
26 get(DeviceService.class).getDeviceCount(), 28 get(DeviceService.class).getDeviceCount(),
27 get(LinkService.class).getLinkCount(), 29 get(LinkService.class).getLinkCount(),
......
1 +package org.onlab.onos;
2 +
3 +/**
4 + * Service for interacting with the core system of the controller.
5 + */
6 +public interface CoreService {
7 +
8 + /**
9 + * Returns the product version.
10 + *
11 + * @return product version
12 + */
13 + Version version();
14 +
15 +}
1 +package org.onlab.onos;
2 +
3 +import java.util.Objects;
4 +
5 +import static java.lang.Integer.parseInt;
6 +
7 +/**
8 + * Representation of the product version.
9 + */
10 +public final class Version {
11 +
12 + public static final String FORMAT = "%d.%d.%d.%s";
13 +
14 + private final int major;
15 + private final int minor;
16 + private final int patch;
17 + private final String build;
18 +
19 + private final String format;
20 +
21 + // Creates a new version descriptor
22 + private Version(int major, int minor, int patch, String build) {
23 + this.major = major;
24 + this.minor = minor;
25 + this.patch = patch;
26 + this.build = build;
27 + this.format = String.format(FORMAT, major, minor, patch, build);
28 + }
29 +
30 +
31 + /**
32 + * Creates a new version from the specified constituent numbers.
33 + *
34 + * @param major major version number
35 + * @param minor minod version number
36 + * @param patch version patch number
37 + * @param build build string
38 + * @return version descriptor
39 + */
40 + public static Version version(int major, int minor, int patch, String build) {
41 + return new Version(major, minor, patch, build);
42 + }
43 +
44 + /**
45 + * Creates a new version by parsing the specified string.
46 + *
47 + * @param string version string
48 + * @return version descriptor
49 + */
50 + public static Version version(String string) {
51 + String[] fields = string.split("[.-]");
52 + return new Version(parseInt(fields[0]), parseInt(fields[1]),
53 + parseInt(fields[2]), fields[3]);
54 + }
55 +
56 + /**
57 + * Returns the major version number.
58 + *
59 + * @return major version number
60 + */
61 + public int major() {
62 + return major;
63 + }
64 +
65 + /**
66 + * Returns the minor version number.
67 + *
68 + * @return minor version number
69 + */
70 + public int minor() {
71 + return minor;
72 + }
73 +
74 + /**
75 + * Returns the version patch number.
76 + *
77 + * @return patch number
78 + */
79 + public int patch() {
80 + return patch;
81 + }
82 +
83 + /**
84 + * Returns the version build string.
85 + *
86 + * @return build string
87 + */
88 + public String build() {
89 + return build;
90 + }
91 +
92 + @Override
93 + public String toString() {
94 + return format;
95 + }
96 +
97 + @Override
98 + public int hashCode() {
99 + return Objects.hash(format);
100 + }
101 +
102 + @Override
103 + public boolean equals(Object obj) {
104 + if (this == obj) {
105 + return true;
106 + }
107 + if (obj instanceof Version) {
108 + final Version other = (Version) obj;
109 + return Objects.equals(this.format, other.format);
110 + }
111 + return false;
112 + }
113 +}
1 +package org.onlab.onos.net.flow;
2 +
3 +public class CompletedBatchOperation {
4 +
5 +
6 +}
...@@ -2,12 +2,13 @@ package org.onlab.onos.net.flow; ...@@ -2,12 +2,13 @@ package org.onlab.onos.net.flow;
2 2
3 import org.onlab.onos.ApplicationId; 3 import org.onlab.onos.ApplicationId;
4 import org.onlab.onos.net.DeviceId; 4 import org.onlab.onos.net.DeviceId;
5 +import org.onlab.onos.net.intent.BatchOperationTarget;
5 6
6 /** 7 /**
7 * Represents a generalized match & action pair to be applied to 8 * Represents a generalized match & action pair to be applied to
8 * an infrastucture device. 9 * an infrastucture device.
9 */ 10 */
10 -public interface FlowRule { 11 +public interface FlowRule extends BatchOperationTarget {
11 12
12 static final int MAX_TIMEOUT = 60; 13 static final int MAX_TIMEOUT = 60;
13 static final int MIN_PRIORITY = 0; 14 static final int MIN_PRIORITY = 0;
......
1 +package org.onlab.onos.net.flow;
2 +
3 +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
4 +import org.onlab.onos.net.intent.BatchOperationEntry;
5 +
6 +
7 +public class FlowRuleBatchEntry
8 + extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
9 +
10 + public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
11 + super(operator, target);
12 + }
13 +
14 + public enum FlowRuleOperation {
15 + ADD,
16 + REMOVE,
17 + MODIFY
18 + }
19 +
20 +}
1 +package org.onlab.onos.net.flow;
2 +
3 +import java.util.Collection;
4 +
5 +import org.onlab.onos.net.intent.BatchOperation;
6 +
7 +public class FlowRuleBatchOperation
8 + extends BatchOperation<FlowRuleBatchEntry> {
9 +
10 + public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
11 + super(operations);
12 + }
13 +}
1 package org.onlab.onos.net.flow; 1 package org.onlab.onos.net.flow;
2 2
3 +import java.util.concurrent.Future;
4 +
3 import org.onlab.onos.ApplicationId; 5 import org.onlab.onos.ApplicationId;
6 +import org.onlab.onos.net.intent.BatchOperation;
4 import org.onlab.onos.net.provider.Provider; 7 import org.onlab.onos.net.provider.Provider;
5 8
6 /** 9 /**
...@@ -34,4 +37,6 @@ public interface FlowRuleProvider extends Provider { ...@@ -34,4 +37,6 @@ public interface FlowRuleProvider extends Provider {
34 */ 37 */
35 void removeRulesById(ApplicationId id, FlowRule... flowRules); 38 void removeRulesById(ApplicationId id, FlowRule... flowRules);
36 39
40 + Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
41 +
37 } 42 }
......
1 package org.onlab.onos.net.flow; 1 package org.onlab.onos.net.flow;
2 2
3 +import java.util.concurrent.Future;
4 +
3 import org.onlab.onos.ApplicationId; 5 import org.onlab.onos.ApplicationId;
4 import org.onlab.onos.net.DeviceId; 6 import org.onlab.onos.net.DeviceId;
5 7
...@@ -66,7 +68,12 @@ public interface FlowRuleService { ...@@ -66,7 +68,12 @@ public interface FlowRuleService {
66 */ 68 */
67 Iterable<FlowRule> getFlowRulesById(ApplicationId id); 69 Iterable<FlowRule> getFlowRulesById(ApplicationId id);
68 70
69 - //Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>) 71 + /**
72 + * Applies a batch operation of FlowRules.
73 + *
74 + * @return future indicating the state of the batch operation
75 + */
76 + Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
70 77
71 /** 78 /**
72 * Adds the specified flow rule listener. 79 * Adds the specified flow rule listener.
......
1 package org.onlab.onos.net.intent; 1 package org.onlab.onos.net.intent;
2 //TODO is this the right package? 2 //TODO is this the right package?
3 3
4 +import static com.google.common.base.Preconditions.checkNotNull;
5 +
6 +import java.util.Collection;
4 import java.util.Collections; 7 import java.util.Collections;
5 import java.util.LinkedList; 8 import java.util.LinkedList;
6 import java.util.List; 9 import java.util.List;
7 10
8 -import static com.google.common.base.Preconditions.checkNotNull;
9 -
10 /** 11 /**
11 * A list of BatchOperationEntry. 12 * A list of BatchOperationEntry.
12 * 13 *
...@@ -15,7 +16,7 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -15,7 +16,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
15 */ 16 */
16 public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> { 17 public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
17 18
18 - private List<T> ops; 19 + private final List<T> ops;
19 20
20 /** 21 /**
21 * Creates new {@link BatchOperation} object. 22 * Creates new {@link BatchOperation} object.
...@@ -30,7 +31,7 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> { ...@@ -30,7 +31,7 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
30 * 31 *
31 * @param batchOperations the list of batch operation entries. 32 * @param batchOperations the list of batch operation entries.
32 */ 33 */
33 - public BatchOperation(List<T> batchOperations) { 34 + public BatchOperation(Collection<T> batchOperations) {
34 ops = new LinkedList<>(checkNotNull(batchOperations)); 35 ops = new LinkedList<>(checkNotNull(batchOperations));
35 } 36 }
36 37
...@@ -61,6 +62,10 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> { ...@@ -61,6 +62,10 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
61 62
62 /** 63 /**
63 * Adds an operation. 64 * Adds an operation.
65 + * FIXME: Brian promises that the Intent Framework
66 + * will not modify the batch operation after it has submitted it.
67 + * Ali would prefer immutablity, but trusts brian for better or
68 + * for worse.
64 * 69 *
65 * @param entry the operation to be added 70 * @param entry the operation to be added
66 * @return this object if succeeded, null otherwise 71 * @return this object if succeeded, null otherwise
......
...@@ -15,14 +15,7 @@ public class BatchOperationEntry<T extends Enum<?>, U extends BatchOperationTarg ...@@ -15,14 +15,7 @@ public class BatchOperationEntry<T extends Enum<?>, U extends BatchOperationTarg
15 private final T operator; 15 private final T operator;
16 private final U target; 16 private final U target;
17 17
18 - /** 18 +
19 - * Default constructor for serializer.
20 - */
21 - @Deprecated
22 - protected BatchOperationEntry() {
23 - this.operator = null;
24 - this.target = null;
25 - }
26 19
27 /** 20 /**
28 * Constructs new instance for the entry of the BatchOperation. 21 * Constructs new instance for the entry of the BatchOperation.
......
...@@ -12,6 +12,7 @@ public interface ClockProviderService { ...@@ -12,6 +12,7 @@ public interface ClockProviderService {
12 12
13 /** 13 /**
14 * Updates the mastership term for the specified deviceId. 14 * Updates the mastership term for the specified deviceId.
15 + *
15 * @param deviceId device identifier. 16 * @param deviceId device identifier.
16 * @param term mastership term. 17 * @param term mastership term.
17 */ 18 */
......
1 +package org.onlab.onos;
2 +
3 +import com.google.common.testing.EqualsTester;
4 +import org.junit.Test;
5 +
6 +import static org.junit.Assert.*;
7 +import static org.onlab.onos.Version.version;
8 +
9 +/**
10 + * Tests of the version descriptor.
11 + */
12 +public class VersionTest {
13 +
14 + @Test
15 + public void fromParts() {
16 + Version v = version(1, 2, 3, "4321");
17 + assertEquals("wrong major", 1, v.major());
18 + assertEquals("wrong minor", 2, v.minor());
19 + assertEquals("wrong patch", 3, v.patch());
20 + assertEquals("wrong build", "4321", v.build());
21 + }
22 +
23 + @Test
24 + public void fromString() {
25 + Version v = version("1.2.3.4321");
26 + assertEquals("wrong major", 1, v.major());
27 + assertEquals("wrong minor", 2, v.minor());
28 + assertEquals("wrong patch", 3, v.patch());
29 + assertEquals("wrong build", "4321", v.build());
30 + }
31 +
32 + @Test
33 + public void snapshot() {
34 + Version v = version("1.2.3-SNAPSHOT");
35 + assertEquals("wrong major", 1, v.major());
36 + assertEquals("wrong minor", 2, v.minor());
37 + assertEquals("wrong patch", 3, v.patch());
38 + assertEquals("wrong build", "SNAPSHOT", v.build());
39 + }
40 +
41 + @Test
42 + public void testEquals() {
43 + new EqualsTester()
44 + .addEqualityGroup(version("1.2.3.4321"), version(1, 2, 3, "4321"))
45 + .addEqualityGroup(version("1.9.3.4321"), version(1, 9, 3, "4321"))
46 + .addEqualityGroup(version("1.2.8.4321"), version(1, 2, 8, "4321"))
47 + .addEqualityGroup(version("1.2.3.x"), version(1, 2, 3, "x"))
48 + .testEquals();
49 + }
50 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.cluster.impl;
2 +
3 +import org.apache.felix.scr.annotations.Activate;
4 +import org.apache.felix.scr.annotations.Component;
5 +import org.apache.felix.scr.annotations.Service;
6 +import org.onlab.onos.CoreService;
7 +import org.onlab.onos.Version;
8 +import org.onlab.util.Tools;
9 +
10 +import java.io.File;
11 +import java.util.List;
12 +
13 +/**
14 + * Core service implementation.
15 + */
16 +@Component
17 +@Service
18 +public class CoreManager implements CoreService {
19 +
20 + private static final File VERSION_FILE = new File("../VERSION");
21 + private static Version version = Version.version("1.0.0-SNAPSHOT");
22 +
23 + // TODO: work in progress
24 +
25 + @Activate
26 + public void activate() {
27 + List<String> versionLines = Tools.slurp(VERSION_FILE);
28 + if (versionLines != null && !versionLines.isEmpty()) {
29 + version = Version.version(versionLines.get(0));
30 + }
31 + }
32 +
33 + @Override
34 + public Version version() {
35 + return version;
36 + }
37 +
38 +}
...@@ -144,6 +144,10 @@ public class DeviceManager ...@@ -144,6 +144,10 @@ public class DeviceManager
144 private void applyRole(DeviceId deviceId, MastershipRole newRole) { 144 private void applyRole(DeviceId deviceId, MastershipRole newRole) {
145 if (newRole != MastershipRole.NONE) { 145 if (newRole != MastershipRole.NONE) {
146 Device device = store.getDevice(deviceId); 146 Device device = store.getDevice(deviceId);
147 + // FIXME: Device might not be there yet. (eventual consistent)
148 + if (device == null) {
149 + return;
150 + }
147 DeviceProvider provider = getProvider(device.providerId()); 151 DeviceProvider provider = getProvider(device.providerId());
148 if (provider != null) { 152 if (provider != null) {
149 provider.roleChanged(device, newRole); 153 provider.roleChanged(device, newRole);
...@@ -193,16 +197,38 @@ public class DeviceManager ...@@ -193,16 +197,38 @@ public class DeviceManager
193 checkNotNull(deviceId, DEVICE_ID_NULL); 197 checkNotNull(deviceId, DEVICE_ID_NULL);
194 checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL); 198 checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL);
195 checkValidity(); 199 checkValidity();
200 +
201 + log.info("Device {} connected", deviceId);
202 + // check my Role
203 + MastershipRole role = mastershipService.requestRoleFor(deviceId);
204 +
205 + if (role != MastershipRole.MASTER) {
206 + // TODO: Do we need to tell the Provider that
207 + // I am no longer the MASTER?
208 + return;
209 + }
210 +
211 + // Master:
212 + MastershipTerm term = mastershipService.requestTermService()
213 + .getMastershipTerm(deviceId);
214 + if (!term.master().equals(clusterService.getLocalNode().id())) {
215 + // I've lost mastership after I thought I was MASTER.
216 + return;
217 + }
218 + clockProviderService.setMastershipTerm(deviceId, term);
219 +
196 DeviceEvent event = store.createOrUpdateDevice(provider().id(), 220 DeviceEvent event = store.createOrUpdateDevice(provider().id(),
197 deviceId, deviceDescription); 221 deviceId, deviceDescription);
198 222
199 - // If there was a change of any kind, trigger role selection 223 + // If there was a change of any kind, tell the provider
200 - // process. 224 + // I am the master.
225 + // Note: can be null, if mastership was lost.
201 if (event != null) { 226 if (event != null) {
202 - log.info("Device {} connected", deviceId); 227 + // TODO: Check switch reconnected case, is it assured that
203 - mastershipService.requestRoleFor(deviceId); 228 + // event will not be null?
204 - provider().roleChanged(event.subject(), 229 +
205 - mastershipService.requestRoleFor(deviceId)); 230 + // FIXME: 1st argument should be deviceId
231 + provider().roleChanged(event.subject(), role);
206 post(event); 232 post(event);
207 } 233 }
208 } 234 }
...@@ -211,6 +237,10 @@ public class DeviceManager ...@@ -211,6 +237,10 @@ public class DeviceManager
211 public void deviceDisconnected(DeviceId deviceId) { 237 public void deviceDisconnected(DeviceId deviceId) {
212 checkNotNull(deviceId, DEVICE_ID_NULL); 238 checkNotNull(deviceId, DEVICE_ID_NULL);
213 checkValidity(); 239 checkValidity();
240 + if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
241 + log.debug("Device {} disconnected, but I am not the master", deviceId);
242 + return;
243 + }
214 DeviceEvent event = store.markOffline(deviceId); 244 DeviceEvent event = store.markOffline(deviceId);
215 245
216 //we're no longer capable of mastership. 246 //we're no longer capable of mastership.
...@@ -272,9 +302,15 @@ public class DeviceManager ...@@ -272,9 +302,15 @@ public class DeviceManager
272 @Override 302 @Override
273 public void event(MastershipEvent event) { 303 public void event(MastershipEvent event) {
274 if (event.master().equals(clusterService.getLocalNode().id())) { 304 if (event.master().equals(clusterService.getLocalNode().id())) {
305 +
275 MastershipTerm term = mastershipService.requestTermService() 306 MastershipTerm term = mastershipService.requestTermService()
276 .getMastershipTerm(event.subject()); 307 .getMastershipTerm(event.subject());
277 - clockProviderService.setMastershipTerm(event.subject(), term); 308 +
309 + if (term.master().equals(clusterService.getLocalNode().id())) {
310 + // only set if I am the master
311 + clockProviderService.setMastershipTerm(event.subject(), term);
312 + }
313 +
278 applyRole(event.subject(), MastershipRole.MASTER); 314 applyRole(event.subject(), MastershipRole.MASTER);
279 } else { 315 } else {
280 applyRole(event.subject(), MastershipRole.STANDBY); 316 applyRole(event.subject(), MastershipRole.STANDBY);
......
...@@ -5,6 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -5,6 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger;
5 5
6 import java.util.Iterator; 6 import java.util.Iterator;
7 import java.util.List; 7 import java.util.List;
8 +import java.util.concurrent.ExecutionException;
9 +import java.util.concurrent.Future;
10 +import java.util.concurrent.TimeUnit;
11 +import java.util.concurrent.TimeoutException;
8 12
9 import org.apache.felix.scr.annotations.Activate; 13 import org.apache.felix.scr.annotations.Activate;
10 import org.apache.felix.scr.annotations.Component; 14 import org.apache.felix.scr.annotations.Component;
...@@ -18,8 +22,11 @@ import org.onlab.onos.event.EventDeliveryService; ...@@ -18,8 +22,11 @@ import org.onlab.onos.event.EventDeliveryService;
18 import org.onlab.onos.net.Device; 22 import org.onlab.onos.net.Device;
19 import org.onlab.onos.net.DeviceId; 23 import org.onlab.onos.net.DeviceId;
20 import org.onlab.onos.net.device.DeviceService; 24 import org.onlab.onos.net.device.DeviceService;
25 +import org.onlab.onos.net.flow.CompletedBatchOperation;
21 import org.onlab.onos.net.flow.FlowEntry; 26 import org.onlab.onos.net.flow.FlowEntry;
22 import org.onlab.onos.net.flow.FlowRule; 27 import org.onlab.onos.net.flow.FlowRule;
28 +import org.onlab.onos.net.flow.FlowRuleBatchEntry;
29 +import org.onlab.onos.net.flow.FlowRuleBatchOperation;
23 import org.onlab.onos.net.flow.FlowRuleEvent; 30 import org.onlab.onos.net.flow.FlowRuleEvent;
24 import org.onlab.onos.net.flow.FlowRuleListener; 31 import org.onlab.onos.net.flow.FlowRuleListener;
25 import org.onlab.onos.net.flow.FlowRuleProvider; 32 import org.onlab.onos.net.flow.FlowRuleProvider;
...@@ -32,7 +39,9 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry; ...@@ -32,7 +39,9 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry;
32 import org.onlab.onos.net.provider.AbstractProviderService; 39 import org.onlab.onos.net.provider.AbstractProviderService;
33 import org.slf4j.Logger; 40 import org.slf4j.Logger;
34 41
42 +import com.google.common.collect.ArrayListMultimap;
35 import com.google.common.collect.Lists; 43 import com.google.common.collect.Lists;
44 +import com.google.common.collect.Multimap;
36 45
37 /** 46 /**
38 * Provides implementation of the flow NB &amp; SB APIs. 47 * Provides implementation of the flow NB &amp; SB APIs.
...@@ -131,6 +140,38 @@ public class FlowRuleManager ...@@ -131,6 +140,38 @@ public class FlowRuleManager
131 } 140 }
132 141
133 @Override 142 @Override
143 + public Future<CompletedBatchOperation> applyBatch(
144 + FlowRuleBatchOperation batch) {
145 + Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
146 + ArrayListMultimap.create();
147 + List<Future<Void>> futures = Lists.newArrayList();
148 + for (FlowRuleBatchEntry fbe : batch.getOperations()) {
149 + final FlowRule f = fbe.getTarget();
150 + final Device device = deviceService.getDevice(f.deviceId());
151 + final FlowRuleProvider frp = getProvider(device.providerId());
152 + batches.put(frp, fbe);
153 + switch (fbe.getOperator()) {
154 + case ADD:
155 + store.storeFlowRule(f);
156 + break;
157 + case REMOVE:
158 + store.deleteFlowRule(f);
159 + break;
160 + case MODIFY:
161 + default:
162 + log.error("Batch operation type {} unsupported.", fbe.getOperator());
163 + }
164 + }
165 + for (FlowRuleProvider provider : batches.keySet()) {
166 + FlowRuleBatchOperation b =
167 + new FlowRuleBatchOperation(batches.get(provider));
168 + Future<Void> future = provider.executeBatch(b);
169 + futures.add(future);
170 + }
171 + return new FlowRuleBatchFuture(futures);
172 + }
173 +
174 + @Override
134 public void addListener(FlowRuleListener listener) { 175 public void addListener(FlowRuleListener listener) {
135 listenerRegistry.addListener(listener); 176 listenerRegistry.addListener(listener);
136 } 177 }
...@@ -296,4 +337,63 @@ public class FlowRuleManager ...@@ -296,4 +337,63 @@ public class FlowRuleManager
296 eventDispatcher.post(event); 337 eventDispatcher.post(event);
297 } 338 }
298 } 339 }
340 +
341 + private class FlowRuleBatchFuture
342 + implements Future<CompletedBatchOperation> {
343 +
344 + private final List<Future<Void>> futures;
345 +
346 + public FlowRuleBatchFuture(List<Future<Void>> futures) {
347 + this.futures = futures;
348 + }
349 +
350 + @Override
351 + public boolean cancel(boolean mayInterruptIfRunning) {
352 + // TODO Auto-generated method stub
353 + return false;
354 + }
355 +
356 + @Override
357 + public boolean isCancelled() {
358 + // TODO Auto-generated method stub
359 + return false;
360 + }
361 +
362 + @Override
363 + public boolean isDone() {
364 + boolean isDone = true;
365 + for (Future<Void> future : futures) {
366 + isDone &= future.isDone();
367 + }
368 + return isDone;
369 + }
370 +
371 + @Override
372 + public CompletedBatchOperation get() throws InterruptedException,
373 + ExecutionException {
374 + // TODO Auto-generated method stub
375 + for (Future<Void> future : futures) {
376 + future.get();
377 + }
378 + return new CompletedBatchOperation();
379 + }
380 +
381 + @Override
382 + public CompletedBatchOperation get(long timeout, TimeUnit unit)
383 + throws InterruptedException, ExecutionException,
384 + TimeoutException {
385 + // TODO we should decrement the timeout
386 + long start = System.nanoTime();
387 + long end = start + unit.toNanos(timeout);
388 + for (Future<Void> future : futures) {
389 + long now = System.nanoTime();
390 + long thisTimeout = end - now;
391 + future.get(thisTimeout, TimeUnit.NANOSECONDS);
392 + }
393 + return new CompletedBatchOperation();
394 + }
395 +
396 + }
397 +
398 +
299 } 399 }
......
...@@ -4,6 +4,8 @@ import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder; ...@@ -4,6 +4,8 @@ import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
4 import static org.slf4j.LoggerFactory.getLogger; 4 import static org.slf4j.LoggerFactory.getLogger;
5 5
6 import java.util.Iterator; 6 import java.util.Iterator;
7 +import java.util.List;
8 +import java.util.concurrent.ExecutionException;
7 9
8 import org.apache.felix.scr.annotations.Activate; 10 import org.apache.felix.scr.annotations.Activate;
9 import org.apache.felix.scr.annotations.Component; 11 import org.apache.felix.scr.annotations.Component;
...@@ -16,6 +18,9 @@ import org.onlab.onos.net.Link; ...@@ -16,6 +18,9 @@ import org.onlab.onos.net.Link;
16 import org.onlab.onos.net.flow.DefaultFlowRule; 18 import org.onlab.onos.net.flow.DefaultFlowRule;
17 import org.onlab.onos.net.flow.DefaultTrafficSelector; 19 import org.onlab.onos.net.flow.DefaultTrafficSelector;
18 import org.onlab.onos.net.flow.FlowRule; 20 import org.onlab.onos.net.flow.FlowRule;
21 +import org.onlab.onos.net.flow.FlowRuleBatchEntry;
22 +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
23 +import org.onlab.onos.net.flow.FlowRuleBatchOperation;
19 import org.onlab.onos.net.flow.FlowRuleService; 24 import org.onlab.onos.net.flow.FlowRuleService;
20 import org.onlab.onos.net.flow.TrafficSelector; 25 import org.onlab.onos.net.flow.TrafficSelector;
21 import org.onlab.onos.net.flow.TrafficTreatment; 26 import org.onlab.onos.net.flow.TrafficTreatment;
...@@ -24,6 +29,8 @@ import org.onlab.onos.net.intent.IntentInstaller; ...@@ -24,6 +29,8 @@ import org.onlab.onos.net.intent.IntentInstaller;
24 import org.onlab.onos.net.intent.PathIntent; 29 import org.onlab.onos.net.intent.PathIntent;
25 import org.slf4j.Logger; 30 import org.slf4j.Logger;
26 31
32 +import com.google.common.collect.Lists;
33 +
27 /** 34 /**
28 * Installer for {@link PathIntent path connectivity intents}. 35 * Installer for {@link PathIntent path connectivity intents}.
29 */ 36 */
...@@ -56,19 +63,27 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> { ...@@ -56,19 +63,27 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
56 DefaultTrafficSelector.builder(intent.selector()); 63 DefaultTrafficSelector.builder(intent.selector());
57 Iterator<Link> links = intent.path().links().iterator(); 64 Iterator<Link> links = intent.path().links().iterator();
58 ConnectPoint prev = links.next().dst(); 65 ConnectPoint prev = links.next().dst();
59 - 66 + List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
60 while (links.hasNext()) { 67 while (links.hasNext()) {
61 builder.matchInport(prev.port()); 68 builder.matchInport(prev.port());
62 Link link = links.next(); 69 Link link = links.next();
63 TrafficTreatment treatment = builder() 70 TrafficTreatment treatment = builder()
64 .setOutput(link.src().port()).build(); 71 .setOutput(link.src().port()).build();
72 +
65 FlowRule rule = new DefaultFlowRule(link.src().deviceId(), 73 FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
66 builder.build(), treatment, 74 builder.build(), treatment,
67 123, appId, 600); 75 123, appId, 600);
68 - flowRuleService.applyFlowRules(rule); 76 + rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
77 + //flowRuleService.applyFlowRules(rule);
69 prev = link.dst(); 78 prev = link.dst();
70 } 79 }
71 - 80 + FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
81 + try {
82 + flowRuleService.applyBatch(batch).get();
83 + } catch (InterruptedException | ExecutionException e) {
84 + // TODO Auto-generated catch block
85 + e.printStackTrace();
86 + }
72 } 87 }
73 88
74 @Override 89 @Override
...@@ -77,6 +92,7 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> { ...@@ -77,6 +92,7 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
77 DefaultTrafficSelector.builder(intent.selector()); 92 DefaultTrafficSelector.builder(intent.selector());
78 Iterator<Link> links = intent.path().links().iterator(); 93 Iterator<Link> links = intent.path().links().iterator();
79 ConnectPoint prev = links.next().dst(); 94 ConnectPoint prev = links.next().dst();
95 + List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
80 96
81 while (links.hasNext()) { 97 while (links.hasNext()) {
82 builder.matchInport(prev.port()); 98 builder.matchInport(prev.port());
...@@ -86,9 +102,16 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> { ...@@ -86,9 +102,16 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
86 FlowRule rule = new DefaultFlowRule(link.src().deviceId(), 102 FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
87 builder.build(), treatment, 103 builder.build(), treatment,
88 123, appId, 600); 104 123, appId, 600);
89 - 105 + rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
90 - flowRuleService.removeFlowRules(rule); 106 + //flowRuleService.removeFlowRules(rule);
91 prev = link.dst(); 107 prev = link.dst();
92 } 108 }
109 + FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
110 + try {
111 + flowRuleService.applyBatch(batch).get();
112 + } catch (InterruptedException | ExecutionException e) {
113 + // TODO Auto-generated catch block
114 + e.printStackTrace();
115 + }
93 } 116 }
94 } 117 }
......
...@@ -12,6 +12,7 @@ import java.util.ArrayList; ...@@ -12,6 +12,7 @@ import java.util.ArrayList;
12 import java.util.Collections; 12 import java.util.Collections;
13 import java.util.List; 13 import java.util.List;
14 import java.util.Set; 14 import java.util.Set;
15 +import java.util.concurrent.Future;
15 16
16 import org.junit.After; 17 import org.junit.After;
17 import org.junit.Before; 18 import org.junit.Before;
...@@ -32,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultFlowRule; ...@@ -32,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultFlowRule;
32 import org.onlab.onos.net.flow.FlowEntry; 33 import org.onlab.onos.net.flow.FlowEntry;
33 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; 34 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
34 import org.onlab.onos.net.flow.FlowRule; 35 import org.onlab.onos.net.flow.FlowRule;
36 +import org.onlab.onos.net.flow.FlowRuleBatchEntry;
35 import org.onlab.onos.net.flow.FlowRuleEvent; 37 import org.onlab.onos.net.flow.FlowRuleEvent;
36 import org.onlab.onos.net.flow.FlowRuleListener; 38 import org.onlab.onos.net.flow.FlowRuleListener;
37 import org.onlab.onos.net.flow.FlowRuleProvider; 39 import org.onlab.onos.net.flow.FlowRuleProvider;
...@@ -42,6 +44,7 @@ import org.onlab.onos.net.flow.TrafficSelector; ...@@ -42,6 +44,7 @@ import org.onlab.onos.net.flow.TrafficSelector;
42 import org.onlab.onos.net.flow.TrafficTreatment; 44 import org.onlab.onos.net.flow.TrafficTreatment;
43 import org.onlab.onos.net.flow.criteria.Criterion; 45 import org.onlab.onos.net.flow.criteria.Criterion;
44 import org.onlab.onos.net.flow.instructions.Instruction; 46 import org.onlab.onos.net.flow.instructions.Instruction;
47 +import org.onlab.onos.net.intent.BatchOperation;
45 import org.onlab.onos.net.provider.AbstractProvider; 48 import org.onlab.onos.net.provider.AbstractProvider;
46 import org.onlab.onos.net.provider.ProviderId; 49 import org.onlab.onos.net.provider.ProviderId;
47 import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore; 50 import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
...@@ -404,6 +407,13 @@ public class FlowRuleManagerTest { ...@@ -404,6 +407,13 @@ public class FlowRuleManagerTest {
404 public void removeRulesById(ApplicationId id, FlowRule... flowRules) { 407 public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
405 } 408 }
406 409
410 + @Override
411 + public Future<Void> executeBatch(
412 + BatchOperation<FlowRuleBatchEntry> batch) {
413 + // TODO Auto-generated method stub
414 + return null;
415 + }
416 +
407 417
408 } 418 }
409 419
......
...@@ -45,4 +45,9 @@ public class MessageSubject { ...@@ -45,4 +45,9 @@ public class MessageSubject {
45 MessageSubject that = (MessageSubject) obj; 45 MessageSubject that = (MessageSubject) obj;
46 return Objects.equals(this.value, that.value); 46 return Objects.equals(this.value, that.value);
47 } 47 }
48 +
49 + // for serializer
50 + protected MessageSubject() {
51 + this.value = "";
52 + }
48 } 53 }
......
...@@ -3,11 +3,11 @@ package org.onlab.onos.store.cluster.messaging.impl; ...@@ -3,11 +3,11 @@ package org.onlab.onos.store.cluster.messaging.impl;
3 import static com.google.common.base.Preconditions.checkArgument; 3 import static com.google.common.base.Preconditions.checkArgument;
4 4
5 import java.io.IOException; 5 import java.io.IOException;
6 -import java.util.HashMap;
7 -import java.util.Map;
8 import java.util.Set; 6 import java.util.Set;
9 import java.util.Timer; 7 import java.util.Timer;
10 import java.util.TimerTask; 8 import java.util.TimerTask;
9 +import java.util.concurrent.TimeUnit;
10 +import java.util.concurrent.TimeoutException;
11 11
12 import org.apache.felix.scr.annotations.Activate; 12 import org.apache.felix.scr.annotations.Activate;
13 import org.apache.felix.scr.annotations.Component; 13 import org.apache.felix.scr.annotations.Component;
...@@ -26,14 +26,17 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; ...@@ -26,14 +26,17 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 26 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 27 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
28 import org.onlab.onos.store.cluster.messaging.MessageSubject; 28 import org.onlab.onos.store.cluster.messaging.MessageSubject;
29 +import org.onlab.onos.store.serializers.ClusterMessageSerializer;
29 import org.onlab.onos.store.serializers.KryoPoolUtil; 30 import org.onlab.onos.store.serializers.KryoPoolUtil;
30 import org.onlab.onos.store.serializers.KryoSerializer; 31 import org.onlab.onos.store.serializers.KryoSerializer;
32 +import org.onlab.onos.store.serializers.MessageSubjectSerializer;
31 import org.onlab.util.KryoPool; 33 import org.onlab.util.KryoPool;
32 import org.onlab.netty.Endpoint; 34 import org.onlab.netty.Endpoint;
33 import org.onlab.netty.Message; 35 import org.onlab.netty.Message;
34 import org.onlab.netty.MessageHandler; 36 import org.onlab.netty.MessageHandler;
35 import org.onlab.netty.MessagingService; 37 import org.onlab.netty.MessagingService;
36 import org.onlab.netty.NettyMessagingService; 38 import org.onlab.netty.NettyMessagingService;
39 +import org.onlab.netty.Response;
37 import org.slf4j.Logger; 40 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory; 41 import org.slf4j.LoggerFactory;
39 42
...@@ -50,8 +53,6 @@ public class ClusterCommunicationManager ...@@ -50,8 +53,6 @@ public class ClusterCommunicationManager
50 private ClusterService clusterService; 53 private ClusterService clusterService;
51 54
52 private ClusterNodesDelegate nodesDelegate; 55 private ClusterNodesDelegate nodesDelegate;
53 - // FIXME: `members` should go away and should be using ClusterService
54 - private Map<NodeId, ControllerNode> members = new HashMap<>();
55 private final Timer timer = new Timer("onos-controller-heatbeats"); 56 private final Timer timer = new Timer("onos-controller-heatbeats");
56 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L; 57 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
57 58
...@@ -59,11 +60,14 @@ public class ClusterCommunicationManager ...@@ -59,11 +60,14 @@ public class ClusterCommunicationManager
59 private MessagingService messagingService; 60 private MessagingService messagingService;
60 61
61 private static final KryoSerializer SERIALIZER = new KryoSerializer() { 62 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
63 + @Override
62 protected void setupKryoPool() { 64 protected void setupKryoPool() {
63 serializerPool = KryoPool.newBuilder() 65 serializerPool = KryoPool.newBuilder()
64 .register(KryoPoolUtil.API) 66 .register(KryoPoolUtil.API)
65 - .register(ClusterMessage.class) 67 + .register(ClusterMessage.class, new ClusterMessageSerializer())
66 .register(ClusterMembershipEvent.class) 68 .register(ClusterMembershipEvent.class)
69 + .register(byte[].class)
70 + .register(MessageSubject.class, new MessageSubjectSerializer())
67 .build() 71 .build()
68 .populate(1); 72 .populate(1);
69 } 73 }
...@@ -73,7 +77,15 @@ public class ClusterCommunicationManager ...@@ -73,7 +77,15 @@ public class ClusterCommunicationManager
73 @Activate 77 @Activate
74 public void activate() { 78 public void activate() {
75 localNode = clusterService.getLocalNode(); 79 localNode = clusterService.getLocalNode();
76 - messagingService = new NettyMessagingService(localNode.tcpPort()); 80 + NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
81 + // FIXME: workaround until it becomes a service.
82 + try {
83 + netty.activate();
84 + } catch (Exception e) {
85 + // TODO Auto-generated catch block
86 + log.error("NettyMessagingService#activate", e);
87 + }
88 + messagingService = netty;
77 log.info("Started"); 89 log.info("Started");
78 } 90 }
79 91
...@@ -86,7 +98,7 @@ public class ClusterCommunicationManager ...@@ -86,7 +98,7 @@ public class ClusterCommunicationManager
86 @Override 98 @Override
87 public boolean broadcast(ClusterMessage message) { 99 public boolean broadcast(ClusterMessage message) {
88 boolean ok = true; 100 boolean ok = true;
89 - for (ControllerNode node : members.values()) { 101 + for (ControllerNode node : clusterService.getNodes()) {
90 if (!node.equals(localNode)) { 102 if (!node.equals(localNode)) {
91 ok = unicast(message, node.id()) && ok; 103 ok = unicast(message, node.id()) && ok;
92 } 104 }
...@@ -107,13 +119,17 @@ public class ClusterCommunicationManager ...@@ -107,13 +119,17 @@ public class ClusterCommunicationManager
107 119
108 @Override 120 @Override
109 public boolean unicast(ClusterMessage message, NodeId toNodeId) { 121 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
110 - ControllerNode node = members.get(toNodeId); 122 + ControllerNode node = clusterService.getNode(toNodeId);
111 checkArgument(node != null, "Unknown nodeId: %s", toNodeId); 123 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
112 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); 124 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
113 try { 125 try {
114 - messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message)); 126 + log.info("sending...");
127 + Response resp = messagingService.sendAndReceive(nodeEp,
128 + message.subject().value(), SERIALIZER.encode(message));
129 + resp.get(1, TimeUnit.SECONDS);
130 + log.info("sent...");
115 return true; 131 return true;
116 - } catch (IOException e) { 132 + } catch (IOException | TimeoutException e) {
117 log.error("Failed to send cluster message to nodeId: " + toNodeId, e); 133 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
118 } 134 }
119 135
...@@ -137,7 +153,7 @@ public class ClusterCommunicationManager ...@@ -137,7 +153,7 @@ public class ClusterCommunicationManager
137 153
138 @Override 154 @Override
139 public void addNode(ControllerNode node) { 155 public void addNode(ControllerNode node) {
140 - members.put(node.id(), node); 156 + //members.put(node.id(), node);
141 } 157 }
142 158
143 @Override 159 @Override
...@@ -146,7 +162,7 @@ public class ClusterCommunicationManager ...@@ -146,7 +162,7 @@ public class ClusterCommunicationManager
146 localNode.id(), 162 localNode.id(),
147 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), 163 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
148 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)))); 164 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
149 - members.remove(node.id()); 165 + //members.remove(node.id());
150 } 166 }
151 167
152 // Sends a heart beat to all peers. 168 // Sends a heart beat to all peers.
...@@ -181,7 +197,8 @@ public class ClusterCommunicationManager ...@@ -181,7 +197,8 @@ public class ClusterCommunicationManager
181 } 197 }
182 } 198 }
183 199
184 - private static class InternalClusterMessageHandler implements MessageHandler { 200 + // FIXME: revert static
201 + private class InternalClusterMessageHandler implements MessageHandler {
185 202
186 private final ClusterMessageHandler handler; 203 private final ClusterMessageHandler handler;
187 204
...@@ -191,8 +208,18 @@ public class ClusterCommunicationManager ...@@ -191,8 +208,18 @@ public class ClusterCommunicationManager
191 208
192 @Override 209 @Override
193 public void handle(Message message) { 210 public void handle(Message message) {
194 - ClusterMessage clusterMessage = SERIALIZER.decode(message.payload()); 211 + // FIXME: remove me
195 - handler.handle(clusterMessage); 212 + log.info("InternalClusterMessageHandler.handle({})", message);
213 + try {
214 + log.info("before decode");
215 + ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
216 + log.info("Subject:({}), Sender:({})", clusterMessage.subject(), clusterMessage.sender());
217 + handler.handle(clusterMessage);
218 + message.respond("ACK".getBytes());
219 + } catch (Exception e) {
220 + // TODO Auto-generated catch block
221 + log.error("failed", e);
222 + }
196 } 223 }
197 } 224 }
198 } 225 }
......
...@@ -113,6 +113,7 @@ public class GossipDeviceStore ...@@ -113,6 +113,7 @@ public class GossipDeviceStore
113 protected ClusterService clusterService; 113 protected ClusterService clusterService;
114 114
115 private static final KryoSerializer SERIALIZER = new KryoSerializer() { 115 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
116 + @Override
116 protected void setupKryoPool() { 117 protected void setupKryoPool() {
117 serializerPool = KryoPool.newBuilder() 118 serializerPool = KryoPool.newBuilder()
118 .register(KryoPoolUtil.API) 119 .register(KryoPoolUtil.API)
......
...@@ -35,4 +35,11 @@ public class InternalDeviceEvent { ...@@ -35,4 +35,11 @@ public class InternalDeviceEvent {
35 public Timestamped<DeviceDescription> deviceDescription() { 35 public Timestamped<DeviceDescription> deviceDescription() {
36 return deviceDescription; 36 return deviceDescription;
37 } 37 }
38 +
39 + // for serializer
40 + protected InternalDeviceEvent() {
41 + this.providerId = null;
42 + this.deviceId = null;
43 + this.deviceDescription = null;
44 + }
38 } 45 }
......
...@@ -37,4 +37,11 @@ public class InternalPortEvent { ...@@ -37,4 +37,11 @@ public class InternalPortEvent {
37 public Timestamped<List<PortDescription>> portDescriptions() { 37 public Timestamped<List<PortDescription>> portDescriptions() {
38 return portDescriptions; 38 return portDescriptions;
39 } 39 }
40 +
41 + // for serializer
42 + protected InternalPortEvent() {
43 + this.providerId = null;
44 + this.deviceId = null;
45 + this.portDescriptions = null;
46 + }
40 } 47 }
......
...@@ -35,4 +35,11 @@ public class InternalPortStatusEvent { ...@@ -35,4 +35,11 @@ public class InternalPortStatusEvent {
35 public Timestamped<PortDescription> portDescription() { 35 public Timestamped<PortDescription> portDescription() {
36 return portDescription; 36 return portDescription;
37 } 37 }
38 +
39 + // for serializer
40 + protected InternalPortStatusEvent() {
41 + this.providerId = null;
42 + this.deviceId = null;
43 + this.portDescription = null;
44 + }
38 } 45 }
......
...@@ -3,7 +3,6 @@ package org.onlab.onos.store.serializers; ...@@ -3,7 +3,6 @@ package org.onlab.onos.store.serializers;
3 import org.onlab.onos.cluster.NodeId; 3 import org.onlab.onos.cluster.NodeId;
4 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 4 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
5 import org.onlab.onos.store.cluster.messaging.MessageSubject; 5 import org.onlab.onos.store.cluster.messaging.MessageSubject;
6 -
7 import com.esotericsoftware.kryo.Kryo; 6 import com.esotericsoftware.kryo.Kryo;
8 import com.esotericsoftware.kryo.Serializer; 7 import com.esotericsoftware.kryo.Serializer;
9 import com.esotericsoftware.kryo.io.Input; 8 import com.esotericsoftware.kryo.io.Input;
...@@ -11,6 +10,9 @@ import com.esotericsoftware.kryo.io.Output; ...@@ -11,6 +10,9 @@ import com.esotericsoftware.kryo.io.Output;
11 10
12 public final class ClusterMessageSerializer extends Serializer<ClusterMessage> { 11 public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
13 12
13 + /**
14 + * Creates a serializer for {@link ClusterMessage}.
15 + */
14 public ClusterMessageSerializer() { 16 public ClusterMessageSerializer() {
15 // does not accept null 17 // does not accept null
16 super(false); 18 super(false);
......
...@@ -14,7 +14,7 @@ import com.esotericsoftware.kryo.io.Output; ...@@ -14,7 +14,7 @@ import com.esotericsoftware.kryo.io.Output;
14 public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> { 14 public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> {
15 15
16 /** 16 /**
17 - * Default constructor. 17 + * Creates a serializer for {@link MastershipBasedTimestamp}.
18 */ 18 */
19 public MastershipBasedTimestampSerializer() { 19 public MastershipBasedTimestampSerializer() {
20 // non-null, immutable 20 // non-null, immutable
......
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
4 +
5 +import com.esotericsoftware.kryo.Kryo;
6 +import com.esotericsoftware.kryo.Serializer;
7 +import com.esotericsoftware.kryo.io.Input;
8 +import com.esotericsoftware.kryo.io.Output;
9 +
10 +public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
11 +
12 + /**
13 + * Creates a serializer for {@link MessageSubject}.
14 + */
15 + public MessageSubjectSerializer() {
16 + // non-null, immutable
17 + super(false, true);
18 + }
19 +
20 +
21 + @Override
22 + public void write(Kryo kryo, Output output, MessageSubject object) {
23 + output.writeString(object.value());
24 + }
25 +
26 + @Override
27 + public MessageSubject read(Kryo kryo, Input input,
28 + Class<MessageSubject> type) {
29 + return new MessageSubject(input.readString());
30 + }
31 +}
...@@ -119,8 +119,8 @@ implements MastershipStore { ...@@ -119,8 +119,8 @@ implements MastershipStore {
119 119
120 @Override 120 @Override
121 public MastershipTerm getTermFor(DeviceId deviceId) { 121 public MastershipTerm getTermFor(DeviceId deviceId) {
122 - // TODO Auto-generated method stub 122 + // FIXME: implement this
123 - return null; 123 + return MastershipTerm.of(getMaster(deviceId), 1);
124 } 124 }
125 125
126 @Override 126 @Override
......
...@@ -68,7 +68,7 @@ public class FlowModBuilder { ...@@ -68,7 +68,7 @@ public class FlowModBuilder {
68 this.cookie = flowRule.id(); 68 this.cookie = flowRule.id();
69 } 69 }
70 70
71 - public OFFlowMod buildFlowMod() { 71 + public OFFlowMod buildFlowAdd() {
72 Match match = buildMatch(); 72 Match match = buildMatch();
73 List<OFAction> actions = buildActions(); 73 List<OFAction> actions = buildActions();
74 74
...@@ -86,6 +86,24 @@ public class FlowModBuilder { ...@@ -86,6 +86,24 @@ public class FlowModBuilder {
86 86
87 } 87 }
88 88
89 + public OFFlowMod buildFlowMod() {
90 + Match match = buildMatch();
91 + List<OFAction> actions = buildActions();
92 +
93 + //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
94 + OFFlowMod fm = factory.buildFlowModify()
95 + .setCookie(U64.of(cookie.value()))
96 + .setBufferId(OFBufferId.NO_BUFFER)
97 + .setActions(actions)
98 + .setMatch(match)
99 + .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
100 + .setPriority(priority)
101 + .build();
102 +
103 + return fm;
104 +
105 + }
106 +
89 public OFFlowMod buildFlowDel() { 107 public OFFlowMod buildFlowDel() {
90 Match match = buildMatch(); 108 Match match = buildMatch();
91 List<OFAction> actions = buildActions(); 109 List<OFAction> actions = buildActions();
......
...@@ -2,8 +2,17 @@ package org.onlab.onos.provider.of.flow.impl; ...@@ -2,8 +2,17 @@ package org.onlab.onos.provider.of.flow.impl;
2 2
3 import static org.slf4j.LoggerFactory.getLogger; 3 import static org.slf4j.LoggerFactory.getLogger;
4 4
5 +import java.util.HashSet;
5 import java.util.List; 6 import java.util.List;
6 import java.util.Map; 7 import java.util.Map;
8 +import java.util.Set;
9 +import java.util.concurrent.ConcurrentHashMap;
10 +import java.util.concurrent.CountDownLatch;
11 +import java.util.concurrent.ExecutionException;
12 +import java.util.concurrent.Future;
13 +import java.util.concurrent.TimeUnit;
14 +import java.util.concurrent.TimeoutException;
15 +import java.util.concurrent.atomic.AtomicBoolean;
7 16
8 import org.apache.felix.scr.annotations.Activate; 17 import org.apache.felix.scr.annotations.Activate;
9 import org.apache.felix.scr.annotations.Component; 18 import org.apache.felix.scr.annotations.Component;
...@@ -14,9 +23,11 @@ import org.onlab.onos.ApplicationId; ...@@ -14,9 +23,11 @@ import org.onlab.onos.ApplicationId;
14 import org.onlab.onos.net.DeviceId; 23 import org.onlab.onos.net.DeviceId;
15 import org.onlab.onos.net.flow.FlowEntry; 24 import org.onlab.onos.net.flow.FlowEntry;
16 import org.onlab.onos.net.flow.FlowRule; 25 import org.onlab.onos.net.flow.FlowRule;
26 +import org.onlab.onos.net.flow.FlowRuleBatchEntry;
17 import org.onlab.onos.net.flow.FlowRuleProvider; 27 import org.onlab.onos.net.flow.FlowRuleProvider;
18 import org.onlab.onos.net.flow.FlowRuleProviderRegistry; 28 import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
19 import org.onlab.onos.net.flow.FlowRuleProviderService; 29 import org.onlab.onos.net.flow.FlowRuleProviderService;
30 +import org.onlab.onos.net.intent.BatchOperation;
20 import org.onlab.onos.net.provider.AbstractProvider; 31 import org.onlab.onos.net.provider.AbstractProvider;
21 import org.onlab.onos.net.provider.ProviderId; 32 import org.onlab.onos.net.provider.ProviderId;
22 import org.onlab.onos.net.topology.TopologyService; 33 import org.onlab.onos.net.topology.TopologyService;
...@@ -27,6 +38,8 @@ import org.onlab.onos.openflow.controller.OpenFlowSwitch; ...@@ -27,6 +38,8 @@ import org.onlab.onos.openflow.controller.OpenFlowSwitch;
27 import org.onlab.onos.openflow.controller.OpenFlowSwitchListener; 38 import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
28 import org.onlab.onos.openflow.controller.RoleState; 39 import org.onlab.onos.openflow.controller.RoleState;
29 import org.projectfloodlight.openflow.protocol.OFActionType; 40 import org.projectfloodlight.openflow.protocol.OFActionType;
41 +import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
42 +import org.projectfloodlight.openflow.protocol.OFErrorMsg;
30 import org.projectfloodlight.openflow.protocol.OFFlowRemoved; 43 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
31 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry; 44 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
32 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply; 45 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
...@@ -42,9 +55,11 @@ import org.projectfloodlight.openflow.protocol.action.OFActionOutput; ...@@ -42,9 +55,11 @@ import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
42 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction; 55 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
43 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions; 56 import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
44 import org.projectfloodlight.openflow.types.OFPort; 57 import org.projectfloodlight.openflow.types.OFPort;
58 +import org.projectfloodlight.openflow.types.U32;
45 import org.slf4j.Logger; 59 import org.slf4j.Logger;
46 60
47 import com.google.common.collect.ArrayListMultimap; 61 import com.google.common.collect.ArrayListMultimap;
62 +import com.google.common.collect.Lists;
48 import com.google.common.collect.Maps; 63 import com.google.common.collect.Maps;
49 import com.google.common.collect.Multimap; 64 import com.google.common.collect.Multimap;
50 65
...@@ -70,6 +85,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -70,6 +85,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
70 85
71 private final InternalFlowProvider listener = new InternalFlowProvider(); 86 private final InternalFlowProvider listener = new InternalFlowProvider();
72 87
88 + private final Map<Long, InstallationFuture> pendingFutures =
89 + new ConcurrentHashMap<Long, InstallationFuture>();
90 +
73 /** 91 /**
74 * Creates an OpenFlow host provider. 92 * Creates an OpenFlow host provider.
75 */ 93 */
...@@ -101,7 +119,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -101,7 +119,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
101 119
102 private void applyRule(FlowRule flowRule) { 120 private void applyRule(FlowRule flowRule) {
103 OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri())); 121 OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
104 - sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod()); 122 + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
105 } 123 }
106 124
107 125
...@@ -154,6 +172,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -154,6 +172,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
154 172
155 @Override 173 @Override
156 public void handleMessage(Dpid dpid, OFMessage msg) { 174 public void handleMessage(Dpid dpid, OFMessage msg) {
175 + InstallationFuture future = null;
157 switch (msg.getType()) { 176 switch (msg.getType()) {
158 case FLOW_REMOVED: 177 case FLOW_REMOVED:
159 //TODO: make this better 178 //TODO: make this better
...@@ -166,7 +185,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -166,7 +185,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
166 pushFlowMetrics(dpid, (OFStatsReply) msg); 185 pushFlowMetrics(dpid, (OFStatsReply) msg);
167 break; 186 break;
168 case BARRIER_REPLY: 187 case BARRIER_REPLY:
188 + future = pendingFutures.get(msg.getXid());
189 + if (future != null) {
190 + future.satisfyRequirement(dpid);
191 + }
192 + break;
169 case ERROR: 193 case ERROR:
194 + future = pendingFutures.get(msg.getXid());
195 + if (future != null) {
196 + future.fail((OFErrorMsg) msg, dpid);
197 + }
198 + break;
170 default: 199 default:
171 log.debug("Unhandled message type: {}", msg.getType()); 200 log.debug("Unhandled message type: {}", msg.getType());
172 } 201 }
...@@ -226,6 +255,144 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -226,6 +255,144 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
226 } 255 }
227 256
228 257
258 + @Override
259 + public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
260 + final Set<Dpid> sws = new HashSet<Dpid>();
261 +
262 + for (FlowRuleBatchEntry fbe : batch.getOperations()) {
263 + FlowRule flowRule = fbe.getTarget();
264 + OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
265 + sws.add(new Dpid(sw.getId()));
266 + switch (fbe.getOperator()) {
267 + case ADD:
268 + //TODO: Track XID for each flowmod
269 + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
270 + break;
271 + case REMOVE:
272 + //TODO: Track XID for each flowmod
273 + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
274 + break;
275 + case MODIFY:
276 + //TODO: Track XID for each flowmod
277 + sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
278 + break;
279 + default:
280 + log.error("Unsupported batch operation {}", fbe.getOperator());
281 + }
282 + }
283 + InstallationFuture installation = new InstallationFuture(sws);
284 + pendingFutures.put(U32.f(batch.hashCode()), installation);
285 + installation.verify(batch.hashCode());
286 + return installation;
287 + }
288 +
289 + private class InstallationFuture implements Future<Void> {
290 +
291 + private final Set<Dpid> sws;
292 + private final AtomicBoolean ok = new AtomicBoolean(true);
293 + private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
294 +
295 + private final CountDownLatch countDownLatch;
296 +
297 + public InstallationFuture(Set<Dpid> sws) {
298 + this.sws = sws;
299 + countDownLatch = new CountDownLatch(sws.size());
300 + }
301 +
302 + public void fail(OFErrorMsg msg, Dpid dpid) {
303 + ok.set(false);
304 + //TODO add reason to flowentry
305 + //TODO handle specific error msgs
306 + //offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
307 + switch (msg.getErrType()) {
308 + case BAD_ACTION:
309 + break;
310 + case BAD_INSTRUCTION:
311 + break;
312 + case BAD_MATCH:
313 + break;
314 + case BAD_REQUEST:
315 + break;
316 + case EXPERIMENTER:
317 + break;
318 + case FLOW_MOD_FAILED:
319 + break;
320 + case GROUP_MOD_FAILED:
321 + break;
322 + case HELLO_FAILED:
323 + break;
324 + case METER_MOD_FAILED:
325 + break;
326 + case PORT_MOD_FAILED:
327 + break;
328 + case QUEUE_OP_FAILED:
329 + break;
330 + case ROLE_REQUEST_FAILED:
331 + break;
332 + case SWITCH_CONFIG_FAILED:
333 + break;
334 + case TABLE_FEATURES_FAILED:
335 + break;
336 + case TABLE_MOD_FAILED:
337 + break;
338 + default:
339 + break;
340 +
341 + }
342 +
343 + }
344 +
345 + public void satisfyRequirement(Dpid dpid) {
346 + log.warn("Satisfaction from switch {}", dpid);
347 + sws.remove(controller.getSwitch(dpid));
348 + countDownLatch.countDown();
349 + }
350 +
351 + public void verify(Integer id) {
352 + for (Dpid dpid : sws) {
353 + OpenFlowSwitch sw = controller.getSwitch(dpid);
354 + OFBarrierRequest.Builder builder = sw.factory()
355 + .buildBarrierRequest()
356 + .setXid(id);
357 + sw.sendMsg(builder.build());
358 + }
229 359
230 360
361 + }
362 +
363 + @Override
364 + public boolean cancel(boolean mayInterruptIfRunning) {
365 + // TODO Auto-generated method stub
366 + return false;
367 + }
368 +
369 + @Override
370 + public boolean isCancelled() {
371 + // TODO Auto-generated method stub
372 + return false;
373 + }
374 +
375 + @Override
376 + public boolean isDone() {
377 + return sws.isEmpty();
378 + }
379 +
380 + @Override
381 + public Void get() throws InterruptedException, ExecutionException {
382 + countDownLatch.await();
383 + //return offendingFlowMods;
384 + return null;
385 + }
386 +
387 + @Override
388 + public Void get(long timeout, TimeUnit unit)
389 + throws InterruptedException, ExecutionException,
390 + TimeoutException {
391 + countDownLatch.await(timeout, unit);
392 + //return offendingFlowMods;
393 + return null;
394 + }
395 +
396 + }
397 +
231 } 398 }
......
...@@ -60,6 +60,13 @@ perl -pi.old -e "s|^(featuresBoot=.*)|\1,$ONOS_FEATURES|" \ ...@@ -60,6 +60,13 @@ perl -pi.old -e "s|^(featuresBoot=.*)|\1,$ONOS_FEATURES|" \
60 cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_VERSION/onos-branding-*.jar \ 60 cp $M2_REPO/org/onlab/onos/onos-branding/$ONOS_VERSION/onos-branding-*.jar \
61 $ONOS_STAGE/$KARAF_DIST/lib 61 $ONOS_STAGE/$KARAF_DIST/lib
62 62
63 +# Patch in the ONOS version file use the build number or the user name for
64 +# build postfix in place of the SNAPSHOT post-fix.
65 +build=${BUILD_NUMBER:-$(id -un)}
66 +grep '<version>' $ONOS_ROOT/pom.xml | head -n1 | \
67 + sed 's:.*<version>::g;s:</version>.*::g' | sed "s/SNAPSHOT/$build/g" \
68 + >> $ONOS_STAGE/VERSION
69 +
63 # Now package up the ONOS tar file 70 # Now package up the ONOS tar file
64 cd $ONOS_STAGE_ROOT 71 cd $ONOS_STAGE_ROOT
65 COPYFILE_DISABLE=1 tar zcf $ONOS_TAR $ONOS_BITS 72 COPYFILE_DISABLE=1 tar zcf $ONOS_TAR $ONOS_BITS
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
4 #------------------------------------------------------------------------------- 4 #-------------------------------------------------------------------------------
5 5
6 export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-7-openjdk-amd64/} 6 export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-7-openjdk-amd64/}
7 +export JAVA_OPTS="-Xms256M -Xmx2048M"
7 8
8 cd /opt/onos 9 cd /opt/onos
9 /opt/onos/apache-karaf-3.0.1/bin/karaf "$@" 10 /opt/onos/apache-karaf-3.0.1/bin/karaf "$@"
......
...@@ -4,6 +4,12 @@ import com.google.common.base.Strings; ...@@ -4,6 +4,12 @@ import com.google.common.base.Strings;
4 import com.google.common.primitives.UnsignedLongs; 4 import com.google.common.primitives.UnsignedLongs;
5 import com.google.common.util.concurrent.ThreadFactoryBuilder; 5 import com.google.common.util.concurrent.ThreadFactoryBuilder;
6 6
7 +import java.io.BufferedReader;
8 +import java.io.File;
9 +import java.io.FileReader;
10 +import java.io.IOException;
11 +import java.util.ArrayList;
12 +import java.util.List;
7 import java.util.concurrent.ThreadFactory; 13 import java.util.concurrent.ThreadFactory;
8 14
9 public abstract class Tools { 15 public abstract class Tools {
...@@ -66,4 +72,24 @@ public abstract class Tools { ...@@ -66,4 +72,24 @@ public abstract class Tools {
66 } 72 }
67 } 73 }
68 74
75 + /**
76 + * Slurps the contents of a file into a list of strings, one per line.
77 + *
78 + * @param path file path
79 + * @return file contents
80 + */
81 + public static List<String> slurp(File path) {
82 + try (BufferedReader br = new BufferedReader(new FileReader(path))) {
83 + List<String> lines = new ArrayList<>();
84 + String line;
85 + while ((line = br.readLine()) != null) {
86 + lines.add(line);
87 + }
88 + return lines;
89 +
90 + } catch (IOException e) {
91 + return null;
92 + }
93 + }
94 +
69 } 95 }
......
...@@ -8,11 +8,16 @@ import io.netty.handler.codec.ReplayingDecoder; ...@@ -8,11 +8,16 @@ import io.netty.handler.codec.ReplayingDecoder;
8 import java.util.Arrays; 8 import java.util.Arrays;
9 import java.util.List; 9 import java.util.List;
10 10
11 +import org.slf4j.Logger;
12 +import org.slf4j.LoggerFactory;
13 +
11 /** 14 /**
12 * Decoder for inbound messages. 15 * Decoder for inbound messages.
13 */ 16 */
14 public class MessageDecoder extends ReplayingDecoder<DecoderState> { 17 public class MessageDecoder extends ReplayingDecoder<DecoderState> {
15 18
19 + private final Logger log = LoggerFactory.getLogger(getClass());
20 +
16 private final NettyMessagingService messagingService; 21 private final NettyMessagingService messagingService;
17 22
18 private static final KryoSerializer SERIALIZER = new KryoSerializer(); 23 private static final KryoSerializer SERIALIZER = new KryoSerializer();
...@@ -57,4 +62,10 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -57,4 +62,10 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
57 checkState(false, "Must not be here"); 62 checkState(false, "Must not be here");
58 } 63 }
59 } 64 }
65 +
66 + @Override
67 + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
68 + log.error("Exception inside channel handling pipeline.", cause);
69 + context.close();
70 + }
60 } 71 }
......
1 package org.onlab.netty; 1 package org.onlab.netty;
2 2
3 +import org.slf4j.Logger;
4 +import org.slf4j.LoggerFactory;
5 +
3 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.ByteBuf;
4 import io.netty.channel.ChannelHandler.Sharable; 7 import io.netty.channel.ChannelHandler.Sharable;
5 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelHandlerContext;
...@@ -11,6 +14,8 @@ import io.netty.handler.codec.MessageToByteEncoder; ...@@ -11,6 +14,8 @@ import io.netty.handler.codec.MessageToByteEncoder;
11 @Sharable 14 @Sharable
12 public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { 15 public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
13 16
17 + private final Logger log = LoggerFactory.getLogger(getClass());
18 +
14 // onosiscool in ascii 19 // onosiscool in ascii
15 public static final byte[] PREAMBLE = "onosiscool".getBytes(); 20 public static final byte[] PREAMBLE = "onosiscool".getBytes();
16 public static final int HEADER_VERSION = 1; 21 public static final int HEADER_VERSION = 1;
...@@ -31,11 +36,6 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { ...@@ -31,11 +36,6 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
31 // write preamble 36 // write preamble
32 out.writeBytes(PREAMBLE); 37 out.writeBytes(PREAMBLE);
33 38
34 - try {
35 - SERIALIZER.encode(message);
36 - } catch (Exception e) {
37 - e.printStackTrace();
38 - }
39 byte[] payload = SERIALIZER.encode(message); 39 byte[] payload = SERIALIZER.encode(message);
40 40
41 // write payload length 41 // write payload length
...@@ -47,4 +47,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { ...@@ -47,4 +47,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
47 // write payload. 47 // write payload.
48 out.writeBytes(payload); 48 out.writeBytes(payload);
49 } 49 }
50 +
51 + @Override
52 + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
53 + log.error("Exception inside channel handling pipeline.", cause);
54 + context.close();
55 + }
50 } 56 }
......
...@@ -248,6 +248,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -248,6 +248,7 @@ public class NettyMessagingService implements MessagingService {
248 248
249 @Override 249 @Override
250 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 250 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
251 + log.error("Exception inside channel handling pipeline.", cause);
251 context.close(); 252 context.close();
252 } 253 }
253 } 254 }
......
1 -package org.onlab.netty;
2 -
3 -import java.util.concurrent.TimeUnit;
4 -
5 -import org.onlab.metrics.MetricsComponent;
6 -import org.onlab.metrics.MetricsFeature;
7 -import org.onlab.metrics.MetricsManager;
8 -
9 -import com.codahale.metrics.Timer;
10 -
11 -// FIXME: Should be move out to test or app
12 -public final class SimpleClient {
13 - private SimpleClient() {
14 - }
15 -
16 - public static void main(String... args) throws Exception {
17 - NettyMessagingService messaging = new TestNettyMessagingService(9081);
18 - MetricsManager metrics = new MetricsManager();
19 - messaging.activate();
20 - metrics.activate();
21 - MetricsFeature feature = new MetricsFeature("timers");
22 - MetricsComponent component = metrics.registerComponent("NettyMessaging");
23 - Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
24 - final int warmup = 100;
25 - for (int i = 0; i < warmup; i++) {
26 - Timer.Context context = sendAsyncTimer.time();
27 - messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
28 - context.stop();
29 - }
30 - metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
31 -
32 - Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
33 - final int iterations = 1000000;
34 - for (int i = 0; i < iterations; i++) {
35 - Timer.Context context = sendAndReceiveTimer.time();
36 - Response response = messaging
37 - .sendAndReceive(new Endpoint("localhost", 8080), "echo",
38 - "Hello World".getBytes());
39 - System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
40 - context.stop();
41 - }
42 - metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
43 - }
44 -
45 - public static class TestNettyMessagingService extends NettyMessagingService {
46 - public TestNettyMessagingService(int port) throws Exception {
47 - super(port);
48 - }
49 - }
50 -}
1 -package org.onlab.netty;
2 -
3 -//FIXME: Should be move out to test or app
4 -public final class SimpleServer {
5 - private SimpleServer() {}
6 -
7 - public static void main(String... args) throws Exception {
8 - NettyMessagingService server = new NettyMessagingService(8080);
9 - server.activate();
10 - server.registerHandler("simple", new LoggingHandler());
11 - server.registerHandler("echo", new EchoHandler());
12 - }
13 -}