在使用Linux ramdump parser解析离线ramdump时,我们看的最多的也就是dmesg。那工具是如何从fulldump中解析出内核日志的呢?

插件入口

通过前两篇的文章,我们可以知道,插件是通过register_parser注册的,然后自动执行插件的parse函数。所以dmesg也不例外

在工具内有一个dmesg.py

from parser_util import register_parser, RamParser
import print_out
import dmesglib

@register_parser('--dmesg', 'Print the dmesg', shortopt='-d')
class Dmesg(RamParser):

    def parse(self):
        dmesglib.DmesgLib(self.ramdump, print_out.out_file).extract_dmesg()

parse函数通过调用dmesglib的DmesgLib实例类的extract_dmesg函数,所以下面就要看这个DmesgLib实物类到底干了什么?

DmesgLib的初始化

在工具内有一个DmesgLib.py文件

class DmesgLib(object):

    def __init__(self, ramdump, outfile):
        self.ramdump = ramdump
        self.wrap_cnt = 0
        self.outfile = outfile
        if (self.ramdump.sizeof('struct printk_log') is None):
           self.struct_name = 'struct log'
        else:
           self.struct_name = 'struct printk_log'

将Ramdump实物类传递给DmesgLib实物类,并将日志输出的文件路径传给self.outfile

extract_dmesg函数

    def extract_dmesg(self):
        major, minor, patch = self.ramdump.kernel_version
        if (major, minor) >= (5, 10):
            return self.extract_lockless_dmesg()
        if (major, minor) >= (3, 7):
            self.extract_dmesg_binary()
            return
        self.extract_dmesg_flat()

我司的项目基本已经大于kernel-5.10,故这里走extract_lockless_dmesg函数

dmesg解析的核心函数extract_lockless_dmesg

这个函数还是比较复杂的,这里按照功能分成子标题去讲解流程

    def extract_lockless_dmesg(self, write_to_file=True):
        prb_addr = self.ramdump.read_pointer('prb')
        off = self.ramdump.field_offset('struct printk_ringbuffer', 'desc_ring')
        desc_ring_addr = prb_addr + off

        off = self.ramdump.field_offset('struct prb_desc_ring', 'count_bits')
        desc_ring_count = 1 << self.ramdump.read_u32(desc_ring_addr + off)
        desc_sz = self.ramdump.sizeof('struct prb_desc')
        off = self.ramdump.field_offset('struct prb_desc_ring', 'descs')
        descs_addr = self.ramdump.read_ulong(desc_ring_addr + off)

        info_sz = self.ramdump.sizeof('struct printk_info')
        off = self.ramdump.field_offset('struct prb_desc_ring', 'infos')
        infos_addr = self.ramdump.read_ulong(desc_ring_addr + off)

        off = self.ramdump.field_offset(
                                'struct printk_ringbuffer', 'text_data_ring')
        text_data_ring_addr = prb_addr + off

        off = self.ramdump.field_offset('struct prb_data_ring', 'size_bits')
        text_data_sz = 1 << self.ramdump.read_u32(text_data_ring_addr + off)
        off = self.ramdump.field_offset('struct prb_data_ring', 'data')
        data_addr = self.ramdump.read_ulong(text_data_ring_addr + off)

        sv_off = self.ramdump.field_offset('struct prb_desc', 'state_var')
        off = self.ramdump.field_offset('struct prb_desc','text_blk_lpos')
        begin_off = off + self.ramdump.field_offset(
                                        'struct prb_data_blk_lpos', 'begin')
        next_off = off + self.ramdump.field_offset(
                                        'struct prb_data_blk_lpos', 'next')
        ts_off = self.ramdump.field_offset('struct printk_info', 'ts_nsec')
        callerid_off = self.ramdump.field_offset('struct printk_info', 'caller_id')
        len_off = self.ramdump.field_offset('struct printk_info', 'text_len')

        desc_committed = 1
        desc_finalized = 2
        desc_sv_bits = self.ramdump.sizeof('long') * 8
        desc_flags_shift = desc_sv_bits - 2
        desc_flags_mask = 3 << desc_flags_shift
        desc_id_mask = ~desc_flags_mask

        off = self.ramdump.field_offset('struct prb_desc_ring','tail_id')
        tail_id = self.ramdump.read_ulong(desc_ring_addr + off)
        off = self.ramdump.field_offset('struct prb_desc_ring','head_id')
        head_id = self.ramdump.read_ulong(desc_ring_addr + off)

        did = tail_id

        dmesg_list={}
        while True:
            ind = did % desc_ring_count
            desc_off = desc_sz * ind
            info_off = info_sz * ind

            # skip non-committed record
            state = 3 & (self.ramdump.read_ulong(descs_addr + desc_off +
                                            sv_off) >> desc_flags_shift)
            if state != desc_committed and state != desc_finalized:
                if did == head_id:
                    break
                did = (did + 1) & desc_id_mask
                continue

            begin = self.ramdump.read_ulong(descs_addr + desc_off +
                                                begin_off) % text_data_sz
            end = self.ramdump.read_ulong(descs_addr + desc_off +
                                                next_off) % text_data_sz

            if begin & 1 == 1:
                text = ""
            else:
                if begin > end:
                    begin = 0

                text_start = begin + self.ramdump.sizeof('long')
                text_len = self.ramdump.read_u16(infos_addr +
                                                    info_off + len_off)

                if end - text_start < text_len:
                    text_len = end - text_start
                if text_len < 0:
                    text_len = 0

                text = self.ramdump.read_cstring(data_addr +
                                                    text_start, text_len)

            time_stamp = self.ramdump.read_u64(infos_addr +
                                                    info_off + ts_off)
            caller_data = self.ramdump.read_u32(infos_addr +
                                                    info_off + callerid_off)
            tid_info = "T"
            if (caller_data & 0x80000000):
                tid_info = "C"

            caller_id_data = caller_data & ~0x80000000
            pid = caller_id_data
            caller_id_data = tid_info + str(caller_id_data)
            for line in text.splitlines():
                msg = u"[{time:12.6f}][{caller_id_data:>6}] {line}\n".format(
                    time=time_stamp / 1000000000.0,
                    line=line,caller_id_data=caller_id_data)
                #print(msg, write_to_file)
                if write_to_file:
                     self.outfile.write(msg)
                else:
                    dmesg = []
                    dmesg.append(pid)
                    dmesg.append(line)
                    dmesg_list[time_stamp] = dmesg

            if did == head_id:
                break
            did = (did + 1) & desc_id_mask
        return dmesg_list

初始化环形缓冲区参数

        prb_addr = self.ramdump.read_pointer('prb')
        off = self.ramdump.field_offset('struct printk_ringbuffer', 'desc_ring')
        desc_ring_addr = prb_addr + off

        off = self.ramdump.field_offset('struct prb_desc_ring', 'count_bits')
        desc_ring_count = 1 << self.ramdump.read_u32(desc_ring_addr + off)
        desc_sz = self.ramdump.sizeof('struct prb_desc')
        off = self.ramdump.field_offset('struct prb_desc_ring', 'descs')
        descs_addr = self.ramdump.read_ulong(desc_ring_addr + off)

        info_sz = self.ramdump.sizeof('struct printk_info')
        off = self.ramdump.field_offset('struct prb_desc_ring', 'infos')
        infos_addr = self.ramdump.read_ulong(desc_ring_addr + off)

        off = self.ramdump.field_offset(
                                'struct printk_ringbuffer', 'text_data_ring')
        text_data_ring_addr = prb_addr + off

        off = self.ramdump.field_offset('struct prb_data_ring', 'size_bits')
        text_data_sz = 1 << self.ramdump.read_u32(text_data_ring_addr + off)
        off = self.ramdump.field_offset('struct prb_data_ring', 'data')
        data_addr = self.ramdump.read_ulong(text_data_ring_addr + off)

        sv_off = self.ramdump.field_offset('struct prb_desc', 'state_var')
        off = self.ramdump.field_offset('struct prb_desc','text_blk_lpos')
        begin_off = off + self.ramdump.field_offset(
                                        'struct prb_data_blk_lpos', 'begin')
        next_off = off + self.ramdump.field_offset(
                                        'struct prb_data_blk_lpos', 'next')
        ts_off = self.ramdump.field_offset('struct printk_info', 'ts_nsec')
        callerid_off = self.ramdump.field_offset('struct printk_info', 'caller_id')
        len_off = self.ramdump.field_offset('struct printk_info', 'text_len')

        off = self.ramdump.field_offset('struct prb_desc_ring','tail_id')
        tail_id = self.ramdump.read_ulong(desc_ring_addr + off)
        off = self.ramdump.field_offset('struct prb_desc_ring','head_id')
        head_id = self.ramdump.read_ulong(desc_ring_addr + off)

解析 Linux 内核中的printk环形缓冲区(ring buffer)结构,主要用于读取内核日志信息

参数名

类型/计算方式

描述

prb_addr

void*(指针)

struct printk_ringbuffer的基地址,环形缓冲区的起始位置。

desc_ring_addr

prb_addr + offset

desc_ring(描述符环)的地址,属于printk_ringbuffer的成员。

desc_ring_count

1 << count_bits

描述符环的总条目数(通过count_bits计算得出)。

desc_sz

sizeof(prb_desc)

单个描述符(struct prb_desc)的大小。

descs_addr

ulong(指针)

描述符数组的基地址,存储所有prb_desc结构。

infos_addr

ulong(指针)

日志信息数组(struct printk_info)的基地址,存储每条日志的元数据。

info_sz

sizeof(printk_info)

单个printk_info结构的大小。

text_data_ring_addr

prb_addr + offset

数据环(text_data_ring)的地址,存储实际日志文本数据。

text_data_sz

1 << size_bits

数据环的总大小(通过size_bits计算得出)。

data_addr

ulong(指针)

数据环中存储文本数据的基地址。

sv_off

state_var

描述符的状态变量偏移量,用于同步。

begin_off

text_blk_lpos.begin

文本块起始位置在数据环中的偏移量。

next_off

text_blk_lpos.next

文本块下一个位置的偏移量,用于链式管理。

ts_off

ts_nsec

时间戳(纳秒)的偏移量。

callerid_off

caller_id

调用者 ID(如 CPU/线程)的偏移量。

len_off

text_len

日志文本长度的偏移量。

tail_id

ulong (读取值)

描述符环的 tail_id,表示下一个可分配的描述符位置

head_id

ulong (读取值)

描述符环的 head_id,表示最早未提交的描述符位置

日志提取流程

环形缓冲区遍历

did = tail_id
while True:
    ind = did % desc_ring_count  # 计算当前描述符索引
    # ... 解析描述符状态和数据 ...
    if did == head_id:
        break
    did = (did + 1) & desc_id_mask  # 移动到下一个描述符(处理溢出)
  • 环形索引:通过取模运算(% desc_ring_count)实现环形遍历。

  • 终止条件:当did追上head_id时结束。

文本数据读取

begin = read_ulong(begin_off) % text_data_sz
end = read_ulong(next_off) % text_data_sz
if begin & 1 == 1:
    text = ""  # 无效数据
else:
    if begin > end:
        begin = 0  # 处理跨环情况
    text = read_cstring(data_addr + begin, text_len)
  • 无效数据标记begin的最低位为1时表示数据无效。

  • 跨环处理:若begin > end,说明文本跨环,需重置begin

调用者ID解析

if (caller_data & 0x80000000):
    tid_info = "C"  # 线程ID
else:
    tid_info = "T"  # 任务ID
pid = caller_data & ~0x80000000
  • 最高位(0x80000000)标识调用者类型(线程或任务)。

  • 其余位存储实际的PID/线程ID。

输出格式

  1. 文件写入模式

    每行日志格式化为:

    [时间戳秒数][调用者ID] 日志文本

    例如:

    [   12.345678][C1234] Kernel panic: Unable to handle NULL pointer dereference
  2. 字典返回模式

    {
        1234567890000: [1234, "Kernel panic: ..."],
        ...
    }
    • Key:时间戳(纳秒)。

    • Value:列表,包含[PID, 日志文本]

总结

  1. 定位环形缓冲区

    • 从内核内存中读取printk_ringbufferprb)结构的基地址。

    • 解析其内部的描述符环(Desc Ring)数据环(Text Data Ring)

  2. 初始化关键参数

    • 计算描述符数量、数据区大小、字段偏移量等。

    • 获取head_idtail_id以确定有效日志范围。

  3. 遍历描述符

    • tail_id开始,向head_id方向环形遍历。

    • 跳过未提交(非committed/finalized)的描述符。

  4. 提取日志数据

    • 根据描述符中的text_blk_lpos定位文本在数据环中的位置。

    • 处理跨环和无效数据情况,读取原始日志文本。

  5. 解析元数据

    • 提取时间戳、调用者ID(PID/线程)、文本长度等信息。

  6. 格式化输出

    • 按需写入文件或返回结构化数据(时间戳 → PID + 日志)。

原理

解析dmesg的原理就在于这个printk_ringbuffer

TODO:待整理printk_ringbuffer的文章,后续整理后会将链接更新到此处!