Madan Jampani
Committed by Gerrit Code Review

Fixes NPE in map event listener

Minor logging improvements in IntentPartitionManager

Change-Id: I7b41428c5b56fcb7f98850f50a804468743b984a
......@@ -72,6 +72,7 @@ public class IntentPartitionManager implements IntentPartitionService {
private static final String ELECTION_PREFIX = "intent-partition-";
protected NodeId localNodeId;
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
......@@ -80,6 +81,7 @@ public class IntentPartitionManager implements IntentPartitionService {
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leaderListener);
listenerRegistry = new ListenerRegistry<>();
......@@ -87,10 +89,12 @@ public class IntentPartitionManager implements IntentPartitionService {
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
log.debug("Registered to run for {}", getPartitionPath(i));
}
executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0,
CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS);
log.info("Started");
}
@Deactivate
......@@ -99,6 +103,7 @@ public class IntentPartitionManager implements IntentPartitionService {
eventDispatcher.removeSink(IntentPartitionEvent.class);
leadershipService.removeListener(leaderListener);
log.info("Stopped");
}
/**
......@@ -132,7 +137,7 @@ public class IntentPartitionManager implements IntentPartitionService {
@Override
public boolean isMine(Key intentKey) {
return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
clusterService.getLocalNode().id());
localNodeId);
}
@Override
......@@ -175,7 +180,7 @@ public class IntentPartitionManager implements IntentPartitionService {
List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
.stream()
.filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
.filter(l -> localNodeId.equals(l.leaderNodeId()))
.filter(l -> l.topic().startsWith(ELECTION_PREFIX))
.collect(Collectors.toList());
......@@ -215,7 +220,7 @@ public class IntentPartitionManager implements IntentPartitionService {
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
if (Objects.equals(leadership.leaderNodeId(), localNodeId) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
......
......@@ -87,6 +87,7 @@ public class IntentPartitionManagerTest {
.withScheduledExecutor(new NullScheduledExecutor());
partitionManager.clusterService = new TestClusterService();
partitionManager.localNodeId = MY_NODE_ID;
partitionManager.leadershipService = leadershipService;
partitionManager.eventDispatcher = new TestEventDispatcher();
}
......
......@@ -208,8 +208,8 @@ public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsi
public void event(MapEvent<K2, V2> event) {
listener.event(new MapEvent<K1, V1>(event.name(),
keyDecoder.apply(event.key()),
event.newValue().map(valueDecoder),
event.oldValue().map(valueDecoder)));
event.newValue() != null ? event.newValue().map(valueDecoder) : null,
event.oldValue() != null ? event.oldValue().map(valueDecoder) : null));
}
}
}
......