From 187cbd0b2daf2b2e61145ff02f06014721c8f82b Mon Sep 17 00:00:00 2001 From: wenweihu86 Date: Sat, 24 Jun 2017 15:56:25 +0800 Subject: [PATCH] add producer client --- .../wenweihu86/distmq/broker/BrokerMain.java | 15 +- .../distmq/client/BrokerClient.java | 66 +++++ .../distmq/client/BrokerClientManager.java | 39 +++ .../distmq/client/CommonConfig.java | 45 +++ .../client/consumer/ConsumerConfig.java | 9 + .../distmq/client/producer/Producer.java | 100 +++++++ .../client/producer/ProducerConfig.java | 18 ++ .../wenweihu86/distmq/client/zk/ZKClient.java | 264 ++++++++++++++---- .../wenweihu86/distmq/client/zk/ZKConf.java | 10 +- .../wenweihu86/distmq/client/zk/ZKData.java | 34 ++- 10 files changed, 523 insertions(+), 77 deletions(-) create mode 100644 distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClient.java create mode 100644 distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClientManager.java create mode 100644 distmq-client/src/main/java/com/github/wenweihu86/distmq/client/CommonConfig.java create mode 100644 distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/ConsumerConfig.java create mode 100644 distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java create mode 100644 distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/ProducerConfig.java diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java index 13455fa..eed8c40 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java @@ -10,6 +10,7 @@ import com.github.wenweihu86.raft.service.RaftConsensusService; import com.github.wenweihu86.raft.service.impl.RaftClientServiceImpl; import com.github.wenweihu86.raft.service.impl.RaftConsensusServiceImpl; +import com.github.wenweihu86.raft.util.ConfigurationUtils; import com.github.wenweihu86.rpc.server.RPCServer; import java.util.List; @@ -48,11 +49,21 @@ public static void main(String[] args) { // 注册zk ZKConf zkConf = conf.getZkConf(); ZKClient zkClient = new ZKClient(zkConf); + zkClient.subscribeBroker(); + zkClient.subscribeTopic(); + // 等成为raft集群成员后,才能注册到zk + + while (ConfigurationUtils.containsServer( + raftNode.getConfiguration(), conf.getLocalServer().getServerId())) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } zkClient.registerBroker(conf.getShardingId(), conf.getLocalServer().getEndPoint().getHost(), conf.getLocalServer().getEndPoint().getPort()); - zkClient.subscribeBroker(); - zkClient.subscribeTopic(); } } diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClient.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClient.java new file mode 100644 index 0000000..75736d8 --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClient.java @@ -0,0 +1,66 @@ +package com.github.wenweihu86.distmq.client; + +import com.github.wenweihu86.distmq.client.api.BrokerAPI; +import com.github.wenweihu86.rpc.client.RPCClient; +import com.github.wenweihu86.rpc.client.RPCClientOptions; +import com.github.wenweihu86.rpc.client.RPCProxy; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; + +/** + * Created by wenweihu86 on 2017/6/24. + */ +public class BrokerClient { + private String address; + private RPCClient rpcClient; + private BrokerAPI brokerAPI; + + public BrokerClient(String address, RPCClientOptions options) { + this.address = address; + this.rpcClient = new RPCClient(address, options); + this.brokerAPI = RPCProxy.getProxy(this.rpcClient, BrokerAPI.class); + } + + @Override + public boolean equals(Object object) { + boolean flag = false; + if (object != null && BrokerClient.class.isAssignableFrom(object.getClass())) { + BrokerClient rhs = (BrokerClient) object; + flag = new EqualsBuilder() + .append(address, rhs.address) + .isEquals(); + } + return flag; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(address) + .toHashCode(); + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public RPCClient getRpcClient() { + return rpcClient; + } + + public void setRpcClient(RPCClient rpcClient) { + this.rpcClient = rpcClient; + } + + public BrokerAPI getBrokerAPI() { + return brokerAPI; + } + + public void setBrokerAPI(BrokerAPI brokerAPI) { + this.brokerAPI = brokerAPI; + } +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClientManager.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClientManager.java new file mode 100644 index 0000000..bdc2237 --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/BrokerClientManager.java @@ -0,0 +1,39 @@ +package com.github.wenweihu86.distmq.client; + +import com.github.wenweihu86.rpc.client.RPCClientOptions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Created by wenweihu86 on 2017/6/24. + */ +public class BrokerClientManager { + private static BrokerClientManager instance; + private static RPCClientOptions rpcClientOptions; + private ConcurrentMap brokerClientMap = new ConcurrentHashMap<>(); + + public static BrokerClientManager getInstance() { + if (instance == null) { + instance = new BrokerClientManager(); + } + return instance; + } + + public static RPCClientOptions getRpcClientOptions() { + return rpcClientOptions; + } + + public static void setRpcClientOptions(RPCClientOptions rpcClientOptions) { + BrokerClientManager.rpcClientOptions = rpcClientOptions; + } + + public ConcurrentMap getBrokerClientMap() { + return brokerClientMap; + } + + public void setBrokerClientMap(ConcurrentMap brokerClientMap) { + this.brokerClientMap = brokerClientMap; + } +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/CommonConfig.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/CommonConfig.java new file mode 100644 index 0000000..18c8c94 --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/CommonConfig.java @@ -0,0 +1,45 @@ +package com.github.wenweihu86.distmq.client; + +import com.github.wenweihu86.distmq.client.zk.ZKConf; +import com.github.wenweihu86.rpc.client.RPCClientOptions; + +/** + * Created by wenweihu86 on 2017/6/24. + */ +public class CommonConfig extends ZKConf { + private int brokerConnectTimeoutMs = 200; + private int brokerReadTimeoutMs = 500; + private int brokerWriteTimeoutMs = 200; + + public RPCClientOptions getRPCClientOptions() { + RPCClientOptions rpcClientOptions = new RPCClientOptions(); + rpcClientOptions.setConnectTimeoutMillis(brokerConnectTimeoutMs); + rpcClientOptions.setReadTimeoutMillis(brokerReadTimeoutMs); + rpcClientOptions.setWriteTimeoutMillis(brokerWriteTimeoutMs); + return rpcClientOptions; + } + + public int getBrokerConnectTimeoutMs() { + return brokerConnectTimeoutMs; + } + + public void setBrokerConnectTimeoutMs(int brokerConnectTimeoutMs) { + this.brokerConnectTimeoutMs = brokerConnectTimeoutMs; + } + + public int getBrokerReadTimeoutMs() { + return brokerReadTimeoutMs; + } + + public void setBrokerReadTimeoutMs(int brokerReadTimeoutMs) { + this.brokerReadTimeoutMs = brokerReadTimeoutMs; + } + + public int getBrokerWriteTimeoutMs() { + return brokerWriteTimeoutMs; + } + + public void setBrokerWriteTimeoutMs(int brokerWriteTimeoutMs) { + this.brokerWriteTimeoutMs = brokerWriteTimeoutMs; + } +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/ConsumerConfig.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/ConsumerConfig.java new file mode 100644 index 0000000..50b6e72 --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/ConsumerConfig.java @@ -0,0 +1,9 @@ +package com.github.wenweihu86.distmq.client.consumer; + +import com.github.wenweihu86.distmq.client.CommonConfig; + +/** + * Created by wenweihu86 on 2017/6/24. + */ +public class ConsumerConfig extends CommonConfig { +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java new file mode 100644 index 0000000..a4f310f --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java @@ -0,0 +1,100 @@ +package com.github.wenweihu86.distmq.client.producer; + +import com.github.wenweihu86.distmq.client.BrokerClient; +import com.github.wenweihu86.distmq.client.BrokerClientManager; +import com.github.wenweihu86.distmq.client.api.BrokerMessage; +import com.github.wenweihu86.distmq.client.zk.ZKClient; +import com.github.wenweihu86.distmq.client.zk.ZKData; +import com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Created by wenweihu86 on 2017/6/24. + */ +public class Producer { + private static final Logger LOG = LoggerFactory.getLogger(Producer.class); + private ProducerConfig config; + private ZKClient zkClient; + + public Producer(ProducerConfig config) { + this.config = config; + BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions()); + zkClient = new ZKClient(config); + zkClient.subscribeBroker(); + zkClient.subscribeTopic(); + } + + public boolean send(String topic, byte[] messageBytes) { + ZKData zkData = ZKData.getInstance(); + Map queueMap; + zkData.getTopicLock().lock(); + try { + queueMap = zkData.getTopicMap().get(topic); + } finally { + zkData.getTopicLock().unlock(); + } + Integer queueId; + Integer shardingId; + if (queueMap == null) { + zkClient.registerTopic(topic, config.getQueueCountPerTopic()); + zkData.getTopicLock().lock(); + try { + while (!zkData.getTopicMap().containsKey(topic)) { + zkData.getTopicCondition().awaitUninterruptibly(); + } + } finally { + zkData.getTopicLock().unlock(); + } + } + + zkData.getTopicLock().lock(); + try { + queueMap = zkData.getTopicMap().get(topic); + int queueCount = queueMap.size(); + int randomIndex = ThreadLocalRandom.current().nextInt(0, queueCount); + Integer[] queueArray = queueMap.keySet().toArray(new Integer[0]); + queueId = queueArray[randomIndex]; + shardingId = queueMap.get(queueId); + } finally { + zkData.getTopicLock().unlock(); + } + + // send message to broker + BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.newBuilder() + .setTopic(topic) + .setQueue(queueId) + .setContent(ByteString.copyFrom(messageBytes)) + .build(); + + List brokerAddressList; + zkData.getBrokerLock().lock(); + try { + brokerAddressList = zkData.getBrokerMap().get(shardingId); + } finally { + zkData.getBrokerLock().unlock(); + } + int randIndex = ThreadLocalRandom.current().nextInt(0, brokerAddressList.size()); + String brokerAddress = brokerAddressList.get(randIndex); + BrokerClient brokerClient = BrokerClientManager.getInstance().getBrokerClientMap().get(brokerAddress); + BrokerMessage.SendMessageResponse response = brokerClient.getBrokerAPI().sendMessage(request); + if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) { + LOG.warn("send message failed, topic={}, queue={}, brokerAddress={}", + topic, queueId, brokerAddress); + return false; + } + return true; + } + + public ProducerConfig getConfig() { + return config; + } + + public void setConfig(ProducerConfig config) { + this.config = config; + } +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/ProducerConfig.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/ProducerConfig.java new file mode 100644 index 0000000..0cb594f --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/ProducerConfig.java @@ -0,0 +1,18 @@ +package com.github.wenweihu86.distmq.client.producer; + +import com.github.wenweihu86.distmq.client.CommonConfig; + +/** + * Created by wenweihu86 on 2017/6/24. + */ +public class ProducerConfig extends CommonConfig { + private int queueCountPerTopic = 4; + + public int getQueueCountPerTopic() { + return queueCountPerTopic; + } + + public void setQueueCountPerTopic(int queueCountPerTopic) { + this.queueCountPerTopic = queueCountPerTopic; + } +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java index e16e86c..bcd61f7 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java @@ -1,5 +1,10 @@ package com.github.wenweihu86.distmq.client.zk; +import com.github.wenweihu86.distmq.client.BrokerClient; +import com.github.wenweihu86.distmq.client.BrokerClientManager; +import com.github.wenweihu86.distmq.client.consumer.ConsumerConfig; +import com.github.wenweihu86.distmq.client.producer.ProducerConfig; +import com.github.wenweihu86.rpc.client.RPCClientOptions; import org.apache.commons.collections.CollectionUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -22,9 +27,16 @@ public class ZKClient { private static final Logger LOG = LoggerFactory.getLogger(ZKClient.class); private ZKConf zkConf; private CuratorFramework zkClient; + private boolean isProducer = false; + private boolean isConsumer = false; public ZKClient(ZKConf conf) { this.zkConf = conf; + if (conf instanceof ProducerConfig) { + isProducer = true; + } else if (conf instanceof ConsumerConfig) { + isConsumer = true; + } RetryPolicy retryPolicy = new ExponentialBackoffRetry( zkConf.getRetryIntervalMs(), zkConf.getRetryCount()); this.zkClient = CuratorFrameworkFactory.builder() @@ -47,11 +59,13 @@ public void registerBroker(int shardingId, String ip, int port) { } catch (Exception ex) { LOG.warn("registerBroker exception:", ex); } + LOG.info("register broker sucess, ip={}, port={}", ip, port); } + // 启动时调用,所以不用加锁 public void subscribeBroker() { final ZKData zkData = ZKData.getInstance(); - final ConcurrentMap> brokerMap = zkData.getBrokerMap(); + final Map> brokerMap = zkData.getBrokerMap(); try { final String brokerParentPath = zkConf.getBasePath() + "/brokers"; List shardings = zkClient.getChildren().forPath(brokerParentPath); @@ -59,6 +73,13 @@ public void subscribeBroker() { final int shardingId = Integer.valueOf(sharding); final String shardingPath = brokerParentPath + "/" + sharding; List brokerAddressList = zkClient.getChildren().forPath(shardingPath); + if (isProducer || isConsumer) { + RPCClientOptions rpcClientOptions = BrokerClientManager.getRpcClientOptions(); + for (String address : brokerAddressList) { + BrokerClient brokerClient = new BrokerClient(address, rpcClientOptions); + BrokerClientManager.getInstance().getBrokerClientMap().put(address, brokerClient); + } + } brokerMap.put(shardingId, brokerAddressList); // 监听broker分片变化 zkClient.getChildren().usingWatcher( @@ -72,15 +93,28 @@ public void subscribeBroker() { } } - public void registerTopic(String topic, int queueNum) { + /** + * 创建新的topic,加锁是防止重复创建 + * @param topic topic名称 + * @param queueNum queue个数 + */ + public synchronized void registerTopic(String topic, int queueNum) { ZKData zkData = ZKData.getInstance(); - ConcurrentMap> brokerMap = zkData.getBrokerMap(); - List shardingIds = new ArrayList<>(brokerMap.keySet()); + List shardingIds; + zkData.getBrokerLock().lock(); + try { + Map> brokerMap = zkData.getBrokerMap(); + shardingIds = new ArrayList<>(brokerMap.keySet()); + } finally { + zkData.getBrokerLock().unlock(); + } + int shardingNum = shardingIds.size(); - String topicPath = zkConf.getBasePath() + "/topics/"; + String topicPath = zkConf.getBasePath() + "/topics/" + topic; for (int queueId = 0; queueId < queueNum; queueId++) { - int shardingId = queueId % shardingNum; - String queuePath = topicPath + queueId; + int index = queueId % shardingNum; + int shardingId = shardingIds.get(index); + String queuePath = topicPath + "/" + queueId; byte[] queueData = String.valueOf(shardingId).getBytes(); try { zkClient.create() @@ -92,9 +126,10 @@ public void registerTopic(String topic, int queueNum) { } } + // 启动时调用,所以不用加锁 public void subscribeTopic() { ZKData zkData = ZKData.getInstance(); - ConcurrentMap> topicMap = zkData.getTopicMap(); + Map> topicMap = zkData.getTopicMap(); String topicParentPath = zkConf.getBasePath() + "/topics/"; try { List topics = zkClient.getChildren().forPath(topicParentPath); @@ -128,6 +163,73 @@ public void subscribeTopic() { } } + // 监听所有broker的分片信息变化事件 + private class BrokersWatcher implements CuratorWatcher { + @Override + public void process(WatchedEvent event) throws Exception { + ZKData zkData = ZKData.getInstance(); + Map> brokerMap = zkData.getBrokerMap(); + if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { + String brokerPath = zkConf.getBasePath() + "/brokers"; + List newShardings = zkClient.getChildren().forPath(brokerPath); + + List oldShardings = new ArrayList<>(); + zkData.getBrokerLock().lock(); + try { + for (Integer shardingId : zkData.getBrokerMap().keySet()) { + oldShardings.add(String.valueOf(shardingId)); + } + } finally { + zkData.getBrokerLock().unlock(); + } + + Collection addedShardings = CollectionUtils.subtract(newShardings, oldShardings); + Collection deletedShardings = CollectionUtils.subtract(oldShardings, newShardings); + for (String sharding : addedShardings) { + int shardingId = Integer.valueOf(sharding); + String shardingPath = brokerPath + "/" + sharding; + List brokerAddressList = zkClient.getChildren().forPath(shardingPath); + for (String address : brokerAddressList) { + if (isProducer || isConsumer) { + BrokerClient brokerClient = new BrokerClient(address, BrokerClientManager.getRpcClientOptions()); + BrokerClientManager.getInstance().getBrokerClientMap().putIfAbsent(address, brokerClient); + } + } + + zkData.getBrokerLock().lock(); + try { + zkData.getBrokerMap().put(shardingId, brokerAddressList); + } finally { + zkData.getBrokerLock().unlock(); + } + + zkClient.getChildren().usingWatcher( + new BrokerShardingWather(shardingId)) + .forPath(shardingPath); + } + + for (String sharding : deletedShardings) { + List brokerList; + zkData.getBrokerLock().lock(); + try { + brokerList = zkData.getBrokerMap().remove(Integer.valueOf(sharding)); + } finally { + zkData.getBrokerLock().unlock(); + } + if ((isProducer || isConsumer) && CollectionUtils.isNotEmpty(brokerList)) { + ConcurrentMap brokerClientMap + = BrokerClientManager.getInstance().getBrokerClientMap(); + for (String address : brokerList) { + BrokerClient client = brokerClientMap.get(address); + client.getRpcClient().stop(); + brokerClientMap.remove(address); + } + } + } + } + } + } + // 监听broker某个分片下的节点变化 private class BrokerShardingWather implements CuratorWatcher { private int shardingId; @@ -139,40 +241,46 @@ public BrokerShardingWather(int shardingId) { @Override public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); - ConcurrentMap> brokerMap = zkData.getBrokerMap(); + Map> brokerMap = zkData.getBrokerMap(); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String shardingPath = zkConf.getBasePath() + "/brokers/" + shardingId; List newBrokerAddressList = zkClient.getChildren().forPath(shardingPath); - // TODO: 对于client需要关闭被删除节点的连接,以及新建新增节点连接 - brokerMap.put(shardingId, newBrokerAddressList); - } - } - } + List oldBrokerAddressList; + zkData.getBrokerLock().lock(); + try { + oldBrokerAddressList = zkData.getBrokerMap().get(shardingPath); + } finally { + zkData.getBrokerLock().unlock(); + } + Collection addedBrokerAddressList + = CollectionUtils.subtract(newBrokerAddressList, oldBrokerAddressList); + Collection deletedBrokerAddressList + = CollectionUtils.subtract(oldBrokerAddressList, newBrokerAddressList); + for (String address : addedBrokerAddressList) { + zkData.getBrokerLock().lock(); + try { + zkData.getBrokerMap().get(shardingId).add(address); + } finally { + zkData.getBrokerLock().unlock(); + } - // 监听所有broker的分片信息变化事件 - private class BrokersWatcher implements CuratorWatcher { - @Override - public void process(WatchedEvent event) throws Exception { - ZKData zkData = ZKData.getInstance(); - ConcurrentMap> brokerMap = zkData.getBrokerMap(); - if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { - String brokerPath = zkConf.getBasePath() + "/brokers"; - List newShardings = zkClient.getChildren().forPath(brokerPath); - Iterator>> iterator = brokerMap.entrySet().iterator(); - while (iterator.hasNext()){ - Map.Entry> entry = iterator.next(); - if (!newShardings.contains(Integer.valueOf(entry.getKey()))) { - // TODO:对于client,需要删除对应节点的连接 - iterator.remove(); + if (isProducer || isConsumer) { + BrokerClient brokerClient = new BrokerClient(address, BrokerClientManager.getRpcClientOptions()); + BrokerClientManager.getInstance().getBrokerClientMap().putIfAbsent(address, brokerClient); } } - for (String sharding : newShardings) { - int shardingId = Integer.valueOf(sharding); - if (!brokerMap.containsKey(shardingId)) { - String shardingPath = brokerPath + "/" + sharding; - zkClient.getChildren().usingWatcher( - new BrokerShardingWather(shardingId)) - .forPath(shardingPath); + for (String address : deletedBrokerAddressList) { + zkData.getBrokerLock().lock(); + try { + zkData.getBrokerMap().get(shardingId).remove(address); + } finally { + zkData.getBrokerLock().unlock(); + } + + if (isProducer || isConsumer) { + BrokerClient brokerClient + = BrokerClientManager.getInstance().getBrokerClientMap().remove(address); + brokerClient.getRpcClient().stop(); } } } @@ -184,34 +292,49 @@ private class TopicsWather implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); - ConcurrentMap> topicMap = zkData.getTopicMap(); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String topicParentPath = zkConf.getBasePath() + "/topics"; List newTopics = zkClient.getChildren().forPath(topicParentPath); - List oldTopics = new ArrayList<>(topicMap.keySet()); - if (CollectionUtils.isEmpty(newTopics)) { - LOG.warn("there is no topics"); - topicMap.clear(); - } else { - Collection addedTopics = CollectionUtils.subtract(newTopics, oldTopics); - Collection deletedTopics = CollectionUtils.subtract(oldTopics, newTopics); - for (String topic : addedTopics) { - Map queueMap = topicMap.get(topic); - String topicPath = topicParentPath + "/" + topic; - List queueList = zkClient.getChildren().forPath(topicPath); - for (String queue : queueList) { - String queuePath = topicPath + "/" + queue; - String queueData = new String(zkClient.getData().forPath(queuePath)); - int shardingId = Integer.valueOf(queueData); - int queueId = Integer.valueOf(queue); - queueMap.put(queueId, shardingId); - } + List oldTopics; + zkData.getTopicLock().lockInterruptibly(); + try { + oldTopics = new ArrayList<>(zkData.getTopicMap().keySet()); + } finally { + zkData.getTopicLock().unlock(); + } + Collection addedTopics = CollectionUtils.subtract(newTopics, oldTopics); + Collection deletedTopics = CollectionUtils.subtract(oldTopics, newTopics); + for (String topic : addedTopics) { + Map queueMap = new HashMap<>(); + String topicPath = topicParentPath + "/" + topic; + List queueList = zkClient.getChildren().forPath(topicPath); + for (String queue : queueList) { + String queuePath = topicPath + "/" + queue; + String queueData = new String(zkClient.getData().forPath(queuePath)); + int shardingId = Integer.valueOf(queueData); + int queueId = Integer.valueOf(queue); + queueMap.put(queueId, shardingId); } + zkClient.getChildren().usingWatcher( + new TopicWatcher(topic)) + .forPath(topicPath); + zkData.getTopicLock().lockInterruptibly(); + try { + zkData.getTopicMap().put(topic, queueMap); + zkData.getTopicCondition().signalAll(); + } finally { + zkData.getTopicLock().unlock(); + } + } + zkData.getTopicLock().lockInterruptibly(); + try { for (String topic : deletedTopics) { - topicMap.remove(topic); + zkData.getTopicMap().remove(topic); // TODO: is need remove watcher? } + } finally { + zkData.getTopicLock().unlock(); } } } @@ -228,8 +351,6 @@ public TopicWatcher(String topic) { @Override public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); - ConcurrentMap> topicMap = zkData.getTopicMap(); - Map queueMap = topicMap.get(topic); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String topicPath = zkConf.getBasePath() + "/topics/" + topic; List newQueues = zkClient.getChildren().forPath(topicPath); @@ -237,17 +358,36 @@ public void process(WatchedEvent event) throws Exception { for (String queue : newQueues) { newQueueIds.add(Integer.valueOf(queue)); } - List oldQueueIds = new ArrayList<>(queueMap.keySet()); + + List oldQueueIds; + zkData.getTopicLock().lockInterruptibly(); + try { + oldQueueIds = new ArrayList<>(zkData.getTopicMap().get(topic).keySet()); + } finally { + zkData.getTopicLock().unlock(); + } + Collection addedQueueIds = CollectionUtils.subtract(newQueueIds, oldQueueIds); Collection deletedQueueIds = CollectionUtils.subtract(oldQueueIds, newQueueIds); for (Integer queueId : addedQueueIds) { String queuePath = topicPath + "/" + queueId; String queueData = new String(zkClient.getData().forPath(queuePath)); Integer shardingId = Integer.valueOf(queueData); - queueMap.put(queueId, shardingId); + + zkData.getTopicLock().lockInterruptibly(); + try { + zkData.getTopicMap().get(topic).put(queueId, shardingId); + } finally { + zkData.getTopicLock().unlock(); + } } - for (Integer queueId : deletedQueueIds) { - queueMap.remove(queueId); + zkData.getTopicLock().lockInterruptibly(); + try { + for (Integer queueId : deletedQueueIds) { + zkData.getTopicMap().get(topic).remove(queueId); + } + } finally { + zkData.getTopicLock().unlock(); } } } diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKConf.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKConf.java index bf01784..7b54084 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKConf.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKConf.java @@ -5,11 +5,11 @@ */ public class ZKConf { private String servers; - private int connectTimeoutMs; - private int sessionTimeoutMs; - private int retryCount; - private int retryIntervalMs; - private String basePath; + private int connectTimeoutMs = 500; + private int sessionTimeoutMs = 5000; + private int retryCount = 3; + private int retryIntervalMs = 1000; + private String basePath = "/distmq"; public String getServers() { return servers; diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java index bef9d76..843cc3a 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java @@ -1,9 +1,11 @@ package com.github.wenweihu86.distmq.client.zk; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Created by huwenwei on 2017/6/21. @@ -19,28 +21,44 @@ public static ZKData getInstance() { } // shardingId -> broker address list - private ConcurrentMap> brokerMap = new ConcurrentHashMap<>(); + private Map> brokerMap = new HashMap<>(); + private Lock brokerLock = new ReentrantLock(); // topic -> (queueId -> shardingId) - private ConcurrentMap> topicMap = new ConcurrentHashMap<>(); + private Map> topicMap = new HashMap<>(); + + private Lock topicLock = new ReentrantLock(); + private Condition topicCondition = topicLock.newCondition(); public static void setInstance(ZKData instance) { ZKData.instance = instance; } - public ConcurrentMap> getBrokerMap() { + public Map> getBrokerMap() { return brokerMap; } - public void setBrokerMap(ConcurrentMap> brokerMap) { + public void setBrokerMap(Map> brokerMap) { this.brokerMap = brokerMap; } - public ConcurrentMap> getTopicMap() { + public Lock getBrokerLock() { + return brokerLock; + } + + public Map> getTopicMap() { return topicMap; } - public void setTopicMap(ConcurrentMap> topicMap) { + public void setTopicMap(Map> topicMap) { this.topicMap = topicMap; } + + public Lock getTopicLock() { + return topicLock; + } + + public Condition getTopicCondition() { + return topicCondition; + } }