Brian O'Connor

Changing line endings to \n in NewAdaptiveFlowStatsCollector

Change-Id: I233c6f4161d0e3af308f744908c52a091824eca8
1 -/* 1 +/*
2 - * Copyright 2015 Open Networking Laboratory 2 + * Copyright 2015 Open Networking Laboratory
3 - * 3 + *
4 - * Licensed under the Apache License, Version 2.0 (the "License"); 4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License. 5 + * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at 6 + * You may obtain a copy of the License at
7 - * 7 + *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 - * 9 + *
10 - * Unless required by applicable law or agreed to in writing, software 10 + * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS, 11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and 13 + * See the License for the specific language governing permissions and
14 - * limitations under the License. 14 + * limitations under the License.
15 - */ 15 + */
16 - 16 +
17 -package org.onosproject.provider.of.flow.impl; 17 +package org.onosproject.provider.of.flow.impl;
18 - 18 +
19 -import com.google.common.base.Objects; 19 +import com.google.common.base.Objects;
20 -import com.google.common.collect.ImmutableSet; 20 +import com.google.common.collect.ImmutableSet;
21 -import com.google.common.collect.Maps; 21 +import com.google.common.collect.Maps;
22 -import com.google.common.collect.Sets; 22 +import com.google.common.collect.Sets;
23 -import org.onosproject.net.flow.DefaultTypedFlowEntry; 23 +import org.onosproject.net.flow.DefaultTypedFlowEntry;
24 -import org.onosproject.net.flow.FlowEntry; 24 +import org.onosproject.net.flow.FlowEntry;
25 -import org.onosproject.net.flow.FlowId; 25 +import org.onosproject.net.flow.FlowId;
26 -import org.onosproject.net.flow.FlowRule; 26 +import org.onosproject.net.flow.FlowRule;
27 -import org.onosproject.net.flow.StoredFlowEntry; 27 +import org.onosproject.net.flow.StoredFlowEntry;
28 -import org.onosproject.net.flow.TypedStoredFlowEntry; 28 +import org.onosproject.net.flow.TypedStoredFlowEntry;
29 -import org.onosproject.net.flow.instructions.Instruction; 29 +import org.onosproject.net.flow.instructions.Instruction;
30 -import org.onosproject.net.flow.instructions.Instructions; 30 +import org.onosproject.net.flow.instructions.Instructions;
31 -import org.onosproject.openflow.controller.OpenFlowSwitch; 31 +import org.onosproject.openflow.controller.OpenFlowSwitch;
32 -import org.onosproject.openflow.controller.RoleState; 32 +import org.onosproject.openflow.controller.RoleState;
33 -import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest; 33 +import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
34 -import org.projectfloodlight.openflow.protocol.match.Match; 34 +import org.projectfloodlight.openflow.protocol.match.Match;
35 -import org.projectfloodlight.openflow.types.OFPort; 35 +import org.projectfloodlight.openflow.types.OFPort;
36 -import org.projectfloodlight.openflow.types.TableId; 36 +import org.projectfloodlight.openflow.types.TableId;
37 -import org.slf4j.Logger; 37 +import org.slf4j.Logger;
38 - 38 +
39 -import java.util.HashSet; 39 +import java.util.HashSet;
40 -import java.util.List; 40 +import java.util.List;
41 -import java.util.Map; 41 +import java.util.Map;
42 -import java.util.Optional; 42 +import java.util.Optional;
43 -import java.util.Set; 43 +import java.util.Set;
44 -import java.util.concurrent.Executors; 44 +import java.util.concurrent.Executors;
45 -import java.util.concurrent.ScheduledExecutorService; 45 +import java.util.concurrent.ScheduledExecutorService;
46 -import java.util.concurrent.ScheduledFuture; 46 +import java.util.concurrent.ScheduledFuture;
47 -import java.util.concurrent.TimeUnit; 47 +import java.util.concurrent.TimeUnit;
48 - 48 +
49 -import static com.google.common.base.Preconditions.checkNotNull; 49 +import static com.google.common.base.Preconditions.checkNotNull;
50 -import static org.onlab.util.Tools.groupedThreads; 50 +import static org.onlab.util.Tools.groupedThreads;
51 -import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType; 51 +import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
52 -import static org.slf4j.LoggerFactory.getLogger; 52 +import static org.slf4j.LoggerFactory.getLogger;
53 - 53 +
54 -/** 54 +/**
55 - * Efficiently and adaptively collects flow statistics for the specified switch. 55 + * Efficiently and adaptively collects flow statistics for the specified switch.
56 - */ 56 + */
57 -public class NewAdaptiveFlowStatsCollector { 57 +public class NewAdaptiveFlowStatsCollector {
58 - 58 +
59 - private final Logger log = getLogger(getClass()); 59 + private final Logger log = getLogger(getClass());
60 - 60 +
61 - private final OpenFlowSwitch sw; 61 + private final OpenFlowSwitch sw;
62 - 62 +
63 - private ScheduledExecutorService adaptiveFlowStatsScheduler = 63 + private ScheduledExecutorService adaptiveFlowStatsScheduler =
64 - Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d")); 64 + Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
65 - private ScheduledFuture<?> calAndShortFlowsThread; 65 + private ScheduledFuture<?> calAndShortFlowsThread;
66 - private ScheduledFuture<?> midFlowsThread; 66 + private ScheduledFuture<?> midFlowsThread;
67 - private ScheduledFuture<?> longFlowsThread; 67 + private ScheduledFuture<?> longFlowsThread;
68 - 68 +
69 - // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval 69 + // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
70 - private CalAndShortFlowsTask calAndShortFlowsTask; 70 + private CalAndShortFlowsTask calAndShortFlowsTask;
71 - // Task that collects stats MID flows every 2*calAndPollInterval 71 + // Task that collects stats MID flows every 2*calAndPollInterval
72 - private MidFlowsTask midFlowsTask; 72 + private MidFlowsTask midFlowsTask;
73 - // Task that collects stats LONG flows every 3*calAndPollInterval 73 + // Task that collects stats LONG flows every 3*calAndPollInterval
74 - private LongFlowsTask longFlowsTask; 74 + private LongFlowsTask longFlowsTask;
75 - 75 +
76 - private static final int CAL_AND_POLL_TIMES = 1; // must be always 0 76 + private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
77 - private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1 77 + private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
78 - private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES 78 + private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
79 - //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable 79 + //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
80 - // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES 80 + // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
81 - private static final int ENTIRE_POLL_TIMES = 6; 81 + private static final int ENTIRE_POLL_TIMES = 6;
82 - 82 +
83 - private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5; 83 + private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
84 - private static final int MIN_CAL_AND_POLL_FREQUENCY = 2; 84 + private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
85 - private static final int MAX_CAL_AND_POLL_FREQUENCY = 60; 85 + private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
86 - 86 +
87 - private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; 87 + private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
88 - private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; 88 + private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 - private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; 89 + private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 - // only used for checking condition at each task if it collects entire flows from a given switch or not 90 + // only used for checking condition at each task if it collects entire flows from a given switch or not
91 - private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY; 91 + private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
92 - 92 +
93 - // Number of call count of each Task, 93 + // Number of call count of each Task,
94 - // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask 94 + // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
95 - private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called 95 + private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
96 - private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called 96 + private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
97 - private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called 97 + private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
98 - 98 +
99 - private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable(); 99 + private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
100 - 100 +
101 - private boolean isFirstTimeStart = true; 101 + private boolean isFirstTimeStart = true;
102 - 102 +
103 - public static final long NO_FLOW_MISSING_XID = (-1); 103 + public static final long NO_FLOW_MISSING_XID = (-1);
104 - private long flowMissingXid = NO_FLOW_MISSING_XID; 104 + private long flowMissingXid = NO_FLOW_MISSING_XID;
105 - 105 +
106 - /** 106 + /**
107 - * Creates a new adaptive collector for the given switch and default cal_and_poll frequency. 107 + * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
108 - * 108 + *
109 - * @param sw switch to pull 109 + * @param sw switch to pull
110 - * @param pollInterval cal and immediate poll frequency in seconds 110 + * @param pollInterval cal and immediate poll frequency in seconds
111 - */ 111 + */
112 - NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) { 112 + NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
113 - this.sw = sw; 113 + this.sw = sw;
114 - 114 +
115 - initMemberVars(pollInterval); 115 + initMemberVars(pollInterval);
116 - } 116 + }
117 - 117 +
118 - // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count 118 + // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
119 - private void initMemberVars(int pollInterval) { 119 + private void initMemberVars(int pollInterval) {
120 - if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) { 120 + if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
121 - this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY; 121 + this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
122 - } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) { 122 + } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
123 - this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY; 123 + this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
124 - } else { 124 + } else {
125 - this.calAndPollInterval = pollInterval; 125 + this.calAndPollInterval = pollInterval;
126 - } 126 + }
127 - 127 +
128 - calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval; 128 + calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
129 - midPollInterval = MID_POLL_TIMES * calAndPollInterval; 129 + midPollInterval = MID_POLL_TIMES * calAndPollInterval;
130 - longPollInterval = LONG_POLL_TIMES * calAndPollInterval; 130 + longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
131 - entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval; 131 + entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
132 - 132 +
133 - callCountCalAndShortFlowsTask = 0; 133 + callCountCalAndShortFlowsTask = 0;
134 - callCountMidFlowsTask = 0; 134 + callCountMidFlowsTask = 0;
135 - callCountLongFlowsTask = 0; 135 + callCountLongFlowsTask = 0;
136 - 136 +
137 - flowMissingXid = NO_FLOW_MISSING_XID; 137 + flowMissingXid = NO_FLOW_MISSING_XID;
138 - } 138 + }
139 - 139 +
140 - /** 140 + /**
141 - * Adjusts adaptive poll frequency. 141 + * Adjusts adaptive poll frequency.
142 - * 142 + *
143 - * @param pollInterval poll frequency in seconds 143 + * @param pollInterval poll frequency in seconds
144 - */ 144 + */
145 - synchronized void adjustCalAndPollInterval(int pollInterval) { 145 + synchronized void adjustCalAndPollInterval(int pollInterval) {
146 - initMemberVars(pollInterval); 146 + initMemberVars(pollInterval);
147 - 147 +
148 - if (calAndShortFlowsThread != null) { 148 + if (calAndShortFlowsThread != null) {
149 - calAndShortFlowsThread.cancel(false); 149 + calAndShortFlowsThread.cancel(false);
150 - } 150 + }
151 - if (midFlowsThread != null) { 151 + if (midFlowsThread != null) {
152 - midFlowsThread.cancel(false); 152 + midFlowsThread.cancel(false);
153 - } 153 + }
154 - if (longFlowsThread != null) { 154 + if (longFlowsThread != null) {
155 - longFlowsThread.cancel(false); 155 + longFlowsThread.cancel(false);
156 - } 156 + }
157 - 157 +
158 - calAndShortFlowsTask = new CalAndShortFlowsTask(); 158 + calAndShortFlowsTask = new CalAndShortFlowsTask();
159 - calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( 159 + calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
160 - calAndShortFlowsTask, 160 + calAndShortFlowsTask,
161 - 0, 161 + 0,
162 - calAndPollInterval, 162 + calAndPollInterval,
163 - TimeUnit.SECONDS); 163 + TimeUnit.SECONDS);
164 - 164 +
165 - midFlowsTask = new MidFlowsTask(); 165 + midFlowsTask = new MidFlowsTask();
166 - midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( 166 + midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
167 - midFlowsTask, 167 + midFlowsTask,
168 - 0, 168 + 0,
169 - midPollInterval, 169 + midPollInterval,
170 - TimeUnit.SECONDS); 170 + TimeUnit.SECONDS);
171 - 171 +
172 - longFlowsTask = new LongFlowsTask(); 172 + longFlowsTask = new LongFlowsTask();
173 - longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( 173 + longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
174 - longFlowsTask, 174 + longFlowsTask,
175 - 0, 175 + 0,
176 - longPollInterval, 176 + longPollInterval,
177 - TimeUnit.SECONDS); 177 + TimeUnit.SECONDS);
178 - 178 +
179 - log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted"); 179 + log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
180 - } 180 + }
181 - 181 +
182 - private class CalAndShortFlowsTask implements Runnable { 182 + private class CalAndShortFlowsTask implements Runnable {
183 - @Override 183 + @Override
184 - public void run() { 184 + public void run() {
185 - if (sw.getRole() == RoleState.MASTER) { 185 + if (sw.getRole() == RoleState.MASTER) {
186 - log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); 186 + log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
187 - 187 +
188 - if (isFirstTimeStart) { 188 + if (isFirstTimeStart) {
189 - // isFirstTimeStart, get entire flow stats from a given switch sw 189 + // isFirstTimeStart, get entire flow stats from a given switch sw
190 - log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}", 190 + log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
191 - sw.getStringId()); 191 + sw.getStringId());
192 - ofFlowStatsRequestAllSend(); 192 + ofFlowStatsRequestAllSend();
193 - 193 +
194 - callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES; 194 + callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
195 - isFirstTimeStart = false; 195 + isFirstTimeStart = false;
196 - } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) { 196 + } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
197 - // entire_poll_times, get entire flow stats from a given switch sw 197 + // entire_poll_times, get entire flow stats from a given switch sw
198 - log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId()); 198 + log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
199 - ofFlowStatsRequestAllSend(); 199 + ofFlowStatsRequestAllSend();
200 - 200 +
201 - callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES; 201 + callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
202 - //TODO: check flows deleted in switch, but exist in controller flow table, then remove them 202 + //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
203 - // 203 + //
204 - } else { 204 + } else {
205 - calAndShortFlowsTaskInternal(); 205 + calAndShortFlowsTaskInternal();
206 - callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES; 206 + callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
207 - } 207 + }
208 - } 208 + }
209 - } 209 + }
210 - } 210 + }
211 - 211 +
212 - // send openflow flow stats request message with getting all flow entries to a given switch sw 212 + // send openflow flow stats request message with getting all flow entries to a given switch sw
213 - private void ofFlowStatsRequestAllSend() { 213 + private void ofFlowStatsRequestAllSend() {
214 - OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() 214 + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
215 - .setMatch(sw.factory().matchWildcardAll()) 215 + .setMatch(sw.factory().matchWildcardAll())
216 - .setTableId(TableId.ALL) 216 + .setTableId(TableId.ALL)
217 - .setOutPort(OFPort.NO_MASK) 217 + .setOutPort(OFPort.NO_MASK)
218 - .build(); 218 + .build();
219 - 219 +
220 - synchronized (this) { 220 + synchronized (this) {
221 - // set the request xid to check the reply in OpenFlowRuleProvider 221 + // set the request xid to check the reply in OpenFlowRuleProvider
222 - // After processing the reply of this request message, 222 + // After processing the reply of this request message,
223 - // this must be set to NO_FLOW_MISSING_XID(-1) by provider 223 + // this must be set to NO_FLOW_MISSING_XID(-1) by provider
224 - setFlowMissingXid(request.getXid()); 224 + setFlowMissingXid(request.getXid());
225 - log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId()); 225 + log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
226 - 226 +
227 - sw.sendMsg(request); 227 + sw.sendMsg(request);
228 - } 228 + }
229 - } 229 + }
230 - 230 +
231 - // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw 231 + // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
232 - private void ofFlowStatsRequestFlowSend(FlowEntry fe) { 232 + private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
233 - // set find match 233 + // set find match
234 - Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(), 234 + Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
235 - Optional.empty()).buildMatch(); 235 + Optional.empty()).buildMatch();
236 - // set find tableId 236 + // set find tableId
237 - TableId tableId = TableId.of(fe.tableId()); 237 + TableId tableId = TableId.of(fe.tableId());
238 - // set output port 238 + // set output port
239 - Instruction ins = fe.treatment().allInstructions().stream() 239 + Instruction ins = fe.treatment().allInstructions().stream()
240 - .filter(i -> (i.type() == Instruction.Type.OUTPUT)) 240 + .filter(i -> (i.type() == Instruction.Type.OUTPUT))
241 - .findFirst() 241 + .findFirst()
242 - .orElse(null); 242 + .orElse(null);
243 - OFPort ofPort = OFPort.NO_MASK; 243 + OFPort ofPort = OFPort.NO_MASK;
244 - if (ins != null) { 244 + if (ins != null) {
245 - Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins; 245 + Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246 - ofPort = OFPort.of((int) ((out.port().toLong()))); 246 + ofPort = OFPort.of((int) ((out.port().toLong())));
247 - } 247 + }
248 - 248 +
249 - OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() 249 + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
250 - .setMatch(match) 250 + .setMatch(match)
251 - .setTableId(tableId) 251 + .setTableId(tableId)
252 - .setOutPort(ofPort) 252 + .setOutPort(ofPort)
253 - .build(); 253 + .build();
254 - 254 +
255 - synchronized (this) { 255 + synchronized (this) {
256 - if (getFlowMissingXid() != NO_FLOW_MISSING_XID) { 256 + if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257 - log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet," 257 + log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258 - + " set no flow missing xid anyway, for {}", 258 + + " set no flow missing xid anyway, for {}",
259 - sw.getStringId()); 259 + sw.getStringId());
260 - setFlowMissingXid(NO_FLOW_MISSING_XID); 260 + setFlowMissingXid(NO_FLOW_MISSING_XID);
261 - } 261 + }
262 - 262 +
263 - sw.sendMsg(request); 263 + sw.sendMsg(request);
264 - } 264 + }
265 - } 265 + }
266 - 266 +
267 - private void calAndShortFlowsTaskInternal() { 267 + private void calAndShortFlowsTaskInternal() {
268 - deviceFlowTable.checkAndMoveLiveFlowAll(); 268 + deviceFlowTable.checkAndMoveLiveFlowAll();
269 - 269 +
270 - deviceFlowTable.getShortFlows().forEach(fe -> { 270 + deviceFlowTable.getShortFlows().forEach(fe -> {
271 - ofFlowStatsRequestFlowSend(fe); 271 + ofFlowStatsRequestFlowSend(fe);
272 - }); 272 + });
273 - } 273 + }
274 - 274 +
275 - private class MidFlowsTask implements Runnable { 275 + private class MidFlowsTask implements Runnable {
276 - @Override 276 + @Override
277 - public void run() { 277 + public void run() {
278 - if (sw.getRole() == RoleState.MASTER) { 278 + if (sw.getRole() == RoleState.MASTER) {
279 - log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); 279 + log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
280 - 280 +
281 - // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw 281 + // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282 - if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) { 282 + if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283 - callCountMidFlowsTask = MID_POLL_TIMES; 283 + callCountMidFlowsTask = MID_POLL_TIMES;
284 - } else { 284 + } else {
285 - midFlowsTaskInternal(); 285 + midFlowsTaskInternal();
286 - callCountMidFlowsTask += MID_POLL_TIMES; 286 + callCountMidFlowsTask += MID_POLL_TIMES;
287 - } 287 + }
288 - } 288 + }
289 - } 289 + }
290 - } 290 + }
291 - 291 +
292 - private void midFlowsTaskInternal() { 292 + private void midFlowsTaskInternal() {
293 - deviceFlowTable.getMidFlows().forEach(fe -> { 293 + deviceFlowTable.getMidFlows().forEach(fe -> {
294 - ofFlowStatsRequestFlowSend(fe); 294 + ofFlowStatsRequestFlowSend(fe);
295 - }); 295 + });
296 - } 296 + }
297 - 297 +
298 - private class LongFlowsTask implements Runnable { 298 + private class LongFlowsTask implements Runnable {
299 - @Override 299 + @Override
300 - public void run() { 300 + public void run() {
301 - if (sw.getRole() == RoleState.MASTER) { 301 + if (sw.getRole() == RoleState.MASTER) {
302 - log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId()); 302 + log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
303 - 303 +
304 - // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw 304 + // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305 - if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) { 305 + if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306 - callCountLongFlowsTask = LONG_POLL_TIMES; 306 + callCountLongFlowsTask = LONG_POLL_TIMES;
307 - } else { 307 + } else {
308 - longFlowsTaskInternal(); 308 + longFlowsTaskInternal();
309 - callCountLongFlowsTask += LONG_POLL_TIMES; 309 + callCountLongFlowsTask += LONG_POLL_TIMES;
310 - } 310 + }
311 - } 311 + }
312 - } 312 + }
313 - } 313 + }
314 - 314 +
315 - private void longFlowsTaskInternal() { 315 + private void longFlowsTaskInternal() {
316 - deviceFlowTable.getLongFlows().forEach(fe -> { 316 + deviceFlowTable.getLongFlows().forEach(fe -> {
317 - ofFlowStatsRequestFlowSend(fe); 317 + ofFlowStatsRequestFlowSend(fe);
318 - }); 318 + });
319 - } 319 + }
320 - 320 +
321 - /** 321 + /**
322 - * start adaptive flow statistic collection. 322 + * start adaptive flow statistic collection.
323 - * 323 + *
324 - */ 324 + */
325 - public synchronized void start() { 325 + public synchronized void start() {
326 - log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId()); 326 + log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327 - callCountCalAndShortFlowsTask = 0; 327 + callCountCalAndShortFlowsTask = 0;
328 - callCountMidFlowsTask = 0; 328 + callCountMidFlowsTask = 0;
329 - callCountLongFlowsTask = 0; 329 + callCountLongFlowsTask = 0;
330 - 330 +
331 - isFirstTimeStart = true; 331 + isFirstTimeStart = true;
332 - 332 +
333 - // Initially start polling quickly. Then drop down to configured value 333 + // Initially start polling quickly. Then drop down to configured value
334 - calAndShortFlowsTask = new CalAndShortFlowsTask(); 334 + calAndShortFlowsTask = new CalAndShortFlowsTask();
335 - calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( 335 + calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336 - calAndShortFlowsTask, 336 + calAndShortFlowsTask,
337 - 1, 337 + 1,
338 - calAndPollInterval, 338 + calAndPollInterval,
339 - TimeUnit.SECONDS); 339 + TimeUnit.SECONDS);
340 - 340 +
341 - midFlowsTask = new MidFlowsTask(); 341 + midFlowsTask = new MidFlowsTask();
342 - midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( 342 + midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
343 - midFlowsTask, 343 + midFlowsTask,
344 - 1, 344 + 1,
345 - midPollInterval, 345 + midPollInterval,
346 - TimeUnit.SECONDS); 346 + TimeUnit.SECONDS);
347 - 347 +
348 - longFlowsTask = new LongFlowsTask(); 348 + longFlowsTask = new LongFlowsTask();
349 - longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay( 349 + longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
350 - longFlowsTask, 350 + longFlowsTask,
351 - 1, 351 + 1,
352 - longPollInterval, 352 + longPollInterval,
353 - TimeUnit.SECONDS); 353 + TimeUnit.SECONDS);
354 - 354 +
355 - log.info("Started"); 355 + log.info("Started");
356 - } 356 + }
357 - 357 +
358 - /** 358 + /**
359 - * stop adaptive flow statistic collection. 359 + * stop adaptive flow statistic collection.
360 - * 360 + *
361 - */ 361 + */
362 - public synchronized void stop() { 362 + public synchronized void stop() {
363 - log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId()); 363 + log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364 - if (calAndShortFlowsThread != null) { 364 + if (calAndShortFlowsThread != null) {
365 - calAndShortFlowsThread.cancel(true); 365 + calAndShortFlowsThread.cancel(true);
366 - } 366 + }
367 - if (midFlowsThread != null) { 367 + if (midFlowsThread != null) {
368 - midFlowsThread.cancel(true); 368 + midFlowsThread.cancel(true);
369 - } 369 + }
370 - if (longFlowsThread != null) { 370 + if (longFlowsThread != null) {
371 - longFlowsThread.cancel(true); 371 + longFlowsThread.cancel(true);
372 - } 372 + }
373 - 373 +
374 - adaptiveFlowStatsScheduler.shutdownNow(); 374 + adaptiveFlowStatsScheduler.shutdownNow();
375 - 375 +
376 - isFirstTimeStart = false; 376 + isFirstTimeStart = false;
377 - 377 +
378 - log.info("Stopped"); 378 + log.info("Stopped");
379 - } 379 + }
380 - 380 +
381 - /** 381 + /**
382 - * add typed flow entry from flow rule into the internal flow table. 382 + * add typed flow entry from flow rule into the internal flow table.
383 - * 383 + *
384 - * @param flowRules the flow rules 384 + * @param flowRules the flow rules
385 - * 385 + *
386 - */ 386 + */
387 - public synchronized void addWithFlowRule(FlowRule... flowRules) { 387 + public synchronized void addWithFlowRule(FlowRule... flowRules) {
388 - for (FlowRule fr : flowRules) { 388 + for (FlowRule fr : flowRules) {
389 - // First remove old entry unconditionally, if exist 389 + // First remove old entry unconditionally, if exist
390 - deviceFlowTable.remove(fr); 390 + deviceFlowTable.remove(fr);
391 - 391 +
392 - // add new flow entry, we suppose IMMEDIATE_FLOW 392 + // add new flow entry, we suppose IMMEDIATE_FLOW
393 - TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr, 393 + TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394 - FlowLiveType.IMMEDIATE_FLOW); 394 + FlowLiveType.IMMEDIATE_FLOW);
395 - deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry); 395 + deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
396 - } 396 + }
397 - } 397 + }
398 - 398 +
399 - /** 399 + /**
400 - * add or update typed flow entry from flow entry into the internal flow table. 400 + * add or update typed flow entry from flow entry into the internal flow table.
401 - * 401 + *
402 - * @param flowEntries the flow entries 402 + * @param flowEntries the flow entries
403 - * 403 + *
404 - */ 404 + */
405 - public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) { 405 + public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406 - for (FlowEntry fe : flowEntries) { 406 + for (FlowEntry fe : flowEntries) {
407 - // check if this new rule is an update to an existing entry 407 + // check if this new rule is an update to an existing entry
408 - TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe); 408 + TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
409 - 409 +
410 - if (stored != null) { 410 + if (stored != null) {
411 - // duplicated flow entry is collected!, just skip 411 + // duplicated flow entry is collected!, just skip
412 - if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets() 412 + if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413 - && fe.life() == stored.life()) { 413 + && fe.life() == stored.life()) {
414 - log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value()) 414 + log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415 - + ",is DUPLICATED stats collection, just skip." 415 + + ",is DUPLICATED stats collection, just skip."
416 - + " AdaptiveStats collection thread for {}", 416 + + " AdaptiveStats collection thread for {}",
417 - sw.getStringId()); 417 + sw.getStringId());
418 - 418 +
419 - stored.setLastSeen(); 419 + stored.setLastSeen();
420 - continue; 420 + continue;
421 - } else if (fe.life() < stored.life()) { 421 + } else if (fe.life() < stored.life()) {
422 - // Invalid updates the stats values, i.e., bytes, packets, durations ... 422 + // Invalid updates the stats values, i.e., bytes, packets, durations ...
423 - log.debug("addOrUpdateFlows():" + 423 + log.debug("addOrUpdateFlows():" +
424 - " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." + 424 + " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
425 - " new flowId=" + Long.toHexString(fe.id().value()) + 425 + " new flowId=" + Long.toHexString(fe.id().value()) +
426 - ", old flowId=" + Long.toHexString(stored.id().value()) + 426 + ", old flowId=" + Long.toHexString(stored.id().value()) +
427 - ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() + 427 + ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
428 - ", new life=" + fe.life() + ", old life=" + stored.life() + 428 + ", new life=" + fe.life() + ", old life=" + stored.life() +
429 - ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen()); 429 + ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
430 - // go next 430 + // go next
431 - stored.setLastSeen(); 431 + stored.setLastSeen();
432 - continue; 432 + continue;
433 - } 433 + }
434 - 434 +
435 - // update now 435 + // update now
436 - stored.setLife(fe.life()); 436 + stored.setLife(fe.life());
437 - stored.setPackets(fe.packets()); 437 + stored.setPackets(fe.packets());
438 - stored.setBytes(fe.bytes()); 438 + stored.setBytes(fe.bytes());
439 - stored.setLastSeen(); 439 + stored.setLastSeen();
440 - if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) { 440 + if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
441 - // flow is really RULE_ADDED 441 + // flow is really RULE_ADDED
442 - stored.setState(FlowEntry.FlowEntryState.ADDED); 442 + stored.setState(FlowEntry.FlowEntryState.ADDED);
443 - } 443 + }
444 - // flow is RULE_UPDATED, skip adding and just updating flow live table 444 + // flow is RULE_UPDATED, skip adding and just updating flow live table
445 - //deviceFlowTable.calAndSetFlowLiveType(stored); 445 + //deviceFlowTable.calAndSetFlowLiveType(stored);
446 - continue; 446 + continue;
447 - } 447 + }
448 - 448 +
449 - // add new flow entry, we suppose IMMEDIATE_FLOW 449 + // add new flow entry, we suppose IMMEDIATE_FLOW
450 - TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe, 450 + TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
451 - FlowLiveType.IMMEDIATE_FLOW); 451 + FlowLiveType.IMMEDIATE_FLOW);
452 - deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry); 452 + deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
453 - } 453 + }
454 - } 454 + }
455 - 455 +
456 - /** 456 + /**
457 - * remove typed flow entry from the internal flow table. 457 + * remove typed flow entry from the internal flow table.
458 - * 458 + *
459 - * @param flowRules the flow entries 459 + * @param flowRules the flow entries
460 - * 460 + *
461 - */ 461 + */
462 - public synchronized void removeFlows(FlowRule... flowRules) { 462 + public synchronized void removeFlows(FlowRule... flowRules) {
463 - for (FlowRule rule : flowRules) { 463 + for (FlowRule rule : flowRules) {
464 - deviceFlowTable.remove(rule); 464 + deviceFlowTable.remove(rule);
465 - } 465 + }
466 - } 466 + }
467 - 467 +
468 - // same as removeFlows() function 468 + // same as removeFlows() function
469 - /** 469 + /**
470 - * remove typed flow entry from the internal flow table. 470 + * remove typed flow entry from the internal flow table.
471 - * 471 + *
472 - * @param flowRules the flow entries 472 + * @param flowRules the flow entries
473 - * 473 + *
474 - */ 474 + */
475 - public void flowRemoved(FlowRule... flowRules) { 475 + public void flowRemoved(FlowRule... flowRules) {
476 - removeFlows(flowRules); 476 + removeFlows(flowRules);
477 - } 477 + }
478 - 478 +
479 - // same as addOrUpdateFlows() function 479 + // same as addOrUpdateFlows() function
480 - /** 480 + /**
481 - * add or update typed flow entry from flow entry into the internal flow table. 481 + * add or update typed flow entry from flow entry into the internal flow table.
482 - * 482 + *
483 - * @param flowEntries the flow entry list 483 + * @param flowEntries the flow entry list
484 - * 484 + *
485 - */ 485 + */
486 - public void pushFlowMetrics(List<FlowEntry> flowEntries) { 486 + public void pushFlowMetrics(List<FlowEntry> flowEntries) {
487 - flowEntries.forEach(fe -> { 487 + flowEntries.forEach(fe -> {
488 - addOrUpdateFlows(fe); 488 + addOrUpdateFlows(fe);
489 - }); 489 + });
490 - } 490 + }
491 - 491 +
492 - /** 492 + /**
493 - * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)). 493 + * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
494 - * 494 + *
495 - * @return xid of missing flow 495 + * @return xid of missing flow
496 - */ 496 + */
497 - public long getFlowMissingXid() { 497 + public long getFlowMissingXid() {
498 - return flowMissingXid; 498 + return flowMissingXid;
499 - } 499 + }
500 - 500 +
501 - /** 501 + /**
502 - * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id. 502 + * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
503 - * 503 + *
504 - * @param flowMissingXid the OFFlowStatsRequest message Id 504 + * @param flowMissingXid the OFFlowStatsRequest message Id
505 - * 505 + *
506 - */ 506 + */
507 - public void setFlowMissingXid(long flowMissingXid) { 507 + public void setFlowMissingXid(long flowMissingXid) {
508 - this.flowMissingXid = flowMissingXid; 508 + this.flowMissingXid = flowMissingXid;
509 - } 509 + }
510 - 510 +
511 - private class InternalDeviceFlowTable { 511 + private class InternalDeviceFlowTable {
512 - 512 +
513 - private final Map<FlowId, Set<TypedStoredFlowEntry>> 513 + private final Map<FlowId, Set<TypedStoredFlowEntry>>
514 - flowEntries = Maps.newConcurrentMap(); 514 + flowEntries = Maps.newConcurrentMap();
515 - 515 +
516 - private final Set<StoredFlowEntry> shortFlows = new HashSet<>(); 516 + private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
517 - private final Set<StoredFlowEntry> midFlows = new HashSet<>(); 517 + private final Set<StoredFlowEntry> midFlows = new HashSet<>();
518 - private final Set<StoredFlowEntry> longFlows = new HashSet<>(); 518 + private final Set<StoredFlowEntry> longFlows = new HashSet<>();
519 - 519 +
520 - // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply 520 + // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
521 - private final long latencyFlowStatsRequestAndReplyMillis = 500; 521 + private final long latencyFlowStatsRequestAndReplyMillis = 500;
522 - 522 +
523 - 523 +
524 - // Statistics for table operation 524 + // Statistics for table operation
525 - private long addCount = 0, addWithSetFlowLiveTypeCount = 0; 525 + private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
526 - private long removeCount = 0; 526 + private long removeCount = 0;
527 - 527 +
528 - /** 528 + /**
529 - * Resets all count values with zero. 529 + * Resets all count values with zero.
530 - * 530 + *
531 - */ 531 + */
532 - public void resetAllCount() { 532 + public void resetAllCount() {
533 - addCount = 0; 533 + addCount = 0;
534 - addWithSetFlowLiveTypeCount = 0; 534 + addWithSetFlowLiveTypeCount = 0;
535 - removeCount = 0; 535 + removeCount = 0;
536 - } 536 + }
537 - 537 +
538 - // get set of flow entries for the given flowId 538 + // get set of flow entries for the given flowId
539 - private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) { 539 + private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
540 - return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet()); 540 + return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
541 - } 541 + }
542 - 542 +
543 - // get flow entry for the given flow rule 543 + // get flow entry for the given flow rule
544 - private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) { 544 + private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
545 - Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id()); 545 + Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
546 - return flowEntries.stream() 546 + return flowEntries.stream()
547 - .filter(entry -> Objects.equal(entry, rule)) 547 + .filter(entry -> Objects.equal(entry, rule))
548 - .findAny() 548 + .findAny()
549 - .orElse(null); 549 + .orElse(null);
550 - } 550 + }
551 - 551 +
552 - // get the flow entries for all flows in flow table 552 + // get the flow entries for all flows in flow table
553 - private Set<TypedStoredFlowEntry> getFlowEntriesInternal() { 553 + private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
554 - Set<TypedStoredFlowEntry> result = Sets.newHashSet(); 554 + Set<TypedStoredFlowEntry> result = Sets.newHashSet();
555 - 555 +
556 - flowEntries.values().forEach(result::addAll); 556 + flowEntries.values().forEach(result::addAll);
557 - return result; 557 + return result;
558 - } 558 + }
559 - 559 +
560 - /** 560 + /**
561 - * Gets the number of flow entry in flow table. 561 + * Gets the number of flow entry in flow table.
562 - * 562 + *
563 - * @return the number of flow entry. 563 + * @return the number of flow entry.
564 - * 564 + *
565 - */ 565 + */
566 - public long getFlowCount() { 566 + public long getFlowCount() {
567 - return flowEntries.values().stream().mapToLong(Set::size).sum(); 567 + return flowEntries.values().stream().mapToLong(Set::size).sum();
568 - } 568 + }
569 - 569 +
570 - /** 570 + /**
571 - * Gets the number of flow entry in flow table. 571 + * Gets the number of flow entry in flow table.
572 - * 572 + *
573 - * @param rule the flow rule 573 + * @param rule the flow rule
574 - * @return the typed flow entry. 574 + * @return the typed flow entry.
575 - * 575 + *
576 - */ 576 + */
577 - public TypedStoredFlowEntry getFlowEntry(FlowRule rule) { 577 + public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
578 - checkNotNull(rule); 578 + checkNotNull(rule);
579 - 579 +
580 - return getFlowEntryInternal(rule); 580 + return getFlowEntryInternal(rule);
581 - } 581 + }
582 - 582 +
583 - /** 583 + /**
584 - * Gets the all typed flow entries in flow table. 584 + * Gets the all typed flow entries in flow table.
585 - * 585 + *
586 - * @return the set of typed flow entry. 586 + * @return the set of typed flow entry.
587 - * 587 + *
588 - */ 588 + */
589 - public Set<TypedStoredFlowEntry> getFlowEntries() { 589 + public Set<TypedStoredFlowEntry> getFlowEntries() {
590 - return getFlowEntriesInternal(); 590 + return getFlowEntriesInternal();
591 - } 591 + }
592 - 592 +
593 - /** 593 + /**
594 - * Gets the short typed flow entries in flow table. 594 + * Gets the short typed flow entries in flow table.
595 - * 595 + *
596 - * @return the set of typed flow entry. 596 + * @return the set of typed flow entry.
597 - * 597 + *
598 - */ 598 + */
599 - public Set<StoredFlowEntry> getShortFlows() { 599 + public Set<StoredFlowEntry> getShortFlows() {
600 - return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows); 600 + return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
601 - } 601 + }
602 - 602 +
603 - /** 603 + /**
604 - * Gets the mid typed flow entries in flow table. 604 + * Gets the mid typed flow entries in flow table.
605 - * 605 + *
606 - * @return the set of typed flow entry. 606 + * @return the set of typed flow entry.
607 - * 607 + *
608 - */ 608 + */
609 - public Set<StoredFlowEntry> getMidFlows() { 609 + public Set<StoredFlowEntry> getMidFlows() {
610 - return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows); 610 + return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
611 - } 611 + }
612 - 612 +
613 - /** 613 + /**
614 - * Gets the long typed flow entries in flow table. 614 + * Gets the long typed flow entries in flow table.
615 - * 615 + *
616 - * @return the set of typed flow entry. 616 + * @return the set of typed flow entry.
617 - * 617 + *
618 - */ 618 + */
619 - public Set<StoredFlowEntry> getLongFlows() { 619 + public Set<StoredFlowEntry> getLongFlows() {
620 - return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows); 620 + return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
621 - } 621 + }
622 - 622 +
623 - /** 623 + /**
624 - * Add typed flow entry into table only. 624 + * Add typed flow entry into table only.
625 - * 625 + *
626 - * @param rule the flow rule 626 + * @param rule the flow rule
627 - * 627 + *
628 - */ 628 + */
629 - public synchronized void add(TypedStoredFlowEntry rule) { 629 + public synchronized void add(TypedStoredFlowEntry rule) {
630 - checkNotNull(rule); 630 + checkNotNull(rule);
631 - 631 +
632 - //rule have to be new DefaultTypedFlowEntry 632 + //rule have to be new DefaultTypedFlowEntry
633 - boolean result = getFlowEntriesInternal(rule.id()).add(rule); 633 + boolean result = getFlowEntriesInternal(rule.id()).add(rule);
634 - 634 +
635 - if (result) { 635 + if (result) {
636 - addCount++; 636 + addCount++;
637 - } 637 + }
638 - } 638 + }
639 - 639 +
640 - /** 640 + /**
641 - * Calculates and set the flow live type at the first time, 641 + * Calculates and set the flow live type at the first time,
642 - * and then add it into a corresponding typed flow table. 642 + * and then add it into a corresponding typed flow table.
643 - * 643 + *
644 - * @param rule the flow rule 644 + * @param rule the flow rule
645 - * 645 + *
646 - */ 646 + */
647 - public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) { 647 + public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
648 - checkNotNull(rule); 648 + checkNotNull(rule);
649 - 649 +
650 - calAndSetFlowLiveTypeInternal(rule); 650 + calAndSetFlowLiveTypeInternal(rule);
651 - } 651 + }
652 - 652 +
653 - /** 653 + /**
654 - * Add the typed flow entry into table, and calculates and set the flow live type, 654 + * Add the typed flow entry into table, and calculates and set the flow live type,
655 - * and then add it into a corresponding typed flow table. 655 + * and then add it into a corresponding typed flow table.
656 - * 656 + *
657 - * @param rule the flow rule 657 + * @param rule the flow rule
658 - * 658 + *
659 - */ 659 + */
660 - public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) { 660 + public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
661 - checkNotNull(rule); 661 + checkNotNull(rule);
662 - 662 +
663 - //rule have to be new DefaultTypedFlowEntry 663 + //rule have to be new DefaultTypedFlowEntry
664 - boolean result = getFlowEntriesInternal(rule.id()).add(rule); 664 + boolean result = getFlowEntriesInternal(rule.id()).add(rule);
665 - if (result) { 665 + if (result) {
666 - calAndSetFlowLiveTypeInternal(rule); 666 + calAndSetFlowLiveTypeInternal(rule);
667 - addWithSetFlowLiveTypeCount++; 667 + addWithSetFlowLiveTypeCount++;
668 - } else { 668 + } else {
669 - log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value()) 669 + log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
670 - + " ADD Failed, cause it may already exists in table !!!," 670 + + " ADD Failed, cause it may already exists in table !!!,"
671 - + " AdaptiveStats collection thread for {}", 671 + + " AdaptiveStats collection thread for {}",
672 - sw.getStringId()); 672 + sw.getStringId());
673 - } 673 + }
674 - } 674 + }
675 - 675 +
676 - // In real, calculates and set the flow live type at the first time, 676 + // In real, calculates and set the flow live type at the first time,
677 - // and then add it into a corresponding typed flow table 677 + // and then add it into a corresponding typed flow table
678 - private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) { 678 + private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
679 - long life = rule.life(); 679 + long life = rule.life();
680 - FlowLiveType prevFlowLiveType = rule.flowLiveType(); 680 + FlowLiveType prevFlowLiveType = rule.flowLiveType();
681 - 681 +
682 - if (life >= longPollInterval) { 682 + if (life >= longPollInterval) {
683 - rule.setFlowLiveType(FlowLiveType.LONG_FLOW); 683 + rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
684 - longFlows.add(rule); 684 + longFlows.add(rule);
685 - } else if (life >= midPollInterval) { 685 + } else if (life >= midPollInterval) {
686 - rule.setFlowLiveType(FlowLiveType.MID_FLOW); 686 + rule.setFlowLiveType(FlowLiveType.MID_FLOW);
687 - midFlows.add(rule); 687 + midFlows.add(rule);
688 - } else if (life >= calAndPollInterval) { 688 + } else if (life >= calAndPollInterval) {
689 - rule.setFlowLiveType(FlowLiveType.SHORT_FLOW); 689 + rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
690 - shortFlows.add(rule); 690 + shortFlows.add(rule);
691 - } else if (life >= 0) { 691 + } else if (life >= 0) {
692 - rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW); 692 + rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
693 - } else { // life < 0 693 + } else { // life < 0
694 - rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW); 694 + rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
695 - } 695 + }
696 - 696 +
697 - if (rule.flowLiveType() != prevFlowLiveType) { 697 + if (rule.flowLiveType() != prevFlowLiveType) {
698 - switch (prevFlowLiveType) { 698 + switch (prevFlowLiveType) {
699 - // delete it from previous flow table 699 + // delete it from previous flow table
700 - case SHORT_FLOW: 700 + case SHORT_FLOW:
701 - shortFlows.remove(rule); 701 + shortFlows.remove(rule);
702 - break; 702 + break;
703 - case MID_FLOW: 703 + case MID_FLOW:
704 - midFlows.remove(rule); 704 + midFlows.remove(rule);
705 - break; 705 + break;
706 - case LONG_FLOW: 706 + case LONG_FLOW:
707 - longFlows.remove(rule); 707 + longFlows.remove(rule);
708 - break; 708 + break;
709 - default: 709 + default:
710 - break; 710 + break;
711 - } 711 + }
712 - } 712 + }
713 - } 713 + }
714 - 714 +
715 - 715 +
716 - // check the flow live type based on current time, then set and add it into corresponding table 716 + // check the flow live type based on current time, then set and add it into corresponding table
717 - private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) { 717 + private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
718 - long curTime = (cTime > 0 ? cTime : System.currentTimeMillis()); 718 + long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
719 - // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply 719 + // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
720 - long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000); 720 + long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
721 - // fe.life() unit is SECOND! 721 + // fe.life() unit is SECOND!
722 - long liveTime = fe.life() + fromLastSeen; 722 + long liveTime = fe.life() + fromLastSeen;
723 - 723 +
724 - 724 +
725 - switch (fe.flowLiveType()) { 725 + switch (fe.flowLiveType()) {
726 - case IMMEDIATE_FLOW: 726 + case IMMEDIATE_FLOW:
727 - if (liveTime >= longPollInterval) { 727 + if (liveTime >= longPollInterval) {
728 - fe.setFlowLiveType(FlowLiveType.LONG_FLOW); 728 + fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
729 - longFlows.add(fe); 729 + longFlows.add(fe);
730 - } else if (liveTime >= midPollInterval) { 730 + } else if (liveTime >= midPollInterval) {
731 - fe.setFlowLiveType(FlowLiveType.MID_FLOW); 731 + fe.setFlowLiveType(FlowLiveType.MID_FLOW);
732 - midFlows.add(fe); 732 + midFlows.add(fe);
733 - } else if (liveTime >= calAndPollInterval) { 733 + } else if (liveTime >= calAndPollInterval) {
734 - fe.setFlowLiveType(FlowLiveType.SHORT_FLOW); 734 + fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
735 - shortFlows.add(fe); 735 + shortFlows.add(fe);
736 - } 736 + }
737 - break; 737 + break;
738 - case SHORT_FLOW: 738 + case SHORT_FLOW:
739 - if (liveTime >= longPollInterval) { 739 + if (liveTime >= longPollInterval) {
740 - fe.setFlowLiveType(FlowLiveType.LONG_FLOW); 740 + fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
741 - shortFlows.remove(fe); 741 + shortFlows.remove(fe);
742 - longFlows.add(fe); 742 + longFlows.add(fe);
743 - } else if (liveTime >= midPollInterval) { 743 + } else if (liveTime >= midPollInterval) {
744 - fe.setFlowLiveType(FlowLiveType.MID_FLOW); 744 + fe.setFlowLiveType(FlowLiveType.MID_FLOW);
745 - shortFlows.remove(fe); 745 + shortFlows.remove(fe);
746 - midFlows.add(fe); 746 + midFlows.add(fe);
747 - } 747 + }
748 - break; 748 + break;
749 - case MID_FLOW: 749 + case MID_FLOW:
750 - if (liveTime >= longPollInterval) { 750 + if (liveTime >= longPollInterval) {
751 - fe.setFlowLiveType(FlowLiveType.LONG_FLOW); 751 + fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
752 - midFlows.remove(fe); 752 + midFlows.remove(fe);
753 - longFlows.add(fe); 753 + longFlows.add(fe);
754 - } 754 + }
755 - break; 755 + break;
756 - case LONG_FLOW: 756 + case LONG_FLOW:
757 - if (fromLastSeen > entirePollInterval) { 757 + if (fromLastSeen > entirePollInterval) {
758 - log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch."); 758 + log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
759 - return false; 759 + return false;
760 - } 760 + }
761 - break; 761 + break;
762 - case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through 762 + case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
763 - default : 763 + default :
764 - // Error Unknown Live Type 764 + // Error Unknown Live Type
765 - log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!" 765 + log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
766 - + "AdaptiveStats collection thread for {}", 766 + + "AdaptiveStats collection thread for {}",
767 - sw.getStringId()); 767 + sw.getStringId());
768 - return false; 768 + return false;
769 - } 769 + }
770 - 770 +
771 - log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value()) 771 + log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
772 - + ", state=" + fe.state() 772 + + ", state=" + fe.state()
773 - + ", After liveType=" + fe.flowLiveType() 773 + + ", After liveType=" + fe.flowLiveType()
774 - + ", liveTime=" + liveTime 774 + + ", liveTime=" + liveTime
775 - + ", life=" + fe.life() 775 + + ", life=" + fe.life()
776 - + ", bytes=" + fe.bytes() 776 + + ", bytes=" + fe.bytes()
777 - + ", packets=" + fe.packets() 777 + + ", packets=" + fe.packets()
778 - + ", fromLastSeen=" + fromLastSeen 778 + + ", fromLastSeen=" + fromLastSeen
779 - + ", priority=" + fe.priority() 779 + + ", priority=" + fe.priority()
780 - + ", selector=" + fe.selector().criteria() 780 + + ", selector=" + fe.selector().criteria()
781 - + ", treatment=" + fe.treatment() 781 + + ", treatment=" + fe.treatment()
782 - + " AdaptiveStats collection thread for {}", 782 + + " AdaptiveStats collection thread for {}",
783 - sw.getStringId()); 783 + sw.getStringId());
784 - 784 +
785 - return true; 785 + return true;
786 - } 786 + }
787 - 787 +
788 - /** 788 + /**
789 - * Check and move live type for all type flow entries in table at every calAndPollInterval time. 789 + * Check and move live type for all type flow entries in table at every calAndPollInterval time.
790 - * 790 + *
791 - */ 791 + */
792 - public void checkAndMoveLiveFlowAll() { 792 + public void checkAndMoveLiveFlowAll() {
793 - Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal(); 793 + Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
794 - 794 +
795 - long calCurTime = System.currentTimeMillis(); 795 + long calCurTime = System.currentTimeMillis();
796 - typedFlowEntries.forEach(fe -> { 796 + typedFlowEntries.forEach(fe -> {
797 - if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) { 797 + if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
798 - remove(fe); 798 + remove(fe);
799 - } 799 + }
800 - }); 800 + });
801 - 801 +
802 - // print table counts for debug 802 + // print table counts for debug
803 - if (log.isDebugEnabled()) { 803 + if (log.isDebugEnabled()) {
804 - synchronized (this) { 804 + synchronized (this) {
805 - long totalFlowCount = getFlowCount(); 805 + long totalFlowCount = getFlowCount();
806 - long shortFlowCount = shortFlows.size(); 806 + long shortFlowCount = shortFlows.size();
807 - long midFlowCount = midFlows.size(); 807 + long midFlowCount = midFlows.size();
808 - long longFlowCount = longFlows.size(); 808 + long longFlowCount = longFlows.size();
809 - long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount; 809 + long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
810 - long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount; 810 + long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
811 - 811 +
812 - log.debug("--------------------------------------------------------------------------- for {}", 812 + log.debug("--------------------------------------------------------------------------- for {}",
813 - sw.getStringId()); 813 + sw.getStringId());
814 - log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount 814 + log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
815 - + ", add - remove_Count=" + calTotalCount 815 + + ", add - remove_Count=" + calTotalCount
816 - + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount 816 + + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
817 - + ", SHORT_FLOW_Count=" + shortFlowCount 817 + + ", SHORT_FLOW_Count=" + shortFlowCount
818 - + ", MID_FLOW_Count=" + midFlowCount 818 + + ", MID_FLOW_Count=" + midFlowCount
819 - + ", LONG_FLOW_Count=" + longFlowCount 819 + + ", LONG_FLOW_Count=" + longFlowCount
820 - + ", add_Count=" + addCount 820 + + ", add_Count=" + addCount
821 - + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount 821 + + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
822 - + ", remove_Count=" + removeCount 822 + + ", remove_Count=" + removeCount
823 - + " AdaptiveStats collection thread for {}", sw.getStringId()); 823 + + " AdaptiveStats collection thread for {}", sw.getStringId());
824 - log.debug("--------------------------------------------------------------------------- for {}", 824 + log.debug("--------------------------------------------------------------------------- for {}",
825 - sw.getStringId()); 825 + sw.getStringId());
826 - if (totalFlowCount != calTotalCount) { 826 + if (totalFlowCount != calTotalCount) {
827 - log.error("checkAndMoveLiveFlowAll, Real total flow count and " 827 + log.error("checkAndMoveLiveFlowAll, Real total flow count and "
828 - + "calculated total flow count do NOT match, something is wrong internally " 828 + + "calculated total flow count do NOT match, something is wrong internally "
829 - + "or check counter value bound is over!"); 829 + + "or check counter value bound is over!");
830 - } 830 + }
831 - if (immediateFlowCount < 0) { 831 + if (immediateFlowCount < 0) {
832 - log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, " 832 + log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
833 - + "something is wrong internally " 833 + + "something is wrong internally "
834 - + "or check counter value bound is over!"); 834 + + "or check counter value bound is over!");
835 - } 835 + }
836 - } 836 + }
837 - } 837 + }
838 - log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId()); 838 + log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
839 - } 839 + }
840 - 840 +
841 - /** 841 + /**
842 - * Remove the typed flow entry from table. 842 + * Remove the typed flow entry from table.
843 - * 843 + *
844 - * @param rule the flow rule 844 + * @param rule the flow rule
845 - * 845 + *
846 - */ 846 + */
847 - public synchronized void remove(FlowRule rule) { 847 + public synchronized void remove(FlowRule rule) {
848 - checkNotNull(rule); 848 + checkNotNull(rule);
849 - 849 +
850 - TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule); 850 + TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
851 - if (removeStore != null) { 851 + if (removeStore != null) {
852 - removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore); 852 + removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
853 - boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore); 853 + boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
854 - 854 +
855 - if (result) { 855 + if (result) {
856 - removeCount++; 856 + removeCount++;
857 - } 857 + }
858 - } 858 + }
859 - } 859 + }
860 - 860 +
861 - // Remove the typed flow entry from corresponding table 861 + // Remove the typed flow entry from corresponding table
862 - private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) { 862 + private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
863 - switch (fe.flowLiveType()) { 863 + switch (fe.flowLiveType()) {
864 - case IMMEDIATE_FLOW: 864 + case IMMEDIATE_FLOW:
865 - // do nothing 865 + // do nothing
866 - break; 866 + break;
867 - case SHORT_FLOW: 867 + case SHORT_FLOW:
868 - shortFlows.remove(fe); 868 + shortFlows.remove(fe);
869 - break; 869 + break;
870 - case MID_FLOW: 870 + case MID_FLOW:
871 - midFlows.remove(fe); 871 + midFlows.remove(fe);
872 - break; 872 + break;
873 - case LONG_FLOW: 873 + case LONG_FLOW:
874 - longFlows.remove(fe); 874 + longFlows.remove(fe);
875 - break; 875 + break;
876 - default: // error in Flow Live Type 876 + default: // error in Flow Live Type
877 - log.error("removeLiveFlowsInternal, Unknown Live Type error!"); 877 + log.error("removeLiveFlowsInternal, Unknown Live Type error!");
878 - break; 878 + break;
879 - } 879 + }
880 - } 880 + }
881 - } 881 + }
882 -} 882 +}
......