Skip to content

Commit

Permalink
read/write consumer offset on zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 24, 2017
1 parent 84fba08 commit 7f3c8cb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class Consumer implements Runnable {
private ZKClient zkClient;
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
private MessageListener listener;
private long offset;

public Consumer(ConsumerConfig config, MessageListener listener) {
this.config = config;
Expand All @@ -30,6 +31,7 @@ public Consumer(ConsumerConfig config, MessageListener listener) {
zkClient.subscribeConsumer(config.getConsumerGroup());
zkClient.subscribeBroker();
zkClient.subscribeTopic();
this.offset = zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic());
this.timer.scheduleAtFixedRate(this, 1000, 5000, TimeUnit.MILLISECONDS);
}

Expand All @@ -43,7 +45,7 @@ public void run() {
.setTopic(config.getTopic())
.setQueue(queueId)
.setMessageCount(config.getMaxMessageCountPerRequest())
.setOffset(0) // TODO: offset从zk获取
.setOffset(offset)
.build();

List<String> brokers;
Expand All @@ -69,7 +71,12 @@ public void run() {
LOG.info("pullMessage success, topic={}, queue={}, offset={}, size={}",
request.getTopic(), request.getQueue(), request.getOffset(),
response.getContentsCount());
listener.consumeMessage(response.getContentsList());
if (response.getContentsCount() > 0) {
listener.consumeMessage(response.getContentsList());
BrokerMessage.MessageContent lastMessage = response.getContents(response.getContentsCount() - 1);
offset = lastMessage.getOffset() + 1;
zkClient.updateConsumerOffset(config.getConsumerGroup(), config.getTopic(), offset);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void subscribeTopic() {
}

public void registerConsumer(String consumerGroup, String consumerId) {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/" + consumerId;
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids/" + consumerId;
try {
zkClient.create()
.creatingParentsIfNeeded()
Expand All @@ -178,7 +178,7 @@ public void registerConsumer(String consumerGroup, String consumerId) {

public void subscribeConsumer(String consumerGroup) {
ZKData zkData = ZKData.getInstance();
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup;
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids";
try {
List<String> consumerIds = zkClient.getChildren().forPath(path);
zkData.setConsumerIds(consumerIds);
Expand All @@ -190,6 +190,30 @@ public void subscribeConsumer(String consumerGroup) {
}
}

public long readConsumerOffset(String consumerGroup, String topic) {
long offset = 0;
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/offsets/" + topic;
try {
byte[] dataBytes = zkClient.getData().forPath(path);
if (dataBytes != null) {
offset = Long.valueOf(new String(dataBytes));
}
} catch (Exception ex) {
LOG.warn("readConsumerOffset exception:", ex);
}
return offset;
}

public void updateConsumerOffset(String consumerGroup, String topic, long offset) {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/offsets/" + topic;
try {
byte[] dataBytes = String.valueOf(offset).getBytes();
zkClient.setData().forPath(path, dataBytes);
} catch (Exception ex) {
LOG.warn("updateConsumerOffset exception:", ex);
}
}

private class ConsumerWatcher implements CuratorWatcher {
private String consumerGroup;

Expand All @@ -200,7 +224,7 @@ public ConsumerWatcher(String consumerGroup) {
@Override
public void process(WatchedEvent event) throws Exception {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup;
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids";
try {
List<String> consumerIds = zkClient.getChildren().forPath(path);
ZKData zkData = ZKData.getInstance();
Expand Down

0 comments on commit 7f3c8cb

Please sign in to comment.