HIGUCHI Yuta
Committed by Gerrit Code Review

Use separate instance of serializer for ClusterMessaging and ECMap

- might be related to CORD-199
- Builder was not intended to be shared, since they are mutable

- also unsubscribe from cluster communicator on deactivate()

Change-Id: I0eebec1d5420277b33e2fb373119ffcb40a31c43
...@@ -128,6 +128,8 @@ public class DistributedGroupStore ...@@ -128,6 +128,8 @@ public class DistributedGroupStore
128 128
129 private final AtomicLong sequenceNumber = new AtomicLong(0); 129 private final AtomicLong sequenceNumber = new AtomicLong(0);
130 130
131 + private KryoNamespace clusterMsgSerializer;
132 +
131 @Activate 133 @Activate
132 public void activate() { 134 public void activate() {
133 kryoBuilder = new KryoNamespace.Builder() 135 kryoBuilder = new KryoNamespace.Builder()
...@@ -150,13 +152,15 @@ public class DistributedGroupStore ...@@ -150,13 +152,15 @@ public class DistributedGroupStore
150 GroupStoreMapKey.class 152 GroupStoreMapKey.class
151 ); 153 );
152 154
155 + clusterMsgSerializer = kryoBuilder.build();
156 +
153 messageHandlingExecutor = Executors. 157 messageHandlingExecutor = Executors.
154 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE, 158 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
155 groupedThreads("onos/store/group", 159 groupedThreads("onos/store/group",
156 "message-handlers")); 160 "message-handlers"));
157 161
158 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, 162 clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
159 - kryoBuilder.build()::deserialize, 163 + clusterMsgSerializer::deserialize,
160 this::process, 164 this::process,
161 messageHandlingExecutor); 165 messageHandlingExecutor);
162 166
...@@ -192,6 +196,7 @@ public class DistributedGroupStore ...@@ -192,6 +196,7 @@ public class DistributedGroupStore
192 196
193 @Deactivate 197 @Deactivate
194 public void deactivate() { 198 public void deactivate() {
199 + clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
195 groupStoreEntriesByKey.destroy(); 200 groupStoreEntriesByKey.destroy();
196 auditPendingReqQueue.destroy(); 201 auditPendingReqQueue.destroy();
197 log.info("Stopped"); 202 log.info("Stopped");
...@@ -366,7 +371,7 @@ public class DistributedGroupStore ...@@ -366,7 +371,7 @@ public class DistributedGroupStore
366 371
367 clusterCommunicator.unicast(groupOp, 372 clusterCommunicator.unicast(groupOp,
368 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, 373 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
369 - m -> kryoBuilder.build().serialize(m), 374 + clusterMsgSerializer::serialize,
370 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> { 375 mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
371 if (error != null) { 376 if (error != null) {
372 log.warn("Failed to send request to master: {} to {}", 377 log.warn("Failed to send request to master: {} to {}",
...@@ -564,7 +569,7 @@ public class DistributedGroupStore ...@@ -564,7 +569,7 @@ public class DistributedGroupStore
564 569
565 clusterCommunicator.unicast(groupOp, 570 clusterCommunicator.unicast(groupOp,
566 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, 571 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
567 - m -> kryoBuilder.build().serialize(m), 572 + clusterMsgSerializer::serialize,
568 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> { 573 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
569 if (error != null) { 574 if (error != null) {
570 log.warn("Failed to send request to master: {} to {}", 575 log.warn("Failed to send request to master: {} to {}",
...@@ -696,7 +701,7 @@ public class DistributedGroupStore ...@@ -696,7 +701,7 @@ public class DistributedGroupStore
696 701
697 clusterCommunicator.unicast(groupOp, 702 clusterCommunicator.unicast(groupOp,
698 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST, 703 GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
699 - m -> kryoBuilder.build().serialize(m), 704 + clusterMsgSerializer::serialize,
700 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> { 705 mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
701 if (error != null) { 706 if (error != null) {
702 log.warn("Failed to send request to master: {} to {}", 707 log.warn("Failed to send request to master: {} to {}",
......