Yuta HIGUCHI

DatabaseClient: fixed to use copycat instance instead

Change-Id: If13ec051f362e0d3bc8311dc30e2c0f70e55c42e
1 package org.onlab.onos.store.service.impl; 1 package org.onlab.onos.store.service.impl;
2 2
3 -import java.util.Arrays; 3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
4 import java.util.List; 5 import java.util.List;
5 -import java.util.UUID;
6 import java.util.concurrent.CompletableFuture; 6 import java.util.concurrent.CompletableFuture;
7 import java.util.concurrent.ExecutionException; 7 import java.util.concurrent.ExecutionException;
8 8
9 -import net.kuujo.copycat.protocol.Response.Status; 9 +import net.kuujo.copycat.Copycat;
10 -import net.kuujo.copycat.protocol.SubmitRequest;
11 -import net.kuujo.copycat.protocol.SubmitResponse;
12 -import net.kuujo.copycat.spi.protocol.ProtocolClient;
13 10
14 import org.onlab.onos.store.service.DatabaseException; 11 import org.onlab.onos.store.service.DatabaseException;
15 import org.onlab.onos.store.service.ReadRequest; 12 import org.onlab.onos.store.service.ReadRequest;
...@@ -20,31 +17,17 @@ import org.onlab.onos.store.service.WriteRequest; ...@@ -20,31 +17,17 @@ import org.onlab.onos.store.service.WriteRequest;
20 */ 17 */
21 public class DatabaseClient { 18 public class DatabaseClient {
22 19
23 - private final ProtocolClient client; 20 + private final Copycat copycat;
24 -
25 - public DatabaseClient(ProtocolClient client) {
26 - this.client = client;
27 - }
28 21
29 - private static String nextId() { 22 + public DatabaseClient(Copycat copycat) {
30 - return UUID.randomUUID().toString(); 23 + this.copycat = checkNotNull(copycat);
31 } 24 }
32 25
33 public boolean createTable(String tableName) { 26 public boolean createTable(String tableName) {
34 27
35 - SubmitRequest request = 28 + CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
36 - new SubmitRequest(
37 - nextId(),
38 - "createTable",
39 - Arrays.asList(tableName));
40 - CompletableFuture<SubmitResponse> future = client.submit(request);
41 try { 29 try {
42 - final SubmitResponse submitResponse = future.get(); 30 + return future.get();
43 - if (submitResponse.status() == Status.OK) {
44 - return (boolean) submitResponse.result();
45 - } else {
46 - throw new DatabaseException(submitResponse.error());
47 - }
48 } catch (InterruptedException | ExecutionException e) { 31 } catch (InterruptedException | ExecutionException e) {
49 throw new DatabaseException(e); 32 throw new DatabaseException(e);
50 } 33 }
...@@ -52,17 +35,9 @@ public class DatabaseClient { ...@@ -52,17 +35,9 @@ public class DatabaseClient {
52 35
53 public void dropTable(String tableName) { 36 public void dropTable(String tableName) {
54 37
55 - SubmitRequest request = 38 + CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
56 - new SubmitRequest(
57 - nextId(),
58 - "dropTable",
59 - Arrays.asList(tableName));
60 - CompletableFuture<SubmitResponse> future = client.submit(request);
61 try { 39 try {
62 - if (future.get().status() != Status.OK) { 40 + future.get();
63 - throw new DatabaseException(future.get().toString());
64 - }
65 -
66 } catch (InterruptedException | ExecutionException e) { 41 } catch (InterruptedException | ExecutionException e) {
67 throw new DatabaseException(e); 42 throw new DatabaseException(e);
68 } 43 }
...@@ -70,79 +45,39 @@ public class DatabaseClient { ...@@ -70,79 +45,39 @@ public class DatabaseClient {
70 45
71 public void dropAllTables() { 46 public void dropAllTables() {
72 47
73 - SubmitRequest request = 48 + CompletableFuture<Void> future = copycat.submit("dropAllTables");
74 - new SubmitRequest(
75 - nextId(),
76 - "dropAllTables",
77 - Arrays.asList());
78 - CompletableFuture<SubmitResponse> future = client.submit(request);
79 try { 49 try {
80 - if (future.get().status() != Status.OK) { 50 + future.get();
81 - throw new DatabaseException(future.get().toString());
82 - }
83 } catch (InterruptedException | ExecutionException e) { 51 } catch (InterruptedException | ExecutionException e) {
84 throw new DatabaseException(e); 52 throw new DatabaseException(e);
85 } 53 }
86 } 54 }
87 55
88 - @SuppressWarnings("unchecked")
89 public List<String> listTables() { 56 public List<String> listTables() {
90 57
91 - SubmitRequest request = 58 + CompletableFuture<List<String>> future = copycat.submit("listTables");
92 - new SubmitRequest(
93 - nextId(),
94 - "listTables",
95 - Arrays.asList());
96 - CompletableFuture<SubmitResponse> future = client.submit(request);
97 try { 59 try {
98 - final SubmitResponse submitResponse = future.get(); 60 + return future.get();
99 - if (submitResponse.status() == Status.OK) {
100 - return (List<String>) submitResponse.result();
101 - } else {
102 - throw new DatabaseException(submitResponse.error());
103 - }
104 } catch (InterruptedException | ExecutionException e) { 61 } catch (InterruptedException | ExecutionException e) {
105 throw new DatabaseException(e); 62 throw new DatabaseException(e);
106 } 63 }
107 } 64 }
108 65
109 - @SuppressWarnings("unchecked")
110 public List<InternalReadResult> batchRead(List<ReadRequest> requests) { 66 public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
111 67
112 - SubmitRequest request = new SubmitRequest( 68 + CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests);
113 - nextId(),
114 - "read",
115 - Arrays.asList(requests));
116 -
117 - CompletableFuture<SubmitResponse> future = client.submit(request);
118 try { 69 try {
119 - final SubmitResponse submitResponse = future.get(); 70 + return future.get();
120 - if (submitResponse.status() == Status.OK) {
121 - return (List<InternalReadResult>) submitResponse.result();
122 - } else {
123 - throw new DatabaseException(submitResponse.error());
124 - }
125 } catch (InterruptedException | ExecutionException e) { 71 } catch (InterruptedException | ExecutionException e) {
126 throw new DatabaseException(e); 72 throw new DatabaseException(e);
127 } 73 }
128 } 74 }
129 75
130 - @SuppressWarnings("unchecked")
131 public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) { 76 public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
132 77
133 - SubmitRequest request = new SubmitRequest( 78 + CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests);
134 - nextId(),
135 - "write",
136 - Arrays.asList(requests));
137 -
138 - CompletableFuture<SubmitResponse> future = client.submit(request);
139 try { 79 try {
140 - final SubmitResponse submitResponse = future.get(); 80 + return future.get();
141 - if (submitResponse.status() == Status.OK) {
142 - return (List<InternalWriteResult>) submitResponse.result();
143 - } else {
144 - throw new DatabaseException(submitResponse.error());
145 - }
146 } catch (InterruptedException | ExecutionException e) { 81 } catch (InterruptedException | ExecutionException e) {
147 throw new DatabaseException(e); 82 throw new DatabaseException(e);
148 } 83 }
......
...@@ -25,7 +25,6 @@ import net.kuujo.copycat.cluster.TcpMember; ...@@ -25,7 +25,6 @@ import net.kuujo.copycat.cluster.TcpMember;
25 import net.kuujo.copycat.log.InMemoryLog; 25 import net.kuujo.copycat.log.InMemoryLog;
26 import net.kuujo.copycat.log.Log; 26 import net.kuujo.copycat.log.Log;
27 27
28 -import org.apache.commons.lang3.RandomUtils;
29 import org.apache.felix.scr.annotations.Activate; 28 import org.apache.felix.scr.annotations.Activate;
30 import org.apache.felix.scr.annotations.Component; 29 import org.apache.felix.scr.annotations.Component;
31 import org.apache.felix.scr.annotations.Deactivate; 30 import org.apache.felix.scr.annotations.Deactivate;
...@@ -53,7 +52,6 @@ import org.onlab.packet.IpAddress; ...@@ -53,7 +52,6 @@ import org.onlab.packet.IpAddress;
53 import org.slf4j.Logger; 52 import org.slf4j.Logger;
54 53
55 import com.google.common.collect.ImmutableList; 54 import com.google.common.collect.ImmutableList;
56 -import com.google.common.collect.Iterables;
57 55
58 /** 56 /**
59 * Strongly consistent and durable state management service based on 57 * Strongly consistent and durable state management service based on
...@@ -125,30 +123,16 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -125,30 +123,16 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
125 } 123 }
126 124
127 final ControllerNode localNode = clusterService.getLocalNode(); 125 final ControllerNode localNode = clusterService.getLocalNode();
128 - TcpMember clientHandler = null;
129 for (ControllerNode member : defaultMember) { 126 for (ControllerNode member : defaultMember) {
130 final TcpMember tcpMember = new TcpMember(member.ip().toString(), 127 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
131 member.tcpPort()); 128 member.tcpPort());
132 if (localNode.equals(member)) { 129 if (localNode.equals(member)) {
133 - clientHandler = tcpMember;
134 clusterConfig.setLocalMember(tcpMember); 130 clusterConfig.setLocalMember(tcpMember);
135 } else { 131 } else {
136 clusterConfig.addRemoteMember(tcpMember); 132 clusterConfig.addRemoteMember(tcpMember);
137 } 133 }
138 } 134 }
139 135
140 - // TODO should be removed after DatabaseClient refactoring
141 - if (clientHandler == null) {
142 - Set<TcpMember> members = clusterConfig.getMembers();
143 - if (members.isEmpty()) {
144 - log.error("No member found in [{}] tablet configuration.",
145 - DEFAULT_TABLET);
146 - throw new IllegalStateException("No member found in tablet configuration");
147 - }
148 - int position = RandomUtils.nextInt(0, members.size());
149 - clientHandler = Iterables.get(members, position);
150 - }
151 -
152 // note: from this point beyond, clusterConfig requires synchronization 136 // note: from this point beyond, clusterConfig requires synchronization
153 clusterEventLatch = new CountDownLatch(1); 137 clusterEventLatch = new CountDownLatch(1);
154 clusterEventListener = new InternalClusterEventListener(); 138 clusterEventListener = new InternalClusterEventListener();
...@@ -182,8 +166,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -182,8 +166,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
182 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol); 166 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
183 copycat.start(); 167 copycat.start();
184 168
185 - // FIXME Redo DatabaseClient. Needs fall back mechanism etc. 169 + client = new DatabaseClient(copycat);
186 - client = new DatabaseClient(copycatMessagingProtocol.createClient(clientHandler));
187 170
188 log.info("Started."); 171 log.info("Started.");
189 } 172 }
......