Skip to content

Commit

Permalink
add LogManager to manager topic read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 20, 2017
1 parent af23de0 commit 859572d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,70 @@
package com.github.wenweihu86.distmq.broker;

import com.github.wenweihu86.distmq.broker.log.LogManager;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.StateMachine;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

/**
* Created by wenweihu86 on 2017/6/17.
*/
public class BrokerStateMachine implements StateMachine {
private static final Logger LOG = LoggerFactory.getLogger(BrokerStateMachine.class);
private String messageDir;
private LogManager logManager;

public BrokerStateMachine() {
String dataDir = GlobalConf.getInstance().getDataDir();
this.messageDir = dataDir + File.separator + "message";
}

@Override
public void writeSnapshot(String snapshotDir) {
try {
File messageDirFile = new File(messageDir);
File snapshotDirFile = new File(snapshotDir);
if (snapshotDirFile.exists()) {
FileUtils.deleteDirectory(snapshotDirFile);
}
if (messageDirFile.exists()) {
FileUtils.copyDirectory(messageDirFile, snapshotDirFile);
}
} catch (IOException ex) {
LOG.warn("snapshot failed");
}
}

@Override
public void readSnapshot(String snapshotDir) {
try {
File mqDirFile = new File(messageDir);
if (mqDirFile.exists()) {
FileUtils.deleteDirectory(mqDirFile);
}
File snapshotDirFile = new File(snapshotDir);
if (snapshotDirFile.exists()) {
FileUtils.copyDirectory(snapshotDirFile, mqDirFile);
}
logManager = new LogManager(messageDir);
} catch (IOException ex) {
LOG.error("readSnapshot error");
throw new RuntimeException(ex);
}
}

@Override
public void apply(byte[] dataBytes) {
try {
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes);
// TODO: 找到segment log,写入消息
} catch (Exception ex) {
LOG.warn("apply exception:", ex);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class GlobalConf {
RaftMessage.Server localServer; // 本机节点
List<RaftMessage.Server> servers; // 集群所有节点
private String dataDir; // 数据目录
private int defaultQueueNumPerTopic; // 每个topic的默认queue个数
private int maxSegmentSize; // 单个segment文件最大大小

public GlobalConf() {
Expand All @@ -29,6 +30,7 @@ public GlobalConf() {
localServer = readLocalServer();
servers = readServers();
dataDir = toml.getString("data_dir");
defaultQueueNumPerTopic = toml.getLong("default_queue_num_per_topic").intValue();
maxSegmentSize = toml.getLong("max_segment_size").intValue();
}

Expand Down Expand Up @@ -82,6 +84,10 @@ public String getDataDir() {
return dataDir;
}

public int getDefaultQueueNumPerTopic() {
return defaultQueueNumPerTopic;
}

public int getMaxSegmentSize() {
return maxSegmentSize;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.github.wenweihu86.distmq.broker.log;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

/**
* Created by wenweihu86 on 2017/6/20.
*/
public class LogManager {
private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);
private String logDir;
private Map<String, Map<Integer, SegmentedLog>> topicLogMap;

public LogManager(String logDir) {
this.topicLogMap = new HashMap<>();
this.logDir = logDir;
File dirFile = new File(logDir);
if (!dirFile.exists()) {
dirFile.mkdirs();
}
File[] topicDirs = dirFile.listFiles();
if (topicDirs != null) {
for (File topicDir : topicDirs) {
if (!topicDir.isDirectory()) {
LOG.warn("{} is not directory", topicDir.getAbsolutePath());
continue;
}
LOG.info("Loading log from " + topicDir.getAbsolutePath());
if (!this.topicLogMap.containsKey(topicDir)) {
this.topicLogMap.put(topicDir.getName(), new HashMap<Integer, SegmentedLog>());
}
Map<Integer, SegmentedLog> queueMap = this.topicLogMap.get(topicDir.getName());
File[] queueDirs = topicDir.listFiles();
if (queueDirs != null) {
for (File queueDir : queueDirs) {
if (!queueDir.isDirectory()) {
LOG.warn("{} is not directory", queueDir.getAbsolutePath());
continue;
}
Integer queueId = Integer.valueOf(queueDir.getName());
String fullQueuePath = logDir + File.separator + topicDir + File.separator + queueDir;
SegmentedLog queueLog = new SegmentedLog(fullQueuePath);
queueMap.put(queueId, queueLog);
}
}
}
}
}

public SegmentedLog getOrCreateQueueLog(String topic, int queue) {
// TODO:
// 需要读取zookeeper中topic/queue信息来判断该queue是否应该存在本broker集群分片
// zookeeper存储结构为/distmq/topics/topicName/queueId -> brokerShardingId
return null;
}

}
1 change: 1 addition & 0 deletions distmq-broker/src/main/resources/broker.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
data_dir = "./data"
default_queue_num_per_topic = 8
max_segment_size = 100000000 # 100m

[local_server]
Expand Down

0 comments on commit 859572d

Please sign in to comment.