Madan Jampani
Committed by Gerrit Code Review

ONOS-1621: Test app for measuring messaging layer performance

Change-Id: Idc2f2e70833bf30c05310f8f09c7daeb68149f98
1 +<?xml version="1.0" encoding="UTF-8"?>
2 +<!--
3 + ~ Copyright 2015 Open Networking Laboratory
4 + ~
5 + ~ Licensed under the Apache License, Version 2.0 (the "License");
6 + ~ you may not use this file except in compliance with the License.
7 + ~ You may obtain a copy of the License at
8 + ~
9 + ~ http://www.apache.org/licenses/LICENSE-2.0
10 + ~
11 + ~ Unless required by applicable law or agreed to in writing, software
12 + ~ distributed under the License is distributed on an "AS IS" BASIS,
13 + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 + ~ See the License for the specific language governing permissions and
15 + ~ limitations under the License.
16 + -->
17 +<project xmlns="http://maven.apache.org/POM/4.0.0"
18 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
20 + <modelVersion>4.0.0</modelVersion>
21 +
22 + <parent>
23 + <groupId>org.onosproject</groupId>
24 + <artifactId>onos-apps-test</artifactId>
25 + <version>1.2.0-SNAPSHOT</version>
26 + <relativePath>../pom.xml</relativePath>
27 + </parent>
28 +
29 + <artifactId>onos-app-messaging-perf</artifactId>
30 + <packaging>bundle</packaging>
31 +
32 + <description>Messaging performance test application</description>
33 +
34 + <properties>
35 + <onos.app.name>org.onosproject.messagingperf</onos.app.name>
36 + </properties>
37 +
38 + <dependencies>
39 + <dependency>
40 + <groupId>org.onosproject</groupId>
41 + <artifactId>onos-api</artifactId>
42 + <version>${project.version}</version>
43 + </dependency>
44 + <dependency>
45 + <groupId>org.onosproject</groupId>
46 + <artifactId>onos-core-serializers</artifactId>
47 + <version>${project.version}</version>
48 + </dependency>
49 + <dependency>
50 + <groupId>org.osgi</groupId>
51 + <artifactId>org.osgi.compendium</artifactId>
52 + </dependency>
53 + <!-- Required for javadoc generation -->
54 + <dependency>
55 + <groupId>org.osgi</groupId>
56 + <artifactId>org.osgi.core</artifactId>
57 + </dependency>
58 + </dependencies>
59 +
60 +</project>
1 +package org.onosproject.messagingperf;
2 +
3 +import static com.google.common.base.Strings.isNullOrEmpty;
4 +import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
5 +import static org.onlab.util.Tools.get;
6 +import static org.onlab.util.Tools.groupedThreads;
7 +import static org.slf4j.LoggerFactory.getLogger;
8 +
9 +import java.util.Dictionary;
10 +import java.util.List;
11 +import java.util.Objects;
12 +import java.util.Set;
13 +import java.util.concurrent.CompletableFuture;
14 +import java.util.concurrent.Executor;
15 +import java.util.concurrent.ExecutorService;
16 +import java.util.concurrent.Executors;
17 +import java.util.concurrent.ScheduledExecutorService;
18 +import java.util.concurrent.TimeUnit;
19 +import java.util.concurrent.atomic.AtomicInteger;
20 +import java.util.function.Function;
21 +import java.util.stream.IntStream;
22 +
23 +import org.apache.felix.scr.annotations.Activate;
24 +import org.apache.felix.scr.annotations.Component;
25 +import org.apache.felix.scr.annotations.Deactivate;
26 +import org.apache.felix.scr.annotations.Modified;
27 +import org.apache.felix.scr.annotations.Property;
28 +import org.apache.felix.scr.annotations.Reference;
29 +import org.apache.felix.scr.annotations.ReferenceCardinality;
30 +import org.apache.felix.scr.annotations.Service;
31 +import org.onlab.util.BoundedThreadPool;
32 +import org.onlab.util.KryoNamespace;
33 +import org.onosproject.cfg.ComponentConfigService;
34 +import org.onosproject.cluster.ClusterService;
35 +import org.onosproject.cluster.NodeId;
36 +import org.onosproject.core.CoreService;
37 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38 +import org.onosproject.store.cluster.messaging.MessageSubject;
39 +import org.onosproject.store.serializers.KryoNamespaces;
40 +import org.onosproject.store.serializers.KryoSerializer;
41 +import org.osgi.service.component.ComponentContext;
42 +import org.slf4j.Logger;
43 +
44 +import com.google.common.collect.ImmutableList;
45 +import com.google.common.collect.ImmutableSet;
46 +import com.google.common.collect.Lists;
47 +import com.google.common.collect.Sets;
48 +import com.google.common.util.concurrent.MoreExecutors;
49 +
50 +/**
51 + * Application for measuring cluster messaging performance.
52 + */
53 +@Component(immediate = true, enabled = true)
54 +@Service(value = MessagingPerfApp.class)
55 +public class MessagingPerfApp {
56 + private final Logger log = getLogger(getClass());
57 +
58 + @Reference(cardinality = MANDATORY_UNARY)
59 + protected ClusterService clusterService;
60 +
61 + @Reference(cardinality = MANDATORY_UNARY)
62 + protected ClusterCommunicationService communicationService;
63 +
64 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 + protected CoreService coreService;
66 +
67 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 + protected ComponentConfigService configService;
69 +
70 + private static final MessageSubject TEST_UNICAST_MESSAGE_TOPIC =
71 + new MessageSubject("net-perf-unicast-message");
72 +
73 + private static final MessageSubject TEST_REQUEST_REPLY_TOPIC =
74 + new MessageSubject("net-perf-rr-message");
75 +
76 + private static final int DEFAULT_SENDER_THREAD_POOL_SIZE = 2;
77 + private static final int DEFAULT_RECEIVER_THREAD_POOL_SIZE = 2;
78 +
79 + @Property(name = "totalSenderThreads", intValue = DEFAULT_SENDER_THREAD_POOL_SIZE,
80 + label = "Number of sender threads")
81 + protected int totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
82 +
83 + @Property(name = "totalReceiverThreads", intValue = DEFAULT_RECEIVER_THREAD_POOL_SIZE,
84 + label = "Number of receiver threads")
85 + protected int totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
86 +
87 + @Property(name = "serializationOn", boolValue = true,
88 + label = "Turn serialization on/off")
89 + private boolean serializationOn = true;
90 +
91 + @Property(name = "receiveOnIOLoopThread", boolValue = false,
92 + label = "Set this to true to handle message on IO thread")
93 + private boolean receiveOnIOLoopThread = false;
94 +
95 + protected int reportIntervalSeconds = 1;
96 +
97 + private Executor messageReceivingExecutor;
98 +
99 + private ExecutorService messageSendingExecutor =
100 + BoundedThreadPool.newFixedThreadPool(totalSenderThreads,
101 + groupedThreads("onos/messaging-perf-test", "sender-%d"));
102 +
103 + private final ScheduledExecutorService reporter =
104 + Executors.newSingleThreadScheduledExecutor(
105 + groupedThreads("onos/net-perf-test", "reporter"));
106 +
107 + private AtomicInteger received = new AtomicInteger(0);
108 + private AtomicInteger sent = new AtomicInteger(0);
109 + private AtomicInteger attempted = new AtomicInteger(0);
110 + private AtomicInteger completed = new AtomicInteger(0);
111 +
112 + protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
113 + @Override
114 + protected void setupKryoPool() {
115 + serializerPool = KryoNamespace.newBuilder()
116 + .register(KryoNamespaces.BASIC)
117 + .register(KryoNamespaces.MISC)
118 + .register(byte[].class)
119 + .register(Data.class)
120 + .build();
121 + }
122 + };
123 +
124 + private final Data data = new Data().withStringField("test")
125 + .withListField(Lists.newArrayList("1", "2", "3"))
126 + .withSetField(Sets.newHashSet("1", "2", "3"));
127 + private final byte[] dataBytes = SERIALIZER.encode(new Data().withStringField("test")
128 + .withListField(Lists.newArrayList("1", "2", "3"))
129 + .withSetField(Sets.newHashSet("1", "2", "3")));
130 +
131 + private Function<Data, byte[]> encoder;
132 + private Function<byte[], Data> decoder;
133 +
134 + @Activate
135 + public void activate(ComponentContext context) {
136 + configService.registerProperties(getClass());
137 + setupCodecs();
138 + messageReceivingExecutor = receiveOnIOLoopThread
139 + ? MoreExecutors.directExecutor()
140 + : Executors.newFixedThreadPool(
141 + totalReceiverThreads,
142 + groupedThreads("onos/net-perf-test", "receiver-%d"));
143 + registerMessageHandlers();
144 + startTest();
145 + reporter.scheduleWithFixedDelay(this::reportPerformance,
146 + reportIntervalSeconds,
147 + reportIntervalSeconds,
148 + TimeUnit.SECONDS);
149 + logConfig("Started");
150 + }
151 +
152 + @Deactivate
153 + public void deactivate(ComponentContext context) {
154 + configService.unregisterProperties(getClass(), false);
155 + stopTest();
156 + reporter.shutdown();
157 + unregisterMessageHandlers();
158 + log.info("Stopped.");
159 + }
160 +
161 + @Modified
162 + public void modified(ComponentContext context) {
163 + if (context == null) {
164 + totalSenderThreads = DEFAULT_SENDER_THREAD_POOL_SIZE;
165 + totalReceiverThreads = DEFAULT_RECEIVER_THREAD_POOL_SIZE;
166 + serializationOn = true;
167 + receiveOnIOLoopThread = false;
168 + return;
169 + }
170 +
171 + Dictionary properties = context.getProperties();
172 +
173 + int newTotalSenderThreads = totalSenderThreads;
174 + int newTotalReceiverThreads = totalReceiverThreads;
175 + boolean newSerializationOn = serializationOn;
176 + boolean newReceiveOnIOLoopThread = receiveOnIOLoopThread;
177 + try {
178 + String s = get(properties, "totalSenderThreads");
179 + newTotalSenderThreads = isNullOrEmpty(s)
180 + ? totalSenderThreads : Integer.parseInt(s.trim());
181 +
182 + s = get(properties, "totalReceiverThreads");
183 + newTotalReceiverThreads = isNullOrEmpty(s)
184 + ? totalReceiverThreads : Integer.parseInt(s.trim());
185 +
186 + s = get(properties, "serializationOn");
187 + newSerializationOn = isNullOrEmpty(s)
188 + ? serializationOn : Boolean.parseBoolean(s.trim());
189 +
190 + s = get(properties, "receiveOnIOLoopThread");
191 + newReceiveOnIOLoopThread = isNullOrEmpty(s)
192 + ? receiveOnIOLoopThread : Boolean.parseBoolean(s.trim());
193 +
194 + } catch (NumberFormatException | ClassCastException e) {
195 + return;
196 + }
197 +
198 + boolean modified = newTotalSenderThreads != totalSenderThreads ||
199 + newTotalReceiverThreads != totalReceiverThreads ||
200 + newSerializationOn != serializationOn ||
201 + newReceiveOnIOLoopThread != receiveOnIOLoopThread;
202 +
203 + // If nothing has changed, simply return.
204 + if (!modified) {
205 + return;
206 + }
207 +
208 + totalSenderThreads = newTotalSenderThreads;
209 + totalReceiverThreads = newTotalReceiverThreads;
210 + serializationOn = newSerializationOn;
211 + if (!receiveOnIOLoopThread && newReceiveOnIOLoopThread != receiveOnIOLoopThread) {
212 + ((ExecutorService) messageReceivingExecutor).shutdown();
213 + }
214 + receiveOnIOLoopThread = newReceiveOnIOLoopThread;
215 +
216 + // restart test.
217 +
218 + stopTest();
219 + unregisterMessageHandlers();
220 + setupCodecs();
221 + messageSendingExecutor =
222 + BoundedThreadPool.newFixedThreadPool(
223 + totalSenderThreads,
224 + groupedThreads("onos/net-perf-test", "sender-%d"));
225 + messageReceivingExecutor = receiveOnIOLoopThread
226 + ? MoreExecutors.directExecutor()
227 + : Executors.newFixedThreadPool(
228 + totalReceiverThreads,
229 + groupedThreads("onos/net-perf-test", "receiver-%d"));
230 +
231 + registerMessageHandlers();
232 + startTest();
233 +
234 + logConfig("Reconfigured");
235 + }
236 +
237 +
238 + private void logConfig(String prefix) {
239 + log.info("{} with senderThreadPoolSize = {}; receivingThreadPoolSize = {}"
240 + + " serializationOn = {}, receiveOnIOLoopThread = {}",
241 + prefix,
242 + totalSenderThreads,
243 + totalReceiverThreads,
244 + serializationOn,
245 + receiveOnIOLoopThread);
246 + }
247 +
248 + private void setupCodecs() {
249 + encoder = serializationOn ? SERIALIZER::encode : d -> dataBytes;
250 + decoder = serializationOn ? SERIALIZER::decode : b -> data;
251 + }
252 +
253 + private void registerMessageHandlers() {
254 + communicationService.<Data>addSubscriber(
255 + TEST_UNICAST_MESSAGE_TOPIC,
256 + decoder,
257 + d -> { received.incrementAndGet(); },
258 + messageReceivingExecutor);
259 +
260 + communicationService.<Data, Data>addSubscriber(
261 + TEST_REQUEST_REPLY_TOPIC,
262 + decoder,
263 + Function.identity(),
264 + encoder,
265 + messageReceivingExecutor);
266 + }
267 +
268 + private void unregisterMessageHandlers() {
269 + communicationService.removeSubscriber(TEST_UNICAST_MESSAGE_TOPIC);
270 + communicationService.removeSubscriber(TEST_REQUEST_REPLY_TOPIC);
271 + }
272 +
273 + private void startTest() {
274 + IntStream.range(0, totalSenderThreads).forEach(i -> requestReply());
275 + }
276 +
277 + private void stopTest() {
278 + messageSendingExecutor.shutdown();
279 + }
280 +
281 + private void requestReply() {
282 + try {
283 + attempted.incrementAndGet();
284 + CompletableFuture<Data> response =
285 + communicationService.<Data, Data>sendAndReceive(
286 + data,
287 + TEST_REQUEST_REPLY_TOPIC,
288 + encoder,
289 + decoder,
290 + randomPeer());
291 + response.whenComplete((result, error) -> {
292 + if (Objects.equals(data, result)) {
293 + completed.incrementAndGet();
294 + }
295 + messageSendingExecutor.submit(this::requestReply);
296 + });
297 + } catch (Exception e) {
298 + e.printStackTrace();
299 + }
300 + }
301 +
302 + private void unicast() {
303 + try {
304 + sent.incrementAndGet();
305 + communicationService.<Data>unicast(
306 + data,
307 + TEST_UNICAST_MESSAGE_TOPIC,
308 + encoder,
309 + randomPeer());
310 + } catch (Exception e) {
311 + e.printStackTrace();
312 + }
313 + messageSendingExecutor.submit(this::unicast);
314 + }
315 +
316 + private void broadcast() {
317 + try {
318 + sent.incrementAndGet();
319 + communicationService.<Data>broadcast(
320 + data,
321 + TEST_UNICAST_MESSAGE_TOPIC,
322 + encoder);
323 + } catch (Exception e) {
324 + e.printStackTrace();
325 + }
326 + messageSendingExecutor.submit(this::broadcast);
327 + }
328 +
329 + private NodeId randomPeer() {
330 + return clusterService.getNodes()
331 + .stream()
332 + .filter(node -> clusterService.getLocalNode().equals(node))
333 + .findAny()
334 + .get()
335 + .id();
336 + }
337 +
338 + private void reportPerformance() {
339 + log.info("Attempted: {} Completed: {}", attempted.getAndSet(0), completed.getAndSet(0));
340 + }
341 +
342 + private static class Data {
343 + private String stringField;
344 + private List<String> listField;
345 + private Set<String> setField;
346 +
347 + public Data withStringField(String value) {
348 + stringField = value;
349 + return this;
350 + }
351 +
352 + public Data withListField(List<String> value) {
353 + listField = ImmutableList.copyOf(value);
354 + return this;
355 + }
356 +
357 + public Data withSetField(Set<String> value) {
358 + setField = ImmutableSet.copyOf(value);
359 + return this;
360 + }
361 +
362 + @Override
363 + public int hashCode() {
364 + return Objects.hash(stringField, listField, setField);
365 + }
366 +
367 + @Override
368 + public boolean equals(Object other) {
369 + if (other instanceof Data) {
370 + Data that = (Data) other;
371 + return Objects.equals(this.stringField, that.stringField) &&
372 + Objects.equals(this.listField, that.listField) &&
373 + Objects.equals(this.setField, that.setField);
374 + }
375 + return false;
376 + }
377 + }
378 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +/**
18 + * Performance test application for the cluster messaging subsystem.
19 + */
20 +package org.onosproject.messagingperf;
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
34 <modules> 34 <modules>
35 <module>election</module> 35 <module>election</module>
36 <module>intent-perf</module> 36 <module>intent-perf</module>
37 + <module>messaging-perf</module>
37 <module>demo</module> 38 <module>demo</module>
38 </modules> 39 </modules>
39 40
......