Brian O'Connor
Committed by Gerrit Code Review

ONOS-3350 AbstractOFSwitch race fixes

Protecting callers with synchronized block during
role request and reply methods.

Change-Id: Ie82f84d1d462923c9f410e6950e846ee3b05551c
......@@ -29,14 +29,10 @@ public interface OpenFlowSwitch {
/**
* Writes the message to the driver.
*
* Note:
* Calling {@link #sendMsg(OFMessage)} does NOT guarantee the messages to be
* transmitted on the wire in order, especially during role transition.
* The messages may be reordered at the switch side.
*
* Calling {@link #sendMsg(List)} guarantee the messages inside the list
* to be transmitted on the wire in order.
* <p>
* Note: Messages may be silently dropped/lost due to IOExceptions or
* role. If this is a concern, then a caller should use barriers.
* </p>
*
* @param msg the message to write
*/
......@@ -44,6 +40,10 @@ public interface OpenFlowSwitch {
/**
* Writes the OFMessage list to the driver.
* <p>
* Note: Messages may be silently dropped/lost due to IOExceptions or
* role. If this is a concern, then a caller should use barriers.
* </p>
*
* @param msgs the messages to be written
*/
......
......@@ -16,6 +16,7 @@
package org.onosproject.openflow.controller.driver;
import com.google.common.collect.Lists;
import org.jboss.netty.channel.Channel;
import org.onlab.packet.IpAddress;
import org.onosproject.net.Device;
......@@ -46,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
......@@ -74,12 +76,14 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
private RoleHandler roleMan;
protected RoleState role;
// TODO this is accessed from multiple threads, but volatile may have performance implications
protected volatile RoleState role;
protected OFFeaturesReply features;
protected OFDescStatsReply desc;
List<OFMessage> messagesPendingMastership;
private final AtomicReference<List<OFMessage>> messagesPendingMastership
= new AtomicReference<>();
@Override
public void init(Dpid dpid, OFDescStatsReply desc, OFVersion ofv) {
......@@ -104,15 +108,53 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public final void sendMsg(List<OFMessage> msgs) {
if (role == RoleState.MASTER && channel.isConnected()) {
/*
It is possible that in this block, we transition to SLAVE/EQUAL.
If this is the case, the supplied messages will race with the
RoleRequest message, and they could be rejected by the switch.
In the interest of performance, we will not protect this block with
a synchronization primitive, because the message would have just been
dropped anyway.
*/
if (role == RoleState.MASTER) {
// fast path send when we are master
sendMsgsOnChannel(msgs);
return;
}
// check to see if mastership transition is in progress
synchronized (messagesPendingMastership) {
/*
messagesPendingMastership is used as synchronization variable for
all mastership related changes. In this block, mastership (including
role update) will have either occurred or not.
*/
if (role == RoleState.MASTER) {
// transition to MASTER complete, send messages
sendMsgsOnChannel(msgs);
return;
}
List<OFMessage> messages = messagesPendingMastership.get();
if (messages != null) {
// we are transitioning to MASTER, so add messages to queue
messages.addAll(msgs);
log.debug("Enqueue message for switch {}. queue size after is {}",
dpid, messages.size());
} else {
// not transitioning to MASTER
log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
dpid, role, channel.isConnected(), msgs);
}
}
}
private void sendMsgsOnChannel(List<OFMessage> msgs) {
if (channel.isConnected()) {
channel.write(msgs);
} else if (messagesPendingMastership != null) {
messagesPendingMastership.addAll(msgs);
log.debug("Enqueue message for switch {}. queue size after is {}",
dpid, messagesPendingMastership.size());
} else {
log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
dpid, role, channel.isConnected(), msgs);
log.warn("Dropping messages for switch {} because channel is not connected: {}",
dpid, msgs);
}
}
......@@ -120,7 +162,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
public final void sendRoleRequest(OFMessage msg) {
if (msg instanceof OFRoleRequest ||
msg instanceof OFNiciraControllerRoleRequest) {
channel.write(Collections.singletonList(msg));
sendMsgsOnChannel(Collections.singletonList(msg));
return;
}
throw new IllegalArgumentException("Someone is trying to send " +
......@@ -130,7 +172,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public final void sendHandshakeMessage(OFMessage message) {
if (!this.isDriverHandshakeComplete()) {
channel.write(Collections.singletonList(message));
sendMsgsOnChannel(Collections.singletonList(message));
}
}
......@@ -239,11 +281,16 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public final void transitionToMasterSwitch() {
this.agent.transitionToMasterSwitch(dpid);
if (messagesPendingMastership != null) {
this.sendMsg(messagesPendingMastership);
log.debug("Sending {} pending messages to switch {}",
messagesPendingMastership.size(), dpid);
messagesPendingMastership = null;
synchronized (messagesPendingMastership) {
List<OFMessage> messages = messagesPendingMastership.get();
if (messages != null) {
this.sendMsg(messages);
log.debug("Sending {} pending messages to switch {}",
messages.size(), dpid);
messagesPendingMastership.set(null);
}
// perform role transition after clearing messages queue
this.role = RoleState.MASTER;
}
}
......@@ -287,17 +334,27 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public void setRole(RoleState role) {
try {
if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
// perform role transition to SLAVE/EQUAL before sending role request
this.role = role;
}
if (this.roleMan.sendRoleRequest(role, RoleRecvStatus.MATCHED_SET_ROLE)) {
log.debug("Sending role {} to switch {}", role, getStringId());
if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
this.role = role;
} else {
if (messagesPendingMastership == null) {
log.debug("Initializing new queue for switch {}", dpid);
messagesPendingMastership = new ArrayList<>();
if (role == RoleState.MASTER) {
synchronized (messagesPendingMastership) {
if (messagesPendingMastership.get() == null) {
log.debug("Initializing new message queue for switch {}", dpid);
/*
The presence of messagesPendingMastership indicates that
a switch is currently transitioning to MASTER, but
is still awaiting role reply from switch.
*/
messagesPendingMastership.set(Lists.newArrayList());
}
}
}
} else {
} else if (role == RoleState.MASTER) {
// role request not support; transition switch to MASTER
this.role = role;
}
} catch (IOException e) {
......@@ -307,6 +364,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public void reassertRole() {
// TODO should messages be sent directly or queue during reassertion?
if (this.getRole() == RoleState.MASTER) {
log.warn("Received permission error from switch {} while " +
"being master. Reasserting master role.",
......@@ -315,18 +373,15 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
}
}
@Override
public void handleRole(OFMessage m) throws SwitchStateException {
RoleReplyInfo rri = roleMan.extractOFRoleReply((OFRoleReply) m);
RoleRecvStatus rrs = roleMan.deliverRoleReply(rri);
if (rrs == RoleRecvStatus.MATCHED_SET_ROLE) {
if (rri.getRole() == RoleState.MASTER) {
this.role = rri.getRole();
this.transitionToMasterSwitch();
} else if (rri.getRole() == RoleState.EQUAL ||
rri.getRole() == RoleState.SLAVE) {
rri.getRole() == RoleState.SLAVE) {
this.transitionToEqualSwitch();
}
} else {
......@@ -348,10 +403,9 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
new RoleReplyInfo(r, null, m.getXid()));
if (rrs == RoleRecvStatus.MATCHED_SET_ROLE) {
if (r == RoleState.MASTER) {
this.role = r;
this.transitionToMasterSwitch();
} else if (r == RoleState.EQUAL ||
r == RoleState.SLAVE) {
r == RoleState.SLAVE) {
this.transitionToEqualSwitch();
}
} else {
......@@ -369,8 +423,6 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
return true;
}
@Override
public final void setAgent(OpenFlowAgent ag) {
if (this.agent == null) {
......@@ -398,9 +450,8 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public List<OFPortDesc> getPorts() {
return this.ports.stream()
.flatMap((portReply) -> (portReply.getEntries().stream()))
.flatMap(portReply -> portReply.getEntries().stream())
.collect(Collectors.toList());
//return Collections.unmodifiableList(ports.getEntries());
}
@Override
......@@ -408,13 +459,11 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
return this.desc.getMfrDesc();
}
@Override
public String datapathDescription() {
return this.desc.getDpDesc();
}
@Override
public String hardwareDescription() {
return this.desc.getHwDesc();
......@@ -430,20 +479,15 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
return this.desc.getSerialNum();
}
@Override
public Device.Type deviceType() {
return Device.Type.SWITCH;
}
@Override
public String toString() {
return this.getClass().getName() + " [" + ((channel != null)
? channel.getRemoteAddress() : "?")
+ " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
}
}
......