From f0ec50a74d23eff0d6df1eb67ce31b2260f39601 Mon Sep 17 00:00:00 2001 From: wenweihu86 Date: Mon, 26 Jun 2017 22:56:08 +0800 Subject: [PATCH] remove expired log timer --- README.md | 1 - distmq-broker/pom.xml | 10 ++++ .../distmq/broker/config/GlobalConf.java | 20 +++++++ .../distmq/broker/log/LogManager.java | 60 +++++++++++++++++-- .../wenweihu86/distmq/broker/log/Segment.java | 17 +++--- .../distmq/broker/log/SegmentedLog.java | 12 ++++ distmq-broker/src/main/resources/broker.toml | 2 + .../distmq/broker/log/TestLogManager.java | 57 ++++++++++++++++++ 8 files changed, 166 insertions(+), 13 deletions(-) create mode 100644 distmq-broker/src/test/java/com/github/wenweihu86/distmq/broker/log/TestLogManager.java diff --git a/README.md b/README.md index 082e2fd..2aea512 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ Distributed Message Queue based on Raft # 开发中,会定期更新。 # 欢迎感兴趣者一起加入开发。 # TODOS: -* broker删除过期消息 * consumer支持rebalance消费 * raft snapshot使用硬链接 * 支持多consumerGroup和多topic diff --git a/distmq-broker/pom.xml b/distmq-broker/pom.xml index 25dcaff..81f124f 100644 --- a/distmq-broker/pom.xml +++ b/distmq-broker/pom.xml @@ -89,6 +89,16 @@ + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + + ${basedir}/target/conf + + + diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java index c7d61a9..bd875f2 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/config/GlobalConf.java @@ -24,6 +24,8 @@ public class GlobalConf { private String dataDir; // 数据目录 private int defaultQueueNumPerTopic; // 每个topic的默认queue个数 private int maxSegmentSize; // 单个segment文件最大大小 + private int expiredLogCheckInterval; // log检查时间间隔 + private int expiredLogDuration; // log过期时长 // 该server属于哪个分片集群,每个分片是leader/followers的raft集群 private int shardingId; private ZKConf zkConf; @@ -38,6 +40,8 @@ public GlobalConf() { defaultQueueNumPerTopic = toml.getLong("default_queue_num_per_topic").intValue(); maxSegmentSize = toml.getLong("max_segment_size").intValue(); shardingId = toml.getLong("sharding_id").intValue(); + expiredLogCheckInterval = toml.getLong("expired_log_check_interval").intValue(); + expiredLogDuration = toml.getLong("expired_log_duration").intValue(); zkConf = readZKConf(); } @@ -115,6 +119,22 @@ public int getShardingId() { return shardingId; } + public int getExpiredLogCheckInterval() { + return expiredLogCheckInterval; + } + + public int getExpiredLogDuration() { + return expiredLogDuration; + } + + public void setMaxSegmentSize(int maxSegmentSize) { + this.maxSegmentSize = maxSegmentSize; + } + + public void setExpiredLogDuration(int expiredLogDuration) { + this.expiredLogDuration = expiredLogDuration; + } + public ZKConf getZkConf() { return zkConf; } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java index 59e8237..3c68844 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/LogManager.java @@ -1,22 +1,25 @@ package com.github.wenweihu86.distmq.broker.log; +import com.github.wenweihu86.distmq.broker.config.GlobalConf; +import com.github.wenweihu86.distmq.client.api.BrokerMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Set; +import java.util.concurrent.*; /** * Created by wenweihu86 on 2017/6/20. */ -public class LogManager { +public class LogManager implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(LogManager.class); private String logDir; // topic -> (queueId -> segment log) private ConcurrentMap> topicLogMap; + private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1); public LogManager(String logDir) { this.topicLogMap = new ConcurrentHashMap<>(); @@ -52,6 +55,11 @@ public LogManager(String logDir) { } } } + + timer.scheduleAtFixedRate(this, + GlobalConf.getInstance().getExpiredLogCheckInterval(), + GlobalConf.getInstance().getExpiredLogCheckInterval(), + TimeUnit.SECONDS); } public SegmentedLog getOrCreateQueueLog(String topic, int queue) { @@ -71,4 +79,48 @@ public SegmentedLog getOrCreateQueueLog(String topic, int queue) { return segmentedLog; } + @Override + public void run() { + GlobalConf conf = GlobalConf.getInstance(); + Set topicSet = topicLogMap.keySet(); + for (String topic : topicSet) { + ConcurrentMap queueLogMap = topicLogMap.get(topic); + if (queueLogMap != null) { + Set queueSet = queueLogMap.keySet(); + for (Integer queue : queueSet) { + try { + SegmentedLog log = topicLogMap.get(topic).get(queue); + log.getLock().lock(); + try { + Segment lastSegment = null; + Iterator> iterator + = log.getStartOffsetSegmentMap().entrySet().iterator(); + while (iterator.hasNext()){ + Segment segment = iterator.next().getValue(); + if (lastSegment == null) { + lastSegment = segment; + continue; + } + BrokerMessage.MessageContent message = segment.read(segment.getStartOffset()); + if (System.currentTimeMillis() / 1000 + - message.getCreateTime() / 1000 + > conf.getExpiredLogDuration()) { + lastSegment.delete(); + iterator.remove(); + } else { + break; + } + lastSegment = segment; + } + } finally { + log.getLock().unlock(); + } + } catch (Exception ex) { + LOG.warn("clear expired log error"); + } + } + } + } + } + } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java index 6abeed2..ae6d212 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/Segment.java @@ -3,9 +3,11 @@ import com.github.wenweihu86.distmq.broker.BrokerUtils; import com.github.wenweihu86.distmq.client.api.BrokerMessage; import com.github.wenweihu86.raft.util.RaftFileUtils; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; @@ -27,7 +29,6 @@ public class Segment { private long fileSize; private RandomAccessFile randomAccessFile; private FileChannel channel; - private long lastModifiedTime; // 文件最后修改时间 public Segment(String dirName, String fileName) { this.dirName = dirName; @@ -72,6 +73,13 @@ public void close() { } } + public void delete() { + close(); + String fullFileName = dirName + File.separator + fileName; + File file = new File(fullFileName); + file.delete(); + } + public boolean append(BrokerMessage.MessageContent.Builder messageBuilder) { try { if (fileSize == 0) { @@ -210,11 +218,4 @@ public void setChannel(FileChannel channel) { this.channel = channel; } - public long getLastModifiedTime() { - return lastModifiedTime; - } - - public void setLastModifiedTime(long lastModifiedTime) { - this.lastModifiedTime = lastModifiedTime; - } } diff --git a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java index 2f201cf..ae55f14 100644 --- a/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java +++ b/distmq-broker/src/main/java/com/github/wenweihu86/distmq/broker/log/SegmentedLog.java @@ -132,4 +132,16 @@ private long getLastEndOffset() { return lastSegment.getEndOffset(); } + public String getSegmentDir() { + return segmentDir; + } + + public TreeMap getStartOffsetSegmentMap() { + return startOffsetSegmentMap; + } + + public Lock getLock() { + return lock; + } + } diff --git a/distmq-broker/src/main/resources/broker.toml b/distmq-broker/src/main/resources/broker.toml index e98e623..279d669 100644 --- a/distmq-broker/src/main/resources/broker.toml +++ b/distmq-broker/src/main/resources/broker.toml @@ -1,6 +1,8 @@ data_dir = "./data" default_queue_num_per_topic = 8 max_segment_size = 100000000 # 100m +expired_log_check_interval = 86400 # 24h +expired_log_duration = 604800 # 7 day # 该server属于哪个分片集群,每个分片是leader/followers的raft集群 sharding_id = 1 diff --git a/distmq-broker/src/test/java/com/github/wenweihu86/distmq/broker/log/TestLogManager.java b/distmq-broker/src/test/java/com/github/wenweihu86/distmq/broker/log/TestLogManager.java new file mode 100644 index 0000000..fe9b10c --- /dev/null +++ b/distmq-broker/src/test/java/com/github/wenweihu86/distmq/broker/log/TestLogManager.java @@ -0,0 +1,57 @@ +package com.github.wenweihu86.distmq.broker.log; + +import com.github.wenweihu86.distmq.broker.config.GlobalConf; +import com.github.wenweihu86.distmq.client.api.BrokerMessage; +import com.google.protobuf.ByteString; +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * Created by wenweihu86 on 2017/6/26. + */ +public class TestLogManager { + + private BrokerMessage.MessageContent.Builder createMessage(String topic, Integer queue) { + BrokerMessage.MessageContent.Builder message = BrokerMessage.MessageContent.newBuilder() + .setContent(ByteString.copyFrom(UUID.randomUUID().toString().getBytes())) + .setTopic(topic) + .setQueue(queue); + return message; + } + + @Test + public void testClearExpiredLog() { + GlobalConf conf = GlobalConf.getInstance(); + conf.setMaxSegmentSize(128); + conf.setExpiredLogDuration(1); + LogManager logManager = new LogManager(conf.getDataDir()); + String topic = "test-topic"; + Integer queue = 0; + SegmentedLog log = logManager.getOrCreateQueueLog(topic, queue); + for (int i = 0; i < 1000; i++) { + log.append(createMessage(topic, queue)); + } + + try { + Thread.sleep(2000); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + + System.out.println("before clear log count=" + log.getStartOffsetSegmentMap().size()); + logManager.run(); + System.out.println("after clear log count=" + log.getStartOffsetSegmentMap().size()); + Assert.assertTrue(log.getStartOffsetSegmentMap().size() == 1); + File file = new File(conf.getDataDir()); + try { + FileUtils.deleteDirectory(file); + } catch (IOException ex) { + ex.printStackTrace(); + } + } +}