Thomas Vachuska

Fixed error due to storage exception timeout. Added a delay between retries.

Change-Id: I99bdfbe980eac7069f34203ee69fe0c5c480db45
......@@ -17,7 +17,6 @@ package org.onosproject.store.app;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -43,12 +42,12 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
......@@ -63,15 +62,10 @@ import java.util.function.Function;
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
import static org.onosproject.app.ApplicationEvent.Type.*;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -85,12 +79,13 @@ import static org.slf4j.LoggerFactory.getLogger;
public class GossipApplicationStore extends ApplicationArchive
implements ApplicationStore {
private static final int MAX_LOAD_RETRIES = 3;
private final Logger log = getLogger(getClass());
private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
private static final int MAX_LOAD_RETRIES = 3;
private static final int RETRY_DELAY_MS = 2_000;
private static final int FETCH_TIMEOUT_MS = 10_000;
private static final int LOAD_TIMEOUT_MS = 5_000;
......@@ -174,6 +169,7 @@ public class GossipApplicationStore extends ApplicationArchive
}
} catch (Exception e) {
log.warn("Unable to load application {} from disk; retrying", name);
delay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
}
}
}
......@@ -372,21 +368,21 @@ public class GossipApplicationStore extends ApplicationArchive
continue;
}
clusterCommunicator.sendAndReceive(app.id().name(),
APP_BITS_REQUEST,
s -> s.getBytes(Charsets.UTF_8),
Function.identity(),
node.id())
.whenCompleteAsync((bits, error) -> {
if (error == null && latch.getCount() > 0) {
saveApplication(new ByteArrayInputStream(bits));
log.info("Downloaded bits for application {} from node {}",
app.id().name(), node.id());
latch.countDown();
} else if (error != null) {
log.warn("Unable to fetch bits for application {} from node {}",
app.id().name(), node.id());
}
}, executor);
APP_BITS_REQUEST,
s -> s.getBytes(Charsets.UTF_8),
Function.identity(),
node.id())
.whenCompleteAsync((bits, error) -> {
if (error == null && latch.getCount() > 0) {
saveApplication(new ByteArrayInputStream(bits));
log.info("Downloaded bits for application {} from node {}",
app.id().name(), node.id());
latch.countDown();
} else if (error != null) {
log.warn("Unable to fetch bits for application {} from node {}",
app.id().name(), node.id());
}
}, executor);
}
try {
......@@ -412,6 +408,7 @@ public class GossipApplicationStore extends ApplicationArchive
}
}
}
/**
* Prunes applications which are not in the map, but are on disk.
*/
......
package org.onosproject.store.core.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -17,7 +14,10 @@ import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import java.util.Map;
import static org.onlab.util.Tools.delay;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of {@code IdBlockStore} using {@code AtomicCounter}.
......@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
public class ConsistentIdBlockStore implements IdBlockStore {
private static final int MAX_TRIES = 3;
private static final int RETRY_DELAY_MS = 2_000;
private final Logger log = getLogger(getClass());
private final Map<String, AtomicCounter> topicCounters = Maps.newConcurrentMap();
......@@ -62,6 +63,7 @@ public class ConsistentIdBlockStore implements IdBlockStore {
log.warn("Unable to allocate ID block due to {}; retrying...",
e.getMessage());
exc = e;
delay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
}
}
throw new IllegalStateException("Unable to allocate ID block", exc);
......