Skip to content

Commit

Permalink
add consumer client
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 24, 2017
1 parent 187cbd0 commit 84fba08
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.github.wenweihu86.distmq.client.consumer;

import com.github.wenweihu86.distmq.client.BrokerClient;
import com.github.wenweihu86.distmq.client.BrokerClientManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.ZKClient;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

/**
* Created by wenweihu86 on 2017/6/24.
*/
public class Consumer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private ConsumerConfig config;
private ZKClient zkClient;
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
private MessageListener listener;

public Consumer(ConsumerConfig config, MessageListener listener) {
this.config = config;
this.listener = listener;
BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions());
zkClient = new ZKClient(config);
zkClient.registerConsumer(config.getConsumerGroup(), config.getConsumerId());
zkClient.subscribeConsumer(config.getConsumerGroup());
zkClient.subscribeBroker();
zkClient.subscribeTopic();
this.timer.scheduleAtFixedRate(this, 1000, 5000, TimeUnit.MILLISECONDS);
}

@Override
public void run() {
Map<Integer, Integer> queueMap = getConsumedQueue();
for (Map.Entry<Integer, Integer> entry : queueMap.entrySet()) {
Integer queueId = entry.getKey();
Integer shardingId = entry.getValue();
BrokerMessage.PullMessageRequest request = BrokerMessage.PullMessageRequest.newBuilder()
.setTopic(config.getTopic())
.setQueue(queueId)
.setMessageCount(config.getMaxMessageCountPerRequest())
.setOffset(0) // TODO: offset从zk获取
.build();

List<String> brokers;
ZKData zkData = ZKData.getInstance();
zkData.getBrokerLock().lock();
try {
brokers = zkData.getBrokerMap().get(shardingId);
} finally {
zkData.getBrokerLock().unlock();
}

int randIndex = ThreadLocalRandom.current().nextInt(0, brokers.size());
ConcurrentMap<String, BrokerClient> brokerClientMap
= BrokerClientManager.getInstance().getBrokerClientMap();
BrokerClient brokerClient = brokerClientMap.get(brokers.get(randIndex));

BrokerMessage.PullMessageResponse response = brokerClient.getBrokerAPI().pullMessage(request);
if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) {
LOG.warn("pullMessage failed, topic={}, queue={}, offset={}, broker={}",
request.getTopic(), request.getQueue(), request.getOffset(),
brokers.get(randIndex));
} else {
LOG.info("pullMessage success, topic={}, queue={}, offset={}, size={}",
request.getTopic(), request.getQueue(), request.getOffset(),
response.getContentsCount());
listener.consumeMessage(response.getContentsList());
}
}
}

/**
* 获取分配给本节点的queue,以及对应的broker sharding id
* @return key是queueId,value是shardingId
*/
private Map<Integer, Integer> getConsumedQueue() {
ZKData zkData = ZKData.getInstance();
// 获取所有queueMap
Map<Integer, Integer> queueMap = new HashMap<>();
zkData.getTopicLock().lock();
try {
if (zkData.getTopicMap().containsKey(config.getTopic())) {
queueMap.putAll(zkData.getTopicMap().get(config.getTopic()));
}
} finally {
zkData.getTopicLock().unlock();
}
Integer[] queueIds = queueMap.keySet().toArray(new Integer[0]);
Arrays.sort(queueIds);

// 获取所有consumer list
List<String> consumerIdList = new ArrayList<>();
zkData.getConsumerIdsLock().lock();
try {
consumerIdList.addAll(zkData.getConsumerIds());
} finally {
zkData.getConsumerIdsLock().unlock();
}

int queueSize = queueMap.size();
int consumerSize = consumerIdList.size();
int index = consumerIdList.indexOf(config.getConsumerId());
int mod = queueSize % consumerSize;

int averageSize;
if (queueSize <= consumerSize) {
averageSize = 1;
} else {
if (mod > 0 && index < mod) {
averageSize = queueSize / consumerSize + 1;
} else {
averageSize = queueSize / consumerSize;
}
}

int startIndex;
if (mod > 0 && index < mod) {
startIndex = index * averageSize;
} else {
startIndex = index * averageSize + mod;
}

Map<Integer, Integer> result = new HashMap<>();
for (int i = startIndex; i < startIndex + averageSize && i < queueSize; i++) {
result.put(queueIds[i], queueMap.get(queueIds[i]));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,46 @@

import com.github.wenweihu86.distmq.client.CommonConfig;

import java.util.UUID;

/**
* Created by wenweihu86 on 2017/6/24.
*/
public class ConsumerConfig extends CommonConfig {
private String consumerGroup;
private String topic;
private String consumerId = UUID.randomUUID().toString();
private int maxMessageCountPerRequest = 100;

public String getConsumerGroup() {
return consumerGroup;
}

public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getConsumerId() {
return consumerId;
}

public void setConsumerId(String consumerId) {
this.consumerId = consumerId;
}

public int getMaxMessageCountPerRequest() {
return maxMessageCountPerRequest;
}

public void setMaxMessageCountPerRequest(int maxMessageCountPerRequest) {
this.maxMessageCountPerRequest = maxMessageCountPerRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.wenweihu86.distmq.client.consumer;

import com.github.wenweihu86.distmq.client.api.BrokerMessage;

import java.util.List;

/**
* Created by wenweihu86 on 2017/6/24.
*/
public interface MessageListener {

void consumeMessage(List<BrokerMessage.MessageContent> messages);
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,55 @@ public void subscribeTopic() {
}
}

public void registerConsumer(String consumerGroup, String consumerId) {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/" + consumerId;
try {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "".getBytes());
} catch (Exception ex) {
LOG.warn("registerConsumer exception:", ex);
}
LOG.info("registerConsumer sucess, consumerGroup={}, consumerId={}", consumerGroup, consumerId);
}

public void subscribeConsumer(String consumerGroup) {
ZKData zkData = ZKData.getInstance();
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup;
try {
List<String> consumerIds = zkClient.getChildren().forPath(path);
zkData.setConsumerIds(consumerIds);
zkClient.getChildren()
.usingWatcher(new ConsumerWatcher(consumerGroup))
.forPath(path);
} catch (Exception ex) {
LOG.warn("subscribeConsumer exception:", ex);
}
}

private class ConsumerWatcher implements CuratorWatcher {
private String consumerGroup;

public ConsumerWatcher(String consumerGroup) {
this.consumerGroup = consumerGroup;
}

@Override
public void process(WatchedEvent event) throws Exception {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup;
try {
List<String> consumerIds = zkClient.getChildren().forPath(path);
ZKData zkData = ZKData.getInstance();
zkData.setConsumerIds(consumerIds);
} catch (Exception ex) {
LOG.warn("subscribeConsumer exception:", ex);
}
}
}
}

// 监听所有broker的分片信息变化事件
private class BrokersWatcher implements CuratorWatcher {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.wenweihu86.distmq.client.zk;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -26,10 +27,13 @@ public static ZKData getInstance() {

// topic -> (queueId -> shardingId)
private Map<String, Map<Integer, Integer>> topicMap = new HashMap<>();

private Lock topicLock = new ReentrantLock();
private Condition topicCondition = topicLock.newCondition();

// consumer ids of group
private List<String> consumerIds = new ArrayList<>();
private Lock consumerIdsLock = new ReentrantLock();

public static void setInstance(ZKData instance) {
ZKData.instance = instance;
}
Expand Down Expand Up @@ -61,4 +65,20 @@ public Lock getTopicLock() {
public Condition getTopicCondition() {
return topicCondition;
}

public List<String> getConsumerIds() {
return consumerIds;
}

public void setConsumerIds(List<String> consumerIds) {
this.consumerIds = consumerIds;
}

public Lock getConsumerIdsLock() {
return consumerIdsLock;
}

public void setConsumerIdsLock(Lock consumerIdsLock) {
this.consumerIdsLock = consumerIdsLock;
}
}

0 comments on commit 84fba08

Please sign in to comment.