
Commit

remove expired log timer
wenweihu86 committed Jun 26, 2017
1 parent 2281c76 commit f0ec50a
Showing 8 changed files with 166 additions and 13 deletions.
1 change: 0 additions & 1 deletion README.md
@@ -3,7 +3,6 @@ Distributed Message Queue based on Raft
# Under development; updated regularly.
# Anyone interested is welcome to join the development.
# TODOS:
* broker: delete expired messages
* consumer: support rebalance consumption
* raft snapshot: use hard links
* support multiple consumerGroups and multiple topics
10 changes: 10 additions & 0 deletions distmq-broker/pom.xml
@@ -89,6 +89,16 @@
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19.1</version>
            <configuration>
                <additionalClasspathElements>
                    <additionalClasspathElement>${basedir}/target/conf</additionalClasspathElement>
                </additionalClasspathElements>
            </configuration>
        </plugin>
    </plugins>
</build>
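The added maven-surefire-plugin entry puts ${basedir}/target/conf on the unit-test classpath, presumably so the broker configuration (broker.toml) placed there by the build is visible to the new TestLogManager test further down in this commit. As a rough illustration only (the class below is hypothetical and not part of the commit), such a setting can be sanity-checked with a trivial JUnit 4 test:

import org.junit.Assert;
import org.junit.Test;

// Hypothetical check, not part of the commit: with ${basedir}/target/conf added to the
// surefire classpath, broker.toml should resolve as a classpath resource during tests.
public class ConfOnClasspathTest {
    @Test
    public void brokerTomlIsOnTestClasspath() {
        Assert.assertNotNull(getClass().getClassLoader().getResource("broker.toml"));
    }
}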

@@ -24,6 +24,8 @@ public class GlobalConf {
    private String dataDir; // data directory
    private int defaultQueueNumPerTopic; // default number of queues per topic
    private int maxSegmentSize; // maximum size of a single segment file
    private int expiredLogCheckInterval; // expired-log check interval
    private int expiredLogDuration; // log expiration duration
    // which sharding cluster this server belongs to; each shard is a leader/followers raft cluster
    private int shardingId;
    private ZKConf zkConf;
@@ -38,6 +40,8 @@ public GlobalConf() {
        defaultQueueNumPerTopic = toml.getLong("default_queue_num_per_topic").intValue();
        maxSegmentSize = toml.getLong("max_segment_size").intValue();
        shardingId = toml.getLong("sharding_id").intValue();
        expiredLogCheckInterval = toml.getLong("expired_log_check_interval").intValue();
        expiredLogDuration = toml.getLong("expired_log_duration").intValue();
        zkConf = readZKConf();
    }

@@ -115,6 +119,22 @@ public int getShardingId() {
        return shardingId;
    }

    public int getExpiredLogCheckInterval() {
        return expiredLogCheckInterval;
    }

    public int getExpiredLogDuration() {
        return expiredLogDuration;
    }

    public void setMaxSegmentSize(int maxSegmentSize) {
        this.maxSegmentSize = maxSegmentSize;
    }

    public void setExpiredLogDuration(int expiredLogDuration) {
        this.expiredLogDuration = expiredLogDuration;
    }

    public ZKConf getZkConf() {
        return zkConf;
    }
@@ -1,22 +1,25 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.Set;
import java.util.concurrent.*;

/**
* Created by wenweihu86 on 2017/6/20.
*/
public class LogManager {
public class LogManager implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);
    private String logDir;
    // topic -> (queueId -> segment log)
    private ConcurrentMap<String, ConcurrentMap<Integer, SegmentedLog>> topicLogMap;
    private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);

    public LogManager(String logDir) {
        this.topicLogMap = new ConcurrentHashMap<>();
@@ -52,6 +55,11 @@ public LogManager(String logDir) {
                }
            }
        }

        timer.scheduleAtFixedRate(this,
                GlobalConf.getInstance().getExpiredLogCheckInterval(),
                GlobalConf.getInstance().getExpiredLogCheckInterval(),
                TimeUnit.SECONDS);
    }

    public SegmentedLog getOrCreateQueueLog(String topic, int queue) {
@@ -71,4 +79,48 @@ public SegmentedLog getOrCreateQueueLog(String topic, int queue) {
        return segmentedLog;
    }

    @Override
    public void run() {
        GlobalConf conf = GlobalConf.getInstance();
        Set<String> topicSet = topicLogMap.keySet();
        for (String topic : topicSet) {
            ConcurrentMap<Integer, SegmentedLog> queueLogMap = topicLogMap.get(topic);
            if (queueLogMap != null) {
                Set<Integer> queueSet = queueLogMap.keySet();
                for (Integer queue : queueSet) {
                    try {
                        SegmentedLog log = topicLogMap.get(topic).get(queue);
                        log.getLock().lock();
                        try {
                            Segment lastSegment = null;
                            Iterator<Map.Entry<Long, Segment>> iterator
                                    = log.getStartOffsetSegmentMap().entrySet().iterator();
                            while (iterator.hasNext()) {
                                Segment segment = iterator.next().getValue();
                                if (lastSegment == null) {
                                    lastSegment = segment;
                                    continue;
                                }
                                BrokerMessage.MessageContent message = segment.read(segment.getStartOffset());
                                if (System.currentTimeMillis() / 1000
                                        - message.getCreateTime() / 1000
                                        > conf.getExpiredLogDuration()) {
                                    lastSegment.delete();
                                    iterator.remove();
                                } else {
                                    break;
                                }
                                lastSegment = segment;
                            }
                        } finally {
                            log.getLock().unlock();
                        }
                    } catch (Exception ex) {
                        LOG.warn("clear expired log error");
                    }
                }
            }
        }
    }

}
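The cleanup policy in run() works per queue: segments are visited in ascending start-offset order, and the previous segment's file is deleted only once the first (oldest) message of the segment that follows it has aged past the retention limit; the scan stops at the first boundary that is still fresh, so the newest segment is never touched. A minimal sketch of that age test, assuming, as the code above does, that getCreateTime() is a millisecond timestamp while getExpiredLogDuration() is in seconds (the class name below is invented for illustration and is not part of the commit):

final class ExpirationCheck {
    // Sketch only: a message counts as expired once its age in seconds exceeds the
    // configured retention (broker.toml later in this commit sets expired_log_duration = 604800).
    static boolean isExpired(long createTimeMillis, long retentionSeconds) {
        long ageSeconds = System.currentTimeMillis() / 1000 - createTimeMillis / 1000;
        return ageSeconds > retentionSeconds;
    }
}

Checking only the boundary message of the following segment means everything in a deleted segment is guaranteed to be at least one retention period old, at the cost of keeping a segment on disk until its successor starts to expire.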
@@ -3,9 +3,11 @@
import com.github.wenweihu86.distmq.broker.BrokerUtils;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
@@ -27,7 +29,6 @@ public class Segment {
    private long fileSize;
    private RandomAccessFile randomAccessFile;
    private FileChannel channel;
    private long lastModifiedTime; // file last modified time

    public Segment(String dirName, String fileName) {
        this.dirName = dirName;
@@ -72,6 +73,13 @@ public void close() {
        }
    }

    public void delete() {
        close();
        String fullFileName = dirName + File.separator + fileName;
        File file = new File(fullFileName);
        file.delete();
    }

    public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) {
        try {
            if (fileSize == 0) {
@@ -210,11 +218,4 @@ public void setChannel(FileChannel channel) {
        this.channel = channel;
    }

    public long getLastModifiedTime() {
        return lastModifiedTime;
    }

    public void setLastModifiedTime(long lastModifiedTime) {
        this.lastModifiedTime = lastModifiedTime;
    }
}
@@ -132,4 +132,16 @@ private long getLastEndOffset() {
        return lastSegment.getEndOffset();
    }

    public String getSegmentDir() {
        return segmentDir;
    }

    public TreeMap<Long, Segment> getStartOffsetSegmentMap() {
        return startOffsetSegmentMap;
    }

    public Lock getLock() {
        return lock;
    }

}
2 changes: 2 additions & 0 deletions distmq-broker/src/main/resources/broker.toml
@@ -1,6 +1,8 @@
data_dir = "./data"
default_queue_num_per_topic = 8
max_segment_size = 100000000 # 100m
expired_log_check_interval = 86400 # 24h
expired_log_duration = 604800 # 7 days
# which sharding cluster this server belongs to; each shard is a leader/followers raft cluster
sharding_id = 1
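Both new keys are plain integers of seconds: the broker schedules the cleanup task every expired_log_check_interval seconds (via the TimeUnit.SECONDS scheduling in LogManager above) and drops segments whose messages are all older than expired_log_duration seconds. A short sketch of reading the two settings back, assuming broker.toml is reachable when GlobalConf.getInstance() is first called; the demo class name is invented for illustration and is not part of the commit:

import com.github.wenweihu86.distmq.broker.config.GlobalConf;

public class RetentionSettingsDemo {
    public static void main(String[] args) {
        GlobalConf conf = GlobalConf.getInstance();
        int checkIntervalSeconds = conf.getExpiredLogCheckInterval(); // 86400  -> scan once a day
        int retentionSeconds = conf.getExpiredLogDuration();          // 604800 -> keep 7 days of messages
        System.out.println("check every " + checkIntervalSeconds + "s, retain " + retentionSeconds + "s");
    }
}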

@@ -0,0 +1,57 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.google.protobuf.ByteString;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.UUID;

/**
* Created by wenweihu86 on 2017/6/26.
*/
public class TestLogManager {

    private BrokerMessage.MessageContent.Builder createMessage(String topic, Integer queue) {
        BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder()
                .setContent(ByteString.copyFrom(UUID.randomUUID().toString().getBytes()))
                .setTopic(topic)
                .setQueue(queue);
        return message;
    }

    @Test
    public void testClearExpiredLog() {
        GlobalConf conf = GlobalConf.getInstance();
        conf.setMaxSegmentSize(128);
        conf.setExpiredLogDuration(1);
        LogManager logManager = new LogManager(conf.getDataDir());
        String topic = "test-topic";
        Integer queue = 0;
        SegmentedLog log = logManager.getOrCreateQueueLog(topic, queue);
        for (int i = 0; i < 1000; i++) {
            log.append(createMessage(topic, queue));
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        System.out.println("before clear log count=" + log.getStartOffsetSegmentMap().size());
        logManager.run();
        System.out.println("after clear log count=" + log.getStartOffsetSegmentMap().size());
        Assert.assertTrue(log.getStartOffsetSegmentMap().size() == 1);
        File file = new File(conf.getDataDir());
        try {
            FileUtils.deleteDirectory(file);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}
