Madan Jampani

Log failures in state machine processing

Change-Id: Ib92768cf4cf5cce5e2642265d1c1aa3e2f13b246
......@@ -259,36 +259,41 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
* @return update result
*/
protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
String key = commit.operation().key();
MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
try {
MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
String key = commit.operation().key();
MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
if (updateStatus != MapEntryUpdateResult.Status.OK) {
commit.close();
return new MapEntryUpdateResult<>(updateStatus, "", key,
oldMapValue, oldMapValue);
}
if (updateStatus != MapEntryUpdateResult.Status.OK) {
commit.close();
return new MapEntryUpdateResult<>(updateStatus, "", key,
oldMapValue, oldMapValue);
}
byte[] newValue = commit.operation().value();
long newVersion = versionCounter.incrementAndGet();
Versioned<byte[]> newMapValue = newValue == null ? null
: new Versioned<>(newValue, newVersion);
byte[] newValue = commit.operation().value();
long newVersion = versionCounter.incrementAndGet();
Versioned<byte[]> newMapValue = newValue == null ? null
: new Versioned<>(newValue, newVersion);
MapEvent.Type updateType = newValue == null ? REMOVE
: oldCommitValue == null ? INSERT : UPDATE;
if (updateType == REMOVE || updateType == UPDATE) {
mapEntries.remove(key);
oldCommitValue.discard();
}
if (updateType == INSERT || updateType == UPDATE) {
mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
} else {
commit.close();
MapEvent.Type updateType = newValue == null ? REMOVE
: oldCommitValue == null ? INSERT : UPDATE;
if (updateType == REMOVE || updateType == UPDATE) {
mapEntries.remove(key);
oldCommitValue.discard();
}
if (updateType == INSERT || updateType == UPDATE) {
mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
} else {
commit.close();
}
publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
newMapValue);
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
}
publish(Lists.newArrayList(new MapEvent<>("", key, newMapValue, oldMapValue)));
return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
newMapValue);
}
/**
......
......@@ -58,6 +58,7 @@ import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
......@@ -170,6 +171,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
notifyLeadershipChange(oldLeadership, newLeadership);
}
return newLeadership;
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -189,6 +193,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -213,6 +220,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
return (electionState != null &&
electionState.leader() != null &&
commit.operation().nodeId().equals(electionState.leader().nodeId()));
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -237,6 +247,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
notifyLeadershipChange(oldLeadership, newLeadership);
}
return true;
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -260,6 +273,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
});
notifyLeadershipChanges(changes);
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -274,6 +290,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
String topic = commit.operation().topic();
try {
return leadership(topic);
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -291,6 +310,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
Leader leader = leadership(e.getKey()).leader();
return leader != null && leader.nodeId().equals(nodeId);
}).keySet();
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......@@ -306,6 +328,9 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
try {
result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
return result;
} catch (Exception e) {
log.error("State machine operation failed", e);
throw Throwables.propagate(e);
} finally {
commit.close();
}
......