Committed by
Gerrit Code Review
ClusterCommunicationManager: only serialize once for broadcast
Change-Id: Ife78af3c758c87eeb8a79cdbf51b5307b8b1ca88
Showing
1 changed file
with
14 additions
and
7 deletions
... | @@ -103,9 +103,10 @@ public class ClusterCommunicationManager | ... | @@ -103,9 +103,10 @@ public class ClusterCommunicationManager |
103 | public boolean broadcast(ClusterMessage message) { | 103 | public boolean broadcast(ClusterMessage message) { |
104 | boolean ok = true; | 104 | boolean ok = true; |
105 | final ControllerNode localNode = clusterService.getLocalNode(); | 105 | final ControllerNode localNode = clusterService.getLocalNode(); |
106 | + byte[] payload = SERIALIZER.encode(message); | ||
106 | for (ControllerNode node : clusterService.getNodes()) { | 107 | for (ControllerNode node : clusterService.getNodes()) { |
107 | if (!node.equals(localNode)) { | 108 | if (!node.equals(localNode)) { |
108 | - ok = unicastUnchecked(message, node.id()) && ok; | 109 | + ok = unicastUnchecked(message.subject(), payload, node.id()) && ok; |
109 | } | 110 | } |
110 | } | 111 | } |
111 | return ok; | 112 | return ok; |
... | @@ -114,8 +115,9 @@ public class ClusterCommunicationManager | ... | @@ -114,8 +115,9 @@ public class ClusterCommunicationManager |
114 | @Override | 115 | @Override |
115 | public boolean broadcastIncludeSelf(ClusterMessage message) { | 116 | public boolean broadcastIncludeSelf(ClusterMessage message) { |
116 | boolean ok = true; | 117 | boolean ok = true; |
118 | + byte[] payload = SERIALIZER.encode(message); | ||
117 | for (ControllerNode node : clusterService.getNodes()) { | 119 | for (ControllerNode node : clusterService.getNodes()) { |
118 | - ok = unicastUnchecked(message, node.id()) && ok; | 120 | + ok = unicastUnchecked(message.subject(), payload, node.id()) && ok; |
119 | } | 121 | } |
120 | return ok; | 122 | return ok; |
121 | } | 123 | } |
... | @@ -124,9 +126,10 @@ public class ClusterCommunicationManager | ... | @@ -124,9 +126,10 @@ public class ClusterCommunicationManager |
124 | public boolean multicast(ClusterMessage message, Set<NodeId> nodes) { | 126 | public boolean multicast(ClusterMessage message, Set<NodeId> nodes) { |
125 | boolean ok = true; | 127 | boolean ok = true; |
126 | final ControllerNode localNode = clusterService.getLocalNode(); | 128 | final ControllerNode localNode = clusterService.getLocalNode(); |
129 | + byte[] payload = SERIALIZER.encode(message); | ||
127 | for (NodeId nodeId : nodes) { | 130 | for (NodeId nodeId : nodes) { |
128 | if (!nodeId.equals(localNode.id())) { | 131 | if (!nodeId.equals(localNode.id())) { |
129 | - ok = unicastUnchecked(message, nodeId) && ok; | 132 | + ok = unicastUnchecked(message.subject(), payload, nodeId) && ok; |
130 | } | 133 | } |
131 | } | 134 | } |
132 | return ok; | 135 | return ok; |
... | @@ -134,12 +137,15 @@ public class ClusterCommunicationManager | ... | @@ -134,12 +137,15 @@ public class ClusterCommunicationManager |
134 | 137 | ||
135 | @Override | 138 | @Override |
136 | public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException { | 139 | public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException { |
140 | + return unicast(message.subject(), SERIALIZER.encode(message), toNodeId); | ||
141 | + } | ||
142 | + | ||
143 | + private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException { | ||
137 | ControllerNode node = clusterService.getNode(toNodeId); | 144 | ControllerNode node = clusterService.getNode(toNodeId); |
138 | checkArgument(node != null, "Unknown nodeId: %s", toNodeId); | 145 | checkArgument(node != null, "Unknown nodeId: %s", toNodeId); |
139 | Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); | 146 | Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); |
140 | try { | 147 | try { |
141 | - messagingService.sendAsync(nodeEp, | 148 | + messagingService.sendAsync(nodeEp, subject.value(), payload); |
142 | - message.subject().value(), SERIALIZER.encode(message)); | ||
143 | return true; | 149 | return true; |
144 | } catch (IOException e) { | 150 | } catch (IOException e) { |
145 | log.trace("Failed to send cluster message to nodeId: " + toNodeId, e); | 151 | log.trace("Failed to send cluster message to nodeId: " + toNodeId, e); |
... | @@ -147,9 +153,10 @@ public class ClusterCommunicationManager | ... | @@ -147,9 +153,10 @@ public class ClusterCommunicationManager |
147 | } | 153 | } |
148 | } | 154 | } |
149 | 155 | ||
150 | - private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) { | 156 | + |
157 | + private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) { | ||
151 | try { | 158 | try { |
152 | - return unicast(message, toNodeId); | 159 | + return unicast(subject, payload, toNodeId); |
153 | } catch (IOException e) { | 160 | } catch (IOException e) { |
154 | return false; | 161 | return false; |
155 | } | 162 | } | ... | ... |
-
Please register or login to post a comment