Skip to content

Commit

Permalink
fix some consumer bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 25, 2017
1 parent 7f1674e commit 2251ae7
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe
// 验证queue存在,并且属于该sharding
ZKData zkData = ZKData.getInstance();
GlobalConf conf = GlobalConf.getInstance();
zkData.getTopicLock().lock();
Map<String, Map<Integer, Integer>> topicMap = zkData.getTopicMap();
Map<Integer, Integer> queueMap = topicMap.get(request.getTopic());
// topic由producer提前创建完成,所以这里会校验不存在的话,直接返回失败
if (queueMap == null
|| !queueMap.containsKey(request.getQueue())
|| queueMap.get(request.getQueue()) != conf.getShardingId()) {
queueMap = zkClient.readTopicInfo(request.getTopic());

zkData.getTopicLock().lock();
try {
zkData.getTopicMap().put(request.getTopic(), queueMap);
Expand Down Expand Up @@ -77,7 +75,12 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe
RPCClient rpcClient = raftNode.getPeerMap().get(raftNode.getLeaderId()).getRpcClient();
BrokerAPI brokerAPI = RPCProxy.getProxy(rpcClient, BrokerAPI.class);
BrokerMessage.SendMessageResponse responseFromLeader = brokerAPI.sendMessage(request);
responseBuilder.mergeFrom(responseFromLeader);
if (responseFromLeader == null) {
baseResBuilder.setResMsg("leader timeout");
responseBuilder.setBaseRes(baseResBuilder);
} else {
responseBuilder.mergeFrom(responseFromLeader);
}
} else {
// 数据同步写入raft集群
byte[] data = request.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ public void readSnapshot(String snapshotDir) {
public void apply(byte[] dataBytes) {
try {
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes);
BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder()
.setTopic(request.getTopic())
.setQueue(request.getQueue())
.setContent(request.getContent());
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
segmentedLog.append(request.getContent().toByteArray());
segmentedLog.append(message);
} catch (Exception ex) {
LOG.warn("apply exception:", ex);
}
Expand All @@ -80,22 +84,14 @@ public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRe
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
int readCount = 0;
long offset = request.getOffset();
if (offset == 0) {
offset = Segment.SEGMENT_HEADER_LENGTH;
}
while (readCount < request.getMessageCount()) {
byte[] messageBytes = segmentedLog.read(offset);
if (messageBytes == null) {
BrokerMessage.MessageContent message = segmentedLog.read(offset);
if (message == null) {
break;
}
BrokerMessage.MessageContent message = BrokerMessage.MessageContent.newBuilder()
.setTopic(request.getTopic())
.setQueue(request.getQueue())
.setOffset(offset)
.setContent(ByteString.copyFrom(messageBytes))
.build();
responseBuilder.addContents(message);
offset += messageBytes.length + Segment.MESSAGE_HEADER_LENGTH;
offset = message.getOffset() + message.getSize();
readCount++;
}
return responseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
import java.nio.channels.FileChannel;

/**
* queue中每个文件用segment表示,
* segment头部8字节固定是文件修改时间,
* 之所以保存在文件里,没用使用文件的stat信息,
* 是因为raft会每次启动会将snapshot文件拷贝到状态机。
* queue中每个文件用segment表示
* Created by wenweihu86 on 2017/6/19.
*/
public class Segment {
public static int SEGMENT_HEADER_LENGTH = Long.SIZE / Byte.SIZE;
public static int MESSAGE_HEADER_LENGTH = (Long.SIZE + Integer.SIZE) / Byte.SIZE;
private static final Logger LOG = LoggerFactory.getLogger(Segment.class);

Expand Down Expand Up @@ -76,42 +72,42 @@ public void close() {
}
}

public long append(byte[] messageContent) {
long offset = 0;
public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) {
try {
int writeSize;
// TODO: 复用messageContent内存
ByteBuffer byteBuffer = ByteBuffer.allocate(Segment.SEGMENT_HEADER_LENGTH
+ Segment.MESSAGE_HEADER_LENGTH + + messageContent.length);
if (fileSize == 0) {
byteBuffer.putLong(System.currentTimeMillis());
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
byteBuffer.flip();
writeSize = channel.write(byteBuffer);
channel.force(true);
offset = startOffset;
messageBuilder.setOffset(startOffset);
} else {
messageBuilder.setOffset(endOffset);
}
messageBuilder.setCreateTime(System.currentTimeMillis());
BrokerMessage.MessageContent message = messageBuilder.build();
byte[] messageBytes = message.toByteArray();
int totalSize = Segment.MESSAGE_HEADER_LENGTH + messageBytes.length;
ByteBuffer byteBuffer = ByteBuffer.allocate(totalSize);
byteBuffer.putLong(BrokerUtils.getCRC32(messageBytes));
byteBuffer.putInt(messageBytes.length);
byteBuffer.put(messageBytes);
byteBuffer.flip();
int writeSize = channel.write(byteBuffer);
channel.force(true);
if (writeSize != totalSize) {
LOG.warn("append message failed");
return false;
}
if (fileSize == 0) {
endOffset = startOffset + writeSize;
} else {
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
byteBuffer.flip();
channel.position(endOffset);
writeSize = channel.write(byteBuffer);
channel.force(true);
offset = endOffset;
endOffset += writeSize;
}
fileSize += writeSize;
} catch (IOException ex) {
LOG.warn("append message exception:", ex);
return false;
}
return offset;
return true;
}

public byte[] read(long offset) {
public BrokerMessage.MessageContent read(long offset) {
if (offset >= startOffset + fileSize) {
LOG.warn("invalid offset={}", offset);
return null;
Expand All @@ -120,13 +116,14 @@ public byte[] read(long offset) {
channel.position(offset - startOffset);
ByteBuffer headerBuffer = ByteBuffer.allocate(MESSAGE_HEADER_LENGTH);
int readLen = channel.read(headerBuffer);
if (readLen < MESSAGE_HEADER_LENGTH) {
if (readLen != MESSAGE_HEADER_LENGTH) {
LOG.warn("read message error");
return null;
}
headerBuffer.flip();
long crc32 = headerBuffer.getLong();
int messageLen = headerBuffer.getInt();
LOG.info("messageLen={}", messageLen);
ByteBuffer messageContentBuffer = ByteBuffer.allocate(messageLen);
readLen = channel.read(messageContentBuffer);
if (readLen != messageLen) {
Expand All @@ -137,7 +134,11 @@ public byte[] read(long offset) {
LOG.warn("read message error: crc32 check failed");
return null;
}
return messageContentBuffer.array();
BrokerMessage.MessageContent message
= BrokerMessage.MessageContent.parseFrom(messageContentBuffer.array());
BrokerMessage.MessageContent result = BrokerMessage.MessageContent.newBuilder()
.mergeFrom(message).setSize(MESSAGE_HEADER_LENGTH + messageLen).build();
return result;
} catch (IOException ex) {
LOG.warn("read segment error, dir={}, file={}, offset={}",
dirName, fileName, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public long getLastEndOffset() {
return lastSegment.getEndOffset();
}

public long append(byte[] messageContent) {
public boolean append(BrokerMessage.MessageContent.Builder message) {
boolean isNeedNewSegmentFile = false;
int segmentSize = startOffsetSegmentMap.size();
try {
Expand All @@ -53,7 +53,7 @@ public long append(byte[] messageContent) {
isNeedNewSegmentFile = true;
} else {
int maxSegmentSize = GlobalConf.getInstance().getMaxSegmentSize();
if (lastSegment.getFileSize() + messageContent.length > maxSegmentSize) {
if (lastSegment.getFileSize() + message.build().getSerializedSize() > maxSegmentSize) {
isNeedNewSegmentFile = true;
// 最后一个segment的文件close并改名
lastSegment.close();
Expand Down Expand Up @@ -89,13 +89,13 @@ public long append(byte[] messageContent) {
} else {
newSegment = startOffsetSegmentMap.lastEntry().getValue();
}
return newSegment.append(messageContent);
return newSegment.append(message);
} catch (IOException ex) {
throw new RuntimeException("meet exception, msg=" + ex.getMessage());
}
}

public byte[] read(long offset) {
public BrokerMessage.MessageContent read(long offset) {
Map.Entry<Long, Segment> entry = startOffsetSegmentMap.floorEntry(offset);
if (entry == null) {
LOG.warn("message not found, offset={}", offset);
Expand All @@ -116,8 +116,7 @@ private void readSegments() {
private void validateSegments() {
long lastEndOffset = 0;
for (Segment segment : startOffsetSegmentMap.values()) {
if (lastEndOffset > 0 && segment.getStartOffset()
!= lastEndOffset + Segment.SEGMENT_HEADER_LENGTH) {
if (lastEndOffset > 0 && segment.getStartOffset() != lastEndOffset) {
throw new RuntimeException("segment dir not valid:" + segmentDir);
}
lastEndOffset = segment.getEndOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
public class CommonConfig extends ZKConf {
private int brokerConnectTimeoutMs = 200;
private int brokerReadTimeoutMs = 500;
private int brokerReadTimeoutMs = 1000;
private int brokerWriteTimeoutMs = 200;

public RPCClientOptions getRPCClientOptions() {
Expand Down
Loading

0 comments on commit 2251ae7

Please sign in to comment.