Skip to content

Commit

Permalink
update snapshot to create symbol link
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 2, 2017
1 parent 48b5559 commit e25be4d
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 26 deletions.
2 changes: 1 addition & 1 deletion distmq-broker/src/main/assembly/bin/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ JAVA_OPTS=" $JAVA_BASE_OPTS $JAVA_MEM_OPTS $JAVA_JMX_OPTS $JAVA_GC_OPTS $JAVA_CP

RUNJAVA="$JAVA_HOME/bin/java"

$RUNJAVA $JAVA_CP com.github.wenweihu86.distmq.broker.BrokerMain
$RUNJAVA $JAVA_OPTS $JAVA_CP com.github.wenweihu86.distmq.broker.BrokerMain
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public static void main(String[] args) {
BrokerStateMachine stateMachine = new BrokerStateMachine();
// 设置数据目录
RaftOptions.dataDir = dataDir;
// just for test snapshot
RaftOptions.snapshotMinLogSize = 10 * 1024;
RaftOptions.snapshotPeriodSeconds = 30;
RaftOptions.maxSegmentFileSize = 1024 * 1024;
// 初始化RaftNode
RaftNode raftNode = new RaftNode(servers, localServer, stateMachine);
// 注册Raft节点之间相互调用的服务
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by wenweihu86 on 2017/6/17.
Expand All @@ -19,6 +23,8 @@ public class BrokerStateMachine implements StateMachine {
private static final Logger LOG = LoggerFactory.getLogger(BrokerStateMachine.class);
private String messageDir;
private LogManager logManager;
// 状态机数据是否可用,当在read snapshot时,状态机数据不可用,主要发生在每次install snapshot时。
private AtomicBoolean isAvailable = new AtomicBoolean(true);

public BrokerStateMachine() {
String dataDir = GlobalConf.getInstance().getDataDir();
Expand All @@ -27,56 +33,77 @@ public BrokerStateMachine() {

@Override
public void writeSnapshot(String snapshotDir) {
// TODO:改成硬链接形式,提升速度和节省空间
// 采用软链接形式,提升速度和节省空间
try {
File messageDirFile = new File(messageDir);
File snapshotDirFile = new File(snapshotDir);
if (snapshotDirFile.exists()) {
FileUtils.deleteDirectory(snapshotDirFile);
}
if (messageDirFile.exists()) {
FileUtils.copyDirectory(messageDirFile, snapshotDirFile);
Path link = FileSystems.getDefault().getPath(snapshotDir);
Path target = FileSystems.getDefault().getPath(messageDir);
Files.createSymbolicLink(link, target);
}
} catch (IOException ex) {
LOG.warn("snapshot failed");
LOG.warn("write snapshot failed, exception:", ex);
}
}

@Override
public void readSnapshot(String snapshotDir) {
try {
File mqDirFile = new File(messageDir);
if (mqDirFile.exists()) {
FileUtils.deleteDirectory(mqDirFile);
}
File snapshotDirFile = new File(snapshotDir);
if (snapshotDirFile.exists()) {
FileUtils.copyDirectory(snapshotDirFile, mqDirFile);
isAvailable.compareAndSet(true, false);
Path link = FileSystems.getDefault().getPath(snapshotDir);
if (!Files.isSymbolicLink(link)) {
// 非符号链接,表示从leader节点同步拷贝的
if (logManager != null) {
logManager.close();
}
File messageDirFile = new File(messageDir);
if (messageDirFile.exists()) {
FileUtils.deleteDirectory(messageDirFile);
}
File snapshotDirFile = new File(snapshotDir);
if (snapshotDirFile.exists()) {
FileUtils.copyDirectory(snapshotDirFile, messageDirFile);
}
}
logManager = new LogManager(messageDir);
} catch (IOException ex) {
LOG.error("readSnapshot error");
LOG.error("readSnapshot exception:", ex);
throw new RuntimeException(ex);
} finally {
isAvailable.compareAndSet(false, true);
}
}

@Override
public void apply(byte[] dataBytes) {
try {
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes);
BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder()
.setTopic(request.getTopic())
.setQueue(request.getQueue())
.setContent(request.getContent());
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
segmentedLog.append(message);
if (isAvailable.get()) {
BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes);
BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder()
.setTopic(request.getTopic())
.setQueue(request.getQueue())
.setContent(request.getContent());
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
segmentedLog.append(message);
}
} catch (Exception ex) {
LOG.warn("apply exception:", ex);
}
}

public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) {
BrokerMessage.PullMessageResponse.Builder responseBuilder = BrokerMessage.PullMessageResponse.newBuilder();
BrokerMessage.BaseResponse.Builder baseResBuilder = BrokerMessage.BaseResponse.newBuilder();
if (!isAvailable.get()) {
baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL);
baseResBuilder.setResMsg("state machine is busy");
responseBuilder.setBaseRes(baseResBuilder);
return responseBuilder.build();
}
SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue());
int readCount = 0;
long offset = request.getOffset();
Expand All @@ -89,6 +116,9 @@ public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRe
offset = message.getOffset() + message.getSize();
readCount++;
}

baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_SUCCESS);
responseBuilder.setBaseRes(baseResBuilder);
return responseBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.*;

/**
Expand Down Expand Up @@ -79,6 +77,18 @@ public SegmentedLog getOrCreateQueueLog(String topic, int queue) {
return segmentedLog;
}

public void close() {
Collection<ConcurrentMap<Integer, SegmentedLog>> queueLogs = topicLogMap.values();
List<SegmentedLog> allLogs = new ArrayList<>();
for (ConcurrentMap<Integer, SegmentedLog> queueLogMap : queueLogs) {
allLogs.addAll(queueLogMap.values());
}
for (SegmentedLog log : allLogs) {
log.close();
}
topicLogMap.clear();
}

@Override
public void run() {
GlobalConf conf = GlobalConf.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) {
byteBuffer.putInt(messageBytes.length);
byteBuffer.put(messageBytes);
byteBuffer.flip();
int writeSize = channel.write(byteBuffer);
channel.position(channel.size());
int writeSize = 0;
while (byteBuffer.hasRemaining()) {
writeSize += channel.write(byteBuffer);
}
channel.force(true);
if (writeSize != totalSize) {
LOG.warn("append message failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ public BrokerMessage.MessageContent read(long offset) {
}
}

public void close() {
lock.lock();
try {
for (Segment segment : startOffsetSegmentMap.values()) {
segment.close();
}
startOffsetSegmentMap.clear();
} finally {
lock.unlock();
}
}

private void readSegments() {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir);
for (String fileName: fileNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ public boolean checkTopicExist(String topic) {
public void updateTopicMap(String topic, Map<Integer, Integer> queueMap) {
topicLock.lock();
try {
topicMap.put(topic, queueMap);
if (!topicMap.containsKey(topic)) {
topicMap.put(topic, queueMap);
} else {
topicMap.get(topic).putAll(queueMap);
}
topicCondition.signalAll();
} finally {
topicLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.github.wenweihu86.distmq.client.producer.Producer;
import com.github.wenweihu86.distmq.client.producer.ProducerConfig;

import java.util.UUID;

/**
* Created by wenweihu86 on 2017/6/25.
*/
Expand All @@ -13,8 +15,8 @@ public static void main(String[] args) {
config.setZKServers("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
Producer producer = new Producer(config);
String topic = "example-topic";
for (int i = 0; i < 100; i++) {
String message = "hello-distmq-" + i;
while (true) {
String message = UUID.randomUUID().toString();
boolean success = producer.send(topic, message.getBytes());
if (success) {
System.out.printf("send message success, topic=%s, message=%s\n",
Expand Down

0 comments on commit e25be4d

Please sign in to comment.