Fix hashing logic for storge partitions to get good distribution
Change-Id: I06e935144d177f61c8c7a1598d735e54c5a858d0
Showing
1 changed file
with
6 additions
and
8 deletions
... | @@ -37,9 +37,7 @@ import com.google.common.collect.ImmutableSet; | ... | @@ -37,9 +37,7 @@ import com.google.common.collect.ImmutableSet; |
37 | import com.google.common.collect.Lists; | 37 | import com.google.common.collect.Lists; |
38 | import com.google.common.collect.Maps; | 38 | import com.google.common.collect.Maps; |
39 | import com.google.common.collect.Sets; | 39 | import com.google.common.collect.Sets; |
40 | -import com.google.common.hash.HashCode; | ||
41 | import com.google.common.hash.Hashing; | 40 | import com.google.common.hash.Hashing; |
42 | -import com.google.common.primitives.Bytes; | ||
43 | 41 | ||
44 | /** | 42 | /** |
45 | * {@code DistributedPrimitiveCreator} that federates responsibility for creating | 43 | * {@code DistributedPrimitiveCreator} that federates responsibility for creating |
... | @@ -63,8 +61,8 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv | ... | @@ -63,8 +61,8 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv |
63 | Maps.transformValues(members, | 61 | Maps.transformValues(members, |
64 | partition -> partition.newAsyncConsistentMap(name, serializer)); | 62 | partition -> partition.newAsyncConsistentMap(name, serializer)); |
65 | Hasher<K> hasher = key -> { | 63 | Hasher<K> hasher = key -> { |
66 | - long hashCode = HashCode.fromBytes(Bytes.ensureCapacity(serializer.encode(key), 8, 0)).asLong(); | 64 | + int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt(); |
67 | - return sortedMemberPartitionIds.get(Hashing.consistentHash(hashCode, members.size())); | 65 | + return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size()); |
68 | }; | 66 | }; |
69 | return new PartitionedAsyncConsistentMap<>(name, maps, hasher); | 67 | return new PartitionedAsyncConsistentMap<>(name, maps, hasher); |
70 | } | 68 | } |
... | @@ -96,8 +94,8 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv | ... | @@ -96,8 +94,8 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv |
96 | Maps.transformValues(members, | 94 | Maps.transformValues(members, |
97 | partition -> partition.newAsyncLeaderElector(name)); | 95 | partition -> partition.newAsyncLeaderElector(name)); |
98 | Hasher<String> hasher = topic -> { | 96 | Hasher<String> hasher = topic -> { |
99 | - long hashCode = HashCode.fromBytes(topic.getBytes(Charsets.UTF_8)).asLong(); | 97 | + int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt(); |
100 | - return sortedMemberPartitionIds.get(Hashing.consistentHash(hashCode, members.size())); | 98 | + return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size()); |
101 | }; | 99 | }; |
102 | return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher); | 100 | return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher); |
103 | } | 101 | } |
... | @@ -126,7 +124,7 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv | ... | @@ -126,7 +124,7 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv |
126 | * @return primitive creator | 124 | * @return primitive creator |
127 | */ | 125 | */ |
128 | private DistributedPrimitiveCreator getCreator(String name) { | 126 | private DistributedPrimitiveCreator getCreator(String name) { |
129 | - int index = Hashing.consistentHash(name.hashCode(), members.size()); | 127 | + int hashCode = Hashing.sha256().hashString(name, Charsets.UTF_8).asInt(); |
130 | - return members.get(sortedMemberPartitionIds.get(index)); | 128 | + return members.get(sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size())); |
131 | } | 129 | } |
132 | } | 130 | } | ... | ... |
-
Please register or login to post a comment