diff --git a/README.md b/README.md index 2aea512..7f16e22 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,2 @@ # DistMQ -Distributed Message Queue based on Raft -# 开发中,会定期更新。 -# 欢迎感兴趣者一起加入开发。 -# TODOS: -* consumer支持rebalance消费 -* raft snapshot使用硬链接 -* 支持多consumerGroup和多topic -* 支持broker failover -* broker消息存储引擎加入mmap功能 -* ...... +Distributed Message Queue based on Raft. diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java index 46aa438..b670f63 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java @@ -3,8 +3,8 @@ import com.github.wenweihu86.distmq.broker.config.GlobalConf; import com.github.wenweihu86.distmq.client.api.BrokerAPI; import com.github.wenweihu86.distmq.client.api.BrokerMessage; -import com.github.wenweihu86.distmq.client.zk.ZKClient; -import com.github.wenweihu86.distmq.client.zk.ZKData; +import com.github.wenweihu86.distmq.client.zk.MetadataManager; +import com.github.wenweihu86.distmq.client.zk.Metadata; import com.github.wenweihu86.raft.RaftNode; import com.github.wenweihu86.raft.proto.RaftMessage; import com.github.wenweihu86.rpc.client.RPCClient; @@ -23,12 +23,12 @@ public class BrokerAPIImpl implements BrokerAPI { private RaftNode raftNode; private BrokerStateMachine stateMachine; - private ZKClient zkClient; + private MetadataManager metadataManager; - public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine, ZKClient zkClient) { + public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine, MetadataManager metadataManager) { this.raftNode = raftNode; this.stateMachine = stateMachine; - this.zkClient = zkClient; + this.metadataManager = metadataManager; } @Override @@ -38,44 +38,24 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL); // 查询topic是否存在 - ZKData zkData = ZKData.getInstance(); - GlobalConf conf = GlobalConf.getInstance(); - boolean topicExist = false; - boolean shardingValid = false; - zkData.getTopicLock().lock(); - try { - Map queueMap = zkData.getTopicMap().get(request.getTopic()); - if (queueMap != null && queueMap.size() > 0) { - topicExist = true; - if (queueMap.get(request.getQueue()) == conf.getShardingId()) { - shardingValid = true; - } - } - } finally { - zkData.getTopicLock().unlock(); - } - + boolean topicExist = metadataManager.checkTopicExist(request.getTopic()); // 如果topic尚不存在,请求zookeeper读取 if (!topicExist) { - Map queueMap = zkClient.readTopicInfo(request.getTopic()); - zkData.getTopicLock().lock(); - try { - if (!zkData.getTopicMap().containsKey(request.getTopic())) { - zkData.getTopicMap().put(request.getTopic(), queueMap); - } - queueMap = zkData.getTopicMap().get(request.getTopic()); - if (queueMap.size() > 0) { - topicExist = true; - } - if (queueMap.get(request.getQueue()) == conf.getShardingId()) { - shardingValid = true; - } - } finally { - zkData.getTopicLock().unlock(); + Map queueMap = metadataManager.readTopicInfo(request.getTopic()); + if (queueMap.size() > 0) { + topicExist = true; } + metadataManager.updateTopicMap(request.getTopic(), queueMap); } // 验证queue存在,并且属于该sharding + boolean shardingValid = false; + GlobalConf conf = GlobalConf.getInstance(); + Integer shardingId = metadataManager.getQueueSharding(request.getTopic(), request.getQueue()); + if (shardingId != null && shardingId.equals(conf.getShardingId())) { + shardingValid = true; + } + if (!topicExist || !shardingValid) { String message = "queue not exist or not be included by this sharding"; baseResBuilder.setResMsg(message); diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java index 075491f..28b1c49 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java @@ -1,7 +1,7 @@ package com.github.wenweihu86.distmq.broker; import com.github.wenweihu86.distmq.broker.config.GlobalConf; -import com.github.wenweihu86.distmq.client.zk.ZKClient; +import com.github.wenweihu86.distmq.client.zk.MetadataManager; import com.github.wenweihu86.distmq.client.zk.ZKConf; import com.github.wenweihu86.raft.RaftNode; import com.github.wenweihu86.raft.RaftOptions; @@ -28,7 +28,7 @@ public static void main(String[] args) { // 初始化zookeeper ZKConf zkConf = conf.getZkConf(); - ZKClient zkClient = new ZKClient(zkConf); + MetadataManager metadataManager = new MetadataManager(zkConf); // 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); @@ -45,15 +45,15 @@ public static void main(String[] args) { RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注册应用自己提供的服务 - BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine, zkClient); + BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine, metadataManager); server.registerService(brokerAPI); // 启动RPCServer,初始化Raft节点 server.start(); raftNode.init(); // 订阅broker和topic的变化 - zkClient.subscribeBroker(); - zkClient.subscribeTopic(); + metadataManager.subscribeBroker(); + metadataManager.subscribeTopic(); // 等成为raft集群成员后,才能注册到zk while (!ConfigurationUtils.containsServer( raftNode.getConfiguration(), conf.getLocalServer().getServerId())) { @@ -63,7 +63,7 @@ public static void main(String[] args) { ex.printStackTrace(); } } - zkClient.registerBroker(conf.getShardingId(), + metadataManager.registerBroker(conf.getShardingId(), conf.getLocalServer().getEndPoint().getHost(), conf.getLocalServer().getEndPoint().getPort()); } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java index c9a0c6d..a5caa03 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java @@ -2,19 +2,15 @@ import com.github.wenweihu86.distmq.broker.config.GlobalConf; import com.github.wenweihu86.distmq.broker.log.LogManager; -import com.github.wenweihu86.distmq.broker.log.Segment; import com.github.wenweihu86.distmq.broker.log.SegmentedLog; import com.github.wenweihu86.distmq.client.api.BrokerMessage; -import com.github.wenweihu86.distmq.client.zk.ZKData; import com.github.wenweihu86.raft.StateMachine; -import com.google.protobuf.ByteString; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Map; /** * Created by wenweihu86 on 2017/6/17. diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java index 231b5c0..23bad0b 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java @@ -4,8 +4,8 @@ import com.github.wenweihu86.distmq.client.BrokerClientManager; import com.github.wenweihu86.distmq.client.api.BrokerMessage; import com.github.wenweihu86.distmq.client.utils.JsonUtil; -import com.github.wenweihu86.distmq.client.zk.ZKClient; -import com.github.wenweihu86.distmq.client.zk.ZKData; +import com.github.wenweihu86.distmq.client.zk.MetadataManager; +import com.github.wenweihu86.distmq.client.zk.Metadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +18,7 @@ public class Consumer implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(Consumer.class); private ConsumerConfig config; - private ZKClient zkClient; + private MetadataManager metadataManager; private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1); private MessageListener listener; @@ -26,29 +26,14 @@ public Consumer(ConsumerConfig config, MessageListener listener) { this.config = config; this.listener = listener; BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions()); - zkClient = new ZKClient(config); - zkClient.registerConsumer(config.getConsumerGroup(), config.getConsumerId()); - zkClient.subscribeConsumer(config.getConsumerGroup()); - zkClient.subscribeBroker(); - zkClient.subscribeTopic(); - // 更新offset - Map queueOffsetMap = zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic()); - ZKData zkData = ZKData.getInstance(); - Map> topicOffsetMap - = zkData.getConsumerOffsetMap().get(config.getConsumerGroup()); - if (topicOffsetMap == null) { - topicOffsetMap = new HashMap<>(); - topicOffsetMap.put(config.getTopic(), queueOffsetMap); - zkData.getConsumerOffsetMap().put(config.getConsumerGroup(), topicOffsetMap); - } else { - Map oldQueueOffsetMap = topicOffsetMap.get(config.getTopic()); - if (oldQueueOffsetMap == null) { - topicOffsetMap.put(config.getTopic(), queueOffsetMap); - } else { - oldQueueOffsetMap.putAll(queueOffsetMap); - } - } - LOG.info("new consumer offset={}", JsonUtil.toJson(zkData.getConsumerOffsetMap())); + metadataManager = new MetadataManager(config); + metadataManager.registerConsumer(config.getConsumerGroup(), config.getConsumerId()); + metadataManager.updateConsumerIds(config.getConsumerGroup()); + metadataManager.subscribeConsumer(config.getConsumerGroup()); + metadataManager.subscribeBroker(); + metadataManager.subscribeTopic(); + // 从zk读取offset + metadataManager.readConsumerOffset(config.getConsumerGroup(), config.getTopic()); } public void start() { @@ -61,20 +46,13 @@ public void run() { for (Map.Entry entry : queueMap.entrySet()) { Integer queueId = entry.getKey(); Integer shardingId = entry.getValue(); - long offset; - // 获取offset - ZKData zkData = ZKData.getInstance(); - zkData.getConsumerOffsetLock().lock(); - try { - offset = zkData.getConsumerOffsetMap() - .get(config.getConsumerGroup()) - .get(config.getTopic()) - .get(queueId); - } catch (Exception ex) { - offset = 0; - } finally { - zkData.getConsumerOffsetLock().unlock(); - } + long offset = metadataManager.getConsumerOffset(queueId); + List brokers = metadataManager.getBrokerAddressList(shardingId); + + int randIndex = ThreadLocalRandom.current().nextInt(0, brokers.size()); + ConcurrentMap brokerClientMap + = BrokerClientManager.getInstance().getBrokerClientMap(); + BrokerClient brokerClient = brokerClientMap.get(brokers.get(randIndex)); BrokerMessage.PullMessageRequest request = BrokerMessage.PullMessageRequest.newBuilder() .setTopic(config.getTopic()) @@ -82,20 +60,6 @@ public void run() { .setMessageCount(config.getMaxMessageCountPerRequest()) .setOffset(offset) .build(); - - List brokers; - zkData.getBrokerLock().lock(); - try { - brokers = zkData.getBrokerMap().get(shardingId); - } finally { - zkData.getBrokerLock().unlock(); - } - - int randIndex = ThreadLocalRandom.current().nextInt(0, brokers.size()); - ConcurrentMap brokerClientMap - = BrokerClientManager.getInstance().getBrokerClientMap(); - BrokerClient brokerClient = brokerClientMap.get(brokers.get(randIndex)); - BrokerMessage.PullMessageResponse response = brokerClient.getBrokerAPI().pullMessage(request); if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) { LOG.warn("pullMessage failed, topic={}, queue={}, offset={}, broker={}", @@ -109,32 +73,8 @@ public void run() { listener.consumeMessage(response.getContentsList()); BrokerMessage.MessageContent lastMessage = response.getContents(response.getContentsCount() - 1); offset = lastMessage.getOffset() + lastMessage.getSize(); - zkClient.updateConsumerOffset(config.getConsumerGroup(), config.getTopic(), queueId, offset); - // 更新offset - zkData.getConsumerOffsetLock().lock(); - try { - Map> topicOffsetMap - = zkData.getConsumerOffsetMap().get(config.getConsumerGroup()); - if (topicOffsetMap == null) { - Map queueOffsetMap = new HashMap<>(); - queueOffsetMap.put(queueId, offset); - topicOffsetMap = new HashMap<>(); - topicOffsetMap.put(config.getTopic(), queueOffsetMap); - zkData.getConsumerOffsetMap().put(config.getConsumerGroup(), topicOffsetMap); - } else { - Map queueOffsetMap = topicOffsetMap.get(config.getTopic()); - if (queueOffsetMap == null) { - queueOffsetMap = new HashMap<>(); - queueOffsetMap.put(queueId, offset); - topicOffsetMap.put(config.getTopic(), queueOffsetMap); - } else { - queueOffsetMap.put(queueId, offset); - } - } - LOG.debug("new consumer offset={}", JsonUtil.toJson(zkData.getConsumerOffsetMap())); - } finally { - zkData.getConsumerOffsetLock().unlock(); - } + metadataManager.updateConsumerOffset( + config.getConsumerGroup(), config.getTopic(), queueId, offset); } } } @@ -145,28 +85,12 @@ public void run() { * @return key是queueId,value是shardingId */ private Map getConsumedQueue() { - ZKData zkData = ZKData.getInstance(); // 获取所有queueMap - Map queueMap = new HashMap<>(); - zkData.getTopicLock().lock(); - try { - if (zkData.getTopicMap().containsKey(config.getTopic())) { - queueMap.putAll(zkData.getTopicMap().get(config.getTopic())); - } - } finally { - zkData.getTopicLock().unlock(); - } + Map queueMap = metadataManager.getTopicQueueMap(config.getTopic()); Integer[] queueIds = queueMap.keySet().toArray(new Integer[0]); Arrays.sort(queueIds); - // 获取所有consumer list - List consumerIdList = new ArrayList<>(); - zkData.getConsumerIdsLock().lock(); - try { - consumerIdList.addAll(zkData.getConsumerIds()); - } finally { - zkData.getConsumerIdsLock().unlock(); - } + List consumerIdList = metadataManager.getConsumerIds(); int queueSize = queueMap.size(); int consumerSize = consumerIdList.size(); diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java index 0bbd07b..61b7715 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java @@ -3,8 +3,8 @@ import com.github.wenweihu86.distmq.client.BrokerClient; import com.github.wenweihu86.distmq.client.BrokerClientManager; import com.github.wenweihu86.distmq.client.api.BrokerMessage; -import com.github.wenweihu86.distmq.client.zk.ZKClient; -import com.github.wenweihu86.distmq.client.zk.ZKData; +import com.github.wenweihu86.distmq.client.zk.MetadataManager; +import com.github.wenweihu86.distmq.client.zk.Metadata; import com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,80 +19,48 @@ public class Producer { private static final Logger LOG = LoggerFactory.getLogger(Producer.class); private ProducerConfig config; - private ZKClient zkClient; + private MetadataManager metadataManager; public Producer(ProducerConfig config) { this.config = config; BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions()); - zkClient = new ZKClient(config); - zkClient.subscribeBroker(); - zkClient.subscribeTopic(); + metadataManager = new MetadataManager(config); + metadataManager.subscribeBroker(); + metadataManager.subscribeTopic(); } public boolean send(String topic, byte[] messageBytes) { - ZKData zkData = ZKData.getInstance(); - // 查询topic是否存在 - boolean topicExist = false; - zkData.getTopicLock().lock(); - try { - Map queueMap = zkData.getTopicMap().get(topic); - if (queueMap != null && queueMap.size() > 0) { - topicExist = true; - } - } finally { - zkData.getTopicLock().unlock(); - } - // 如果topic尚不存在,则创建 + boolean topicExist = metadataManager.checkTopicExist(topic); if (!topicExist) { - zkClient.registerTopic(topic, config.getQueueCountPerTopic()); - Map queueMap = zkClient.readTopicInfo(topic); + metadataManager.registerTopic(topic, config.getQueueCountPerTopic()); + Map queueMap = metadataManager.readTopicInfo(topic); if (queueMap.size() != config.getQueueCountPerTopic()) { LOG.warn("create topic failed, topic={}", topic); return false; } - zkData.getTopicLock().lock(); - try { - if (!zkData.getTopicMap().containsKey(topic)) { - zkData.getTopicMap().put(topic, queueMap); - } - } finally { - zkData.getTopicLock().unlock(); - } + metadataManager.updateTopicMap(topic, queueMap); } // 获取topic的queueId和对应的shardingId - Integer queueId; - Integer shardingId; - zkData.getTopicLock().lock(); - try { - Map queueMap = zkData.getTopicMap().get(topic); - int queueCount = queueMap.size(); - int randomIndex = ThreadLocalRandom.current().nextInt(0, queueCount); - Integer[] queueArray = queueMap.keySet().toArray(new Integer[0]); - queueId = queueArray[randomIndex]; - shardingId = queueMap.get(queueId); - } finally { - zkData.getTopicLock().unlock(); - } + Map queueMap = metadataManager.getTopicQueueMap(topic); + int queueCount = queueMap.size(); + int randomIndex = ThreadLocalRandom.current().nextInt(0, queueCount); + Integer[] queueArray = queueMap.keySet().toArray(new Integer[0]); + Integer queueId = queueArray[randomIndex]; + Integer shardingId = queueMap.get(queueId); // send message to broker + List brokerAddressList = metadataManager.getBrokerAddressList(shardingId); + int randIndex = ThreadLocalRandom.current().nextInt(0, brokerAddressList.size()); + String brokerAddress = brokerAddressList.get(randIndex); + BrokerClient brokerClient = BrokerClientManager.getInstance().getBrokerClientMap().get(brokerAddress); + BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.newBuilder() .setTopic(topic) .setQueue(queueId) .setContent(ByteString.copyFrom(messageBytes)) .build(); - - List brokerAddressList; - zkData.getBrokerLock().lock(); - try { - brokerAddressList = zkData.getBrokerMap().get(shardingId); - } finally { - zkData.getBrokerLock().unlock(); - } - int randIndex = ThreadLocalRandom.current().nextInt(0, brokerAddressList.size()); - String brokerAddress = brokerAddressList.get(randIndex); - BrokerClient brokerClient = BrokerClientManager.getInstance().getBrokerClientMap().get(brokerAddress); BrokerMessage.SendMessageResponse response = brokerClient.getBrokerAPI().sendMessage(request); if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) { LOG.warn("send message failed, topic={}, queue={}, brokerAddress={}", diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java new file mode 100644 index 0000000..39265e5 --- /dev/null +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java @@ -0,0 +1,271 @@ +package com.github.wenweihu86.distmq.client.zk; + +import java.util.*; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Created by huwenwei on 2017/6/21. + */ +public class Metadata { + // shardingId -> broker address list + private Map> brokerMap = new HashMap<>(); + private Lock brokerLock = new ReentrantLock(); + + // topic -> (queueId -> shardingId) + private Map> topicMap = new HashMap<>(); + private Lock topicLock = new ReentrantLock(); + private Condition topicCondition = topicLock.newCondition(); + + // consumer ids of group + private List consumerIds = new ArrayList<>(); + private Lock consumerIdsLock = new ReentrantLock(); + + // consumer offset, consumerGroup和topic确定情况下, 只需存储queue -> offset的映射 + private Map consumerOffsetMap = new HashMap<>(); + private Lock consumerOffsetLock = new ReentrantLock(); + + public long getConsumerOffset(Integer queueId) { + long offset = 0; + consumerOffsetLock.lock(); + try { + if (consumerOffsetMap.containsKey(queueId)) { + offset = consumerOffsetMap.get(queueId); + } + } finally { + consumerOffsetLock.unlock(); + } + return offset; + } + + public List getBrokerAddressList(Integer shardingId) { + List result = new ArrayList<>(); + brokerLock.lock(); + try { + List brokers = brokerMap.get(shardingId); + if (brokers != null) { + result.addAll(brokers); + } + } finally { + brokerLock.unlock(); + } + return result; + } + + public void addShardingBrokerAddress(Integer shardingId, String address) { + brokerLock.lock(); + try { + brokerMap.get(shardingId).add(address); + } finally { + brokerLock.unlock(); + } + } + + public void removeShardingBrokerAddress(Integer shardingId, String address) { + brokerLock.lock(); + try { + brokerMap.get(shardingId).remove(address); + } finally { + brokerLock.unlock(); + } + } + + public void updateBrokerSharding(Integer shardingId, List brokerAddressList) { + brokerLock.lock(); + try { + brokerMap.put(shardingId, brokerAddressList); + } finally { + brokerLock.unlock(); + } + } + + public List removeBrokerSharding(Integer shardingId) { + brokerLock.lock(); + try { + return brokerMap.remove(Integer.valueOf(shardingId)); + } finally { + brokerLock.unlock(); + } + } + + public List getBrokerShardings() { + List shardings = new ArrayList<>(); + brokerLock.lock(); + try { + for (Integer shardingId : brokerMap.keySet()) { + shardings.add(String.valueOf(shardingId)); + } + } finally { + brokerLock.unlock(); + } + return shardings; + } + + public List getBrokerShardingIds() { + brokerLock.lock(); + try { + return new ArrayList<>(brokerMap.keySet()); + } finally { + brokerLock.unlock(); + } + } + + public List getAllTopics() { + topicLock.lock(); + try { + return new ArrayList<>(topicMap.keySet()); + } finally { + topicLock.unlock(); + } + } + + public boolean checkTopicExist(String topic) { + boolean topicExist = false; + topicLock.lock(); + try { + Map queueMap = topicMap.get(topic); + if (queueMap != null && queueMap.size() > 0) { + topicExist = true; + } + } finally { + topicLock.unlock(); + } + return topicExist; + } + + public void updateTopicMap(String topic, Map queueMap) { + topicLock.lock(); + try { + topicMap.put(topic, queueMap); + topicCondition.signalAll(); + } finally { + topicLock.unlock(); + } + } + + public void removeTopics(Collection deletedTopics) { + topicLock.lock(); + try { + for (String topic : deletedTopics) { + topicMap.remove(topic); + } + } finally { + topicLock.unlock(); + } + } + + public Map getTopicQueueMap(String topic) { + Map result = new HashMap<>(); + topicLock.lock(); + try { + Map queueMap = topicMap.get(topic); + if (queueMap != null) { + result.putAll(queueMap); + } + } finally { + topicLock.unlock(); + } + return result; + } + + public List getTopicQueues(String topic) { + topicLock.lock(); + try { + return new ArrayList<>(topicMap.get(topic).keySet()); + } finally { + topicLock.unlock(); + } + } + + public Integer getQueueSharding(String topic, Integer queueId) { + topicLock.lock(); + try { + Map queueMap = topicMap.get(topic); + if (queueMap != null) { + return queueMap.get(queueId); + } + } finally { + topicLock.unlock(); + } + return null; + } + + public void addTopicQueue(String topic, Integer queueId, Integer shardingId) { + topicLock.lock(); + try { + topicMap.get(topic).put(queueId, shardingId); + topicCondition.signalAll(); + } finally { + topicLock.unlock(); + } + } + + public void deleteTopicQueue(String topic, Collection queueIds) { + topicLock.lock(); + try { + for (Integer queueId : queueIds) { + topicMap.get(topic).remove(queueId); + } + } finally { + topicLock.unlock(); + } + } + + public Map> getBrokerMap() { + return brokerMap; + } + + public void setBrokerMap(Map> brokerMap) { + this.brokerMap = brokerMap; + } + + public Lock getBrokerLock() { + return brokerLock; + } + + public Map> getTopicMap() { + return topicMap; + } + + public void setTopicMap(Map> topicMap) { + this.topicMap = topicMap; + } + + public Lock getTopicLock() { + return topicLock; + } + + public Condition getTopicCondition() { + return topicCondition; + } + + public List getConsumerIds() { + consumerIdsLock.lock(); + try { + return new ArrayList<>(consumerIds); + } finally { + consumerIdsLock.unlock(); + } + } + + public void setConsumerIds(List consumerIds) { + this.consumerIds = consumerIds; + } + + public Lock getConsumerIdsLock() { + return consumerIdsLock; + } + + public void setConsumerIdsLock(Lock consumerIdsLock) { + this.consumerIdsLock = consumerIdsLock; + } + + public Map getConsumerOffsetMap() { + return consumerOffsetMap; + } + + public Lock getConsumerOffsetLock() { + return consumerOffsetLock; + } +} diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/MetadataManager.java similarity index 70% rename from distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java rename to distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/MetadataManager.java index e4164a5..4be2882 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/MetadataManager.java @@ -4,6 +4,7 @@ import com.github.wenweihu86.distmq.client.BrokerClientManager; import com.github.wenweihu86.distmq.client.consumer.ConsumerConfig; import com.github.wenweihu86.distmq.client.producer.ProducerConfig; +import com.github.wenweihu86.distmq.client.utils.JsonUtil; import com.github.wenweihu86.rpc.client.RPCClientOptions; import org.apache.commons.collections.CollectionUtils; import org.apache.curator.RetryPolicy; @@ -25,20 +26,23 @@ /** * Created by wenweihu86 on 2017/6/21. */ -public class ZKClient { - private static final Logger LOG = LoggerFactory.getLogger(ZKClient.class); +public class MetadataManager { + private static final Logger LOG = LoggerFactory.getLogger(MetadataManager.class); private ZKConf zkConf; private CuratorFramework zkClient; + private Metadata metadata; private boolean isProducer = false; private boolean isConsumer = false; - public ZKClient(ZKConf conf) { + public MetadataManager(ZKConf conf) { this.zkConf = conf; if (conf instanceof ProducerConfig) { isProducer = true; } else if (conf instanceof ConsumerConfig) { isConsumer = true; } + this.metadata = new Metadata(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry( zkConf.getRetryIntervalMs(), zkConf.getRetryCount()); this.zkClient = CuratorFrameworkFactory.builder() @@ -51,47 +55,48 @@ public ZKClient(ZKConf conf) { // create path String brokersPath = zkConf.getBasePath() + "/brokers"; - createPath(brokersPath); + createPath(brokersPath, CreateMode.PERSISTENT); String topicsPath = zkConf.getBasePath() + "/topics"; - createPath(topicsPath); + createPath(topicsPath, CreateMode.PERSISTENT); if (isConsumer) { ConsumerConfig consumerConfig = (ConsumerConfig) conf; String consumerIdsPath = zkConf.getBasePath() + "/consumers/" + consumerConfig.getConsumerGroup() + "/ids"; - createPath(consumerIdsPath); + createPath(consumerIdsPath, CreateMode.PERSISTENT); String offsetsPath = zkConf.getBasePath() + "/consumers/" + consumerConfig.getConsumerGroup() + "/offsets"; - createPath(offsetsPath); + createPath(offsetsPath, CreateMode.PERSISTENT); } } - private void createPath(String path) { + private boolean createPath(String path, CreateMode createMode) { + boolean success; try { zkClient.create() .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) + .withMode(createMode) .forPath(path, "".getBytes()); + success = true; LOG.info("create path success, path={}", path); - } catch (Exception ex) { - LOG.debug("createPath exception:", ex); + } catch (KeeperException.NodeExistsException ex1) { + success = true; + LOG.debug("node exist, path={}", path); + } catch (Exception ex2) { + success = false; + LOG.debug("createPath exception:", ex2); } + return success; } public void registerBroker(int shardingId, String ip, int port) { String path = zkConf.getBasePath() + "/brokers/" + shardingId + "/" + ip + ":" + port; - try { - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(path, "".getBytes()); - } catch (Exception ex) { - LOG.warn("registerBroker exception:", ex); + boolean success = createPath(path, CreateMode.EPHEMERAL); + if (success) { + LOG.info("register broker sucess, ip={}, port={}", ip, port); + } else { + LOG.warn("register broker failed, ip={}, port={}", ip, port); } - LOG.info("register broker sucess, ip={}, port={}", ip, port); } - // 启动时调用,所以不用加锁 public void subscribeBroker() { - final ZKData zkData = ZKData.getInstance(); - final Map> brokerMap = zkData.getBrokerMap(); try { final String brokerParentPath = zkConf.getBasePath() + "/brokers"; List shardings = zkClient.getChildren().forPath(brokerParentPath); @@ -106,11 +111,16 @@ public void subscribeBroker() { BrokerClientManager.getInstance().getBrokerClientMap().put(address, brokerClient); } } - brokerMap.put(shardingId, brokerAddressList); + + metadata.getBrokerLock().lock(); + try { + metadata.getBrokerMap().put(shardingId, brokerAddressList); + } finally { + metadata.getBrokerLock().unlock(); + } + // 监听broker分片变化 - zkClient.getChildren().usingWatcher( - new BrokerShardingWather(shardingId)) - .forPath(shardingPath); + zkClient.getChildren().usingWatcher(new BrokerShardingWather(shardingId)).forPath(shardingPath); } // 监听/brokers孩子节点变化 zkClient.getChildren().usingWatcher(new BrokersWatcher()).forPath(brokerParentPath); @@ -125,16 +135,7 @@ public void subscribeBroker() { * @param queueNum queue个数 */ public synchronized void registerTopic(String topic, int queueNum) { - ZKData zkData = ZKData.getInstance(); - List shardingIds; - zkData.getBrokerLock().lock(); - try { - Map> brokerMap = zkData.getBrokerMap(); - shardingIds = new ArrayList<>(brokerMap.keySet()); - } finally { - zkData.getBrokerLock().unlock(); - } - + List shardingIds = metadata.getBrokerShardingIds(); int shardingNum = shardingIds.size(); String topicPath = zkConf.getBasePath() + "/topics/" + topic; for (int queueId = 0; queueId < queueNum; queueId++) { @@ -154,7 +155,6 @@ public synchronized void registerTopic(String topic, int queueNum) { public Map readTopic(String topic) { Map queueMap = new HashMap<>(); - ZKData zkData = ZKData.getInstance(); String path = zkConf.getBasePath() + "/topics/" + topic; try { List queues = zkClient.getChildren().forPath(path); @@ -173,50 +173,54 @@ public Map readTopic(String topic) { return queueMap; } - // 启动时调用,所以不用加锁 public void subscribeTopic() { - ZKData zkData = ZKData.getInstance(); String topicParentPath = zkConf.getBasePath() + "/topics"; try { List topics = zkClient.getChildren().forPath(topicParentPath); for (String topic : topics) { Map queueMap = readTopicInfo(topic); - zkData.getTopicMap().put(topic, queueMap); + metadata.updateTopicMap(topic, queueMap); // 监听topic下的queue变化事件 // 这里假定queue与shardingId映射关系不会发生变化,所以没有监听queue节点变化事情 String topicPath = topicParentPath + "/" + topic; - zkClient.getChildren().usingWatcher( - new TopicWatcher(topic)) - .forPath(topicPath); + zkClient.getChildren().usingWatcher(new TopicWatcher(topic)).forPath(topicPath); } // 监听/topics孩子节点变化情况 - zkClient.getChildren().usingWatcher( - new TopicsWather()) - .forPath(topicParentPath); + zkClient.getChildren().usingWatcher(new TopicsWather()).forPath(topicParentPath); } catch (Exception ex) { LOG.warn("subscribeTopic exception:", ex); } } - public void registerConsumer(String consumerGroup, String consumerId) { + public boolean registerConsumer(String consumerGroup, String consumerId) { String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids/" + consumerId; + boolean success = createPath(path, CreateMode.EPHEMERAL); + if (success) { + LOG.info("registerConsumer sucess, consumerGroup={}, consumerId={}", consumerGroup, consumerId); + } else { + LOG.warn("registerConsumer failed, consumerGroup={}, consumerId={}", consumerGroup, consumerId); + } + return success; + } + + public void updateConsumerIds(String consumerGroup) { + String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids"; try { - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL) - .forPath(path, "".getBytes()); + List consumerIds = zkClient.getChildren().forPath(path); + metadata.getConsumerIdsLock().lock(); + try { + metadata.setConsumerIds(consumerIds); + } finally { + metadata.getConsumerIdsLock().unlock(); + } } catch (Exception ex) { - LOG.warn("registerConsumer exception:", ex); + LOG.warn("updateConsumerIds exception:", ex); } - LOG.info("registerConsumer sucess, consumerGroup={}, consumerId={}", consumerGroup, consumerId); } public void subscribeConsumer(String consumerGroup) { - ZKData zkData = ZKData.getInstance(); String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids"; try { - List consumerIds = zkClient.getChildren().forPath(path); - zkData.setConsumerIds(consumerIds); zkClient.getChildren() .usingWatcher(new ConsumerWatcher(consumerGroup)) .forPath(path); @@ -235,6 +239,7 @@ public Map readConsumerOffset(String consumerGroup, String topic) Map queueOffsetMap = new HashMap<>(); String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/offsets/" + topic; try { + // 从zk读取 List queues = zkClient.getChildren().forPath(path); for (String queue : queues) { String queuePath = path + "/" + queue; @@ -244,7 +249,16 @@ public Map readConsumerOffset(String consumerGroup, String topic) queueOffsetMap.put(Integer.valueOf(queue), offset); } } - LOG.info("readConsumerOffset success, consumerGroup={}, topic={}", consumerGroup, topic); + + // 更新本地内存 + metadata.getConsumerOffsetLock().lock(); + try { + metadata.getConsumerOffsetMap().putAll(queueOffsetMap); + LOG.info("read consumer offset success, result={}", + JsonUtil.toJson(metadata.getConsumerOffsetMap())); + } finally { + metadata.getConsumerOffsetLock().unlock(); + } } catch (Exception ex) { LOG.debug("readConsumerOffset exception:", ex); } @@ -257,13 +271,24 @@ public void updateConsumerOffset(String consumerGroup, String topic, Integer que int currentTryCount = 0; while (currentTryCount++ < maxTryCount) { try { + // 更新zk byte[] dataBytes = String.valueOf(offset).getBytes(); zkClient.setData().forPath(path, dataBytes); LOG.info("updateConsumerOffset success, consumerGroup={}, topic={}, queue={}, offset={}", consumerGroup, topic, queueId, offset); + + // 更新本地内存 + metadata.getConsumerOffsetLock().lock(); + try { + metadata.getConsumerOffsetMap().put(queueId, offset); + LOG.info("new consumer offset map={}", + JsonUtil.toJson(metadata.getConsumerOffsetMap())); + } finally { + metadata.getConsumerOffsetLock().unlock(); + } break; } catch (KeeperException.NoNodeException ex1) { - createPath(path); + createPath(path, CreateMode.PERSISTENT); continue; } catch (Exception ex2) { LOG.warn("updateConsumerOffset exception:", ex2); @@ -290,6 +315,35 @@ public Map readTopicInfo(String topic) { return queueMap; } + public long getConsumerOffset(Integer queueId) { + return metadata.getConsumerOffset(queueId); + } + + public List getBrokerAddressList(Integer shardingId) { + return metadata.getBrokerAddressList(shardingId); + } + + public boolean checkTopicExist(String topic) { + return metadata.checkTopicExist(topic); + } + + public void updateTopicMap(String topic, Map queueMap) { + metadata.updateTopicMap(topic, queueMap); + } + + public Map getTopicQueueMap(String topic) { + return metadata.getTopicQueueMap(topic); + } + + public Integer getQueueSharding(String topic, Integer queueId) { + return metadata.getQueueSharding(topic, queueId); + } + + public List getConsumerIds() { + return metadata.getConsumerIds(); + } + + // 监听consumerGroup下的consumer节点数变化 private class ConsumerWatcher implements CuratorWatcher { private String consumerGroup; @@ -302,13 +356,7 @@ public void process(WatchedEvent event) throws Exception { String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids"; LOG.info("get zookeeper notification for path={}", path); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { - try { - List consumerIds = zkClient.getChildren().forPath(path); - ZKData zkData = ZKData.getInstance(); - zkData.setConsumerIds(consumerIds); - } catch (Exception ex) { - LOG.warn("subscribeConsumer exception:", ex); - } + updateConsumerIds(consumerGroup); } zkClient.getChildren() .usingWatcher(new ConsumerWatcher(consumerGroup)) @@ -320,22 +368,11 @@ public void process(WatchedEvent event) throws Exception { private class BrokersWatcher implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception { - ZKData zkData = ZKData.getInstance(); String brokerPath = zkConf.getBasePath() + "/brokers"; LOG.info("get zookeeper notification for path={}", brokerPath); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { List newShardings = zkClient.getChildren().forPath(brokerPath); - - List oldShardings = new ArrayList<>(); - zkData.getBrokerLock().lock(); - try { - for (Integer shardingId : zkData.getBrokerMap().keySet()) { - oldShardings.add(String.valueOf(shardingId)); - } - } finally { - zkData.getBrokerLock().unlock(); - } - + List oldShardings = metadata.getBrokerShardings(); Collection addedShardings = CollectionUtils.subtract(newShardings, oldShardings); Collection deletedShardings = CollectionUtils.subtract(oldShardings, newShardings); for (String sharding : addedShardings) { @@ -344,31 +381,17 @@ public void process(WatchedEvent event) throws Exception { List brokerAddressList = zkClient.getChildren().forPath(shardingPath); for (String address : brokerAddressList) { if (isProducer || isConsumer) { - BrokerClient brokerClient = new BrokerClient(address, BrokerClientManager.getRpcClientOptions()); + BrokerClient brokerClient = new BrokerClient( + address, BrokerClientManager.getRpcClientOptions()); BrokerClientManager.getInstance().getBrokerClientMap().putIfAbsent(address, brokerClient); } } - - zkData.getBrokerLock().lock(); - try { - zkData.getBrokerMap().put(shardingId, brokerAddressList); - } finally { - zkData.getBrokerLock().unlock(); - } - - zkClient.getChildren().usingWatcher( - new BrokerShardingWather(shardingId)) - .forPath(shardingPath); + metadata.updateBrokerSharding(shardingId, brokerAddressList); + zkClient.getChildren().usingWatcher(new BrokerShardingWather(shardingId)).forPath(shardingPath); } for (String sharding : deletedShardings) { - List brokerList; - zkData.getBrokerLock().lock(); - try { - brokerList = zkData.getBrokerMap().remove(Integer.valueOf(sharding)); - } finally { - zkData.getBrokerLock().unlock(); - } + List brokerList = metadata.removeBrokerSharding(Integer.valueOf(sharding)); if ((isProducer || isConsumer) && CollectionUtils.isNotEmpty(brokerList)) { ConcurrentMap brokerClientMap = BrokerClientManager.getInstance().getBrokerClientMap(); @@ -396,47 +419,25 @@ public BrokerShardingWather(int shardingId) { @Override public void process(WatchedEvent event) throws Exception { - ZKData zkData = ZKData.getInstance(); String shardingPath = zkConf.getBasePath() + "/brokers/" + shardingId; LOG.info("get zookeeper notification for path={}", shardingPath); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { List newBrokerAddressList = zkClient.getChildren().forPath(shardingPath); - List oldBrokerAddressList; - zkData.getBrokerLock().lock(); - try { - oldBrokerAddressList = zkData.getBrokerMap().get(shardingPath); - if (oldBrokerAddressList == null) { - oldBrokerAddressList = new ArrayList<>(); - } - } finally { - zkData.getBrokerLock().unlock(); - } - + List oldBrokerAddressList = metadata.getBrokerAddressList(shardingId); Collection addedBrokerAddressList = CollectionUtils.subtract(newBrokerAddressList, oldBrokerAddressList); Collection deletedBrokerAddressList = CollectionUtils.subtract(oldBrokerAddressList, newBrokerAddressList); for (String address : addedBrokerAddressList) { - zkData.getBrokerLock().lock(); - try { - zkData.getBrokerMap().get(shardingId).add(address); - } finally { - zkData.getBrokerLock().unlock(); - } - + metadata.addShardingBrokerAddress(shardingId, address); if (isProducer || isConsumer) { - BrokerClient brokerClient = new BrokerClient(address, BrokerClientManager.getRpcClientOptions()); + BrokerClient brokerClient = new BrokerClient( + address, BrokerClientManager.getRpcClientOptions()); BrokerClientManager.getInstance().getBrokerClientMap().putIfAbsent(address, brokerClient); } } for (String address : deletedBrokerAddressList) { - zkData.getBrokerLock().lock(); - try { - zkData.getBrokerMap().get(shardingId).remove(address); - } finally { - zkData.getBrokerLock().unlock(); - } - + metadata.removeShardingBrokerAddress(shardingId, address); if (isProducer || isConsumer) { BrokerClient brokerClient = BrokerClientManager.getInstance().getBrokerClientMap().remove(address); @@ -454,18 +455,11 @@ public void process(WatchedEvent event) throws Exception { private class TopicsWather implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception { - ZKData zkData = ZKData.getInstance(); String topicParentPath = zkConf.getBasePath() + "/topics"; LOG.info("get zookeeper notification for path={}", topicParentPath); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { List newTopics = zkClient.getChildren().forPath(topicParentPath); - List oldTopics; - zkData.getTopicLock().lockInterruptibly(); - try { - oldTopics = new ArrayList<>(zkData.getTopicMap().keySet()); - } finally { - zkData.getTopicLock().unlock(); - } + List oldTopics = metadata.getAllTopics(); Collection addedTopics = CollectionUtils.subtract(newTopics, oldTopics); Collection deletedTopics = CollectionUtils.subtract(oldTopics, newTopics); for (String topic : addedTopics) { @@ -474,24 +468,9 @@ public void process(WatchedEvent event) throws Exception { .usingWatcher(new TopicWatcher(topic)) .forPath(topicPath); Map queueMap = readTopicInfo(topic); - - zkData.getTopicLock().lock(); - try { - zkData.getTopicMap().put(topic, queueMap); - zkData.getTopicCondition().signalAll(); - } finally { - zkData.getTopicLock().unlock(); - } - } - - zkData.getTopicLock().lock(); - try { - for (String topic : deletedTopics) { - zkData.getTopicMap().remove(topic); - } - } finally { - zkData.getTopicLock().unlock(); + metadata.updateTopicMap(topic, queueMap); } + metadata.removeTopics(deletedTopics); } zkClient.getChildren() .usingWatcher(new TopicsWather()) @@ -509,7 +488,6 @@ public TopicWatcher(String topic) { @Override public void process(WatchedEvent event) throws Exception { - ZKData zkData = ZKData.getInstance(); String topicPath = zkConf.getBasePath() + "/topics/" + topic; LOG.info("get zookeeper notification for path={}", topicPath); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { @@ -518,39 +496,16 @@ public void process(WatchedEvent event) throws Exception { for (String queue : newQueues) { newQueueIds.add(Integer.valueOf(queue)); } - - List oldQueueIds; - zkData.getTopicLock().lockInterruptibly(); - try { - oldQueueIds = new ArrayList<>(zkData.getTopicMap().get(topic).keySet()); - } finally { - zkData.getTopicLock().unlock(); - } - + List oldQueueIds = metadata.getTopicQueues(topic); Collection addedQueueIds = CollectionUtils.subtract(newQueueIds, oldQueueIds); Collection deletedQueueIds = CollectionUtils.subtract(oldQueueIds, newQueueIds); for (Integer queueId : addedQueueIds) { String queuePath = topicPath + "/" + queueId; String queueData = new String(zkClient.getData().forPath(queuePath)); Integer shardingId = Integer.valueOf(queueData); - - zkData.getTopicLock().lock(); - try { - zkData.getTopicMap().get(topic).put(queueId, shardingId); - zkData.getTopicCondition().signalAll(); - } finally { - zkData.getTopicLock().unlock(); - } - } - - zkData.getTopicLock().lock(); - try { - for (Integer queueId : deletedQueueIds) { - zkData.getTopicMap().get(topic).remove(queueId); - } - } finally { - zkData.getTopicLock().unlock(); + metadata.addTopicQueue(topic, queueId, shardingId); } + metadata.deleteTopicQueue(topic, deletedQueueIds); } zkClient.getChildren() .usingWatcher(new TopicWatcher(topic)) diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java deleted file mode 100644 index 289d63d..0000000 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKData.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.github.wenweihu86.distmq.client.zk; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Created by huwenwei on 2017/6/21. - */ -public class ZKData { - private static ZKData instance; - - public static ZKData getInstance() { - if (instance == null) { - instance = new ZKData(); - } - return instance; - } - - // shardingId -> broker address list - private Map> brokerMap = new HashMap<>(); - private Lock brokerLock = new ReentrantLock(); - - // topic -> (queueId -> shardingId) - private Map> topicMap = new HashMap<>(); - private Lock topicLock = new ReentrantLock(); - private Condition topicCondition = topicLock.newCondition(); - - // consumer ids of group - private List consumerIds = new ArrayList<>(); - private Lock consumerIdsLock = new ReentrantLock(); - - // consumer offset - // consumerGroup -> (topic -> (queue -> offset)) - private Map>> consumerOffsetMap = new HashMap<>(); - private Lock consumerOffsetLock = new ReentrantLock(); - - public static void setInstance(ZKData instance) { - ZKData.instance = instance; - } - - public Map> getBrokerMap() { - return brokerMap; - } - - public void setBrokerMap(Map> brokerMap) { - this.brokerMap = brokerMap; - } - - public Lock getBrokerLock() { - return brokerLock; - } - - public Map> getTopicMap() { - return topicMap; - } - - public void setTopicMap(Map> topicMap) { - this.topicMap = topicMap; - } - - public Lock getTopicLock() { - return topicLock; - } - - public Condition getTopicCondition() { - return topicCondition; - } - - public List getConsumerIds() { - return consumerIds; - } - - public void setConsumerIds(List consumerIds) { - this.consumerIds = consumerIds; - } - - public Lock getConsumerIdsLock() { - return consumerIdsLock; - } - - public void setConsumerIdsLock(Lock consumerIdsLock) { - this.consumerIdsLock = consumerIdsLock; - } - - public Map>> getConsumerOffsetMap() { - return consumerOffsetMap; - } - - public Lock getConsumerOffsetLock() { - return consumerOffsetLock; - } -}