Skip to content

Commit

Permalink
add broker framework
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 17, 2017
1 parent ebb154c commit 52b9e42
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 0 deletions.
5 changes: 5 additions & 0 deletions distmq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@
<artifactId>toml4j</artifactId>
<version>0.7.1</version>
</dependency>
<dependency>
<groupId>com.github.wenweihu86.distmq</groupId>
<artifactId>distmq-client</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.client.api.BrokerAPI;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.RaftNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by wenweihu86 on 2017/6/17.
*/
public class BrokerAPIImpl implements BrokerAPI {

private static final Logger LOG = LoggerFactory.getLogger(BrokerAPIImpl.class);

private RaftNode raftNode;
private BrokerStateMachine stateMachine;

public BrokerAPIImpl(RaftNode raftNode, BrokerStateMachine stateMachine) {
this.raftNode = raftNode;
this.stateMachine = stateMachine;
}

@Override
public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRequest request) {
return null;
}

@Override
public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) {
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.RaftOptions;
import com.github.wenweihu86.raft.proto.RaftMessage;
import com.github.wenweihu86.raft.service.RaftClientService;
import com.github.wenweihu86.raft.service.RaftConsensusService;
import com.github.wenweihu86.raft.service.impl.RaftClientServiceImpl;
import com.github.wenweihu86.raft.service.impl.RaftConsensusServiceImpl;
import com.github.wenweihu86.rpc.server.RPCServer;

import java.util.List;

/**
* Created by wenweihu86 on 2017/6/17.
*/
public class BrokerMain {
public static void main(String[] args) {
// read conf
GlobalConf conf = GlobalConf.getInstance();
RaftMessage.Server localServer = conf.getLocalServer();
List<RaftMessage.Server> servers = conf.getServers();
String dataDir = conf.getString("data_dir");

// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
// 应用状态机
BrokerStateMachine stateMachine = new BrokerStateMachine();
// 设置数据目录
RaftOptions.dataDir = dataDir;
// 初始化RaftNode
RaftNode raftNode = new RaftNode(servers, localServer, stateMachine);
// 注册Raft节点之间相互调用的服务
RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);
server.registerService(raftConsensusService);
// 注册给Client调用的Raft服务
RaftClientService raftClientService = new RaftClientServiceImpl(raftNode);
server.registerService(raftClientService);
// 注册应用自己提供的服务
BrokerAPIImpl brokerAPI = new BrokerAPIImpl(raftNode, stateMachine);
server.registerService(brokerAPI);
// 启动RPCServer,初始化Raft节点
server.start();
raftNode.init();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.raft.StateMachine;

/**
* Created by wenweihu86 on 2017/6/17.
*/
public class BrokerStateMachine implements StateMachine {

@Override
public void writeSnapshot(String snapshotDir) {
}

@Override
public void readSnapshot(String snapshotDir) {
}

@Override
public void apply(byte[] dataBytes) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.github.wenweihu86.distmq.broker;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by wenweihu86 on 2017/6/17.
*/
public class BrokerUtils {
private static final Logger LOG = LoggerFactory.getLogger(BrokerUtils.class);
private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();

public static String protoToJson(MessageOrBuilder message) {
try {
return PRINTER.print(message);
} catch (InvalidProtocolBufferException ex) {
LOG.warn("get exception: ", ex);
return "";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.raft.proto.RaftMessage;
import com.moandjiezana.toml.Toml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* Created by wenweihu86 on 2017/6/17.
*/
public class GlobalConf {
private static final Logger LOG = LoggerFactory.getLogger(GlobalConf.class);
private static GlobalConf instance;

private Toml toml;

public GlobalConf() {
String fileName = "/broker.toml";
File file = new File(getClass().getResource(fileName).getFile());
toml = new Toml().read(file);
}

public static GlobalConf getInstance() {
if (instance == null) {
instance = new GlobalConf();
}
return instance;
}

public String getString(String key) {
return toml.getString(key);
}

public RaftMessage.Server getLocalServer() {
RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder();
RaftMessage.EndPoint.Builder endPointBuilder = RaftMessage.EndPoint.newBuilder();
Toml localServerConf = toml.getTable("local_server");
endPointBuilder.setHost(localServerConf.getString("ip"));
endPointBuilder.setPort(localServerConf.getLong("port").intValue());
serverBuilder.setEndPoint(endPointBuilder);
serverBuilder.setServerId(localServerConf.getLong("id").intValue());
RaftMessage.Server localServer = serverBuilder.build();
LOG.info("read local_server conf={}", BrokerUtils.protoToJson(localServer));
return localServer;
}

public List<RaftMessage.Server> getServers() {
List<RaftMessage.Server> servers = new ArrayList<>();
List<Toml> serverConfList = toml.getTables("servers");
for (Toml serverConf : serverConfList) {
RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder()
.setHost(serverConf.getString("ip"))
.setPort(serverConf.getLong("port").intValue())
.build();
RaftMessage.Server server = RaftMessage.Server.newBuilder()
.setEndPoint(endPoint)
.setServerId(serverConf.getLong("id").intValue())
.build();
LOG.info("read conf server={}", BrokerUtils.protoToJson(server));
servers.add(server);
}
return servers;
}

}
21 changes: 21 additions & 0 deletions distmq-broker/src/main/resources/broker.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
data_dir = "./data"

[local_server]
ip = "127.0.0.1"
port = 8501
id = 1

[[servers]]
ip = "127.0.0.1"
port = 8501
id = 1

[[servers]]
ip = "127.0.0.1"
port = 8502
id = 2

[[servers]]
ip = "127.0.0.1"
port = 8503
id = 3
39 changes: 39 additions & 0 deletions distmq-broker/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Don't forget to set system property
-DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
to make all loggers asynchronous. -->

<Configuration status="INFO">
<Appenders>

<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d %p [%t]\t%m%n" />
</Console>

<!-- Async Loggers will auto-flush in batches, so switch off immediateFlush. -->
<RollingFile name="RollingFile" fileName="logs/broker.log" immediateFlush="false"
bufferSize="65536"
filePattern="logs/broker.log.%d{yyyy-MM-dd}" append="true">
<PatternLayout>
<Pattern>%d %p [%t]\t%m%n</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy/>
</Policies>
</RollingFile>

</Appenders>

<Loggers>
<Logger name="com.github.wenweihu86.distmq" level="info" additivity="false">
<AppenderRef ref="Console" />
<AppenderRef ref="RollingFile"/>
</Logger>

<Root level="info" includeLocation="false">
<AppenderRef ref="Console" />
<AppenderRef ref="RollingFile"/>
</Root>
</Loggers>
</Configuration>

0 comments on commit 52b9e42

Please sign in to comment.