分享个 celery 的监控脚本吧

2019-01-03 11:00:51 +08:00
 fanhaipeng0403
# coding=utf-8
import celery
import celery.states
import celery.events
import collections
from itertools import chain
import logging
import prometheus_client
import sys
from threading import Thread
import time
import json
import os
from app.tasks import celery as app

# Monitored metrics.

# Number of workers currently online.
WORKERS = prometheus_client.Gauge(
    name='celery_workers',
    documentation='Number of alive workers',
)

# Number of tasks in each state.
TASKS = prometheus_client.Gauge(
    name='celery_tasks',
    documentation='Number of tasks per state',
    labelnames=['state'],
)

# Number of tasks per state and task name: an overview of all Celery tasks.
TASKS_NAME = prometheus_client.Gauge(
    name='celery_tasks_by_name',
    documentation='Number of tasks per state and name',
    labelnames=['state', 'name'],
)

# Execution time of each task; tracks the performance of the task itself
# (used for optimising SQL).
TASKS_RUNTIME = prometheus_client.Histogram(
    name='celery_tasks_runtime_seconds',
    documentation='Task runtime (seconds)',
    labelnames=['name'],
)

# Time each task waits before it starts; reveals blocking and is used to
# tune the number of workers.
LATENCY = prometheus_client.Histogram(
    name='celery_task_latency',
    documentation='Seconds between a task is received and started.',
)

logger = logging.getLogger(__name__)


class WorkerMonitoring:
    """Periodically publish the number of workers that answer a ping."""

    def __init__(self, app):
        # Celery application whose control interface is used for pinging.
        self._app = app

    def run(self):
        """Refresh the worker gauge forever, sleeping 5 seconds per round."""
        while True:
            self.update_workers_count()
            time.sleep(5)

    def update_workers_count(self):
        """Ping the workers and store the number of replies in ``WORKERS``.

        Any failure is logged and swallowed so that the polling loop in
        :meth:`run` keeps going.
        """
        try:
            replies = self._app.control.ping(timeout=5)
            WORKERS.set(len(replies))
        except Exception:
            logger.exception("Error while pinging workers")


class TaskMonitoring:
    """Consume Celery events and translate them into Prometheus metrics."""

    def __init__(self, app):
        self._app = app
        self._state = self._app.events.State()
        # Label values seen so far. They are remembered so that a gauge is
        # set back to 0 once no task is left in that state, instead of
        # keeping its last non-zero value.
        self._known_states = set()
        self._known_states_names = set()

    def run(self):
        """Thread entry point: capture events in an endless loop."""
        self._monitor()

    def _process_event(self, event):
        """Update the metrics for one event received from the broker.

        Only ``task-*`` events are handled. Every other event (worker
        heartbeat, online, offline, ...) is ignored: slicing its type and
        looking it up in ``TASK_EVENT_TO_STATE`` would raise ``KeyError``
        and abort the capture loop.
        """
        event_type = event['type']
        if not event_type.startswith('task-'):
            return

        logger.debug("Received event: %s", event)

        # Events may arrive concurrently, so hold the state's own lock.
        with self._state._mutex:
            # Strip the leading 'task-' to get the key used by Celery.
            state = celery.events.state.TASK_EVENT_TO_STATE.get(event_type[5:])
            if state is None:
                # Task event without a known state mapping: nothing to count.
                return
            if state == celery.states.STARTED:
                # Measure how long the task waited before starting.
                self._observe_latency(event)

            self._collect_tasks(event, state)

    def _observe_latency(self, event):
        """Observe the delay between a task being received and started.

        Nothing is recorded when the task is unknown or when its previous
        state is not ``RECEIVED``.
        """
        try:
            prev_evt = self._state.tasks[event['uuid']]
        except KeyError:
            pass
        else:
            if prev_evt.state == celery.states.RECEIVED:
                LATENCY.observe(
                    event['local_received'] - prev_evt.local_received)

    def _collect_tasks(self, event, state):
        """Route *event* by *state* and refresh the in-progress gauges.

        Events for ready (finished) states are counted and the task is
        dropped from the tracked state; all other events are fed into the
        tracked state.
        """
        if state in celery.states.READY_STATES:
            self._incr_ready_task(event, state)
        else:
            self._state._event(event)

        self._collect_unready_tasks()

    def _incr_ready_task(self, event, state):
        """Count a task that reached a ready state and stop tracking it.

        Covers the ``FAILURE``, ``REVOKED`` and ``SUCCESS`` states. The
        per-name gauge and the runtime histogram are only updated when the
        task is known and has a name.
        """
        TASKS.labels(state=state).inc()
        try:
            # Remove the task from the set of in-progress tasks.
            name = self._state.tasks.pop(event['uuid']).name
        except (KeyError, AttributeError):
            return
        if name is None:
            return

        # Keep the per-name gauge in step with TASKS for finished states.
        TASKS_NAME.labels(state=state, name=name).inc()
        runtime = event.get('runtime')
        if runtime is not None:
            TASKS_RUNTIME.labels(name=name).observe(runtime)

    def _collect_unready_tasks(self):
        """Set the gauges for tasks that are still tracked.

        Covers the ``PENDING``, ``RECEIVED``, ``REJECTED``, ``RETRY`` and
        ``STARTED`` states. Label values seen earlier but absent now are
        set to 0 because a missing ``Counter`` key counts as zero.
        """
        cnt = collections.Counter(t.state for t in self._state.tasks.values())
        self._known_states.update(cnt)
        for task_state in self._known_states:
            TASKS.labels(state=task_state).set(cnt[task_state])

        cnt = collections.Counter(
            (t.state, t.name) for t in self._state.tasks.values() if t.name)
        self._known_states_names.update(cnt)
        for state_name, task_name in self._known_states_names:
            TASKS_NAME.labels(state=state_name, name=task_name).set(
                cnt[(state_name, task_name)])

    def _monitor(self):
        """Capture broker events forever, reconnecting after any failure."""
        while True:
            try:
                with self._app.connection() as conn:
                    # Receive every event from the broker and hand it to
                    # _process_event.
                    logger.info("Try to connect to broker")
                    recv = self._app.events.Receiver(
                        conn, handlers={'*': self._process_event})

                    setup_metrics(self._app)
                    recv.capture(limit=None, timeout=None, wakeup=True)
                    # NOTE(review): only reached once capture() returns.
                    logger.info("Connected to broker")

            except Exception:
                logger.exception("Queue connection failed")
                setup_metrics(self._app)
                time.sleep(5)


def setup_metrics(app):
    """Reset the worker and task gauges to zero.

    When the registered task names can be fetched from the workers, a zero
    series is created for every Celery state and every (state, task name)
    pair. Otherwise only the series that already exist are set to zero.

    :param app: Celery application used to inspect the workers.
    """
    WORKERS.set(0)
    try:
        registered_tasks = app.control.inspect().registered_tasks().values()
    except Exception:
        # Inspection failed (presumably also when no worker replies and the
        # call returns None -- confirm): zero the series we already have.
        for gauge in (TASKS, TASKS_NAME):
            for metric in gauge.collect():
                for sample in metric.samples:
                    # Index instead of unpacking: samples are 3-tuples in
                    # old prometheus_client releases and longer namedtuples
                    # in newer ones; the labels are at position 1 in both.
                    gauge.labels(**sample[1]).set(0)
    else:
        # The set of task names does not depend on the state; build it once.
        task_names = set(chain.from_iterable(registered_tasks))

        # 'FAILURE', 'PENDING', 'RECEIVED', 'RETRY', 'REVOKED', 'STARTED',
        # 'SUCCESS'
        for state in celery.states.ALL_STATES:
            TASKS.labels(state=state).set(0)
            for task_name in task_names:
                TASKS_NAME.labels(state=state, name=task_name).set(0)


class EnableEvents:
    """Repeatedly ask the workers to send events.

    Celery has an issue where events are not sent even though
    CELERY_SEND_EVENTS is configured, so events are enabled this way.
    """

    def __init__(self, app):
        self._app = app

    def run(self):  # pragma: no cover
        """Call :meth:`enable_events` every 5 seconds, forever.

        Failures are logged and the loop carries on, so one failed attempt
        does not end the thread.
        """
        while True:
            try:
                self.enable_events()
            except Exception:
                # Use the module logger: this class has no ``log`` attribute.
                logger.exception("Error while trying to enable events")
            time.sleep(5)

    def enable_events(self):
        """Send the enable-events control command through the app."""
        self._app.control.enable_events()

def start_httpd(addr):
    """Start the Prometheus HTTP server on *addr*.

    :param addr: listen address in ``host:port`` form.
    :raises ValueError: if *addr* has no ``:`` or the port is not an integer.
    """
    # Split on the last colon only, so a host that itself contains colons
    # (e.g. an IPv6 literal) does not break the unpacking.
    host, port = addr.rsplit(':', 1)
    logging.info('Starting HTTPD on {}:{}'.format(host, port))
    prometheus_client.start_http_server(int(port), host)


def celery_monitoring():
    """Reset the metrics, start the monitor threads and serve the metrics.

    One daemon thread is started for each of ``EnableEvents``,
    ``WorkerMonitoring`` and ``TaskMonitoring``; the HTTP endpoint is then
    started on ``0.0.0.0:49792`` and the caller waits on the threads.
    """
    setup_metrics(app)

    threads = []
    for monitor_cls in (EnableEvents, WorkerMonitoring, TaskMonitoring):
        thread = Thread(target=monitor_cls(app).run)
        thread.daemon = True
        thread.start()
        threads.append(thread)

    start_httpd('0.0.0.0:49792')

    # Wait in reverse start order: task monitor, worker monitor, enabler.
    for thread in reversed(threads):
        thread.join()

# NOTE(review): `manager` is neither defined nor imported in the visible
# source -- presumably the project's command manager from manage.py; confirm.
@manager.command
def start_celery_monitoring():
    """Run the Celery monitoring exporter by calling celery_monitoring().

    Usage:
        nohup python manage.py start_celery_monitoring &
    """
    celery_monitoring()
2243 次点击
所在节点    Python
4 条回复
leisurelylicht
2019-01-03 11:21:16 +08:00
推荐楼主传到 github 上给我们个链接就行了
zhoudaiyu
2019-01-03 12:18:24 +08:00
兄弟是看我帖子了吗
fanhaipeng0403
2019-01-03 12:46:16 +08:00
@zhoudaiyu 看了~我最近也要处理这个事情~
Nick2VIPUser
2019-01-03 12:47:43 +08:00
@leisurelylicht 最好加上 readme ~

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/523381

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX