V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
johnsona
V2EX  ›  Python

Flask 源码系列文章

  •  
  •   johnsona · 2020-12-29 18:49:34 +08:00 · 2316 次点击
    这是一个创建于 1274 天前的主题,其中的信息可能已经有所发展或是发生改变。

    写了几篇关于 Flask 源码的文章

    Flask 源码系列

    也可以关注我的公众号

    持续更新中

    11 条回复    2020-12-30 00:02:00 +08:00
    ErwinCheung
        1
    ErwinCheung  
       2020-12-29 19:01:56 +08:00
    很不错的样子 最近不更新了吗
    johnsona
        2
    johnsona  
    OP
       2020-12-29 19:10:03 +08:00
    @ErwinCheung 会把 Flask 源码持续更新完
    stdout
        3
    stdout  
       2020-12-29 19:33:31 +08:00
    flask 的 pycharm 的自动提示太烂了。动态绑定的变量和方法导致补全太差。不知道后期是否会有改善。
    johnsona
        4
    johnsona  
    OP
       2020-12-29 19:40:05 +08:00
    @stdout 动态绑定的属性和方法,这个怎么感觉换其他框架也不好解决,fastapi 可以吗
    learningman
        5
    learningman  
       2020-12-29 20:32:07 +08:00
    @johnsona fastapi 也不行,运行时加载的东西想自动补全估计要上 AI 了
    shroxd
        6
    shroxd  
       2020-12-29 20:38:59 +08:00 via iPhone
    正好想看看这方面内容,感谢
    stdout
        7
    stdout  
       2020-12-29 20:54:11 +08:00
    @johnsona 边看文档边写,或者自己抽象一层。我打包的 base 参考
    ```

    class BaseModel(db.Model):
    __abstract__ = True

    create_time = db.Column(db.DateTime, nullable=False, index=True, comment='创建时间')
    update_time = db.Column(db.DateTime, nullable=False, index=True, comment='修改时间')

    # 另外一种方式处理自动更新
    # created_at = db.Column(db.DateTime, default=sqlalchemy.func.now(), nullable=False)
    # updated_at = db.Column(db.DateTime, default=sqlalchemy.func.now(), onupdate=sqlalchemy.func.now(), nullable=False)

    @classmethod
    def qry(cls) -> BaseQuery:
    return db.session.query(cls)

    @classmethod
    def filter(cls, *args) -> Query:
    return cls.qry().filter(*args)

    @classmethod
    def count(cls, rsql=None, sort=None, ignore_fields=None):
    _q = cls.qry().filter(to_db_query_sql(cls, rsql, ignore_fields))
    if sort is not None and sort != '':
    _q = _q.order_by(to_db_order(cls, sort))
    return _q.count()

    @classmethod
    def count_by(cls, **kwargs):
    return cls.qry().filter_by(**kwargs).count()

    @classmethod
    def find(cls, rsql=None, sort=None, ignore_fields=None):
    _q = cls.qry().filter(to_db_query_sql(cls, rsql, ignore_fields))
    if sort is not None and sort != '':
    _q = _q.order_by(to_db_order(cls, sort))
    return _q.all()

    @classmethod
    def page_find(cls, page_num, page_size, rsql=None, sort=None, and_sql=None, ignore_fields=None):
    if rsql is None or rsql.strip() == '':
    rsql = and_sql
    elif and_sql is not None:
    rsql = "(%s) and (%s)" % (rsql, and_sql)
    _q = cls.qry().filter(to_db_query_sql(cls, rsql, ignore_fields))
    if sort is not None and sort != '':
    _q = _q.order_by(to_db_order(cls, sort))
    ret = _q.paginate(page_num, page_size)
    return ret

    @classmethod
    def get(cls, _id):
    # print(cls)
    # print(__class__)
    # print(__class__.__dict__)
    # print(__class__.__bases__)
    # print(__class__.__subclasses__())
    return cls.qry().get(_id)

    @classmethod
    def delete(cls, _id):
    cls.qry().filter(cls.id == _id).delete()
    db.session.commit()

    def save(self):
    try:
    db.session.add(self)
    db.session.flush()
    db.session.commit()
    except BaseException as e:
    _log.error(e)
    db.session.rollback()

    @classmethod
    def add(cls, obj, save=True):
    item = None
    _type = type(obj)
    if _type is dict:
    item = cls.from_dict(obj)
    elif _type is cls:
    item = obj
    if item is not None:
    if hasattr(item, 'check') and callable(getattr(item, 'check')):
    getattr(item, 'check')()
    if save:
    item.save()
    return item

    def update_from_dict(self, obj, valid_fields=None):
    columns = [m.key for m in self.__table__.columns]
    valid_keys = [k for k in obj.keys() if k in columns]
    item = self.from_dict(obj)
    for k in valid_keys:
    if valid_fields is not None and k not in valid_fields:
    continue
    setattr(self, k, getattr(item, k))

    @classmethod
    def update(cls, obj, save=True, valid_fields=None):
    item = None
    _type = type(obj)
    if _type is dict:
    pk = inspect(cls).primary_key[0].name
    if pk not in obj.keys():
    return None
    item = cls.get(obj[pk])
    item.update_from_dict(obj, valid_fields)
    elif _type is cls:
    item = obj
    if item is not None:
    if hasattr(item, 'check') and callable(getattr(item, 'check')):
    getattr(item, 'check')()
    if save:
    item.save()
    return item

    @classmethod
    def find_one_by(cls, **kwargs):
    return cls.qry().filter_by(**kwargs).one_or_none()

    @classmethod
    def find_first_by(cls, **kwargs):
    return cls.qry().filter_by(**kwargs).first()

    @classmethod
    def find_all_by(cls, **kwargs) -> list:
    return cls.qry().filter_by(**kwargs).all()

    @classmethod
    def from_dict(cls, obj):
    if type(obj) not in [dict, RowProxy]:
    return None
    _obj = {}
    columns = inspect(cls).columns
    for column in columns:
    if column.name not in obj.keys():
    continue
    obj_value = obj[column.name]
    _type = type(column.type)
    value = obj_value
    if value is None:
    pass
    elif _type is Date:
    value = to_local_datetime(obj_value).date()
    elif _type is DateTime:
    value = to_local_datetime(obj_value)
    elif _type is Boolean:
    if type(value) is str:
    value = value.lower() == 'true'
    else:
    value = bool(obj_value)
    elif _type is Integer:
    if type(obj_value) is int:
    value = obj_value
    elif type(obj_value) is str:
    if not obj_value.isdigit():
    value = None
    else:
    value = int(obj_value)
    elif _type is Float:
    value = float(obj_value)
    elif _type is int:
    value = int(obj_value)
    elif _type:
    value = str(obj_value)
    _obj[column.name] = value
    # columns = [m.key for m in cls.__table__.columns]
    # _obj = {k: v for k, v in obj.items() if k in columns}
    return cls(**_obj)

    def to_dict(self, rel=True, ignore=None):
    """Returns the model properties as a dict

    :rtype: dict
    """
    result = {}

    if ignore is None:
    ignore = []

    columns = set([attr for attr in dir(self) if not attr.startswith('_')
    and attr not in ignore
    and attr not in ['metadata', 'query', 'query_class']
    and not callable(getattr(self, attr))
    ])
    if not rel:
    columns = [m.key for m in self.__table__.columns]
    for attr in columns:
    value = getattr(self, attr)
    if isinstance(value, list):
    result[attr] = list(map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value))
    elif hasattr(value, "to_dict"):
    result[attr] = value.to_dict()
    elif isinstance(value, dict):
    result[attr] = dict(map(
    lambda item: (item[0], item[1].to_dict())
    if hasattr(item[1], "to_dict") else item,
    value.items()
    ))
    else:
    result[attr] = value
    return result

    def to_json(self):
    def extended_encoder(x):
    if isinstance(x, datetime.datetime):
    return x.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(x, datetime.date):
    return x.strftime("%Y-%m-%d")
    if isinstance(x, UUID):
    return str(x)

    json_ignore = []
    if hasattr(self, '_json_ignore'):
    json_ignore = self._json_ignore
    return json.dumps(self.to_dict(ignore=json_ignore), default=extended_encoder)

    @staticmethod
    def result_to_dict(result, keys):
    new_result = []
    for row in result:
    row = dict(zip([attr.name for attr in keys], row))
    new_result.append(row)
    return new_result
    ```
    johnsona
        8
    johnsona  
    OP
       2020-12-29 21:15:40 +08:00
    @stdout 挺好的,sqlalchemy 还是要手动封装一些东西的
    我之前还试过 Marshallmallow 来把 model 转化为 dict 以及反序列化和校验入参,更加接近 drf 一点,但是别人可能就要花多一些时间看 Marshallmallow 的文档
    所以我觉得其实这样写就挺好了
    johnsona
        9
    johnsona  
    OP
       2020-12-29 21:16:13 +08:00
    @shroxd 不客气~
    ErwinCheung
        10
    ErwinCheung  
       2020-12-29 22:34:03 +08:00
    laoge 你这个代码的缩进 处女座无法忍受😂
    SjwNo1
        11
    SjwNo1  
       2020-12-30 00:02:00 +08:00
    那么,谁写过分析 django 源码的文章
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4872 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 09:40 · PVG 17:40 · LAX 02:40 · JFK 05:40
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.