Committed by
Gerrit Code Review
Bug Fix - Tunnel subsystem 1. After delete the tunnel and then query the tunnel …
…and call provider. Query will always return null. Null Pointer exception. 2. In Borrow Tunnel, Order relationship is added even if the there is no tunnel to borrow. Note only where tunnel manger do not initiate the tunnel creation through provider. 3. In ConsistancyMap when the key (the set) is empty still the map hold the key with empty list and never deleted. Memory leak. Change-Id: Iff8ba662f4828324c0588a9dfc494a2b158bcfce
Showing
2 changed files
with
150 additions
and
109 deletions
... | @@ -93,18 +93,20 @@ public class TunnelManager | ... | @@ -93,18 +93,20 @@ public class TunnelManager |
93 | @Override | 93 | @Override |
94 | public void removeTunnel(TunnelId tunnelId) { | 94 | public void removeTunnel(TunnelId tunnelId) { |
95 | checkNotNull(tunnelId, TUNNNEL_ID_NULL); | 95 | checkNotNull(tunnelId, TUNNNEL_ID_NULL); |
96 | - store.deleteTunnel(tunnelId); | ||
97 | Tunnel tunnel = store.queryTunnel(tunnelId); | 96 | Tunnel tunnel = store.queryTunnel(tunnelId); |
98 | - if (tunnel.providerId() != null) { | 97 | + if (tunnel != null) { |
99 | - TunnelProvider provider = getProvider(tunnel.providerId()); | 98 | + store.deleteTunnel(tunnelId); |
100 | - if (provider != null) { | 99 | + if (tunnel.providerId() != null) { |
101 | - provider.releaseTunnel(tunnel); | 100 | + TunnelProvider provider = getProvider(tunnel.providerId()); |
102 | - } | 101 | + if (provider != null) { |
103 | - } else { | 102 | + provider.releaseTunnel(tunnel); |
104 | - Set<ProviderId> ids = getProviders(); | 103 | + } |
105 | - for (ProviderId providerId : ids) { | 104 | + } else { |
106 | - TunnelProvider provider = getProvider(providerId); | 105 | + Set<ProviderId> ids = getProviders(); |
107 | - provider.releaseTunnel(tunnel); | 106 | + for (ProviderId providerId : ids) { |
107 | + TunnelProvider provider = getProvider(providerId); | ||
108 | + provider.releaseTunnel(tunnel); | ||
109 | + } | ||
108 | } | 110 | } |
109 | } | 111 | } |
110 | } | 112 | } |
... | @@ -129,23 +131,25 @@ public class TunnelManager | ... | @@ -129,23 +131,25 @@ public class TunnelManager |
129 | @Override | 131 | @Override |
130 | public void removeTunnels(TunnelEndPoint src, TunnelEndPoint dst, | 132 | public void removeTunnels(TunnelEndPoint src, TunnelEndPoint dst, |
131 | ProviderId producerName) { | 133 | ProviderId producerName) { |
132 | - store.deleteTunnel(src, dst, producerName); | ||
133 | Collection<Tunnel> setTunnels = store.queryTunnel(src, dst); | 134 | Collection<Tunnel> setTunnels = store.queryTunnel(src, dst); |
134 | - for (Tunnel tunnel : setTunnels) { | 135 | + if (!setTunnels.isEmpty()) { |
135 | - if (producerName != null | 136 | + store.deleteTunnel(src, dst, producerName); |
136 | - && !tunnel.providerId().equals(producerName)) { | 137 | + for (Tunnel tunnel : setTunnels) { |
137 | - continue; | 138 | + if (producerName != null |
138 | - } | 139 | + && !tunnel.providerId().equals(producerName)) { |
139 | - if (tunnel.providerId() != null) { | 140 | + continue; |
140 | - TunnelProvider provider = getProvider(tunnel.providerId()); | ||
141 | - if (provider != null) { | ||
142 | - provider.releaseTunnel(tunnel); | ||
143 | } | 141 | } |
144 | - } else { | 142 | + if (tunnel.providerId() != null) { |
145 | - Set<ProviderId> ids = getProviders(); | 143 | + TunnelProvider provider = getProvider(tunnel.providerId()); |
146 | - for (ProviderId providerId : ids) { | 144 | + if (provider != null) { |
147 | - TunnelProvider provider = getProvider(providerId); | 145 | + provider.releaseTunnel(tunnel); |
148 | - provider.releaseTunnel(tunnel); | 146 | + } |
147 | + } else { | ||
148 | + Set<ProviderId> ids = getProviders(); | ||
149 | + for (ProviderId providerId : ids) { | ||
150 | + TunnelProvider provider = getProvider(providerId); | ||
151 | + provider.releaseTunnel(tunnel); | ||
152 | + } | ||
149 | } | 153 | } |
150 | } | 154 | } |
151 | } | 155 | } |
... | @@ -154,24 +158,26 @@ public class TunnelManager | ... | @@ -154,24 +158,26 @@ public class TunnelManager |
154 | @Override | 158 | @Override |
155 | public void removeTunnels(TunnelEndPoint src, TunnelEndPoint dst, Type type, | 159 | public void removeTunnels(TunnelEndPoint src, TunnelEndPoint dst, Type type, |
156 | ProviderId producerName) { | 160 | ProviderId producerName) { |
157 | - store.deleteTunnel(src, dst, type, producerName); | ||
158 | Collection<Tunnel> setTunnels = store.queryTunnel(src, dst); | 161 | Collection<Tunnel> setTunnels = store.queryTunnel(src, dst); |
159 | - for (Tunnel tunnel : setTunnels) { | 162 | + if (!setTunnels.isEmpty()) { |
160 | - if (producerName != null | 163 | + store.deleteTunnel(src, dst, type, producerName); |
161 | - && !tunnel.providerId().equals(producerName) | 164 | + for (Tunnel tunnel : setTunnels) { |
162 | - || !type.equals(tunnel.type())) { | 165 | + if (producerName != null |
163 | - continue; | 166 | + && !tunnel.providerId().equals(producerName) |
164 | - } | 167 | + || !type.equals(tunnel.type())) { |
165 | - if (tunnel.providerId() != null) { | 168 | + continue; |
166 | - TunnelProvider provider = getProvider(tunnel.providerId()); | ||
167 | - if (provider != null) { | ||
168 | - provider.releaseTunnel(tunnel); | ||
169 | } | 169 | } |
170 | - } else { | 170 | + if (tunnel.providerId() != null) { |
171 | - Set<ProviderId> ids = getProviders(); | 171 | + TunnelProvider provider = getProvider(tunnel.providerId()); |
172 | - for (ProviderId providerId : ids) { | 172 | + if (provider != null) { |
173 | - TunnelProvider provider = getProvider(providerId); | 173 | + provider.releaseTunnel(tunnel); |
174 | - provider.releaseTunnel(tunnel); | 174 | + } |
175 | + } else { | ||
176 | + Set<ProviderId> ids = getProviders(); | ||
177 | + for (ProviderId providerId : ids) { | ||
178 | + TunnelProvider provider = getProvider(providerId); | ||
179 | + provider.releaseTunnel(tunnel); | ||
180 | + } | ||
175 | } | 181 | } |
176 | } | 182 | } |
177 | } | 183 | } | ... | ... |
... | @@ -22,9 +22,11 @@ import java.util.Collection; | ... | @@ -22,9 +22,11 @@ import java.util.Collection; |
22 | import java.util.Collections; | 22 | import java.util.Collections; |
23 | import java.util.HashSet; | 23 | import java.util.HashSet; |
24 | import java.util.List; | 24 | import java.util.List; |
25 | +import java.util.Map; | ||
25 | import java.util.Objects; | 26 | import java.util.Objects; |
26 | import java.util.Set; | 27 | import java.util.Set; |
27 | 28 | ||
29 | +import com.google.common.collect.Maps; | ||
28 | import org.apache.felix.scr.annotations.Activate; | 30 | import org.apache.felix.scr.annotations.Activate; |
29 | import org.apache.felix.scr.annotations.Component; | 31 | import org.apache.felix.scr.annotations.Component; |
30 | import org.apache.felix.scr.annotations.Deactivate; | 32 | import org.apache.felix.scr.annotations.Deactivate; |
... | @@ -56,6 +58,8 @@ import org.onosproject.store.app.GossipApplicationStore.InternalState; | ... | @@ -56,6 +58,8 @@ import org.onosproject.store.app.GossipApplicationStore.InternalState; |
56 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; | 58 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
57 | import org.onosproject.store.serializers.KryoNamespaces; | 59 | import org.onosproject.store.serializers.KryoNamespaces; |
58 | import org.onosproject.store.service.EventuallyConsistentMap; | 60 | import org.onosproject.store.service.EventuallyConsistentMap; |
61 | +import org.onosproject.store.service.EventuallyConsistentMapEvent; | ||
62 | +import org.onosproject.store.service.EventuallyConsistentMapListener; | ||
59 | import org.onosproject.store.service.MultiValuedTimestamp; | 63 | import org.onosproject.store.service.MultiValuedTimestamp; |
60 | import org.onosproject.store.service.StorageService; | 64 | import org.onosproject.store.service.StorageService; |
61 | import org.onosproject.store.service.WallClockTimestamp; | 65 | import org.onosproject.store.service.WallClockTimestamp; |
... | @@ -64,6 +68,9 @@ import org.slf4j.Logger; | ... | @@ -64,6 +68,9 @@ import org.slf4j.Logger; |
64 | import com.google.common.base.MoreObjects; | 68 | import com.google.common.base.MoreObjects; |
65 | import com.google.common.collect.ImmutableSet; | 69 | import com.google.common.collect.ImmutableSet; |
66 | 70 | ||
71 | +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; | ||
72 | +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; | ||
73 | + | ||
67 | /** | 74 | /** |
68 | * Manages inventory of tunnel in distributed data store that uses optimistic | 75 | * Manages inventory of tunnel in distributed data store that uses optimistic |
69 | * replication and gossip based techniques. | 76 | * replication and gossip based techniques. |
... | @@ -95,17 +102,21 @@ public class DistributedTunnelStore | ... | @@ -95,17 +102,21 @@ public class DistributedTunnelStore |
95 | 102 | ||
96 | // tunnel identity as map key in the store. | 103 | // tunnel identity as map key in the store. |
97 | private EventuallyConsistentMap<TunnelId, Tunnel> tunnelIdAsKeyStore; | 104 | private EventuallyConsistentMap<TunnelId, Tunnel> tunnelIdAsKeyStore; |
98 | - // tunnel name as map key in the store. | ||
99 | - private EventuallyConsistentMap<TunnelName, Set<TunnelId>> tunnelNameAsKeyStore; | ||
100 | - // maintains all the tunnels between source and destination. | ||
101 | - private EventuallyConsistentMap<TunnelKey, Set<TunnelId>> srcAndDstKeyStore; | ||
102 | - // maintains all the tunnels by tunnel type. | ||
103 | - private EventuallyConsistentMap<Tunnel.Type, Set<TunnelId>> typeKeyStore; | ||
104 | // maintains records that app subscribes tunnel. | 105 | // maintains records that app subscribes tunnel. |
105 | private EventuallyConsistentMap<ApplicationId, Set<TunnelSubscription>> orderRelationship; | 106 | private EventuallyConsistentMap<ApplicationId, Set<TunnelSubscription>> orderRelationship; |
106 | 107 | ||
108 | + // tunnel name as map key. | ||
109 | + private final Map<TunnelName, Set<TunnelId>> tunnelNameAsKeyMap = Maps.newConcurrentMap(); | ||
110 | + // maintains all the tunnels between source and destination. | ||
111 | + private final Map<TunnelKey, Set<TunnelId>> srcAndDstKeyMap = Maps.newConcurrentMap(); | ||
112 | + // maintains all the tunnels by tunnel type. | ||
113 | + private final Map<Tunnel.Type, Set<TunnelId>> typeKeyMap = Maps.newConcurrentMap(); | ||
114 | + | ||
107 | private IdGenerator idGenerator; | 115 | private IdGenerator idGenerator; |
108 | 116 | ||
117 | + private EventuallyConsistentMapListener<TunnelId, Tunnel> tunnelUpdateListener = | ||
118 | + new InternalTunnelChangeEventListener(); | ||
119 | + | ||
109 | @Activate | 120 | @Activate |
110 | public void activate() { | 121 | public void activate() { |
111 | KryoNamespace.Builder serializer = KryoNamespace.newBuilder() | 122 | KryoNamespace.Builder serializer = KryoNamespace.newBuilder() |
... | @@ -116,33 +127,23 @@ public class DistributedTunnelStore | ... | @@ -116,33 +127,23 @@ public class DistributedTunnelStore |
116 | .<TunnelId, Tunnel>eventuallyConsistentMapBuilder() | 127 | .<TunnelId, Tunnel>eventuallyConsistentMapBuilder() |
117 | .withName("all_tunnel").withSerializer(serializer) | 128 | .withName("all_tunnel").withSerializer(serializer) |
118 | .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); | 129 | .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); |
119 | - tunnelNameAsKeyStore = storageService | ||
120 | - .<TunnelName, Set<TunnelId>>eventuallyConsistentMapBuilder() | ||
121 | - .withName("tunnel_name_tunnel").withSerializer(serializer) | ||
122 | - .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); | ||
123 | - srcAndDstKeyStore = storageService | ||
124 | - .<TunnelKey, Set<TunnelId>>eventuallyConsistentMapBuilder() | ||
125 | - .withName("src_dst_tunnel").withSerializer(serializer) | ||
126 | - .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); | ||
127 | - typeKeyStore = storageService | ||
128 | - .<Tunnel.Type, Set<TunnelId>>eventuallyConsistentMapBuilder() | ||
129 | - .withName("type_tunnel").withSerializer(serializer) | ||
130 | - .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); | ||
131 | orderRelationship = storageService | 130 | orderRelationship = storageService |
132 | .<ApplicationId, Set<TunnelSubscription>>eventuallyConsistentMapBuilder() | 131 | .<ApplicationId, Set<TunnelSubscription>>eventuallyConsistentMapBuilder() |
133 | .withName("type_tunnel").withSerializer(serializer) | 132 | .withName("type_tunnel").withSerializer(serializer) |
134 | .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); | 133 | .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); |
135 | idGenerator = coreService.getIdGenerator(tunnelOpTopic); | 134 | idGenerator = coreService.getIdGenerator(tunnelOpTopic); |
135 | + tunnelIdAsKeyStore.addListener(tunnelUpdateListener); | ||
136 | log.info("Started"); | 136 | log.info("Started"); |
137 | } | 137 | } |
138 | 138 | ||
139 | @Deactivate | 139 | @Deactivate |
140 | public void deactivate() { | 140 | public void deactivate() { |
141 | + tunnelIdAsKeyStore.removeListener(tunnelUpdateListener); | ||
141 | orderRelationship.destroy(); | 142 | orderRelationship.destroy(); |
142 | tunnelIdAsKeyStore.destroy(); | 143 | tunnelIdAsKeyStore.destroy(); |
143 | - srcAndDstKeyStore.destroy(); | 144 | + srcAndDstKeyMap.clear(); |
144 | - typeKeyStore.destroy(); | 145 | + typeKeyMap.clear(); |
145 | - tunnelNameAsKeyStore.destroy(); | 146 | + tunnelNameAsKeyMap.clear(); |
146 | log.info("Stopped"); | 147 | log.info("Stopped"); |
147 | } | 148 | } |
148 | 149 | ||
... | @@ -189,28 +190,8 @@ public class DistributedTunnelStore | ... | @@ -189,28 +190,8 @@ public class DistributedTunnelStore |
189 | tunnel.tunnelName(), | 190 | tunnel.tunnelName(), |
190 | tunnel.path(), | 191 | tunnel.path(), |
191 | tunnel.annotations()); | 192 | tunnel.annotations()); |
192 | - TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst()); | ||
193 | tunnelIdAsKeyStore.put(tunnelId, newT); | 193 | tunnelIdAsKeyStore.put(tunnelId, newT); |
194 | - Set<TunnelId> tunnelnameSet = tunnelNameAsKeyStore.get(tunnel | 194 | + |
195 | - .tunnelName()); | ||
196 | - if (tunnelnameSet == null) { | ||
197 | - tunnelnameSet = new HashSet<TunnelId>(); | ||
198 | - } | ||
199 | - tunnelnameSet.add(tunnelId); | ||
200 | - tunnelNameAsKeyStore.put(tunnel | ||
201 | - .tunnelName(), tunnelnameSet); | ||
202 | - Set<TunnelId> srcAndDstKeySet = srcAndDstKeyStore.get(key); | ||
203 | - if (srcAndDstKeySet == null) { | ||
204 | - srcAndDstKeySet = new HashSet<TunnelId>(); | ||
205 | - } | ||
206 | - srcAndDstKeySet.add(tunnelId); | ||
207 | - srcAndDstKeyStore.put(key, srcAndDstKeySet); | ||
208 | - Set<TunnelId> typeKeySet = typeKeyStore.get(tunnel.type()); | ||
209 | - if (typeKeySet == null) { | ||
210 | - typeKeySet = new HashSet<TunnelId>(); | ||
211 | - } | ||
212 | - typeKeySet.add(tunnelId); | ||
213 | - typeKeyStore.put(tunnel.type(), typeKeySet); | ||
214 | TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_ADDED, | 195 | TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_ADDED, |
215 | tunnel); | 196 | tunnel); |
216 | notifyDelegate(event); | 197 | notifyDelegate(event); |
... | @@ -224,11 +205,9 @@ public class DistributedTunnelStore | ... | @@ -224,11 +205,9 @@ public class DistributedTunnelStore |
224 | if (deletedTunnel == null) { | 205 | if (deletedTunnel == null) { |
225 | return; | 206 | return; |
226 | } | 207 | } |
227 | - tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()).remove(tunnelId); | 208 | + |
228 | tunnelIdAsKeyStore.remove(tunnelId); | 209 | tunnelIdAsKeyStore.remove(tunnelId); |
229 | - TunnelKey key = new TunnelKey(deletedTunnel.src(), deletedTunnel.dst()); | 210 | + |
230 | - srcAndDstKeyStore.get(key).remove(tunnelId); | ||
231 | - typeKeyStore.get(deletedTunnel.type()).remove(tunnelId); | ||
232 | TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, | 211 | TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, |
233 | deletedTunnel); | 212 | deletedTunnel); |
234 | notifyDelegate(event); | 213 | notifyDelegate(event); |
... | @@ -238,7 +217,7 @@ public class DistributedTunnelStore | ... | @@ -238,7 +217,7 @@ public class DistributedTunnelStore |
238 | public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, | 217 | public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, |
239 | ProviderId producerName) { | 218 | ProviderId producerName) { |
240 | TunnelKey key = TunnelKey.tunnelKey(src, dst); | 219 | TunnelKey key = TunnelKey.tunnelKey(src, dst); |
241 | - Set<TunnelId> idSet = srcAndDstKeyStore.get(key); | 220 | + Set<TunnelId> idSet = srcAndDstKeyMap.get(key); |
242 | if (idSet == null) { | 221 | if (idSet == null) { |
243 | return; | 222 | return; |
244 | } | 223 | } |
... | @@ -251,11 +230,7 @@ public class DistributedTunnelStore | ... | @@ -251,11 +230,7 @@ public class DistributedTunnelStore |
251 | if (producerName == null || (producerName != null | 230 | if (producerName == null || (producerName != null |
252 | && producerName.equals(deletedTunnel.providerId()))) { | 231 | && producerName.equals(deletedTunnel.providerId()))) { |
253 | tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId()); | 232 | tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId()); |
254 | - tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()) | 233 | + |
255 | - .remove(deletedTunnel.tunnelId()); | ||
256 | - srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId()); | ||
257 | - typeKeyStore.get(deletedTunnel.type()) | ||
258 | - .remove(deletedTunnel.tunnelId()); | ||
259 | event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, | 234 | event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, |
260 | deletedTunnel); | 235 | deletedTunnel); |
261 | ls.add(event); | 236 | ls.add(event); |
... | @@ -271,7 +246,7 @@ public class DistributedTunnelStore | ... | @@ -271,7 +246,7 @@ public class DistributedTunnelStore |
271 | public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, Type type, | 246 | public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, Type type, |
272 | ProviderId producerName) { | 247 | ProviderId producerName) { |
273 | TunnelKey key = TunnelKey.tunnelKey(src, dst); | 248 | TunnelKey key = TunnelKey.tunnelKey(src, dst); |
274 | - Set<TunnelId> idSet = srcAndDstKeyStore.get(key); | 249 | + Set<TunnelId> idSet = srcAndDstKeyMap.get(key); |
275 | if (idSet == null) { | 250 | if (idSet == null) { |
276 | return; | 251 | return; |
277 | } | 252 | } |
... | @@ -284,11 +259,7 @@ public class DistributedTunnelStore | ... | @@ -284,11 +259,7 @@ public class DistributedTunnelStore |
284 | if (type.equals(deletedTunnel.type()) && (producerName == null || (producerName != null | 259 | if (type.equals(deletedTunnel.type()) && (producerName == null || (producerName != null |
285 | && producerName.equals(deletedTunnel.providerId())))) { | 260 | && producerName.equals(deletedTunnel.providerId())))) { |
286 | tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId()); | 261 | tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId()); |
287 | - tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()) | 262 | + |
288 | - .remove(deletedTunnel.tunnelId()); | ||
289 | - srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId()); | ||
290 | - typeKeyStore.get(deletedTunnel.type()) | ||
291 | - .remove(deletedTunnel.tunnelId()); | ||
292 | event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, | 263 | event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED, |
293 | deletedTunnel); | 264 | deletedTunnel); |
294 | ls.add(event); | 265 | ls.add(event); |
... | @@ -335,7 +306,7 @@ public class DistributedTunnelStore | ... | @@ -335,7 +306,7 @@ public class DistributedTunnelStore |
335 | } | 306 | } |
336 | orderRelationship.put(appId, orderSet); | 307 | orderRelationship.put(appId, orderSet); |
337 | TunnelKey key = TunnelKey.tunnelKey(src, dst); | 308 | TunnelKey key = TunnelKey.tunnelKey(src, dst); |
338 | - Set<TunnelId> idSet = srcAndDstKeyStore.get(key); | 309 | + Set<TunnelId> idSet = srcAndDstKeyMap.get(key); |
339 | if (idSet == null || idSet.size() == 0) { | 310 | if (idSet == null || idSet.size() == 0) { |
340 | return Collections.emptySet(); | 311 | return Collections.emptySet(); |
341 | } | 312 | } |
... | @@ -365,7 +336,7 @@ public class DistributedTunnelStore | ... | @@ -365,7 +336,7 @@ public class DistributedTunnelStore |
365 | } | 336 | } |
366 | orderRelationship.put(appId, orderSet); | 337 | orderRelationship.put(appId, orderSet); |
367 | TunnelKey key = TunnelKey.tunnelKey(src, dst); | 338 | TunnelKey key = TunnelKey.tunnelKey(src, dst); |
368 | - Set<TunnelId> idSet = srcAndDstKeyStore.get(key); | 339 | + Set<TunnelId> idSet = srcAndDstKeyMap.get(key); |
369 | if (idSet == null || idSet.size() == 0) { | 340 | if (idSet == null || idSet.size() == 0) { |
370 | return Collections.emptySet(); | 341 | return Collections.emptySet(); |
371 | } | 342 | } |
... | @@ -391,11 +362,8 @@ public class DistributedTunnelStore | ... | @@ -391,11 +362,8 @@ public class DistributedTunnelStore |
391 | TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName, | 362 | TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName, |
392 | annotations); | 363 | annotations); |
393 | boolean isExist = orderSet.contains(order); | 364 | boolean isExist = orderSet.contains(order); |
394 | - if (!isExist) { | 365 | + |
395 | - orderSet.add(order); | 366 | + Set<TunnelId> idSet = tunnelNameAsKeyMap.get(tunnelName); |
396 | - } | ||
397 | - orderRelationship.put(appId, orderSet); | ||
398 | - Set<TunnelId> idSet = tunnelNameAsKeyStore.get(tunnelName); | ||
399 | if (idSet == null || idSet.size() == 0) { | 367 | if (idSet == null || idSet.size() == 0) { |
400 | return Collections.emptySet(); | 368 | return Collections.emptySet(); |
401 | } | 369 | } |
... | @@ -406,6 +374,12 @@ public class DistributedTunnelStore | ... | @@ -406,6 +374,12 @@ public class DistributedTunnelStore |
406 | tunnelSet.add(result); | 374 | tunnelSet.add(result); |
407 | } | 375 | } |
408 | } | 376 | } |
377 | + | ||
378 | + if (!tunnelSet.isEmpty() && !isExist) { | ||
379 | + orderSet.add(order); | ||
380 | + orderRelationship.put(appId, orderSet); | ||
381 | + } | ||
382 | + | ||
409 | return tunnelSet; | 383 | return tunnelSet; |
410 | } | 384 | } |
411 | 385 | ||
... | @@ -466,7 +440,7 @@ public class DistributedTunnelStore | ... | @@ -466,7 +440,7 @@ public class DistributedTunnelStore |
466 | @Override | 440 | @Override |
467 | public Collection<Tunnel> queryTunnel(Type type) { | 441 | public Collection<Tunnel> queryTunnel(Type type) { |
468 | Collection<Tunnel> result = new HashSet<Tunnel>(); | 442 | Collection<Tunnel> result = new HashSet<Tunnel>(); |
469 | - Set<TunnelId> tunnelIds = typeKeyStore.get(type); | 443 | + Set<TunnelId> tunnelIds = typeKeyMap.get(type); |
470 | if (tunnelIds == null) { | 444 | if (tunnelIds == null) { |
471 | return Collections.emptySet(); | 445 | return Collections.emptySet(); |
472 | } | 446 | } |
... | @@ -481,7 +455,7 @@ public class DistributedTunnelStore | ... | @@ -481,7 +455,7 @@ public class DistributedTunnelStore |
481 | public Collection<Tunnel> queryTunnel(TunnelEndPoint src, TunnelEndPoint dst) { | 455 | public Collection<Tunnel> queryTunnel(TunnelEndPoint src, TunnelEndPoint dst) { |
482 | Collection<Tunnel> result = new HashSet<Tunnel>(); | 456 | Collection<Tunnel> result = new HashSet<Tunnel>(); |
483 | TunnelKey key = TunnelKey.tunnelKey(src, dst); | 457 | TunnelKey key = TunnelKey.tunnelKey(src, dst); |
484 | - Set<TunnelId> tunnelIds = srcAndDstKeyStore.get(key); | 458 | + Set<TunnelId> tunnelIds = srcAndDstKeyMap.get(key); |
485 | if (tunnelIds == null) { | 459 | if (tunnelIds == null) { |
486 | return Collections.emptySet(); | 460 | return Collections.emptySet(); |
487 | } | 461 | } |
... | @@ -550,4 +524,65 @@ public class DistributedTunnelStore | ... | @@ -550,4 +524,65 @@ public class DistributedTunnelStore |
550 | .add("dst", dst).toString(); | 524 | .add("dst", dst).toString(); |
551 | } | 525 | } |
552 | } | 526 | } |
527 | + | ||
528 | + /** | ||
529 | + * Eventually consistent map listener for tunnel change event which updated the local map based on event. | ||
530 | + */ | ||
531 | + private class InternalTunnelChangeEventListener | ||
532 | + implements EventuallyConsistentMapListener<TunnelId, Tunnel> { | ||
533 | + @Override | ||
534 | + public void event(EventuallyConsistentMapEvent<TunnelId, Tunnel> event) { | ||
535 | + TunnelId tunnelId = event.key(); | ||
536 | + Tunnel tunnel = event.value(); | ||
537 | + | ||
538 | + if (event.type() == PUT) { | ||
539 | + | ||
540 | + // Update tunnel name map | ||
541 | + Set<TunnelId> tunnelNameSet = tunnelNameAsKeyMap.get(tunnel | ||
542 | + .tunnelName()); | ||
543 | + if (tunnelNameSet == null) { | ||
544 | + tunnelNameSet = new HashSet<TunnelId>(); | ||
545 | + } | ||
546 | + tunnelNameSet.add(tunnelId); | ||
547 | + tunnelNameAsKeyMap.put(tunnel.tunnelName(), tunnelNameSet); | ||
548 | + | ||
549 | + // Update tunnel source and destination map | ||
550 | + TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst()); | ||
551 | + Set<TunnelId> srcAndDstKeySet = srcAndDstKeyMap.get(key); | ||
552 | + if (srcAndDstKeySet == null) { | ||
553 | + srcAndDstKeySet = new HashSet<TunnelId>(); | ||
554 | + } | ||
555 | + srcAndDstKeySet.add(tunnelId); | ||
556 | + srcAndDstKeyMap.put(key, srcAndDstKeySet); | ||
557 | + | ||
558 | + // Update tunnel type map | ||
559 | + Set<TunnelId> typeKeySet = typeKeyMap.get(tunnel.type()); | ||
560 | + if (typeKeySet == null) { | ||
561 | + typeKeySet = new HashSet<TunnelId>(); | ||
562 | + } | ||
563 | + typeKeySet.add(tunnelId); | ||
564 | + typeKeyMap.put(tunnel.type(), typeKeySet); | ||
565 | + } else if (event.type() == REMOVE) { | ||
566 | + | ||
567 | + // Update tunnel name map | ||
568 | + tunnelNameAsKeyMap.get(tunnel.tunnelName()).remove(tunnelId); | ||
569 | + if (tunnelNameAsKeyMap.get(tunnel.tunnelName()).isEmpty()) { | ||
570 | + tunnelNameAsKeyMap.remove(tunnel.tunnelName()); | ||
571 | + } | ||
572 | + | ||
573 | + // Update tunnel source and destination map | ||
574 | + TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst()); | ||
575 | + srcAndDstKeyMap.get(key).remove(tunnelId); | ||
576 | + if (srcAndDstKeyMap.get(key).isEmpty()) { | ||
577 | + srcAndDstKeyMap.remove(key); | ||
578 | + } | ||
579 | + | ||
580 | + // Update tunnel type map | ||
581 | + typeKeyMap.get(tunnel.type()).remove(tunnelId); | ||
582 | + if (typeKeyMap.get(tunnel.type()).isEmpty()) { | ||
583 | + typeKeyMap.remove(tunnel.type()); | ||
584 | + } | ||
585 | + } | ||
586 | + } | ||
587 | + } | ||
553 | } | 588 | } | ... | ... |
-
Please register or login to post a comment