Skip to content

Commit

Permalink
update sendMessage api for broker
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 22, 2017
1 parent 5078b5d commit c47ea2c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerAPI;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.rpc.client.RPCClient;
import com.github.wenweihu86.rpc.client.RPCProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* Created by wenweihu86 on 2017/6/17.
*/
Expand All @@ -23,7 +30,61 @@ public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine) {

@Override
public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRequest request) {
return null;
BrokerMessage.SendMessageResponse.Builder responseBuilder = BrokerMessage.SendMessageResponse.newBuilder();
BrokerMessage.BaseResponse.Builder baseResBuilder = BrokerMessage.BaseResponse.newBuilder();
baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL);
ZKData zkData = ZKData.getInstance();
Map<String, Map<Integer, Integer>> topicMap = zkData.getTopicMap();
Map<Integer, Integer> queueMap = topicMap.get(request.getTopic());
// topic由producer提前创建完成,所以这里会校验不存在的话,直接返回失败
if (queueMap == null) {
String message = "topic is not exist";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}
GlobalConf conf = GlobalConf.getInstance();
Integer shardingId = queueMap.get(request.getQueue());
if (shardingId == null || shardingId != conf.getShardingId()) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}

// 如果自己不是leader,将写请求转发给leader
if (raftNode.getLeaderId() <= 0) {
baseResBuilder.setResMsg("leader not exists");
responseBuilder.setBaseRes(baseResBuilder);
} else if (raftNode.getLeaderId() != raftNode.getLocalServer().getServerId()) {
RPCClient rpcClient = raftNode.getPeerMap().get(raftNode.getLeaderId()).getRpcClient();
BrokerAPI brokerAPI = RPCProxy.getProxy(rpcClient, BrokerAPI.class);
BrokerMessage.SendMessageResponse responseFromLeader = brokerAPI.sendMessage(request);
responseBuilder.mergeFrom(responseFromLeader);
} else {
// 数据同步写入raft集群
byte[] data = request.toByteArray();
boolean success = raftNode.replicate(data, RaftMessage.EntryType.ENTRY_TYPE_DATA);
baseResBuilder.setResCode(
success ? BrokerMessage.ResCode.RES_CODE_SUCCESS
: BrokerMessage.ResCode.RES_CODE_FAIL);
responseBuilder.setBaseRes(baseResBuilder);
}

BrokerMessage.SendMessageResponse response = responseBuilder.build();
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.broker.log.LogManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import com.github.wenweihu86.raft.StateMachine;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;

/**
* Created by wenweihu86 on 2017/6/17.
Expand Down Expand Up @@ -63,7 +65,6 @@ public void readSnapshot(String snapshotDir) {
public void apply(byte[] dataBytes) {
try {
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes);
// TODO: 找到segment log,写入消息
} catch (Exception ex) {
LOG.warn("apply exception:", ex);
}
Expand Down

0 comments on commit c47ea2c

Please sign in to comment.