Skip to content

Commit

Permalink
complete broker api
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 23, 2017
1 parent c47ea2c commit 5ecbf35
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 58 deletions.
38 changes: 38 additions & 0 deletions distmq-broker/src/main/assembly/assembly.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>deploy</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<!--<baseDirectory>distmq-broker</baseDirectory>-->
<fileSets>
<!-- app bin -->
<fileSet>
<directory>src/main/assembly/bin</directory>
<outputDirectory>bin</outputDirectory>
<includes>
<include>./**/run.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<!-- app main conf -->
<fileSet>
<directory>target/conf</directory>
<outputDirectory>conf</outputDirectory>
<includes>
<include>./**/*.xml</include>
<include>./**/*.properties</include>
<include>./**/*.toml</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
</dependencySet>
</dependencySets>
</assembly>
29 changes: 29 additions & 0 deletions distmq-broker/src/main/assembly/bin/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

JMX_PORT=18501
GC_LOG=./logs/gc.log
#jvm config
JAVA_BASE_OPTS=" -Djava.awt.headless=true -Dfile.encoding=UTF-8 "

JAVA_JMX_OPTS=" -Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=$JMX_PORT \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.authenticate=false "

JAVA_MEM_OPTS=" -server -Xms2g -Xmx2g -Xmn600m -XX:PermSize=128m \
-XX:MaxPermSize=128m -Xss256K -XX:+DisableExplicitGC \
-XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled \
-XX:+UseCMSCompactAtFullCollection -XX:LargePageSizeInBytes=128m \
-XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly \
-XX:CMSInitiatingOccupancyFraction=70 "

JAVA_GC_OPTS=" -verbose:gc -Xloggc:$GC_LOG \
-XX:+PrintGCDetails -XX:+PrintGCDateStamps "

JAVA_CP=" -cp conf:lib/* "

JAVA_OPTS=" $JAVA_BASE_OPTS $JAVA_MEM_OPTS $JAVA_JMX_OPTS $JAVA_GC_OPTS $JAVA_CP"

RUNJAVA="$JAVA_HOME/bin/java"

$RUNJAVA $JAVA_CP com.github.wenweihu86.distmq.broker.BrokerMain
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe

@Override
public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) {
return null;
BrokerMessage.PullMessageResponse response = stateMachine.pullMessage(request);
LOG.info("pullMessage request, topic={}, queue={}, "
+ "resCode, resSize={}",
request.getTopic(), request.getQueue(),
response.getBaseRes().getResCode(),
response.getContentsCount());
return response;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.zk.ZKClient;
import com.github.wenweihu86.distmq.client.zk.ZKConf;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.RaftOptions;
import com.github.wenweihu86.raft.proto.RaftMessage;
Expand Down Expand Up @@ -43,6 +45,14 @@ public static void main(String[] args) {
// 启动RPCServer,初始化Raft节点
server.start();
raftNode.init();
// 注册zk
ZKConf zkConf = conf.getZkConf();
ZKClient zkClient = new ZKClient(zkConf);
zkClient.registerBroker(conf.getShardingId(),
conf.getLocalServer().getEndPoint().getHost(),
conf.getLocalServer().getEndPoint().getPort());
zkClient.subscribeBroker();
zkClient.subscribeTopic();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.broker.log.LogManager;
import com.github.wenweihu86.distmq.broker.log.Segment;
import com.github.wenweihu86.distmq.broker.log.SegmentedLog;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.distmq.client.zk.ZKData;
import com.github.wenweihu86.raft.StateMachine;
import com.google.protobuf.ByteString;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,9 +68,36 @@ public void readSnapshot(String snapshotDir) {
public void apply(byte[] dataBytes) {
try {
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes);
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
segmentedLog.append(request.getContent().toByteArray());
} catch (Exception ex) {
LOG.warn("apply exception:", ex);
}
}

public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) {
BrokerMessage.PullMessageResponse.Builder responseBuilder = BrokerMessage.PullMessageResponse.newBuilder();
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
int readCount = 0;
long offset = request.getOffset();
if (offset == 0) {
offset = Segment.SEGMENT_HEADER_LENGTH;
}
while (readCount < request.getMessageCount()) {
byte[] messageBytes = segmentedLog.read(offset);
if (messageBytes == null) {
break;
}
BrokerMessage.MessageContent message = BrokerMessage.MessageContent.newBuilder()
.setTopic(request.getTopic())
.setQueue(request.getQueue())
.setOffset(offset)
.setContent(ByteString.copyFrom(messageBytes))
.build();
responseBuilder.addContents(message);
offset += messageBytes.length + Segment.MESSAGE_HEADER_LENGTH;
}
return responseBuilder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Created by wenweihu86 on 2017/6/20.
Expand All @@ -14,10 +16,10 @@ public class LogManager {
private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);
private String logDir;
// topic -> (queueId -> segment log)
private Map<String, Map<Integer, SegmentedLog>> topicLogMap;
private ConcurrentMap<String, ConcurrentMap<Integer, SegmentedLog>> topicLogMap;

public LogManager(String logDir) {
this.topicLogMap = new HashMap<>();
this.topicLogMap = new ConcurrentHashMap<>();
this.logDir = logDir;
File dirFile = new File(logDir);
if (!dirFile.exists()) {
Expand All @@ -32,7 +34,7 @@ public LogManager(String logDir) {
}
LOG.info("Loading log from " + topicDir.getAbsolutePath());
if (!this.topicLogMap.containsKey(topicDir)) {
this.topicLogMap.put(topicDir.getName(), new HashMap<Integer, SegmentedLog>());
this.topicLogMap.put(topicDir.getName(), new ConcurrentHashMap<Integer, SegmentedLog>());
}
Map<Integer, SegmentedLog> queueMap = this.topicLogMap.get(topicDir.getName());
File[] queueDirs = topicDir.listFiles();
Expand All @@ -53,10 +55,20 @@ public LogManager(String logDir) {
}

public SegmentedLog getOrCreateQueueLog(String topic, int queue) {
// TODO:
// 需要读取zookeeper中topic/queue信息来判断该queue是否应该存在本broker集群分片
// zookeeper存储结构为/distmq/topics/topicName/queueId -> brokerShardingId
return null;
ConcurrentMap<Integer, SegmentedLog> queueMap = topicLogMap.get(topic);
if (queueMap == null) {
queueMap = new ConcurrentHashMap<>();
topicLogMap.putIfAbsent(topic, queueMap);
queueMap = topicLogMap.get(topic);
}
SegmentedLog segmentedLog = queueMap.get(queue);
if (segmentedLog == null) {
String fullQueuePath = logDir + File.separator + topic + File.separator + queue;
segmentedLog = new SegmentedLog(fullQueuePath);
queueMap.putIfAbsent(queue, segmentedLog);
segmentedLog = queueMap.get(queue);
}
return segmentedLog;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.BrokerUtils;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,8 +89,8 @@ public long append(byte[] messageContent) {
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
writeSize = channel.write(byteBuffer);
endOffset += writeSize;
offset = startOffset;
endOffset = startOffset + writeSize;
} else {
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,15 +104,15 @@ public byte[] read(long offset) {
return segment.read(offset);
}

public void readSegments() {
private void readSegments() {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir);
for (String fileName: fileNames) {
Segment segment = new Segment(segmentDir, fileName);
startOffsetSegmentMap.put(segment.getStartOffset(), segment);
}
}

public void validateSegments() {
private void validateSegments() {
long lastEndOffset = 0;
for (Segment segment : startOffsetSegmentMap.values()) {
if (lastEndOffset > 0 && segment.getStartOffset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Created by wenweihu86 on 2017/6/15.
*/
public interface BrokerAPI {

BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRequest request);

BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request);
}
Loading

0 comments on commit 5ecbf35

Please sign in to comment.