Skip to content

Commit

Permalink
update segment log read write
Browse files Browse the repository at this point in the history
  • Loading branch information
wenweihu86 committed Jun 19, 2017
1 parent 2fbc99a commit af23de0
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static void main(String[] args) {
GlobalConf conf = GlobalConf.getInstance();
RaftMessage.Server localServer = conf.getLocalServer();
List<RaftMessage.Server> servers = conf.getServers();
String dataDir = conf.getString("data_dir");
String dataDir = conf.getDataDir();

// 初始化RPCServer
RPCServer server = new RPCServer(localServer.getEndPoint().getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.zip.CRC32;

/**
* Created by wenweihu86 on 2017/6/17.
*/
Expand All @@ -22,4 +24,10 @@ public static String protoToJson(MessageOrBuilder message) {
}
}

public static long getCRC32(byte[] data) {
CRC32 crc32 = new CRC32();
crc32.update(data);
return crc32.getValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ public class GlobalConf {
private static GlobalConf instance;

private Toml toml;
RaftMessage.Server localServer; // 本机节点
List<RaftMessage.Server> servers; // 集群所有节点
private String dataDir; // 数据目录
private int maxSegmentSize; // 单个segment文件最大大小

public GlobalConf() {
String fileName = "/broker.toml";
File file = new File(getClass().getResource(fileName).getFile());
toml = new Toml().read(file);
localServer = readLocalServer();
servers = readServers();
dataDir = toml.getString("data_dir");
maxSegmentSize = toml.getLong("max_segment_size").intValue();
}

public static GlobalConf getInstance() {
Expand All @@ -31,15 +39,7 @@ public static GlobalConf getInstance() {
return instance;
}

public String getString(String key) {
return toml.getString(key);
}

public int getInt(String key) {
return toml.getLong(key).intValue();
}

public RaftMessage.Server getLocalServer() {
private RaftMessage.Server readLocalServer() {
RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder();
RaftMessage.EndPoint.Builder endPointBuilder = RaftMessage.EndPoint.newBuilder();
Toml localServerConf = toml.getTable("local_server");
Expand All @@ -52,7 +52,7 @@ public RaftMessage.Server getLocalServer() {
return localServer;
}

public List<RaftMessage.Server> getServers() {
private List<RaftMessage.Server> readServers() {
List<RaftMessage.Server> servers = new ArrayList<>();
List<Toml> serverConfList = toml.getTables("servers");
for (Toml serverConf : serverConfList) {
Expand All @@ -70,4 +70,20 @@ public List<RaftMessage.Server> getServers() {
return servers;
}

public RaftMessage.Server getLocalServer() {
return localServer;
}

public List<RaftMessage.Server> getServers() {
return servers;
}

public String getDataDir() {
return dataDir;
}

public int getMaxSegmentSize() {
return maxSegmentSize;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.wenweihu86.distmq.broker.log;

import com.github.wenweihu86.distmq.broker.BrokerUtils;
import com.github.wenweihu86.raft.util.RaftFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,7 +18,8 @@
* Created by wenweihu86 on 2017/6/19.
*/
public class Segment {
public static int HEADER_LENGTH = Long.SIZE / Byte.SIZE;
public static int SEGMENT_HEADER_LENGTH = Long.SIZE / Byte.SIZE;
public static int MESSAGE_HEADER_LENGTH = (Long.SIZE + Integer.SIZE) / Byte.SIZE;
private static final Logger LOG = LoggerFactory.getLogger(Segment.class);

private String dirName;
Expand Down Expand Up @@ -77,16 +79,21 @@ public long append(byte[] messageContent) {
long offset = 0;
try {
int writeSize;
// TODO: 复用messageContent内存
ByteBuffer byteBuffer = ByteBuffer.allocate(Segment.SEGMENT_HEADER_LENGTH
+ Segment.MESSAGE_HEADER_LENGTH + + messageContent.length);
if (fileSize == 0) {
// TODO: 复用messageContent内存
ByteBuffer byteBuffer = ByteBuffer.allocate(Segment.HEADER_LENGTH + messageContent.length);
byteBuffer.putLong(System.currentTimeMillis());
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
writeSize = channel.write(byteBuffer);
endOffset += writeSize;
offset = startOffset;
} else {
ByteBuffer byteBuffer = ByteBuffer.wrap(messageContent);
byteBuffer.putLong(BrokerUtils.getCRC32(messageContent));
byteBuffer.putInt(messageContent.length);
byteBuffer.put(messageContent);
writeSize = channel.write(byteBuffer);
offset = endOffset;
endOffset += writeSize;
Expand All @@ -98,6 +105,39 @@ public long append(byte[] messageContent) {
return offset;
}

public byte[] read(long offset) {
if (offset >= startOffset + fileSize) {
LOG.warn("invalid offset={}", offset);
return null;
}
try {
channel.position(offset - startOffset);
ByteBuffer headerBuffer = ByteBuffer.allocate(MESSAGE_HEADER_LENGTH);
int readLen = channel.read(headerBuffer);
if (readLen < MESSAGE_HEADER_LENGTH) {
LOG.warn("read message error");
return null;
}
long crc32 = headerBuffer.getLong();
int messageLen = headerBuffer.getInt();
ByteBuffer messageContentBuffer = ByteBuffer.allocate(messageLen);
readLen = channel.read(messageContentBuffer);
if (readLen < messageLen) {
LOG.warn("read message error");
return null;
}
if (BrokerUtils.getCRC32(messageContentBuffer.array()) != crc32) {
LOG.warn("read message error: crc32 check failed");
return null;
}
return messageContentBuffer.array();
} catch (IOException ex) {
LOG.warn("read segment error, dir={}, file={}, offset={}",
dirName, fileName, offset);
return null;
}
}

public String getDirName() {
return dirName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.*;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
Expand Down Expand Up @@ -50,7 +51,7 @@ public long append(byte[] messageContent) {
if (!lastSegment.isCanWrite()) {
isNeedNewSegmentFile = true;
} else {
int maxSegmentSize = GlobalConf.getInstance().getInt("max_segment_size");
int maxSegmentSize = GlobalConf.getInstance().getMaxSegmentSize();
if (lastSegment.getFileSize() + messageContent.length > maxSegmentSize) {
isNeedNewSegmentFile = true;
}
Expand All @@ -75,7 +76,7 @@ public long append(byte[] messageContent) {
// 新建segment文件
if (isNeedNewSegmentFile) {
// open new segment file
long newStartOffset = getLastEndOffset() + Segment.HEADER_LENGTH;
long newStartOffset = getLastEndOffset() + Segment.SEGMENT_HEADER_LENGTH;
String newSegmentFileName = String.format("open-%d", newStartOffset);
String newFullFileName = segmentDir + File.separator + newSegmentFileName;
File newSegmentFile = new File(newFullFileName);
Expand All @@ -92,6 +93,16 @@ public long append(byte[] messageContent) {
}
}

public byte[] read(long offset) {
Map.Entry<Long, Segment> entry = startOffsetSegmentMap.floorEntry(offset);
if (entry == null) {
LOG.warn("message not found, offset={}", offset);
return null;
}
Segment segment = entry.getValue();
return segment.read(offset);
}

public void readSegments() {
List<String> fileNames = RaftFileUtils.getSortedFilesInDirectory(segmentDir);
for (String fileName: fileNames) {
Expand All @@ -103,7 +114,8 @@ public void readSegments() {
public void validateSegments() {
long lastEndOffset = 0;
for (Segment segment : startOffsetSegmentMap.values()) {
if (lastEndOffset > 0 && segment.getStartOffset() != lastEndOffset + Segment.HEADER_LENGTH) {
if (lastEndOffset > 0 && segment.getStartOffset()
!= lastEndOffset + Segment.SEGMENT_HEADER_LENGTH) {
throw new RuntimeException("segment dir not valid:" + segmentDir);
}
lastEndOffset = segment.getEndOffset();
Expand Down

0 comments on commit af23de0

Please sign in to comment.