在使用Linux ramdump parser解析离线ramdump时,我们看的最多的也就是dmesg。那工具是如何从fulldump中解析出内核日志的呢?
插件入口
通过前两篇的文章,我们可以知道,插件是通过register_parser注册的,然后自动执行插件的parse函数。所以dmesg也不例外
在工具内有一个dmesg.py
from parser_util import register_parser, RamParser
import print_out
import dmesglib
@register_parser('--dmesg', 'Print the dmesg', shortopt='-d')
class Dmesg(RamParser):
def parse(self):
dmesglib.DmesgLib(self.ramdump, print_out.out_file).extract_dmesg()
parse函数通过调用dmesglib的DmesgLib实例类的extract_dmesg函数,所以下面就要看这个DmesgLib实物类到底干了什么?
DmesgLib的初始化
在工具内有一个DmesgLib.py文件
class DmesgLib(object):
def __init__(self, ramdump, outfile):
self.ramdump = ramdump
self.wrap_cnt = 0
self.outfile = outfile
if (self.ramdump.sizeof('struct printk_log') is None):
self.struct_name = 'struct log'
else:
self.struct_name = 'struct printk_log'
将Ramdump实物类传递给DmesgLib实物类,并将日志输出的文件路径传给self.outfile
extract_dmesg函数
def extract_dmesg(self):
major, minor, patch = self.ramdump.kernel_version
if (major, minor) >= (5, 10):
return self.extract_lockless_dmesg()
if (major, minor) >= (3, 7):
self.extract_dmesg_binary()
return
self.extract_dmesg_flat()
我司的项目基本已经大于kernel-5.10,故这里走extract_lockless_dmesg
函数
dmesg解析的核心函数extract_lockless_dmesg
这个函数还是比较复杂的,这里按照功能分成子标题去讲解流程
def extract_lockless_dmesg(self, write_to_file=True):
prb_addr = self.ramdump.read_pointer('prb')
off = self.ramdump.field_offset('struct printk_ringbuffer', 'desc_ring')
desc_ring_addr = prb_addr + off
off = self.ramdump.field_offset('struct prb_desc_ring', 'count_bits')
desc_ring_count = 1 << self.ramdump.read_u32(desc_ring_addr + off)
desc_sz = self.ramdump.sizeof('struct prb_desc')
off = self.ramdump.field_offset('struct prb_desc_ring', 'descs')
descs_addr = self.ramdump.read_ulong(desc_ring_addr + off)
info_sz = self.ramdump.sizeof('struct printk_info')
off = self.ramdump.field_offset('struct prb_desc_ring', 'infos')
infos_addr = self.ramdump.read_ulong(desc_ring_addr + off)
off = self.ramdump.field_offset(
'struct printk_ringbuffer', 'text_data_ring')
text_data_ring_addr = prb_addr + off
off = self.ramdump.field_offset('struct prb_data_ring', 'size_bits')
text_data_sz = 1 << self.ramdump.read_u32(text_data_ring_addr + off)
off = self.ramdump.field_offset('struct prb_data_ring', 'data')
data_addr = self.ramdump.read_ulong(text_data_ring_addr + off)
sv_off = self.ramdump.field_offset('struct prb_desc', 'state_var')
off = self.ramdump.field_offset('struct prb_desc','text_blk_lpos')
begin_off = off + self.ramdump.field_offset(
'struct prb_data_blk_lpos', 'begin')
next_off = off + self.ramdump.field_offset(
'struct prb_data_blk_lpos', 'next')
ts_off = self.ramdump.field_offset('struct printk_info', 'ts_nsec')
callerid_off = self.ramdump.field_offset('struct printk_info', 'caller_id')
len_off = self.ramdump.field_offset('struct printk_info', 'text_len')
desc_committed = 1
desc_finalized = 2
desc_sv_bits = self.ramdump.sizeof('long') * 8
desc_flags_shift = desc_sv_bits - 2
desc_flags_mask = 3 << desc_flags_shift
desc_id_mask = ~desc_flags_mask
off = self.ramdump.field_offset('struct prb_desc_ring','tail_id')
tail_id = self.ramdump.read_ulong(desc_ring_addr + off)
off = self.ramdump.field_offset('struct prb_desc_ring','head_id')
head_id = self.ramdump.read_ulong(desc_ring_addr + off)
did = tail_id
dmesg_list={}
while True:
ind = did % desc_ring_count
desc_off = desc_sz * ind
info_off = info_sz * ind
# skip non-committed record
state = 3 & (self.ramdump.read_ulong(descs_addr + desc_off +
sv_off) >> desc_flags_shift)
if state != desc_committed and state != desc_finalized:
if did == head_id:
break
did = (did + 1) & desc_id_mask
continue
begin = self.ramdump.read_ulong(descs_addr + desc_off +
begin_off) % text_data_sz
end = self.ramdump.read_ulong(descs_addr + desc_off +
next_off) % text_data_sz
if begin & 1 == 1:
text = ""
else:
if begin > end:
begin = 0
text_start = begin + self.ramdump.sizeof('long')
text_len = self.ramdump.read_u16(infos_addr +
info_off + len_off)
if end - text_start < text_len:
text_len = end - text_start
if text_len < 0:
text_len = 0
text = self.ramdump.read_cstring(data_addr +
text_start, text_len)
time_stamp = self.ramdump.read_u64(infos_addr +
info_off + ts_off)
caller_data = self.ramdump.read_u32(infos_addr +
info_off + callerid_off)
tid_info = "T"
if (caller_data & 0x80000000):
tid_info = "C"
caller_id_data = caller_data & ~0x80000000
pid = caller_id_data
caller_id_data = tid_info + str(caller_id_data)
for line in text.splitlines():
msg = u"[{time:12.6f}][{caller_id_data:>6}] {line}\n".format(
time=time_stamp / 1000000000.0,
line=line,caller_id_data=caller_id_data)
#print(msg, write_to_file)
if write_to_file:
self.outfile.write(msg)
else:
dmesg = []
dmesg.append(pid)
dmesg.append(line)
dmesg_list[time_stamp] = dmesg
if did == head_id:
break
did = (did + 1) & desc_id_mask
return dmesg_list
初始化环形缓冲区参数
prb_addr = self.ramdump.read_pointer('prb')
off = self.ramdump.field_offset('struct printk_ringbuffer', 'desc_ring')
desc_ring_addr = prb_addr + off
off = self.ramdump.field_offset('struct prb_desc_ring', 'count_bits')
desc_ring_count = 1 << self.ramdump.read_u32(desc_ring_addr + off)
desc_sz = self.ramdump.sizeof('struct prb_desc')
off = self.ramdump.field_offset('struct prb_desc_ring', 'descs')
descs_addr = self.ramdump.read_ulong(desc_ring_addr + off)
info_sz = self.ramdump.sizeof('struct printk_info')
off = self.ramdump.field_offset('struct prb_desc_ring', 'infos')
infos_addr = self.ramdump.read_ulong(desc_ring_addr + off)
off = self.ramdump.field_offset(
'struct printk_ringbuffer', 'text_data_ring')
text_data_ring_addr = prb_addr + off
off = self.ramdump.field_offset('struct prb_data_ring', 'size_bits')
text_data_sz = 1 << self.ramdump.read_u32(text_data_ring_addr + off)
off = self.ramdump.field_offset('struct prb_data_ring', 'data')
data_addr = self.ramdump.read_ulong(text_data_ring_addr + off)
sv_off = self.ramdump.field_offset('struct prb_desc', 'state_var')
off = self.ramdump.field_offset('struct prb_desc','text_blk_lpos')
begin_off = off + self.ramdump.field_offset(
'struct prb_data_blk_lpos', 'begin')
next_off = off + self.ramdump.field_offset(
'struct prb_data_blk_lpos', 'next')
ts_off = self.ramdump.field_offset('struct printk_info', 'ts_nsec')
callerid_off = self.ramdump.field_offset('struct printk_info', 'caller_id')
len_off = self.ramdump.field_offset('struct printk_info', 'text_len')
off = self.ramdump.field_offset('struct prb_desc_ring','tail_id')
tail_id = self.ramdump.read_ulong(desc_ring_addr + off)
off = self.ramdump.field_offset('struct prb_desc_ring','head_id')
head_id = self.ramdump.read_ulong(desc_ring_addr + off)
解析 Linux 内核中的printk环形缓冲区(ring buffer)结构,主要用于读取内核日志信息
日志提取流程
环形缓冲区遍历
did = tail_id
while True:
ind = did % desc_ring_count # 计算当前描述符索引
# ... 解析描述符状态和数据 ...
if did == head_id:
break
did = (did + 1) & desc_id_mask # 移动到下一个描述符(处理溢出)
环形索引:通过取模运算(% desc_ring_count)实现环形遍历。
终止条件:当did追上head_id时结束。
文本数据读取
begin = read_ulong(begin_off) % text_data_sz
end = read_ulong(next_off) % text_data_sz
if begin & 1 == 1:
text = "" # 无效数据
else:
if begin > end:
begin = 0 # 处理跨环情况
text = read_cstring(data_addr + begin, text_len)
无效数据标记:begin的最低位为1时表示数据无效。
跨环处理:若begin > end,说明文本跨环,需重置begin。
调用者ID解析
if (caller_data & 0x80000000):
tid_info = "C" # 线程ID
else:
tid_info = "T" # 任务ID
pid = caller_data & ~0x80000000
最高位(0x80000000)标识调用者类型(线程或任务)。
其余位存储实际的PID/线程ID。
输出格式
文件写入模式
每行日志格式化为:
[时间戳秒数][调用者ID] 日志文本
例如:
[ 12.345678][C1234] Kernel panic: Unable to handle NULL pointer dereference
字典返回模式
{ 1234567890000: [1234, "Kernel panic: ..."], ... }
Key:时间戳(纳秒)。
Value:列表,包含[PID, 日志文本]。
总结
定位环形缓冲区
从内核内存中读取printk_ringbuffer(prb)结构的基地址。
解析其内部的描述符环(Desc Ring)和数据环(Text Data Ring)。
初始化关键参数
计算描述符数量、数据区大小、字段偏移量等。
获取head_id和tail_id以确定有效日志范围。
遍历描述符
从tail_id开始,向head_id方向环形遍历。
跳过未提交(非committed/finalized)的描述符。
提取日志数据
根据描述符中的text_blk_lpos定位文本在数据环中的位置。
处理跨环和无效数据情况,读取原始日志文本。
解析元数据
提取时间戳、调用者ID(PID/线程)、文本长度等信息。
格式化输出
按需写入文件或返回结构化数据(时间戳 → PID + 日志)。
原理
解析dmesg的原理就在于这个printk_ringbuffer
TODO:待整理printk_ringbuffer的文章,后续整理后会将链接更新到此处!