Shravan Ambati
Committed by Gerrit Code Review

Intial Stub code for the Cluster features in Kafka Application.(patchset #4)

Stub Implementation of KafkaStorage Service.

Change-Id: Iad929a8f1b13149583c9526b41e8f1a3a829fa1e
1 +/**
2 + * Copyright 2016 Open Networking Laboratory
3 + * Licensed under the Apache License, Version 2.0 (the "License");
4 + * you may not use this file except in compliance with the License.
5 + * You may obtain a copy of the License at
6 +
7 + * http://www.apache.org/licenses/LICENSE-2.0
8 +
9 + * Unless required by applicable law or agreed to in writing, software
10 + * distributed under the License is distributed on an "AS IS" BASIS,
11 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 + * See the License for the specific language governing permissions and
13 + * limitations under the License.
14 + */
15 +package org.onosproject.kafkaintegration.api;
16 +
17 +import org.onosproject.kafkaintegration.api.dto.OnosEvent;
18 +
19 +/**
20 + * APIs to insert and delete into a local store. This store is used to keep
21 + * track of events that are being published.
22 + */
23 +public interface KafkaEventStorageService {
24 +
25 + /**
26 + * Inserts the Generated event into the local cache.
27 + *
28 + * @param e the ONOS Event
29 + * @return true if the insertion was successful
30 + */
31 + boolean insertCacheEntry(OnosEvent e);
32 +
33 + /**
34 + * Updates the counter with the most recently published event's sequence
35 + * number.
36 + *
37 + * @param sequenceNumber the updated value of sequence number.
38 + */
39 + void updateLastPublishedEntry(Long sequenceNumber);
40 +}
1 +/**
2 + * Copyright 2016 Open Networking Laboratory
3 + * Licensed under the Apache License, Version 2.0 (the "License");
4 + * you may not use this file except in compliance with the License.
5 + * You may obtain a copy of the License at
6 +
7 + * http://www.apache.org/licenses/LICENSE-2.0
8 +
9 + * Unless required by applicable law or agreed to in writing, software
10 + * distributed under the License is distributed on an "AS IS" BASIS,
11 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 + * See the License for the specific language governing permissions and
13 + * limitations under the License.
14 + */
15 +package org.onosproject.kafkaintegration.impl;
16 +
17 +import java.util.TreeMap;
18 +import java.util.concurrent.Executors;
19 +import java.util.concurrent.ScheduledExecutorService;
20 +import java.util.concurrent.TimeUnit;
21 +
22 +import org.apache.felix.scr.annotations.Activate;
23 +import org.apache.felix.scr.annotations.Component;
24 +import org.apache.felix.scr.annotations.Deactivate;
25 +import org.apache.felix.scr.annotations.Reference;
26 +import org.apache.felix.scr.annotations.ReferenceCardinality;
27 +import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
28 +import org.onosproject.kafkaintegration.api.dto.OnosEvent;
29 +import org.onosproject.store.service.AtomicValue;
30 +import org.onosproject.store.service.StorageService;
31 +import org.slf4j.Logger;
32 +import org.slf4j.LoggerFactory;
33 +
34 +@Component(immediate = true)
35 +public class KafkaStorageManager implements KafkaEventStorageService {
36 +
37 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
38 + protected StorageService storageService;
39 +
40 + private TreeMap<Long, OnosEvent> kafkaEventStore;
41 +
42 + private AtomicValue<Long> lastPublishedEvent;
43 +
44 + private final Logger log = LoggerFactory.getLogger(getClass());
45 +
46 + private ScheduledExecutorService gcExService;
47 +
48 + private InternalGarbageCollector gcTask;
49 +
50 + // Thread scheduler parameters.
51 + private final long delay = 0;
52 + private final long period = 1;
53 +
54 + @Activate
55 + protected void activate() {
56 + kafkaEventStore = new TreeMap<Long, OnosEvent>();
57 + lastPublishedEvent = storageService.<Long>atomicValueBuilder()
58 + .withName("onos-app-kafka-published-seqNumber").build()
59 + .asAtomicValue();
60 +
61 + startGC();
62 +
63 + log.info("Started");
64 + }
65 +
66 + private void startGC() {
67 + log.info("Starting Garbage Collection Service");
68 + gcExService = Executors.newSingleThreadScheduledExecutor();
69 + gcTask = new InternalGarbageCollector();
70 + gcExService.scheduleAtFixedRate(gcTask, delay, period,
71 + TimeUnit.SECONDS);
72 + }
73 +
74 + @Deactivate
75 + protected void deactivate() {
76 + stopGC();
77 + log.info("Stopped");
78 + }
79 +
80 + private void stopGC() {
81 + log.info("Stopping Garbage Collection Service");
82 + gcExService.shutdown();
83 + }
84 +
85 + @Override
86 + public boolean insertCacheEntry(OnosEvent e) {
87 + // TODO: Fill in the code once the event carries timestamp info.
88 + return true;
89 + }
90 +
91 + @Override
92 + public void updateLastPublishedEntry(Long sequenceNumber) {
93 + this.lastPublishedEvent.set(sequenceNumber);
94 + }
95 +
96 + /**
97 + * Removes events from the Kafka Event Store which have been published.
98 + *
99 + */
100 + private class InternalGarbageCollector implements Runnable {
101 +
102 + @Override
103 + public void run() {
104 + kafkaEventStore.headMap(lastPublishedEvent.get(), true).clear();
105 + }
106 + }
107 +
108 +}