Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jul 4, 2017
1 parent 9901d25 commit d3f4e4f
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,25 @@ public BrokerMessage.SendMessageResponse sendMessage(BrokerMessage.SendMessageRe
metadataManager.updateTopicMap(request.getTopic(), queueMap);
}

// 由producer来保证topic已经存在,broker就不再校验topic是否存在。
// 验证queue存在,并且属于该sharding
boolean shardingValid = false;
GlobalConf conf = GlobalConf.getInstance();
Integer shardingId = metadataManager.getQueueSharding(request.getTopic(), request.getQueue());
if (shardingId != null && shardingId.equals(conf.getShardingId())) {
shardingValid = true;
}

if (!topicExist || !shardingValid) {
String message = "queue not exist or not be included by this sharding";
baseResBuilder.setResMsg(message);
responseBuilder.setBaseRes(baseResBuilder.build());
LOG.info("sendMessage request, topic={}, queue={}, resCode={}, resMsg={}",
request.getTopic(), request.getQueue(),
responseBuilder.getBaseRes().getResCode(),
responseBuilder.getBaseRes().getResMsg());
return responseBuilder.build();
}
// boolean shardingValid = false;
// GlobalConf conf = GlobalConf.getInstance();
// Integer shardingId = metadataManager.getQueueSharding(request.getTopic(), request.getQueue());
// if (shardingId != null && shardingId.equals(conf.getShardingId())) {
// shardingValid = true;
// }
// if (!topicExist || !shardingValid) {
// String message = "queue not exist or not be included by this sharding";
// baseResBuilder.setResMsg(message);
// responseBuilder.setBaseRes(baseResBuilder.build());
// LOG.warn("sendMessage request, topic={}, queue={}, resCode={}, "
// + "topicExist={}, shardingValid={}, resMsg={}",
// request.getTopic(), request.getQueue(),
// responseBuilder.getBaseRes().getResCode(),
// topicExist, shardingValid, responseBuilder.getBaseRes().getResMsg());
// return responseBuilder.build();
// }

// 如果自己不是leader,将写请求转发给leader
if (raftNode.getLeaderId() <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.github.wenweihu86.raft.util.ConfigurationUtils;
import com.github.wenweihu86.rpc.server.RPCServer;

import java.io.File;
import java.util.List;

/**
Expand All @@ -24,7 +25,7 @@ public static void main(String[] args) {
GlobalConf conf = GlobalConf.getInstance();
RaftMessage.Server localServer = conf.getLocalServer();
List<RaftMessage.Server> servers = conf.getServers();
String dataDir = conf.getDataDir();
String dataDir = System.getProperty("user.dir") + File.separator + conf.getDataDir();

// 初始化zookeeper
ZKConf zkConf = conf.getZkConf();
Expand All @@ -33,7 +34,7 @@ public static void main(String[] args) {
// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
// 应用状态机
BrokerStateMachine stateMachine = new BrokerStateMachine();
BrokerStateMachine stateMachine = new BrokerStateMachine(dataDir);
// 设置数据目录
RaftOptions.dataDir = dataDir;
// just for test snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public class BrokerStateMachine implements StateMachine {
private AtomicBoolean isAvailable = new AtomicBoolean(true);
private RaftNode raftNode;

public BrokerStateMachine() {
String dataDir = GlobalConf.getInstance().getDataDir();
public BrokerStateMachine(String dataDir) {
this.messageDir = dataDir + File.separator + "message";
}

Expand Down Expand Up @@ -62,12 +61,12 @@ public void readSnapshot(String snapshotDir) {
if (logManager != null) {
logManager.close();
}
File messageDirFile = new File(messageDir);
if (messageDirFile.exists()) {
FileUtils.deleteDirectory(messageDirFile);
}
File snapshotDirFile = new File(snapshotDir);
if (snapshotDirFile.exists()) {
if (snapshotDirFile.exists() && snapshotDirFile.listFiles().length > 0) {
File messageDirFile = new File(messageDir);
if (messageDirFile.exists()) {
FileUtils.deleteDirectory(messageDirFile);
}
FileUtils.copyDirectory(snapshotDirFile, messageDirFile);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;

Expand All @@ -21,7 +22,7 @@ public class LogManager implements Runnable {
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
private BrokerStateMachine stateMachine;

public LogManager(String logDir, BrokerStateMachine stateMachine) {
public LogManager(String logDir, BrokerStateMachine stateMachine) throws IOException {
this.stateMachine = stateMachine;
this.topicLogMap = new ConcurrentHashMap<>();
this.logDir = logDir;
Expand Down Expand Up @@ -49,7 +50,7 @@ public LogManager(String logDir, BrokerStateMachine stateMachine) {
continue;
}
Integer queueId = Integer.valueOf(queueDir.getName());
String fullQueuePath = logDir + File.separator + topicDir + File.separator + queueDir;
String fullQueuePath = queueDir.getCanonicalPath();
SegmentedLog queueLog = new SegmentedLog(fullQueuePath);
queueMap.put(queueId, queueLog);
}
Expand Down Expand Up @@ -117,6 +118,7 @@ public void run() {
SegmentedLog log = topicLogMap.get(topic).get(queue);
log.getLock().lock();
try {
List<Long> deletedKeyList = new ArrayList<>();
Segment lastSegment = null;
Iterator<Map.Entry<Long, Segment>> iterator
= log.getStartOffsetSegmentMap().entrySet().iterator();
Expand All @@ -131,12 +133,15 @@ public void run() {
- message.getCreateTime() / 1000
> conf.getExpiredLogDuration()) {
lastSegment.delete();
iterator.remove();
deletedKeyList.add(lastSegment.getStartOffset());
} else {
break;
}
lastSegment = segment;
}
for (Long startOffset : deletedKeyList) {
log.getStartOffsetSegmentMap().remove(startOffset);
}
} finally {
log.getLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void delete() {
close();
String fullFileName = dirName + File.separator + fileName;
File file = new File(fullFileName);
file.delete();
try {
FileUtils.forceDelete(file);
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.wenweihu86.distmq.broker.config.GlobalConf;
import com.github.wenweihu86.distmq.client.api.BrokerMessage;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,10 +57,9 @@ public boolean append(BrokerMessage.MessageContent.Builder message) {
lastSegment.getStartOffset(), lastSegment.getEndOffset());
String newFullFileName = segmentDir + File.separator + newFileName;
File newFile = new File(newFullFileName);
newFile.createNewFile();
String oldFullFileName = segmentDir + File.separator + lastSegment.getFileName();
File oldFile = new File(oldFullFileName);
oldFile.renameTo(newFile);
FileUtils.moveFile(oldFile, newFile);
lastSegment.setFileName(newFileName);
lastSegment.setRandomAccessFile(RaftFileUtils.openFile(segmentDir, newFileName, "r"));
lastSegment.setChannel(lastSegment.getRandomAccessFile().getChannel());
Expand Down Expand Up @@ -94,6 +94,12 @@ public boolean append(BrokerMessage.MessageContent.Builder message) {
public BrokerMessage.MessageContent read(long offset) {
lock.lock();
try {
if (startOffsetSegmentMap.size() > 0) {
long firstOffset = startOffsetSegmentMap.firstKey();
if (offset < firstOffset) {
offset = firstOffset;
}
}
Map.Entry<Long, Segment> entry = startOffsetSegmentMap.floorEntry(offset);
if (entry == null) {
LOG.warn("message not found, offset={}", offset);
Expand Down Expand Up @@ -122,6 +128,7 @@ private void readSegments() {
try {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir, segmentDir);
for (String fileName : fileNames) {
LOG.info("read segment filename={}", fileName);
Segment segment = new Segment(segmentDir, fileName);
startOffsetSegmentMap.put(segment.getStartOffset(), segment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@ private BrokerMessage.MessageContent.Builder createMessage(String topic, Integer
}

@Test
public void testClearExpiredLog() {
public void testClearExpiredLog() throws IOException {
GlobalConf conf = GlobalConf.getInstance();
conf.setMaxSegmentSize(128);
conf.setExpiredLogDuration(1);

// mock
Snapshot snapshot = Mockito.mock(Snapshot.class);
Mockito.when(snapshot.getIsInstallSnapshot()).thenReturn(new AtomicBoolean(false));
Mockito.when(snapshot.getIsTakeSnapshot()).thenReturn(new AtomicBoolean(false));
RaftNode raftNode = Mockito.mock(RaftNode.class);
Mockito.when(raftNode.getSnapshot()).thenReturn(snapshot);
Mockito.doNothing().when(raftNode).takeSnapshot();
BrokerStateMachine stateMachine = new BrokerStateMachine();
BrokerStateMachine stateMachine = new BrokerStateMachine(conf.getDataDir());
stateMachine.setRaftNode(raftNode);

GlobalConf conf = GlobalConf.getInstance();
conf.setMaxSegmentSize(128);
conf.setExpiredLogDuration(1);
LogManager logManager = new LogManager(conf.getDataDir(), stateMachine);
String topic = "test-topic";
Integer queue = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public Map<Integer, Integer> readTopicInfo(String topic) {
queueMap.put(queueId, shardingId);
}
} catch (Exception ex) {
LOG.info("readTopic failed, exception:", ex);
LOG.warn("readTopic failed, exception:", ex);
}
return queueMap;
}
Expand Down

0 comments on commit d3f4e4f

Please sign in to comment.