阿里中间件性能大赛初赛小结

前段时间参加了阿里举办的一期阿里中间件性能大赛,初赛的信息详见链接,说简单点就是实现一个可以持久化的消息队列引擎,能够hold住亿万级别的消息不丢失,抵挡住消息洪峰。消息具有producer和comsumer两个角色,不同的comsumer可以订阅感兴趣的Topic消息,同一个topic的消息会发给所有对这个topic感兴趣的comsumer,同时每个comsumer可以拥有一个唯一的消息管道queue,producer发送到该管道queue里面的消息只能被该comsumer消费。在题目的限制条件下,要求设计出这么一个消息引擎,在完成功能的基础上,要求消息发送和读取均有较高的性能。

拿到题目的时候还是比较开心的,是一个我感兴趣的问题,以前了解过kafka,但是让自己设计毕竟还是头一遭,但是抱着试一试学习的心态便报名了,中间一段时间遇到了点事情,一度中止,这直接导致我在最后快写完的时候也快到截止日期了,最后还是坚持写完终结了各种调试bug,不过没赶到截止日期之前提交上去,于是也没能拿到啥成绩,但是本地本机测试的结果感觉不赖,更重要的是这个过程挖掘了很多以前不太清楚没有接触到的内容,还是收获挺大的,唯一的遗憾就是没能早点写完代码提交,这样也许就能进复赛了。
这里记录一些初赛的过程,权当是知识点的串联和归纳,如果有错误,欢迎指出。

设计思路

再一开始没有任何想法的时候,借鉴了RocketMqfqueue这两个软件设计的思路。最终设计如下:

  • 消息连续写入:考虑写消息的效率,将所有的消息连续写入到一个文件中,如果一个文件写满,再新建一个新文件。
  • 消息索引持久化:因为每条消息是发到topic或者queue中的,根据不同的topic或者queue,持久化该条消息的索引到索引文件中(这里即为,topic或者queue对应的索引文件)。
  • 消息读取:每个comsumer根据自己绑定的兴趣集合,轮询的读取最新的消息。
    • 下一个要消费的消息索引的地址:通过特殊的map缓存数据结构记录每个consumer的topic或者queue兴趣集合中,下一个要消费的消息索引的地址。
    • 下一条消息的索引:根据下一条消息索引的地址获取消息索引,然后根据消息索引去消息文件中便可以读到对应文件位置上的消息。

这是整个消息存和取的一个大致的流程设计记录,个人觉得这么设计主要是出于连续写入一个文件的效率比随机写到各个文件的效率要高,但是却牺牲了读取的效率,读取的过程需要先读索引文件,再读消息文件。
从另外一个方向考虑,如果不按照连续存的方式,而是每个topic或者queue都建一个文件来存储消息,这样读取的时候就只用一次了,但是写起来的话落盘的速度可能会慢一些,但是二者方式综合起来孰优孰劣,没有做实际比较,这里也无法给出结论。

实现细节

消息编码

消息有Headers、Properties、Body三个属性,其中Headers、Properties都是KV键值对,Body是字节数组,前两者的value分别有longdoubleStringint四种数据类型,如果要保证消息的正确性,消息在进行存之前和取出消息之后,消息中KV中的value数据类型必须是要一致的。当时在旺旺群里讨论的同学,很多对数据类型持久化这一点如何实现很困惑,不过主要有两种声音,一种是直接用Java的原生的序列化接口,另外就是自己手动序列化。
我在思考时候,隐隐约约中想起了Redis的持久化实现原理,Redis本身也是KV键值存储,对于不同的数据类型的存储是天然支持,于是在查找了一些资料之后,确定了消息的编码格式,每条消息是按照下面的格式进行存储:

1
2
3
+----------+-----------+----------+---------+-----+-----------+-------+-----+------------+----+
| MSG_FLAG | HEAD_FLAG | LEN_TYPE | LEN_KEY | key | LEN_VALUE | value | ... | PROP_FLAG | ...|
+----------+-----------+----------+---------+-----+-----------+-------+-----+------------+----+

部分代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class DefaultBytesMessage implements BytesMessage {
public static byte VALUE_TYPE_INT=1;
public static byte VALUE_TYPE_LONG=2;
public static byte VALUE_TYPE_DOUBLE=3;
public static byte VALUE_TYPE_STRING=4;
public static int LEN_TYPE=1;
public static int LEN_KEY=1;
public static int LEN_VALUE=1;
public static int LEN_BODY=1;
public static int LEN_INT=4;
public static int LEN_LONG=8;
public static int LEN_DOUBLE=8;
public static byte HEAD_FLAG=-1;
public static byte PROP_FLAG=-2;
public static byte BODY_FLAG=-3;
public static byte MSG_FLAG=-4;
private KeyValue headers = new DefaultKeyValue();
private byte[] heads={};
private KeyValue properties;
private byte[] props={};
private byte[] body={};
...
public Message putHeaders(String key, long value) {
headers.put(key, value);
appendHeader(key,AllUtils.longToBytes(value),VALUE_TYPE_LONG,LEN_LONG);
return this;
}
private void appendHeader(String key,byte[] value,byte typeOfvalue,int lenOfvalue){
//拼接header的byte数组
if (heads.length==0)
heads=new byte[]{HEAD_FLAG};
int srcLen=heads.length;
byte[] newArray = new byte[srcLen+LEN_TYPE+LEN_KEY+key.getBytes().length+LEN_VALUE+lenOfvalue];//1字节表示value的数据类型,1字节表示key的长度,最后4字节表示固定的int值
System.arraycopy(heads,0,newArray,0,srcLen);
//数据类型
int index=srcLen;
newArray[index]=typeOfvalue;
//key的长度
newArray[++index]=(byte) key.getBytes().length;
//key值
System.arraycopy(key.getBytes(), 0, newArray, ++index, key.getBytes().length);
index+=key.getBytes().length;
//value长度
newArray[index]=(byte) value.length;
//value值
System.arraycopy(value, 0, newArray, ++index, lenOfvalue);
heads=newArray;
}
}

编码存储之后,读取的时候再按照上述编码方式进行解码,便能还原整个消息。

大文件读写

java处理大文件,一般用BufferedReader,BufferedInputStream这类带缓冲的Io类,不过如果文件超大的话,更快的方式是采用MappedByteBuffer。MappedByteBuffer是java nio引入的文件内存映射方案,读写性能极高。下面这段话是引用《Java NIO》对内存映射文件的解释:

传统的文件 I/O 是通过用户进程发布 read( )和 write( )系统调用来传输数据的。为了在内核空间
的文件系统页与用户空间的内存区之间移动数据,一次以上的拷贝操作几乎总是免不了的。这是因为,在文件系统页与用户缓冲区之间往往没有一一对应关系。但是,还有一种大多数操作系统都支持的特殊类型的 I/O操作,允许用户进程最大限度地利用面向页的系统 I/O 特性,并完全摒弃缓冲区拷贝。这就是内存映射 I/O。如下图所示:


这种映射文件的方案的设计确实看起来不错,在Java中创建一个MappedByteBuffer的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class MappedFile {
private String filename;
private int filesize;
private FileChannel fileChannel;
private File file;
private MappedByteBuffer mappedByteBuffer;
//下一节中提到的,用于对映射文件的刷新线程
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public MappedFile(String filename, int filesize) throws IOException {
init(filename,filesize);
}
public void init(String filename, int filesize) throws IOException {
this.filename = filename;
this.filesize = filesize;
this.file = new File(filename);
...
try {
//=======创建映射文件 START========
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, filesize);
//=======创建映射文件 END========
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
//执行线程任务
executor.execute(new Sync());
}
//内部类的实现见下节中代码
public class Sync implements Runnable {...}
}

filechannel的map方法是将文件映射进内存中的一个区域,针对mappedByteBuffer的操作可以完全不用管文件,可以这么理解,操作mappedByteBuffer就是操作文件。在FileChannel.map方法的jdk源码中,有这么一句注释:

A mapping, once established, is not dependent upon the file channel that was used to create it. Closing the channel, in particular, has no effect upon the validity of the mapping.

意思就是映射文件一旦创建就跟原来创建它的filechannel没有任何关系了,即使关闭filechannel也对映射文件没有任何影响。
还有一句话关于MappedByteBuffer的应用场景:

For most operating systems, mapping a file into memory is more expensive than reading or writing a few tens of kilobytes of data via the usual {@link #read read} and {@link #write write} methods. From the standpoint of performance it is generally only worth mapping relatively large files into memory.

翻译过来就是当数据量比较小的时候,比如几十M的数据量,相比较传统read和write的的io 方式,用映射文件是非常不划算的。因此从性能的角度考虑,在处理较大文件的时候才用映射文件的方案。

文件强制刷盘

对MappedByteBuffer的修改并不会立刻反应到文件上,而只是停留在内存上,但是以防消息发生意外在内存中丢失,所以需要定期将文件的修改刷新到磁盘上。MappedByteBuffer中提供了force的方法,去完成这个操作。将mappedByteBuffer.force();这个操作封装在了一个线程当中,隔一段时间去强制刷盘一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Sync implements Runnable {
@Override
public void run() {
while (true) {
if (mappedByteBuffer != null) {
try {
mappedByteBuffer.force();
Thread.sleep(500);
} catch (Exception e) {
break;
}
} else {
break;
}
}
}
}

压缩与解压缩

这是跟几个参赛队友在一起讨论的时候他们的观点,但是并没有在我的实现中用到,但是我觉得是一个不错的想法,因为消息数据进行压缩之后会变少,这样既可以缓解IO的压力,同时也可以充分利用CPU的计算能力,是一个两全其美的方案。压缩过程示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
ByteBuffer byteBuffer = ByteBuffer.allocate(Size);
byte[] zipBuffer = new byte[Size];
//java自带的压缩工具
Deflater compresser =new Deflater(Deflater.BEST_SPEED);
//设置压缩数据来源,byteBuffer是参考
compresser.setInput(byteBuffer.array(),0,byteBuffer.position());
//表示这次压缩以当前的输入内容结束
compresser.finish();
//返回压缩结果的长度,zipBuffer存放压缩结果
int compressedDataLength = compresser.deflate(zipBuffer);
//reset重置,准备下次压缩
compresser.reset();

解压缩的过程也很简单,示例代码如下:

1
2
3
4
5
6
Inflater decompresser = new Inflater();
byte[] byte4zip = new byte[2*1024*1024];
decompresser.setInput(byte4zip, 0, len);
int resultLength = decompresser.inflate(byte4message);
decompresser.reset();

在准备的过程中,还是接触到了很多东西的,不过有些需要时间慢慢去弄明白,比如page cache是怎么回事,Linux的直接I/OLinux的zerocopy(零拷贝)技术等等,这些都是偏向操作系统层面的东西,当时在看这些东西的时候,想起了当时阿里实习面试的时候跟一个面试官聊天的内容,面试快结束的时候,面试官意味深长的说起,工作之后才发现当初没好好的学习计算机的一些基础课程和基础内容,比如操作系统里面的进程调度、文件系统,还有编译原理等等。

感觉这些基础的东西,对技术的成长还是很关键的,那个面试当时举了个例子,说这些基础的知识和算法就是盖房子需要的一些基本知识,只要好好掌握了,盖什么高楼大厦都是一样的,融会贯通都能盖得起来。个人也有这种感觉,基础性的东西,带来的思考和认知层面的提升,虽不能立马反应到当前的实践中,但是却会不断的修炼和完善你的内功,产生潜移默化的提高,这一点我是相信的。

比如今天看到的一篇关于语言学习的文章,文章标题很浮夸,但是在你已经掌握了一门语言,又想去学习另外一门语言的时候,这篇文章内容绝对值得一看的:如何掌握所有的程序语言-by 王垠

参考

1、java大文件读写操作
2、Java NIO学习笔记——内存映射缓冲区

-EOF-