# coding=utf-8
import celery
import celery.states
import celery.events
import collections
from itertools import chain
import logging
import prometheus_client
import sys
from threading import Thread
import time
import json
import os
from app.tasks import celery as app
# Monitored metrics
# Number of online workers
WORKERS = prometheus_client.Gauge('celery_workers', 'Number of alive workers')
# Number of tasks in each state
TASKS = prometheus_client.Gauge('celery_tasks', 'Number of tasks per state', ['state'])
# Number of tasks per state and task name; an overview of all celery tasks
TASKS_NAME = prometheus_client.Gauge('celery_tasks_by_name', 'Number of tasks per state and name', ['state', 'name'])
# Execution time of each task; tracks the performance of the task itself, used for optimizing SQL
TASKS_RUNTIME = prometheus_client.Histogram('celery_tasks_runtime_seconds', 'Task runtime (seconds)', ['name'])
# Start-up delay of each task; tracks blocking/queueing, used for tuning the number of workers
LATENCY = prometheus_client.Histogram('celery_task_latency', 'Seconds between a task is received and started.')
# Module-level logger named after this module
logger = logging.getLogger(__name__)
class WorkerMonitoring:
    """Keep the ``WORKERS`` gauge in sync with the workers that answer a ping."""

    def __init__(self, app):
        """Remember the Celery application used to ping workers."""
        self._app = app

    def run(self):
        """Thread entry point: refresh the gauge, wait 5 seconds, repeat forever."""
        while True:
            self.update_workers_count()
            time.sleep(5)

    def update_workers_count(self):
        """Ping all workers (5 second timeout) and publish how many replied.

        Any failure is logged with its traceback and otherwise ignored, so
        the gauge simply keeps its previous value.
        """
        try:
            replies = self._app.control.ping(timeout=5)
            alive = len(replies)
            WORKERS.set(alive)
        except Exception:
            logger.exception("Error while pinging workers")
class TaskMonitoring:
    """Turn Celery task events received from the broker into metrics.

    Finished tasks ('FAILURE', 'REVOKED', 'SUCCESS') are counted on the
    ``TASKS`` gauge and their runtime is observed on ``TASKS_RUNTIME``.
    Tasks in any other state are tracked in an in-memory event ``State``
    and exported through the ``TASKS`` and ``TASKS_NAME`` gauges.
    """

    # Every task event type starts with this prefix ('task-received', ...).
    _TASK_EVENT_PREFIX = 'task-'

    def __init__(self, app):
        """Bind the monitor to *app* and create an empty event state."""
        self._app = app
        self._state = self._app.events.State()
        # Label values seen so far. They are remembered so that a gauge is
        # written back to 0 once no task is left in that state.
        self._known_states = set()
        self._known_states_names = set()

    def run(self):
        """Thread entry point: capture events until the process exits."""
        self._monitor()

    def _process_event(self, event):
        """Handle one event delivered by the receiver.

        The receiver is registered with the ``'*'`` wildcard, so worker
        events (heartbeat, online, offline, ...) arrive here as well. Only
        ``task-*`` events map to a task state; everything else is ignored
        instead of failing the state lookup and tearing down the capture
        loop.
        """
        event_type = event['type']
        if not event_type.startswith(self._TASK_EVENT_PREFIX):
            return
        logger.debug("Received event: %s", event)
        # Events may arrive concurrently, so take the state's own lock.
        with self._state._mutex:
            event_name = event_type[len(self._TASK_EVENT_PREFIX):]
            state = celery.events.state.TASK_EVENT_TO_STATE.get(event_name)
            if state is None:
                # Task event without a known state mapping: nothing to count.
                logger.debug("Ignoring unknown task event type %r", event_type)
                return
            if state == celery.states.STARTED:
                # Measure how long the task waited before it was started.
                self._observe_latency(event)
            self._collect_tasks(event, state)

    def _observe_latency(self, event):
        """Record the delay between a task being received and being started.

        Nothing is recorded when the task is not in the tracked state or
        when its previously recorded state is not RECEIVED.
        """
        try:
            prev_evt = self._state.tasks[event['uuid']]
        except KeyError:
            # The task was never seen before, so there is no baseline.
            return
        if prev_evt.state == celery.states.RECEIVED:
            LATENCY.observe(event['local_received'] - prev_evt.local_received)

    def _collect_tasks(self, event, state):
        """Route the event to the finished-task or in-progress-task handling."""
        if state in celery.states.READY_STATES:
            self._incr_ready_task(event, state)
        else:
            # Feed the event into the state so the task is tracked.
            self._state._event(event)
        self._collect_unready_tasks()

    def _incr_ready_task(self, event, state):
        """Count a finished task ('FAILURE', 'REVOKED', 'SUCCESS').

        The task is removed from the tracked state and, when both its name
        and the event's ``runtime`` are available, the runtime is observed.
        """
        TASKS.labels(state=state).inc()
        try:
            name = self._state.tasks.pop(event['uuid']).name
            runtime = event.get('runtime')
            if name is not None and runtime is not None:
                TASKS_RUNTIME.labels(name=name).observe(runtime)
        except (KeyError, AttributeError):
            # Best effort: the task was not tracked, so only the per-state
            # count above is updated.
            pass

    def _collect_unready_tasks(self):
        """Export counts of tracked tasks.

        Covers 'PENDING', 'RECEIVED', 'REJECTED', 'RETRY' and 'STARTED'
        tasks, once per state and once per (state, task name) pair.
        """
        tasks = self._state.tasks.values()

        per_state = collections.Counter(t.state for t in tasks)
        # Iterating a Counter yields each distinct key once.
        self._known_states.update(per_state)
        for task_state in self._known_states:
            # A Counter returns 0 for missing keys, so states that emptied
            # out are reset rather than left at their last value.
            TASKS.labels(state=task_state).set(per_state[task_state])

        per_state_name = collections.Counter(
            (t.state, t.name) for t in tasks if t.name)
        self._known_states_names.update(per_state_name)
        for key in self._known_states_names:
            TASKS_NAME.labels(state=key[0], name=key[1]).set(per_state_name[key])

    def _monitor(self):
        """Capture events from the broker forever, reconnecting on failure."""
        while True:
            try:
                with self._app.connection() as conn:
                    # Receive every event from the broker and hand it to
                    # _process_event.
                    logger.info("Try to connect to broker")
                    recv = self._app.events.Receiver(
                        conn, handlers={'*': self._process_event})
                    setup_metrics(self._app)
                    # NOTE(review): with limit=None and timeout=None this
                    # call is expected to block, so the message below is
                    # presumably only logged once capturing stops - confirm.
                    recv.capture(limit=None, timeout=None, wakeup=True)
                    logger.info("Connected to broker")
            except Exception:
                logger.exception("Queue connection failed")
                setup_metrics(self._app)
                # Back off before the next connection attempt.
                time.sleep(5)
def setup_metrics(app):
    """Reset the worker and task gauges to zero.

    The worker gauge is always set to 0. When the registered task names
    can be fetched from *app*, a zero-valued series is created for every
    state in ``celery.states.ALL_STATES`` and for every (state, task name)
    pair. When that lookup fails for any reason, only the series that
    already exist are written back to 0.

    :param app: Celery application used to inspect the workers.
    """
    WORKERS.set(0)
    try:
        registered_tasks = app.control.inspect().registered_tasks().values()
    except Exception:
        # Task names are unavailable (presumably broker unreachable or no
        # worker replied - confirm), so fall back to the known series.
        for gauge in (TASKS, TASKS_NAME):
            for metric in gauge.collect():
                for sample in metric.samples:
                    # Index 1 holds the label dict. Indexing instead of
                    # unpacking a fixed 3-tuple keeps this working when a
                    # sample carries additional trailing fields.
                    gauge.labels(**sample[1]).set(0)
    else:
        # The set of task names does not depend on the state: build it once.
        task_names = set(chain.from_iterable(registered_tasks))
        # 'FAILURE', 'PENDING', 'RECEIVED', 'RETRY', 'REVOKED', 'STARTED', 'SUCCESS'
        for state in celery.states.ALL_STATES:
            TASKS.labels(state=state).set(0)
            for task_name in task_names:
                TASKS_NAME.labels(state=state, name=task_name).set(0)
class EnableEvents:
    """Periodically ask the workers to start sending events.

    Celery has a problem where events are not sent even when
    CELERY_SEND_EVENTS is configured, so events are enabled through the
    control API on a fixed interval instead.
    """

    def __init__(self, app, interval=5):
        """Store the Celery application and the retry settings.

        :param app: Celery application whose workers should send events.
        :param interval: seconds to wait between two enable requests.
        """
        self._app = app
        self._interval = interval
        # run() logs through self.log, so it has to exist. getLogger returns
        # the same logger object for the same name, i.e. the logger that is
        # named after this module.
        self.log = logging.getLogger(__name__)

    def run(self):  # pragma: no cover
        """Thread entry point: enable events, wait, repeat forever.

        A failed request is logged with its traceback and retried on the
        next iteration.
        """
        while True:
            try:
                self.enable_events()
            except Exception:
                self.log.exception("Error while trying to enable events")
            time.sleep(self._interval)

    def enable_events(self):
        """Send a single enable-events request through the control API."""
        self._app.control.enable_events()
def start_httpd(addr):
    """Expose the metrics over HTTP on the given ``host:port`` address.

    :param addr: listen address written as ``host:port``; the unpacking
        below requires exactly one ``:`` separator.
    """
    pieces = addr.split(':')
    host, port = pieces
    message = 'Starting HTTPD on {}:{}'.format(host, port)
    logging.info(message)
    port_number = int(port)
    prometheus_client.start_http_server(port_number, host)
def celery_monitoring():
    """Start the exporter and block until its background threads end.

    Resets the metrics, starts one daemon thread each for enabling events,
    counting workers and consuming task events (in that order), starts the
    metrics HTTP server on 0.0.0.0:49792 and then waits on the threads.
    """
    setup_metrics(app)

    started = []
    for monitor_cls in (EnableEvents, WorkerMonitoring, TaskMonitoring):
        worker_thread = Thread(target=monitor_cls(app).run)
        # Daemon threads do not keep the interpreter alive on shutdown.
        worker_thread.daemon = True
        worker_thread.start()
        started.append(worker_thread)

    start_httpd('0.0.0.0:49792')

    # Wait in reverse start order: task monitor, worker monitor, enabler.
    for worker_thread in reversed(started):
        worker_thread.join()
# Management command that starts the exporter by delegating to
# celery_monitoring().
# NOTE(review): `manager` is not defined or imported in the visible part of
# this file - presumably a command manager object created elsewhere (the
# usage line below mentions manage.py); confirm it is in scope.
@manager.command
def start_celery_monitoring():
    """
    nohup python manage.py start_celery_monitoring &
    """
    celery_monitoring()
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.