Committed by
Gerrit Code Review
Support for watching changes to static cluster metadata file
Change-Id: I5f9f89997288ca9a33a9e41f7520b875aceeffbe
Showing
1 changed file
with
62 additions
and
10 deletions
... | @@ -15,11 +15,20 @@ | ... | @@ -15,11 +15,20 @@ |
15 | */ | 15 | */ |
16 | package org.onosproject.cluster.impl; | 16 | package org.onosproject.cluster.impl; |
17 | 17 | ||
18 | +import static org.onlab.util.Tools.groupedThreads; | ||
18 | import static org.slf4j.LoggerFactory.getLogger; | 19 | import static org.slf4j.LoggerFactory.getLogger; |
19 | 20 | ||
20 | import java.io.File; | 21 | import java.io.File; |
21 | import java.io.IOException; | 22 | import java.io.IOException; |
23 | +import java.nio.file.FileSystems; | ||
24 | +import java.nio.file.Path; | ||
25 | +import java.nio.file.StandardWatchEventKinds; | ||
26 | +import java.nio.file.WatchEvent; | ||
27 | +import java.nio.file.WatchKey; | ||
28 | +import java.nio.file.WatchService; | ||
22 | import java.util.Set; | 29 | import java.util.Set; |
30 | +import java.util.concurrent.ExecutorService; | ||
31 | +import java.util.concurrent.Executors; | ||
23 | import java.util.concurrent.atomic.AtomicReference; | 32 | import java.util.concurrent.atomic.AtomicReference; |
24 | 33 | ||
25 | import org.apache.felix.scr.annotations.Activate; | 34 | import org.apache.felix.scr.annotations.Activate; |
... | @@ -71,13 +80,17 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr | ... | @@ -71,13 +80,17 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr |
71 | private static final String PORT = "port"; | 80 | private static final String PORT = "port"; |
72 | private static final String IP = "ip"; | 81 | private static final String IP = "ip"; |
73 | 82 | ||
74 | - private static final File CONFIG_FILE = new File("../config/cluster.json"); | 83 | + private static final String CONFIG_DIR = "../config"; |
84 | + private static final String CONFIG_FILE_NAME = "cluster.json"; | ||
85 | + private static final File CONFIG_FILE = new File(CONFIG_DIR, CONFIG_FILE_NAME); | ||
75 | 86 | ||
76 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 87 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
77 | protected ClusterMetadataProviderRegistry providerRegistry; | 88 | protected ClusterMetadataProviderRegistry providerRegistry; |
78 | 89 | ||
79 | private static final ProviderId PROVIDER_ID = new ProviderId("config", "none"); | 90 | private static final ProviderId PROVIDER_ID = new ProviderId("config", "none"); |
80 | - private AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>(); | 91 | + private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>(); |
92 | + private final ExecutorService configFileChangeDetector = | ||
93 | + Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", "")); | ||
81 | 94 | ||
82 | private ObjectMapper mapper; | 95 | private ObjectMapper mapper; |
83 | private ClusterMetadataProviderService providerService; | 96 | private ClusterMetadataProviderService providerService; |
... | @@ -95,11 +108,20 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr | ... | @@ -95,11 +108,20 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr |
95 | module.addDeserializer(PartitionId.class, new PartitionIdDeserializer()); | 108 | module.addDeserializer(PartitionId.class, new PartitionIdDeserializer()); |
96 | mapper.registerModule(module); | 109 | mapper.registerModule(module); |
97 | providerService = providerRegistry.register(this); | 110 | providerService = providerRegistry.register(this); |
111 | + configFileChangeDetector.execute(() -> { | ||
112 | + try { | ||
113 | + watchConfigFile(); | ||
114 | + } catch (IOException e) { | ||
115 | + log.warn("Failure in setting up a watch for config " | ||
116 | + + "file updates. updates to {} will be ignored", CONFIG_FILE, e); | ||
117 | + } | ||
118 | + }); | ||
98 | log.info("Started"); | 119 | log.info("Started"); |
99 | } | 120 | } |
100 | 121 | ||
101 | @Deactivate | 122 | @Deactivate |
102 | public void deactivate() { | 123 | public void deactivate() { |
124 | + configFileChangeDetector.shutdown(); | ||
103 | providerRegistry.unregister(this); | 125 | providerRegistry.unregister(this); |
104 | log.info("Stopped"); | 126 | log.info("Stopped"); |
105 | } | 127 | } |
... | @@ -114,7 +136,7 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr | ... | @@ -114,7 +136,7 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr |
114 | checkState(isAvailable()); | 136 | checkState(isAvailable()); |
115 | synchronized (this) { | 137 | synchronized (this) { |
116 | if (cachedMetadata.get() == null) { | 138 | if (cachedMetadata.get() == null) { |
117 | - loadMetadata(); | 139 | + cachedMetadata.set(fetchMetadata()); |
118 | } | 140 | } |
119 | return cachedMetadata.get(); | 141 | return cachedMetadata.get(); |
120 | } | 142 | } |
... | @@ -151,20 +173,20 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr | ... | @@ -151,20 +173,20 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr |
151 | return CONFIG_FILE.exists(); | 173 | return CONFIG_FILE.exists(); |
152 | } | 174 | } |
153 | 175 | ||
154 | - private void loadMetadata() { | 176 | + private Versioned<ClusterMetadata> fetchMetadata() { |
155 | ClusterMetadata metadata = null; | 177 | ClusterMetadata metadata = null; |
156 | long version = 0; | 178 | long version = 0; |
157 | try { | 179 | try { |
158 | metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class); | 180 | metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class); |
159 | - version = metadata.hashCode(); | 181 | + version = CONFIG_FILE.lastModified(); |
160 | } catch (IOException e) { | 182 | } catch (IOException e) { |
161 | Throwables.propagate(e); | 183 | Throwables.propagate(e); |
162 | } | 184 | } |
163 | - cachedMetadata.set(new Versioned<>(new ClusterMetadata(PROVIDER_ID, | 185 | + return new Versioned<>(new ClusterMetadata(PROVIDER_ID, |
164 | - metadata.getName(), | 186 | + metadata.getName(), |
165 | - Sets.newHashSet(metadata.getNodes()), | 187 | + Sets.newHashSet(metadata.getNodes()), |
166 | - Sets.newHashSet(metadata.getPartitions())), | 188 | + Sets.newHashSet(metadata.getPartitions())), |
167 | - version)); | 189 | + version); |
168 | } | 190 | } |
169 | 191 | ||
170 | private static class PartitionDeserializer extends JsonDeserializer<Partition> { | 192 | private static class PartitionDeserializer extends JsonDeserializer<Partition> { |
... | @@ -232,4 +254,34 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr | ... | @@ -232,4 +254,34 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr |
232 | return new NodeId(node.asText()); | 254 | return new NodeId(node.asText()); |
233 | } | 255 | } |
234 | } | 256 | } |
257 | + | ||
258 | + /** | ||
259 | + * Monitors the config file for any updates and notifies providerService accordingly. | ||
260 | + * @throws IOException | ||
261 | + */ | ||
262 | + private void watchConfigFile() throws IOException { | ||
263 | + WatchService watcher = FileSystems.getDefault().newWatchService(); | ||
264 | + Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR); | ||
265 | + configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY); | ||
266 | + while (true) { | ||
267 | + try { | ||
268 | + final WatchKey watchKey = watcher.take(); | ||
269 | + for (WatchEvent<?> event : watchKey.pollEvents()) { | ||
270 | + final Path changed = (Path) event.context(); | ||
271 | + log.info("{} was updated", changed); | ||
272 | + // TODO: Fix concurrency issues | ||
273 | + Versioned<ClusterMetadata> latestMetadata = fetchMetadata(); | ||
274 | + cachedMetadata.set(latestMetadata); | ||
275 | + providerService.clusterMetadataChanged(latestMetadata); | ||
276 | + } | ||
277 | + if (!watchKey.reset()) { | ||
278 | + log.debug("WatchKey has been unregistered"); | ||
279 | + break; | ||
280 | + } | ||
281 | + } catch (InterruptedException e) { | ||
282 | + Thread.currentThread().interrupt(); | ||
283 | + break; | ||
284 | + } | ||
285 | + } | ||
286 | + } | ||
235 | } | 287 | } |
... | \ No newline at end of file | ... | \ No newline at end of file | ... | ... |
-
Please register or login to post a comment