Madan Jampani
Committed by Gerrit Code Review

Log failures in state machine processing

Change-Id: Ib92768cf4cf5cce5e2642265d1c1aa3e2f13b246
...@@ -259,36 +259,41 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se ...@@ -259,36 +259,41 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
259 * @return update result 259 * @return update result
260 */ 260 */
261 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) { 261 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
262 - MapEntryUpdateResult.Status updateStatus = validate(commit.operation()); 262 + try {
263 - String key = commit.operation().key(); 263 + MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
264 - MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key()); 264 + String key = commit.operation().key();
265 - Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue); 265 + MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
266 + Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
266 267
267 - if (updateStatus != MapEntryUpdateResult.Status.OK) { 268 + if (updateStatus != MapEntryUpdateResult.Status.OK) {
268 - commit.close(); 269 + commit.close();
269 - return new MapEntryUpdateResult<>(updateStatus, "", key, 270 + return new MapEntryUpdateResult<>(updateStatus, "", key,
270 - oldMapValue, oldMapValue); 271 + oldMapValue, oldMapValue);
271 - } 272 + }
272 273
273 - byte[] newValue = commit.operation().value(); 274 + byte[] newValue = commit.operation().value();
274 - long newVersion = versionCounter.incrementAndGet(); 275 + long newVersion = versionCounter.incrementAndGet();
275 - Versioned<byte[]> newMapValue = newValue == null ? null 276 + Versioned<byte[]> newMapValue = newValue == null ? null
276 - : new Versioned<>(newValue, newVersion); 277 + : new Versioned<>(newValue, newVersion);
277 278
278 - MapEvent.Type updateType = newValue == null ? REMOVE 279 + MapEvent.Type updateType = newValue == null ? REMOVE
279 - : oldCommitValue == null ? INSERT : UPDATE; 280 + : oldCommitValue == null ? INSERT : UPDATE;
280 - if (updateType == REMOVE || updateType == UPDATE) { 281 + if (updateType == REMOVE || updateType == UPDATE) {
281 - mapEntries.remove(key); 282 + mapEntries.remove(key);
282 - oldCommitValue.discard(); 283 + oldCommitValue.discard();
283 - } 284 + }
284 - if (updateType == INSERT || updateType == UPDATE) { 285 + if (updateType == INSERT || updateType == UPDATE) {
285 - mapEntries.put(key, new NonTransactionalCommit(newVersion, commit)); 286 + mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
286 - } else { 287 + } else {
287 - commit.close(); 288 + commit.close();
289 + }
290 + publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
291 + return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
292 + newMapValue);
293 + } catch (Exception e) {
294 + log.error("State machine operation failed", e);
295 + throw Throwables.propagate(e);
288 } 296 }
289 - publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
290 - return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
291 - newMapValue);
292 } 297 }
293 298
294 /** 299 /**
......
...@@ -60,6 +60,7 @@ import org.slf4j.Logger; ...@@ -60,6 +60,7 @@ import org.slf4j.Logger;
60 60
61 import com.google.common.base.MoreObjects; 61 import com.google.common.base.MoreObjects;
62 import com.google.common.base.Objects; 62 import com.google.common.base.Objects;
63 +import com.google.common.base.Throwables;
63 import com.google.common.collect.Lists; 64 import com.google.common.collect.Lists;
64 import com.google.common.collect.Maps; 65 import com.google.common.collect.Maps;
65 66
...@@ -172,6 +173,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -172,6 +173,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
172 notifyLeadershipChange(oldLeadership, newLeadership); 173 notifyLeadershipChange(oldLeadership, newLeadership);
173 } 174 }
174 return newLeadership; 175 return newLeadership;
176 + } catch (Exception e) {
177 + log.error("State machine operation failed", e);
178 + throw Throwables.propagate(e);
175 } finally { 179 } finally {
176 commit.close(); 180 commit.close();
177 } 181 }
...@@ -191,6 +195,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -191,6 +195,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
191 if (!Objects.equal(oldLeadership, newLeadership)) { 195 if (!Objects.equal(oldLeadership, newLeadership)) {
192 notifyLeadershipChange(oldLeadership, newLeadership); 196 notifyLeadershipChange(oldLeadership, newLeadership);
193 } 197 }
198 + } catch (Exception e) {
199 + log.error("State machine operation failed", e);
200 + throw Throwables.propagate(e);
194 } finally { 201 } finally {
195 commit.close(); 202 commit.close();
196 } 203 }
...@@ -215,6 +222,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -215,6 +222,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
215 return (electionState != null && 222 return (electionState != null &&
216 electionState.leader() != null && 223 electionState.leader() != null &&
217 commit.operation().nodeId().equals(electionState.leader().nodeId())); 224 commit.operation().nodeId().equals(electionState.leader().nodeId()));
225 + } catch (Exception e) {
226 + log.error("State machine operation failed", e);
227 + throw Throwables.propagate(e);
218 } finally { 228 } finally {
219 commit.close(); 229 commit.close();
220 } 230 }
...@@ -239,6 +249,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -239,6 +249,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
239 notifyLeadershipChange(oldLeadership, newLeadership); 249 notifyLeadershipChange(oldLeadership, newLeadership);
240 } 250 }
241 return true; 251 return true;
252 + } catch (Exception e) {
253 + log.error("State machine operation failed", e);
254 + throw Throwables.propagate(e);
242 } finally { 255 } finally {
243 commit.close(); 256 commit.close();
244 } 257 }
...@@ -262,6 +275,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -262,6 +275,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
262 } 275 }
263 }); 276 });
264 notifyLeadershipChanges(changes); 277 notifyLeadershipChanges(changes);
278 + } catch (Exception e) {
279 + log.error("State machine operation failed", e);
280 + throw Throwables.propagate(e);
265 } finally { 281 } finally {
266 commit.close(); 282 commit.close();
267 } 283 }
...@@ -276,6 +292,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -276,6 +292,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
276 String topic = commit.operation().topic(); 292 String topic = commit.operation().topic();
277 try { 293 try {
278 return leadership(topic); 294 return leadership(topic);
295 + } catch (Exception e) {
296 + log.error("State machine operation failed", e);
297 + throw Throwables.propagate(e);
279 } finally { 298 } finally {
280 commit.close(); 299 commit.close();
281 } 300 }
...@@ -293,6 +312,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -293,6 +312,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
293 Leader leader = leadership(e.getKey()).leader(); 312 Leader leader = leadership(e.getKey()).leader();
294 return leader != null && leader.nodeId().equals(nodeId); 313 return leader != null && leader.nodeId().equals(nodeId);
295 }).keySet()); 314 }).keySet());
315 + } catch (Exception e) {
316 + log.error("State machine operation failed", e);
317 + throw Throwables.propagate(e);
296 } finally { 318 } finally {
297 commit.close(); 319 commit.close();
298 } 320 }
...@@ -308,6 +330,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -308,6 +330,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
308 try { 330 try {
309 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k))); 331 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
310 return result; 332 return result;
333 + } catch (Exception e) {
334 + log.error("State machine operation failed", e);
335 + throw Throwables.propagate(e);
311 } finally { 336 } finally {
312 commit.close(); 337 commit.close();
313 } 338 }
......