
Commit

add symbolic link for snapshot
wenweihu86 committed Jul 3, 2017
1 parent e25be4d commit 455f2dd
Showing 8 changed files with 92 additions and 47 deletions.
8 changes: 7 additions & 1 deletion distmq-broker/pom.xml
@@ -106,7 +106,7 @@
<dependency>
<groupId>com.github.wenweihu86.raft</groupId>
<artifactId>raft-java-core</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
@@ -129,6 +129,12 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.8.47</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -42,6 +42,7 @@ public static void main(String[] args) {
RaftOptions.maxSegmentFileSize = 1024 * 1024;
// initialize the RaftNode
RaftNode raftNode = new RaftNode(servers, localServer, stateMachine);
stateMachine.setRaftNode(raftNode);
// register the service that Raft nodes use to call each other
RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode);
server.registerService(raftConsensusService);
@@ -4,6 +4,7 @@
import com.github.wenweihu86.distmq.broker.log.LogManager;
import com.github.wenweihu86.distmq.broker.log.SegmentedLog;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.StateMachine;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -25,6 +26,7 @@ public class BrokerStateMachine implements StateMachine {
private LogManager logManager;
// whether the state machine data is available; it is unavailable while a snapshot is being read, which mainly happens on each install snapshot
private AtomicBoolean isAvailable = new AtomicBoolean(true);
private RaftNode raftNode;

public BrokerStateMachine() {
String dataDir = GlobalConf.getInstance().getDataDir();
@@ -42,7 +44,7 @@ public void writeSnapshot(String snapshotDir) {
}
if (messageDirFile.exists()) {
Path link = FileSystems.getDefault().getPath(snapshotDir);
Path target = FileSystems.getDefault().getPath(messageDir);
Path target = FileSystems.getDefault().getPath(messageDir).toRealPath();
Files.createSymbolicLink(link, target);
}
} catch (IOException ex) {
@@ -69,7 +71,7 @@ public void readSnapshot(String snapshotDir) {
FileUtils.copyDirectory(snapshotDirFile, messageDirFile);
}
}
logManager = new LogManager(messageDir);
logManager = new LogManager(messageDir, this);
} catch (IOException ex) {
LOG.error("readSnapshot exception:", ex);
throw new RuntimeException(ex);
@@ -122,4 +124,11 @@ public BrokerMessage.PullMessageResponse pullMessage(BrokerMessage.PullMessageRe
return responseBuilder.build();
}

public RaftNode getRaftNode() {
return raftNode;
}

public void setRaftNode(RaftNode raftNode) {
this.raftNode = raftNode;
}
}
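The heart of this commit is the writeSnapshot change above: instead of copying message data into the snapshot directory, the broker makes the snapshot directory a symbolic link to the canonical path of the message directory. Below is a minimal, self-contained sketch of that pattern; the ./data paths and class name are illustrative only, not the broker's actual layout.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SnapshotLinkSketch {
    public static void main(String[] args) throws IOException {
        Path messageDir = Paths.get("./data/message");
        Files.createDirectories(messageDir);

        // Resolve the possibly relative messageDir to an absolute, canonical path first
        // (this is what the added toRealPath() call does); otherwise the link would
        // record a relative target and break when resolved from another directory.
        Path target = messageDir.toRealPath();

        // The link path must not exist yet, or Files.createSymbolicLink will fail.
        Path link = Paths.get("./data/snapshot-link");
        Files.deleteIfExists(link);
        Files.createSymbolicLink(link, target);

        // The snapshot directory now exposes the live message data without copying a byte.
        System.out.println(link + " -> " + Files.readSymbolicLink(link));
    }
}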
@@ -22,7 +22,6 @@ public class GlobalConf {
RaftMessage.Server localServer; // the local node
List<RaftMessage.Server> servers; // all nodes in the cluster
private String dataDir; // data directory
private int defaultQueueNumPerTopic; // default number of queues per topic
private int maxSegmentSize; // maximum size of a single segment file
private int expiredLogCheckInterval; // interval between expired-log checks
private int expiredLogDuration; // how long a message log is kept before it expires
@@ -37,7 +36,6 @@ public GlobalConf() {
localServer = readLocalServer();
servers = readServers();
dataDir = toml.getString("data_dir");
defaultQueueNumPerTopic = toml.getLong("default_queue_num_per_topic").intValue();
maxSegmentSize = toml.getLong("max_segment_size").intValue();
shardingId = toml.getLong("sharding_id").intValue();
expiredLogCheckInterval = toml.getLong("expired_log_check_interval").intValue();
@@ -107,10 +105,6 @@ public String getDataDir() {
return dataDir;
}

public int getDefaultQueueNumPerTopic() {
return defaultQueueNumPerTopic;
}

public int getMaxSegmentSize() {
return maxSegmentSize;
}
@@ -1,5 +1,6 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.BrokerStateMachine;
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import org.slf4j.Logger;
@@ -18,8 +19,10 @@ public class LogManager implements Runnable {
// topic -> (queueId -> segment log)
private ConcurrentMap<String, ConcurrentMap<Integer, SegmentedLog>> topicLogMap;
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
private BrokerStateMachine stateMachine;

public LogManager(String logDir) {
public LogManager(String logDir, BrokerStateMachine stateMachine) {
this.stateMachine = stateMachine;
this.topicLogMap = new ConcurrentHashMap<>();
this.logDir = logDir;
File dirFile = new File(logDir);
@@ -54,7 +57,7 @@ public LogManager(String logDir) {
}
}

timer.scheduleAtFixedRate(this,
timer.scheduleWithFixedDelay(this,
GlobalConf.getInstance().getExpiredLogCheckInterval(),
GlobalConf.getInstance().getExpiredLogCheckInterval(),
TimeUnit.SECONDS);
@@ -89,48 +92,62 @@ public void close() {
topicLogMap.clear();
}

// taking a snapshot is forbidden while expired messages are being cleaned up;
// after the cleanup finishes, a fresh snapshot must be taken
@Override
public void run() {
    if (!stateMachine.getRaftNode().getSnapshot().getIsInSnapshot().compareAndSet(false, true)) {
        LOG.info("state machine is busy");
        return;
    }
    LOG.info("start to clear expired messages");
    try {
        GlobalConf conf = GlobalConf.getInstance();
        Set<String> topicSet = topicLogMap.keySet();
        for (String topic : topicSet) {
            ConcurrentMap<Integer, SegmentedLog> queueLogMap = topicLogMap.get(topic);
            if (queueLogMap != null) {
                Set<Integer> queueSet = queueLogMap.keySet();
                for (Integer queue : queueSet) {
                    try {
                        SegmentedLog log = topicLogMap.get(topic).get(queue);
                        log.getLock().lock();
                        try {
                            Segment lastSegment = null;
                            Iterator<Map.Entry<Long, Segment>> iterator
                                    = log.getStartOffsetSegmentMap().entrySet().iterator();
                            while (iterator.hasNext()) {
                                Segment segment = iterator.next().getValue();
                                if (lastSegment == null) {
                                    lastSegment = segment;
                                    continue;
                                }
                                BrokerMessage.MessageContent message = segment.read(segment.getStartOffset());
                                if (System.currentTimeMillis() / 1000
                                        - message.getCreateTime() / 1000
                                        > conf.getExpiredLogDuration()) {
                                    lastSegment.delete();
                                    iterator.remove();
                                } else {
                                    break;
                                }
                                lastSegment = segment;
                            }
                        } finally {
                            log.getLock().unlock();
                        }
                    } catch (Exception ex) {
                        LOG.warn("clear expired log error");
                    }
                }
            }
        }
    } finally {
        stateMachine.getRaftNode().getSnapshot().getIsInSnapshot().compareAndSet(true, false);
    }
    LOG.info("end to clear expired messages");

    stateMachine.getRaftNode().takeSnapshot();
}

}
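Two things change in LogManager's periodic cleanup: the task is now scheduled with scheduleWithFixedDelay, so a slow cleanup can never overlap the next run the way scheduleAtFixedRate allows, and run() uses the Raft snapshot's isInSnapshot flag as a try-lock so that cleanup and snapshotting exclude each other, with a fresh snapshot taken once deletion finishes. A stripped-down sketch of that guard pattern follows, with a plain AtomicBoolean standing in for the raft-java snapshot flag; names such as busyFlag and cleanExpiredLogs are illustrative only, not the library's API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SnapshotGuardSketch {
    // Stands in for raftNode.getSnapshot().getIsInSnapshot() in the real broker.
    private static final AtomicBoolean busyFlag = new AtomicBoolean(false);

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
        // Fixed delay: the 30s countdown to the next run starts only after the
        // previous run returns, so long cleanups cannot pile up.
        timer.scheduleWithFixedDelay(SnapshotGuardSketch::cleanExpiredLogs, 30, 30, TimeUnit.SECONDS);
    }

    private static void cleanExpiredLogs() {
        // Skip this round entirely if a snapshot currently owns the flag.
        if (!busyFlag.compareAndSet(false, true)) {
            System.out.println("state machine is busy");
            return;
        }
        try {
            // ... delete expired segments here, as run() does above ...
        } finally {
            busyFlag.compareAndSet(true, false);
        }
        // Re-snapshot so the snapshot no longer references deleted segment files.
        takeSnapshot();
    }

    private static void takeSnapshot() {
        // Placeholder for RaftNode#takeSnapshot() in the real broker.
    }
}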
@@ -78,6 +78,11 @@ public void delete() {
String fullFileName = dirName + File.separator + fileName;
File file = new File(fullFileName);
file.delete();
try {
FileUtils.forceDelete(file);
} catch (IOException ex) {
LOG.warn("delete file exception:", ex);
}
}

public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) {
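The Segment.delete() change above swaps java.io.File#delete(), which only signals failure through a boolean return value the old code ignored, for commons-io's FileUtils.forceDelete, which throws an IOException the new code can at least log. A small sketch of the difference, using a throwaway temp file:

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

public class DeleteSketch {
    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("segment-", ".log");

        boolean ok = file.delete();          // failure is only visible in the return value
        System.out.println("File.delete returned " + ok);

        try {
            FileUtils.forceDelete(file);     // the file is already gone, so this throws
        } catch (IOException ex) {
            System.out.println("forceDelete reported: " + ex.getMessage());
        }
    }
}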
7 changes: 3 additions & 4 deletions distmq-broker/src/main/resources/broker.toml
@@ -1,8 +1,7 @@
data_dir = "./data"
default_queue_num_per_topic = 8
max_segment_size = 100000000 # 100m
expired_log_check_interval = 86400 # 24h
expired_log_duration = 604800 # 7 day
max_segment_size = 1000000 # 1m, just for debug
expired_log_check_interval = 30 # 30s, just for debug
expired_log_duration = 60 # 60s, just for test
# which sharding cluster this server belongs to; each shard is a leader/followers raft cluster
sharding_id = 1
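For reference, GlobalConf (shown earlier in this commit) consumes these keys with getString/getLong calls; the sketch below shows that read path in isolation. It assumes the toml4j library (com.moandjiezana.toml.Toml), which matches the API used above but is not spelled out in this diff.

import com.moandjiezana.toml.Toml;
import java.io.File;

public class BrokerConfSketch {
    public static void main(String[] args) {
        Toml toml = new Toml().read(new File("distmq-broker/src/main/resources/broker.toml"));
        String dataDir = toml.getString("data_dir");
        int maxSegmentSize = toml.getLong("max_segment_size").intValue();                     // 1000000 with the debug values above
        int expiredLogCheckInterval = toml.getLong("expired_log_check_interval").intValue();  // seconds between cleanup runs
        int expiredLogDuration = toml.getLong("expired_log_duration").intValue();             // seconds a message is kept
        int shardingId = toml.getLong("sharding_id").intValue();
        System.out.printf("dataDir=%s segment=%d check=%ds keep=%ds shard=%d%n",
                dataDir, maxSegmentSize, expiredLogCheckInterval, expiredLogDuration, shardingId);
    }
}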

@@ -1,15 +1,20 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.BrokerStateMachine;
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.storage.Snapshot;
import com.google.protobuf.ByteString;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Created by wenweihu86 on 2017/6/26.
@@ -26,10 +31,19 @@ private BrokerMessage.MessageContent.Builder createMessage(String topic, Integer

@Test
public void testClearExpiredLog() {
// mock
Snapshot snapshot = Mockito.mock(Snapshot.class);
Mockito.when(snapshot.getIsInSnapshot()).thenReturn(new AtomicBoolean(false));
RaftNode raftNode = Mockito.mock(RaftNode.class);
Mockito.when(raftNode.getSnapshot()).thenReturn(snapshot);
Mockito.doNothing().when(raftNode).takeSnapshot();
BrokerStateMachine stateMachine = new BrokerStateMachine();
stateMachine.setRaftNode(raftNode);

GlobalConf conf = GlobalConf.getInstance();
conf.setMaxSegmentSize(128);
conf.setExpiredLogDuration(1);
LogManager logManager = new LogManager(conf.getDataDir());
LogManager logManager = new LogManager(conf.getDataDir(), stateMachine);
String topic = "test-topic";
Integer queue = 0;
SegmentedLog log = logManager.getOrCreateQueueLog(topic, queue);
