Madan Jampani
Committed by Gerrit Code Review

Implementation of Hybrid Logical Clock Service.

Change-Id: I52e231433d044f9e6156db7e28bde9fd199118e8
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.core;
import org.onosproject.store.service.WallClockTimestamp;
/**
* The <a href="http://www.cse.buffalo.edu/tech-reports/2014-04.pdf">hybrid logical time</a> keeper service.
*/
public interface HybridLogicalClockService {
/**
* Returns the current hybrid logical time.
* @return current hybrid logical time
*/
HybridLogicalTime timeNow();
/**
* Records a (receive) event and accordingly makes adjustments to the hybrid logical time.
* @param time received event time
*/
void recordEventTime(HybridLogicalTime time);
/**
* Returns the current time derived from the hybrid logical time.
* @return current system time
*/
default long now() {
return timeNow().time();
}
/**
* Returns the current time as a {@code WallClockTimestamp}.
* @return wall clock timestamp
*/
default WallClockTimestamp wallClockTimestamp() {
return new WallClockTimestamp(now());
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.core;
import com.google.common.base.MoreObjects;
/**
* Time provided by a Hybrid Logical Clock described in
* this <a href="http://www.cse.buffalo.edu/tech-reports/2014-04.pdf">paper</a>.
*/
public class HybridLogicalTime {
private final long logicalTime;
private final long logicalCounter;
public HybridLogicalTime(long logicalTime, long logicalCounter) {
this.logicalTime = logicalTime;
this.logicalCounter = logicalCounter;
}
/**
* Returns the logical time component of a HLT.
* @return logical time
*/
public long logicalTime() {
return logicalTime;
}
/**
* Returns the logical counter component of a HLT.
* @return logical counter
*/
public long logicalCounter() {
return logicalCounter;
}
/**
* Returns the real system time represented by this HLT.
* @return real system time
*/
public long time() {
return (logicalTime >> 16 << 16) | (logicalCounter << 48 >> 48);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("logicalTime", logicalTime)
.add("logicalCounter", logicalCounter)
.toString();
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.core.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.function.Supplier;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.slf4j.Logger;
/**
* Implementation of {@link HybridLogicalClockService}.
* <p>
* Implementation is based on HLT <a href="http://www.cse.buffalo.edu/tech-reports/2014-04.pdf">paper</a>.
*/
@Component(immediate = true)
@Service
public class HybridLogicalClockManager implements HybridLogicalClockService {
private final Logger log = getLogger(getClass());
protected Supplier<Long> physicalTimeSource = System::currentTimeMillis;
private long logicalTime = 0;
private long logicalCounter = 0;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public synchronized HybridLogicalTime timeNow() {
final long oldLogicalTime = logicalTime;
logicalTime = Math.max(oldLogicalTime, physicalTimeSource.get());
if (logicalTime == oldLogicalTime) {
logicalCounter++;
} else {
logicalCounter = 0;
}
return new HybridLogicalTime(logicalTime, logicalCounter);
}
@Override
public synchronized void recordEventTime(HybridLogicalTime eTime) {
final long oldLogicalTime = logicalTime;
logicalTime = Math.max(oldLogicalTime, Math.max(eTime.logicalTime(), physicalTimeSource.get()));
if (logicalTime == oldLogicalTime && oldLogicalTime == eTime.logicalTime()) {
logicalCounter = Math.max(logicalCounter, eTime.logicalCounter()) + 1;
} else if (logicalTime == oldLogicalTime) {
logicalCounter++;
} else if (logicalTime == eTime.logicalTime()) {
logicalCounter = eTime.logicalCounter() + 1;
} else {
logicalCounter = 0;
}
}
protected long logicalTime() {
return logicalTime;
}
protected long logicalCounter() {
return logicalCounter;
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.core.impl;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
import org.onosproject.core.HybridLogicalTime;
/**
* Unit tests for {@link HybridLogicalClockManager}.
*/
public class HybridLogicalClockManagerTest {
@Test
public void testLocalEvents() {
AtomicLong ticker = new AtomicLong();
Supplier<Long> ptSource = ticker::get;
HybridLogicalClockManager clockManager = new HybridLogicalClockManager();
clockManager.physicalTimeSource = ptSource;
HybridLogicalTime time1 = clockManager.timeNow();
Assert.assertEquals(0, time1.logicalTime());
Assert.assertEquals(1, time1.logicalCounter());
HybridLogicalTime time2 = clockManager.timeNow();
Assert.assertEquals(0, time2.logicalTime());
Assert.assertEquals(2, time2.logicalCounter());
ticker.incrementAndGet();
HybridLogicalTime time3 = clockManager.timeNow();
Assert.assertEquals(1, time3.logicalTime());
Assert.assertEquals(0, time3.logicalCounter());
HybridLogicalTime time4 = clockManager.timeNow();
Assert.assertEquals(1, time4.logicalTime());
Assert.assertEquals(1, time4.logicalCounter());
}
@Test
public void testReceiveEvents() {
AtomicLong ticker = new AtomicLong(1);
Supplier<Long> ptSource = ticker::get;
HybridLogicalClockManager clockManager = new HybridLogicalClockManager();
clockManager.physicalTimeSource = ptSource;
HybridLogicalTime time1 = clockManager.timeNow();
Assert.assertEquals(1, time1.logicalTime());
Assert.assertEquals(0, time1.logicalCounter());
HybridLogicalTime eventTime1 = new HybridLogicalTime(1, 0);
clockManager.recordEventTime(eventTime1);
Assert.assertEquals(1, clockManager.logicalTime());
Assert.assertEquals(1, clockManager.logicalCounter());
HybridLogicalTime eventTime2 = new HybridLogicalTime(2, 0);
clockManager.recordEventTime(eventTime2);
Assert.assertEquals(2, clockManager.logicalTime());
Assert.assertEquals(1, clockManager.logicalCounter());
HybridLogicalTime eventTime3 = new HybridLogicalTime(2, 2);
clockManager.recordEventTime(eventTime3);
Assert.assertEquals(2, clockManager.logicalTime());
Assert.assertEquals(3, clockManager.logicalCounter());
HybridLogicalTime eventTime4 = new HybridLogicalTime(2, 1);
clockManager.recordEventTime(eventTime4);
Assert.assertEquals(2, clockManager.logicalTime());
Assert.assertEquals(4, clockManager.logicalCounter());
ticker.set(4);
HybridLogicalTime eventTime5 = new HybridLogicalTime(3, 0);
clockManager.recordEventTime(eventTime5);
Assert.assertEquals(4, clockManager.logicalTime());
Assert.assertEquals(0, clockManager.logicalCounter());
}
}
......@@ -20,6 +20,8 @@ package org.onosproject.store.cluster.messaging.impl;
*/
public enum DecoderState {
READ_MESSAGE_PREAMBLE,
READ_LOGICAL_TIME,
READ_LOGICAL_COUNTER,
READ_MESSAGE_ID,
READ_SENDER_IP_VERSION,
READ_SENDER_IP,
......
......@@ -16,7 +16,9 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.MoreObjects;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.store.cluster.messaging.Endpoint;
/**
......@@ -55,18 +57,31 @@ public final class InternalMessage {
}
private final int preamble;
private final HybridLogicalTime time;
private final long id;
private final Endpoint sender;
private final String type;
private final byte[] payload;
private final Status status;
public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload) {
this(preamble, id, sender, type, payload, Status.OK);
public InternalMessage(int preamble,
HybridLogicalTime time,
long id,
Endpoint sender,
String type,
byte[] payload) {
this(preamble, time, id, sender, type, payload, Status.OK);
}
public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload, Status status) {
public InternalMessage(int preamble,
HybridLogicalTime time,
long id,
Endpoint sender,
String type,
byte[] payload,
Status status) {
this.preamble = preamble;
this.time = time;
this.id = id;
this.sender = sender;
this.type = type;
......@@ -74,6 +89,10 @@ public final class InternalMessage {
this.status = status;
}
public HybridLogicalTime time() {
return time;
}
public int preamble() {
return preamble;
}
......@@ -101,6 +120,7 @@ public final class InternalMessage {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("time", time)
.add("id", id)
.add("type", type)
.add("sender", sender)
......
......@@ -23,6 +23,7 @@ import io.netty.handler.codec.ReplayingDecoder;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
......@@ -39,6 +40,8 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final Logger log = LoggerFactory.getLogger(getClass());
private long logicalTime;
private long logicalCounter;
private long messageId;
private int preamble;
private Version ipVersion;
......@@ -63,6 +66,12 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
switch (state()) {
case READ_MESSAGE_PREAMBLE:
preamble = buffer.readInt();
checkpoint(DecoderState.READ_LOGICAL_TIME);
case READ_LOGICAL_TIME:
logicalTime = buffer.readLong();
checkpoint(DecoderState.READ_LOGICAL_COUNTER);
case READ_LOGICAL_COUNTER:
logicalCounter = buffer.readLong();
checkpoint(DecoderState.READ_MESSAGE_ID);
case READ_MESSAGE_ID:
messageId = buffer.readLong();
......@@ -102,6 +111,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
payload = new byte[0];
}
InternalMessage message = new InternalMessage(preamble,
new HybridLogicalTime(logicalTime, logicalCounter),
messageId,
new Endpoint(senderIp, senderPort),
messageType,
......
......@@ -51,6 +51,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
out.writeInt(this.preamble);
// write time
out.writeLong(message.time().logicalTime());
out.writeLong(message.time().logicalCounter());
// write message id
out.writeLong(message.id());
......
......@@ -53,6 +53,7 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
......@@ -99,6 +100,9 @@ public class NettyMessagingManager implements MessagingService {
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HybridLogicalClockService clockService;
private Endpoint localEp;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
......@@ -218,6 +222,7 @@ public class NettyMessagingManager implements MessagingService {
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
InternalMessage message = new InternalMessage(preamble,
clockService.timeNow(),
messageIdGenerator.incrementAndGet(),
localEp,
type,
......@@ -264,7 +269,12 @@ public class NettyMessagingManager implements MessagingService {
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
InternalMessage message = new InternalMessage(preamble,
clockService.timeNow(),
messageId,
localEp,
type,
payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
callbacks.invalidate(messageId);
......@@ -502,6 +512,7 @@ public class NettyMessagingManager implements MessagingService {
log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
}
clockService.recordEventTime(message.time());
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
......@@ -538,6 +549,7 @@ public class NettyMessagingManager implements MessagingService {
private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
InternalMessage response = new InternalMessage(preamble,
clockService.timeNow(),
message.id(),
localEp,
REPLY_MESSAGE_TYPE,
......
......@@ -21,10 +21,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -34,6 +36,8 @@ import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;
......@@ -48,6 +52,18 @@ import static org.onlab.junit.TestTools.findAvailablePort;
*/
public class NettyMessagingManagerTest {
HybridLogicalClockService testClockService = new HybridLogicalClockService() {
AtomicLong counter = new AtomicLong();
@Override
public HybridLogicalTime timeNow() {
return new HybridLogicalTime(counter.incrementAndGet(), 0);
}
@Override
public void recordEventTime(HybridLogicalTime time) {
}
};
NettyMessagingManager netty1;
NettyMessagingManager netty2;
......@@ -63,11 +79,13 @@ public class NettyMessagingManagerTest {
ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
netty1 = new NettyMessagingManager();
netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
netty1.clockService = testClockService;
netty1.activate();
ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
netty2 = new NettyMessagingManager();
netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
netty2.clockService = testClockService;
netty2.activate();
}
......