From 2eada10b0440034b0a3b48cb630cfb4f8127344a Mon Sep 17 00:00:00 2001 From: wenweihu86 Date: Wed, 5 Jul 2017 18:29:52 +0800 Subject: [PATCH] fix bug:snapshot opened file leak --- .../com/github/wenweihu86/raft/RaftNode.java | 21 +++++++---- .../service/impl/RaftClientServiceImpl.java | 2 +- .../wenweihu86/raft/storage/SegmentedLog.java | 12 ++++--- .../wenweihu86/raft/storage/Snapshot.java | 35 +++++++++---------- .../wenweihu86/raft/storage/SnapshotTest.java | 2 +- 5 files changed, 41 insertions(+), 31 deletions(-) diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java index 6edbd5f..519c490 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/RaftNode.java @@ -342,7 +342,7 @@ public void run() { // in lock, 开始心跳,对leader有效 private void startNewHeartbeat() { - LOG.debug("start new heartbeat"); + LOG.debug("start new heartbeat, peers={}", peerMap.keySet()); for (final Peer peer : peerMap.values()) { executorService.submit(new Runnable() { @Override @@ -370,6 +370,7 @@ public void appendEntries(Peer peer) { lock.unlock(); } + LOG.debug("is need snapshot={}, peer={}", isNeedInstallSnapshot, peer.getServer().getServerId()); if (isNeedInstallSnapshot) { if (!installSnapshot(peer)) { return; @@ -507,14 +508,19 @@ private long packEntries(long nextIndex, RaftMessage.AppendEntriesRequest.Builde } private boolean installSnapshot(Peer peer) { - if (!snapshot.getIsTakeSnapshot().get()) { + if (snapshot.getIsTakeSnapshot().get()) { LOG.info("already in take snapshot, please send install snapshot request later"); return false; } + if (!snapshot.getIsInstallSnapshot().compareAndSet(false, true)) { + LOG.info("already in install snapshot"); + return false; + } LOG.info("begin send install snapshot request to server={}", peer.getServer().getServerId()); - snapshot.getIsInstallSnapshot().set(true); boolean isSuccess = true; + TreeMap snapshotDataFileMap = snapshot.openSnapshotDataFiles(); + LOG.info("total snapshot files={}", snapshotDataFileMap.keySet()); try { boolean isLastRequest = false; String lastFileName = null; @@ -522,7 +528,7 @@ private boolean installSnapshot(Peer peer) { long lastLength = 0; while (!isLastRequest) { RaftMessage.InstallSnapshotRequest request - = buildInstallSnapshotRequest(lastFileName, lastOffset, lastLength); + = buildInstallSnapshotRequest(snapshotDataFileMap, lastFileName, lastOffset, lastLength); if (request == null) { LOG.warn("snapshot request == null"); isSuccess = false; @@ -562,7 +568,8 @@ private boolean installSnapshot(Peer peer) { } } } finally { - snapshot.getIsInstallSnapshot().set(false); + snapshot.closeSnapshotDataFiles(snapshotDataFileMap); + snapshot.getIsInstallSnapshot().compareAndSet(true, false); } LOG.info("end send install snapshot request to server={}, success={}", peer.getServer().getServerId(), isSuccess); @@ -570,12 +577,12 @@ private boolean installSnapshot(Peer peer) { } private RaftMessage.InstallSnapshotRequest buildInstallSnapshotRequest( + TreeMap snapshotDataFileMap, String lastFileName, long lastOffset, long lastLength) { RaftMessage.InstallSnapshotRequest.Builder requestBuilder = RaftMessage.InstallSnapshotRequest.newBuilder(); snapshot.getLock().lock(); try { - TreeMap snapshotDataFileMap = snapshot.getSnapshotDataFileMap(); if (lastFileName == null) { lastFileName = snapshotDataFileMap.firstKey(); lastOffset = 0; @@ -845,7 +852,7 @@ public NodeState getState() { return state; } - public Map getPeerMap() { + public ConcurrentMap getPeerMap() { return peerMap; } diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java index 8c5dba4..c27efdd 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/service/impl/RaftClientServiceImpl.java @@ -110,7 +110,7 @@ public RaftMessage.AddPeersResponse addPeers(RaftMessage.AddPeersRequest request final Peer peer = new Peer(server); peer.setNextIndex(1); requestPeers.add(peer); - raftNode.getPeerMap().put(server.getServerId(), peer); + raftNode.getPeerMap().putIfAbsent(server.getServerId(), peer); raftNode.getExecutorService().submit(new Runnable() { @Override public void run() { diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/SegmentedLog.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/SegmentedLog.java index 76029a4..3cd76bf 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/SegmentedLog.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/SegmentedLog.java @@ -109,10 +109,9 @@ public long append(List entries) { segment.getStartIndex(), segment.getEndIndex()); String newFullFileName = logDataDir + File.separator + newFileName; File newFile = new File(newFullFileName); - newFile.createNewFile(); String oldFullFileName = logDataDir + File.separator + segment.getFileName(); File oldFile = new File(oldFullFileName); - oldFile.renameTo(newFile); + FileUtils.moveFile(oldFile, newFile); segment.setFileName(newFileName); segment.setRandomAccessFile(RaftFileUtils.openFile(logDataDir, newFileName, "r")); } @@ -152,7 +151,7 @@ public long append(List entries) { } totalSize += entrySize; } catch (IOException ex) { - throw new RuntimeException("meet exception, msg=" + ex.getMessage()); + throw new RuntimeException("append raft log exception, msg=" + ex.getMessage()); } } return newLastLogIndex; @@ -165,6 +164,9 @@ public void truncatePrefix(long newFirstIndex) { long oldFirstIndex = getFirstLogIndex(); while (!startLogIndexSegmentMap.isEmpty()) { Segment segment = startLogIndexSegmentMap.firstEntry().getValue(); + if (segment.isCanWrite()) { + break; + } if (newFirstIndex > segment.getEndIndex()) { File oldFile = new File(logDataDir + File.separator + segment.getFileName()); try { @@ -183,7 +185,7 @@ public void truncatePrefix(long newFirstIndex) { if (startLogIndexSegmentMap.size() == 0) { newActualFirstIndex = newFirstIndex; } else { - newActualFirstIndex = getFirstLogIndex(); + newActualFirstIndex = startLogIndexSegmentMap.firstKey(); } updateMetaData(null, null, newActualFirstIndex); LOG.info("Truncating log from old first index {} to new first index {}", @@ -327,6 +329,8 @@ public void updateMetaData(Long currentTerm, Integer votedFor, Long firstLogInde File file = new File(fileName); try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) { RaftFileUtils.writeProtoToFile(randomAccessFile, metaData); + LOG.info("new segment meta info, currentTerm={}, votedFor={}, firstLogIndex={}", + metaData.getCurrentTerm(), metaData.getVotedFor(), metaData.getFirstLogIndex()); } catch (IOException ex) { LOG.warn("meta file not exist, name={}", fileName); } diff --git a/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/Snapshot.java b/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/Snapshot.java index d194bab..9765a68 100644 --- a/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/Snapshot.java +++ b/raft-java-core/src/main/java/com/github/wenweihu86/raft/storage/Snapshot.java @@ -14,6 +14,7 @@ import java.nio.file.FileSystems; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -32,7 +33,6 @@ public class SnapshotDataFile { private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class); private String snapshotDir = RaftOptions.dataDir + File.separator + "snapshot"; private RaftMessage.SnapshotMetaData metaData; - private TreeMap snapshotDataFileMap; // 表示是否正在安装snapshot,leader向follower安装,leader和follower同时处于installSnapshot状态 private AtomicBoolean isInstallSnapshot = new AtomicBoolean(false); // 表示节点自己是否在对状态机做snapshot @@ -48,25 +48,18 @@ public Snapshot() { } public void reload() { - if (snapshotDataFileMap != null) { - for (SnapshotDataFile file : snapshotDataFileMap.values()) { - RaftFileUtils.closeFile(file.randomAccessFile); - } - } - this.snapshotDataFileMap = readSnapshotDataFiles(); metaData = this.readMetaData(); if (metaData == null) { - if (snapshotDataFileMap.size() > 0) { - LOG.error("No readable metadata file but found snapshot in {}", snapshotDir); - throw new RuntimeException("No readable metadata file but found snapshot"); - } metaData = RaftMessage.SnapshotMetaData.newBuilder().build(); - snapshotDataFileMap = new TreeMap<>(); } } - // 如果是软链接,需要打开实际文件句柄 - public TreeMap readSnapshotDataFiles() { + /** + * 打开snapshot data目录下的文件, + * 如果是软链接,需要打开实际文件句柄 + * @return 文件名以及文件句柄map + */ + public TreeMap openSnapshotDataFiles() { TreeMap snapshotDataFileMap = new TreeMap<>(); String snapshotDataDir = snapshotDir + File.separator + "data"; try { @@ -88,6 +81,16 @@ public TreeMap readSnapshotDataFiles() { return snapshotDataFileMap; } + public void closeSnapshotDataFiles(TreeMap snapshotDataFileMap) { + for (Map.Entry entry : snapshotDataFileMap.entrySet()) { + try { + entry.getValue().randomAccessFile.close(); + } catch (IOException ex) { + LOG.warn("close snapshot files exception:", ex); + } + } + } + public RaftMessage.SnapshotMetaData readMetaData() { String fileName = snapshotDir + File.separator + "metadata"; File file = new File(fileName); @@ -147,10 +150,6 @@ public AtomicBoolean getIsTakeSnapshot() { return isTakeSnapshot; } - public TreeMap getSnapshotDataFileMap() { - return snapshotDataFileMap; - } - public Lock getLock() { return lock; } diff --git a/raft-java-core/src/test/java/com/github/wenweihu86/raft/storage/SnapshotTest.java b/raft-java-core/src/test/java/com/github/wenweihu86/raft/storage/SnapshotTest.java index cc5ab16..dd1c2ce 100644 --- a/raft-java-core/src/test/java/com/github/wenweihu86/raft/storage/SnapshotTest.java +++ b/raft-java-core/src/test/java/com/github/wenweihu86/raft/storage/SnapshotTest.java @@ -34,7 +34,7 @@ public void testReadSnapshotDataFiles() throws IOException { Files.createSymbolicLink(link, target); Snapshot snapshot = new Snapshot(); - TreeMap snapshotFileMap = snapshot.readSnapshotDataFiles(); + TreeMap snapshotFileMap = snapshot.openSnapshotDataFiles(); System.out.println(snapshotFileMap.keySet()); Assert.assertTrue(snapshotFileMap.size() == 2); Assert.assertTrue(snapshotFileMap.firstKey().equals("queue1.txt"));