Skip to content

Commit

Permalink
support subdir for snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 3, 2017
1 parent 455f2dd commit 9901d25
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ public void close() {
// 清理完过期消息后,需要重新执行take snapshot
@Override
public void run() {
if (!stateMachine.getRaftNode().getSnapshot().getIsInSnapshot().compareAndSet(false, true)) {
LOG.info("state machine is busy");
if (stateMachine.getRaftNode().getSnapshot().getIsInstallSnapshot().get()) {
LOG.info("already in install snapshot, please clear expired messages later");
return;
}
if (!stateMachine.getRaftNode().getSnapshot().getIsTakeSnapshot().compareAndSet(false, true)) {
LOG.info("already in take snapshot, please clear expired messages later");
return;
}
LOG.info("start to clear expired messages");
Expand Down Expand Up @@ -143,7 +147,7 @@ public void run() {
}
}
} finally {
stateMachine.getRaftNode().getSnapshot().getIsInSnapshot().compareAndSet(true, false);
stateMachine.getRaftNode().getSnapshot().getIsTakeSnapshot().compareAndSet(true, false);
}
LOG.info("end to clear expired messages");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,15 @@ public void close() {
}

private void readSegments() {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir);
for (String fileName: fileNames) {
Segment segment = new Segment(segmentDir, fileName);
startOffsetSegmentMap.put(segment.getStartOffset(), segment);
try {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir, segmentDir);
for (String fileName : fileNames) {
Segment segment = new Segment(segmentDir, fileName);
startOffsetSegmentMap.put(segment.getStartOffset(), segment);
}
} catch (IOException ex) {
LOG.warn("read segments exception:", ex);
throw new RuntimeException("read segments exception", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ private BrokerMessage.MessageContent.Builder createMessage(String topic, Integer
public void testClearExpiredLog() {
// mock
Snapshot snapshot = Mockito.mock(Snapshot.class);
Mockito.when(snapshot.getIsInSnapshot()).thenReturn(new AtomicBoolean(false));
Mockito.when(snapshot.getIsInstallSnapshot()).thenReturn(new AtomicBoolean(false));
Mockito.when(snapshot.getIsTakeSnapshot()).thenReturn(new AtomicBoolean(false));
RaftNode raftNode = Mockito.mock(RaftNode.class);
Mockito.when(raftNode.getSnapshot()).thenReturn(snapshot);
Mockito.doNothing().when(raftNode).takeSnapshot();
Expand Down

0 comments on commit 9901d25

Please sign in to comment.