logstash的lumberjack协议解析
最近在实现一个agent采集服务器日志,设计agent需要能够将数据发送给logstash。当然logstash支持很多输入协议,其中,logstash技术栈(包括ElasticSearch)内有一种叫做lumberjack的协议,可能是专门为传输日志数据设计的。不过网上对于lumberjack协议没有公开的资料,而且实现上,只有java、ruby、golang版。笔者通过参考golang版(elastic/go-lumber)和java版(logstash-forwarder-java),用C实现了agent对lumberjack的支持。本文总结一下lumberjack协议的协议报文格式。
lumberjack版本
lumberjack总共有两个版本,logstash-forwarder-java只实现了第一版,elastic/go-lumber两个版本都实现了。新版的logstash作为服务端同时支持两个版本。相比而言,V2在格式定义上支持json,因此比V1简化很多,而且冗余信息略少于V1。由于json的引入,使得V2版本支持json支持的所有类型,而V1却甚至无法表达一个整型类型(只能全部用字符串表达)。
日志对象
一个日志对象是一个map,所以可以用json形式来序列化表达(V2才支持json)。当然这种数据结构不限于传输日志数据。一个完整的lumberjack报文可以包含多个日志对象,即可以支持批量发送日志。在同一个报文中,每个日志对象用sequence(uint32_t)来区分,类似数组的索引(index)。
V1版本格式
Window头
Window头包含有协议的版本、W标志、日志对象数量 这3个信息,下面是Window头的报文格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7+---------------+---------------+---------------------------------------------------------------+| '1'(0x31) | 'W'(0x57) | window size(uint32 bigendian) |+---------------+---------------+---------------------------------------------------------------+
'1'(0x31)是一个ASCII字符1占用一个字节,表示V1版本
'W'(0x57)是W标识,表示是Window头
window size是一个uint32_t的整型(大端存储),这个值表示报文的日志对象有多少个
日志对象格式
一个日志对象包含对象头和对象体
下面是日志对象头
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7+---------------+---------------+---------------------------------------------------------------+| '1'(0x31) | 'D'(0x44) | seq(uint32 bigendian) |+---------------------------------------------------------------+-------------------------------+| count of key value pairs(uint32 bigendian) |+---------------------------------------------------------------+
'1'(0x31)是一个ASCII字符1占用一个字节,表示V1版本
'D'(0x44)是D标识,表示是Data
seq即为日志对象的序号,一般可以从1开始累加,用来在后面的确认报文中会带上序号,发送端就可以知道接收端究竟确认了哪些对象了
count of key value pairs,由于日志对象的基本数据类型是map所以构成了一系列的key value pair,这个值就是标识一个日志对象中,究竟包含多少个key value pair。
下面是日志对象体
既然日志对象是map结构,那么数据体就是要存储这个结构,我们来继续看日志体的格式
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +---------------------------------------------------------------+| 1st key size(in bytes,big endian) |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| || 1st key payload || |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| 1st payload size(in bytes,big endian) |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| || 1st payload payload || |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| 2nd key size(in bytes,big endian) |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| || 2nd key payload || |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| 2nd payload size(in bytes,big endian) |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| || 2nd payload payload || |: :
可以看到,每个分为key和payload两部分,string自然是key,而byte[]自然是payload。
在组包时,先填入key的长度,然后填入key,再填入payload长度,再填入payload,如此往复,就可以完成一个map结构的表达。
最终,一个完整的lumberjack请求包,包含:
Window头 + N x 日志对象(日志对象头+日志对象体)
ACK
lumberjack设计了应用层ACK机制,即接收端可以在一次请求过程中,随时确认收到的每一个日志对象(通过seq标示)。当然也可以全部收完以后,只确认最后一个日志对象。客户端一般可以设计成等待服务端确认最后一个日志对象的ACK后,放心的认为服务端接收全部收完了。
ACK的格式如下:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7+---------------+---------------+---------------------------------------------------------------+| '1'(0x31) | 'A'(0x41) | record seq(uint32 bigendian) |+---------------+---------------+---------------------------------------------------------------+
压缩报文
lumberjack支持将日志对象数据压缩后发送,不过在一个完成的报文中,不能同时包含压缩和非压缩数据,即要么对所有的日志对象一起压缩,要么都不压缩。压缩形态的报文格式为:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7+---------------+---------------+---------------------------------------------------------------+| '1'(0x31) | 'C'(0x43) | payload length(in bytes,uint32 bigendian) |+-----------------------------------------------------------------------------------------------+| || compressed record segment(s) || |+-----------------------------------------------------------------------------------------------+
'C'(0x43)表示Compress数据
payload length表示压缩数据的长度
compressed record segment(s)是对一个或多个日志对象(包括日志头和日志体)进行压缩以后的字节数据
所以一个完整的压缩形态的数据报文应该是:
Window头 + 压缩报文
值得注意的是,尽管使用了压缩报文,Window头中的日志对象数量还是需要指明日志对象的数量的。
关于压缩,是基于deflate进行的,使用默认的压缩级别即可(level6)。各个语言都可以使用zlib进行压缩和解压,以C语言为例,zlib Usage Example。
下面的代码用于压缩一段内存数据,代码大致如下:
# include z_stream strm;int ret;unsigned char output[CHUNK];strm.zalloc = Z_NULL;strm.zfree = Z_NULL;strm.opaque = Z_NULL;//Z_DEFAULT_COMPRESSION是-1,是默认的压缩级别,相当于压缩级别6deflateInit(&strm, Z_DEFAULT_COMPRESSION);strm.next_in = (unsigned char *)input;strm.avail_in = len;strm.next_out = output;do { strm.avail_out = CHUNK; ret = deflate(&strm, Z_FINISH); //写如结果 memcpy(result,output,CHUNK-strm.avail_out);} while(strm.avail_out == 0 && strm.avail_in != 0);deflateEnd(&strm);
V2版本格式
V2版本就没有V1版本那么啰嗦了,直接上图:
window header与v1的协议一致,只是version部分填入2。
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7+---------------+---------------+---------------------------------------------------------------+| '2'(0x31) | 'W'(0x57) | window size(uint32 bigendian) |+---------------+---------------+---------------------------------------------------------------+
日志对象的头部用J代替D,表示是json格式的数据
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7+---------------+---------------+---------------------------------------------------------------+| '1'(0x31) | 'J'(0x4A) | seq(uint32 bigendian) |+---------------------------------------------------------------+-------------------------------+| payload length(uint32 bigendian) | +---------------------------------------------------------------+| || json encode string || |+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
除此之外,由于payload部分不需要通过协议本身来表达map,直接借助json格式,所以只需增加json字符串的串长就可以了(图中的payload length)。
V2版本的ACK与V1版本完全相同。
关键字:logstash, 日志, payload, 对象
版权声明
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容有涉嫌抄袭侵权/违法违规/事实不符,请点击 举报 进行投诉反馈!