Jonathan Hart
Committed by Brian O'Connor

Finished implementation of GossipIntentStore based on new API and semantics.

Change-Id: I1a71d075e5d34ab7b9f7c2533d389235d6da1d9a
...@@ -48,7 +48,7 @@ public class Key { ...@@ -48,7 +48,7 @@ public class Key {
48 return new LongKey(key, appId); 48 return new LongKey(key, appId);
49 } 49 }
50 50
51 - private static final class StringKey extends Key { 51 + public static final class StringKey extends Key {
52 52
53 private final ApplicationId appId; 53 private final ApplicationId appId;
54 private final String key; 54 private final String key;
...@@ -87,7 +87,7 @@ public class Key { ...@@ -87,7 +87,7 @@ public class Key {
87 } 87 }
88 } 88 }
89 89
90 - private static final class LongKey extends Key { 90 + public static final class LongKey extends Key {
91 91
92 private final ApplicationId appId; 92 private final ApplicationId appId;
93 private final long key; 93 private final long key;
......
...@@ -199,7 +199,7 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -199,7 +199,7 @@ public class HazelcastLeadershipService implements LeadershipService,
199 LeadershipEvent leadershipEvent = 199 LeadershipEvent leadershipEvent =
200 SERIALIZER.decode(message.getMessageObject()); 200 SERIALIZER.decode(message.getMessageObject());
201 201
202 - log.debug("Leadership Event: time = {} type = {} event = {}", 202 + log.trace("Leadership Event: time = {} type = {} event = {}",
203 leadershipEvent.time(), leadershipEvent.type(), 203 leadershipEvent.time(), leadershipEvent.type(),
204 leadershipEvent); 204 leadershipEvent);
205 205
......
...@@ -18,15 +18,18 @@ package org.onosproject.store.impl; ...@@ -18,15 +18,18 @@ package org.onosproject.store.impl;
18 import org.onosproject.store.Timestamp; 18 import org.onosproject.store.Timestamp;
19 19
20 /** 20 /**
21 - * Clock service that can generate timestamps per object. 21 + * Clock service that can generate timestamps based off of two input objects.
22 + * Implementations are free to only take one or none of the objects into account
23 + * when generating timestamps.
22 */ 24 */
23 -public interface ClockService<T> { 25 +public interface ClockService<T, U> {
24 26
25 /** 27 /**
26 - * Gets a new timestamp for the given object. 28 + * Gets a new timestamp for the given objects.
27 * 29 *
28 - * @param object Object to get a timestamp for 30 + * @param object1 First object to use when generating timestamps
31 + * @param object2 Second object to use when generating timestamps
29 * @return the new timestamp 32 * @return the new timestamp
30 */ 33 */
31 - public Timestamp getTimestamp(T object); 34 + public Timestamp getTimestamp(T object1, U object2);
32 } 35 }
......
...@@ -111,6 +111,27 @@ public interface EventuallyConsistentMap<K, V> { ...@@ -111,6 +111,27 @@ public interface EventuallyConsistentMap<K, V> {
111 public void remove(K key); 111 public void remove(K key);
112 112
113 /** 113 /**
114 + * Removes the given key-value mapping from the map, if it exists.
115 + * <p>
116 + * This actually means remove any values up to and including the timestamp
117 + * given by {@link org.onosproject.store.impl.ClockService#getTimestamp(Object, Object)}.
118 + * Any mappings that produce an earlier timestamp than this given key-value
119 + * pair will be removed, and any mappings that produce a later timestamp
120 + * will supersede this remove.
121 + * </p><p>
122 + * Note: this differs from the specification of {@link java.util.Map}
123 + * because it does not return a boolean indication whether a value was removed.
124 + * Clients are expected to register an
125 + * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if
126 + * they are interested in receiving notification of updates to the map.
127 + * </p>
128 + *
129 + * @param key the key to remove the mapping for
130 + * @param value the value mapped to the key
131 + */
132 + public void remove(K key, V value);
133 +
134 + /**
114 * Adds mappings for all key-value pairs in the specified map to this map. 135 * Adds mappings for all key-value pairs in the specified map to this map.
115 * <p> 136 * <p>
116 * This will be more efficient in communication than calling individual put 137 * This will be more efficient in communication than calling individual put
......
...@@ -69,7 +69,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -69,7 +69,7 @@ public class EventuallyConsistentMapImpl<K, V>
69 private final ClusterCommunicationService clusterCommunicator; 69 private final ClusterCommunicationService clusterCommunicator;
70 private final KryoSerializer serializer; 70 private final KryoSerializer serializer;
71 71
72 - private final ClockService<K> clockService; 72 + private final ClockService<K, V> clockService;
73 73
74 private final MessageSubject updateMessageSubject; 74 private final MessageSubject updateMessageSubject;
75 private final MessageSubject removeMessageSubject; 75 private final MessageSubject removeMessageSubject;
...@@ -126,7 +126,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -126,7 +126,7 @@ public class EventuallyConsistentMapImpl<K, V>
126 ClusterService clusterService, 126 ClusterService clusterService,
127 ClusterCommunicationService clusterCommunicator, 127 ClusterCommunicationService clusterCommunicator,
128 KryoNamespace.Builder serializerBuilder, 128 KryoNamespace.Builder serializerBuilder,
129 - ClockService<K> clockService) { 129 + ClockService<K, V> clockService) {
130 130
131 this.mapName = checkNotNull(mapName); 131 this.mapName = checkNotNull(mapName);
132 this.clusterService = checkNotNull(clusterService); 132 this.clusterService = checkNotNull(clusterService);
...@@ -227,7 +227,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -227,7 +227,8 @@ public class EventuallyConsistentMapImpl<K, V>
227 checkNotNull(key, ERROR_NULL_KEY); 227 checkNotNull(key, ERROR_NULL_KEY);
228 checkNotNull(value, ERROR_NULL_VALUE); 228 checkNotNull(value, ERROR_NULL_VALUE);
229 229
230 - Timestamp timestamp = clockService.getTimestamp(key); 230 + Timestamp timestamp = clockService.getTimestamp(key, value);
231 +
231 if (putInternal(key, value, timestamp)) { 232 if (putInternal(key, value, timestamp)) {
232 notifyPeers(new InternalPutEvent<>(key, value, timestamp)); 233 notifyPeers(new InternalPutEvent<>(key, value, timestamp));
233 EventuallyConsistentMapEvent<K, V> externalEvent 234 EventuallyConsistentMapEvent<K, V> externalEvent
...@@ -260,7 +261,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -260,7 +261,9 @@ public class EventuallyConsistentMapImpl<K, V>
260 checkState(!destroyed, mapName + ERROR_DESTROYED); 261 checkState(!destroyed, mapName + ERROR_DESTROYED);
261 checkNotNull(key, ERROR_NULL_KEY); 262 checkNotNull(key, ERROR_NULL_KEY);
262 263
263 - Timestamp timestamp = clockService.getTimestamp(key); 264 + // TODO prevent calls here if value is important for timestamp
265 + Timestamp timestamp = clockService.getTimestamp(key, null);
266 +
264 if (removeInternal(key, timestamp)) { 267 if (removeInternal(key, timestamp)) {
265 notifyPeers(new InternalRemoveEvent<>(key, timestamp)); 268 notifyPeers(new InternalRemoveEvent<>(key, timestamp));
266 EventuallyConsistentMapEvent<K, V> externalEvent 269 EventuallyConsistentMapEvent<K, V> externalEvent
...@@ -283,6 +286,23 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -283,6 +286,23 @@ public class EventuallyConsistentMapImpl<K, V>
283 } 286 }
284 287
285 @Override 288 @Override
289 + public void remove(K key, V value) {
290 + checkState(!destroyed, mapName + ERROR_DESTROYED);
291 + checkNotNull(key, ERROR_NULL_KEY);
292 + checkNotNull(value, ERROR_NULL_VALUE);
293 +
294 + Timestamp timestamp = clockService.getTimestamp(key, value);
295 +
296 + if (removeInternal(key, timestamp)) {
297 + notifyPeers(new InternalRemoveEvent<>(key, timestamp));
298 + EventuallyConsistentMapEvent<K, V> externalEvent
299 + = new EventuallyConsistentMapEvent<>(
300 + EventuallyConsistentMapEvent.Type.REMOVE, key, value);
301 + notifyListeners(externalEvent);
302 + }
303 + }
304 +
305 + @Override
286 public void putAll(Map<? extends K, ? extends V> m) { 306 public void putAll(Map<? extends K, ? extends V> m) {
287 checkState(!destroyed, mapName + ERROR_DESTROYED); 307 checkState(!destroyed, mapName + ERROR_DESTROYED);
288 308
...@@ -295,7 +315,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -295,7 +315,7 @@ public class EventuallyConsistentMapImpl<K, V>
295 checkNotNull(key, ERROR_NULL_KEY); 315 checkNotNull(key, ERROR_NULL_KEY);
296 checkNotNull(value, ERROR_NULL_VALUE); 316 checkNotNull(value, ERROR_NULL_VALUE);
297 317
298 - Timestamp timestamp = clockService.getTimestamp(entry.getKey()); 318 + Timestamp timestamp = clockService.getTimestamp(key, value);
299 319
300 if (putInternal(key, value, timestamp)) { 320 if (putInternal(key, value, timestamp)) {
301 updates.add(new PutEntry<>(key, value, timestamp)); 321 updates.add(new PutEntry<>(key, value, timestamp));
...@@ -306,7 +326,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -306,7 +326,8 @@ public class EventuallyConsistentMapImpl<K, V>
306 notifyPeers(new InternalPutEvent<>(updates)); 326 notifyPeers(new InternalPutEvent<>(updates));
307 327
308 for (PutEntry<K, V> entry : updates) { 328 for (PutEntry<K, V> entry : updates) {
309 - EventuallyConsistentMapEvent<K, V> externalEvent = new EventuallyConsistentMapEvent<>( 329 + EventuallyConsistentMapEvent<K, V> externalEvent =
330 + new EventuallyConsistentMapEvent<>(
310 EventuallyConsistentMapEvent.Type.PUT, entry.key(), 331 EventuallyConsistentMapEvent.Type.PUT, entry.key(),
311 entry.value()); 332 entry.value());
312 notifyListeners(externalEvent); 333 notifyListeners(externalEvent);
...@@ -321,7 +342,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -321,7 +342,8 @@ public class EventuallyConsistentMapImpl<K, V>
321 List<RemoveEntry<K>> removed = new ArrayList<>(items.size()); 342 List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
322 343
323 for (K key : items.keySet()) { 344 for (K key : items.keySet()) {
324 - Timestamp timestamp = clockService.getTimestamp(key); 345 + // TODO also this is not applicable if value is important for timestamp?
346 + Timestamp timestamp = clockService.getTimestamp(key, null);
325 347
326 if (removeInternal(key, timestamp)) { 348 if (removeInternal(key, timestamp)) {
327 removed.add(new RemoveEntry<>(key, timestamp)); 349 removed.add(new RemoveEntry<>(key, timestamp));
...@@ -565,7 +587,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -565,7 +587,8 @@ public class EventuallyConsistentMapImpl<K, V>
565 // Send all updates to the peer at once 587 // Send all updates to the peer at once
566 if (!updatesToSend.isEmpty()) { 588 if (!updatesToSend.isEmpty()) {
567 try { 589 try {
568 - unicastMessage(sender, updateMessageSubject, new InternalPutEvent<>(updatesToSend)); 590 + unicastMessage(sender, updateMessageSubject,
591 + new InternalPutEvent<>(updatesToSend));
569 } catch (IOException e) { 592 } catch (IOException e) {
570 log.warn("Failed to send advertisement response", e); 593 log.warn("Failed to send advertisement response", e);
571 } 594 }
...@@ -603,7 +626,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -603,7 +626,8 @@ public class EventuallyConsistentMapImpl<K, V>
603 // Send all removes to the peer at once 626 // Send all removes to the peer at once
604 if (!removesToSend.isEmpty()) { 627 if (!removesToSend.isEmpty()) {
605 try { 628 try {
606 - unicastMessage(sender, removeMessageSubject, new InternalRemoveEvent<>(removesToSend)); 629 + unicastMessage(sender, removeMessageSubject,
630 + new InternalRemoveEvent<>(removesToSend));
607 } catch (IOException e) { 631 } catch (IOException e) {
608 log.warn("Failed to send advertisement response", e); 632 log.warn("Failed to send advertisement response", e);
609 } 633 }
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.impl;
17 +
18 +import com.google.common.base.MoreObjects;
19 +import com.google.common.collect.ComparisonChain;
20 +import org.onosproject.store.Timestamp;
21 +
22 +import java.util.Objects;
23 +
24 +import static com.google.common.base.Preconditions.checkArgument;
25 +
26 +/**
27 + * A logical timestamp that derives its value from two input values. Value1
28 + * always takes precedence over value2 when comparing timestamps.
29 + */
30 +public class MultiValuedTimestamp implements Timestamp {
31 +
32 + private final Timestamp timestamp;
33 + private final long value2;
34 +
35 + /**
36 + * Creates a new timestamp based on two values. The first value has higher
37 + * precedence than the second when comparing timestamps.
38 + *
39 + * @param timestamp first value
40 + * @param value2 second value
41 + */
42 + public MultiValuedTimestamp(Timestamp timestamp, long value2) {
43 + this.timestamp = timestamp;
44 + this.value2 = value2;
45 + }
46 +
47 + @Override
48 + public int compareTo(Timestamp o) {
49 + checkArgument(o instanceof MultiValuedTimestamp,
50 + "Must be MultiValuedTimestamp", o);
51 + MultiValuedTimestamp that = (MultiValuedTimestamp) o;
52 +
53 + return ComparisonChain.start()
54 + .compare(this.timestamp, that.timestamp)
55 + .compare(this.value2, that.value2)
56 + .result();
57 + }
58 +
59 + @Override
60 + public int hashCode() {
61 + return Objects.hash(timestamp, value2);
62 + }
63 +
64 + @Override
65 + public boolean equals(Object obj) {
66 + if (this == obj) {
67 + return true;
68 + }
69 + if (!(obj instanceof MultiValuedTimestamp)) {
70 + return false;
71 + }
72 + MultiValuedTimestamp that = (MultiValuedTimestamp) obj;
73 + return Objects.equals(this.timestamp, that.timestamp) &&
74 + Objects.equals(this.value2, that.value2);
75 + }
76 +
77 + @Override
78 + public String toString() {
79 + return MoreObjects.toStringHelper(getClass())
80 + .add("timestamp", timestamp)
81 + .add("value2", value2)
82 + .toString();
83 + }
84 +
85 + /**
86 + * Returns the first value.
87 + *
88 + * @return first value
89 + */
90 + public Timestamp timestamp() {
91 + return timestamp;
92 + }
93 +
94 + /**
95 + * Returns the second value.
96 + *
97 + * @return second value
98 + */
99 + public long sequenceNumber() {
100 + return value2;
101 + }
102 +
103 + // Default constructor for serialization
104 + @SuppressWarnings("unused")
105 + private MultiValuedTimestamp() {
106 + this.timestamp = null;
107 + this.value2 = -1;
108 + }
109 +}
...@@ -20,9 +20,9 @@ import org.onosproject.store.Timestamp; ...@@ -20,9 +20,9 @@ import org.onosproject.store.Timestamp;
20 /** 20 /**
21 * A clock service which hands out wallclock-based timestamps. 21 * A clock service which hands out wallclock-based timestamps.
22 */ 22 */
23 -public class WallclockClockManager<T> implements ClockService<T> { 23 +public class WallclockClockManager<T, U> implements ClockService<T, U> {
24 @Override 24 @Override
25 - public Timestamp getTimestamp(T object) { 25 + public Timestamp getTimestamp(T object1, U object2) {
26 return new WallClockTimestamp(); 26 return new WallClockTimestamp();
27 } 27 }
28 } 28 }
......
...@@ -37,7 +37,8 @@ import org.onosproject.store.impl.EventuallyConsistentMap; ...@@ -37,7 +37,8 @@ import org.onosproject.store.impl.EventuallyConsistentMap;
37 import org.onosproject.store.impl.EventuallyConsistentMapEvent; 37 import org.onosproject.store.impl.EventuallyConsistentMapEvent;
38 import org.onosproject.store.impl.EventuallyConsistentMapImpl; 38 import org.onosproject.store.impl.EventuallyConsistentMapImpl;
39 import org.onosproject.store.impl.EventuallyConsistentMapListener; 39 import org.onosproject.store.impl.EventuallyConsistentMapListener;
40 -import org.onosproject.store.impl.WallclockClockManager; 40 +import org.onosproject.store.impl.MultiValuedTimestamp;
41 +import org.onosproject.store.impl.SystemClockTimestamp;
41 import org.onosproject.store.serializers.KryoNamespaces; 42 import org.onosproject.store.serializers.KryoNamespaces;
42 import org.slf4j.Logger; 43 import org.slf4j.Logger;
43 44
...@@ -58,12 +59,6 @@ public class GossipIntentStore ...@@ -58,12 +59,6 @@ public class GossipIntentStore
58 59
59 private final Logger log = getLogger(getClass()); 60 private final Logger log = getLogger(getClass());
60 61
61 - /*private EventuallyConsistentMap<IntentId, Intent> intents;
62 -
63 - private EventuallyConsistentMap<IntentId, IntentState> intentStates;
64 -
65 - private EventuallyConsistentMap<IntentId, List<Intent>> installables;*/
66 -
67 // Map of intent key => current intent state 62 // Map of intent key => current intent state
68 private EventuallyConsistentMap<Key, IntentData> currentState; 63 private EventuallyConsistentMap<Key, IntentData> currentState;
69 64
...@@ -82,36 +77,22 @@ public class GossipIntentStore ...@@ -82,36 +77,22 @@ public class GossipIntentStore
82 @Activate 77 @Activate
83 public void activate() { 78 public void activate() {
84 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder() 79 KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
85 - .register(KryoNamespaces.API); 80 + .register(KryoNamespaces.API)
86 - /*intents = new EventuallyConsistentMapImpl<>("intents", clusterService, 81 + .register(IntentData.class)
87 - clusterCommunicator, 82 + .register(MultiValuedTimestamp.class)
88 - intentSerializer, 83 + .register(SystemClockTimestamp.class);
89 - new WallclockClockManager<>());
90 -
91 - intentStates = new EventuallyConsistentMapImpl<>("intent-states",
92 - clusterService,
93 - clusterCommunicator,
94 - intentSerializer,
95 - new WallclockClockManager<>());
96 -
97 - installables = new EventuallyConsistentMapImpl<>("intent-installables",
98 - clusterService,
99 - clusterCommunicator,
100 - intentSerializer,
101 - new WallclockClockManager<>());
102 - */
103 84
104 currentState = new EventuallyConsistentMapImpl<>("intent-current", 85 currentState = new EventuallyConsistentMapImpl<>("intent-current",
105 clusterService, 86 clusterService,
106 clusterCommunicator, 87 clusterCommunicator,
107 intentSerializer, 88 intentSerializer,
108 - new WallclockClockManager<>()); 89 + new IntentDataLogicalClockManager<>());
109 90
110 pending = new EventuallyConsistentMapImpl<>("intent-pending", 91 pending = new EventuallyConsistentMapImpl<>("intent-pending",
111 clusterService, 92 clusterService,
112 clusterCommunicator, 93 clusterCommunicator,
113 intentSerializer, // TODO 94 intentSerializer, // TODO
114 - new WallclockClockManager<>()); 95 + new IntentDataClockManager<>());
115 96
116 currentState.addListener(new InternalIntentStatesListener()); 97 currentState.addListener(new InternalIntentStatesListener());
117 pending.addListener(new InternalPendingListener()); 98 pending.addListener(new InternalPendingListener());
...@@ -121,10 +102,6 @@ public class GossipIntentStore ...@@ -121,10 +102,6 @@ public class GossipIntentStore
121 102
122 @Deactivate 103 @Deactivate
123 public void deactivate() { 104 public void deactivate() {
124 -
125 - /*intents.destroy();
126 - intentStates.destroy();
127 - installables.destroy();*/
128 currentState.destroy(); 105 currentState.destroy();
129 pending.destroy(); 106 pending.destroy();
130 107
...@@ -133,7 +110,6 @@ public class GossipIntentStore ...@@ -133,7 +110,6 @@ public class GossipIntentStore
133 110
134 @Override 111 @Override
135 public long getIntentCount() { 112 public long getIntentCount() {
136 - //return intents.size();
137 return currentState.size(); 113 return currentState.size();
138 } 114 }
139 115
...@@ -146,99 +122,45 @@ public class GossipIntentStore ...@@ -146,99 +122,45 @@ public class GossipIntentStore
146 122
147 @Override 123 @Override
148 public IntentState getIntentState(Key intentKey) { 124 public IntentState getIntentState(Key intentKey) {
149 - // TODO: implement this 125 + IntentData data = currentState.get(intentKey);
150 - return IntentState.FAILED; 126 + if (data != null) {
127 + return data.state();
128 + }
129 + return null;
151 } 130 }
152 131
153 @Override 132 @Override
154 public List<Intent> getInstallableIntents(Key intentKey) { 133 public List<Intent> getInstallableIntents(Key intentKey) {
155 - // TODO: implement this or delete class 134 + IntentData data = currentState.get(intentKey);
135 + if (data != null) {
136 + return data.installables();
137 + }
156 return null; 138 return null;
157 - /*
158 - return installables.get(intentId);
159 - */
160 } 139 }
161 140
162 @Override 141 @Override
163 public List<BatchWrite.Operation> batchWrite(BatchWrite batch) { 142 public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
164 - /* 143 + // Deprecated
165 - List<BatchWrite.Operation> failed = new ArrayList<>();
166 -
167 - for (BatchWrite.Operation op : batch.operations()) {
168 - switch (op.type()) {
169 - case CREATE_INTENT:
170 - checkArgument(op.args().size() == 1,
171 - "CREATE_INTENT takes 1 argument. %s", op);
172 - Intent intent = op.arg(0);
173 -
174 - intents.put(intent.id(), intent);
175 - intentStates.put(intent.id(), INSTALL_REQ);
176 -
177 - // TODO remove from pending?
178 -
179 -
180 - break;
181 - case REMOVE_INTENT:
182 - checkArgument(op.args().size() == 1,
183 - "REMOVE_INTENT takes 1 argument. %s", op);
184 - IntentId intentId = op.arg(0);
185 -
186 - intents.remove(intentId);
187 - intentStates.remove(intentId);
188 - installables.remove(intentId);
189 -
190 - break;
191 - case SET_STATE:
192 - checkArgument(op.args().size() == 2,
193 - "SET_STATE takes 2 arguments. %s", op);
194 - intent = op.arg(0);
195 - IntentState newState = op.arg(1);
196 -
197 - intentStates.put(intent.id(), newState);
198 -
199 - break;
200 - case SET_INSTALLABLE:
201 - checkArgument(op.args().size() == 2,
202 - "SET_INSTALLABLE takes 2 arguments. %s", op);
203 - intentId = op.arg(0);
204 - List<Intent> installableIntents = op.arg(1);
205 -
206 - installables.put(intentId, installableIntents);
207 -
208 - break;
209 - case REMOVE_INSTALLED:
210 - checkArgument(op.args().size() == 1,
211 - "REMOVE_INSTALLED takes 1 argument. %s", op);
212 - intentId = op.arg(0);
213 - installables.remove(intentId);
214 - break;
215 - default:
216 - log.warn("Unknown Operation encountered: {}", op);
217 - failed.add(op);
218 - break;
219 - }
220 - }
221 -
222 - return failed;
223 - */
224 return null; 144 return null;
225 } 145 }
226 146
227 @Override 147 @Override
228 public void write(IntentData newData) { 148 public void write(IntentData newData) {
149 + log.debug("writing intent {}", newData);
150 +
229 // Only the master is modifying the current state. Therefore assume 151 // Only the master is modifying the current state. Therefore assume
230 // this always succeeds 152 // this always succeeds
231 currentState.put(newData.key(), newData); 153 currentState.put(newData.key(), newData);
232 154
233 // if current.put succeeded 155 // if current.put succeeded
234 - //pending.remove(newData.key(), newData); 156 + pending.remove(newData.key(), newData);
235 157
236 - try { 158 + /*try {
237 notifyDelegate(IntentEvent.getEvent(newData)); 159 notifyDelegate(IntentEvent.getEvent(newData));
238 } catch (IllegalArgumentException e) { 160 } catch (IllegalArgumentException e) {
239 //no-op 161 //no-op
240 log.trace("ignore this exception: {}", e); 162 log.trace("ignore this exception: {}", e);
241 - } 163 + }*/
242 } 164 }
243 165
244 @Override 166 @Override
...@@ -262,14 +184,17 @@ public class GossipIntentStore ...@@ -262,14 +184,17 @@ public class GossipIntentStore
262 184
263 @Override 185 @Override
264 public void addPending(IntentData data) { 186 public void addPending(IntentData data) {
187 + log.debug("new call to pending {}", data);
188 + if (data.version() == null) {
189 + log.debug("updating timestamp");
190 + data.setVersion(new SystemClockTimestamp());
191 + }
265 pending.put(data.key(), data); 192 pending.put(data.key(), data);
266 } 193 }
267 194
268 @Override 195 @Override
269 public boolean isMaster(Intent intent) { 196 public boolean isMaster(Intent intent) {
270 - // TODO 197 + return partitionService.isMine(intent.key());
271 - //return partitionService.isMine(intent.key());
272 - return false;
273 } 198 }
274 199
275 private void notifyDelegateIfNotNull(IntentEvent event) { 200 private void notifyDelegateIfNotNull(IntentEvent event) {
...@@ -284,18 +209,16 @@ public class GossipIntentStore ...@@ -284,18 +209,16 @@ public class GossipIntentStore
284 public void event( 209 public void event(
285 EventuallyConsistentMapEvent<Key, IntentData> event) { 210 EventuallyConsistentMapEvent<Key, IntentData> event) {
286 if (event.type() == EventuallyConsistentMapEvent.Type.PUT) { 211 if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
287 - // TODO check event send logic
288 IntentEvent externalEvent; 212 IntentEvent externalEvent;
289 - IntentData intentData = currentState.get(event.key()); // TODO OK if this is null? 213 + IntentData intentData = event.value();
290 214
291 - /*
292 try { 215 try {
293 - externalEvent = IntentEvent.getEvent(event.value(), intent); 216 + externalEvent = IntentEvent.getEvent(intentData.state(), intentData.intent());
294 } catch (IllegalArgumentException e) { 217 } catch (IllegalArgumentException e) {
295 externalEvent = null; 218 externalEvent = null;
296 } 219 }
297 220
298 - notifyDelegateIfNotNull(externalEvent);*/ 221 + notifyDelegateIfNotNull(externalEvent);
299 } 222 }
300 } 223 }
301 } 224 }
...@@ -314,6 +237,13 @@ public class GossipIntentStore ...@@ -314,6 +237,13 @@ public class GossipIntentStore
314 delegate.process(event.value()); 237 delegate.process(event.value());
315 } 238 }
316 } 239 }
240 +
241 + try {
242 + notifyDelegate(IntentEvent.getEvent(event.value()));
243 + } catch (IllegalArgumentException e) {
244 + //no-op
245 + log.trace("ignore this exception: {}", e);
246 + }
317 } 247 }
318 } 248 }
319 } 249 }
......
...@@ -20,11 +20,11 @@ import org.onosproject.store.Timestamp; ...@@ -20,11 +20,11 @@ import org.onosproject.store.Timestamp;
20 import org.onosproject.store.impl.ClockService; 20 import org.onosproject.store.impl.ClockService;
21 21
22 /** 22 /**
23 - * ClockService that generates timestamps based on IntentData versions. 23 + * ClockService that uses IntentData versions as timestamps.
24 */ 24 */
25 -public class IntentDataClockManager implements ClockService<IntentData> { 25 +public class IntentDataClockManager<K> implements ClockService<K, IntentData> {
26 @Override 26 @Override
27 - public Timestamp getTimestamp(IntentData data) { 27 + public Timestamp getTimestamp(K key, IntentData intentData) {
28 - return null; 28 + return intentData.version();
29 } 29 }
30 } 30 }
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.intent.impl;
17 +
18 +import org.onosproject.net.intent.IntentData;
19 +import org.onosproject.store.Timestamp;
20 +import org.onosproject.store.impl.ClockService;
21 +import org.onosproject.store.impl.MultiValuedTimestamp;
22 +
23 +import java.util.concurrent.atomic.AtomicLong;
24 +
25 +/**
26 + * ClockService that generates logical timestamps based on IntentData versions.
27 + */
28 +public class IntentDataLogicalClockManager<K> implements ClockService<K, IntentData> {
29 +
30 + private final AtomicLong sequenceNumber = new AtomicLong(0);
31 +
32 + @Override
33 + public Timestamp getTimestamp(K key, IntentData intentData) {
34 + return new MultiValuedTimestamp(intentData.version(), sequenceNumber.getAndIncrement());
35 + }
36 +}
...@@ -24,14 +24,14 @@ import java.util.Objects; ...@@ -24,14 +24,14 @@ import java.util.Objects;
24 * processed by a single ONOS instance at a time. 24 * processed by a single ONOS instance at a time.
25 */ 25 */
26 public class PartitionId { 26 public class PartitionId {
27 - private final int id; 27 + private final long id;
28 28
29 /** 29 /**
30 * Creates a new partition ID. 30 * Creates a new partition ID.
31 * 31 *
32 * @param id the partition ID 32 * @param id the partition ID
33 */ 33 */
34 - PartitionId(int id) { 34 + PartitionId(long id) {
35 this.id = id; 35 this.id = id;
36 } 36 }
37 37
......
...@@ -26,6 +26,7 @@ import org.onosproject.cluster.Leadership; ...@@ -26,6 +26,7 @@ import org.onosproject.cluster.Leadership;
26 import org.onosproject.cluster.LeadershipEvent; 26 import org.onosproject.cluster.LeadershipEvent;
27 import org.onosproject.cluster.LeadershipEventListener; 27 import org.onosproject.cluster.LeadershipEventListener;
28 import org.onosproject.cluster.LeadershipService; 28 import org.onosproject.cluster.LeadershipService;
29 +import org.onosproject.net.intent.Key;
29 import org.slf4j.Logger; 30 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory; 31 import org.slf4j.LoggerFactory;
31 32
...@@ -33,8 +34,6 @@ import java.util.Collections; ...@@ -33,8 +34,6 @@ import java.util.Collections;
33 import java.util.Set; 34 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap; 35 import java.util.concurrent.ConcurrentHashMap;
35 36
36 -import static com.google.common.base.Preconditions.checkNotNull;
37 -
38 /** 37 /**
39 * Manages the assignment of intent keyspace partitions to instances. 38 * Manages the assignment of intent keyspace partitions to instances.
40 */ 39 */
...@@ -75,14 +74,15 @@ public class PartitionManager implements PartitionService { ...@@ -75,14 +74,15 @@ public class PartitionManager implements PartitionService {
75 leadershipService.removeListener(leaderListener); 74 leadershipService.removeListener(leaderListener);
76 } 75 }
77 76
78 - private PartitionId getPartitionForKey(String intentKey) { 77 + private PartitionId getPartitionForKey(Key intentKey) {
79 - return new PartitionId(intentKey.hashCode() % NUM_PARTITIONS); 78 + log.debug("Getting partition for {}: {}", intentKey,
79 + new PartitionId(Math.abs(intentKey.hash()) % NUM_PARTITIONS));
80 + return new PartitionId(Math.abs(intentKey.hash()) % NUM_PARTITIONS);
80 } 81 }
81 82
82 @Override 83 @Override
83 - public boolean isMine(String intentKey) { 84 + public boolean isMine(Key intentKey) {
84 - return checkNotNull( 85 + return myPartitions.contains(getPartitionForKey(intentKey));
85 - myPartitions.contains(getPartitionForKey(intentKey)));
86 } 86 }
87 87
88 private final class InternalLeadershipListener implements LeadershipEventListener { 88 private final class InternalLeadershipListener implements LeadershipEventListener {
...@@ -115,7 +115,6 @@ public class PartitionManager implements PartitionService { ...@@ -115,7 +115,6 @@ public class PartitionManager implements PartitionService {
115 myPartitions.remove(new PartitionId(partitionId)); 115 myPartitions.remove(new PartitionId(partitionId));
116 } 116 }
117 } 117 }
118 -
119 } 118 }
120 } 119 }
121 } 120 }
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
15 */ 15 */
16 package org.onosproject.store.intent.impl; 16 package org.onosproject.store.intent.impl;
17 17
18 +import org.onosproject.net.intent.Key;
19 +
18 /** 20 /**
19 * Service for interacting with the partition-to-instance assignments. 21 * Service for interacting with the partition-to-instance assignments.
20 */ 22 */
...@@ -27,7 +29,7 @@ public interface PartitionService { ...@@ -27,7 +29,7 @@ public interface PartitionService {
27 * @param intentKey intent key to query 29 * @param intentKey intent key to query
28 * @return true if the key is owned by this instance, otherwise false 30 * @return true if the key is owned by this instance, otherwise false
29 */ 31 */
30 - boolean isMine(String intentKey); 32 + boolean isMine(Key intentKey);
31 33
32 // TODO add API for rebalancing partitions 34 // TODO add API for rebalancing partitions
33 } 35 }
......
...@@ -62,7 +62,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -62,7 +62,7 @@ public class EventuallyConsistentMapImplTest {
62 62
63 private ClusterService clusterService; 63 private ClusterService clusterService;
64 private ClusterCommunicationService clusterCommunicator; 64 private ClusterCommunicationService clusterCommunicator;
65 - private SequentialClockService<String> clockService; 65 + private SequentialClockService<String, String> clockService;
66 66
67 private static final String MAP_NAME = "test"; 67 private static final String MAP_NAME = "test";
68 private static final MessageSubject PUT_MESSAGE_SUBJECT 68 private static final MessageSubject PUT_MESSAGE_SUBJECT
...@@ -222,7 +222,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -222,7 +222,7 @@ public class EventuallyConsistentMapImplTest {
222 222
223 // Remote put 223 // Remote put
224 ClusterMessage message 224 ClusterMessage message
225 - = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2)); 225 + = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2));
226 226
227 // Create a latch so we know when the put operation has finished 227 // Create a latch so we know when the put operation has finished
228 latch = new CountDownLatch(1); 228 latch = new CountDownLatch(1);
...@@ -240,7 +240,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -240,7 +240,7 @@ public class EventuallyConsistentMapImplTest {
240 240
241 // Remote remove 241 // Remote remove
242 ClusterMessage removeMessage 242 ClusterMessage removeMessage
243 - = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1)); 243 + = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
244 244
245 // Create a latch so we know when the remove operation has finished 245 // Create a latch so we know when the remove operation has finished
246 latch = new CountDownLatch(1); 246 latch = new CountDownLatch(1);
...@@ -731,14 +731,15 @@ public class EventuallyConsistentMapImplTest { ...@@ -731,14 +731,15 @@ public class EventuallyConsistentMapImplTest {
731 * to give out timestamps from the past. 731 * to give out timestamps from the past.
732 * 732 *
733 * @param <T> Type that the clock service will give out timestamps for 733 * @param <T> Type that the clock service will give out timestamps for
734 + * @param <U> Second type that the clock service will give out values for
734 */ 735 */
735 - private class SequentialClockService<T> implements ClockService<T> { 736 + private class SequentialClockService<T, U> implements ClockService<T, U> {
736 737
737 private static final long INITIAL_VALUE = 1; 738 private static final long INITIAL_VALUE = 1;
738 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE); 739 private final AtomicLong counter = new AtomicLong(INITIAL_VALUE);
739 740
740 @Override 741 @Override
741 - public Timestamp getTimestamp(T object) { 742 + public Timestamp getTimestamp(T object, U object2) {
742 return new TestTimestamp(counter.getAndIncrement()); 743 return new TestTimestamp(counter.getAndIncrement());
743 } 744 }
744 745
...@@ -748,7 +749,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -748,7 +749,7 @@ public class EventuallyConsistentMapImplTest {
748 * still allowing the CUT to get the same timestamp. 749 * still allowing the CUT to get the same timestamp.
749 * 750 *
750 * @return timestamp equal to the timestamp that will be returned by the 751 * @return timestamp equal to the timestamp that will be returned by the
751 - * next call to {@link #getTimestamp(T)}. 752 + * next call to {@link #getTimestamp(T, U)}.
752 */ 753 */
753 public Timestamp peekAtNextTimestamp() { 754 public Timestamp peekAtNextTimestamp() {
754 return peek(1); 755 return peek(1);
......
...@@ -85,6 +85,7 @@ import org.onosproject.net.intent.Intent; ...@@ -85,6 +85,7 @@ import org.onosproject.net.intent.Intent;
85 import org.onosproject.net.intent.IntentId; 85 import org.onosproject.net.intent.IntentId;
86 import org.onosproject.net.intent.IntentOperation; 86 import org.onosproject.net.intent.IntentOperation;
87 import org.onosproject.net.intent.IntentState; 87 import org.onosproject.net.intent.IntentState;
88 +import org.onosproject.net.intent.Key;
88 import org.onosproject.net.intent.LinkCollectionIntent; 89 import org.onosproject.net.intent.LinkCollectionIntent;
89 import org.onosproject.net.intent.MultiPointToSinglePointIntent; 90 import org.onosproject.net.intent.MultiPointToSinglePointIntent;
90 import org.onosproject.net.intent.OpticalConnectivityIntent; 91 import org.onosproject.net.intent.OpticalConnectivityIntent;
...@@ -285,6 +286,9 @@ public final class KryoNamespaces { ...@@ -285,6 +286,9 @@ public final class KryoNamespaces {
285 FlowRuleBatchEntry.FlowRuleOperation.class, 286 FlowRuleBatchEntry.FlowRuleOperation.class,
286 IntentId.class, 287 IntentId.class,
287 IntentState.class, 288 IntentState.class,
289 + Key.class,
290 + Key.LongKey.class,
291 + Key.StringKey.class,
288 Intent.class, 292 Intent.class,
289 ConnectivityIntent.class, 293 ConnectivityIntent.class,
290 PathIntent.class, 294 PathIntent.class,
......