Skip to content

Commit

Permalink
add zk client
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 21, 2017
1 parent 859572d commit 11b5aa9
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.RaftOptions;
import com.github.wenweihu86.raft.proto.RaftMessage;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.broker.log.LogManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.StateMachine;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.wenweihu86.distmq.broker;
package com.github.wenweihu86.distmq.broker.config;

import com.github.wenweihu86.distmq.broker.BrokerUtils;
import com.github.wenweihu86.distmq.client.zk.ZKConf;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.moandjiezana.toml.Toml;
import org.slf4j.Logger;
Expand All @@ -22,6 +24,9 @@ public class GlobalConf {
private String dataDir; // 数据目录
private int defaultQueueNumPerTopic; // 每个topic的默认queue个数
private int maxSegmentSize; // 单个segment文件最大大小
// 该server属于哪个分片集群,每个分片是leader/followers的raft集群
private int shardingId;
private ZKConf zkConf;

public GlobalConf() {
String fileName = "/broker.toml";
Expand All @@ -32,6 +37,8 @@ public GlobalConf() {
dataDir = toml.getString("data_dir");
defaultQueueNumPerTopic = toml.getLong("default_queue_num_per_topic").intValue();
maxSegmentSize = toml.getLong("max_segment_size").intValue();
shardingId = toml.getLong("sharding_id").intValue();
zkConf = readZKConf();
}

public static GlobalConf getInstance() {
Expand Down Expand Up @@ -72,6 +79,17 @@ private List<RaftMessage.Server> readServers() {
return servers;
}

private ZKConf readZKConf() {
zkConf = new ZKConf();
zkConf.setServers(toml.getString("servers"));
zkConf.setConnectTimeoutMs(toml.getLong("connect_timeout_ms").intValue());
zkConf.setSessionTimeoutMs(toml.getLong("session_timeout_ms").intValue());
zkConf.setRetryCount(toml.getLong("retry_count").intValue());
zkConf.setRetryIntervalMs(toml.getLong("retry_interval_ms").intValue());
zkConf.setBasePath(toml.getString("base_path"));
return zkConf;
}

public RaftMessage.Server getLocalServer() {
return localServer;
}
Expand All @@ -92,4 +110,12 @@ public int getMaxSegmentSize() {
return maxSegmentSize;
}

public int getShardingId() {
return shardingId;
}

public ZKConf getZkConf() {
return zkConf;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.GlobalConf;
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
10 changes: 10 additions & 0 deletions distmq-broker/src/main/resources/broker.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
data_dir = "./data"
default_queue_num_per_topic = 8
max_segment_size = 100000000 # 100m
# 该server属于哪个分片集群,每个分片是leader/followers的raft集群
sharding_id = 1

[zookeeper]
servers = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
connect_timeout_ms = 500
session_timeout_ms = 5000
retry_count = 3
retry_interval_ms = 1000
base_path = "/distmq"

[local_server]
ip = "127.0.0.1"
Expand Down
5 changes: 5 additions & 0 deletions distmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.github.wenweihu86.distmq.client.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Created by wenweihu86 on 2017/6/21.
*/
public class ZKClient {
private static final Logger LOG = LoggerFactory.getLogger(ZKClient.class);
private ZKConf zkConf;
private CuratorFramework zkClient;

public ZKClient(ZKConf conf) {
this.zkConf = conf;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
zkConf.getRetryIntervalMs(), zkConf.getRetryCount());
this.zkClient = CuratorFrameworkFactory.builder()
.connectString(zkConf.getServers())
.retryPolicy(retryPolicy)
.connectionTimeoutMs(zkConf.getConnectTimeoutMs())
.sessionTimeoutMs(zkConf.getSessionTimeoutMs())
.build();
this.zkClient.start();
}

public void registerBroker(int shardingId, String ip, int port) {
String path = String.format("%s/brokers/%d/%s:%d",
zkConf.getBasePath(), shardingId, ip, port);
try {
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, "".getBytes());
} catch (Exception ex) {
LOG.warn("registerBroker exception:", ex);
}
}

public void subscribeBroker() {
final ZKData zkData = ZKData.getInstance();
final Map<Integer, List<String>> brokerMap = zkData.getBrokerMap();
try {
final String brokerParentPath = zkConf.getBasePath() + "/brokers";
List<String> shardings = zkClient.getChildren().forPath(brokerParentPath);
for (String sharding : shardings) {
final int shardingId = Integer.valueOf(sharding);
final String shardingPath = brokerParentPath + "/" + sharding;
List<String> brokerAddressList = zkClient.getChildren().forPath(shardingPath);
brokerMap.put(shardingId, brokerAddressList);
zkClient.getChildren().usingWatcher(
new BrokerShardingChildrenWather(shardingId))
.forPath(shardingPath);
}
zkClient.getChildren().usingWatcher(new BrokerChildrenWatcher()).forPath(brokerParentPath);
} catch (Exception ex) {
LOG.warn("subscribeBroker exception:", ex);
}
}

public void createTopic(String topic, int queueNum) {
}

private class BrokerShardingChildrenWather implements CuratorWatcher {
private int shardingId;

public BrokerShardingChildrenWather(int shardingId) {
this.shardingId = shardingId;
}

@Override
public void process(WatchedEvent event) throws Exception {
ZKData zkData = ZKData.getInstance();
Map<Integer, List<String>> brokerMap = zkData.getBrokerMap();
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String shardingPath = zkConf.getBasePath() + "/brokers/" + shardingId;
List<String> newBrokerAddressList = zkClient.getChildren().forPath(shardingPath);
// TODO: 对于client需要关闭被删除节点的连接,以及新建新增节点连接
brokerMap.put(shardingId, newBrokerAddressList);
}
}
}

private class BrokerChildrenWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent event) throws Exception {
ZKData zkData = ZKData.getInstance();
Map<Integer, List<String>> brokerMap = zkData.getBrokerMap();
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
String brokerPath = zkConf.getBasePath() + "/brokers";
List<String> newShardings = zkClient.getChildren().forPath(brokerPath);
Iterator<Map.Entry<Integer, List<String>>> iterator = brokerMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<Integer, List<String>> entry = iterator.next();
if (!newShardings.contains(Integer.valueOf(entry.getKey()))) {
// TODO:对于client,需要删除对应节点的连接
iterator.remove();
}
}
for (String sharding : newShardings) {
int shardingId = Integer.valueOf(sharding);
if (!brokerMap.containsKey(shardingId)) {
String shardingPath = brokerPath + "/" + sharding;
zkClient.getChildren().usingWatcher(
new BrokerShardingChildrenWather(shardingId))
.forPath(shardingPath);
}
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.github.wenweihu86.distmq.client.zk;

/**
* Created by wenweihu86 on 2017/6/21.
*/
public class ZKConf {
private String servers;
private int connectTimeoutMs;
private int sessionTimeoutMs;
private int retryCount;
private int retryIntervalMs;
private String basePath;

public String getServers() {
return servers;
}

public void setServers(String servers) {
this.servers = servers;
}

public int getConnectTimeoutMs() {
return connectTimeoutMs;
}

public void setConnectTimeoutMs(int connectTimeoutMs) {
this.connectTimeoutMs = connectTimeoutMs;
}

public int getSessionTimeoutMs() {
return sessionTimeoutMs;
}

public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}

public int getRetryCount() {
return retryCount;
}

public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}

public int getRetryIntervalMs() {
return retryIntervalMs;
}

public void setRetryIntervalMs(int retryIntervalMs) {
this.retryIntervalMs = retryIntervalMs;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.wenweihu86.distmq.client.zk;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Created by huwenwei on 2017/6/21.
*/
public class ZKData {
private static ZKData instance;

public static ZKData getInstance() {
if (instance == null) {
instance = new ZKData();
}
return instance;
}

// shardingId -> broker address list
private Map<Integer, List<String>> brokerMap = new ConcurrentHashMap<>();

public static void setInstance(ZKData instance) {
ZKData.instance = instance;
}

public Map<Integer, List<String>> getBrokerMap() {
return brokerMap;
}

public void setBrokerMap(Map<Integer, List<String>> brokerMap) {
this.brokerMap = brokerMap;
}
}

0 comments on commit 11b5aa9

Please sign in to comment.