First, the environment:
1. I have used Python 3.6 through 3.9; the problem shows up on all of them, and gc reports no uncollectable objects, so I rule out Python itself.
2. SQLAlchemy versions from 1.3 up to the latest 1.4.41 all show memory not being released after a query finishes. Notably, a raw SQL query leaves far less memory unreleased than an ORM model query does. The test table holds 6000+ rows. In the real project, during normal operation, memory keeps growing over time because SQLAlchemy does not release it, until the process OOMs. I have used a number of memory analysis tools and narrowed it down to the code below, but I do not know how to fix it, or whether the problem is in how my project uses SQLAlchemy; for a library as large as SQLAlchemy, it seems unlikely that an issue like this would have gone unnoticed.
I tried the various query styles from the official SQLAlchemy docs and they all show the problem. The demo below starts a thread, runs the query inside it, and after the thread exits only part of the memory is released; after an ORM query of the 6000+ rows, about 25 MB is still not released with a = session.query(TaskModel).all(). I cannot post images, so here is the output:
begin 57.20703125
end 106.94140625
finish!!
end 82.31640625
cost: 25.109375
Using a = session.execute("select * from task").all() instead, much more of the memory is released:
begin 57.2109375
end 104.3515625
finish!!
end 61.8046875
cost: 4.59375
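The begin/end/cost numbers above are resident set size (RSS) readings in MB taken with psutil, exactly as in the demo below. A minimal sketch of the measurement (the helper name rss_mb is mine, not from the demo):

import os
import psutil

def rss_mb():
    # resident set size of the current process, in MB
    return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

begin = rss_mb()
# ... run the query in question here ...
end = rss_mb()
print("cost:", end - begin)  # memory the process still holds afterwards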
# -*- coding: utf-8 -*-
import enum
import gc
import os
import threading
# from sqlalchemy.orm import Session
import time
from contextlib import contextmanager
from datetime import datetime
import psutil
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, TIMESTAMP, VARCHAR, Integer, Text, Enum, BOOLEAN
# metadata = MetaData()
# Base = declarative_base()
user = "postgres"
pwd = ""
host = ""
port = ""
database = ""
db_uri = f"postgresql+psycopg2://{user}:{pwd}@{host}:{port}/{database}"
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = db_uri
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_RECORD_QUERIES"] = False
# app.config["SQLALCHEMY_ECHO"] = True
engine_options = {
    "pool_size": 50,
    "pool_timeout": 30,
    "pool_recycle": 2 * 60 * 60,
    "max_overflow": 10,
    # "echo": True,
    "query_cache_size": 0,
    "execution_options": {
        "compiled_cache": None,
    },
}
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = engine_options
db = SQLAlchemy(app, session_options={"expire_on_commit": False})
# engine = create_engine(db_uri)
# engine = db.get_engine()
# engine = create_engine(db_uri, future=True, **engine_options)
# Session = sessionmaker(autoflush=True, bind=engine)
@enum.unique
class TaskStatus(enum.Enum):
    INIT = "INIT"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    FAILED_NEED_ROLLBACK = "FAILED_NEED_ROLLBACK"
    FAILED_NEED_CLEAR = "FAILED_NEED_CLEAR"
    TIMEOUT = "TIMEOUT"
    PARTIAL_SUCCESS = "PARTIAL_SUCCESS"
    PARTIAL_SUCCESS_NEED_CLEAR = "PARTIAL_SUCCESS_NEED_CLEAR"
    CLEAN_SUCCESS = "CLEAN_SUCCESS"
    CLEAN_FAILED = "CLEAN_FAILED"
    ROLLBACK_SUCCESS = "ROLLBACK_SUCCESS"
    ROLLBACK_FAILED = "ROLLBACK_FAILED"
class TaskModel(db.Model):
    __tablename__ = "task"
    id = Column(VARCHAR(64), primary_key=True)  # uuid
    task_id = Column(VARCHAR(32), nullable=False)  # Internal definition
    name = Column(VARCHAR(128))
    description = Column(Text)
    # ... remaining columns omitted
@contextmanager
def get_session():
    try:
        # session = Session()
        session = db.session
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.close()
        session.remove()
def sleep_fun():
    print("sleep_fun")
    global begin_rss
    begin_rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    for j in range(1):
        print(j)
        # with Session(engine) as session:
        #     a = session.query(TaskModel).all()
        # import sqlalchemy.sql.selectable.Select
        # with Session(engine, future=True) as session:
        #     a = session.execute(select(TaskModel)).all()
        # with get_session() as session:
        #     stat = session.query(TaskModel)
        #     a = session.execute(str(stat)).all()
        #     a = session.execute(stat).all()
        with get_session() as session:
            a = session.execute("select * from task").all()
            # a = session.query(TaskModel).all()
            print(psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024)
            print(2222, len(a))
        # with Session(engine) as session:
        #     statement = select(TaskModel)
        #     a = session.exec(str(statement)).all()
        #     print(2222, len(a))
    print("begin", begin_rss)
    print("end", psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024)
    print("finish!!")
t = threading.Thread(target=sleep_fun)
t.start()
t.join()
gc.collect()
print("end")
end_rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
print(end_rss)
print("cost: ", end_rss - begin_rss)
time.sleep(1000000)
Please ignore the commented-out parts; I tried all of those approaches and every one of them has cases where the memory is not released. I previously downloaded the SQLAlchemy 1.4.40 source and set breakpoints, and found that the memory that is not released comes from loading.instances, called at the end of context.py:
@classmethod
def orm_setup_cursor_result(
    cls,
    session,
    statement,
    params,
    execution_options,
    bind_arguments,
    result,
):
    execution_context = result.context
    compile_state = execution_context.compiled.compile_state

    # cover edge case where ORM entities used in legacy select
    # were passed to session.execute:
    # session.execute(legacy_select([User.id, User.name]))
    # see test_query->test_legacy_tuple_old_select
    load_options = execution_options.get(
        "_sa_orm_load_options", QueryContext.default_load_options
    )

    if compile_state.compile_options._is_star:
        return result

    querycontext = QueryContext(
        compile_state,
        statement,
        params,
        session,
        load_options,
        execution_options,
        bind_arguments,
    )
    return loading.instances(result, querycontext)
Inside loading.instances (in loading.py), where I added a debug print, the relevant part looks like this:

try:
    (process, labels, extra) = list(
        zip(
            *[
                query_entity.row_processor(context, cursor)
                for query_entity in context.compile_state._entities
            ]
        )
    )

    import sys
    print(process, labels, extra)

    if context.yield_per and (
        context.loaders_require_buffering
        or context.loaders_require_uniquing
    ):
        raise sa_exc.InvalidRequestError(
            "Can't use yield_per with eager loaders that require uniquing "
            "or row buffering, e.g. joinedload() against collections "
            "or subqueryload(). Consider the selectinload() strategy "
            "for better flexibility in loading objects."
        )
As soon as execution hits this part of the code, the memory can no longer be released.
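One thing I would still check, to tell whether loading.instances is actually keeping references alive or whether the objects are freed and only the process RSS stays high, is to compare a tracemalloc reading with the RSS reading after the session is closed. This is only a minimal sketch of that idea, not part of the demo above, and the helper name python_mb is mine:

import gc
import os
import tracemalloc

import psutil

def python_mb():
    # memory currently held by Python-level allocations tracked by tracemalloc, in MB
    current, _peak = tracemalloc.get_traced_memory()
    return current / 1024 / 1024

tracemalloc.start()

# ... run the ORM query here, e.g. session.query(TaskModel).all(),
# and close the session exactly as in the demo ...

gc.collect()
rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
print("python objects:", python_mb(), "MB")
print("process rss:   ", rss, "MB")
# If the tracemalloc number drops back down while RSS stays high, the objects
# created in loading.instances were in fact freed, and the remaining growth is
# memory the allocator has not returned to the OS rather than a reference leak.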