DatabasePerfInstaller.java 5.58 KB
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.databaseperf;

import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;

/**
 * Application to measure partitioned database performance.
 */
@Component(immediate = true)
public class DatabasePerfInstaller {

    private final Logger log = getLogger(getClass());

    @Reference(cardinality = MANDATORY_UNARY)
    protected CoreService coreService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = MANDATORY_UNARY)
    protected StorageService storageService;

    private boolean stopped;

    private ApplicationId appId;

    private static final long REPORT_PERIOD = 5000L; //ms
    private Timer reportTimer;

    private AtomicInteger successCount = new AtomicInteger(0);
    private AtomicInteger failureCount = new AtomicInteger(0);

    private ConsistentMap<String, String> cmap;

    private ControllerNode localNode;

    private long reportStartTime = System.currentTimeMillis();

    private static final int NUM_TASK_THREADS = 2;
    private ExecutorService taskExecutor;

    private static final Serializer SERIALIZER = new Serializer() {

        KryoNamespace kryo = new KryoNamespace.Builder().build();

        @Override
        public <T> byte[] encode(T object) {
            return kryo.serialize(object);
        }

        @Override
        public <T> T decode(byte[] bytes) {
            return kryo.deserialize(bytes);
        }

    };

    @Activate
    public void activate() {
        localNode = clusterService.getLocalNode();
        String nodeId = localNode.ip().toString();
        appId = coreService.registerApplication("org.onosproject.nettyperf."
                                                        + nodeId);

        cmap = storageService.createConsistentMap("onos-app-database-perf-test-map", SERIALIZER);
        taskExecutor = Executors.newFixedThreadPool(NUM_TASK_THREADS, groupedThreads("onos/database-perf", "worker"));
        log.info("Started with Application ID {}", appId.id());
        start();
    }

    @Deactivate
    public void deactivate() {
        stop();
        log.info("Stopped");
    }

    public void start() {
        long delay = System.currentTimeMillis() % REPORT_PERIOD;
        reportTimer = new Timer("onos-netty-perf-reporter");
        reportTimer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                report();
            }
        }, delay, REPORT_PERIOD);

        stopped = false;
        IntStream.range(0, NUM_TASK_THREADS).forEach(i -> {
            taskExecutor.submit(() -> {
                delay(2000); // take a breath to start
                while (!stopped) {
                    performDBOperation();
                    delay(2); // take a breather
                }
            });
        });
    }

    private void performDBOperation() {
        String key = String.format("test%d", RandomUtils.nextInt(1000));
        try {
            if (RandomUtils.nextBoolean()) {
                cmap.put(key, UUID.randomUUID().toString());
            } else {
                cmap.get(key);
            }
            successCount.incrementAndGet();
        } catch (Exception e) {
            failureCount.incrementAndGet();
        }
    }

    private void report() {
        long delta = System.currentTimeMillis() - reportStartTime;
        if (delta > 0) {
            int rate = (int) Math.round(((successCount.get() * 1000.0) / delta));
            log.info("Passed: {}, Failed: {}, Rate: {}",
                    successCount.getAndSet(0), failureCount.getAndSet(0), rate);
            reportStartTime = System.currentTimeMillis();
        }
    }

    public void stop() {
        reportTimer.cancel();
        reportTimer = null;
        stopped = true;
        try {
            taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("Failed to stop worker.");
        }
    }
}