基于 Sanic 的微服务基础架构

2017-12-26 09:21:54 +08:00
 songcser

介绍

使用 python 做 web 开发面临的一个最大的问题就是性能,在解决 C10K 问题上显的有点吃力。有些异步框架 Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提升,但是也出现了各种古怪的问题难以解决。

在 python3.6 中,官方的异步协程库 asyncio 正式成为标准。在保留便捷性的同时对性能有了很大的提升,已经出现许多的异步框架使用 asyncio。

使用较早的异步框架是 aiohttp,它提供了 server 端和 client 端,对 asyncio 做了很好的封装。但是开发方式和最流行的微框架 flask 不同,flask 开发简单,轻量,高效。

微服务是最近最火开发模式,它解决了复杂性问题,提高开发效率,便于部署等优点。

正是结合这些优点, 以 Sanic 为基础,集成多个流行的库来搭建微服务。Sanic 框架是和 Flask 相似的异步协程框架,简单轻量,并且性能很高。

本项目就是以 Sanic 为基础搭建的微服务框架。

特点

使用

项目地址: sanic-ms

Example

服务端

使用 sanic 异步框架,有较高的性能,但是使用不当会造成 blocking, 对于有 IO 请求的都要选用异步库。添加库要慎重。 sanic 使用 uvloop 异步驱动,uvloop 基于 libuv 使用 Cython 编写,性能比 nodejs 还要高。

功能说明:

启动前

@app.listener('before_server_start')
async def before_srver_start(app, loop):
    queue = asyncio.Queue()
    app.queue = queue
    loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
    reporter = AioReporter(queue=queue)
    tracer = BasicTracer(recorder=reporter)
    tracer.register_required_propagators()
    opentracing.tracer = tracer
    app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)

中间件

@app.middleware('request')
async def cros(request):
    if request.method == 'POST' or request.method == 'PUT':
        request['data'] = request.json
    span = before_request(request)
    request['span'] = span


@app.middleware('response')
async def cors_res(request, response):
    span = request['span'] if 'span' in request else None
    if response is None:
        return response
    result = {'code': 0}
    if not isinstance(response, HTTPResponse):
        if isinstance(response, tuple) and len(response) == 2:
            result.update({
                'data': response[0],
                'pagination': response[1]
            })
        else:
            result.update({'data': response})
        response = json(result)
        if span:
            span.set_tag('http.status_code', "200")
    if span:
        span.set_tag('component', request.app.name)
        span.finish()
    return response

异常处理

对抛出的异常进行处理,返回统一格式

任务

创建 task 消费 queue 中对 span,用于日志追踪

异步处理

由于使用的是异步框架,可以将一些 IO 请求并行处理

Example:

async def async_request(datas):
    # async handler request
    results = await asyncio.gather(*[data[2] for data in datas])
    for index, obj in enumerate(results):
        data = datas[index]
        data[0][data[1]] = results[index]

@user_bp.get('/<id:int>')
@doc.summary("get user info")
@doc.description("get user info by id")
@doc.produces(Users)
async def get_users_list(request, id):
    async with request.app.db.acquire(request) as cur:
        record = await cur.fetch(
            """ SELECT * FROM users WHERE id = $1 """, id)
        datas = [
            [record, 'city_id', get_city_by_id(request, record['city_id'])]
            [record, 'role_id', get_role_by_id(request, record['role_id'])]
        ]
        await async_request(datas)
        return record

get_city_by_id, get_role_by_id 是并行处理。

相关连接

sanic

模型设计 & ORM

Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。

ORM 使用 peewee, 只是用来做模型设计和 migration, 数据库操作使用 asyncpg。

Example:

# models.py

class Users(Model):
    id = PrimaryKeyField()
    create_time = DateTimeField(verbose_name='create time',
                                default=datetime.datetime.utcnow)
    name = CharField(max_length=128, verbose_name="user's name")
    age = IntegerField(null=False, verbose_name="user's age")
    sex = CharField(max_length=32, verbose_name="user's sex")
    city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
    role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)

    class Meta:
        db_table = 'users'


# migrations.py

from sanic_ms.migrations import MigrationModel, info, db

class UserMigration(MigrationModel):
    _model = Users

    # @info(version="v1")
    # def migrate_v1(self):
    #     migrate(self.add_column('sex'))

def migrations():
    try:
        um = UserMigration()
        with db.transaction():
            um.auto_migrate()
            print("Success Migration")
    except Exception as e:
        raise e

if __name__ == '__main__':
    migrations()

相关连接

peewee

数据库操作

asyncpg is the fastest driver among common Python, NodeJS and Go implementations

使用 asyncpg 为数据库驱动, 对数据库连接进行封装, 执行数据库操作。

不使用 ORM 做数据库操作,一个原因是性能,ORM 会有性能的损耗,并且无法使用 asyncpg 高性能库。另一个是单个微服务是很简单的,表结构不会很复杂,简单的 SQL 语句就可以处理来,没必要引入 ORM。使用 peewee 只是做模型设计

Example:

sql = "SELECT * FROM users WHERE name=$1"
name = "test"
async with request.app.db.acquire(request) as cur:
    data = await cur.fetchrow(sql, name)

async with request.app.db.transaction(request) as cur:
    data = await cur.fetchrow(sql, name)

相关连接

asyncpg benchmarks

客户端

使用 aiohttp 中的 client,对客户端进行了简单的封装,用于微服务之间访问。

Don ’ t create a session per request. Most likely you need a session per application which performs all requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.

Example:

@app.listener('before_server_start')
async def before_srver_start(app, loop):
    app.client =  Client(loop, url='http://host:port')

async def get_role_by_id(request, id):
    cli = request.app.client.cli(request)
    async with cli.get('/cities/{}'.format(id)) as res:
        return await res.json()

@app.listener('before_server_stop')
async def before_server_stop(app, loop):
    app.client.close()

对于访问不同的微服务可以创建多个不同的 client,这样每个 client 都会 keep-alives

相关连接

aiohttp

日志 & 分布式追踪系统

使用官方 logging, 配置文件为 logging.yml, sanic 版本要 0.6.0 及以上。JsonFormatter 将日志转成 json 格式,用于输入到 ES

Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.

装饰器 logger

@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
async def get_city_by_id(request, id):
    cli = request.app.client.cli(request)

分布式追踪系统

相关连接

opentracing zipkin jaeger

API 接口

api 文档使用 swagger 标准。

Example:

from sanic_ms import doc

@user_bp.post('/')
@doc.summary('create user')
@doc.description('create user info')
@doc.consumes(Users)
@doc.produces({'id': int})
async def create_user(request):
    data = request['data']
    async with request.app.db.transaction(request) as cur:
        record = await cur.fetchrow(
            """ INSERT INTO users(name, age, city_id, role_id)
                VALUES($1, $2, $3, $4, $5)
                RETURNING id
            """, data['name'], data['age'], data['city_id'], data['role_id']
        )
        return {'id': record['id']}

相关连接

swagger

Response 数据

在返回时,不要返回 sanic 的 response,直接返回原始数据,会在 Middleware 中对返回的数据进行处理,返回统一的格式,具体的格式可以[查看]

单元测试

单元测试使用 unittest。mock 是自己创建了 MockClient,因为 unittest 还没有 asyncio 的 mock,并且 sanic 的测试接口也是发送 request 请求,所以比较麻烦. 后期可以使用 pytest。

Example:

from sanic_ms.tests import APITestCase
from server import app

class TestCase(APITestCase):
    _app = app
    _blueprint = 'visit'

    def setUp(self):
        super(TestCase, self).setUp()
        self._mock.get('/cities/1',
                       payload={'id': 1, 'name': 'shanghai'})
        self._mock.get('/roles/1',
                       payload={'id': 1, 'name': 'shanghai'})

    def test_create_user(self):
        data = {
            'name': 'test',
            'age': 2,
            'city_id': 1,
            'role_id': 1,
        }
        res = self.client.create_user(data=data)
        body = ujson.loads(res.text)
        self.assertEqual(res.status, 200)

代码覆盖

coverage erase
coverage run --source . -m sanic_ms tests
coverage xml -o reports/coverage.xml
coverage2clover -i reports/coverage.xml -o reports/clover.xml
coverage html -d reports

相关连接

unittest coverage

异常处理

使用 app.error_handler = CustomHander() 对抛出的异常进行处理

Example:

from sanic_ms.exception import ServerError

@visit_bp.delete('/users/<id:int>')
async def del_user(request, id):
    raise ServerError(error='内部错误',code=10500, message="msg")
3803 次点击
所在节点    Python
2 条回复
timqi
2018-01-06 20:36:41 +08:00
sanic 已经有生产环境使用的了么?
yeyu123
2019-07-19 10:01:21 +08:00
赞一个

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/417601

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX