
Commit

update metadata data structure
wenweihu86 committed Jun 27, 2017
1 parent f0ec50a commit 0db889f
Showing 9 changed files with 468 additions and 480 deletions.
11 changes: 1 addition & 10 deletions README.md
@@ -1,11 +1,2 @@
# DistMQ
- Distributed Message Queue based on Raft
- # Under development; updated regularly.
- # Anyone interested is welcome to join the development.
- # TODOS:
- * consumer supports rebalance consumption
- * raft snapshot uses hard links
- * support multiple consumerGroups and multiple topics
- * support broker failover
- * add mmap support to the broker message storage engine
- * ......
+ Distributed Message Queue based on Raft.
BrokerAPIImpl.java
@@ -3,8 +3,8 @@
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerAPI;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
- import com.github.wenweihu86.distmq.client.zk.ZKClient;
- import com.github.wenweihu86.distmq.client.zk.ZKData;
+ import com.github.wenweihu86.distmq.client.zk.MetadataManager;
+ import com.github.wenweihu86.distmq.client.zk.Metadata;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.rpc.client.RPCClient;
@@ -23,12 +23,12 @@ public class BrokerAPIImpl implements BrokerAPI {

private RaftNode raftNode;
private BrokerStateMachine stateMachine;
- private ZKClient zkClient;
+ private MetadataManager metadataManager;

- public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine, ZKClient zkClient) {
+ public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine, MetadataManager metadataManager) {
this.raftNode = raftNode;
this.stateMachine = stateMachine;
- this.zkClient = zkClient;
+ this.metadataManager = metadataManager;
}

@Override
@@ -38,44 +38,24 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe
baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL);

// Check whether the topic exists
- ZKData zkData = ZKData.getInstance();
- GlobalConf conf = GlobalConf.getInstance();
- boolean topicExist = false;
- boolean shardingValid = false;
- zkData.getTopicLock().lock();
- try {
- Map<Integer, Integer> queueMap = zkData.getTopicMap().get(request.getTopic());
- if (queueMap != null && queueMap.size() > 0) {
- topicExist = true;
- if (queueMap.get(request.getQueue()) == conf.getShardingId()) {
- shardingValid = true;
- }
- }
- } finally {
- zkData.getTopicLock().unlock();
- }

+ boolean topicExist = metadataManager.checkTopicExist(request.getTopic());
// If the topic does not exist yet, read it from zookeeper
if (!topicExist) {
- Map<Integer, Integer> queueMap = zkClient.readTopicInfo(request.getTopic());
- zkData.getTopicLock().lock();
- try {
- if (!zkData.getTopicMap().containsKey(request.getTopic())) {
- zkData.getTopicMap().put(request.getTopic(), queueMap);
- }
- queueMap = zkData.getTopicMap().get(request.getTopic());
- if (queueMap.size() > 0) {
- topicExist = true;
- }
- if (queueMap.get(request.getQueue()) == conf.getShardingId()) {
- shardingValid = true;
- }
- } finally {
- zkData.getTopicLock().unlock();
- }
+ Map<Integer, Integer> queueMap = metadataManager.readTopicInfo(request.getTopic());
+ if (queueMap.size() > 0) {
+ topicExist = true;
+ }
+ metadataManager.updateTopicMap(request.getTopic(), queueMap);
}

+ // Verify that the queue exists and belongs to this sharding
+ boolean shardingValid = false;
+ GlobalConf conf = GlobalConf.getInstance();
+ Integer shardingId = metadataManager.getQueueSharding(request.getTopic(), request.getQueue());
+ if (shardingId != null && shardingId.equals(conf.getShardingId())) {
+ shardingValid = true;
+ }

if (!topicExist || !shardingValid) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
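The MetadataManager methods called above (checkTopicExist, readTopicInfo, updateTopicMap, getQueueSharding) live in the client module's new MetadataManager and Metadata classes, which are part of this commit but outside the hunks shown here. A minimal sketch of how such a lock-guarded topic cache could look; the bodies below are illustrative assumptions, not the committed code:

```java
// Sketch only: assumed shape of the new topic-metadata cache, not the committed code.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TopicMetadataCacheSketch {
    // topic -> (queueId -> shardingId), guarded by topicLock
    private final Map<String, Map<Integer, Integer>> topicMap = new HashMap<>();
    private final Lock topicLock = new ReentrantLock();

    boolean checkTopicExist(String topic) {
        topicLock.lock();
        try {
            Map<Integer, Integer> queueMap = topicMap.get(topic);
            return queueMap != null && queueMap.size() > 0;
        } finally {
            topicLock.unlock();
        }
    }

    void updateTopicMap(String topic, Map<Integer, Integer> queueMap) {
        topicLock.lock();
        try {
            // keep the freshly read zookeeper view
            topicMap.put(topic, queueMap);
        } finally {
            topicLock.unlock();
        }
    }

    Integer getQueueSharding(String topic, int queueId) {
        topicLock.lock();
        try {
            Map<Integer, Integer> queueMap = topicMap.get(topic);
            return queueMap == null ? null : queueMap.get(queueId);
        } finally {
            topicLock.unlock();
        }
    }
}
```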
Broker main class
@@ -1,7 +1,7 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
- import com.github.wenweihu86.distmq.client.zk.ZKClient;
+ import com.github.wenweihu86.distmq.client.zk.MetadataManager;
import com.github.wenweihu86.distmq.client.zk.ZKConf;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.RaftOptions;
@@ -28,7 +28,7 @@ public static void main(String[] args) {

// Initialize zookeeper
ZKConf zkConf = conf.getZkConf();
- ZKClient zkClient = new ZKClient(zkConf);
+ MetadataManager metadataManager = new MetadataManager(zkConf);

// Initialize the RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
@@ -45,15 +45,15 @@ public static void main(String[] args) {
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);
// Register the services provided by the application itself
- BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine, zkClient);
+ BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine, metadataManager);
server.registerService(brokerAPI);
// Start the RPCServer and initialize the Raft node
server.start();
raftNode.init();

// Subscribe to broker and topic changes
- zkClient.subscribeBroker();
- zkClient.subscribeTopic();
+ metadataManager.subscribeBroker();
+ metadataManager.subscribeTopic();
// Register with zk only after this node has become a member of the raft cluster
while (!ConfigurationUtils.containsServer(
raftNode.getConfiguration(), conf.getLocalServer().getServerId())) {
@@ -63,7 +63,7 @@ public static void main(String[] args) {
ex.printStackTrace();
}
}
- zkClient.registerBroker(conf.getShardingId(),
+ metadataManager.registerBroker(conf.getShardingId(),
conf.getLocalServer().getEndPoint().getHost(),
conf.getLocalServer().getEndPoint().getPort());
}
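The broker registers itself only after it has joined the raft cluster, so clients never route to a node that is not yet a member. The committed MetadataManager.registerBroker is not shown in this view; the sketch below illustrates the idea with Apache Curator and an assumed /distmq/brokers/&lt;shardingId&gt;/&lt;host:port&gt; ephemeral znode layout (both the library choice and the path layout are assumptions):

```java
// Sketch only: an assumed ZooKeeper registration scheme, not the committed MetadataManager code.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class BrokerRegistrationSketch {
    // Publish this broker under its sharding id; the ephemeral node vanishes
    // automatically when the broker's ZooKeeper session dies.
    static void registerBroker(CuratorFramework client, int shardingId,
                               String host, int port) throws Exception {
        String path = "/distmq/brokers/" + shardingId + "/" + host + ":" + port;
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, new byte[0]);
    }

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        registerBroker(client, 0, "127.0.0.1", 9090);
    }
}
```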
Broker state machine
@@ -2,19 +2,15 @@

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.broker.log.LogManager;
import com.github.wenweihu86.distmq.broker.log.Segment;
import com.github.wenweihu86.distmq.broker.log.SegmentedLog;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import com.github.wenweihu86.raft.StateMachine;
import com.google.protobuf.ByteString;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;

/**
* Created by wenweihu86 on 2017/6/17.
Consumer.java
@@ -4,8 +4,8 @@
import com.github.wenweihu86.distmq.client.BrokerClientManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.utils.JsonUtil;
- import com.github.wenweihu86.distmq.client.zk.ZKClient;
- import com.github.wenweihu86.distmq.client.zk.ZKData;
+ import com.github.wenweihu86.distmq.client.zk.MetadataManager;
+ import com.github.wenweihu86.distmq.client.zk.Metadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -18,37 +18,22 @@
public class Consumer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private ConsumerConfig config;
- private ZKClient zkClient;
+ private MetadataManager metadataManager;
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
private MessageListener listener;

public Consumer(ConsumerConfig config, MessageListener listener) {
this.config = config;
this.listener = listener;
BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions());
- zkClient = new ZKClient(config);
- zkClient.registerConsumer(config.getConsumerGroup(), config.getConsumerId());
- zkClient.subscribeConsumer(config.getConsumerGroup());
- zkClient.subscribeBroker();
- zkClient.subscribeTopic();
- // Update offsets
- Map<Integer, Long> queueOffsetMap = zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic());
- ZKData zkData = ZKData.getInstance();
- Map<String, Map<Integer, Long>> topicOffsetMap
- = zkData.getConsumerOffsetMap().get(config.getConsumerGroup());
- if (topicOffsetMap == null) {
- topicOffsetMap = new HashMap<>();
- topicOffsetMap.put(config.getTopic(), queueOffsetMap);
- zkData.getConsumerOffsetMap().put(config.getConsumerGroup(), topicOffsetMap);
- } else {
- Map<Integer, Long> oldQueueOffsetMap = topicOffsetMap.get(config.getTopic());
- if (oldQueueOffsetMap == null) {
- topicOffsetMap.put(config.getTopic(), queueOffsetMap);
- } else {
- oldQueueOffsetMap.putAll(queueOffsetMap);
- }
- }
- LOG.info("new consumer offset={}", JsonUtil.toJson(zkData.getConsumerOffsetMap()));
+ metadataManager = new MetadataManager(config);
+ metadataManager.registerConsumer(config.getConsumerGroup(), config.getConsumerId());
+ metadataManager.updateConsumerIds(config.getConsumerGroup());
+ metadataManager.subscribeConsumer(config.getConsumerGroup());
+ metadataManager.subscribeBroker();
+ metadataManager.subscribeTopic();
+ // Read offsets from zk
+ metadataManager.readConsumerOffset(config.getConsumerGroup(), config.getTopic());
}

public void start() {
@@ -61,41 +46,20 @@ public void run() {
for (Map.Entry<Integer, Integer> entry : queueMap.entrySet()) {
Integer queueId = entry.getKey();
Integer shardingId = entry.getValue();
- long offset;
// Get the offset
- ZKData zkData = ZKData.getInstance();
- zkData.getConsumerOffsetLock().lock();
- try {
- offset = zkData.getConsumerOffsetMap()
- .get(config.getConsumerGroup())
- .get(config.getTopic())
- .get(queueId);
- } catch (Exception ex) {
- offset = 0;
- } finally {
- zkData.getConsumerOffsetLock().unlock();
- }
+ long offset = metadataManager.getConsumerOffset(queueId);
+ List<String> brokers = metadataManager.getBrokerAddressList(shardingId);

+ int randIndex = ThreadLocalRandom.current().nextInt(0, brokers.size());
+ ConcurrentMap<String, BrokerClient> brokerClientMap
+ = BrokerClientManager.getInstance().getBrokerClientMap();
+ BrokerClient brokerClient = brokerClientMap.get(brokers.get(randIndex));

BrokerMessage.PullMessageRequest request = BrokerMessage.PullMessageRequest.newBuilder()
.setTopic(config.getTopic())
.setQueue(queueId)
.setMessageCount(config.getMaxMessageCountPerRequest())
.setOffset(offset)
.build();

- List<String> brokers;
- zkData.getBrokerLock().lock();
- try {
- brokers = zkData.getBrokerMap().get(shardingId);
- } finally {
- zkData.getBrokerLock().unlock();
- }

- int randIndex = ThreadLocalRandom.current().nextInt(0, brokers.size());
- ConcurrentMap<String, BrokerClient> brokerClientMap
- = BrokerClientManager.getInstance().getBrokerClientMap();
- BrokerClient brokerClient = brokerClientMap.get(brokers.get(randIndex));

BrokerMessage.PullMessageResponse response = brokerClient.getBrokerAPI().pullMessage(request);
if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) {
LOG.warn("pullMessage failed, topic={}, queue={}, offset={}, broker={}",
@@ -109,32 +73,8 @@ public void run() {
listener.consumeMessage(response.getContentsList());
BrokerMessage.MessageContent lastMessage = response.getContents(response.getContentsCount() - 1);
offset = lastMessage.getOffset() + lastMessage.getSize();
- zkClient.updateConsumerOffset(config.getConsumerGroup(), config.getTopic(), queueId, offset);
- // Update the offset
- zkData.getConsumerOffsetLock().lock();
- try {
- Map<String, Map<Integer, Long>> topicOffsetMap
- = zkData.getConsumerOffsetMap().get(config.getConsumerGroup());
- if (topicOffsetMap == null) {
- Map<Integer, Long> queueOffsetMap = new HashMap<>();
- queueOffsetMap.put(queueId, offset);
- topicOffsetMap = new HashMap<>();
- topicOffsetMap.put(config.getTopic(), queueOffsetMap);
- zkData.getConsumerOffsetMap().put(config.getConsumerGroup(), topicOffsetMap);
- } else {
- Map<Integer, Long> queueOffsetMap = topicOffsetMap.get(config.getTopic());
- if (queueOffsetMap == null) {
- queueOffsetMap = new HashMap<>();
- queueOffsetMap.put(queueId, offset);
- topicOffsetMap.put(config.getTopic(), queueOffsetMap);
- } else {
- queueOffsetMap.put(queueId, offset);
- }
- }
- LOG.debug("new consumer offset={}", JsonUtil.toJson(zkData.getConsumerOffsetMap()));
- } finally {
- zkData.getConsumerOffsetLock().unlock();
- }
+ metadataManager.updateConsumerOffset(
+ config.getConsumerGroup(), config.getTopic(), queueId, offset);
}
}
}
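The deleted block above kept a consumerGroup -> topic -> queueId -> offset map under ZKData's consumerOffsetLock; this commit moves that bookkeeping behind MetadataManager.getConsumerOffset and updateConsumerOffset. One plausible shape for the moved methods is sketched below; the committed getConsumerOffset(queueId) presumably resolves group and topic from the consumer config, while the sketch passes them explicitly, and both bodies are assumptions:

```java
// Sketch only: assumed shape of the consumer-offset cache moved into MetadataManager.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConsumerOffsetCacheSketch {
    // consumerGroup -> topic -> queueId -> offset, guarded by offsetLock
    private final Map<String, Map<String, Map<Integer, Long>>> offsetMap = new HashMap<>();
    private final Lock offsetLock = new ReentrantLock();

    long getConsumerOffset(String group, String topic, int queueId) {
        offsetLock.lock();
        try {
            // default to 0, as the old catch block did
            return offsetMap.getOrDefault(group, new HashMap<>())
                    .getOrDefault(topic, new HashMap<>())
                    .getOrDefault(queueId, 0L);
        } finally {
            offsetLock.unlock();
        }
    }

    void updateConsumerOffset(String group, String topic, int queueId, long offset) {
        offsetLock.lock();
        try {
            offsetMap.computeIfAbsent(group, g -> new HashMap<>())
                    .computeIfAbsent(topic, t -> new HashMap<>())
                    .put(queueId, offset);
        } finally {
            offsetLock.unlock();
        }
    }
}
```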
@@ -145,28 +85,12 @@ public void run() {
* @return key is the queueId, value is the shardingId
*/
private Map<Integer, Integer> getConsumedQueue() {
- ZKData zkData = ZKData.getInstance();
// Get the whole queueMap
- Map<Integer, Integer> queueMap = new HashMap<>();
- zkData.getTopicLock().lock();
- try {
- if (zkData.getTopicMap().containsKey(config.getTopic())) {
- queueMap.putAll(zkData.getTopicMap().get(config.getTopic()));
- }
- } finally {
- zkData.getTopicLock().unlock();
- }
+ Map<Integer, Integer> queueMap = metadataManager.getTopicQueueMap(config.getTopic());
Integer[] queueIds = queueMap.keySet().toArray(new Integer[0]);
Arrays.sort(queueIds);

// Get the whole consumer list
- List<String> consumerIdList = new ArrayList<>();
- zkData.getConsumerIdsLock().lock();
- try {
- consumerIdList.addAll(zkData.getConsumerIds());
- } finally {
- zkData.getConsumerIdsLock().unlock();
- }
+ List<String> consumerIdList = metadataManager.getConsumerIds();

int queueSize = queueMap.size();
int consumerSize = consumerIdList.size();
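The rest of getConsumedQueue() is collapsed in this view. Given the sorted queueIds and the consumer id list, it evidently assigns a subset of the topic's queues to this consumer; the sketch below shows one common way to finish such an assignment (round-robin by the consumer's index), purely as an illustration and not as the project's actual logic:

```java
// Sketch only: a generic queue-to-consumer assignment, not DistMQ's actual implementation.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class QueueAssignmentSketch {
    // queueMap: queueId -> shardingId for the topic; returns the queues this consumer should pull.
    static Map<Integer, Integer> assign(Map<Integer, Integer> queueMap,
                                        List<String> consumerIdList, String myConsumerId) {
        Integer[] queueIds = queueMap.keySet().toArray(new Integer[0]);
        Arrays.sort(queueIds);
        Map<Integer, Integer> result = new HashMap<>();
        int consumerSize = consumerIdList.size();
        int myIndex = consumerIdList.indexOf(myConsumerId);
        if (consumerSize == 0 || myIndex < 0) {
            return result;
        }
        // round-robin: queue i goes to the consumer at position (i % consumerSize)
        for (int i = 0; i < queueIds.length; i++) {
            if (i % consumerSize == myIndex) {
                result.put(queueIds[i], queueMap.get(queueIds[i]));
            }
        }
        return result;
    }
}
```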