从原openstack转型至docker已有一段时间。更稳定的使用docker了解docker的各流程,从源代码层面了解下containerd。本文基于docker 1.12版本,从1.11开始docker已拆分docker daemon
list --> /types.API/ListCheckpoint create --> /types.API/CreateCheckpoint delete --> /types.API/DeleteCheckpoint
list、state -->/types.API/State pause、resume、update -->/types.API/UpdateContainer create --> /types.API/CreateContainer stats --> /types.API/Stats watch -->/types.API/State 、/types.API/Events exec -->/types.API/Events 、/types.API/AddProcess 、/types.API/UpdateProcess kill -->/types.API/Signal start -->/types.API/Events 、 /types.API/CreateContainer 、/types.API/UpdateProcess update -->/types.API/UpdateContainer
/types.API/Events
/types.API/State
/types.API/GetServerVersion --return result
注:API--server.go --> daemon – supervisor.go(handleTask func)
/types.API/ListCheckpoint (supervisor.GetContainersTask)--> getContainers /types.API/CreateCheckpoint --> createCheckpoint /types.API/DeleteCheckpoint --> deleteCheckpoint
/types.API/State /types.API/Stats (supervisor.GetContainersTask)--> getContainers /types.API/UpdateContainer (supervisor.UpdateTask)-->updateContainer /types.API/CreateContainer (supervisor.StartTask)-->start /types.API/Events --> Events --return result /types.API/AddProcess -->addProcess /types.API/UpdateProcess -->updateProcess /types.API/Signal -->signal
getContainers -- return result createCheckpoint -->(runtime)CheckPoint -->exec.Command(c.runtime,arg....) deleteCheckpoint -->(runtime)DeleteCheckpoint -- return result
getContainers -- return result updateContainer -->(runtime)Resume Pause UpdateResources-->exec.Command(c.runtime,arg....) start -->(runtime supervisor/worker.go) Start -->exec.Command(c.shim,c.id,c.bundle,c.runtime) addProcess -->(runtime) exec --> exec.Command(c.shim,c.id,c.bundle,c.runtime) updateProcess -->return result signal -->return result
createContainer示例
deamon启动监听tasks及startTasks进程
a)进入main.go main方法调用daemon方法
app.Action = func(context *cli.Context) {
if err := daemon(context); err != nil {
logrus.Fatal(err)
}
}b)进入main.go daemon方法
for i := 0; i < 10; i++ {
wg.Add(1)
w := supervisor.NewWorker(sv, wg)
go w.Start()
}
if err := sv.Start(); err != nil {
return err
}c)初始化supervisor/worker.go NewWorker并启动监听startTask并处理
func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
return &worker{
s: s,
wg: wg,
}
}
func (w *worker) Start() {
defer w.wg.Done()
for t := range w.s.startTasks {
started := time.Now()
process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
logrus.WithFields(logrus.Fields{
"error": err,
"id": t.Container.ID(),
}).Error("containerd: start container")
t.Err <- err
evt := &DeleteTask{
ID: t.Container.ID(),
NoEvent: true,
Process: process,
}
w.s.SendTask(evt)
continue
}d)启动supervisor/supervisor.go task监听task并处理
func (s *Supervisor) Start() error {
logrus.WithFields(logrus.Fields{
"stateDir": s.stateDir,
"runtime": s.runtime,
"runtimeArgs": s.runtimeArgs,
"memory": s.machine.Memory,
"cpus": s.machine.Cpus,
}).Debug("containerd: supervisor running")
go func() {
for i := range s.tasks {
s.handleTask(i)
}ctr/main.go containersCommand
execCommand, killCommand, listCommand, pauseCommand, resumeCommand, startCommand, stateCommand, statsCommand, watchCommand, updateCommand,
ctr/container.go
var startCommand = cli.Command{
Name: "start",
Usage: "start a container",
ArgsUsage: "ID BundlePath”, ————…...
events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/
if err != nil {
fatal(err.Error(), 1)
}
if _, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/
fatal(err.Error(), 1)
}
if context.Bool("attach") {
go func() {
io.Copy(stdin, os.Stdin)
if _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/
Id: id,
Pid: "init",
CloseStdin: true,
}); err != nil {
fatal(err.Error(), 1)
}
restoreAndCloseStdin()
}()
if tty {
resize(id, "init", c)
go func() {
s := make(chan os.Signal, 64)
signal.Notify(s, syscall.SIGWINCH)
for range s {
if err := resize(id, "init", c); err != nil {
log.Println(err)
}
}
}()
}
waitForExit(c, events, id, "init", restoreAndCloseStdin)
}
},api/grpc/types/api.pb.go
func (c *aPIClient) Events(ctx context.Context, in *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc, "/types.API/Events", opts...)
if err != nil {
return nil, err
}
x := &aPIEventsClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
out := new(CreateContainerResponse)
err := grpc.Invoke(ctx, "/types.API/CreateContainer", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *aPIClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
out := new(UpdateProcessResponse)
err := grpc.Invoke(ctx, "/types.API/UpdateProcess", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}api/grpc/types/api.pb.go
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(EventsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(APIServer).Events(m, &aPIEventsServer{stream})
}
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateContainerRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(APIServer).CreateContainer(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/types.API/CreateContainer",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
}
return interceptor(ctx, in, info, handler)
}
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateProcessRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(APIServer).UpdateProcess(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/types.API/UpdateProcess",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
}
return interceptor(ctx, in, info, handler)api/grpc/server/server.go 进入第一步中的tasks及sendTasks处理队列
func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
events := s.sv.Events(t, r.StoredOnly, r.Id)
func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
s.sv.SendTask(e)
apiC, err := createAPIContainer(r.Container, false)
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
e := &supervisor.UpdateProcessTask{}
e.ID = r.Id
e.PID = r.Pid
e.Height = int(r.Height)
e.Width = int(r.Width)
e.CloseStdin = r.CloseStdin
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
}
return &types.UpdateProcessResponse{}, nil
}supervisor/create.go
func (s *Supervisor) start(t *StartTask) error {
s.startTasks <- tasksupervisor/worker.go
func (w *worker) Start() {
defer w.wg.Done()
for t := range w.s.startTasks {runtime/container.go
func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
processRoot := filepath.Join(c.root, c.id, InitProcessID)
if err := os.Mkdir(processRoot, 0755); err != nil {
return nil, err
}
cmd := exec.Command(c.shim,
c.id, c.bundle, c.runtime,
) ---执行 docker-containerd-shim命令
cmd.Dir = processRoot
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
spec, err := c.readSpec()
if err != nil {
return nil, err
}
config := &processConfig{
checkpoint: checkpointPath,
root: processRoot,
id: InitProcessID,
c: c,
stdio: s,
spec: spec,
processSpec: specs.ProcessSpec(spec.Process),
}
p, err := newProcess(config)
if err != nil {
return nil, err
}
if err := c.createCmd(InitProcessID, cmd, p); err != nil {
return nil, err
}
return p, nil
}containerd-shim接收后处理
containerd-shim/main.go
func start(log *os.File) error {
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if err != nil {
return err
}
defer func() {
if err := p.Close(); err != nil {
writeMessage(log, "warn", err)
}
}()
if err := p.create(); err != nil {
p.delete()
return err
}containerd-shim/process.go跳转执行runc命令
func (p *process) create() error {
cmd := exec.Command(p.runtime, args...)本文出自 “虫子” 博客,请务必保留此出处http://bingdian.blog.51cto.com/94171/1893470
原文:http://bingdian.blog.51cto.com/94171/1893470