From e25be4d7199c30f5bd35387b54f37625c9d5f27d Mon Sep 17 00:00:00 2001 From: wenweihu86 Date: Sun, 2 Jul 2017 22:59:01 +0800 Subject: [PATCH] update snapshot to create symbol link --- distmq-broker/src/main/assembly/bin/run.sh | 2 +- .../wenweihu86/distmq/broker/BrokerMain.java | 4 ++ .../distmq/broker/BrokerStateMachine.java | 66 ++++++++++++++----- .../distmq/broker/log/LogManager.java | 16 ++++- .../wenweihu86/distmq/broker/log/Segment.java | 6 +- .../distmq/broker/log/SegmentedLog.java | 12 ++++ .../wenweihu86/distmq/client/zk/Metadata.java | 6 +- .../distmq/example/ProducerMain.java | 6 +- 8 files changed, 92 insertions(+), 26 deletions(-) diff --git a/distmq-broker/src/main/assembly/bin/run.sh b/distmq-broker/src/main/assembly/bin/run.sh index 824e32a..9d9256a 100644 --- a/distmq-broker/src/main/assembly/bin/run.sh +++ b/distmq-broker/src/main/assembly/bin/run.sh @@ -26,4 +26,4 @@ JAVA_OPTS=" $JAVA_BASE_OPTS $JAVA_MEM_OPTS $JAVA_JMX_OPTS $JAVA_GC_OPTS $JAVA_CP RUNJAVA="$JAVA_HOME/bin/java" -$RUNJAVA $JAVA_CP com.github.wenweihu86.distmq.broker.BrokerMain +$RUNJAVA $JAVA_OPTS $JAVA_CP com.github.wenweihu86.distmq.broker.BrokerMain diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java index 18fe774..1923a5d 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerMain.java @@ -36,6 +36,10 @@ public static void main(String[] args) { BrokerStateMachine stateMachine = new BrokerStateMachine(); // 设置数据目录 RaftOptions.dataDir = dataDir; + // just for test snapshot + RaftOptions.snapshotMinLogSize = 10 * 1024; + RaftOptions.snapshotPeriodSeconds = 30; + RaftOptions.maxSegmentFileSize = 1024 * 1024; // 初始化RaftNode RaftNode raftNode = new RaftNode(servers, localServer, stateMachine); // 注册Raft节点之间相互调用的服务 diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java index a5caa03..1fe12a6 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/BrokerStateMachine.java @@ -11,6 +11,10 @@ import java.io.File; import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; /** * Created by wenweihu86 on 2017/6/17. @@ -19,6 +23,8 @@ public class BrokerStateMachine implements StateMachine { private static final Logger LOG = LoggerFactory.getLogger(BrokerStateMachine.class); private String messageDir; private LogManager logManager; + // 状态机数据是否可用,当在read snapshot时,状态机数据不可用,主要发生在每次install snapshot时。 + private AtomicBoolean isAvailable = new AtomicBoolean(true); public BrokerStateMachine() { String dataDir = GlobalConf.getInstance().getDataDir(); @@ -27,7 +33,7 @@ public BrokerStateMachine() { @Override public void writeSnapshot(String snapshotDir) { - // TODO:改成硬链接形式,提升速度和节省空间 + // 采用软链接形式,提升速度和节省空间 try { File messageDirFile = new File(messageDir); File snapshotDirFile = new File(snapshotDir); @@ -35,41 +41,55 @@ public void writeSnapshot(String snapshotDir) { FileUtils.deleteDirectory(snapshotDirFile); } if (messageDirFile.exists()) { - FileUtils.copyDirectory(messageDirFile, snapshotDirFile); + Path link = FileSystems.getDefault().getPath(snapshotDir); + Path target = FileSystems.getDefault().getPath(messageDir); + Files.createSymbolicLink(link, target); } } catch (IOException ex) { - LOG.warn("snapshot failed"); + LOG.warn("write snapshot failed, exception:", ex); } } @Override public void readSnapshot(String snapshotDir) { try { - File mqDirFile = new File(messageDir); - if (mqDirFile.exists()) { - FileUtils.deleteDirectory(mqDirFile); - } - File snapshotDirFile = new File(snapshotDir); - if (snapshotDirFile.exists()) { - FileUtils.copyDirectory(snapshotDirFile, mqDirFile); + isAvailable.compareAndSet(true, false); + Path link = FileSystems.getDefault().getPath(snapshotDir); + if (!Files.isSymbolicLink(link)) { + // 非符号链接,表示从leader节点同步拷贝的 + if (logManager != null) { + logManager.close(); + } + File messageDirFile = new File(messageDir); + if (messageDirFile.exists()) { + FileUtils.deleteDirectory(messageDirFile); + } + File snapshotDirFile = new File(snapshotDir); + if (snapshotDirFile.exists()) { + FileUtils.copyDirectory(snapshotDirFile, messageDirFile); + } } logManager = new LogManager(messageDir); } catch (IOException ex) { - LOG.error("readSnapshot error"); + LOG.error("readSnapshot exception:", ex); throw new RuntimeException(ex); + } finally { + isAvailable.compareAndSet(false, true); } } @Override public void apply(byte[] dataBytes) { try { - BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes); - BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder() - .setTopic(request.getTopic()) - .setQueue(request.getQueue()) - .setContent(request.getContent()); - SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue()); - segmentedLog.append(message); + if (isAvailable.get()) { + BrokerMessage.SendMessageRequest request = BrokerMessage.SendMessageRequest.parseFrom(dataBytes); + BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder() + .setTopic(request.getTopic()) + .setQueue(request.getQueue()) + .setContent(request.getContent()); + SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue()); + segmentedLog.append(message); + } } catch (Exception ex) { LOG.warn("apply exception:", ex); } @@ -77,6 +97,13 @@ public void apply(byte[] dataBytes) { public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRequest request) { BrokerMessage.PullMessageResponse.Builder responseBuilder = BrokerMessage.PullMessageResponse.newBuilder(); + BrokerMessage.BaseResponse.Builder baseResBuilder = BrokerMessage.BaseResponse.newBuilder(); + if (!isAvailable.get()) { + baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_FAIL); + baseResBuilder.setResMsg("state machine is busy"); + responseBuilder.setBaseRes(baseResBuilder); + return responseBuilder.build(); + } SegmentedLog segmentedLog = logManager.getOrCreateQueueLog(request.getTopic(), request.getQueue()); int readCount = 0; long offset = request.getOffset(); @@ -89,6 +116,9 @@ public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRe offset = message.getOffset() + message.getSize(); readCount++; } + + baseResBuilder.setResCode(BrokerMessage.ResCode.RES_CODE_SUCCESS); + responseBuilder.setBaseRes(baseResBuilder); return responseBuilder.build(); } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java index 3c68844..88f392b 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java @@ -6,9 +6,7 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.*; /** @@ -79,6 +77,18 @@ public SegmentedLog getOrCreateQueueLog(String topic, int queue) { return segmentedLog; } + public void close() { + Collection> queueLogs = topicLogMap.values(); + List allLogs = new ArrayList<>(); + for (ConcurrentMap queueLogMap : queueLogs) { + allLogs.addAll(queueLogMap.values()); + } + for (SegmentedLog log : allLogs) { + log.close(); + } + topicLogMap.clear(); + } + @Override public void run() { GlobalConf conf = GlobalConf.getInstance(); diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java index ae6d212..27346d9 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java @@ -96,7 +96,11 @@ public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) { byteBuffer.putInt(messageBytes.length); byteBuffer.put(messageBytes); byteBuffer.flip(); - int writeSize = channel.write(byteBuffer); + channel.position(channel.size()); + int writeSize = 0; + while (byteBuffer.hasRemaining()) { + writeSize += channel.write(byteBuffer); + } channel.force(true); if (writeSize != totalSize) { LOG.warn("append message failed"); diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java index ae55f14..b82f001 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java @@ -106,6 +106,18 @@ public BrokerMessage.MessageContent read(long offset) { } } + public void close() { + lock.lock(); + try { + for (Segment segment : startOffsetSegmentMap.values()) { + segment.close(); + } + startOffsetSegmentMap.clear(); + } finally { + lock.unlock(); + } + } + private void readSegments() { List fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir); for (String fileName: fileNames) { diff --git a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java index 2ee953a..9fc8a92 100644 --- a/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java +++ b/distmq-client/src/main/java/com/github/wenweihu86/distmq/client/zk/Metadata.java @@ -93,7 +93,11 @@ public boolean checkTopicExist(String topic) { public void updateTopicMap(String topic, Map queueMap) { topicLock.lock(); try { - topicMap.put(topic, queueMap); + if (!topicMap.containsKey(topic)) { + topicMap.put(topic, queueMap); + } else { + topicMap.get(topic).putAll(queueMap); + } topicCondition.signalAll(); } finally { topicLock.unlock(); diff --git a/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java b/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java index d449dc9..561913d 100644 --- a/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java +++ b/distmq-example/src/main/java/com/github/wenweihu86/distmq/example/ProducerMain.java @@ -3,6 +3,8 @@ import com.github.wenweihu86.distmq.client.producer.Producer; import com.github.wenweihu86.distmq.client.producer.ProducerConfig; +import java.util.UUID; + /** * Created by wenweihu86 on 2017/6/25. */ @@ -13,8 +15,8 @@ public static void main(String[] args) { config.setZKServers("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"); Producer producer = new Producer(config); String topic = "example-topic"; - for (int i = 0; i < 100; i++) { - String message = "hello-distmq-" + i; + while (true) { + String message = UUID.randomUUID().toString(); boolean success = producer.send(topic, message.getBytes()); if (success) { System.out.printf("send message success, topic=%s, message=%s\n",