Skip to content

Commit

Permalink
fix consumer offset bug
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 25, 2017
1 parent fe2b0cd commit 087fd30
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) {

public BrokerMessage.MessageContent read(long offset) {
if (offset >= startOffset + fileSize) {
LOG.warn("invalid offset={}", offset);
LOG.debug("invalid offset={}", offset);
return null;
}
try {
Expand Down
5 changes: 5 additions & 0 deletions distmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.wenweihu86.distmq.client.BrokerClient;
import com.github.wenweihu86.distmq.client.BrokerClientManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.utils.JsonUtil;
import com.github.wenweihu86.distmq.client.zk.ZKClient;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -31,17 +32,23 @@ public Consumer(ConsumerConfig config, MessageListener listener) {
zkClient.subscribeBroker();
zkClient.subscribeTopic();
// 更新offset
Map<Integer, Long> queueOffsetMap = zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic());
ZKData zkData = ZKData.getInstance();
Map<String, Map<Integer, Long>> topicOffsetMap = zkData.getConsumerOffsetMap().get(config.getConsumerGroup());
Map<String, Map<Integer, Long>> topicOffsetMap
= zkData.getConsumerOffsetMap().get(config.getConsumerGroup());
if (topicOffsetMap == null) {
topicOffsetMap = new HashMap<>();
topicOffsetMap.put(config.getTopic(), queueOffsetMap);
zkData.getConsumerOffsetMap().put(config.getConsumerGroup(), topicOffsetMap);
} else {
Map<Integer, Long> oldQueueOffsetMap = topicOffsetMap.get(config.getTopic());
if (oldQueueOffsetMap == null) {
topicOffsetMap.put(config.getTopic(), queueOffsetMap);
} else {
oldQueueOffsetMap.putAll(queueOffsetMap);
}
}
Map<Integer, Long> queueOffsetMap = topicOffsetMap.get(config.getTopic());
if (queueOffsetMap == null) {
queueOffsetMap = new HashMap<>();
}
queueOffsetMap.putAll(zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic()));
LOG.info("new consumer offset={}", JsonUtil.toJson(zkData.getConsumerOffsetMap()));
}

public void start() {
Expand Down Expand Up @@ -109,14 +116,22 @@ public void run() {
Map<String, Map<Integer, Long>> topicOffsetMap
= zkData.getConsumerOffsetMap().get(config.getConsumerGroup());
if (topicOffsetMap == null) {
Map<Integer, Long> queueOffsetMap = new HashMap<>();
queueOffsetMap.put(queueId, offset);
topicOffsetMap = new HashMap<>();
topicOffsetMap.put(config.getTopic(), queueOffsetMap);
zkData.getConsumerOffsetMap().put(config.getConsumerGroup(), topicOffsetMap);
} else {
Map<Integer, Long> queueOffsetMap = topicOffsetMap.get(config.getTopic());
if (queueOffsetMap == null) {
queueOffsetMap = new HashMap<>();
queueOffsetMap.put(queueId, offset);
topicOffsetMap.put(config.getTopic(), queueOffsetMap);
} else {
queueOffsetMap.put(queueId, offset);
}
}
Map<Integer, Long> queueOffsetMap = topicOffsetMap.get(config.getTopic());
if (queueOffsetMap == null) {
queueOffsetMap = new HashMap<>();
}
queueOffsetMap.put(queueId, offset);
LOG.debug("new consumer offset={}", JsonUtil.toJson(zkData.getConsumerOffsetMap()));
} finally {
zkData.getConsumerOffsetLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.github.wenweihu86.distmq.client.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class JsonUtil {

private static ObjectMapper objectMapper = new ObjectMapper();

static {
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}

public static JsonNode readTree(String jsonString) {
JsonNode node = null;
try {
node = objectMapper.readTree(jsonString);
} catch (IOException e) {
e.printStackTrace();
}
return node;
}

public static <T> String toJson(T object) {
String s = "";
try {
s = objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return s;
}

public static String toJson(JsonNode node) {
String s = "";
try {
s = objectMapper.writeValueAsString(node);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return s;
}

public static <T> T fromJson(String jsonString, TypeReference<T> tr) {
try {
return (T) objectMapper.readValue(jsonString, tr);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

public static <T> T fromJson(String jsonString, Class<T> classOfT) {
try {
return (T) objectMapper.readValue(jsonString, classOfT);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

}

0 comments on commit 087fd30

Please sign in to comment.