Skip to content

Commit

Permalink
add example and fix some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 25, 2017
1 parent 3d12a35 commit 7f1674e
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerAPI;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.ZKClient;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.proto.RaftMessage;
Expand All @@ -22,42 +23,50 @@ public class BrokerAPIImpl implements BrokerAPI {

private RaftNode raftNode;
private BrokerStateMachine stateMachine;
private ZKClient zkClient;

public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine) {
public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine, ZKClient zkClient) {
this.raftNode = raftNode;
this.stateMachine = stateMachine;
this.zkClient = zkClient;
}

@Override
public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRequest request) {
BrokerMessage.SendMessageResponse.Builder responseBuilder = BrokerMessage.SendMessageResponse.newBuilder();
BrokerMessage.BaseResponse.Builder baseResBuilder = BrokerMessage.BaseResponse.newBuilder();
baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL);

// 验证queue存在,并且属于该sharding
ZKData zkData = ZKData.getInstance();
GlobalConf conf = GlobalConf.getInstance();
zkData.getTopicLock().lock();
Map<String, Map<Integer, Integer>> topicMap = zkData.getTopicMap();
Map<Integer, Integer> queueMap = topicMap.get(request.getTopic());
// topic由producer提前创建完成,所以这里会校验不存在的话,直接返回失败
if (queueMap == null) {
String message = "topic is not exist";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}
GlobalConf conf = GlobalConf.getInstance();
Integer shardingId = queueMap.get(request.getQueue());
if (shardingId == null || shardingId != conf.getShardingId()) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
if (queueMap == null
|| !queueMap.containsKey(request.getQueue())
|| queueMap.get(request.getQueue()) != conf.getShardingId()) {
queueMap = zkClient.readTopicInfo(request.getTopic());

zkData.getTopicLock().lock();
try {
zkData.getTopicMap().put(request.getTopic(), queueMap);
} finally {
zkData.getTopicLock().unlock();
}
if (queueMap == null
|| !queueMap.containsKey(request.getQueue())
|| queueMap.get(request.getQueue()) != conf.getShardingId()) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}
}

// 如果自己不是leader,将写请求转发给leader
Expand Down Expand Up @@ -91,7 +100,7 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe
public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) {
BrokerMessage.PullMessageResponse response = stateMachine.pullMessage(request);
LOG.info("pullMessage request, topic={}, queue={}, "
+ "resCode, resSize={}",
+ "resCode={}, resSize={}",
request.getTopic(), request.getQueue(),
response.getBaseRes().getResCode(),
response.getContentsCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public static void main(String[] args) {
List<RaftMessage.Server> servers = conf.getServers();
String dataDir = conf.getDataDir();

// 初始化zookeeper
ZKConf zkConf = conf.getZkConf();
ZKClient zkClient = new ZKClient(zkConf);

// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
// 应用状态机
Expand All @@ -41,19 +45,17 @@ public static void main(String[] args) {
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);
// 注册应用自己提供的服务
BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine);
BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine, zkClient);
server.registerService(brokerAPI);
// 启动RPCServer,初始化Raft节点
server.start();
raftNode.init();
// 注册zk
ZKConf zkConf = conf.getZkConf();
ZKClient zkClient = new ZKClient(zkConf);

// 订阅broker和topic的变化
zkClient.subscribeBroker();
zkClient.subscribeTopic();
// 等成为raft集群成员后,才能注册到zk

while (ConfigurationUtils.containsServer(
while (!ConfigurationUtils.containsServer(
raftNode.getConfiguration(), conf.getLocalServer().getServerId())) {
try {
Thread.sleep(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ private List<RaftMessage.Server> readServers() {
}

private ZKConf readZKConf() {
Toml zookeeperToml = toml.getTable("zookeeper");
zkConf = new ZKConf();
zkConf.setServers(toml.getString("servers"));
zkConf.setConnectTimeoutMs(toml.getLong("connect_timeout_ms").intValue());
zkConf.setSessionTimeoutMs(toml.getLong("session_timeout_ms").intValue());
zkConf.setRetryCount(toml.getLong("retry_count").intValue());
zkConf.setRetryIntervalMs(toml.getLong("retry_interval_ms").intValue());
zkConf.setBasePath(toml.getString("base_path"));
zkConf.setServers(zookeeperToml.getString("servers"));
zkConf.setConnectTimeoutMs(zookeeperToml.getLong("connect_timeout_ms").intValue());
zkConf.setSessionTimeoutMs(zookeeperToml.getLong("session_timeout_ms").intValue());
zkConf.setRetryCount(zookeeperToml.getLong("retry_count").intValue());
zkConf.setRetryIntervalMs(zookeeperToml.getLong("retry_interval_ms").intValue());
zkConf.setBasePath(zookeeperToml.getString("base_path"));
return zkConf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ public long append(byte[] messageContent) {
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
byteBuffer.flip();
writeSize = channel.write(byteBuffer);
channel.force(true);
offset = startOffset;
endOffset = startOffset + writeSize;
} else {
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
byteBuffer.flip();
channel.position(endOffset);
writeSize = channel.write(byteBuffer);
channel.force(true);
offset = endOffset;
endOffset += writeSize;
}
Expand All @@ -119,11 +124,12 @@ public byte[] read(long offset) {
LOG.warn("read message error");
return null;
}
headerBuffer.flip();
long crc32 = headerBuffer.getLong();
int messageLen = headerBuffer.getInt();
ByteBuffer messageContentBuffer = ByteBuffer.allocate(messageLen);
readLen = channel.read(messageContentBuffer);
if (readLen < messageLen) {
if (readLen != messageLen) {
LOG.warn("read message error");
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,37 @@ public long append(byte[] messageContent) {
int maxSegmentSize = GlobalConf.getInstance().getMaxSegmentSize();
if (lastSegment.getFileSize() + messageContent.length > maxSegmentSize) {
isNeedNewSegmentFile = true;
// 最后一个segment的文件close并改名
lastSegment.close();
lastSegment.setCanWrite(false);
String newFileName = String.format("%020d-%020d",
lastSegment.getStartOffset(), lastSegment.getEndOffset());
String newFullFileName = segmentDir + File.separator + newFileName;
File newFile = new File(newFullFileName);
newFile.createNewFile();
String oldFullFileName = segmentDir + File.separator + lastSegment.getFileName();
File oldFile = new File(oldFullFileName);
oldFile.renameTo(newFile);
lastSegment.setFileName(newFileName);
lastSegment.setRandomAccessFile(RaftFileUtils.openFile(segmentDir, newFileName, "r"));
lastSegment.setChannel(lastSegment.getRandomAccessFile().getChannel());
}
// 最后一个segment的文件close并改名
lastSegment.close();
lastSegment.setCanWrite(false);
String newFileName = String.format("%020d-%020d",
lastSegment.getStartOffset(), lastSegment.getEndOffset());
String newFullFileName = segmentDir + File.separator + newFileName;
File newFile = new File(newFullFileName);
newFile.createNewFile();
String oldFullFileName = segmentDir + File.separator + lastSegment.getFileName();
File oldFile = new File(oldFullFileName);
oldFile.renameTo(newFile);
lastSegment.setFileName(newFileName);
lastSegment.setRandomAccessFile(RaftFileUtils.openFile(segmentDir, newFileName, "r"));
lastSegment.setChannel(lastSegment.getRandomAccessFile().getChannel());
}
}

Segment newSegment;
// 新建segment文件
if (isNeedNewSegmentFile) {
// open new segment file
long newStartOffset = getLastEndOffset() + Segment.SEGMENT_HEADER_LENGTH;
long newStartOffset = getLastEndOffset();
String newSegmentFileName = String.format("open-%d", newStartOffset);
String newFullFileName = segmentDir + File.separator + newSegmentFileName;
File newSegmentFile = new File(newFullFileName);
if (!newSegmentFile.exists()) {
newSegmentFile.createNewFile();
}
newSegment = new Segment(segmentDir, newSegmentFileName);
startOffsetSegmentMap.put(newSegment.getStartOffset(), newSegment);
} else {
newSegment = startOffsetSegmentMap.lastEntry().getValue();
}
Expand Down
2 changes: 1 addition & 1 deletion distmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>3.3.0</version>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ public Consumer(ConsumerConfig config, MessageListener listener) {
zkClient.subscribeBroker();
zkClient.subscribeTopic();
this.offset = zkClient.readConsumerOffset(config.getConsumerGroup(), config.getTopic());
this.timer.scheduleAtFixedRate(this, 1000, 5000, TimeUnit.MILLISECONDS);
}

public void start() {
timer.scheduleAtFixedRate(this, 1000, 5000, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,14 @@ public Producer(ProducerConfig config) {

public boolean send(String topic, byte[] messageBytes) {
ZKData zkData = ZKData.getInstance();
Map<Integer, Integer> queueMap;
zkData.getTopicLock().lock();
try {
queueMap = zkData.getTopicMap().get(topic);
} finally {
zkData.getTopicLock().unlock();
}
Integer queueId;
Integer shardingId;
if (queueMap == null) {
if (zkData.getTopicMap().get(topic) == null) {
zkClient.registerTopic(topic, config.getQueueCountPerTopic());
zkData.getTopicLock().lock();
try {
while (!zkData.getTopicMap().containsKey(topic)) {
while (zkData.getTopicMap().get(topic) == null
|| zkData.getTopicMap().get(topic).size() != config.getQueueCountPerTopic()) {
zkData.getTopicCondition().awaitUninterruptibly();
}
} finally {
Expand All @@ -54,7 +48,7 @@ public boolean send(String topic, byte[] messageBytes) {

zkData.getTopicLock().lock();
try {
queueMap = zkData.getTopicMap().get(topic);
Map<Integer, Integer> queueMap = zkData.getTopicMap().get(topic);
int queueCount = queueMap.size();
int randomIndex = ThreadLocalRandom.current().nextInt(0, queueCount);
Integer[] queueArray = queueMap.keySet().toArray(new Integer[0]);
Expand Down
Loading

0 comments on commit 7f1674e

Please sign in to comment.