From 7f1674e36266fdadd38d426a3fa39e48fdab597e Mon Sep 17 00:00:00 2001 From: wenweihu86 Date: Sun, 25 Jun 2017 17:44:30 +0800 Subject: [PATCH] add example and fix some bugs --- .../distmq/broker/BrokerAPIImpl.java | 55 ++++++---- .../wenweihu86/distmq/broker/BrokerMain.java | 14 +-- .../distmq/broker/config/GlobalConf.java | 13 +-- .../wenweihu86/distmq/broker/log/Segment.java | 8 +- .../distmq/broker/log/SegmentedLog.java | 31 +++--- distmq-client/pom.xml | 2 +- .../distmq/client/consumer/Consumer.java | 5 +- .../distmq/client/producer/Producer.java | 14 +-- .../wenweihu86/distmq/client/zk/ZKClient.java | 102 ++++++++++++------ distmq-example/build.sh | 4 + distmq-example/pom.xml | 68 +----------- distmq-example/run.sh | 7 ++ .../distmq/example/ConsumerMain.java | 34 ++++++ .../distmq/example/ProducerMain.java | 26 +++++ distmq-example/src/main/resources/log4j2.xml | 39 +++++++ 15 files changed, 262 insertions(+), 160 deletions(-) create mode 100644 distmq-example/build.sh create mode 100644 distmq-example/run.sh create mode 100644 distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ConsumerMain.java create mode 100644 distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java create mode 100644 distmq-example/src/main/resources/log4j2.xml diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java index a01b99f..fdabd8f 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerAPIImpl.java @@ -3,6 +3,7 @@ import com.github.wenweihu86.distmq.broker.config.GlobalConf; import com.github.wenweihu86.distmq.client.api.BrokerAPI; import com.github.wenweihu86.distmq.client.api.BrokerMessage; +import com.github.wenweihu86.distmq.client.zk.ZKClient; import com.github.wenweihu86.distmq.client.zk.ZKData; import com.github.wenweihu86.raft.RaftNode; import com.github.wenweihu86.raft.proto.RaftMessage; @@ -22,10 +23,12 @@ public class BrokerAPIImpl implements BrokerAPI { private RaftNode raftNode; private BrokerStateMachine stateMachine; + private ZKClient zkClient; - public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine) { + public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine, ZKClient zkClient) { this.raftNode = raftNode; this.stateMachine = stateMachine; + this.zkClient = zkClient; } @Override @@ -33,31 +36,37 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe BrokerMessage.SendMessageResponse.Builder responseBuilder = BrokerMessage.SendMessageResponse.newBuilder(); BrokerMessage.BaseResponse.Builder baseResBuilder = BrokerMessage.BaseResponse.newBuilder(); baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL); + + // 验证queue存在,并且属于该sharding ZKData zkData = ZKData.getInstance(); + GlobalConf conf = GlobalConf.getInstance(); + zkData.getTopicLock().lock(); Map> topicMap = zkData.getTopicMap(); Map queueMap = topicMap.get(request.getTopic()); // topic由producer提前创建完成,所以这里会校验不存在的话,直接返回失败 - if (queueMap == null) { - String message = "topic is not exist"; - baseResBuilder.setResMsg(message); - responseBuilder.setBaseRes(baseResBuilder.build()); - LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}", - request.getTopic(), request.getQueue(), - responseBuilder.getBaseRes().getResCode(), - responseBuilder.getBaseRes().getResMsg()); - return responseBuilder.build(); - } - GlobalConf conf = GlobalConf.getInstance(); - Integer shardingId = queueMap.get(request.getQueue()); - if (shardingId == null || shardingId != conf.getShardingId()) { - String message = "queue not exist or not be included by this sharding"; - baseResBuilder.setResMsg(message); - responseBuilder.setBaseRes(baseResBuilder.build()); - LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}", - request.getTopic(), request.getQueue(), - responseBuilder.getBaseRes().getResCode(), - responseBuilder.getBaseRes().getResMsg()); - return responseBuilder.build(); + if (queueMap == null + || !queueMap.containsKey(request.getQueue()) + || queueMap.get(request.getQueue()) != conf.getShardingId()) { + queueMap = zkClient.readTopicInfo(request.getTopic()); + + zkData.getTopicLock().lock(); + try { + zkData.getTopicMap().put(request.getTopic(), queueMap); + } finally { + zkData.getTopicLock().unlock(); + } + if (queueMap == null + || !queueMap.containsKey(request.getQueue()) + || queueMap.get(request.getQueue()) != conf.getShardingId()) { + String message = "queue not exist or not be included by this sharding"; + baseResBuilder.setResMsg(message); + responseBuilder.setBaseRes(baseResBuilder.build()); + LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}", + request.getTopic(), request.getQueue(), + responseBuilder.getBaseRes().getResCode(), + responseBuilder.getBaseRes().getResMsg()); + return responseBuilder.build(); + } } // 如果自己不是leader,将写请求转发给leader @@ -91,7 +100,7 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) { BrokerMessage.PullMessageResponse response = stateMachine.pullMessage(request); LOG.info("pullMessage request, topic={}, queue={}, " - + "resCode, resSize={}", + + "resCode={}, resSize={}", request.getTopic(), request.getQueue(), response.getBaseRes().getResCode(), response.getContentsCount()); diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java index eed8c40..075491f 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java @@ -26,6 +26,10 @@ public static void main(String[] args) { List servers = conf.getServers(); String dataDir = conf.getDataDir(); + // 初始化zookeeper + ZKConf zkConf = conf.getZkConf(); + ZKClient zkClient = new ZKClient(zkConf); + // 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); // 应用状态机 @@ -41,19 +45,17 @@ public static void main(String[] args) { RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注册应用自己提供的服务 - BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine); + BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine, zkClient); server.registerService(brokerAPI); // 启动RPCServer,初始化Raft节点 server.start(); raftNode.init(); - // 注册zk - ZKConf zkConf = conf.getZkConf(); - ZKClient zkClient = new ZKClient(zkConf); + + // 订阅broker和topic的变化 zkClient.subscribeBroker(); zkClient.subscribeTopic(); // 等成为raft集群成员后,才能注册到zk - - while (ConfigurationUtils.containsServer( + while (!ConfigurationUtils.containsServer( raftNode.getConfiguration(), conf.getLocalServer().getServerId())) { try { Thread.sleep(1000); diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java index a3775cc..c7d61a9 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java @@ -80,13 +80,14 @@ private List readServers() { } private ZKConf readZKConf() { + Toml zookeeperToml = toml.getTable("zookeeper"); zkConf = new ZKConf(); - zkConf.setServers(toml.getString("servers")); - zkConf.setConnectTimeoutMs(toml.getLong("connect_timeout_ms").intValue()); - zkConf.setSessionTimeoutMs(toml.getLong("session_timeout_ms").intValue()); - zkConf.setRetryCount(toml.getLong("retry_count").intValue()); - zkConf.setRetryIntervalMs(toml.getLong("retry_interval_ms").intValue()); - zkConf.setBasePath(toml.getString("base_path")); + zkConf.setServers(zookeeperToml.getString("servers")); + zkConf.setConnectTimeoutMs(zookeeperToml.getLong("connect_timeout_ms").intValue()); + zkConf.setSessionTimeoutMs(zookeeperToml.getLong("session_timeout_ms").intValue()); + zkConf.setRetryCount(zookeeperToml.getLong("retry_count").intValue()); + zkConf.setRetryIntervalMs(zookeeperToml.getLong("retry_interval_ms").intValue()); + zkConf.setBasePath(zookeeperToml.getString("base_path")); return zkConf; } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java index 2a7fff6..c346899 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java @@ -88,14 +88,19 @@ public long append(byte[] messageContent) { byteBuffer.putLong(BrokerUtils.getCRC32(messageContent)); byteBuffer.putInt(messageContent.length); byteBuffer.put(messageContent); + byteBuffer.flip(); writeSize = channel.write(byteBuffer); + channel.force(true); offset = startOffset; endOffset = startOffset + writeSize; } else { byteBuffer.putLong(BrokerUtils.getCRC32(messageContent)); byteBuffer.putInt(messageContent.length); byteBuffer.put(messageContent); + byteBuffer.flip(); + channel.position(endOffset); writeSize = channel.write(byteBuffer); + channel.force(true); offset = endOffset; endOffset += writeSize; } @@ -119,11 +124,12 @@ public byte[] read(long offset) { LOG.warn("read message error"); return null; } + headerBuffer.flip(); long crc32 = headerBuffer.getLong(); int messageLen = headerBuffer.getInt(); ByteBuffer messageContentBuffer = ByteBuffer.allocate(messageLen); readLen = channel.read(messageContentBuffer); - if (readLen < messageLen) { + if (readLen != messageLen) { LOG.warn("read message error"); return null; } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java index 5fa1462..66e414a 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java @@ -55,21 +55,21 @@ public long append(byte[] messageContent) { int maxSegmentSize = GlobalConf.getInstance().getMaxSegmentSize(); if (lastSegment.getFileSize() + messageContent.length > maxSegmentSize) { isNeedNewSegmentFile = true; + // 最后一个segment的文件close并改名 + lastSegment.close(); + lastSegment.setCanWrite(false); + String newFileName = String.format("%020d-%020d", + lastSegment.getStartOffset(), lastSegment.getEndOffset()); + String newFullFileName = segmentDir + File.separator + newFileName; + File newFile = new File(newFullFileName); + newFile.createNewFile(); + String oldFullFileName = segmentDir + File.separator + lastSegment.getFileName(); + File oldFile = new File(oldFullFileName); + oldFile.renameTo(newFile); + lastSegment.setFileName(newFileName); + lastSegment.setRandomAccessFile(RaftFileUtils.openFile(segmentDir, newFileName, "r")); + lastSegment.setChannel(lastSegment.getRandomAccessFile().getChannel()); } - // 最后一个segment的文件close并改名 - lastSegment.close(); - lastSegment.setCanWrite(false); - String newFileName = String.format("%020d-%020d", - lastSegment.getStartOffset(), lastSegment.getEndOffset()); - String newFullFileName = segmentDir + File.separator + newFileName; - File newFile = new File(newFullFileName); - newFile.createNewFile(); - String oldFullFileName = segmentDir + File.separator + lastSegment.getFileName(); - File oldFile = new File(oldFullFileName); - oldFile.renameTo(newFile); - lastSegment.setFileName(newFileName); - lastSegment.setRandomAccessFile(RaftFileUtils.openFile(segmentDir, newFileName, "r")); - lastSegment.setChannel(lastSegment.getRandomAccessFile().getChannel()); } } @@ -77,7 +77,7 @@ public long append(byte[] messageContent) { // 新建segment文件 if (isNeedNewSegmentFile) { // open new segment file - long newStartOffset = getLastEndOffset() + Segment.SEGMENT_HEADER_LENGTH; + long newStartOffset = getLastEndOffset(); String newSegmentFileName = String.format("open-%d", newStartOffset); String newFullFileName = segmentDir + File.separator + newSegmentFileName; File newSegmentFile = new File(newFullFileName); @@ -85,6 +85,7 @@ public long append(byte[] messageContent) { newSegmentFile.createNewFile(); } newSegment = new Segment(segmentDir, newSegmentFileName); + startOffsetSegmentMap.put(newSegment.getStartOffset(), newSegment); } else { newSegment = startOffsetSegmentMap.lastEntry().getValue(); } diff --git a/distmq-client/pom.xml b/distmq-client/pom.xml index 9f5585e..94081ea 100644 --- a/distmq-client/pom.xml +++ b/distmq-client/pom.xml @@ -53,7 +53,7 @@ org.apache.curator curator-recipes - 3.3.0 + 2.12.0 junit diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java index f14b567..f438a9e 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/consumer/Consumer.java @@ -32,7 +32,10 @@ public Consumer(ConsumerConfig config, MessageListener listener) { zkClient.subscribeBroker(); zkClient.subscribeTopic(); this.offset = zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic()); - this.timer.scheduleAtFixedRate(this, 1000, 5000, TimeUnit.MILLISECONDS); + } + + public void start() { + timer.scheduleAtFixedRate(this, 1000, 5000, TimeUnit.MILLISECONDS); } @Override diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java index a4f310f..4c4122d 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/producer/Producer.java @@ -31,20 +31,14 @@ public Producer(ProducerConfig config) { public boolean send(String topic, byte[] messageBytes) { ZKData zkData = ZKData.getInstance(); - Map queueMap; - zkData.getTopicLock().lock(); - try { - queueMap = zkData.getTopicMap().get(topic); - } finally { - zkData.getTopicLock().unlock(); - } Integer queueId; Integer shardingId; - if (queueMap == null) { + if (zkData.getTopicMap().get(topic) == null) { zkClient.registerTopic(topic, config.getQueueCountPerTopic()); zkData.getTopicLock().lock(); try { - while (!zkData.getTopicMap().containsKey(topic)) { + while (zkData.getTopicMap().get(topic) == null + || zkData.getTopicMap().get(topic).size() != config.getQueueCountPerTopic()) { zkData.getTopicCondition().awaitUninterruptibly(); } } finally { @@ -54,7 +48,7 @@ public boolean send(String topic, byte[] messageBytes) { zkData.getTopicLock().lock(); try { - queueMap = zkData.getTopicMap().get(topic); + Map queueMap = zkData.getTopicMap().get(topic); int queueCount = queueMap.size(); int randomIndex = ThreadLocalRandom.current().nextInt(0, queueCount); Integer[] queueArray = queueMap.keySet().toArray(new Integer[0]); diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java index 5b1ef48..c95451e 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/ZKClient.java @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.List; import java.util.concurrent.ConcurrentMap; /** @@ -46,11 +47,35 @@ public ZKClient(ZKConf conf) { .sessionTimeoutMs(zkConf.getSessionTimeoutMs()) .build(); this.zkClient.start(); + + // create path + String brokersPath = zkConf.getBasePath() + "/brokers"; + createPath(brokersPath); + String topicsPath = zkConf.getBasePath() + "/topics"; + createPath(topicsPath); + if (isConsumer) { + ConsumerConfig consumerConfig = (ConsumerConfig) conf; + String consumerIdsPath = zkConf.getBasePath() + "/consumers/" + consumerConfig.getConsumerGroup() + "/ids"; + createPath(consumerIdsPath); + String offsetsPath = zkConf.getBasePath() + "/consumers/" + consumerConfig.getConsumerGroup() + "/offsets"; + createPath(offsetsPath); + } + } + + private void createPath(String path) { + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, "".getBytes()); + LOG.info("create path success, path={}", path); + } catch (Exception ex) { + LOG.debug("createPath exception:", ex); + } } public void registerBroker(int shardingId, String ip, int port) { - String path = String.format("%s/brokers/%d/%s:%d", - zkConf.getBasePath(), shardingId, ip, port); + String path = zkConf.getBasePath() + "/brokers/" + shardingId + "/" + ip + ":" + port; try { zkClient.create() .creatingParentsIfNeeded() @@ -129,27 +154,15 @@ public synchronized void registerTopic(String topic, int queueNum) { // 启动时调用,所以不用加锁 public void subscribeTopic() { ZKData zkData = ZKData.getInstance(); - Map> topicMap = zkData.getTopicMap(); - String topicParentPath = zkConf.getBasePath() + "/topics/"; + String topicParentPath = zkConf.getBasePath() + "/topics"; try { List topics = zkClient.getChildren().forPath(topicParentPath); for (String topic : topics) { - Map queueMap = topicMap.get(topic); - if (queueMap == null) { - queueMap = new HashMap<>(); - topicMap.put(topic, queueMap); - } - String topicPath = topicParentPath + "/" + topic; - List queues = zkClient.getChildren().forPath(topicPath); - for (String queue : queues) { - String queuePath = topicPath + "/" + queue; - String queueData = new String(zkClient.getData().forPath(queuePath)); - Integer shardingId = Integer.valueOf(queueData); - Integer queueId = Integer.valueOf(queue); - queueMap.put(queueId, shardingId); - } + Map queueMap = readTopicInfo(topic); + zkData.getTopicMap().put(topic, queueMap); // 监听topic下的queue变化事件 // 这里假定queue与shardingId映射关系不会发生变化,所以没有监听queue节点变化事情 + String topicPath = topicParentPath + "/" + topic; zkClient.getChildren().usingWatcher( new TopicWatcher(topic)) .forPath(topicPath); @@ -198,8 +211,10 @@ public long readConsumerOffset(String consumerGroup, String topic) { if (dataBytes != null) { offset = Long.valueOf(new String(dataBytes)); } + LOG.info("readConsumerOffset success, consumerGroup={}, topic={}, offset={}", + consumerGroup, topic, offset); } catch (Exception ex) { - LOG.warn("readConsumerOffset exception:", ex); + LOG.debug("readConsumerOffset exception:", ex); } return offset; } @@ -209,11 +224,31 @@ public void updateConsumerOffset(String consumerGroup, String topic, long offset try { byte[] dataBytes = String.valueOf(offset).getBytes(); zkClient.setData().forPath(path, dataBytes); + LOG.info("updateConsumerOffset success, consumerGroup={}, topic={}, offset={}", + consumerGroup, topic, offset); } catch (Exception ex) { LOG.warn("updateConsumerOffset exception:", ex); } } + public Map readTopicInfo(String topic) { + Map queueMap = new HashMap<>(); + String topicPath = zkConf.getBasePath() + "/topics/" + topic; + try { + List queues = zkClient.getChildren().forPath(topicPath); + for (String queue : queues) { + String queuePath = topicPath + "/" + queue; + String queueData = new String(zkClient.getData().forPath(queuePath)); + Integer shardingId = Integer.valueOf(queueData); + Integer queueId = Integer.valueOf(queue); + queueMap.put(queueId, shardingId); + } + } catch (Exception ex) { + LOG.info("readTopic failed, exception:", ex); + } + return queueMap; + } + private class ConsumerWatcher implements CuratorWatcher { private String consumerGroup; @@ -241,7 +276,6 @@ private class BrokersWatcher implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); - Map> brokerMap = zkData.getBrokerMap(); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String brokerPath = zkConf.getBasePath() + "/brokers"; List newShardings = zkClient.getChildren().forPath(brokerPath); @@ -314,7 +348,6 @@ public BrokerShardingWather(int shardingId) { @Override public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); - Map> brokerMap = zkData.getBrokerMap(); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String shardingPath = zkConf.getBasePath() + "/brokers/" + shardingId; List newBrokerAddressList = zkClient.getChildren().forPath(shardingPath); @@ -322,9 +355,13 @@ public void process(WatchedEvent event) throws Exception { zkData.getBrokerLock().lock(); try { oldBrokerAddressList = zkData.getBrokerMap().get(shardingPath); + if (oldBrokerAddressList == null) { + oldBrokerAddressList = new ArrayList<>(); + } } finally { zkData.getBrokerLock().unlock(); } + Collection addedBrokerAddressList = CollectionUtils.subtract(newBrokerAddressList, oldBrokerAddressList); Collection deletedBrokerAddressList @@ -367,6 +404,7 @@ public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String topicParentPath = zkConf.getBasePath() + "/topics"; + LOG.info("get zookeeper notification for path={}", topicParentPath); List newTopics = zkClient.getChildren().forPath(topicParentPath); List oldTopics; zkData.getTopicLock().lockInterruptibly(); @@ -378,20 +416,13 @@ public void process(WatchedEvent event) throws Exception { Collection addedTopics = CollectionUtils.subtract(newTopics, oldTopics); Collection deletedTopics = CollectionUtils.subtract(oldTopics, newTopics); for (String topic : addedTopics) { - Map queueMap = new HashMap<>(); String topicPath = topicParentPath + "/" + topic; - List queueList = zkClient.getChildren().forPath(topicPath); - for (String queue : queueList) { - String queuePath = topicPath + "/" + queue; - String queueData = new String(zkClient.getData().forPath(queuePath)); - int shardingId = Integer.valueOf(queueData); - int queueId = Integer.valueOf(queue); - queueMap.put(queueId, shardingId); - } zkClient.getChildren().usingWatcher( new TopicWatcher(topic)) .forPath(topicPath); - zkData.getTopicLock().lockInterruptibly(); + Map queueMap = readTopicInfo(topic); + + zkData.getTopicLock().lock(); try { zkData.getTopicMap().put(topic, queueMap); zkData.getTopicCondition().signalAll(); @@ -400,7 +431,7 @@ public void process(WatchedEvent event) throws Exception { } } - zkData.getTopicLock().lockInterruptibly(); + zkData.getTopicLock().lock(); try { for (String topic : deletedTopics) { zkData.getTopicMap().remove(topic); @@ -426,6 +457,7 @@ public void process(WatchedEvent event) throws Exception { ZKData zkData = ZKData.getInstance(); if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { String topicPath = zkConf.getBasePath() + "/topics/" + topic; + LOG.info("get zookeeper notification for path={}", topicPath); List newQueues = zkClient.getChildren().forPath(topicPath); List newQueueIds = new ArrayList<>(); for (String queue : newQueues) { @@ -447,14 +479,16 @@ public void process(WatchedEvent event) throws Exception { String queueData = new String(zkClient.getData().forPath(queuePath)); Integer shardingId = Integer.valueOf(queueData); - zkData.getTopicLock().lockInterruptibly(); + zkData.getTopicLock().lock(); try { zkData.getTopicMap().get(topic).put(queueId, shardingId); + zkData.getTopicCondition().signalAll(); } finally { zkData.getTopicLock().unlock(); } } - zkData.getTopicLock().lockInterruptibly(); + + zkData.getTopicLock().lock(); try { for (Integer queueId : deletedQueueIds) { zkData.getTopicMap().get(topic).remove(queueId); diff --git a/distmq-example/build.sh b/distmq-example/build.sh new file mode 100644 index 0000000..02ba628 --- /dev/null +++ b/distmq-example/build.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +mvn clean package +mvn dependency:copy-dependencies diff --git a/distmq-example/pom.xml b/distmq-example/pom.xml index a683c22..38cdb7c 100644 --- a/distmq-example/pom.xml +++ b/distmq-example/pom.xml @@ -15,50 +15,6 @@ - - - src/main/resources - true - ../conf - - **/*.xml - **/*.properties - **/*.toml - - - - src/main/resources - false - ../conf - - **/*.xml - **/*.properties - **/*.toml - - - - - - - src/test/resources - true - - **/*.xml - **/*.properties - **/*.toml - - - - src/test/resources - false - - **/*.xml - **/*.properties - **/*.toml - - - - org.apache.maven.plugins @@ -70,25 +26,6 @@ UTF-8 - - org.apache.maven.plugins - maven-assembly-plugin - 3.0.0 - - - src/main/assembly/assembly.xml - - - - - make-assembly - package - - single - - - - @@ -103,6 +40,11 @@ toml4j 0.7.1 + + com.github.wenweihu86.distmq + distmq-client + 1.0.0-SNAPSHOT + junit junit diff --git a/distmq-example/run.sh b/distmq-example/run.sh new file mode 100644 index 0000000..aae60a3 --- /dev/null +++ b/distmq-example/run.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +# run producer +java -cp dependency/*:distmq-example-1.0.0-SNAPSHOT.jar com.github.wenweihu86.distmq.example.ProducerMain + +# consumer command, should be run at another terminal +java -cp dependency/*:distmq-example-1.0.0-SNAPSHOT.jar com.github.wenweihu86.distmq.example.ConsumerMain diff --git a/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ConsumerMain.java b/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ConsumerMain.java new file mode 100644 index 0000000..6aba7c8 --- /dev/null +++ b/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ConsumerMain.java @@ -0,0 +1,34 @@ +package com.github.wenweihu86.distmq.example; + +import com.github.wenweihu86.distmq.client.api.BrokerMessage; +import com.github.wenweihu86.distmq.client.consumer.Consumer; +import com.github.wenweihu86.distmq.client.consumer.ConsumerConfig; +import com.github.wenweihu86.distmq.client.consumer.MessageListener; + +import java.util.List; + +/** + * Created by wenweihu86 on 2017/6/25. + */ +public class ConsumerMain { + public static void main(String[] args) { + final ConsumerConfig config = new ConsumerConfig(); + config.setServers("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); + config.setConsumerGroup("example-consumer-group"); + config.setTopic("example-topic"); + Consumer consumer = new Consumer(config, new MessageListener() { + @Override + public void consumeMessage(List messages) { + for (BrokerMessage.MessageContent message : messages) { + String content = new String(message.getContent().toByteArray()); + String topic = message.getTopic(); + int queue = message.getQueue(); + long offset = message.getOffset(); + System.out.printf("topic=%s, queue=%d, offset=%d, message=%s\n", + topic, queue, offset, content); + } + } + }); + consumer.start(); + } +} diff --git a/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java b/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java new file mode 100644 index 0000000..ac62e93 --- /dev/null +++ b/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java @@ -0,0 +1,26 @@ +package com.github.wenweihu86.distmq.example; + +import com.github.wenweihu86.distmq.client.producer.Producer; +import com.github.wenweihu86.distmq.client.producer.ProducerConfig; + +/** + * Created by wenweihu86 on 2017/6/25. + */ +public class ProducerMain { + + public static void main(String[] args) { + ProducerConfig config = new ProducerConfig(); + config.setServers("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); + Producer producer = new Producer(config); + String topic = "example-topic"; + for (int i = 0; i < 100; i++) { + String message = "hello distmq:" + i; + boolean success = producer.send(topic, message.getBytes()); + if (success) { + System.out.printf("send message success, topic=%s, message=%s\n", + topic, message); + } + } + } + +} diff --git a/distmq-example/src/main/resources/log4j2.xml b/distmq-example/src/main/resources/log4j2.xml new file mode 100644 index 0000000..4c5eb0f --- /dev/null +++ b/distmq-example/src/main/resources/log4j2.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + %d %p [%t]\t%m%n + + + + + + + + + + + + + + + + + + + + \ No newline at end of file