Madan Jampani
Committed by Gerrit Code Review

LoadTest update to use multiple counters in parallel for load generation

Change-Id: I7d7f13024372c8c998dc427cf30fdc2e2c68c5f9
...@@ -20,12 +20,17 @@ import static org.onlab.util.Tools.get; ...@@ -20,12 +20,17 @@ import static org.onlab.util.Tools.get;
20 import static org.slf4j.LoggerFactory.getLogger; 20 import static org.slf4j.LoggerFactory.getLogger;
21 21
22 import java.util.Dictionary; 22 import java.util.Dictionary;
23 -import java.util.concurrent.ExecutorService; 23 +import java.util.List;
24 import java.util.concurrent.Executors; 24 import java.util.concurrent.Executors;
25 +import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.Semaphore; 26 import java.util.concurrent.Semaphore;
27 +import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean; 28 import java.util.concurrent.atomic.AtomicBoolean;
29 +import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong; 30 import java.util.concurrent.atomic.AtomicLong;
31 +import java.util.stream.Collectors;
28 32
33 +import org.apache.commons.lang.math.RandomUtils;
29 import org.apache.felix.scr.annotations.Activate; 34 import org.apache.felix.scr.annotations.Activate;
30 import org.apache.felix.scr.annotations.Component; 35 import org.apache.felix.scr.annotations.Component;
31 import org.apache.felix.scr.annotations.Deactivate; 36 import org.apache.felix.scr.annotations.Deactivate;
...@@ -34,6 +39,7 @@ import org.apache.felix.scr.annotations.Property; ...@@ -34,6 +39,7 @@ import org.apache.felix.scr.annotations.Property;
34 import org.apache.felix.scr.annotations.Reference; 39 import org.apache.felix.scr.annotations.Reference;
35 import org.apache.felix.scr.annotations.ReferenceCardinality; 40 import org.apache.felix.scr.annotations.ReferenceCardinality;
36 import org.apache.felix.scr.annotations.Service; 41 import org.apache.felix.scr.annotations.Service;
42 +import org.onlab.util.Tools;
37 import org.onosproject.cfg.ComponentConfigService; 43 import org.onosproject.cfg.ComponentConfigService;
38 import org.onosproject.core.ApplicationId; 44 import org.onosproject.core.ApplicationId;
39 import org.onosproject.core.CoreService; 45 import org.onosproject.core.CoreService;
...@@ -42,6 +48,7 @@ import org.onosproject.store.service.StorageService; ...@@ -42,6 +48,7 @@ import org.onosproject.store.service.StorageService;
42 import org.osgi.service.component.ComponentContext; 48 import org.osgi.service.component.ComponentContext;
43 import org.slf4j.Logger; 49 import org.slf4j.Logger;
44 50
51 +import com.google.common.collect.Lists;
45 import com.google.common.util.concurrent.RateLimiter; 52 import com.google.common.util.concurrent.RateLimiter;
46 53
47 /** 54 /**
...@@ -69,24 +76,46 @@ public class DistributedConsensusLoadTest { ...@@ -69,24 +76,46 @@ public class DistributedConsensusLoadTest {
69 protected CoreService coreService; 76 protected CoreService coreService;
70 77
71 private static final int DEFAULT_RATE = 100; 78 private static final int DEFAULT_RATE = 100;
79 + private static final int TOTAL_COUNTERS = 50;
72 80
73 @Property(name = "rate", intValue = DEFAULT_RATE, 81 @Property(name = "rate", intValue = DEFAULT_RATE,
74 label = "Total number of increments per second to the atomic counter") 82 label = "Total number of increments per second to the atomic counter")
75 protected int rate = 0; 83 protected int rate = 0;
76 84
77 - private AtomicLong lastValue = new AtomicLong(0); 85 + private final AtomicLong previousReportTime = new AtomicLong(0);
78 - private AtomicLong lastLoggedTime = new AtomicLong(0); 86 + private final AtomicLong previousCount = new AtomicLong(0);
79 - private AsyncAtomicCounter counter; 87 + private final AtomicInteger increments = new AtomicInteger(0);
80 - private ExecutorService testExecutor = Executors.newSingleThreadExecutor(); 88 + private final List<AsyncAtomicCounter> counters = Lists.newArrayList();
89 + private final ScheduledExecutorService runner = Executors.newSingleThreadScheduledExecutor();
90 + private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
81 91
82 @Activate 92 @Activate
83 public void activate(ComponentContext context) { 93 public void activate(ComponentContext context) {
84 configService.registerProperties(getClass()); 94 configService.registerProperties(getClass());
85 appId = coreService.registerApplication("org.onosproject.loadtest"); 95 appId = coreService.registerApplication("org.onosproject.loadtest");
86 log.info("Started with {}", appId); 96 log.info("Started with {}", appId);
87 - counter = storageService.atomicCounterBuilder() 97 + for (int i = 0; i < TOTAL_COUNTERS; ++i) {
88 - .withName("onos-app-loadtest-counter") 98 + AsyncAtomicCounter counter = storageService.atomicCounterBuilder()
99 + .withName(String.format("onos-app-loadtest-counter-%d", i))
89 .build(); 100 .build();
101 + counters.add(counter);
102 + }
103 + reporter.scheduleWithFixedDelay(() -> {
104 + Tools.allOf(counters.stream()
105 + .map(AsyncAtomicCounter::get)
106 + .collect(Collectors.toList()))
107 + .whenComplete((r, e) -> {
108 + if (e == null) {
109 + long newCount = r.stream().reduce(Long::sum).get();
110 + long currentTime = System.currentTimeMillis();
111 + long delta = currentTime - previousReportTime.getAndSet(currentTime);
112 + long rate = (newCount - previousCount.getAndSet(newCount)) * 1000 / delta;
113 + log.info("{} updates per second", rate);
114 + } else {
115 + log.warn(e.getMessage());
116 + }
117 + });
118 + }, 5, 5, TimeUnit.SECONDS);
90 modified(null); 119 modified(null);
91 } 120 }
92 121
...@@ -97,16 +126,10 @@ public class DistributedConsensusLoadTest { ...@@ -97,16 +126,10 @@ public class DistributedConsensusLoadTest {
97 while (!stopped.get()) { 126 while (!stopped.get()) {
98 limiter.acquire(); 127 limiter.acquire();
99 s.acquireUninterruptibly(); 128 s.acquireUninterruptibly();
100 - counter.incrementAndGet().whenComplete((r, e) -> { 129 + counters.get(RandomUtils.nextInt(TOTAL_COUNTERS)).incrementAndGet().whenComplete((r, e) -> {
101 s.release(); 130 s.release();
102 - long delta = System.currentTimeMillis() - lastLoggedTime.get();
103 if (e == null) { 131 if (e == null) {
104 - if (delta > 1000) { 132 + increments.incrementAndGet();
105 - long tps = (long) ((r - lastValue.get()) * 1000.0) / delta;
106 - lastValue.set(r);
107 - lastLoggedTime.set(System.currentTimeMillis());
108 - log.info("Rate: {}", tps);
109 - }
110 } 133 }
111 }); 134 });
112 } 135 }
...@@ -120,7 +143,8 @@ public class DistributedConsensusLoadTest { ...@@ -120,7 +143,8 @@ public class DistributedConsensusLoadTest {
120 public void deactivate(ComponentContext context) { 143 public void deactivate(ComponentContext context) {
121 configService.unregisterProperties(getClass(), false); 144 configService.unregisterProperties(getClass(), false);
122 stopTest(); 145 stopTest();
123 - testExecutor.shutdown(); 146 + runner.shutdown();
147 + reporter.shutdown();
124 log.info("Stopped"); 148 log.info("Stopped");
125 } 149 }
126 150
...@@ -138,10 +162,10 @@ public class DistributedConsensusLoadTest { ...@@ -138,10 +162,10 @@ public class DistributedConsensusLoadTest {
138 } 162 }
139 } 163 }
140 if (newRate != rate) { 164 if (newRate != rate) {
141 - log.info("Rate changed to {}", newRate); 165 + log.info("Per node rate changed to {}", newRate);
142 rate = newRate; 166 rate = newRate;
143 stopTest(); 167 stopTest();
144 - testExecutor.execute(this::startTest); 168 + runner.execute(this::startTest);
145 } 169 }
146 } 170 }
147 } 171 }
......