/celerytask/myapp.py
app = Celery('celerytask')
app.config_from_object('celerytask.config')
/celerytask/config.py
broker_url = 'amqp://guest:guest@localhost:5672//'
result_backend = 'redis://localhost:6379/0'
accept_content = ['json', 'application/text']
task_queues = {
'celery': {
'exchange': 'celery',
'routing_key': 'celery',
},
}
task_routes = {
'celerytask.task.hello': {
'queue': 'celery',
'routing_key': 'celery'
},
}
imports = ['celerytask.task',]
/celerytask/task.py
@app.task
def hello(msg='hello'):
time.sleep(5)
return msg.upper()
class CallHandler(RequestHandler):
def get(self):
msg = self.get_argument('msg', 'default')
hello.apply_async(args=(msg,))
self.write('ok')
class PushHandler(RequestHandler):
def get(self):
msg = self.get_argument('msg', 'default')
self.application.publisher.publish(
exchange='celery',
routing_key='celery',
body={
'id': str(uuid.uuid1()),
'args': [msg, ],
'task': 'celerytask.task.hello'
}
)
self.write('ok')
直接调用(CallHandler)一切正常, 如果是把消息推到队列(PushHandler), celery 就报:
[ WARNING/MainProcess]Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"id": "6c50942e-1045-15ea-96c3-5251a82fa45f", "args": ["fsafifds"], "task": "celerytask.task.hello"}' (96b) {content_type:None content_encoding:None delivery_info:{'consumer_tag': 'None3', 'delivery_tag': 1, 'redelivered': False, 'exchange': 'celery', 'routing_key': 'celery'} headers={}}
这里 task.hello()也并没有执行, 关于这个报错, 网上查了能查到的几个解决方法, 包括 celery 版本降到 3.1, 卸载 librabbitmq 改用 pyamqp, 改 task_protocol=1, 都试了一个遍, 貌似都没用, 不知道该怎么解决?
另外有一点想知道的是直接调用任务函数和推送到消息队列有什么区别吗?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.