大佬求解 sqlalchemy,项目中偶然发现 postgres+flask_sqlalchemy 在 query 时存在内存泄露问题

2022-09-20 09:58:54 +08:00
 djasdjds

先说环境: 1.python3.6-python3.9 都有使用,均发现有问题,gc 打印没有不可回收的对象,排除 python 本身的问题。 2.SQLAlchemy 版本 1.3-最新 1.4.41 都存在查询完内存不释放的问题,特别的是用原生 sql 查询会比 orm model 查询释放内存少的多,数据库表为了测试存放了 6000 多条数据,实际项目在正常运行中,会一段时间由于 sqlalchemy 内存不释放内存逐渐增大,直到 oom ,用了很多内存分析工具,定位出下面,不知道怎么可以解决,还是自己项目使用上哪里有问题,sqlalchemy 这么大的库,存在这种问题没有被发现

参考了 sqlalchemy 官方文档的各种查询语句都有问题,写的 demo 是,起一个多线程,然后查询,线程销毁后,内存只释放了一部分,6000 多条数据 orm 查完后仍然有 25M 没释放 a = session.query(TaskModel).all() 发不了图:我发下运行结果: begin 57.20703125 end 106.94140625 finish!! end 82.31640625 cost: 25.109375

使用: a = session.execute("select * from task").all() 内存释放很多: begin 57.2109375 end 104.3515625 finish!! end 61.8046875 cost: 4.59375

# -*- coding: utf-8 -*-
import enum
import gc
import os
import threading
# from sqlalchemy.orm import Session
import time
from contextlib import contextmanager
from datetime import datetime

import psutil
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, TIMESTAMP, VARCHAR, Integer, Text, Enum, BOOLEAN

# metadata = MetaData()
# Base = declarative_base()

user = "postgres"
pwd = ""
host = ""
port = 
database = ""
db_uri = f"postgresql+psycopg2://{user}:{pwd}@{host}:{port}/{database}"

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = db_uri
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_RECORD_QUERIES"] = False
# app.config["SQLALCHEMY_ECHO"] = True
engine_options = {
    "pool_size": 50,
    "pool_timeout": 30,
    "pool_recycle": 2 * 60 * 60,
    "max_overflow": 10,
    # "echo": True,
    "query_cache_size": 0,
    "execution_options":
        {
            "compiled_cache": None,
        }
}

app.config["SQLALCHEMY_ENGINE_OPTIONS"] = engine_options
db = SQLAlchemy(app, session_options={"expire_on_commit": False})

# engine = create_engine(db_uri)
# engine = db.get_engine()
# engine = create_engine(db_uri, future=True, **engine_options)
# Session = sessionmaker(autoflush=True, bind=engine)

@enum.unique
class TaskStatus(enum.Enum):
    INIT = "INIT"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    FAILED_NEED_ROLLBACK = "FAILED_NEED_ROLLBACK"
    FAILED_NEED_CLEAR = "FAILED_NEED_CLEAR"
    TIMEOUT = "TIMEOUT"
    PARTIAL_SUCCESS = "PARTIAL_SUCCESS"
    PARTIAL_SUCCESS_NEED_CLEAR = "PARTIAL_SUCCESS_NEED_CLEAR"
    CLEAN_SUCCESS = "CLEAN_SUCCESS"
    CLEAN_FAILED = "CLEAN_FAILED"
    ROLLBACK_SUCCESS = "ROLLBACK_SUCCESS"
    ROLLBACK_FAILED = "ROLLBACK_FAILED"


class TaskModel(db.Model):
    __tablename__ = "task"
    id = Column(VARCHAR(64), primary_key=True)  # uuid
    task_id = Column(VARCHAR(32), nullable=False)  # Internal definition
    name = Column(VARCHAR(128))
    description = Column(Text)
    
    ***省略


@contextmanager
def get_session():
    try:
        # session = Session()
        session = db.session
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.close()
        session.remove()


def sleep_fun():
    print("sleep_fun")
    global begin_rss
    begin_rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

    for j in range(1):
        print(j)
        # with Session(engine) as session:
        #     a = session.query(TaskModel).all()

        # import sqlalchemy.sql.selectable.Select
        # with Session(engine, future=True) as session:
        #     a = session.execute(select(TaskModel)).all()

        # with get_session() as session:
        #     stat = session.query(TaskModel)
        #     a = session.execute(str(stat)).all()
            # a = session.execute(stat).all()

        with get_session() as session:
            a = session.execute("select * from task").all()
            # a = session.query(TaskModel).all()
            print(psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024)
            print(2222, len(a))
        # with Session(engine) as session:
        #     statement = select(TaskModel)
        #     a = session.exec(str(statement)).all()
        #     print(2222, len(a))

    print("begin", begin_rss)
    print("end", psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024)
    print("finish!!")


t = threading.Thread(target=sleep_fun)
t.start()
t.join()
gc.collect()

print("end")

end_rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
print(end_rss)

print("cost: ", end_rss - begin_rss)
time.sleep(1000000)


请忽略注释的部分,试了这些办法都有释放不了的情况, 之前下了 sqlalchemy 1.4.40 的源码,打断点发现内存释放不了在于 context.py 的最后 loading.instances 中,

@classmethod
    def orm_setup_cursor_result(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        result,
    ):
        execution_context = result.context
        compile_state = execution_context.compiled.compile_state

        # cover edge case where ORM entities used in legacy select
        # were passed to session.execute:
        # session.execute(legacy_select([User.id, User.name]))
        # see test_query->test_legacy_tuple_old_select

        load_options = execution_options.get(
            "_sa_orm_load_options", QueryContext.default_load_options
        )
        if compile_state.compile_options._is_star:
            return result

        querycontext = QueryContext(
            compile_state,
            statement,
            params,
            session,
            load_options,
            execution_options,
            bind_arguments,
        )

        return loading.instances(result, querycontext)

loading.py


    try:
        (process, labels, extra) = list(
            zip(
                *[
                    query_entity.row_processor(context, cursor)
                    for query_entity in context.compile_state._entities
                ]
            )
        )
        import sys
        print(process, labels, extra)
        if context.yield_per and (
            context.loaders_require_buffering
            or context.loaders_require_uniquing
        ):
            raise sa_exc.InvalidRequestError(
                "Can't use yield_per with eager loaders that require uniquing "
                "or row buffering, e.g. joinedload() against collections "
                "or subqueryload().  Consider the selectinload() strategy "
                "for better flexibility in loading objects."
            )

一跑到这部分代码后,就释放不了内存了

2198 次点击
所在节点    Python
10 条回复
sivacohan
2022-09-20 18:07:54 +08:00
加个 gc.collect() 看看
lolizeppelin
2022-09-21 09:39:49 +08:00
纯查询 commit 干嘛....用事务又没见 begin

非查询先 flush 再 commit
djasdjds
2022-09-21 15:17:17 +08:00
@sivacohan 查询完有手动 gc ,内存还是不变
djasdjds
2022-09-21 15:20:19 +08:00
commit 是项目框架统一封装的,和这个没关系,查询加不加都有内存泄露
lookStupiToForce
2022-09-22 18:49:48 +08:00
rgengpbinvest111
2022-10-25 17:48:06 +08:00
求问您的问题是否已解决,我也遇到类似的问题但不知道如何解决
djasdjds
2022-11-26 16:58:01 +08:00
无解决
onnethy
2022-12-09 15:04:13 +08:00
@djasdjds 我这边用下面的方式解决了
res = db.session.execute("select * from task").fetchall()
"""
do something for res
"""
del res
gc.collect()
djasdjds
2023-01-11 16:12:30 +08:00
@onnethy 题目上已经说了,用 sql 是正常的,orm 内存不释放
yuyanglive
2023-06-02 11:23:47 +08:00

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/881470

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX