Skip to content

Commit

Permalink
fix zookeeper subscribe and broker log's lock
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 26, 2017
1 parent b75686f commit 2281c76
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,54 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe
BrokerMessage.BaseResponse.Builder baseResBuilder = BrokerMessage.BaseResponse.newBuilder();
baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL);

// 验证queue存在,并且属于该sharding
// 查询topic是否存在
ZKData zkData = ZKData.getInstance();
GlobalConf conf = GlobalConf.getInstance();
Map<String, Map<Integer, Integer>> topicMap = zkData.getTopicMap();
Map<Integer, Integer> queueMap = topicMap.get(request.getTopic());
// topic由producer提前创建完成,所以这里会校验不存在的话,直接返回失败
if (queueMap == null
|| !queueMap.containsKey(request.getQueue())
|| queueMap.get(request.getQueue()) != conf.getShardingId()) {
queueMap = zkClient.readTopicInfo(request.getTopic());
boolean topicExist = false;
boolean shardingValid = false;
zkData.getTopicLock().lock();
try {
Map<Integer, Integer> queueMap = zkData.getTopicMap().get(request.getTopic());
if (queueMap != null && queueMap.size() > 0) {
topicExist = true;
if (queueMap.get(request.getQueue()) == conf.getShardingId()) {
shardingValid = true;
}
}
} finally {
zkData.getTopicLock().unlock();
}

// 如果topic尚不存在,请求zookeeper读取
if (!topicExist) {
Map<Integer, Integer> queueMap = zkClient.readTopicInfo(request.getTopic());
zkData.getTopicLock().lock();
try {
zkData.getTopicMap().put(request.getTopic(), queueMap);
if (!zkData.getTopicMap().containsKey(request.getTopic())) {
zkData.getTopicMap().put(request.getTopic(), queueMap);
}
queueMap = zkData.getTopicMap().get(request.getTopic());
if (queueMap.size() > 0) {
topicExist = true;
}
if (queueMap.get(request.getQueue()) == conf.getShardingId()) {
shardingValid = true;
}
} finally {
zkData.getTopicLock().unlock();
}
if (queueMap == null
|| !queueMap.containsKey(request.getQueue())
|| queueMap.get(request.getQueue()) != conf.getShardingId()) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}
}

// 验证queue存在,并且属于该sharding
if (!topicExist || !shardingValid) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}

// 如果自己不是leader,将写请求转发给leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public BrokerMessage.MessageContent read(long offset) {
headerBuffer.flip();
long crc32 = headerBuffer.getLong();
int messageLen = headerBuffer.getInt();
LOG.info("messageLen={}", messageLen);
LOG.debug("messageLen={}", messageLen);
ByteBuffer messageContentBuffer = ByteBuffer.allocate(messageLen);
readLen = channel.read(messageContentBuffer);
if (readLen != messageLen) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by wenweihu86 on 2017/6/19.
Expand All @@ -20,8 +22,7 @@ public class SegmentedLog {

private String segmentDir;
private TreeMap<Long, Segment> startOffsetSegmentMap = new TreeMap<>();
// segment log占用的内存大小,用于判断是否需要做snapshot
private volatile long totalSize;
private Lock lock = new ReentrantLock();

public SegmentedLog(String segmentDir) {
this.segmentDir = segmentDir;
Expand All @@ -33,17 +34,10 @@ public SegmentedLog(String segmentDir) {
validateSegments();
}

public long getLastEndOffset() {
if (startOffsetSegmentMap.size() == 0) {
return 0;
}
Segment lastSegment = startOffsetSegmentMap.lastEntry().getValue();
return lastSegment.getEndOffset();
}

public boolean append(BrokerMessage.MessageContent.Builder message) {
boolean isNeedNewSegmentFile = false;
int segmentSize = startOffsetSegmentMap.size();
lock.lock();
try {
if (segmentSize == 0) {
isNeedNewSegmentFile = true;
Expand Down Expand Up @@ -92,17 +86,24 @@ public boolean append(BrokerMessage.MessageContent.Builder message) {
return newSegment.append(message);
} catch (IOException ex) {
throw new RuntimeException("meet exception, msg=" + ex.getMessage());
} finally {
lock.unlock();
}
}

public BrokerMessage.MessageContent read(long offset) {
Map.Entry<Long, Segment> entry = startOffsetSegmentMap.floorEntry(offset);
if (entry == null) {
LOG.warn("message not found, offset={}", offset);
return null;
lock.lock();
try {
Map.Entry<Long, Segment> entry = startOffsetSegmentMap.floorEntry(offset);
if (entry == null) {
LOG.warn("message not found, offset={}", offset);
return null;
}
Segment segment = entry.getValue();
return segment.read(offset);
} finally {
lock.unlock();
}
Segment segment = entry.getValue();
return segment.read(offset);
}

private void readSegments() {
Expand All @@ -123,4 +124,12 @@ private void validateSegments() {
}
}

private long getLastEndOffset() {
if (startOffsetSegmentMap.size() == 0) {
return 0;
}
Segment lastSegment = startOffsetSegmentMap.lastEntry().getValue();
return lastSegment.getEndOffset();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,39 @@ public Producer(ProducerConfig config) {

public boolean send(String topic, byte[] messageBytes) {
ZKData zkData = ZKData.getInstance();
Integer queueId;
Integer shardingId;
if (zkData.getTopicMap().get(topic) == null) {
// 查询topic是否存在
boolean topicExist = false;
zkData.getTopicLock().lock();
try {
Map<Integer, Integer> queueMap = zkData.getTopicMap().get(topic);
if (queueMap != null && queueMap.size() > 0) {
topicExist = true;
}
} finally {
zkData.getTopicLock().unlock();
}

// 如果topic尚不存在,则创建
if (!topicExist) {
zkClient.registerTopic(topic, config.getQueueCountPerTopic());
Map<Integer, Integer> queueMap = zkClient.readTopicInfo(topic);
if (queueMap.size() != config.getQueueCountPerTopic()) {
LOG.warn("create topic failed, topic={}", topic);
return false;
}
zkData.getTopicLock().lock();
try {
while (zkData.getTopicMap().get(topic) == null
|| zkData.getTopicMap().get(topic).size() != config.getQueueCountPerTopic()) {
zkData.getTopicCondition().awaitUninterruptibly();
if (!zkData.getTopicMap().containsKey(topic)) {
zkData.getTopicMap().put(topic, queueMap);
}
} finally {
zkData.getTopicLock().unlock();
}
}

// 获取topic的queueId和对应的shardingId
Integer queueId;
Integer shardingId;
zkData.getTopicLock().lock();
try {
Map<Integer, Integer> queueMap = zkData.getTopicMap().get(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,32 @@ public synchronized void registerTopic(String topic, int queueNum) {
.creatingParentsIfNeeded()
.forPath(queuePath, queueData);
} catch (Exception ex) {
LOG.warn("registerTopic exception:", ex);
LOG.warn("registerTopic failed, queue={}", queueId);
}
}
}

public Map<Integer, Integer> readTopic(String topic) {
Map<Integer, Integer> queueMap = new HashMap<>();
ZKData zkData = ZKData.getInstance();
String path = zkConf.getBasePath() + "/topics/" + topic;
try {
List<String> queues = zkClient.getChildren().forPath(path);
for (String queue : queues) {
String queuePath = path + "/" + queue;
byte[] dataBytes = zkClient.getData().forPath(queuePath);
if (dataBytes != null) {
Integer shardingId = Integer.valueOf(new String(dataBytes));
queueMap.put(Integer.valueOf(queue), shardingId);
}
}
LOG.info("readTopic success, topic={}", topic);
} catch (Exception ex) {
LOG.warn("readTopic exception:", ex);
}
return queueMap;
}

// 启动时调用,所以不用加锁
public void subscribeTopic() {
ZKData zkData = ZKData.getInstance();
Expand Down Expand Up @@ -278,8 +299,9 @@ public ConsumerWatcher(String consumerGroup) {

@Override
public void process(WatchedEvent event) throws Exception {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids";
LOG.info("get zookeeper notification for path={}", path);
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String path = zkConf.getBasePath() + "/consumers/" + consumerGroup + "/ids";
try {
List<String> consumerIds = zkClient.getChildren().forPath(path);
ZKData zkData = ZKData.getInstance();
Expand All @@ -288,6 +310,9 @@ public void process(WatchedEvent event) throws Exception {
LOG.warn("subscribeConsumer exception:", ex);
}
}
zkClient.getChildren()
.usingWatcher(new ConsumerWatcher(consumerGroup))
.forPath(path);
}
}

Expand All @@ -296,8 +321,9 @@ private class BrokersWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent event) throws Exception {
ZKData zkData = ZKData.getInstance();
String brokerPath = zkConf.getBasePath() + "/brokers";
LOG.info("get zookeeper notification for path={}", brokerPath);
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String brokerPath = zkConf.getBasePath() + "/brokers";
List<String> newShardings = zkClient.getChildren().forPath(brokerPath);

List<String> oldShardings = new ArrayList<>();
Expand Down Expand Up @@ -354,6 +380,9 @@ public void process(WatchedEvent event) throws Exception {
}
}
}
zkClient.getChildren()
.usingWatcher(new BrokersWatcher())
.forPath(brokerPath);
}
}

Expand All @@ -368,8 +397,9 @@ public BrokerShardingWather(int shardingId) {
@Override
public void process(WatchedEvent event) throws Exception {
ZKData zkData = ZKData.getInstance();
String shardingPath = zkConf.getBasePath() + "/brokers/" + shardingId;
LOG.info("get zookeeper notification for path={}", shardingPath);
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String shardingPath = zkConf.getBasePath() + "/brokers/" + shardingId;
List<String> newBrokerAddressList = zkClient.getChildren().forPath(shardingPath);
List<String> oldBrokerAddressList;
zkData.getBrokerLock().lock();
Expand Down Expand Up @@ -414,6 +444,9 @@ public void process(WatchedEvent event) throws Exception {
}
}
}
zkClient.getChildren()
.usingWatcher(new BrokerShardingWather(shardingId))
.forPath(shardingPath);
}
}

Expand All @@ -422,9 +455,9 @@ private class TopicsWather implements CuratorWatcher {
@Override
public void process(WatchedEvent event) throws Exception {
ZKData zkData = ZKData.getInstance();
String topicParentPath = zkConf.getBasePath() + "/topics";
LOG.info("get zookeeper notification for path={}", topicParentPath);
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String topicParentPath = zkConf.getBasePath() + "/topics";
LOG.info("get zookeeper notification for path={}", topicParentPath);
List<String> newTopics = zkClient.getChildren().forPath(topicParentPath);
List<String> oldTopics;
zkData.getTopicLock().lockInterruptibly();
Expand All @@ -437,8 +470,8 @@ public void process(WatchedEvent event) throws Exception {
Collection<String> deletedTopics = CollectionUtils.subtract(oldTopics, newTopics);
for (String topic : addedTopics) {
String topicPath = topicParentPath + "/" + topic;
zkClient.getChildren().usingWatcher(
new TopicWatcher(topic))
zkClient.getChildren()
.usingWatcher(new TopicWatcher(topic))
.forPath(topicPath);
Map<Integer, Integer> queueMap = readTopicInfo(topic);

Expand All @@ -455,12 +488,14 @@ public void process(WatchedEvent event) throws Exception {
try {
for (String topic : deletedTopics) {
zkData.getTopicMap().remove(topic);
// TODO: is need remove watcher?
}
} finally {
zkData.getTopicLock().unlock();
}
}
zkClient.getChildren()
.usingWatcher(new TopicsWather())
.forPath(topicParentPath);
}
}

Expand All @@ -475,9 +510,9 @@ public TopicWatcher(String topic) {
@Override
public void process(WatchedEvent event) throws Exception {
ZKData zkData = ZKData.getInstance();
String topicPath = zkConf.getBasePath() + "/topics/" + topic;
LOG.info("get zookeeper notification for path={}", topicPath);
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String topicPath = zkConf.getBasePath() + "/topics/" + topic;
LOG.info("get zookeeper notification for path={}", topicPath);
List<String> newQueues = zkClient.getChildren().forPath(topicPath);
List<Integer> newQueueIds = new ArrayList<>();
for (String queue : newQueues) {
Expand Down Expand Up @@ -517,6 +552,9 @@ public void process(WatchedEvent event) throws Exception {
zkData.getTopicLock().unlock();
}
}
zkClient.getChildren()
.usingWatcher(new TopicWatcher(topic))
.forPath(topicPath);
}
}

Expand Down

0 comments on commit 2281c76

Please sign in to comment.