syzkaller: syz-manager源码分析
196082 慢慢好起来

前言

这篇文章的前言主要提一下前段时间在强网拟态中的pwn,首先是内核pwn。

其实很简单就是一个简单版的off by null。但是我犯蠢了,在堆喷pipe_buffer时计算错误导致堆喷失败以至于在开始放弃了使用此方法,转而使用msg_msg形成双向链表造成UAF,可惜的是此方法因为off by null的限制使得其申请的堆块应该是从kmalloc-192或以下申请,所以很多可以堆喷来写入的结构体无法使用,如果继续使用msg_msgseg结构体来实现任意写也会因为其存在一个next指针导致无法free。最后这道题依旧是通过构造多级管道解出。

这里重点提一下那一道堆题(做完内核凌晨四点了,这道题没做)。简单描述一下漏洞,首先其edit函数中存在off by null,然后还可以在create函数中一直申请堆块,不过这里限制大小为 0~0x78 ,所有的堆块范围都在fastbin内。然后估摸着这题的利用方法应该和top chunk有关,但是仔细看了一下house of force发现条件并不满足,在比赛快结束时看了一下源码发现了以往不知道的机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use_top:

victim = av->top;
size = chunksize (victim);

if (__glibc_unlikely (size > av->system_mem))
malloc_printerr ("malloc(): corrupted top size");

if ((unsigned long) (size) >= (unsigned long) (nb + MINSIZE))
{
remainder_size = size - nb;
remainder = chunk_at_offset (victim, nb);
av->top = remainder;
set_head (victim, nb | PREV_INUSE |
(av != &main_arena ? NON_MAIN_ARENA : 0));
set_head (remainder, remainder_size | PREV_INUSE);

check_malloced_chunk (av, victim, nb);
void *p = chunk2mem (victim);
alloc_perturb (p, bytes);
return p;
}

/* When we are using atomic ops to free fast chunks we can get
here for all block sizes. */
else if (atomic_load_relaxed (&av->have_fastchunks))
{
malloc_consolidate (av);
/* restore original bin index */
if (in_smallbin_range (nb))
idx = smallbin_index (nb);
else
idx = largebin_index (nb);
}

/*
Otherwise, relay to handle system-dependent cases
*/
else
{
void *p = sysmalloc (nb, av);
if (p != NULL)
alloc_perturb (p, bytes);
return p;
}
}

上述代码是_int_malloc函数中的片段,这个片段是使用top chunk进行分配时的片段,首先是申请的大小加上16要小于top chunk的大小时进入到第一个分支中,这个分支所做的事情的对top chunk进行切割。

然后直接看最末的这一个分支,这个分支是前两个都不满足时会进入也就是无法从现有的av中得到空间了便开始使用sysmalloc进行分配。

中间的分支首先会检查fastbin中是否存在堆块随后进入到malloc_consolidate函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
static void malloc_consolidate(mstate av)
{
mfastbinptr* fb; /* current fastbin being consolidated */
mfastbinptr* maxfb; /* last fastbin (for loop control) */
mchunkptr p; /* current chunk being consolidated */
mchunkptr nextp; /* next chunk to consolidate */
mchunkptr unsorted_bin; /* bin header */
mchunkptr first_unsorted; /* chunk to link to */

/* These have same use as in free() */
mchunkptr nextchunk;
INTERNAL_SIZE_T size;
INTERNAL_SIZE_T nextsize;
INTERNAL_SIZE_T prevsize;
int nextinuse;

atomic_store_relaxed (&av->have_fastchunks, false);

unsorted_bin = unsorted_chunks(av);

/*
Remove each chunk from fast bin and consolidate it, placing it
then in unsorted bin. Among other reasons for doing this,
placing in unsorted bin avoids needing to calculate actual bins
until malloc is sure that chunks aren't immediately going to be
reused anyway.
*/

maxfb = &fastbin (av, NFASTBINS - 1);
fb = &fastbin (av, 0);
do {
p = atomic_exchange_acq (fb, NULL);
if (p != 0) {
do {
{
if (__glibc_unlikely (misaligned_chunk (p)))
malloc_printerr ("malloc_consolidate(): "
"unaligned fastbin chunk detected");

unsigned int idx = fastbin_index (chunksize (p));
if ((&fastbin (av, idx)) != fb)
malloc_printerr ("malloc_consolidate(): invalid chunk size");
}

check_inuse_chunk(av, p);
nextp = REVEAL_PTR (p->fd);

/* Slightly streamlined version of consolidation code in free() */
size = chunksize (p);
nextchunk = chunk_at_offset(p, size);
nextsize = chunksize(nextchunk);

if (!prev_inuse(p)) {
prevsize = prev_size (p);
size += prevsize;
p = chunk_at_offset(p, -((long) prevsize));
if (__glibc_unlikely (chunksize(p) != prevsize))
malloc_printerr ("corrupted size vs. prev_size in fastbins");
unlink_chunk (av, p);
}

if (nextchunk != av->top) {
nextinuse = inuse_bit_at_offset(nextchunk, nextsize);

if (!nextinuse) {
size += nextsize;
unlink_chunk (av, nextchunk);
} else
clear_inuse_bit_at_offset(nextchunk, 0);

first_unsorted = unsorted_bin->fd;
unsorted_bin->fd = p;
first_unsorted->bk = p;

if (!in_smallbin_range (size)) {
p->fd_nextsize = NULL;
p->bk_nextsize = NULL;
}

set_head(p, size | PREV_INUSE);
p->bk = unsorted_bin;
p->fd = first_unsorted;
set_foot(p, size);
}

else {
size += nextsize;
set_head(p, size | PREV_INUSE);
av->top = p;
}

} while ( (p = nextp) != 0);

}
} while (fb++ != maxfb);
}

这个函数的所做的事情就显而易见了,就是从fastbin中取出堆块进行合并放入到unsorted bin中。后续利用就不详细提了(因为我也没看了)但是估计就是通过off by nullunlink实现任意地址写。

下面回归正题开始syzkaller的分析,首先其工作原理在前一篇文章中提过,这里就不再重提了。前一篇文章提到syzkaller分为了三大组件,其实通过图就能看出来syz-fuzzersyz-executor都是位于虚拟机中的,而syz-manager位于Host主机中。

这里直接分析函数,在分析函数的过程中将有用的结构体再进一步分析。

RunManager函数

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
if prog.GitRevision == "" {
log.Fatalf("bad syz-manager build: build with make, run bin/syz-manager")
}
flag.Parse()
log.EnableLogCaching(1000, 1<<20)
cfg, err := mgrconfig.LoadFile(*flagConfig)
if err != nil {
log.Fatalf("%v", err)
}
RunManager(cfg)
}

首先看syz-managermain函数,其会读取传入的配置文件然后直接调用RunManager函数。

1
2
3
4
5
6
7
8
9
10
11
func RunManager(cfg *mgrconfig.Config) {
var vmPool *vm.Pool
if cfg.Type != "none" {
var err error
vmPool, err = vm.Create(cfg, *flagDebug)
if err != nil {
log.Fatalf("%v", err)
}
}
// ... ...
}

这里首先是初始化VM pool使用vm.Create函数进行创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func Create(cfg *mgrconfig.Config, debug bool) (*Pool, error) {
typ, ok := vmimpl.Types[vmType(cfg.Type)]
if !ok {
return nil, fmt.Errorf("unknown instance type '%v'", cfg.Type)
}
env := &vmimpl.Env{
Name: cfg.Name,
OS: cfg.TargetOS,
Arch: cfg.TargetVMArch,
Workdir: cfg.Workdir,
Image: cfg.Image,
SSHKey: cfg.SSHKey,
SSHUser: cfg.SSHUser,
Timeouts: cfg.Timeouts,
Debug: debug,
Config: cfg.VM,
KernelSrc: cfg.KernelSrc,
}
impl, err := typ.Ctor(env)
if err != nil {
return nil, err
}
return &Pool{
impl: impl,
workdir: env.Workdir,
template: cfg.WorkdirTemplate,
timeouts: cfg.Timeouts,
}, nil
}

这里主要做的事情是,首先获取VM类型,随后封装env结构体,最后调用VM Pool构造函数并返回。

1
2
3
4
5
6
7
type Pool struct {
impl vmimpl.Pool
workdir string
template string
timeouts targets.Timeouts
activeCount int32
}

这里简单提一下Pool结构体,类似于线程池的概念,在syz-manager使用一个VM池也就是Pool结构体来管理Guest VM

1
2
3
4
5
6
7
8
// Pool represents a set of test machines (VMs, physical devices, etc) of particular type.
type Pool interface {
// Count returns total number of VMs in the pool.
Count() int

// Create creates and boots a new VM instance.
Create(workdir string, index int) (Instance, error)
}

其接口实现又两个函数,第一个是返回当前池子里所有的VM数量,第二个是创建并启动一个新的实例,并且返回这个实例。

1
2
3
4
5
6
7
crashdir := filepath.Join(cfg.Workdir, "crashes")
osutil.MkdirAll(crashdir)

reporter, err := report.NewReporter(cfg)
if err != nil {
log.Fatalf("%v", err)
}

回到RunManager函数中,这里主要做的事情是创建一个crashes目录,然后生成一个reporter实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type Report struct {
// Title contains a representative description of the first oops.
Title string
// Alternative titles, used for better deduplication.
// If two crashes have a non-empty intersection of Title/AltTitles, they are considered the same bug.
AltTitles []string
// Bug type (e.g. hang, memory leak, etc).
Type Type
// The indicative function name.
Frame string
// Report contains whole oops text.
Report []byte
// Output contains whole raw console output as passed to Reporter.Parse.
Output []byte
// StartPos/EndPos denote region of output with oops message(s).
StartPos int
EndPos int
// SkipPos is position in output where parsing for the next report should start.
SkipPos int
// Suppressed indicates whether the report should not be reported to user.
Suppressed bool
// Corrupted indicates whether the report is truncated of corrupted in some other way.
Corrupted bool
// CorruptedReason contains reason why the report is marked as corrupted.
CorruptedReason string
// Recipients is a list of RecipientInfo with Email, Display Name, and type.
Recipients vcs.Recipients
// GuiltyFile is the source file that we think is to blame for the crash (filled in by Symbolize).
GuiltyFile string
// reportPrefixLen is length of additional prefix lines that we added before actual crash report.
reportPrefixLen int
// symbolized is set if the report is symbolized.
symbolized bool
}

Report结构体用来表示单次执行的结果,包括是否产生了crashOops的信息等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
mgr := &Manager{
cfg: cfg,
vmPool: vmPool,
target: cfg.Target,
sysTarget: cfg.SysTarget,
reporter: reporter,
crashdir: crashdir,
startTime: time.Now(),
stats: &Stats{haveHub: cfg.HubClient != ""},
crashTypes: make(map[string]bool),
corpus: make(map[string]CorpusItem),
disabledHashes: make(map[string]struct{}),
memoryLeakFrames: make(map[string]bool),
dataRaceFrames: make(map[string]bool),
fresh: true,
vmStop: make(chan bool),
hubReproQueue: make(chan *Crash, 10),
needMoreRepros: make(chan chan bool),
reproRequest: make(chan chan map[string]bool),
usedFiles: make(map[string]time.Time),
saturatedCalls: make(map[string]bool),
}

mgr.preloadCorpus()
mgr.initStats() // Initializes prometheus variables.
mgr.initHTTP() // Creates HTTP server.
mgr.collectUsedFiles()

// Create RPC server for fuzzers.
mgr.serv, err = startRPCServer(mgr)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}

if cfg.DashboardAddr != "" {
mgr.dash, err = dashapi.New(cfg.DashboardClient, cfg.DashboardAddr, cfg.DashboardKey)
if err != nil {
log.Fatalf("failed to create dashapi connection: %v", err)
}
}

if !cfg.AssetStorage.IsEmpty() {
mgr.assetStorage, err = asset.StorageFromConfig(cfg.AssetStorage, mgr.dash)
if err != nil {
log.Fatalf("failed to init asset storage: %v", err)
}
}

跟着函数流程往下走,首先这里会创建一个Manager实例,然后下面四个函数的作用分别是:

mgr.preloadCorpus(): 检查 corpus.db 文件是否存在(若不存在则创建)并载入 sys/linux/test 目录下的测试用模板

1
2
3
4
5
6
7
8
9
10
11
12
13
# Create an io_uring instance
r0 = syz_io_uring_setup(0xF00, &AUTO={0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, "000000000000000000000000", [0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0], [0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0]}, &AUTO=<r1=>0x0, &AUTO=<r2=>0x0)
# Set IORING_CQ_EVENTFD_DISABLED. Has no side-effect for the test,
# only tests syz_memcpy_off().
syz_memcpy_off$IO_URING_METADATA_FLAGS(r1, 0x114, &AUTO=0x1, 0x0, AUTO)
# Write an openat2 operation to the submission queue
syz_io_uring_submit(r1, r2, &AUTO=@IORING_OP_OPENAT2={AUTO, 0x0, AUTO, 0xffffffffffffff9c, &AUTO={0x42, 0x0, 0x0}, &AUTO='./file1\x00', AUTO, AUTO, 0x12345, {AUTO, 0x0, "0000000000000000000000000000000000000000"}})
# Notify the kernel about the submission and wait until completion
io_uring_enter(r0, 0x1, 0x1, 0x1, 0x0, 0x0)
# Get the resulting fd from the completion queue
r3 = syz_io_uring_complete(r1)
# Close the file
close(r3)

如上是sys/linux/test/io_uring模板

mgr.initStates(): 注册一个 prometheus 监视器(一个开源的监视&预警工具包)

mgr.initHTTP(): 创建一个 HTTP 服务器并注册一系列的目录

mgr.collectUsedFiles(): 检查所需文件是否存在

随后通过startRPCServer函数创建一个RPC服务器,该服务器用于Host与Guest VMs进行通信。最后初始化dashboard相关内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
type Manager struct {
cfg *mgrconfig.Config
vmPool *vm.Pool
target *prog.Target
sysTarget *targets.Target
reporter *report.Reporter
crashdir string
serv *RPCServer
corpusDB *db.DB
startTime time.Time
firstConnect time.Time
fuzzingTime time.Duration
stats *Stats
crashTypes map[string]bool
vmStop chan bool
checkResult *rpctype.CheckArgs
fresh bool
numFuzzing uint32
numReproducing uint32

dash *dashapi.Dashboard

mu sync.Mutex
phase int
targetEnabledSyscalls map[*prog.Syscall]bool

candidates []rpctype.Candidate // untriaged inputs from corpus and hub
disabledHashes map[string]struct{}
corpus map[string]CorpusItem
seeds [][]byte
newRepros [][]byte
lastMinCorpus int
memoryLeakFrames map[string]bool
dataRaceFrames map[string]bool
saturatedCalls map[string]bool

needMoreRepros chan chan bool
hubReproQueue chan *Crash
reproRequest chan chan map[string]bool

// For checking that files that we are using are not changing under us.
// Maps file name to modification time.
usedFiles map[string]time.Time

modules []host.KernelModule
coverFilter map[uint32]uint32
coverFilterBitmap []byte
modulesInitialized bool

assetStorage *asset.Storage
}

上述就是Manager结构体,这里只说几个重要的成员:

  • cfg: 基本设置信息,存放在一个json文件中
  • vmPool: 所用的vm Pool
  • reporter: 用以报告crash
  • serv: RPC server用于与Guest VM通信
  • corpusDB: 用于存放语料的数据库
  • targetEnabledSyscalls: 测试用例所允许使用的系统调用
  • candidates: 待执行测试用例
  • corpus: 语料库
  • seeds: 用来对语料库变异的种子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
go func() {
for lastTime := time.Now(); ; {
time.Sleep(10 * time.Second)
now := time.Now()
diff := now.Sub(lastTime)
lastTime = now
mgr.mu.Lock()
if mgr.firstConnect.IsZero() {
mgr.mu.Unlock()
continue
}
mgr.fuzzingTime += diff * time.Duration(atomic.LoadUint32(&mgr.numFuzzing))
executed := mgr.stats.execTotal.get()
crashes := mgr.stats.crashes.get()
corpusCover := mgr.stats.corpusCover.get()
corpusSignal := mgr.stats.corpusSignal.get()
maxSignal := mgr.stats.maxSignal.get()
triageQLen := len(mgr.candidates)
mgr.mu.Unlock()
numReproducing := atomic.LoadUint32(&mgr.numReproducing)
numFuzzing := atomic.LoadUint32(&mgr.numFuzzing)

log.Logf(0, "VMs %v, executed %v, cover %v, signal %v/%v, crashes %v, repro %v, triageQLen %v",
numFuzzing, executed, corpusCover, corpusSignal, maxSignal, crashes, numReproducing, triageQLen)
}
}()

这里会新起一个写成进行数据记录的工作,函数内部就是一个for循环并且没有停止的,其作用就是每隔十秒进行一次进度采集并输出日志,主要是采集执行信息、语料覆盖率、crashes 信息等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if *flagBench != "" {
mgr.initBench()
}

if mgr.dash != nil {
go mgr.dashboardReporter()
}

osutil.HandleInterrupts(vm.Shutdown)
if mgr.vmPool == nil {
log.Logf(0, "no VMs started (type=none)")
log.Logf(0, "you are supposed to start syz-fuzzer manually as:")
log.Logf(0, "syz-fuzzer -manager=manager.ip:%v [other flags as necessary]", mgr.serv.port)
<-vm.Shutdown
return
}
mgr.vmLoop()

首先,这里会先判断flagBench是否不为空字符串,如果不为空字符串则调用initBench函数。

1
2
3
4
5
var (
flagConfig = flag.String("config", "", "configuration file")
flagDebug = flag.Bool("debug", false, "dump all VM output to console")
flagBench = flag.String("bench", "", "write execution statistics into this file periodically")
)

这里的flagBench是一个全局变量,使用的是golang的flag包解析命令行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (mgr *Manager) initBench() {
f, err := os.OpenFile(*flagBench, os.O_WRONLY|os.O_CREATE|os.O_EXCL, osutil.DefaultFilePerm)
if err != nil {
log.Fatalf("failed to open bench file: %v", err)
}
go func() {
for {
time.Sleep(time.Minute)
vals := mgr.stats.all()
mgr.mu.Lock()
if mgr.firstConnect.IsZero() {
mgr.mu.Unlock()
continue
}
mgr.minimizeCorpus()
vals["corpus"] = uint64(len(mgr.corpus))
vals["uptime"] = uint64(time.Since(mgr.firstConnect)) / 1e9
vals["fuzzing"] = uint64(mgr.fuzzingTime) / 1e9
vals["candidates"] = uint64(len(mgr.candidates))
mgr.mu.Unlock()

data, err := json.MarshalIndent(vals, "", " ")
if err != nil {
log.Fatalf("failed to serialize bench data")
}
if _, err := f.Write(append(data, '\n')); err != nil {
log.Fatalf("failed to write bench data")
}
}
}()
}

initBench会启动一个协程,这个协程会每隔一分钟循环一次,其主要功能是调用minimizeCorpus将语料库进行最小化,想bench参数指定的文件写入语料库长度、启动时间、fuzzing 时间。

回到RunManager函数中,接下来又会调用dashboardReporter启动一个新的协程,这里的作用就是每隔一分钟上报一次syz-manager的状态。

最后检查一下VM Pool并调用vmloop函数。

vmloop函数

1
2
3
4
5
6
7
8
9
10
11
12
13
// Manager needs to be refactored (#605).
// nolint: gocyclo, gocognit, funlen
func (mgr *Manager) vmLoop() {
log.Logf(0, "booting test machines...")
log.Logf(0, "wait for the connection from test machine...")
instancesPerRepro := 3
vmCount := mgr.vmPool.Count()
maxReproVMs := vmCount - mgr.cfg.FuzzingVMs
if instancesPerRepro > maxReproVMs && maxReproVMs > 0 {
instancesPerRepro = maxReproVMs
}
// ... ...
}

函数首先对VM进行分组,一共分为两组,一组负责fuzzing,一组负责复现crash( maxReproVMs )。

1
2
3
4
5
6
7
8
instances := SequentialResourcePool(vmCount, 10*time.Second*mgr.cfg.Timeouts.Scale)
runDone := make(chan *RunResult, 1)
pendingRepro := make(map[*Crash]bool)
reproducing := make(map[string]bool)
var reproQueue []*Crash
reproDone := make(chan *ReproResult, 1)
stopPending := false
shutdown := vm.Shutdown

接着会调用SequentialResourcePool函数新建一个ResourcePool队列,主要负责对空闲的VM的使用顺序进行调控。

随后会初始化一系列的变量:

  • runDone:保存 fuzzing 结果为 crash 的 Crash 队列
  • pendingRepro:标识待复现的 Crash
  • reproducing:标识某个类型 Crash 是否准备被复现
  • reproQueue:Crash 的复现队列
  • reproDone:Crash 的复现结果
  • stopPending:等待停止标志位
  • shutdown:工作终止标志位

最后进入到一个大循环中,而这个大循环才是真正意义上的fuzzing调控流程。

1
2
3
4
5
6
7
for shutdown != nil || instances.Len() != vmCount {
mgr.mu.Lock()
phase := mgr.phase
mgr.mu.Unlock()

// ... ...
}

大循环终止循环的条件是shutdown != nil或者ResourcePool中的VM数量与总数量不相等,进入循环后首先做的事就是获取当前所进行的阶段。

1
2
3
4
5
6
7
8
9
10
11
12
for crash := range pendingRepro {
if reproducing[crash.Title] {
continue
}
delete(pendingRepro, crash)
if !mgr.needRepro(crash) {
continue
}
log.Logf(1, "loop: add to repro queue '%v'", crash.Title)
reproducing[crash.Title] = true
reproQueue = append(reproQueue, crash)
}

紧接着进入到内层小循环,这里会遍历reproducing中的crash,如果没有被复现过则从pendingRepro中删除,随后调用mgr.needRepro来看crash是否需要被复现,后面标记该标题的crash为已复现,最后加入到复现的队列中。

这里的crash.Title为Oops的第一行文本,从上面的逻辑可以看出来,一次只能复现同类crash中的一个。

1
2
3
4
5
6
7
8
log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v",
phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(),
len(pendingRepro), len(reproducing), len(reproQueue))

canRepro := func() bool {
return phase >= phaseTriagedHub && len(reproQueue) != 0 &&
(int(atomic.LoadUint32(&mgr.numReproducing))+1)*instancesPerRepro <= maxReproVMs
}

接下来先输出一段日志,随后定义了一个闭包函数canRepro,该函数的功能就是判断是否能够进行crash复现返回的是bool类型。

首先会判断当前阶段是否已经超过了phaseTriagedHub阶段,随后判断reproQueue也就是复现队列是否为空,最后判断加上该crash后用于复现的VM是否小于等于maxReproVMs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if shutdown != nil {
for canRepro() {
vmIndexes := instances.Take(instancesPerRepro)
if vmIndexes == nil {
break
}
last := len(reproQueue) - 1
crash := reproQueue[last]
reproQueue[last] = nil
reproQueue = reproQueue[:last]
atomic.AddUint32(&mgr.numReproducing, 1)
log.Logf(0, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes)
go func() {
reproDone <- mgr.runRepro(crash, vmIndexes, instances.Put)
}()
}
for !canRepro() {
idx := instances.TakeOne()
if idx == nil {
break
}
log.Logf(1, "loop: starting instance %v", *idx)
go func() {
crash, err := mgr.runInstance(*idx)
runDone <- &RunResult{*idx, crash, err}
}()
}
}

这里首先会判断shutdown,然后进入道两个小循环中。首先看第一个小循环,洗衣歌循环的进入条件就是能够进行crash复现。

这里首先从资源池中取出一个vmIndexes,如果返回的是nil则直接退出循环。随后从reproQueue中取出一个crash,随后更新mgr.numReproducing计数。随后新开启一个写成调用mgr.runReprocrash进行复现,并将返回值输入到reproDone队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(...int)) *ReproResult {
features := mgr.checkResult.Features
res, stats, err := repro.Run(crash.Output, mgr.cfg, features, mgr.reporter, mgr.vmPool, vmIndexes)
ret := &ReproResult{
instances: vmIndexes,
report0: crash.Report,
repro: res,
stats: stats,
err: err,
hub: crash.hub,
}
if err == nil && res != nil && mgr.cfg.StraceBin != "" {
// We need only one instance to get strace output, release the rest.
putInstances(vmIndexes[1:]...)
defer putInstances(vmIndexes[0])

const straceAttempts = 2
for i := 1; i <= straceAttempts; i++ {
strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.vmPool, vmIndexes[0])
sameBug := strace.IsSameBug(res)
log.Logf(0, "strace run attempt %d/%d for '%s': same bug %v, error %v",
i, straceAttempts, res.Report.Title, sameBug, strace.Error)
// We only want to save strace output if it resulted in the same bug.
// Otherwise, it will be hard to reproduce on syzbot and will confuse users.
if sameBug {
ret.strace = strace
break
}
}
} else {
putInstances(vmIndexes...)
}
return ret
}

这里主要做的事情其实就是直接调用了repro.Run函数,并且在后面进行了一下检查之后将vm重新放回资源池中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Run(crashLog []byte, cfg *mgrconfig.Config, features *host.Features, reporter *report.Reporter,
vmPool *vm.Pool, vmIndexes []int) (*Result, *Stats, error) {
ctx, err := prepareCtx(crashLog, cfg, features, reporter, len(vmIndexes))
if err != nil {
return nil, nil, err
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx.createInstances(cfg, vmPool)
}()
// Prepare VMs in advance.
for _, idx := range vmIndexes {
ctx.bootRequests <- idx
}
// Wait until all VMs are really released.
defer wg.Wait()
return ctx.run()
}

可以看到这里首先调用了prepareCtx函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func prepareCtx(crashLog []byte, cfg *mgrconfig.Config, features *host.Features, reporter *report.Reporter,
VMs int) (*context, error) {
if VMs == 0 {
return nil, fmt.Errorf("no VMs provided")
}
entries := cfg.Target.ParseLog(crashLog)
if len(entries) == 0 {
return nil, ErrNoPrograms
}
crashStart := len(crashLog)
crashTitle, crashType := "", crash.UnknownType
if rep := reporter.Parse(crashLog); rep != nil {
crashStart = rep.StartPos
crashTitle = rep.Title
crashType = rep.Type
}
testTimeouts := []time.Duration{
3 * cfg.Timeouts.Program, // to catch simpler crashes (i.e. no races and no hangs)
20 * cfg.Timeouts.Program,
cfg.Timeouts.NoOutputRunningTime, // to catch "no output", races and hangs
}
switch {
case crashTitle == "":
crashTitle = "no output/lost connection"
// Lost connection can be detected faster,
// but theoretically if it's caused by a race it may need the largest timeout.
// No output can only be reproduced with the max timeout.
// As a compromise we use the smallest and the largest timeouts.
testTimeouts = []time.Duration{testTimeouts[0], testTimeouts[2]}
case crashType == crash.MemoryLeak:
// Memory leaks can't be detected quickly because of expensive setup and scanning.
testTimeouts = testTimeouts[1:]
case crashType == crash.Hang:
testTimeouts = testTimeouts[2:]
}
ctx := &context{
target: cfg.SysTarget,
reporter: reporter,
crashTitle: crashTitle,
crashType: crashType,
crashStart: crashStart,
entries: entries,
instances: make(chan *reproInstance, VMs),
bootRequests: make(chan int, VMs),
testTimeouts: testTimeouts,
startOpts: createStartOptions(cfg, features, crashType),
stats: new(Stats),
timeouts: cfg.Timeouts,
}
ctx.reproLogf(0, "%v programs, %v VMs, timeouts %v", len(entries), VMs, testTimeouts)
return ctx, nil
}

函数内部主要做的事就是一些检测然后初始化ctx,函数结束之后定义可一个sync.WaitGroup类型的变量。随后创建一个新的协程调用ctx.createInstances函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (ctx *context) createInstances(cfg *mgrconfig.Config, vmPool *vm.Pool) {
var wg sync.WaitGroup
for vmIndex := range ctx.bootRequests {
wg.Add(1)
vmIndex := vmIndex
go func() {
defer wg.Done()

var inst *instance.ExecProgInstance
maxTry := 3
for try := 0; try < maxTry; try++ {
select {
case <-vm.Shutdown:
try = maxTry
continue
default:
}
var err error
inst, err = instance.CreateExecProgInstance(vmPool, vmIndex, cfg,
ctx.reporter, &instance.OptionalConfig{Logf: ctx.reproLogf})
if err != nil {
ctx.reproLogf(0, "failed to init instance: %v, attempt %d/%d",
err, try+1, maxTry)
time.Sleep(10 * time.Second)
continue
}
break
}
if inst != nil {
ctx.instances <- &reproInstance{execProg: inst, index: vmIndex}
}
}()
}
wg.Wait()
// Clean up.
close(ctx.instances)
for inst := range ctx.instances {
inst.execProg.Close()
}
}

函数中,循环获取vmIndex,并且开启新的协程,在新的协程中调用instance.CreateExecProgInstance函数创建VM并拷贝crash程序,如果失败则休眠十秒如果成功则将结果输出到ctx.instances中,这里的最多尝试次数为maxTry

1
2
3
4
5
6
7
8
9
10
11
12
13
func CreateExecProgInstance(vmPool *vm.Pool, vmIndex int, mgrCfg *mgrconfig.Config,
reporter *report.Reporter, opt *OptionalConfig) (*ExecProgInstance, error) {
vmInst, err := vmPool.Create(vmIndex)
if err != nil {
return nil, fmt.Errorf("failed to create VM: %w", err)
}
ret, err := SetupExecProg(vmInst, mgrCfg, reporter, opt)
if err != nil {
vmInst.Close()
return nil, err
}
return ret, nil
}

这里CreateExecProgInstance函数的实现就是通过vmPool.Create创建启动虚拟机之后调用SetupExecProg函数拷贝要执行的二进制文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (ctx *context) run() (*Result, *Stats, error) {
// Indicate that we no longer need VMs.
defer close(ctx.bootRequests)

res, err := ctx.repro()
if err != nil {
return nil, nil, err
}
if res != nil {
ctx.reproLogf(3, "repro crashed as (corrupted=%v):\n%s",
ctx.report.Corrupted, ctx.report.Report)
// Try to rerun the repro if the report is corrupted.
for attempts := 0; ctx.report.Corrupted && attempts < 3; attempts++ {
ctx.reproLogf(3, "report is corrupted, running repro again")
if res.CRepro {
_, err = ctx.testCProg(res.Prog, res.Duration, res.Opts)
} else {
_, err = ctx.testProg(res.Prog, res.Duration, res.Opts)
}
if err != nil {
return nil, nil, err
}
}
ctx.reproLogf(3, "final repro crashed as (corrupted=%v):\n%s",
ctx.report.Corrupted, ctx.report.Report)
res.Report = ctx.report
}
return res, ctx.stats, nil
}

回到前面的RUN函数,最后会调用到ctx.run(),进入到上面的这个函数中,而在这个函数中则是调用ctx.repro()正式进行复现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (ctx *context) repro() (*Result, error) {
// Cut programs that were executed after crash.
for i, ent := range ctx.entries {
if ent.Start > ctx.crashStart {
ctx.entries = ctx.entries[:i]
break
}
}

reproStart := time.Now()
defer func() {
ctx.reproLogf(3, "reproducing took %s", time.Since(reproStart))
}()

res, err := ctx.extractProg(ctx.entries)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
defer func() {
if res != nil {
res.Opts.Repro = false
}
}()
res, err = ctx.minimizeProg(res)
if err != nil {
return nil, err
}

// Try extracting C repro without simplifying options first.
res, err = ctx.extractC(res)
if err != nil {
return nil, err
}

// Simplify options and try extracting C repro.
if !res.CRepro {
res, err = ctx.simplifyProg(res)
if err != nil {
return nil, err
}
}

// Simplify C related options.
if res.CRepro {
res, err = ctx.simplifyC(res)
if err != nil {
return nil, err
}
}

return res, nil
}

这里首先会去除掉发生crash之后执行的程序,随后调用ctx.extractProg获取触发crash的集合,接着调用ctx.minimizeProg尝试最小化程序集合,调用ctx.extractC函数尝试在不简化配置的情况下提取C repro,然后调用ctx.simplifyProg简化配置并尝试提取C repro,最后调用ctx.simplifyC简化C相关配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (ctx *context) extractProg(entries []*prog.LogEntry) (*Result, error) {
ctx.reproLogf(2, "extracting reproducer from %v programs", len(entries))
start := time.Now()
defer func() {
ctx.stats.ExtractProgTime = time.Since(start)
}()

// Extract last program on every proc.
procs := make(map[int]int)
for i, ent := range entries {
procs[ent.Proc] = i
}
var indices []int
for _, idx := range procs {
indices = append(indices, idx)
}
sort.Ints(indices)
var lastEntries []*prog.LogEntry
for i := len(indices) - 1; i >= 0; i-- {
lastEntries = append(lastEntries, entries[indices[i]])
}
for _, timeout := range ctx.testTimeouts {
// Execute each program separately to detect simple crashes caused by a single program.
// Programs are executed in reverse order, usually the last program is the guilty one.
res, err := ctx.extractProgSingle(lastEntries, timeout)
if err != nil {
return nil, err
}
if res != nil {
ctx.reproLogf(3, "found reproducer with %d syscalls", len(res.Prog.Calls))
return res, nil
}

// Don't try bisecting if there's only one entry.
if len(entries) == 1 {
continue
}

// Execute all programs and bisect the log to find multiple guilty programs.
res, err = ctx.extractProgBisect(entries, timeout)
if err != nil {
return nil, err
}
if res != nil {
ctx.reproLogf(3, "found reproducer with %d syscalls", len(res.Prog.Calls))
return res, nil
}
}

ctx.reproLogf(0, "failed to extract reproducer")
return nil, nil
}

函数开始位置首先将程序逆序,随后调用ctx.extractProgSingle逐个运行单个程序,如果遇到crash则立刻返回(一般来说都是最后一个程序引起的)。如果没能找到则会进入到下面的判断中,如果程序个数不为1则会进入到ctx.extractProgBisect函数进行二分查找找出crash程序集合。

1
2
3
4
5
6
7
8
9
10
11
for !canRepro() {
idx := instances.TakeOne()
if idx == nil {
break
}
log.Logf(1, "loop: starting instance %v", *idx)
go func() {
crash, err := mgr.runInstance(*idx)
runDone <- &RunResult{*idx, crash, err}
}()
}

在看前面的两个小循环中的第二个循环,第二个循环的条件为不能进行crash复现,所以这里进入协程将剩余的VM调度去fuzz并将结果输出到runDone中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (mgr *Manager) runInstance(index int) (*Crash, error) {
mgr.checkUsedFiles()
instanceName := fmt.Sprintf("vm-%d", index)

rep, vmInfo, err := mgr.runInstanceInner(index, instanceName)

machineInfo := mgr.serv.shutdownInstance(instanceName)
if len(vmInfo) != 0 {
machineInfo = append(append(vmInfo, '\n'), machineInfo...)
}

// Error that is not a VM crash.
if err != nil {
return nil, err
}
// No crash.
if rep == nil {
return nil, nil
}
crash := &Crash{
vmIndex: index,
hub: false,
Report: rep,
machineInfo: machineInfo,
}
return crash, nil
}

该函数实际调用的是mgr.runInstanceInner函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
func (mgr *Manager) runInstanceInner(index int, instanceName string) (*report.Report, []byte, error) {
inst, err := mgr.vmPool.Create(index)
if err != nil {
return nil, nil, fmt.Errorf("failed to create instance: %w", err)
}
defer inst.Close()

fwdAddr, err := inst.Forward(mgr.serv.port)
if err != nil {
return nil, nil, fmt.Errorf("failed to setup port forwarding: %w", err)
}

fuzzerBin, err := inst.Copy(mgr.cfg.FuzzerBin)
if err != nil {
return nil, nil, fmt.Errorf("failed to copy binary: %w", err)
}

// If ExecutorBin is provided, it means that syz-executor is already in the image,
// so no need to copy it.
executorBin := mgr.sysTarget.ExecutorBin
if executorBin == "" {
executorBin, err = inst.Copy(mgr.cfg.ExecutorBin)
if err != nil {
return nil, nil, fmt.Errorf("failed to copy binary: %w", err)
}
}

fuzzerV := 0
procs := mgr.cfg.Procs
if *flagDebug {
fuzzerV = 100
procs = 1
}

// Run the fuzzer binary.
start := time.Now()
atomic.AddUint32(&mgr.numFuzzing, 1)
defer atomic.AddUint32(&mgr.numFuzzing, ^uint32(0))

args := &instance.FuzzerCmdArgs{
Fuzzer: fuzzerBin,
Executor: executorBin,
Name: instanceName,
OS: mgr.cfg.TargetOS,
Arch: mgr.cfg.TargetArch,
FwdAddr: fwdAddr,
Sandbox: mgr.cfg.Sandbox,
Procs: procs,
Verbosity: fuzzerV,
Cover: mgr.cfg.Cover,
Debug: *flagDebug,
Test: false,
Runtest: false,
Optional: &instance.OptionalFuzzerArgs{
Slowdown: mgr.cfg.Timeouts.Slowdown,
RawCover: mgr.cfg.RawCover,
SandboxArg: mgr.cfg.SandboxArg,
},
}
cmd := instance.FuzzerCmd(args)
outc, errc, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.vmStop, cmd)
if err != nil {
return nil, nil, fmt.Errorf("failed to run fuzzer: %w", err)
}

var vmInfo []byte
rep := inst.MonitorExecution(outc, errc, mgr.reporter, vm.ExitTimeout)
if rep == nil {
// This is the only "OK" outcome.
log.Logf(0, "%s: running for %v, restarting", instanceName, time.Since(start))
} else {
vmInfo, err = inst.Info()
if err != nil {
vmInfo = []byte(fmt.Sprintf("error getting VM info: %v\n", err))
}
}

return rep, vmInfo, nil
}

这里首先调用mgr.vmPool.Create创建VM,随后调用inst.Forward进行TCP转发,然后通过inst.Copy拷贝syz-fuzzersyz-executor到VM文件系统下。随后调用instance.FuzzerCmd函数生成命令行后调用inst.Run函数启动syz-fuzzer,随后调用inst.MonitorExecution监视VM运行,这个函数主要是通过获取kernel oops来判断是否出现了crash

这里提一下vm实例,在syz-manaer中的vm实例其实是如下结构体表示的。

1
2
3
4
5
6
7
type Instance struct {
impl vmimpl.Instance
workdir string
timeouts targets.Timeouts
index int
onClose func()
}

其需要定义一些interface接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Instance interface {
// Copy copies a hostSrc file into VM and returns file name in VM.
Copy(hostSrc string) (string, error)

// Forward sets up forwarding from within VM to the given tcp
// port on the host and returns the address to use in VM.
Forward(port int) (string, error)

// Run runs cmd inside of the VM (think of ssh cmd).
// outc receives combined cmd and kernel console output.
// errc receives either command Wait return error or vmimpl.ErrTimeout.
// Command is terminated after timeout. Send on the stop chan can be used to terminate it earlier.
Run(timeout time.Duration, stop <-chan bool, command string) (outc <-chan []byte, errc <-chan error, err error)

// Diagnose retrieves additional debugging info from the VM
// (e.g. by sending some sys-rq's or SIGABORT'ing a Go program).
//
// Optionally returns (some or all) of the info directly. If wait == true,
// the caller must wait for the VM to output info directly to its log.
//
// rep describes the reason why Diagnose was called.
Diagnose(rep *report.Report) (diagnosis []byte, wait bool)

// Close stops and destroys the VM.
Close()
}

Copy():将一个来自宿主机的文件拷贝至虚拟机中,返回虚拟机中的文件名

Forward():设置从虚拟机内到主机上给定 tcp 端口的转发,并返回要在虚拟机中使用的地址

Run():在虚拟机内执行命令

Diagnose():在虚拟机上检索额外的调试信息

Close():停止并销毁虚拟机

需要注意的是,不同的guest VM所实现的interface是不同的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (mgr *Manager) vmLoop() {
// ... ...
for shutdown != nil || instances.Len() != vmCount {
// ... ...
var stopRequest chan bool
if !stopPending && canRepro() {
stopRequest = mgr.vmStop
}

wait:
select {
case <-instances.Freed:
// An instance has been released.
case stopRequest <- true:
log.Logf(1, "loop: issued stop request")
stopPending = true
case res := <-runDone:
log.Logf(1, "loop: instance %v finished, crash=%v", res.idx, res.crash != nil)
if res.err != nil && shutdown != nil {
log.Logf(0, "%v", res.err)
}
stopPending = false
instances.Put(res.idx)
// On shutdown qemu crashes with "qemu: terminating on signal 2",
// which we detect as "lost connection". Don't save that as crash.
if shutdown != nil && res.crash != nil {
needRepro := mgr.saveCrash(res.crash)
if needRepro {
log.Logf(1, "loop: add pending repro for '%v'", res.crash.Title)
pendingRepro[res.crash] = true
}
}
case res := <-reproDone:
atomic.AddUint32(&mgr.numReproducing, ^uint32(0))
crepro := false
title := ""
if res.repro != nil {
crepro = res.repro.CRepro
title = res.repro.Report.Title
}
log.Logf(0, "loop: repro on %+v finished '%v', repro=%v crepro=%v desc='%v'",
res.instances, res.report0.Title, res.repro != nil, crepro, title)
if res.err != nil {
reportReproError(res.err)
}
delete(reproducing, res.report0.Title)
if res.repro == nil {
if !res.hub {
mgr.saveFailedRepro(res.report0, res.stats)
}
} else {
mgr.saveRepro(res)
}
case <-shutdown:
log.Logf(1, "loop: shutting down...")
shutdown = nil
case crash := <-mgr.hubReproQueue:
log.Logf(1, "loop: get repro from hub")
pendingRepro[crash] = true
case reply := <-mgr.needMoreRepros:
reply <- phase >= phaseTriagedHub &&
len(reproQueue)+len(pendingRepro)+len(reproducing) == 0
goto wait
case reply := <-mgr.reproRequest:
repros := make(map[string]bool)
for title := range reproducing {
repros[title] = true
}
reply <- repros
goto wait
}
}
}

进入最后阶段,等待处理不同的channel数据,这里逐case分析。

首先第一个的触发条件为instances.Freed即当空间的VM被Put返回资源池时则会向该channel送入一个true,进入其代码块,但是并没做什么事情。

第二个的条件可以注意到前面的赋值操作stopRequest其实就是mgr.vmStop,而这个channel会在VM instance的RUN函数中使用。

第三个的条件为res := <-runDonerunDone中有数据时进入,其有数据代表的是有crash产生了,这里主要做的事就是将crash加入到pendingRepro中。

第四个的条件就是复现时出现结果,这里主要做的事情是,从reproducing删除对应的crash,随后保存复现结果。

第五个条件就是存在终止信号。

第六个则是hubReproQueue也可能传来crash,如果传来了crash则将其加入到pendingRepro中。

第七个根据mgr.needMoreRepros字面意思就是需要更多的repros,其内部代码就是判断当前阶段以及等待复现、复现队列、复现中的数量是否为0并将最后的结果返回到channel中并在此运行wait重新等待。

最后mgr.reproRequest意为主动请求复现,这里会拷贝reproducing的位图并返回到channel中最后跳转wait重新等待。

 评论
评论插件加载失败
正在加载评论插件
由 Hexo 驱动 & 主题 Keep
本站由 提供部署服务
总字数 335.6k 访客数 访问量