一言不合秀代码 | 怎样写好 Mesos Framework

2017-03-27 20:21:48 +08:00
 dataman

“调度”这个词近两年被提到的比较多,资源调度管理应用生命周期等,带来了极大的便利,数人云开源的 Mesos 调度器 Swan ,基于 Mesos 实现应用调度框架,帮助用户轻松发布应用,实现应用的滚动更新,并根据用户指定的策略做应用的健康检测和故障转移。

授之以鱼不如授之以渔,小数带来工程师的代码级文章,透彻分析如何写 Mesos Framework :

运行任务

当我们接受到 Mesos 发来的 offer 以后,可以选择接受或拒绝 offer 。如果要运行任务,那么就 Accept offer, 否则的话应该 decline offer 。代码示例如下:

   
    for _, offer := range offers {
                cpus, mem, disk := OfferedResources(offer)
                var tasks []*mesos.TaskInfo
                for taskLaunched < tasksCount &&
                        cpus >= NeededCpus &&
                        mem >= NeededMem &&`
                        disk >= NeededDisk {
                        task, err := BuildTask(offer) 
                        if err != nil {
                                return fmt.Errorf("Build task failed: %s", err.Error())`
                        }   
 
         
                        taskInfo := BuildTaskInfo(offer, task)
                        tasks = append(tasks, taskInfo)
 
                         taskLaunched++
                        cpus -= version.Cpus
                        mem -= version.Mem
                        disk -= version.Disk
                 }
                 LaunchTasks(offer, tasks)
      }  

OfferedResources(offer) 用来计算 offer 提供的资源,包括 cpu, memory 和 disk 的大小. BuildTaskInfo 用来构造一个 mesos 所识别的 TaskInfo 结构体, LaunchTasks 用来给 Mesos 发命令以创建 task 。 LauchTasks 的过程实际上是一个接受 Mesos offer 的过程,示例代码如下:

        logrus.Infof("Launch %d tasks with offer %s", len(tasks), *offer.GetId().Value)
        call := &sched.Call{
                FrameworkId: s.framework.GetId(),
                Type:        sched.Call_ACCEPT.Enum(),
                Accept: &sched.Call_Accept{
                        OfferIds: []*mesos.OfferID{
                                offer.GetId(),
                        },
                        Operations: []*mesos.Offer_Operation{
                                &mesos.Offer_Operation{
                                        Type: mesos.Offer_Operation_LAUNCH.Enum(),
                                        Launch: &mesos.Offer_Operation_Launch{
                                                TaskInfos: tasks,
                                        },
                                },
                        },
                        Filters: &mesos.Filters{RefuseSeconds: proto.Float64(1)},
                },
        }
 
        return s.send(call)

之后,如果创建 tasks 成功的话,应该就可以在 Mesos ui 上看到刚才创建的 task 了。

Task 状态更新

Task 状态是通过 Mesos statusUpdate 事件来更新的。 Mesos task 状态大概分以下几种:

TaskState_TASK_STARTING TaskState = 0
TaskState_TASK_RUNNING  TaskState = 1
TaskState_TASK_KILLING  TaskState = 8
TaskState_TASK_FINISHED TaskState = 2
TaskState_TASK_FAILED   TaskState = 3
TaskState_TASK_KILLED   TaskState = 4
TaskState_TASK_ERROR    TaskState = 7
TaskState_TASK_LOST TaskState = 5
TaskState_TASK_DROPPED TaskState = 9
TaskState_TASK_UNREACHABLE TaskState = 10 
TaskState_TASK_GONE TaskState = 11 
TaskState_TASK_GONE_BY_OPERATOR TaskState = 12 
TaskState_TASK_UNKNOWN TaskState = 13}}}

 
更新状态的示例代码如下:
 

func (s *Scheduler) status(status *mesos.TaskStatus) {
    state := status.GetState()
    taskId := status.TaskId.GetValue()
    switch state {
        case mesos.TaskState_TASK_STAGING:
             doSometing()                  
        case mesos.TaskState_TASK_STARTING:
             doSometing()                 
        case mesos.TaskState_TASK_RUNNING:
             doSometing()
        case mesos.TaskState_TASK_FINISHED:
             doSometing()                
        case mesos.TaskState_TASK_FAILED:
             doSometing()                
        case mesos.TaskState_TASK_KILLED:
             doSometing()                
        case mesos.TaskState_TASK_LOST:
             doSometing()
    }

上面只是示例代码,具体的处理细节可以看 https://github.com/Dataman-Cloud/swan

删除任务

删除任务是通过给 Mesos 发送 Call_KILL 类型的消息来实现的,消息中指定了需要杀死的 task 的 ID ,具体示例代码如下:

        call := &sched.Call{
                FrameworkId: s.framework.GetId(),
                Type:        sched.Call_KILL.Enum(),
                Kill: &sched.Call_Kill{
                        TaskId: &mesos.TaskID{
                                Value: proto.String(task.ID),
                        },  
                        AgentId: &mesos.AgentID{
                                Value: task.AgentId,
                        },  
                },  
        }   
 
        duration := proto.Int64(task.KillPolicy.Duration * 1000 * 1000)
        if task.KillPolicy != nil {
                if task.KillPolicy.Duration != 0 {
                        call.Kill.KillPolicy = &mesos.KillPolicy{
                                GracePeriod: &mesos.DurationInfo{
                                        Nanoseconds: duration,
                                },  
                        }   
                }   
        }   
 
        return s.send(call)

其中, Type 类型指定了消息的类型, FrameworkId 指定了当前 Framework 在 Mesos 注册的 ID , task.ID 指定了需要 kill 的 task 的 ID , task.AgentId 指定了需要 kill 的 task 所在的 agentId.

killPolicy 是一个自定义的优雅终止相关的策略,其中指定了优雅终止的超时时间 duration,也就是 Mesos 先給 task 发一个 SIGTERM 的信号,让 task 有时间去做一些清理工作,如果 task 没有正常终止,在经过一定时间后发送 SIGKILL 去强制杀死 task 。这个时间由 duration 指定, Mesos 默认是 3 秒。

Mesos 断线重连

Framework 和 Mesos 之间通过一个长连接进行通信,在某些情况下,连接可能出错,这时候就需要 Framework 重新去连接 Mesos,示例代码如下:

    logrus.Infof("Subscribe with mesos master %s", s.Master)
    call := &sched.Call{
        Type: sched.Call_SUBSCRIBE.Enum(),
        Subscribe: &sched.Call_Subscribe{
            FrameworkInfo: s.Framework,
        },
    }
 
    if s.Framework.Id != nil {
        call.FrameworkId = &mesos.FrameworkID{
            Value: proto.String(s.Framework.Id.GetValue()),
        }
    }
 
    resp, err := s.Send(call)
    if err != nil {
        return err
    }
 
    // http might now be the default transport in future release
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("Subscribe with unexpected response status: %d", resp.StatusCode)
    }
 
    go s.handleEvents(resp)
 
    return nil
}
 
func (s *Scheduler) resubscribe() {
    for {
        logrus.Warn("Connection to mesos got error, reconnect")
        if err := s.subscribe(); err == nil {
            return
        } else {
            logrus.Errorf("%s", err.Error())
        }
        <-time.After(2 * time.Second)
    }
}
 
func (s *Scheuler) handleEvents(resp *http.Response) {
    defer func() {
        resp.Body.Close()
    }()
 
    r := NewReader(resp.Body)
    dec := json.NewDecoder(r)
 
    for {
        event := new(sched.Event)
        if err := dec.Decode(event); err != nil {    //got some error, reconnect.
        go func() {
            s.resubscribe()
        }()
 
        return
        }
 
        switch event.GetType() {
            case sched.Event_SUBSCRIBED:
                doSomework()
            case sched.Event_OFFERS:
                doSomework()
            case sched.Event_RESCIND:
                doSomework()
            case sched.Event_UPDATE:
                doSomework()
            case sched.Event_MESSAGE:
                doSomework()
            case sched.Event_FAILURE:
                doSomework()
            case sched.Event_ERROR:
                doSomework()
            case sched.Event_HEARTBEAT:
                doSomework()
            }
        }
    }

函数 resubscribe 用来向 Mesos 重新注册,如果注册失败,隔 2 秒之后会重试,直到连接成功为止。注册成功后会在一个新的 goroutine 里继续原来逻辑的处理。具体可以查看 Mesos 文档(http://mesos.apache.org/documentation/latest/scheduler-http-api)的 Disconnections 小节关于重连的内容。

五步走的 Mesos Framework 教程就分享到这里了,更多代码请跳转https://github.com/Dataman-Cloud/swan,欢迎 Star&Fork 。

2679 次点击
所在节点    程序员
3 条回复
ydxred
2017-03-28 11:05:17 +08:00
一言不合就打广告....
v2dead
2017-03-28 12:49:31 +08:00
缩进有 4 格有 8 格,对不起,你是好人。
dataman
2017-03-28 13:58:16 +08:00
@v2dead 小编不是很懂代码缩进,可能复制过来的时候有问题,~~~~(>_<)~~~~

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/350716

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX