AI智能摘要
通过logcat_v3和logcat_vma两套解析方案,工具能够应对大多数Android设备的内存布局变化,成功从ramdump中还原出logcat日志。整个过程涉及进程内存读取、数据结构逆向、多线程解析、日志去重等技术点,充分体现了ramdump parser工具的灵活性和强大功能。 本文重点剖析了logcat_v3的实现细节,从定位log buffer,到解析日志块,再到合并内核日志,每一步都力求清晰。希望读者能从中了解到Android logcat在内存中的存储方式,以及离线解析工具背后的工作原理。
此摘要由AI分析文章内容生成,仅供参考。

使用Linux ramdump parser工具解析后,我们通常是可以得到解析出来的logcat日志的,那工具是如何从memory dump中解析出logcat的呢?

插件入口

插件是通过register_parser注册的,然后自动执行插件的parse函数。所以logcat也不例外。

parsers/logcat.py

@register_parser('--logcat', 'Extract logcat logs from ramdump ')
class Logcat(RamParser):
    LOGCAT_BIN = "logcat.bin"
    def __init__(self, *args):
        super(Logcat, self).__init__(*args)
        self.f_path_offset = self.ramdump.field_offset('struct file', 'f_path')
        self.dentry_offset = self.ramdump.field_offset('struct path', 'dentry')
        self.d_iname_offset = self.ramdump.field_offset('struct dentry', 'd_iname')
        self.limit_size = int("0x20000000", 16)
        self.vma_list = []

通过register_parser注册插件,插件参数为:--logcat,默认解析。

注册好后,自动调用parse函数执行解析。

    def parse(self):
        if self.ramdump.logcat_limit_time == 0:
            self.__parse()
        else:
            from func_timeout import func_timeout
            print_out_str("Limit logcat parser running time to {}s".format(self.ramdump.logcat_limit_time))
            func_timeout(self.ramdump.logcat_limit_time, self.__parse)

这里有一ramdump.logcat_limit_time的参数,这个参数是从执行参数传入的,默认一般不设置,所以为0

    parser.add_option('', '--logcat_limit_time_sec',
                      dest='logcat_limit_time', type='int', default=0,
                      help='Defined the max time logcat parse running')

然后调用__parse()函数

解析函数__parse

    def __parse(self):
        try:
            try:
                taskinfo = UTaskLib(self.ramdump).get_utask_info("logd")   # 调用UtaskLib的接口获取logd的taskinfo
            except ProcessNotFoundExcetion:
                print_out_str("logd process is not started")
                return
            propertyParser = Properties(self.ramdump)   # 调用Properties获取property的解析器
            ver = -1
            try:
                # generate system/vendor properties to Properties.txt
                propertyParser.parse()                # 执行property解析
                for name, value in propertyParser.proplist:
                    if name == "ro.build.version.sdk" or name == "ro.vndk.version":  # 获取两个属性值
                        ver = int(value)
            except:
                ver = -1

            print_out_str("Current sdk version is "+ str(ver))   # 获取sdk版本号
            if ver >= 31: # Android S      # 我们现在的版本一般都>31,故分析这边
                from parsers.logcat_v3 import Logcat_v3   # 调用logcat_v3脚本继续执行
                logcat = Logcat_v3(self.ramdump, taskinfo)   # 初始化logcat_v3解析器
                try:
                    is_success = logcat.parse()           # 解析logcat
                except Exception as e:
                    is_success = False
                    print_out_str("logcat_v3 parser failed " + str(e))
                    traceback.print_exc()
                if is_success:
                    print_out_str("logcat_v3 parse logcat success")
                    return
                try:
                    from parsers.logcat_v3 import Logcat_vma
                    logcat = Logcat_vma(self.ramdump, taskinfo)
                    is_success = logcat.parse()
                except Exception as e:
                    is_success = False
                    print_out_str("logcat_vma parser failed" + str(e))
                    traceback.print_exc()
                if is_success:
                    print_out_str("logcat_vma parse logcat success")
                else:
                    # generate logcat.bin when both logcat_v3 and logcat_vma parse failed
                    self.generate_logcat_bin(taskinfo)

            elif self.is_LE_process(taskinfo):
                print_out_str("LE ramdump")
                from parsers.logcat_m import Logcat_m
                #parser to supprot Android M
                logcat = Logcat_m(self.ramdump, taskinfo)
                logcat.parse()
            elif self.is_openwrt_process(taskinfo):
                print_out_str("Openwrt ramdump")
                from parsers.logcat_openwrt import Logcat_openwrt
                #parser to supprot openwrt platform
                logcat = Logcat_openwrt(self.ramdump, taskinfo)
                logcat.parse()
            else:
                self.generate_logcat_bin(taskinfo)
        except Exception as result:
            print_out_str(str(result))
            traceback.print_exc()

这个函数虽然不长,但是调用各种函数还是比较复杂的,故本小节分子章节分别叙述。

获取logd的taskinfo

taskinfo = UTaskLib(self.ramdump).get_utask_info("logd")

TODO:这里插个眼,LRDP2下一篇文章会去了解UtaskLib的实现,本章这里先默认调这个接口就能够获得logd的taskinfo

获取Properties

            propertyParser = Properties(self.ramdump)
            ver = -1
            try:
                # generate system/vendor properties to Properties.txt
                propertyParser.parse()
                for name, value in propertyParser.proplist:
                    if name == "ro.build.version.sdk" or name == "ro.vndk.version":
                        ver = int(value)
            except:
                ver = -1

TODO:这里插个眼,LRDP2接下去的文章会去了解Properties的实现,本章这里先默认调这个接口就能够获得相关的prop属性。

logcat_v3解析

            if ver >= 31: # Android S
                from parsers.logcat_v3 import Logcat_v3
                logcat = Logcat_v3(self.ramdump, taskinfo)
                try:
                    is_success = logcat.parse()
                except Exception as e:
                    is_success = False
                    print_out_str("logcat_v3 parser failed " + str(e))
                    traceback.print_exc()
                if is_success:
                    print_out_str("logcat_v3 parse logcat success")
                    return
                try:
                    from parsers.logcat_v3 import Logcat_vma
                    logcat = Logcat_vma(self.ramdump, taskinfo)
                    is_success = logcat.parse()
                except Exception as e:
                    is_success = False
                    print_out_str("logcat_vma parser failed" + str(e))
                    traceback.print_exc()
                if is_success:
                    print_out_str("logcat_vma parse logcat success")
                else:
                    # generate logcat.bin when both logcat_v3 and logcat_vma parse failed
                    self.generate_logcat_bin(taskinfo)

logcat_v3初始化

class Logcat_v3(Logcat_base):
    def __init__(self, ramdump, taskinfo):
        super().__init__(ramdump, taskinfo)

logcat_v3解析

    def parse(self):
        self.read_dmesg()   # 读取dmesg,通过接口extract_lockless_dmesg获取,之前一篇讲过
        self.wall_to_mono_found, self.wall_to_monotonic_tv_sec, self.wall_to_monotonic_tv_nsec = self.findCorrection()
        logbuf_addrs = self.get_logbuffer_addr()
        for __logbuf_addr in logbuf_addrs:
            logchunk_list_addr = __logbuf_addr + 0x60
            try:
                self.process_chunklist_and_save(logchunk_list_addr)
            except Exception as e:
                print(str(e))
                traceback.print_exc()
            if self.is_success:
                print_out_str("logbuf_addr = 0x%x" %(__logbuf_addr))
                break

        return self.is_success
findCorrection

findCorrection: 从内存中提取日志时间校正值,这段计算校正值的原理,个人确实一头雾水

    def findCorrection(self):
        sec = 0
        nsec = 0
        found = False
        correction_addr = 0
        bss_addrs = self.find_bss_addrs()
        for bss_start, bss_end in bss_addrs:
            idx = 0
            bss_size = bss_end - bss_start
            while idx < bss_size:
                if self.is_equal(bss_start + idx, 8, 3) and \
                        self.is_equal(bss_start + idx + 8*2, 8, 7) and \
                            self.is_equal(bss_start + idx + 8*4, 8, 5) and \
                                self.is_equal(bss_start + idx + 8*6, 8, 4):
                    correction_addr = bss_start + idx - 40
                    sec = self.read_bytes(correction_addr, 4)
                    nsec = self.read_bytes(correction_addr + 4, 4)
                    found = True
                    break
                idx += 8

            if found:
                break

        if found:
            print_out_str(("Found &LogBuffer::Correction=0x%x LogBuffer::Correction=%ld.%ld")
                          % (correction_addr, sec, nsec))
        else:
            print_out_str("&LogBuffer::Correction not found")
        return found, sec, nsec

从代码逻辑来看,是从bss段里3 7 5 4 的这个段,找到后这个地址往前偏移40就可以得到correction_addr。

作者对这块不是很了解,linux kernel代码里在何处写入这个修正值,如果有知道的可以评论交流!

get_logbuffer_addr
    def get_logbuffer_addr(self):
        stack_offset = self.ramdump.field_offset('struct task_struct', 'stack') # 获取task_struct的成员stack的偏移
        stack_addr = self.ramdump.read_word(self.logd_task + stack_offset) # 得到logd的stack_addr
        pt_regs_size = self.ramdump.sizeof('struct pt_regs')  # 获取pt_regs的size
        pt_regs_addr = self.ramdump.thread_size + stack_addr - pt_regs_size # logd的stack_addr + 8192 - pt_regs的size
        user_regs_addr = pt_regs_addr + self.ramdump.field_offset('struct pt_regs', 'user_regs')
        #find x22 register value
        x22_r_addr = self.ramdump.array_index(user_regs_addr, 'unsigned long', 22)
        x22_value    = self.ramdump.read_word(x22_r_addr)
        x22_logbuf_addr = self.read_bytes(x22_value + 0x88, self.addr_length)

        logbuf_addrs = []
        if x22_logbuf_addr and x22_logbuf_addr != 0: # for logd orginal code
            logbuf_addrs.append(x22_logbuf_addr)
            print_out_str("logbuf_addr from x22 = 0x%x" %(x22_logbuf_addr))

        x21_r_addr = self.ramdump.array_index(user_regs_addr, 'unsigned long', 21)
        x21_value    = self.ramdump.read_word(x21_r_addr)
        x21_logbuf_addr = self.read_bytes(x21_value + 0x88, self.addr_length)
        if x21_logbuf_addr and x21_logbuf_addr != 0:
            logbuf_addrs.append(x21_logbuf_addr)
            print_out_str("logbuf_addr from x21 = 0x%x" %(x21_logbuf_addr))

        x23_r_addr = self.ramdump.array_index(user_regs_addr, 'unsigned long', 23)
        x23_value    = self.ramdump.read_word(x23_r_addr)
        x23_logbuf_addr = self.read_bytes(x23_value + 0x88, self.addr_length)
        if x23_logbuf_addr or x23_logbuf_addr == 0:
            logbuf_addrs.append(x23_logbuf_addr)
            print_out_str("logbuf_addr from x23 = 0x%x" %(x23_logbuf_addr))
        return logbuf_addrs

下面是几个需要注意的知识点:

pt_regs的地址

从代码来看pt_regs的地址计算公式为:stack_addr+thread_size-pt_regs_size

当进程通过系统调用或异常陷入内核时,CPU 会自动将寄存器压入内核栈顶,形成 pt_regs。内核栈从高地址向低地址增长,pt_regs 保存在栈顶

看上面这张图会更清晰一点!

接下来,让我们继续深入探索logcat_v3解析器的核心——process_chunklist_and_save函数,看看它究竟是如何从logd的内存中提取出一条条日志的。

process_chunklist_and_save——遍历日志缓冲区

当拿到logchunk_list_addr(即std::list<SerializedLogChunk>的地址)后,解析工作正式进入数据读取阶段。这个函数的目标是遍历每个日志ID(main、radio、events...),遍历每个日志块(SerializedLogChunk),读取原始数据,并最终解析成可读的LogEntry

python

复制

下载

def process_chunklist_and_save(self, logchunk_list_addr):
    log_id = 0
    threads = []
    with futures.ThreadPoolExecutor(8) as executor:
        while log_id <= self.LOG_ID_MAX:
            is_binary = (log_id == self.LOG_ID_EVENTS) or (log_id == self.LOG_ID_STATS) or (log_id == self.LOG_ID_SECURITY)
            # 计算当前log_id对应的list头地址(std::list<SerializedLogChunk>)
            _addr = logchunk_list_addr + log_id * 0x18
            first_node_addr = self.read_bytes(_addr + self.addr_length, self.addr_length)
            list_count = self.read_bytes(_addr + self.addr_length *2, self.addr_length)
            if not list_count or list_count <= 0:
                log_id += 1
                continue

            # 读取缓冲区最大大小(用于后续判断是否超大)
            tail_node_addr = self.read_bytes(_addr, self.addr_length)
            current_node = tail_node_addr + self.addr_length * 2
            _data_size = self.read_bytes(current_node + self.addr_length, self.addr_length)
            self.maxSize[log_id] = _data_size * 4   # 每个chunk按页对齐?这里可能是估算总大小
            print_out_str(f"Log_id:{log_id} buffer size set to {self.maxSize[log_id]/1024:.1f} KB")

            huge_buffersize = False
            if self.maxSize[log_id] > self.LIMIT_LOGD_BUFFER_SZIE:
                huge_buffersize = True
                print_out_str(f"      !!! WARN: Log_id:{log_id} buffer size over than "
                              f"{self.LIMIT_LOGD_BUFFER_SZIE/1024:.0f} kb, will only parser last 2 chunks")

            next_node_addr = first_node_addr
            self.sizeUsed[log_id] = 0
            section = 0
            while (section < list_count):
                # 如果缓冲区超大,只解析最后两个chunk
                if huge_buffersize:
                    if section < list_count - 2:
                        section = section + 1
                        next_node_addr = self.read_bytes(next_node_addr + self.addr_length, self.addr_length)
                        continue

                current_node = next_node_addr + self.addr_length * 2  # 跳过list节点内的prev/next,指向SerializedLogChunk
                write_offset = self.read_bytes(current_node + 0x10, 4)   # write_offset_,已写入的数据长度
                write_active = self.read_bytes(current_node + 0x18, 1)   # write_active_,0表示已压缩,1表示未压缩
                _data = None
                if write_active == 0:  # 压缩块
                    if self.zstd:
                        compressed_log_addr = current_node + 0x28
                        _data_addr = self.read_bytes(compressed_log_addr, self.addr_length)
                        _data_size = self.read_bytes(compressed_log_addr + self.addr_length, self.addr_length)
                        _data = self.read_binary(_data_addr, _data_size)
                        self.sizeUsed[log_id] = self.sizeUsed[log_id] + _data_size
                else:  # 未压缩块
                    _data_addr = self.read_bytes(current_node, self.addr_length)
                    _data_size = self.read_bytes(current_node + self.addr_length, self.addr_length)
                    self.sizeUsed[log_id] = self.sizeUsed[log_id] + write_offset
                    _data = self.read_binary(_data_addr, write_offset)

                if _data:
                    # 提交到线程池解析
                    future = executor.submit(self.process_work_chunk, _data, log_id, section, is_binary, write_active)
                    threads.append(future)

                section = section + 1
                next_node_addr = self.read_bytes(next_node_addr + self.addr_length, self.addr_length)

            log_id = log_id + 1

    # 收集所有线程的解析结果
    loglist = {}
    for future in futures.as_completed(threads):
        log_id, section, ret = future.result()
        if not ret:
            continue
        if log_id in loglist:
            sections = loglist[log_id]
        else:
            sections = {}
            loglist[log_id] = sections
        sections[section] = ret

    self.save_log_to_file(loglist)

这段代码逻辑清晰,但有几个关键点值得注意:

  • std::list<SerializedLogChunk>的内存布局:在64位系统中,std::list的每个节点通常包含prevnext指针(各8字节)以及节点数据(即SerializedLogChunk对象)。因此,给定list头地址,通过head+8得到第一个节点地址,节点地址+16得到SerializedLogChunk对象的起始地址。这正是代码中next_node_addr + self.addr_length*2的由来。

  • SerializedLogChunk的结构:从代码中读取的字段可以推测出其大致布局:

    • offset 0x00: data指针(8字节)

    • offset 0x08: data_size(8字节)

    • offset 0x10: write_offset_(4字节)

    • offset 0x18: write_active_(1字节)

    • offset 0x28: compressed_log指针(8字节)和compressed_log_size(8字节)
      这基本符合Android源码中SerializedLogChunk的定义。

  • 多线程加速:使用8个线程的线程池并行解析每个chunk,大大缩短解析时间。

  • 超大缓冲区处理:如果某个日志缓冲区大小超过40MB,则只解析最后两个chunk。这是为了避免处理过旧的数据,因为通常我们只关心最近的日志。

解析工作线程——process_work_chunk

每个线程调用process_work_chunk来处理一个chunk的数据。它根据是否压缩、是否是二进制日志,调用相应的解析函数。

def process_work_chunk(self, _data, log_id, section, is_binary, write_active):
    if write_active == 0:  # 需要解压
        if not self.zstd:
            return log_id, section, None
        try:
            _data = self.zstd.ZstdDecompressor().decompress(_data)
        except:
            print_out_str("decompress caused error on logid:section(%d:%d), size(%d)" %(log_id, section, len(_data)))
            _data = None

    ret = None
    if _data:
        try:
            if is_binary:
                ret = self.process_binary_log_and_save(_data)
            else:
                ret = self.process_log_and_save(_data, log_id)
        except:
            traceback.print_exc()
    return log_id, section, ret
普通日志解析——process_log_and_save

普通日志(main、radio、system等)的格式相对简单:每个日志条目由一个固定长度的头部和变长的消息体组成。

def process_log_and_save(self, _data, log_id):
    ret = []
    pos = 0
    while pos < len(_data):
        if pos + self.SIZEOF_LOG_ENTRY > len(_data):
            break
        # 解包头部:uid(4), pid(4), tid(4), sequence(8), tv_sec(4), tv_nsec(4), msg_len(2), priority(1)
        logEntry = struct.unpack('<IIIQIIHB', _data[pos:pos+self.SIZEOF_LOG_ENTRY+1])
        pos = pos + self.SIZEOF_LOG_ENTRY + 1 + self.extra_offset
        uid, pid, tid, sequence, tv_sec, tv_nsec, msg_len, priority = logEntry
        if msg_len is None or msg_len < 1:
            break
        msg = _data[pos:pos+msg_len-1]  # 消息末尾可能有一个结束符,减去1
        msgList = msg.decode('ascii', 'ignore').split('\0')
        pos = pos + msg_len - 1
        if len(msgList) < 2:
            continue

        try:
            if log_id == self.LOG_ID_KERNEL:
                entry = LogEntry_Dmesg()
                entry.mono_format = self.wall_to_mono_found
                entry.set_rtc_time(tv_sec, tv_nsec, self.wall_to_monotonic_tv_sec, self.wall_to_monotonic_tv_nsec)
            else:
                entry = LogEntry()
                entry.tv_sec = tv_sec
                entry.tv_nsec = tv_nsec

            entry.pid = pid
            entry.uid = uid
            entry.tid = tid
            entry.prior = priority
            entry.tag = cleanupString(msgList[0].strip())
            entry.set_msg(msgList[1])
            entry.tz_minuteswest = self.tz_minuteswest
            ret.append(entry)
        except Exception:
            traceback.print_exc()
    return ret

日志头部格式与Android的log_entry结构一致。注意消息体是以tag\0message形式存储的,所以用split('\0')分割。

二进制日志解析——process_binary_log_and_save

二进制日志(events、stats、security)的格式复杂一些,因为它需要解析嵌套的事件数据。

def process_binary_log_and_save(self, _data):
    ret = []
    pos = 0
    while pos < len(_data):
        # 头部同样有uid,pid,tid等,但无priority字段
        logEntry = struct.unpack('<IIIQIIH', _data[pos : pos + self.SIZEOF_LOG_ENTRY])
        pos = pos + self.SIZEOF_LOG_ENTRY + self.extra_offset
        uid, pid, tid, sequence, tv_sec, tv_nsec, msg_len = logEntry
        # 接着是tag索引(4字节)
        tagidx = struct.unpack('<I', _data[pos : pos + self.SIZEOF_HEADER_T])[0]
        pos = pos + self.SIZEOF_HEADER_T
        # 解析事件数据
        evt_type, tmpmsg, length = self.get_evt_data(_data, pos)
        pos = pos + length
        if evt_type == -1:
            break
        if evt_type != self.EVENT_TYPE_LIST:
            # 简单类型(int/long/string/float)直接构造条目
            entry = LogEntry()
            entry.is_binary = True
            entry.tv_sec = tv_sec
            entry.tv_nsec = tv_nsec
            entry.pid = pid
            entry.uid = uid
            entry.tid = tid
            entry.prior = self.ANDROID_LOG_INFO
            entry.tag = str(tagidx)
            entry.set_msg(tmpmsg)
            entry.tz_minuteswest = self.tz_minuteswest
            ret.append(entry)
            continue
        # 如果是列表类型,则需要递归解析列表中的每个元素
        list_t = struct.unpack('<BB', _data[pos : pos + self.SIZEOF_EVT_LIST_T])
        pos = pos + self.SIZEOF_EVT_LIST_T
        evt_type = list_t[0]
        evt_cnt = list_t[1]
        i = 0
        msg = ""
        while i < evt_cnt:
            evt_type, tmpmsg, length = self.get_evt_data(_data, pos)
            if evt_type == -1:
                break
            pos = pos + length
            msg = msg + tmpmsg
            if i < evt_cnt -1:
                msg = msg + ","
            i = i+1

        entry = LogEntry()
        entry.is_binary = True
        entry.tv_sec = tv_sec
        entry.tv_nsec = tv_nsec
        entry.pid = pid
        entry.uid = uid
        entry.tid = tid
        entry.prior = self.ANDROID_LOG_INFO
        entry.tag = str(tagidx)
        entry.set_msg("[" + msg + "]")
        entry.tz_minuteswest = self.tz_minuteswest
        ret.append(entry)
    return ret

get_evt_data函数根据事件类型解析出对应的值,例如int类型占5字节(1字节类型+4字节值),string类型前5字节记录长度,后面跟着字符串数据。这些细节与Android的log_event定义完全吻合。

保存文件与去重——save_log_to_file

所有chunk解析完成后,得到的是一个按log_id和section组织的LogEntry列表。save_log_to_file负责将这些条目写入对应的文件。

def save_log_to_file(self, loglist):
    if not loglist or len(loglist) == 0:
        return
    if not self.is_success:
        self.is_success = True

    for log_id in loglist.keys():
        if log_id == self.LOG_ID_KERNEL and self.wall_to_mono_found:
            continue  # 内核日志将特殊处理
        sections  = loglist[log_id]
        if not sections:
            continue
        filename = self.get_output_filename(log_id)
        log_file = self.ramdump.open_file(filename)
        write_head = False
        for section in sorted(sections.keys()):
            if sections[section] and len(sections[section]) >= 0:
                if not write_head:
                    head = "{} log buffer used: {}k   Max size:{}k\n".format(
                        self.LOG_NAME[log_id],
                        round(self.sizeUsed[log_id]/1024,1),
                        round(self.maxSize[log_id]/1024,1))
                    log_file.write(head)
                    write_head = True
                head="--------- beginning of {} section: {}\n".format(
                    self.LOG_NAME[log_id], str(section))
                log_file.write(head)
                for item in sections[section]:
                    log_file.write(str(item))

    # 如果找到了时间校正值,则需要合并内核日志与dmesg
    if not self.wall_to_mono_found:
        return
    dmesgDict = []
    if self.LOG_ID_KERNEL in loglist.keys():
        sections = loglist[self.LOG_ID_KERNEL]
        if sections:
            for section in sorted(sections.keys()):
                dmesgDict.extend(sections[section])

    filename = self.get_output_filename(self.LOG_ID_KERNEL)
    log_file = self.ramdump.open_file(filename)
    if len(dmesgDict) > 0:
        head = "{} log buffer used: {}k   Max size:{}k\n".format(
            self.LOG_NAME[self.LOG_ID_KERNEL],
            round(self.sizeUsed[self.LOG_ID_KERNEL]/1024,1),
            round(self.maxSize[self.LOG_ID_KERNEL]/1024,1))
        log_file.write(head)

    if len(self.dmesg_list) <= 0:
        for item in dmesgDict:
            log_file.write(str(item))
    else:
        self.combine_dmesg(dmesgDict, log_file)

对于普通日志,直接按section顺序写入即可。对于内核日志,由于logd中也可能记录内核日志(通过LOG_ID_KERNEL),而我们在read_dmesg中已经提取了dmesg环形缓冲区的日志,两者可能有重叠。因此需要调用combine_dmesg进行智能合并。

内核日志合并——combine_dmesg

合并的核心思想是:将logd中的内核日志与dmesg中的日志按单调时间排序,如果时间接近(1ms内)且内容相似(一个字符串包含另一个),则认为是同一条日志,只保留dmesg中的版本(因为dmesg更原始,格式更统一)。最终按时间顺序输出所有条目。

def combine_dmesg(self, dmesgDict, log_file):
    same_log_count = 0
    log_added_count = 0

    keys = sorted(self.dmesg_list)  # dmesg中的单调时间列表
    dmesg_time_start = keys[0]

    index = 0
    for item in dmesgDict:
        if item.mono_time() < dmesg_time_start:
            log_file.write(str(item))
        else:
            should_delete = []
            while index < len(keys):
                mono_time = keys[index]
                s_pid = self.dmesg_list[mono_time][0]
                s_line = self.dmesg_list[mono_time][1]
                entry = LogEntry_Dmesg()
                entry.mono_format = self.wall_to_mono_found
                entry.set_mono_time(mono_time, self.wall_to_monotonic_tv_sec, self.wall_to_monotonic_tv_nsec)
                entry.pid = s_pid
                entry.uid = 0
                entry.tid = s_pid
                entry.set_msg(cleanupString(s_line))
                entry.tz_minuteswest = self.tz_minuteswest
                cmpval = item.__cmp__(entry)  # 自定义比较:时间和内容
                if cmpval > 0:  # logd条目时间晚于dmesg条目,先输出dmesg
                    log_added_count += 1
                    index += 1
                    log_file.write(str(entry))
                    should_delete.append(mono_time)
                    continue
                elif cmpval == 0:  # 完全相同,跳过logd条目
                    index += 1
                    same_log_count += 1
                    should_delete.append(mono_time)
                    continue
                else:  # logd条目时间早于dmesg条目,输出logd条目
                    log_file.write(str(item))
                    break
            for time in should_delete:
                del self.dmesg_list[time]

    # 输出剩余的dmesg日志
    for mono_time in self.dmesg_list:
        s_pid = self.dmesg_list[mono_time][0]
        s_line = self.dmesg_list[mono_time][1]
        entry = LogEntry_Dmesg()
        entry.mono_format = self.wall_to_mono_found
        entry.set_mono_time(mono_time, self.wall_to_monotonic_tv_sec, self.wall_to_monotonic_tv_nsec)
        entry.pid = s_pid
        entry.uid = 0
        entry.tid = s_pid
        entry.set_msg(cleanupString(s_line))
        entry.tz_minuteswest = self.tz_minuteswest
        log_added_count += 1
        log_file.write(str(entry))
    print_out_str("Total dmesg log count %d, same count %d, added count %d" %
                  (len(self.dmesg_list), same_log_count, log_added_count))

这种去重机制确保了最终的内核日志文件既包含了dmesg的完整内容,又补充了logd中可能独有的内核日志(例如通过logd记录的一些用户空间相关日志),同时避免了重复。


如果logcat_v3失败:回退到Logcat_vma

Android不同版本、不同厂商的内核可能修改了logd的内存布局,导致通过寄存器定位log buffer的方法失效。此时解析器会尝试另一种方法——Logcat_vma,它通过扫描logd进程的所有可读写虚拟内存区域(VMA),寻找符合std::list<SerializedLogChunk>特征的数据结构。

Logcat_vma继承自Logcat_base,核心区别在于内存读取方式和定位log buffer的方法。

获取VMA数据

def get_vmas_with_rw(self):
    for vma in self.taskinfo.vmalist:
        if vma.flags & 0b11 != 0b11:
            continue
        item = {}
        item["vmstart"] = vma.vm_start
        item["size"] = vma.vm_end - vma.vm_start
        item["data"] = super().read_binary(item["vmstart"], item["size"])
        self.vmas.append(item)

它将logd进程中所有可读写(rw)的VMA的完整内容读入内存,然后后续的read_bytesread_binary都从这些内存块中查找,避免反复读取ramdump。

定位list头

def find_log_chunklist_addr(self, vma):
    vma_size = vma["size"]
    vma_data = vma["data"]
    offset = 0
    while offset < vma_size:
        if self.is_log_chunklist_addr(vma_data, offset):
            log_id = 1
            while log_id <= self.LOG_ID_MAX:
                if not self.is_log_chunklist_addr(vma_data, offset+0x18*log_id):
                    return 0
                log_id = log_id + 1
            break
        offset = offset + 4
    return offset if offset < vma_size else 0

验证chunk有效性

def is_log_chunk_addr(self, addr):
    data_addr = self.read_bytes(addr, self.addr_length)
    write_offset = self.read_bytes(addr + 0x10, 4)
    write_active = self.read_bytes(addr + 0x18, 1)
    data_size = self.read_bytes(addr + self.addr_length, self.addr_length)
    compress_data_addr = self.read_bytes(addr + 0x28, self.addr_length)
    compress_data_size = self.read_bytes(addr + 0x28 + self.addr_length, self.addr_length)
    if (write_active == 1 and data_addr != 0 and data_size !=0 and
        write_offset !=0 and write_offset < data_size):
        return True
    elif (write_active == 0 and compress_data_addr != 0 and compress_data_size !=0 and
          write_offset !=0 and compress_data_size < write_offset):
        return True
    return False

通过对chunk内关键字段的检查,判断是否是一个合法的日志块。一旦找到正确的log buffer基址,后续的解析流程与Logcat_v3完全一样。

最后的防线:generate_logcat_bin

如果上述两种方法都失败,说明我们无法从内存中结构化解析logcat日志。这时,工具会退而求其次,直接将logd进程的整个可读内存区域dump下来,保存为一个名为logcat.bin的二进制文件,供开发者日后使用更底层的手段分析。

def generate_logcat_bin(self, taskinfo):
    filename = "{}-{}.bin".format(self.LOGCAT_BIN, datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
    out_file = self.ramdump.open_file(filename)
    # 遍历logd进程的所有vma,将可读的内存区域写入文件
    for vma in taskinfo.vmalist:
        if vma.vm_start < vma.vm_end and vma.flags & 0b100:  # readable
            data = self.ramdump.read_cstring(vma.vm_start, vma.vm_end - vma.vm_start)
            if data:
                out_file.write(data)
    print_out_str("Generated raw logcat binary: {}".format(filename))

这个文件虽然不能直接阅读,但可以用十六进制编辑器或strings命令查看,也许能从中提取出有用的信息。

总结

通过logcat_v3logcat_vma两套解析方案,工具能够应对大多数Android设备的内存布局变化,成功从ramdump中还原出logcat日志。整个过程涉及进程内存读取、数据结构逆向、多线程解析、日志去重等技术点,充分体现了ramdump parser工具的灵活性和强大功能。

本文重点剖析了logcat_v3的实现细节,从定位log buffer,到解析日志块,再到合并内核日志,每一步都力求清晰。希望读者能从中了解到Android logcat在内存中的存储方式,以及离线解析工具背后的工作原理。