Skip to content

Commit

Permalink
update broker rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 2, 2017
1 parent 0db889f commit 48b5559
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 255 deletions.
2 changes: 1 addition & 1 deletion distmq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public static void main(String[] args) {
server.start();
raftNode.init();

// 订阅broker和topic的变化
metadataManager.subscribeBroker();
// 订阅topic的变化
metadataManager.subscribeTopic();
// 等成为raft集群成员后,才能注册到zk
while (!ConfigurationUtils.containsServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ private List<RaftMessage.Server> readServers() {
private ZKConf readZKConf() {
Toml zookeeperToml = toml.getTable("zookeeper");
zkConf = new ZKConf();
zkConf.setServers(zookeeperToml.getString("servers"));
zkConf.setConnectTimeoutMs(zookeeperToml.getLong("connect_timeout_ms").intValue());
zkConf.setSessionTimeoutMs(zookeeperToml.getLong("session_timeout_ms").intValue());
zkConf.setRetryCount(zookeeperToml.getLong("retry_count").intValue());
zkConf.setRetryIntervalMs(zookeeperToml.getLong("retry_interval_ms").intValue());
zkConf.setBasePath(zookeeperToml.getString("base_path"));
zkConf.setZKServers(zookeeperToml.getString("servers"));
zkConf.setZKConnectTimeoutMs(zookeeperToml.getLong("connect_timeout_ms").intValue());
zkConf.setZKSessionTimeoutMs(zookeeperToml.getLong("session_timeout_ms").intValue());
zkConf.setZKRetryCount(zookeeperToml.getLong("retry_count").intValue());
zkConf.setZKRetryIntervalMs(zookeeperToml.getLong("retry_interval_ms").intValue());
zkConf.setZKBasePath(zookeeperToml.getString("base_path"));
return zkConf;
}

Expand Down
2 changes: 1 addition & 1 deletion distmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>com.github.wenweihu86.rpc</groupId>
<artifactId>rpc-java</artifactId>
<version>1.5.0</version>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,49 @@
import com.github.wenweihu86.rpc.client.RPCClient;
import com.github.wenweihu86.rpc.client.RPCClientOptions;
import com.github.wenweihu86.rpc.client.RPCProxy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;

import java.util.Collection;
import java.util.List;

/**
* Created by wenweihu86 on 2017/6/24.
*/
public class BrokerClient {
private String address;
private List<String> addressList;
private RPCClient rpcClient;
private BrokerAPI brokerAPI;

public BrokerClient(String address, RPCClientOptions options) {
this.address = address;
this.rpcClient = new RPCClient(address, options);
public BrokerClient(List<String> addressList, RPCClientOptions options) {
this.addressList = addressList;
String ipPorts = StringUtils.join(addressList, ",");
this.rpcClient = new RPCClient(ipPorts, options);
this.brokerAPI = RPCProxy.getProxy(this.rpcClient, BrokerAPI.class);
}

public void addEndPoint(Collection<String> ipPortList) {
addressList.addAll(ipPortList);
String ipPorts = StringUtils.join(ipPortList, ",");
rpcClient.addEndPoints(ipPorts);
}

public void removeEndPoint(Collection<String> ipPortList) {
addressList.removeAll(ipPortList);
String ipPorts = StringUtils.join(ipPortList, ",");
rpcClient.removeEndPoints(ipPorts);
}

@Override
public boolean equals(Object object) {
boolean flag = false;
if (object != null && BrokerClient.class.isAssignableFrom(object.getClass())) {
BrokerClient rhs = (BrokerClient) object;
flag = new EqualsBuilder()
.append(address, rhs.address)
.append(addressList, rhs.addressList)
.isEquals();
}
return flag;
Expand All @@ -36,16 +55,16 @@ public boolean equals(Object object) {
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(address)
.append(addressList)
.toHashCode();
}

public String getAddress() {
return address;
public List<String> getAddressList() {
return addressList;
}

public void setAddress(String address) {
this.address = address;
public void setAddressList(List<String> addressList) {
this.addressList = addressList;
}

public RPCClient getRpcClient() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package com.github.wenweihu86.distmq.client.consumer;

import com.github.wenweihu86.distmq.client.BrokerClient;
import com.github.wenweihu86.distmq.client.BrokerClientManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.utils.JsonUtil;
import com.github.wenweihu86.distmq.client.zk.MetadataManager;
import com.github.wenweihu86.distmq.client.zk.Metadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -25,7 +22,6 @@ public class Consumer implements Runnable {
public Consumer(ConsumerConfig config, MessageListener listener) {
this.config = config;
this.listener = listener;
BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions());
metadataManager = new MetadataManager(config);
metadataManager.registerConsumer(config.getConsumerGroup(), config.getConsumerId());
metadataManager.updateConsumerIds(config.getConsumerGroup());
Expand All @@ -47,12 +43,7 @@ public void run() {
Integer queueId = entry.getKey();
Integer shardingId = entry.getValue();
long offset = metadataManager.getConsumerOffset(queueId);
List<String> brokers = metadataManager.getBrokerAddressList(shardingId);

int randIndex = ThreadLocalRandom.current().nextInt(0, brokers.size());
ConcurrentMap<String, BrokerClient> brokerClientMap
= BrokerClientManager.getInstance().getBrokerClientMap();
BrokerClient brokerClient = brokerClientMap.get(brokers.get(randIndex));
BrokerClient brokerClient = metadataManager.getBrokerClient(shardingId);

BrokerMessage.PullMessageRequest request = BrokerMessage.PullMessageRequest.newBuilder()
.setTopic(config.getTopic())
Expand All @@ -62,9 +53,8 @@ public void run() {
.build();
BrokerMessage.PullMessageResponse response = brokerClient.getBrokerAPI().pullMessage(request);
if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) {
LOG.warn("pullMessage failed, topic={}, queue={}, offset={}, broker={}",
request.getTopic(), request.getQueue(), request.getOffset(),
brokers.get(randIndex));
LOG.warn("pullMessage failed, topic={}, queue={}, offset={}, shardingId={}",
request.getTopic(), request.getQueue(), request.getOffset(), shardingId);
} else {
LOG.info("pullMessage success, topic={}, queue={}, offset={}, size={}",
request.getTopic(), request.getQueue(), request.getOffset(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package com.github.wenweihu86.distmq.client.producer;

import com.github.wenweihu86.distmq.client.BrokerClient;
import com.github.wenweihu86.distmq.client.BrokerClientManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.MetadataManager;
import com.github.wenweihu86.distmq.client.zk.Metadata;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

Expand All @@ -23,7 +20,6 @@ public class Producer {

public Producer(ProducerConfig config) {
this.config = config;
BrokerClientManager.setRpcClientOptions(this.config.getRPCClientOptions());
metadataManager = new MetadataManager(config);
metadataManager.subscribeBroker();
metadataManager.subscribeTopic();
Expand Down Expand Up @@ -51,20 +47,16 @@ public boolean send(String topic, byte[] messageBytes) {
Integer shardingId = queueMap.get(queueId);

// send message to broker
List<String> brokerAddressList = metadataManager.getBrokerAddressList(shardingId);
int randIndex = ThreadLocalRandom.current().nextInt(0, brokerAddressList.size());
String brokerAddress = brokerAddressList.get(randIndex);
BrokerClient brokerClient = BrokerClientManager.getInstance().getBrokerClientMap().get(brokerAddress);

BrokerClient brokerClient = metadataManager.getBrokerClient(shardingId);
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.newBuilder()
.setTopic(topic)
.setQueue(queueId)
.setContent(ByteString.copyFrom(messageBytes))
.build();
BrokerMessage.SendMessageResponse response = brokerClient.getBrokerAPI().sendMessage(request);
if (response == null || response.getBaseRes().getResCode() != BrokerMessage.ResCode.RES_CODE_SUCCESS) {
LOG.warn("send message failed, topic={}, queue={}, brokerAddress={}",
topic, queueId, brokerAddress);
LOG.warn("send message failed, topic={}, queue={}, shardingId={}",
topic, queueId, shardingId);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.github.wenweihu86.distmq.client.zk;

import com.github.wenweihu86.distmq.client.BrokerClient;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -10,8 +14,7 @@
*/
public class Metadata {
// shardingId -> broker address list
private Map<Integer, List<String>> brokerMap = new HashMap<>();
private Lock brokerLock = new ReentrantLock();
private ConcurrentMap<Integer, BrokerClient> brokerMap = new ConcurrentHashMap<>();

// topic -> (queueId -> shardingId)
private Map<String, Map<Integer, Integer>> topicMap = new HashMap<>();
Expand Down Expand Up @@ -39,76 +42,29 @@ public long getConsumerOffset(Integer queueId) {
return offset;
}

public List<String> getBrokerAddressList(Integer shardingId) {
List<String> result = new ArrayList<>();
brokerLock.lock();
try {
List<String> brokers = brokerMap.get(shardingId);
if (brokers != null) {
result.addAll(brokers);
}
} finally {
brokerLock.unlock();
}
return result;
}

public void addShardingBrokerAddress(Integer shardingId, String address) {
brokerLock.lock();
try {
brokerMap.get(shardingId).add(address);
} finally {
brokerLock.unlock();
}
}

public void removeShardingBrokerAddress(Integer shardingId, String address) {
brokerLock.lock();
try {
brokerMap.get(shardingId).remove(address);
} finally {
brokerLock.unlock();
}
}

public void updateBrokerSharding(Integer shardingId, List<String> brokerAddressList) {
brokerLock.lock();
try {
brokerMap.put(shardingId, brokerAddressList);
} finally {
brokerLock.unlock();
}
}

public List<String> removeBrokerSharding(Integer shardingId) {
brokerLock.lock();
try {
return brokerMap.remove(Integer.valueOf(shardingId));
} finally {
brokerLock.unlock();
public List<String> getBrokerAddressList(Integer shardingId) {
List<String> result = new ArrayList<>();
BrokerClient brokerClient = brokerMap.get(shardingId);
if (brokerClient == null) {
return result;
} else {
return brokerClient.getAddressList();
}
}

public List<String> getBrokerShardings() {
List<String> shardings = new ArrayList<>();
brokerLock.lock();
try {
for (Integer shardingId : brokerMap.keySet()) {
shardings.add(String.valueOf(shardingId));
}
} finally {
brokerLock.unlock();
Set<Integer> shardingIdSet = brokerMap.keySet();
for (Integer shardingId : shardingIdSet) {
shardings.add(String.valueOf(shardingId));
}
return shardings;
}

public List<Integer> getBrokerShardingIds() {
brokerLock.lock();
try {
return new ArrayList<>(brokerMap.keySet());
} finally {
brokerLock.unlock();
}
return new ArrayList<>(brokerMap.keySet());
}

public List<String> getAllTopics() {
Expand Down Expand Up @@ -212,18 +168,10 @@ public void deleteTopicQueue(String topic, Collection<Integer> queueIds) {
}
}

public Map<Integer, List<String>> getBrokerMap() {
public ConcurrentMap<Integer, BrokerClient> getBrokerMap() {
return brokerMap;
}

public void setBrokerMap(Map<Integer, List<String>> brokerMap) {
this.brokerMap = brokerMap;
}

public Lock getBrokerLock() {
return brokerLock;
}

public Map<String, Map<Integer, Integer>> getTopicMap() {
return topicMap;
}
Expand Down Expand Up @@ -257,10 +205,6 @@ public Lock getConsumerIdsLock() {
return consumerIdsLock;
}

public void setConsumerIdsLock(Lock consumerIdsLock) {
this.consumerIdsLock = consumerIdsLock;
}

public Map<Integer, Long> getConsumerOffsetMap() {
return consumerOffsetMap;
}
Expand Down
Loading

0 comments on commit 48b5559

Please sign in to comment.