首页 > 其他 > 详细

golang goroutine源码阅读(上)

时间:2020-07-07 23:44:43      阅读:74      评论:0      收藏:0      [点我收藏+]

数据结构

调度相关的数据结构有三个,M(线程),P(调度器),G(goroutine)
M表示线程,P作为调度器用来帮助每个线程管理自己的goroutine,G就是golang的协程。我们可以通过runtime.GOMAXPROCS(n int)函数设置P的个数,注意P的个数并不代表M的个数,例如程序启动时runtime代码会出实话procs个P,但开始的时候只会启动一个M,就是M0和一个栈为64K(其他goroutine默认初始栈大小2K)来执行runtime代码。
那其他线程是什么时候创建的呐?
当goroutine被唤醒时,要在M上运行(恢复goroutine的上下文),P是帮助M管理goroutine的,恢复上下文的操作也由P来完成。如果被唤醒时发现还有空闲的P,并且没有其他M在窃取goroutine(M发现本地goroutine队列和全局goroutine队列都没有goroutine的时候,会去其他线程窃取goroutine),说明其他M都在忙,就会创建一个M让这个空闲的P帮他来管理goroutine。
总之一句话,开始的时候创建一个M,当发现调度不过来且还有空闲P没有工作就在创建新的,直到创建procs个M(procs通过runtime.GOMAXPROCS设置)

技术分享图片

G

golang 用结构体g表示goroutine

g

type g struct {
	stack       stack   // 当前栈的范围[stack.lo, stack.hi)
	stackguard0 uintptr // 用于抢占的,一般情况值为stack.lo + StackGuard
	stackguard1 uintptr // 用于C语言的抢占
	_panic         *_panic // 最内侧的panic函数
	_defer         *_defer // 最外侧的defer函数
	m              *m      // 当前goroutine属于哪个m
	sched          gobuf // 调度相关信息
	...
	schedlink      guintptr // sched是全局的goroutine链表,schedlink表示这个goroutine在链表中的下一个goroutine的指针
	...
	preempt        bool       // 抢占标志,如果需要抢占就将preempt设置为true
	...
}

gobuf

gobuf保存goroutine的调度信息,当一个goroutine被调度的时,本质上就是把这个goroutine放到cpu,恢复各个寄存器的值,然后运行

type gobuf struct {
	// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
	//
	// ctxt is unusual with respect to GC: it may be a
	// heap-allocated funcval, so GC needs to track it, but it
	// needs to be set and cleared from assembly, where it‘s
	// difficult to have write barriers. However, ctxt is really a
	// saved, live register, and we only ever exchange it between
	// the real register and the gobuf. Hence, we treat it as a
	// root during stack scanning, which means assembly that saves
	// and restores it doesn‘t need write barriers. It‘s still
	// typed as a pointer so that any other writes from Go get
	// write barriers.
	sp   uintptr // 栈指针
	pc   uintptr // 程序计数器
	g    guintptr // 当前被哪个goroutine持有
	ctxt unsafe.Pointer
	ret  sys.Uintreg // 系统调用返回值,防止系统调用后被其他goroutine抢占,所以有个地方保存返回值
	lr   uintptr
	bp   uintptr // 保存CPU的rip寄存器的值
}

M

golang中M表示实际操作系统的线程

m

type m struct {
	g0      *g     // g0帮M处理大小事务的goroutine,他是m中的第一个goroutine
	...
	gsignal       *g           // 用于信号处理的goroutine
	tls           [6]uintptr   // 线程私有空间
	mstartfn      func()
	curg          *g       // current running goroutine
	...
	p             puintptr // 当前正在运行的p(处理器)
	nextp         puintptr // 暂存的p
	oldp          puintptr // 执行系统调用之前的p
	...
	spinning      bool // 表示当前m没有goroutine了,正在从其他m偷取goroutine
	blocked       bool // m is blocked on a note
	...
	park          note // m没有goroutine的时候会在park上sleep,需要其他m在park中wake up这个m
	alllink       *m // on allm // 所有m的链表
	...
	thread        uintptr // thread handle
	...
}

P

golang中P表示一个调度器,为M提供上下文环境,使得M可以执行多个goroutine

p

type p struct {
	m           muintptr   // 与哪个M关联(可能为空的)
	...
	runqhead uint32 // p本地goroutine队列的头
	runqtail uint32 // p本地goroutine队列的尾
	runq     [256]guintptr // 队列指针,和sync.pool中数据结构一样也是循环队列
	...
	sudogcache []*sudog // sudog缓存,channel用的
	sudogbuf   [128]*sudog // 也是防止false sharing
	...
	pad cpu.CacheLinePad // 防止false sharing
}

schedt

schedt结构体用来保存P的状态信息和goroutine的全局运行队列

type schedt struct {
	...
	lock mutex // 全局锁

	// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
	// sure to call checkdead().
	
	// 维护空闲的M
	midle        muintptr // 等待中的M链表
	nmidle       int32    // 等待中的M的数量
	nmidlelocked int32    // number of locked m‘s waiting for work
	mnext        int64    // number of m‘s that have been created and next M ID
	maxmcount    int32    // 最多创建多少个M(10000)
	nmsys        int32    // number of system m‘s not counted for deadlock
	nmfreed      int64    // cumulative number of freed m‘s

	ngsys uint32 // number of system goroutines; updated atomically
	
	// 维护空闲的P
	pidle      puintptr // idle p‘s
	npidle     uint32
	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

	// goroutine的全局队列
	runq     gQueue
	runqsize int32
	...
	// 全局缓存已经退出的goroutine链表,下次再创建的时候直接用
	// Global cache of dead G‘s.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	}
	...
}

重要的全局变量

allgs    []*g   // 保存所有的g
allm     *m     // 所有的m构成的一个链表,包括下面的m0
allp     []*p  // 保存所有的p,len(allp) == gomaxprocs

ncpu         int32  // 系统中cpu核的数量,程序启动时由runtime代码初始化
gomaxprocs   int32  // p的最大值,默认等于ncpu,但可以通过GOMAXPROCS修改

sched     schedt    // 调度器结构体对象,记录了调度器的工作状态

m0 m        // 代表进程的主线程
g0  g       // m0的g0,也就是m0.g0 = &g0

分步骤剖析调度的初始化

下面是用go实现的hello world,代码里并没有关于调度的初始化,所以程序的入口并非是main.main,下面通过gdb一步步找到go是如何初始化调度的。

// test.go
package main

func main() {
	println("hello, world!")
}

编译

go build -gcflags "-N -l" test.go

使用OS X的同学注意,go1.11之后压缩的debug信息,OS X的同学需要同时做以下设置参考Debug Go Program With Gdb On Macos

export GOFLAGS="-ldflags=-compressdwarf=false"

调试

  • 利用断点可以找出目标文件的信息,在入口处打一个断点,找到程序入口在rt0_darwin_amd64.s的第8行
?  sudo gdb test 
(gdb) info files
Symbols from "/Users/journey/workspace/src/tool/gdb/test".
Local exec file:
	`/Users/journey/workspace/src/tool/gdb/test‘, file type mach-o-x86-64.
	Entry point: 0x104cd00
	0x0000000001001000 - 0x00000000010515b1 is .text
	0x00000000010515c0 - 0x000000000108162a is __TEXT.__rodata
	0x0000000001081640 - 0x0000000001081706 is __TEXT.__symbol_stub1
	0x0000000001081720 - 0x0000000001081e80 is __TEXT.__typelink
	0x0000000001081e80 - 0x0000000001081e88 is __TEXT.__itablink
	0x0000000001081e88 - 0x0000000001081e88 is __TEXT.__gosymtab
	0x0000000001081ea0 - 0x00000000010bfacd is __TEXT.__gopclntab
	0x00000000010c0000 - 0x00000000010c0020 is __DATA.__go_buildinfo
	0x00000000010c0020 - 0x00000000010c0128 is __DATA.__nl_symbol_ptr
	0x00000000010c0140 - 0x00000000010c0d08 is __DATA.__noptrdata
	0x00000000010c0d20 - 0x00000000010c27f0 is .data
	0x00000000010c2800 - 0x00000000010ddc90 is .bss
	0x00000000010ddca0 - 0x00000000010e01e8 is __DATA.__noptrbss
(gdb) b *0x104cd00
Breakpoint 1 at 0x104cd00: file /usr/local/go/src/runtime/rt0_darwin_amd64.s, line 8.
  • 进入上面找到的文件rt0_darwin_amd64.s(不同的架构文件是不同的)
?  runtime ls rt0_*
rt0_aix_ppc64.s       rt0_darwin_amd64.s    rt0_freebsd_arm.s     rt0_linux_arm64.s     rt0_nacl_386.s        rt0_netbsd_arm64.s    rt0_plan9_amd64.s
rt0_android_386.s     rt0_darwin_arm.s      rt0_illumos_amd64.s   rt0_linux_mips64x.s   rt0_nacl_amd64p32.s   rt0_openbsd_386.s     rt0_plan9_arm.s
rt0_android_amd64.s   rt0_darwin_arm64.s    rt0_js_wasm.s         rt0_linux_mipsx.s     rt0_nacl_arm.s        rt0_openbsd_amd64.s   rt0_solaris_amd64.s
rt0_android_arm.s     rt0_dragonfly_amd64.s rt0_linux_386.s       rt0_linux_ppc64.s     rt0_netbsd_386.s      rt0_openbsd_arm.s     rt0_windows_386.s
rt0_android_arm64.s   rt0_freebsd_386.s     rt0_linux_amd64.s     rt0_linux_ppc64le.s   rt0_netbsd_amd64.s    rt0_openbsd_arm64.s   rt0_windows_amd64.s
rt0_darwin_386.s      rt0_freebsd_amd64.s   rt0_linux_arm.s       rt0_linux_s390x.s     rt0_netbsd_arm.s      rt0_plan9_386.s       rt0_windows_arm.s
  • 打开文件go/src/runtime/rt0_darwin_amd64.s:8
    这里没有做什么就调了函数_rt0_amd64
TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8 // 参数+返回值共8字节
	JMP	_rt0_amd64(SB)
  • 然后在打断点看看_rt0_amd64在哪
    在ams_amd64.s第15行
(gdb) b _rt0_amd64
Breakpoint 2 at 0x1049350: file /usr/local/go/src/runtime/asm_amd64.s, line 15.

这里首先把参数放到DI,SI寄存器中,然后调用runtime.rt0_go,这就是进程初始化主要函数了
参数0放在DI通用寄存器
参数1放在SI通用寄存器
参数2放在DX通用寄存器
参数3放在CX通用寄存器

TEXT _rt0_amd64(SB),NOSPLIT,$-8 // 参数+返回值共8字节
	MOVQ	0(SP), DI	// argc
	LEAQ	8(SP), SI	// argv
	JMP	runtime·rt0_go(SB)
  • 然后跳转到runtime.rt0_go
(gdb) b runtime.rt0_go
Breakpoint 3 at 0x1049360: file /usr/local/go/src/runtime/asm_amd64.s, line 89.

初始化

这个函数有点长,下面我们分段来看rt0_go这个函数

初始化参数以及创建g0

  1. 首先将之前放入通用寄存器的参数放入AX,BX寄存器,然后调整栈顶指针(真SP寄存器)的位置,SP指针先减39,关于16字节向下对齐(因为CPU有一组 SSE 指令,这些指令中出现的内存地址必须是16的倍数),然后把参数放到SP+16字节和SP+24字节处
    golang的汇编有抽象出来的寄存器,通过是否有前缀变量区分真假寄存器,例如a+8(SP)就是golang的寄存器,8(SP)就是真的寄存器

  2. 创建g0,并初始化g.stackgruard0,g.stackguard1以及g.stack.lo,g.stack.hi的值(实际上是分配一段内存,然后分割成小段,约定哪小段表示哪个变量)

TEXT runtime·rt0_go(SB),NOSPLIT,$0
	// copy arguments forward on an even stack
	MOVQ	DI, AX		// argc
	MOVQ	SI, BX		// argv
	SUBQ	$(4*8+7), SP		// 2args 2auto
	ANDQ	$~15, SP
	MOVQ	AX, 16(SP)
	MOVQ	BX, 24(SP)

	// create istack out of the given (operating system) stack.
	// _cgo_init may update stackguard.
	// 初始化g0,g0就是go的第一个协程
	// 给g0分配栈空间大概64K
	// 
	MOVQ	$runtime·g0(SB), DI
	LEAQ	(-64*1024+104)(SP), BX // BX = SP - 64 * 1024 + 104
	MOVQ	BX, g_stackguard0(DI) // g0.g_stackguard0 = SP - 64 * 1024 + 104
	MOVQ	BX, g_stackguard1(DI) // g0.g_stackguard1 = SP - 64 * 1024 + 104
	MOVQ	BX, (g_stack+stack_lo)(DI) // g0.stack.lo = SP - 64 * 1024 + 104
	MOVQ	SP, (g_stack+stack_hi)(DI) // g0.stack.hi = SP

创建完g0的内存分布

技术分享图片

然后略过一段CPU型号检测和CGO初始化的代码

...

创建m0

  1. 创建将m0.tls放入DI寄存器,然后调用runtime.settls将m0设置为线程私有变量(mac下什么也没干),将m0与主线程绑定,然后对m0.tls进行存取操作验证是否能用,不能用就直接退出
  2. 绑定m0和g0的关系,m0.g0 = g0,g0.m = m0
	// 将m0与主线程绑定
	LEAQ	runtime·m0+m_tls(SB), DI // 将m0的thread local store成员的地址到DI
	CALL	runtime·settls(SB) // 调用settls设置线程本地存储(mac 下settls什么都没做,线程已经设置好本地存储了)

	// 通过往TLS存0x123在判断tls[0]是不是0x123验证TLS是否可用,如果不可用就abort
	// store through it, to make sure it works
	get_tls(BX)
	MOVQ	$0x123, g(BX)
	MOVQ	runtime·m0+m_tls(SB), AX
	CMPQ	AX, $0x123
	JEQ 2(PC)
	CALL	runtime·abort(SB)
ok:
	// set the per-goroutine and per-mach "registers"
	// 把g0存入m0的本地存储tls[0]
	get_tls(BX) // 将m0.tls[0]地址放入BX
	LEAQ	runtime·g0(SB), CX // 将g0地址放入CX
	MOVQ	CX, g(BX) // m0.tls[0] = &g0
	LEAQ	runtime·m0(SB), AX // 将m0地址放入AX

	// 将m0和g0建立映射关系
	// save m->g0 = g0
	MOVQ	CX, m_g0(AX) // m0.g0 = g0
	// save m0 to g0->m
	MOVQ	AX, g_m(CX) // g0.m = m0

	CLD				// convention is D is always left cleared
	CALL	runtime·check(SB)

创建完m0之后的内存分布

技术分享图片

m0和g0的关系

  1. m0表示主线程,g0表示主线程的第一个goroutine
  2. g0主要是记录主线程的栈信息,执行调度函数(schedule后边会讲)时会用,而用户goroutine有自己的栈,执行的时候会从g0栈切换到用户goroutine栈

初始化调度

g0和m0都创建并初始化好了,下面就该进行调度初始化了

  1. 将参数放入AX(初始化g0时将参数放入SP+16和SP+24的位置
  2. runtime.args初始化参数的
  3. runtime.osinit是初始化CPU核数的
  4. 重点看runtime.schedinit
	// 初始化m0
	// 将argc和argv入栈
	MOVL	16(SP), AX		// copy argc
	MOVL	AX, 0(SP)
	MOVQ	24(SP), AX		// copy argv
	MOVQ	AX, 8(SP)
	// 处理参数
	CALL	runtime·args(SB)
	// 获取cpu的核数
	CALL	runtime·osinit(SB)
	// 调度系统初始化
	CALL	runtime·schedinit(SB)

runtime.schedinit

下面函数省略了调度无关的代码,大概流程:

  1. 设置最大线程数
  2. 根据GOMAXPROCS设置procs(P的数量)
  3. 调用procresizeprocs调整P的数量
func schedinit() {
	// 取出g0
	_g_ := getg()
	if raceenabled {
		_g_.racectx, raceprocctx0 = raceinit()
	}	

	// 设置最大线程数
	sched.maxmcount = 10000
	
	...	

	// 初始化m0, 前边已经将m0和g0的关系绑定好了
	// 只是检查一下各种变量,然后将m0挂到allm链表中
	mcommoninit(_g_.m)

	...

	sched.lastpoll = uint64(nanotime())
	// ncpu在osinit时已经获取
	procs := ncpu
	// 如果GOMAXPROCS设置并且合法就将procs的设置为GOMAXPROCS
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}

	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}

	...	
}

runtime.procresize

  • 调度初始化最后一步
  1. 更新最后一次修改P数量动作的时间戳并累加花费时间
  2. 根据nprocs调整P的数量(加锁)
    1. nprocs > 现有P数量,就扩展allp(p的全局数组)的长度为nprocs
    2. nprocs < 现有P数量,就缩容allp的长度为nprocs
  3. 如果上一步是扩容了,就从堆中创建新P,并把P放入扩容出来的位置
  4. 通过g0找到m0,然后将allp[0]和m0绑定
  5. 如果allp缩容了,就将多余的p销毁
  6. 将空闲的p加入空闲链表
    到目前为止,创建了m0,g0,和nprocs个P,但是还是没有让调度真正的跑起来
func procresize(nprocs int32) *p {
	old := gomaxprocs
	if old < 0 || nprocs <= 0 {
		throw("procresize: invalid arg")
	}
	if trace.enabled {
		traceGomaxprocs(nprocs)
	}

	// update statistics
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now

	// Grow allp if necessary.
	if nprocs > int32(len(allp)) { // 初始化的len(allp) == 0
		// Synchronize with retake, which could be running
		// concurrently since it doesn‘t run on a P.
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) { // 需要缩容
			allp = allp[:nprocs]
		} else { // 扩容
			nallp := make([]*p, nprocs)
			// Copy everything up to allp‘s cap so we
			// never lose old allocated Ps.
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}

	// initialize new P‘s
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}

	_g_ := getg() // 获取g0
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 进程初始化时g0.m与p没有绑定,所以g0.m.p == 0
		// continue to use the current P
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	} else {
		// release the current P and acquire allp[0].
		//
		// We must do this before destroying our current P
		// because p.destroy itself has write barriers, so we
		// need to do that from a valid P.
		if _g_.m.p != 0 {
			if trace.enabled {
				// Pretend that we were descheduled
				// and then scheduled again to keep
				// the trace sane.
				traceGoSched()
				traceProcStop(_g_.m.p.ptr())
			}
			_g_.m.p.ptr().m = 0
		}
		_g_.m.p = 0
		_g_.m.mcache = nil
		p := allp[0]
		p.m = 0
		p.status = _Pidle
		acquirep(p) // 把allp[0]和m0关联起来
		if trace.enabled {
			traceGoStart()
		}
	}

	// 如果有需要销毁的p,就是销毁
	// release resources from unused P‘s
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy()
		// can‘t free P itself because it can be referenced by an M in syscall
	}

	// Trim allp.
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		unlock(&allpLock)
	}

	// 将空闲p放入空闲链表
	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		if _g_.m.p.ptr() == p { // allp[0]已经和m0关联了,所以不用放入空闲链表
			continue
		}
		p.status = _Pidle
		if runqempty(p) {
			pidleput(p)
		} else {
			p.m.set(mget())
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	return runnablePs
}

附录

data structure.dot

digraph {
    rankdir=TB;
    subgraph cluster_all{
        label="全局G";
        G10[shape=circle,label="G ",style=filled,fillcolor=purple]
        G11[shape=circle,label="G ",style=filled,fillcolor=purple]
        G12[shape=circle,label="G ",style=filled,fillcolor=purple]

        G10->G11[dir=none]
        G11->G12[dir=none]
    }
    subgraph cluster_in{
        label="运行中G"
        G1[shape=circle,label="G1",style=filled,fillcolor=red]
        G2[shape=circle,label="G2",style=filled,fillcolor=red]
        G3[shape=circle,label="G3",style=filled,fillcolor=red]
    }
    
    M1[shape=triangle,label="M1",style=filled,fillcolor=yellow]
    P1[shape=record,label="P1",style=filled,fillcolor=blue]

    M2[shape=triangle,label="M2",style=filled,fillcolor=yellow]
    P2[shape=record,label="P2",style=filled,fillcolor=blue]

    M3[shape=triangle,label="M3",style=filled,fillcolor=yellow]
    P3[shape=record,label="P3",style=filled,fillcolor=blue]

    subgraph cluster_wait{
        label="等待中G"
        G4[shape=circle,label="G ",style=filled,fillcolor=green]
        G5[shape=circle,label="G ",style=filled,fillcolor=green]
        G6[shape=circle,label="G ",style=filled,fillcolor=green]
        G7[shape=circle,label="G ",style=filled,fillcolor=green]
        G8[shape=circle,label="G ",style=filled,fillcolor=green]
        G9[shape=circle,label="G ",style=filled,fillcolor=green]
    }
    

    G1->M1[dir=none]
    M1->P1[dir=none]
    P1->G4[dir=none]
    G4->G5[dir=none]

    G2->M2[dir=none]
    M2->P2[dir=none]
    P2->G6[dir=none]
    G6->G7[dir=none]

    G3->M3[dir=none]
    M3->P3[dir=none]
    P3->G8[dir=none]
    G8->G9[dir=none]
}

golang goroutine源码阅读(上)

原文:https://www.cnblogs.com/wuwangchuxin0924/p/13264054.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!