helenyrwu
Committed by Gerrit Code Review

Distribute failover event with topic

Change-Id: I8629e7e19ebd4a18f95b32ad3ce1eba7ddf4ecc6
...@@ -47,4 +47,9 @@ public class TestStorageService extends StorageServiceAdapter { ...@@ -47,4 +47,9 @@ public class TestStorageService extends StorageServiceAdapter {
47 public TransactionContextBuilder transactionContextBuilder() { 47 public TransactionContextBuilder transactionContextBuilder() {
48 throw new UnsupportedOperationException("transactionContextBuilder"); 48 throw new UnsupportedOperationException("transactionContextBuilder");
49 } 49 }
50 +
51 + @Override
52 + public <T> Topic<T> getTopic(String name, Serializer serializer) {
53 + return new TestTopic(name);
54 + }
50 } 55 }
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.service;
18 +
19 +import com.google.common.collect.Sets;
20 +
21 +import java.util.Set;
22 +import java.util.concurrent.CompletableFuture;
23 +import java.util.function.Consumer;
24 +
25 +/**
26 + * Test implementation of topic.
27 + */
28 +public class TestTopic<T> implements Topic<T> {
29 + private final String name;
30 + private final Set<Consumer<T>> callbacks = Sets.newConcurrentHashSet();
31 +
32 + public TestTopic(String name) {
33 + this.name = name;
34 + }
35 +
36 + @Override
37 + public CompletableFuture<Void> publish(T message) {
38 + callbacks.forEach(c -> c.accept(message));
39 + return CompletableFuture.completedFuture(null);
40 + }
41 +
42 + @Override
43 + public CompletableFuture<Void> subscribe(Consumer<T> callback) {
44 + callbacks.add(callback);
45 + return CompletableFuture.completedFuture(null);
46 + }
47 +
48 + @Override
49 + public CompletableFuture<Void> unsubscribe(Consumer<T> callback) {
50 + callbacks.remove(callback);
51 + return CompletableFuture.completedFuture(null);
52 + }
53 +
54 + @Override
55 + public String name() {
56 + return name;
57 + }
58 +
59 + @Override
60 + public Type primitiveType() {
61 + return Type.TOPIC;
62 + }
63 +}
...@@ -62,6 +62,7 @@ import org.onosproject.store.service.MapEventListener; ...@@ -62,6 +62,7 @@ import org.onosproject.store.service.MapEventListener;
62 import org.onosproject.store.service.MultiValuedTimestamp; 62 import org.onosproject.store.service.MultiValuedTimestamp;
63 import org.onosproject.store.service.Serializer; 63 import org.onosproject.store.service.Serializer;
64 import org.onosproject.store.service.StorageService; 64 import org.onosproject.store.service.StorageService;
65 +import org.onosproject.store.service.Topic;
65 import org.onosproject.store.service.Versioned; 66 import org.onosproject.store.service.Versioned;
66 import org.osgi.service.component.ComponentContext; 67 import org.osgi.service.component.ComponentContext;
67 import org.slf4j.Logger; 68 import org.slf4j.Logger;
...@@ -145,6 +146,8 @@ public class DistributedGroupStore ...@@ -145,6 +146,8 @@ public class DistributedGroupStore
145 146
146 private KryoNamespace clusterMsgSerializer; 147 private KryoNamespace clusterMsgSerializer;
147 148
149 + private static Topic<GroupStoreMessage> groupTopic;
150 +
148 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT, 151 @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
149 label = "Enable group garbage collection") 152 label = "Enable group garbage collection")
150 private boolean garbageCollect = GARBAGE_COLLECT; 153 private boolean garbageCollect = GARBAGE_COLLECT;
...@@ -210,6 +213,9 @@ public class DistributedGroupStore ...@@ -210,6 +213,9 @@ public class DistributedGroupStore
210 log.debug("Current size of pendinggroupkeymap:{}", 213 log.debug("Current size of pendinggroupkeymap:{}",
211 auditPendingReqQueue.size()); 214 auditPendingReqQueue.size());
212 215
216 + groupTopic = getOrCreateGroupTopic(serializer);
217 + groupTopic.subscribe(this::processGroupMessage);
218 +
213 log.info("Started"); 219 log.info("Started");
214 } 220 }
215 221
...@@ -237,6 +243,14 @@ public class DistributedGroupStore ...@@ -237,6 +243,14 @@ public class DistributedGroupStore
237 } 243 }
238 } 244 }
239 245
246 + private Topic<GroupStoreMessage> getOrCreateGroupTopic(Serializer serializer) {
247 + if (groupTopic == null) {
248 + return storageService.getTopic("group-failover-notif", serializer);
249 + } else {
250 + return groupTopic;
251 + }
252 + };
253 +
240 /** 254 /**
241 * Returns the group store eventual consistent key map. 255 * Returns the group store eventual consistent key map.
242 * 256 *
...@@ -1109,6 +1123,16 @@ public class DistributedGroupStore ...@@ -1109,6 +1123,16 @@ public class DistributedGroupStore
1109 } 1123 }
1110 } 1124 }
1111 1125
1126 + private void processGroupMessage(GroupStoreMessage message) {
1127 + if (message.type() == GroupStoreMessage.Type.FAILOVER) {
1128 + // FIXME: groupStoreEntriesByKey inaccessible here
1129 + getGroupIdTable(message.deviceId()).values()
1130 + .stream()
1131 + .filter((storedGroup) -> (storedGroup.appCookie().equals(message.appCookie())))
1132 + .findFirst().ifPresent(group -> notifyDelegate(new GroupEvent(Type.GROUP_BUCKET_FAILOVER, group)));
1133 + }
1134 + }
1135 +
1112 private void process(GroupStoreMessage groupOp) { 1136 private void process(GroupStoreMessage groupOp) {
1113 log.debug("Received remote group operation {} request for device {}", 1137 log.debug("Received remote group operation {} request for device {}",
1114 groupOp.type(), 1138 groupOp.type(),
...@@ -1314,13 +1338,12 @@ public class DistributedGroupStore ...@@ -1314,13 +1338,12 @@ public class DistributedGroupStore
1314 1338
1315 @Override 1339 @Override
1316 public void notifyOfFailovers(Collection<Group> failoverGroups) { 1340 public void notifyOfFailovers(Collection<Group> failoverGroups) {
1317 - List<GroupEvent> failoverEvents = new ArrayList<>();
1318 failoverGroups.forEach(group -> { 1341 failoverGroups.forEach(group -> {
1319 if (group.type() == Group.Type.FAILOVER) { 1342 if (group.type() == Group.Type.FAILOVER) {
1320 - failoverEvents.add(new GroupEvent(GroupEvent.Type.GROUP_BUCKET_FAILOVER, group)); 1343 + groupTopic.publish(GroupStoreMessage.createGroupFailoverMsg(
1344 + group.deviceId(), group));
1321 } 1345 }
1322 }); 1346 });
1323 - notifyDelegate(failoverEvents);
1324 } 1347 }
1325 1348
1326 private void garbageCollect(DeviceId deviceId, 1349 private void garbageCollect(DeviceId deviceId,
......
...@@ -40,7 +40,8 @@ public final class GroupStoreMessage { ...@@ -40,7 +40,8 @@ public final class GroupStoreMessage {
40 public enum Type { 40 public enum Type {
41 ADD, 41 ADD,
42 UPDATE, 42 UPDATE,
43 - DELETE 43 + DELETE,
44 + FAILOVER
44 } 45 }
45 46
46 private GroupStoreMessage(Type type, 47 private GroupStoreMessage(Type type,
...@@ -119,6 +120,18 @@ public final class GroupStoreMessage { ...@@ -119,6 +120,18 @@ public final class GroupStoreMessage {
119 null); 120 null);
120 } 121 }
121 122
123 + public static GroupStoreMessage createGroupFailoverMsg(DeviceId deviceId,
124 + GroupDescription desc) {
125 + return new GroupStoreMessage(Type.FAILOVER,
126 + deviceId,
127 + desc.appCookie(),
128 + desc,
129 + null,
130 + null,
131 + desc.appCookie());
132 + }
133 +
134 +
122 /** 135 /**
123 * Returns the device identifier of this group request. 136 * Returns the device identifier of this group request.
124 * 137 *
......