EventuallyConsistentMapImpl: pushing serialization and sending to caller thread
This has the effect of limiting the caller so that it can't overrun the single thread that previously did the job. If you let this back up, it will use all of your memory. :( Change-Id: I0a3b93cfa7004e0430d228a68c60e2b7ba966d4e
Showing
1 changed file
with
8 additions
and
2 deletions
... | @@ -458,18 +458,24 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -458,18 +458,24 @@ public class EventuallyConsistentMapImpl<K, V> |
458 | } | 458 | } |
459 | 459 | ||
460 | private void notifyPeers(InternalPutEvent event) { | 460 | private void notifyPeers(InternalPutEvent event) { |
461 | - broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event)); | 461 | + // FIXME extremely memory expensive when we are overrun |
462 | +// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event)); | ||
463 | + broadcastMessage(updateMessageSubject, event); | ||
462 | } | 464 | } |
463 | 465 | ||
464 | private void notifyPeers(InternalRemoveEvent event) { | 466 | private void notifyPeers(InternalRemoveEvent event) { |
465 | - broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event)); | 467 | + // FIXME extremely memory expensive when we are overrun |
468 | +// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event)); | ||
469 | + broadcastMessage(removeMessageSubject, event); | ||
466 | } | 470 | } |
467 | 471 | ||
468 | private void broadcastMessage(MessageSubject subject, Object event) { | 472 | private void broadcastMessage(MessageSubject subject, Object event) { |
473 | + // FIXME can we parallelize the serialization... use the caller??? | ||
469 | ClusterMessage message = new ClusterMessage( | 474 | ClusterMessage message = new ClusterMessage( |
470 | clusterService.getLocalNode().id(), | 475 | clusterService.getLocalNode().id(), |
471 | subject, | 476 | subject, |
472 | serializer.encode(event)); | 477 | serializer.encode(event)); |
478 | + //broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message)); | ||
473 | clusterCommunicator.broadcast(message); | 479 | clusterCommunicator.broadcast(message); |
474 | } | 480 | } |
475 | 481 | ... | ... |
-
Please register or login to post a comment