Jonathan Hart
Committed by Brian O'Connor

Implemented a PartitionManager to keep track of partitions

assigned to instances.

Also updated GossipIntentStore a little to the new API. This work is not
yet complete.

Change-Id: I64d1779b669de51c35da686b65006a80ac4819b0
...@@ -26,11 +26,14 @@ import org.onlab.util.KryoNamespace; ...@@ -26,11 +26,14 @@ import org.onlab.util.KryoNamespace;
26 import org.onosproject.cluster.ClusterService; 26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.net.intent.BatchWrite; 27 import org.onosproject.net.intent.BatchWrite;
28 import org.onosproject.net.intent.Intent; 28 import org.onosproject.net.intent.Intent;
29 +import org.onosproject.net.intent.IntentData;
29 import org.onosproject.net.intent.IntentEvent; 30 import org.onosproject.net.intent.IntentEvent;
30 import org.onosproject.net.intent.IntentId; 31 import org.onosproject.net.intent.IntentId;
32 +import org.onosproject.net.intent.IntentOperation;
31 import org.onosproject.net.intent.IntentState; 33 import org.onosproject.net.intent.IntentState;
32 import org.onosproject.net.intent.IntentStore; 34 import org.onosproject.net.intent.IntentStore;
33 import org.onosproject.net.intent.IntentStoreDelegate; 35 import org.onosproject.net.intent.IntentStoreDelegate;
36 +import org.onosproject.net.intent.Key;
34 import org.onosproject.store.AbstractStore; 37 import org.onosproject.store.AbstractStore;
35 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 38 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36 import org.onosproject.store.impl.EventuallyConsistentMap; 39 import org.onosproject.store.impl.EventuallyConsistentMap;
...@@ -66,12 +69,18 @@ public class GossipIntentStore ...@@ -66,12 +69,18 @@ public class GossipIntentStore
66 69
67 private EventuallyConsistentMap<IntentId, List<Intent>> installables; 70 private EventuallyConsistentMap<IntentId, List<Intent>> installables;
68 71
72 + // Map of intent key => pending intent operation
73 + private EventuallyConsistentMap<String, IntentOperation> pending;
74 +
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected ClusterCommunicationService clusterCommunicator; 76 protected ClusterCommunicationService clusterCommunicator;
71 77
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected ClusterService clusterService; 79 protected ClusterService clusterService;
74 80
81 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 + protected PartitionService partitionService;
83 +
75 @Activate 84 @Activate
76 public void activate() { 85 public void activate() {
77 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder() 86 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
...@@ -93,16 +102,25 @@ public class GossipIntentStore ...@@ -93,16 +102,25 @@ public class GossipIntentStore
93 intentSerializer, 102 intentSerializer,
94 new WallclockClockManager<>()); 103 new WallclockClockManager<>());
95 104
105 + pending = new EventuallyConsistentMapImpl<>("intent-pending",
106 + clusterService,
107 + clusterCommunicator,
108 + intentSerializer, // TODO
109 + new WallclockClockManager<>());
110 +
96 intentStates.addListener(new InternalIntentStatesListener()); 111 intentStates.addListener(new InternalIntentStatesListener());
112 + pending.addListener(new InternalPendingListener());
97 113
98 log.info("Started"); 114 log.info("Started");
99 } 115 }
100 116
101 @Deactivate 117 @Deactivate
102 public void deactivate() { 118 public void deactivate() {
119 +
103 intents.destroy(); 120 intents.destroy();
104 intentStates.destroy(); 121 intentStates.destroy();
105 installables.destroy(); 122 installables.destroy();
123 + pending.destroy();
106 124
107 log.info("Stopped"); 125 log.info("Stopped");
108 } 126 }
...@@ -148,6 +166,9 @@ public class GossipIntentStore ...@@ -148,6 +166,9 @@ public class GossipIntentStore
148 intents.put(intent.id(), intent); 166 intents.put(intent.id(), intent);
149 intentStates.put(intent.id(), INSTALL_REQ); 167 intentStates.put(intent.id(), INSTALL_REQ);
150 168
169 + // TODO remove from pending?
170 +
171 +
151 break; 172 break;
152 case REMOVE_INTENT: 173 case REMOVE_INTENT:
153 checkArgument(op.args().size() == 1, 174 checkArgument(op.args().size() == 1,
...@@ -193,6 +214,41 @@ public class GossipIntentStore ...@@ -193,6 +214,41 @@ public class GossipIntentStore
193 return failed; 214 return failed;
194 } 215 }
195 216
217 + @Override
218 + public void write(IntentData newData) {
219 + // TODO
220 + }
221 +
222 + @Override
223 + public void batchWrite(Iterable<IntentData> updates) {
224 + // TODO
225 + }
226 +
227 + @Override
228 + public Intent getIntent(Key key) {
229 + return null; // TODO
230 + }
231 +
232 + @Override
233 + public IntentData getIntentData(Key key) {
234 + return null; // TODO
235 + }
236 +
237 + @Override
238 + public void addPending(IntentData data) {
239 + // TODO implement
240 +
241 + // Check the intent versions
242 + //pending.put(op.key(), op);
243 + }
244 +
245 + @Override
246 + public boolean isMaster(Intent intent) {
247 + // TODO
248 + //return partitionService.isMine(intent.key());
249 + return false;
250 + }
251 +
196 private void notifyDelegateIfNotNull(IntentEvent event) { 252 private void notifyDelegateIfNotNull(IntentEvent event) {
197 if (event != null) { 253 if (event != null) {
198 notifyDelegate(event); 254 notifyDelegate(event);
...@@ -219,5 +275,22 @@ public class GossipIntentStore ...@@ -219,5 +275,22 @@ public class GossipIntentStore
219 } 275 }
220 } 276 }
221 277
278 + private final class InternalPendingListener implements
279 + EventuallyConsistentMapListener<String, IntentOperation> {
280 + @Override
281 + public void event(
282 + EventuallyConsistentMapEvent<String, IntentOperation> event) {
283 + if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
284 + // The pending intents map has been updated. If we are master for
285 + // this intent's partition, notify the Manager that they should do
286 + // some work.
287 + if (isMaster(event.value().intent())) {
288 + // TODO delegate.process(event.value());
289 + log.debug("implement this");
290 + }
291 + }
292 + }
293 + }
294 +
222 } 295 }
223 296
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.intent.impl;
17 +
18 +import com.google.common.base.MoreObjects;
19 +
20 +import java.util.Objects;
21 +
22 +/**
23 + * Identifies a partition of the intent keyspace which will be assigned to and
24 + * processed by a single ONOS instance at a time.
25 + */
26 +public class PartitionId {
27 + private final int id;
28 +
29 + /**
30 + * Creates a new partition ID.
31 + *
32 + * @param id the partition ID
33 + */
34 + PartitionId(int id) {
35 + this.id = id;
36 + }
37 +
38 + @Override
39 + public boolean equals(Object o) {
40 + if (!(o instanceof PartitionId)) {
41 + return false;
42 + }
43 +
44 + PartitionId that = (PartitionId) o;
45 + return Objects.equals(this.id, that.id);
46 + }
47 +
48 + @Override
49 + public int hashCode() {
50 + return Objects.hash(id);
51 + }
52 +
53 + @Override
54 + public String toString() {
55 + return MoreObjects.toStringHelper(getClass())
56 + .add("partition ID", id)
57 + .toString();
58 + }
59 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.intent.impl;
17 +
18 +import org.apache.felix.scr.annotations.Activate;
19 +import org.apache.felix.scr.annotations.Component;
20 +import org.apache.felix.scr.annotations.Deactivate;
21 +import org.apache.felix.scr.annotations.Reference;
22 +import org.apache.felix.scr.annotations.ReferenceCardinality;
23 +import org.apache.felix.scr.annotations.Service;
24 +import org.onosproject.cluster.ClusterService;
25 +import org.onosproject.cluster.Leadership;
26 +import org.onosproject.cluster.LeadershipEvent;
27 +import org.onosproject.cluster.LeadershipEventListener;
28 +import org.onosproject.cluster.LeadershipService;
29 +import org.slf4j.Logger;
30 +import org.slf4j.LoggerFactory;
31 +
32 +import java.util.Collections;
33 +import java.util.Set;
34 +import java.util.concurrent.ConcurrentHashMap;
35 +
36 +import static com.google.common.base.Preconditions.checkNotNull;
37 +
38 +/**
39 + * Manages the assignment of intent keyspace partitions to instances.
40 + */
41 +@Component(immediate = true)
42 +@Service
43 +public class PartitionManager implements PartitionService {
44 +
45 + private static final Logger log = LoggerFactory.getLogger(PartitionManager.class);
46 +
47 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
48 + protected LeadershipService leadershipService;
49 +
50 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
51 + protected ClusterService clusterService;
52 +
53 + // TODO make configurable
54 + private static final int NUM_PARTITIONS = 100;
55 +
56 + private static final String ELECTION_PREFIX = "intent-partition-";
57 +
58 + private LeadershipEventListener leaderListener = new InternalLeadershipListener();
59 +
60 + private Set<PartitionId> myPartitions;
61 +
62 + @Activate
63 + public void activate() {
64 + myPartitions = Collections.newSetFromMap(new ConcurrentHashMap<>());
65 +
66 + leadershipService.addListener(leaderListener);
67 +
68 + for (int i = 0; i < NUM_PARTITIONS; i++) {
69 + leadershipService.runForLeadership(ELECTION_PREFIX + i);
70 + }
71 + }
72 +
73 + @Deactivate
74 + public void deactivate() {
75 + leadershipService.removeListener(leaderListener);
76 + }
77 +
78 + private PartitionId getPartitionForKey(String intentKey) {
79 + return new PartitionId(intentKey.hashCode() % NUM_PARTITIONS);
80 + }
81 +
82 + @Override
83 + public boolean isMine(String intentKey) {
84 + return checkNotNull(
85 + myPartitions.contains(getPartitionForKey(intentKey)));
86 + }
87 +
88 + private final class InternalLeadershipListener implements LeadershipEventListener {
89 +
90 + @Override
91 + public void event(LeadershipEvent event) {
92 + Leadership leadership = event.subject();
93 + // update internal state about which partitions I'm leader of
94 + if (leadership.leader().equals(clusterService.getLocalNode().id()) &&
95 + leadership.topic().startsWith(ELECTION_PREFIX)) {
96 +
97 + // Parse out the partition ID
98 + String[] splitted = leadership.topic().split("-");
99 + if (splitted.length != 3) {
100 + log.warn("Couldn't parse leader election topic {}", leadership.topic());
101 + return;
102 + }
103 +
104 + int partitionId;
105 + try {
106 + partitionId = Integer.parseInt(splitted[2]);
107 + } catch (NumberFormatException e) {
108 + log.warn("Couldn't parse partition ID {}", splitted[2]);
109 + return;
110 + }
111 +
112 + if (event.type() == LeadershipEvent.Type.LEADER_ELECTED) {
113 + myPartitions.add(new PartitionId(partitionId));
114 + } else if (event.type() == LeadershipEvent.Type.LEADER_BOOTED) {
115 + myPartitions.remove(new PartitionId(partitionId));
116 + }
117 + }
118 +
119 + }
120 + }
121 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.intent.impl;
17 +
18 +/**
19 + * Service for interacting with the partition-to-instance assignments.
20 + */
21 +public interface PartitionService {
22 +
23 + /**
24 + * Returns whether the given intent key is in a partition owned by this
25 + * instance or not.
26 + *
27 + * @param intentKey intent key to query
28 + * @return true if the key is owned by this instance, otherwise false
29 + */
30 + boolean isMine(String intentKey);
31 +
32 + // TODO add API for rebalancing partitions
33 +}