Shashikanth VH
Committed by Gerrit Code Review

[Emu] [ONOS-2591,ONOS-2594] Implementation of BGP channel handler to manage each…

… BGP peer connection.

Change-Id: I14e90c9437f676698f89da79e736a81035689492
......@@ -25,6 +25,21 @@ import org.onosproject.bgpio.protocol.BGPMessage;
public interface BGPController {
/**
* Returns list of bgp peers connected to this BGP controller.
*
* @return Iterable of BGPPeer elements
*/
Iterable<BGPPeer> getPeers();
/**
* Returns the actual bgp peer for the given ip address.
*
* @param bgpId the id of the bgp peer to fetch
* @return the interface to this bgp peer
*/
BGPPeer getPeer(BGPId bgpId);
/**
* Send a message to a particular bgp peer.
*
* @param bgpId the id of the peer to send message.
......@@ -41,9 +56,22 @@ public interface BGPController {
void processBGPPacket(BGPId bgpId, BGPMessage msg);
/**
* Close all connected BGP peers.
*
*/
void closeConnectedPeers();
/**
* Get the BGPConfig class to the caller.
*
* @return configuration object
*/
BGPCfg getConfig();
/**
* Get the BGP connected peers to this controller.
*
* @return the integer number
*/
int getBGPConnNumber();
}
\ No newline at end of file
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bgp.controller;
import java.util.List;
import org.jboss.netty.channel.Channel;
import org.onosproject.bgpio.protocol.BGPMessage;
import org.onosproject.bgpio.protocol.BGPVersion;
/**
* Represents the peer side of an bgp peer.
*
*/
public interface BGPPeer {
/**
* Sets the BGP version for this bgp peer.
*
* @param bgpVersion the version to set.
*/
void setBgpPeerVersion(BGPVersion bgpVersion);
/**
* Gets the BGP version for this bgp peer.
*
* @return bgp identifier.
*/
int getBgpPeerIdentifier();
/**
* Sets the associated Netty channel for this bgp peer.
*
* @param channel the Netty channel
*/
void setChannel(Channel channel);
/**
* Gets the associated Netty channel handler for this bgp peer.
*
* @return Channel channel connected.
*/
Channel getChannel();
/**
* Sets the AS Number for this bgp peer.
*
* @param peerASNum the autonomous system number value to set.
*/
void setBgpPeerASNum(short peerASNum);
/**
* Sets the hold time for this bgp peer.
*
* @param peerHoldTime the hold timer value to set.
*/
void setBgpPeerHoldTime(short peerHoldTime);
/**
* Sets the peer identifier value.
*
* @param peerIdentifier the bgp peer identifier value.
*/
void setBgpPeerIdentifier(int peerIdentifier);
/**
* Sets whether the bgp peer is connected.
*
* @param connected whether the bgp peer is connected
*/
void setConnected(boolean connected);
/**
* Initialises the behaviour.
*
* @param bgpId id of bgp peer
* @param bgpVersion BGP version
* @param pktStats packet statistics
*/
void init(BGPId bgpId, BGPVersion bgpVersion, BGPPacketStats pktStats);
/**
* Checks whether the handshake is complete.
*
* @return true is finished, false if not.
*/
boolean isHandshakeComplete();
/**
* Writes the message to the peer.
*
* @param msg the message to write
*/
void sendMessage(BGPMessage msg);
/**
* Writes the BGPMessage list to the peer.
*
* @param msgs the messages to be written
*/
void sendMessage(List<BGPMessage> msgs);
/**
* Gets a string version of the ID for this bgp peer.
*
* @return string version of the ID
*/
String getStringId();
/**
* Gets the ipAddress of the peer.
*
* @return the peer bgpId in IPAddress format
*/
BGPId getBGPId();
/**
* Checks if the bgp peer is still connected.
*
* @return whether the bgp peer is still connected
*/
boolean isConnected();
/**
* Disconnects the bgp peer by closing the TCP connection. Results in a call to the channel handler's
* channelDisconnected method for cleanup
*/
void disconnectPeer();
/**
* Identifies the channel used to communicate with the bgp peer.
*
* @return string representation of the connection to the peer
*/
String channelId();
/**
* Gets the negotiated hold time.
*
* @return the negotiated hold time
*/
int getNegotiatedHoldTime();
/**
* Sets negotiated hold time for the peer.
*
* @param negotiatedHoldTime negotiated hold time
*/
void setNegotiatedHoldTime(short negotiatedHoldTime);
}
......@@ -61,27 +61,6 @@ public interface BGPMessage extends Writeable {
BGPMessage build() throws BGPParseException;
/**
* Returns BGP Version of BGP Message.
*
* @return BGP Version of BGP Message
*/
BGPVersion getVersion();
/**
* Returns BGP Type of BGP Message.
*
* @return BGP Type of BGP Message
*/
BGPType getType();
/**
* Returns BGP Header of BGP Message.
*
* @return BGP Header of BGP Message
*/
BGPHeader getHeader();
/**
* Sets BgpHeader and return its builder.
*
* @param bgpMsgHeader BGP Message Header
......
......@@ -17,16 +17,24 @@
package org.onosproject.bgp.controller.impl;
import static org.onlab.util.Tools.groupedThreads;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.bgp.controller.BGPCfg;
import org.onosproject.bgp.controller.BGPController;
import org.onosproject.bgp.controller.BGPId;
import org.onosproject.bgp.controller.BGPPacketStats;
import org.onosproject.bgp.controller.BGPPeer;
import org.onosproject.bgpio.protocol.BGPMessage;
import org.onosproject.bgpio.protocol.BGPVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -42,8 +50,10 @@ public class BGPControllerImpl implements BGPController {
private final ExecutorService executorBarrier = Executors.newFixedThreadPool(4,
groupedThreads("onos/bgp",
"event-barrier-%d"));
"event-barrier-%d"));
protected ConcurrentHashMap<BGPId, BGPPeer> connectedPeers = new ConcurrentHashMap<BGPId, BGPPeer>();
protected BGPPeerManager peerManager = new BGPPeerManager();
final Controller ctrl = new Controller(this);
private BGPConfig bgpconfig = new BGPConfig();
......@@ -57,11 +67,22 @@ public class BGPControllerImpl implements BGPController {
@Deactivate
public void deactivate() {
// Close all connected peers
closeConnectedPeers();
this.ctrl.stop();
log.info("Stopped");
}
@Override
public Iterable<BGPPeer> getPeers() {
return this.connectedPeers.values();
}
@Override
public BGPPeer getPeer(BGPId bgpId) {
return this.connectedPeers.get(bgpId);
}
@Override
public void writeMsg(BGPId bgpId, BGPMessage msg) {
// TODO: Send message
}
......@@ -88,17 +109,167 @@ public class BGPControllerImpl implements BGPController {
}
}
@Override
public void closeConnectedPeers() {
BGPPeer bgpPeer;
for (BGPId id : this.connectedPeers.keySet()) {
bgpPeer = getPeer(id);
bgpPeer.disconnectPeer();
}
}
/**
* Get controller instance.
*
* @return ctrl the controller.
* Implementation of an BGP Peer which is responsible for keeping track of connected peers and the state in which
* they are.
*/
public class BGPPeerManager {
private final Logger log = LoggerFactory.getLogger(BGPPeerManager.class);
private final Lock peerLock = new ReentrantLock();
/**
* Add a BGP peer that has just connected to the system.
*
* @param bgpId the id of bgp peer to add
* @param bgpPeer the actual bgp peer object.
* @return true if added, false otherwise.
*/
public boolean addConnectedPeer(BGPId bgpId, BGPPeer bgpPeer) {
if (connectedPeers.get(bgpId) != null) {
this.log.error("Trying to add connectedPeer but found previous " + "value for bgp ip: {}",
bgpId.toString());
return false;
} else {
this.log.debug("Added Peer {}", bgpId.toString());
connectedPeers.put(bgpId, bgpPeer);
return true;
}
}
/**
* Checks if the activation for this bgp peer is valid.
*
* @param bgpId the id of bgp peer to check
* @return true if valid, false otherwise
*/
public boolean isPeerConnected(BGPId bgpId) {
if (connectedPeers.get(bgpId) == null) {
this.log.error("Trying to activate peer but is not in " + "connected peer: bgpIp {}. Aborting ..",
bgpId.toString());
return false;
}
return true;
}
/**
* Checks if the activation for this bgp peer is valid.
*
* @param routerid the routerid of bgp peer to check
* @return true if valid, false otherwise
*/
public boolean isPeerConnected(String routerid) {
final BGPId bgpId;
bgpId = BGPId.bgpId(IpAddress.valueOf(routerid));
if (connectedPeers.get(bgpId) != null) {
this.log.info("Peer connection exist ");
return true;
}
this.log.info("Initiate connect request to " + "peer: bgpIp {}", bgpId.toString());
return false;
}
/**
* Clear all state in controller peer maps for a bgp peer that has
* disconnected from the local controller.
*
* @param bgpId the id of bgp peer to remove.
*/
public void removeConnectedPeer(BGPId bgpId) {
connectedPeers.remove(bgpId);
}
/**
* Clear all state in controller peer maps for a bgp peer that has
* disconnected from the local controller.
*
* @param routerid the router id of bgp peer to remove.
*/
public void removeConnectedPeer(String routerid) {
final BGPId bgpId;
bgpId = BGPId.bgpId(IpAddress.valueOf(routerid));
connectedPeers.remove(bgpId);
}
/**
* Gets bgp peer for connected peer map.
*
* @param routerid router id
* @return peer if available, null otherwise
*/
public BGPPeer getPeer(String routerid) {
final BGPId bgpId;
bgpId = BGPId.bgpId(IpAddress.valueOf(routerid));
return connectedPeers.get(bgpId);
}
/**
* Gets bgp peer instance.
*
* @param bgpId bgp identifier.
* @param pv bgp version.
* @param pktStats packet statistics.
* @return BGPPeer peer instance.
*/
public BGPPeer getBGPPeerInstance(BGPId bgpId, BGPVersion pv, BGPPacketStats pktStats) {
BGPPeer bgpPeer = new BGPPeerImpl();
bgpPeer.init(bgpId, pv, pktStats);
return bgpPeer;
}
}
/**
* Gets controller instance.
*
* @return Controller instance.
*/
public Controller getController() {
return ctrl;
}
/**
* Gets connected peers.
*
* @return connectedPeers from connected Peers Map.
*/
public ConcurrentHashMap<BGPId, BGPPeer> getConnectedPeers() {
return connectedPeers;
}
/**
* Gets peer manager.
*
* @return peerManager.
*/
public BGPPeerManager getPeerManager() {
return peerManager;
}
@Override
public BGPCfg getConfig() {
return this.bgpconfig;
}
@Override
public int getBGPConnNumber() {
return connectedPeers.size();
}
}
\ No newline at end of file
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bgp.controller.impl;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implement sending keepalive message to connected peer periodically based on negotiated holdtime.
*/
public class BGPKeepAliveTimer {
private Timer keepAliveTimer;
private BGPChannelHandler handler;
private static final Logger log = LoggerFactory.getLogger(BGPKeepAliveTimer.class);
/**
* Gets keepalive timer object.
*
* @return keepAliveTimer keepalive timer.
*/
public Timer getKeepAliveTimer() {
return keepAliveTimer;
}
/**
* Initialize timer to send keepalive message periodically.
*
* @param h channel handler
* @param seconds time interval.
*/
public BGPKeepAliveTimer(BGPChannelHandler h, int seconds) {
this.handler = h;
this.keepAliveTimer = new Timer();
this.keepAliveTimer.schedule(new SendKeepAlive(), 0, seconds * 1000);
}
/**
* Send keepalive message to connected peer on schedule.
*/
class SendKeepAlive extends TimerTask {
@Override
public void run() {
log.debug("Sending periodic KeepAlive");
try {
// Send keep alive message
handler.sendKeepAliveMessage();
handler.getBgpPacketStats().addOutPacket();
} catch (Exception e) {
log.info("Exception occured while sending keepAlive message" + e.toString());
}
}
}
}
......@@ -43,8 +43,7 @@ public class BGPPacketStatsImpl implements BGPPacketStats {
/**
* Get the outgoing packet count number.
*
* @return
* packet count
* @return packet count
*/
public int outPacketCount() {
return outPacketCount;
......@@ -53,8 +52,7 @@ public class BGPPacketStatsImpl implements BGPPacketStats {
/**
* Get the incoming packet count number.
*
* @return
* packet count
* @return packet count
*/
public int inPacketCount() {
return inPacketCount;
......@@ -63,8 +61,7 @@ public class BGPPacketStatsImpl implements BGPPacketStats {
/**
* Get the wrong packet count number.
*
* @return
* packet count
* @return packet count
*/
public int wrongPacketCount() {
return wrongPacketCount;
......@@ -110,8 +107,7 @@ public class BGPPacketStatsImpl implements BGPPacketStats {
/**
* Get the time.
*
* @return
* time
* @return time
*/
public long getTime() {
return this.time;
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bgp.controller.impl;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.Channel;
import org.onlab.packet.IpAddress;
import org.onosproject.bgp.controller.BGPId;
import org.onosproject.bgp.controller.BGPPacketStats;
import org.onosproject.bgp.controller.BGPPeer;
import org.onosproject.bgpio.protocol.BGPMessage;
import org.onosproject.bgpio.protocol.BGPVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
/**
* BGPPeerImpl implements BGPPeer, maintains peer information and store updates in RIB .
*/
public class BGPPeerImpl implements BGPPeer {
protected final Logger log = LoggerFactory.getLogger(BGPPeerImpl.class);
private static final String SHUTDOWN_MSG = "Worker has already been shutdown";
private Channel channel;
protected String channelId;
private boolean connected;
protected boolean isHandShakeComplete = false;
public BGPSessionInfo sessionInfo;
private BGPPacketStatsImpl pktStats;
@Override
public void init(BGPId bgpId, BGPVersion bgpVersion, BGPPacketStats pktStats) {
this.sessionInfo.setRemoteBgpId(bgpId);
this.sessionInfo.setRemoteBgpVersion(bgpVersion);
this.pktStats = (BGPPacketStatsImpl) pktStats;
this.sessionInfo = new BGPSessionInfo();
}
// ************************
// Channel related
// ************************
@Override
public final void disconnectPeer() {
this.channel.close();
}
@Override
public final void sendMessage(BGPMessage m) {
log.debug("Sending message to {}", channel.getRemoteAddress());
try {
channel.write(Collections.singletonList(m));
this.pktStats.addOutPacket();
} catch (RejectedExecutionException e) {
log.warn(e.getMessage());
if (!e.getMessage().contains(SHUTDOWN_MSG)) {
throw e;
}
}
}
@Override
public final void sendMessage(List<BGPMessage> msgs) {
try {
channel.write(msgs);
this.pktStats.addOutPacket(msgs.size());
} catch (RejectedExecutionException e) {
log.warn(e.getMessage());
if (!e.getMessage().contains(SHUTDOWN_MSG)) {
throw e;
}
}
}
@Override
public final boolean isConnected() {
return this.connected;
}
@Override
public final void setConnected(boolean connected) {
this.connected = connected;
};
@Override
public final void setChannel(Channel channel) {
this.channel = channel;
final SocketAddress address = channel.getRemoteAddress();
if (address instanceof InetSocketAddress) {
final InetSocketAddress inetAddress = (InetSocketAddress) address;
final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
if (ipAddress.isIp4()) {
channelId = ipAddress.toString() + ':' + inetAddress.getPort();
} else {
channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
}
}
};
@Override
public final Channel getChannel() {
return this.channel;
};
@Override
public String channelId() {
return channelId;
}
// ************************
// BGP Peer features related
// ************************
@Override
public final BGPId getBGPId() {
return this.sessionInfo.getRemoteBgpId();
};
@Override
public final String getStringId() {
return this.sessionInfo.getRemoteBgpId().toString();
}
@Override
public final void setBgpPeerVersion(BGPVersion peerVersion) {
this.sessionInfo.setRemoteBgpVersion(peerVersion);
}
@Override
public void setBgpPeerASNum(short peerASNum) {
this.sessionInfo.setRemoteBgpASNum(peerASNum);
}
@Override
public void setBgpPeerHoldTime(short peerHoldTime) {
this.sessionInfo.setRemoteBgpHoldTime(peerHoldTime);
}
@Override
public void setBgpPeerIdentifier(int peerIdentifier) {
this.sessionInfo.setRemoteBgpIdentifier(peerIdentifier);
}
@Override
public int getBgpPeerIdentifier() {
return this.sessionInfo.getRemoteBgpIdentifier();
}
@Override
public int getNegotiatedHoldTime() {
return this.sessionInfo.getNegotiatedholdTime();
}
@Override
public void setNegotiatedHoldTime(short negotiatedHoldTime) {
this.sessionInfo.setNegotiatedholdTime(negotiatedHoldTime);
}
@Override
public boolean isHandshakeComplete() {
return isHandShakeComplete;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass()).omitNullValues().add("channel", channelId())
.add("bgpId", getBGPId()).toString();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bgp.controller.impl;
import org.onosproject.bgp.controller.BGPId;
import org.onosproject.bgpio.protocol.BGPVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class maintains BGP peer session info.
*/
public class BGPSessionInfo {
protected final Logger log = LoggerFactory.getLogger(BGPSessionInfo.class);
private BGPId remoteBgpId;
private BGPVersion remoteBgpVersion;
private short remoteBgpASNum;
private short remoteBgpholdTime;
private int remoteBgpIdentifier;
private short negotiatedholdTime;
/**
* Gets the negotiated hold time for the session.
*
* @return negotiated hold time.
*/
public short getNegotiatedholdTime() {
return negotiatedholdTime;
}
/**
* Sets the negotiated hold time for the session.
*
* @param negotiatedholdTime negotiated hold time.
*/
public void setNegotiatedholdTime(short negotiatedholdTime) {
this.negotiatedholdTime = negotiatedholdTime;
}
/**
* Gets the BGP ID of BGP peer.
*
* @return bgp ID.
*/
public BGPId getRemoteBgpId() {
return remoteBgpId;
}
/**
* Sets the BGP ID of bgp peer.
*
* @param bgpId BGP ID to set.
*/
public void setRemoteBgpId(BGPId bgpId) {
log.debug("Remote BGP ID {}", bgpId);
this.remoteBgpId = bgpId;
}
/**
* Gets the BGP version of peer.
*
* @return bgp version.
*/
public BGPVersion getRemoteBgpVersion() {
return remoteBgpVersion;
}
/**
* Sets the BGP version for this bgp peer.
*
* @param bgpVersion bgp version to set.
*/
public void setRemoteBgpVersion(BGPVersion bgpVersion) {
log.debug("Remote BGP version {}", bgpVersion);
this.remoteBgpVersion = bgpVersion;
}
/**
* Gets the BGP remote bgp AS number.
*
* @return remoteBgpASNum peer AS number.
*/
public short getRemoteBgpASNum() {
return remoteBgpASNum;
}
/**
* Sets the AS Number for this bgp peer.
*
* @param bgpASNum the autonomous system number value to set.
*/
public void setRemoteBgpASNum(short bgpASNum) {
log.debug("Remote BGP AS number {}", bgpASNum);
this.remoteBgpASNum = bgpASNum;
}
/**
* Gets the BGP peer hold time.
*
* @return bgp hold time.
*/
public short getRemoteBgpHoldTime() {
return remoteBgpholdTime;
}
/**
* Sets the hold time for this bgp peer.
*
* @param holdTime the hold timer value to set.
*/
public void setRemoteBgpHoldTime(short holdTime) {
log.debug("Remote BGP HoldTime {}", holdTime);
this.remoteBgpholdTime = holdTime;
}
/**
* Gets the BGP version for this bgp peer.
*
* @return bgp identifier.
*/
public int getRemoteBgpIdentifier() {
return remoteBgpIdentifier;
}
/**
* Sets the peer identifier value.
*
* @param bgpIdentifier the bgp peer identifier value.
*/
public void setRemoteBgpIdentifier(int bgpIdentifier) {
log.debug("Remote BGP Identifier {}", bgpIdentifier);
this.remoteBgpIdentifier = bgpIdentifier;
}
}