大佬们,想请教一下数据库设计

22 天前
 iamtuzi3333
小弟目前遇到一个棘手的问题,就是现在咱们的公司用的数据库是 MongoDB ,目前出现吃内存严重现象,同时查询效率不高,数据其实很简单,但是量很多,都是传感器数据,现在每秒都有数据入库,都是一条条的 json ,现在用的 MongoDB ,单个集合就存储一个传感器的数据,但是我发现查询接口太慢了,查询过程只有一个字段去比较,就是大于 and 小于这个值的字段的所有数据,这个都很慢,数据关键一个字段就是 data 数组,200 个浮点数。大佬们有其他数据库推荐吗,不涉及多表联合查询,都是单表操作。
4991 次点击
所在节点    数据库
68 条回复
rickiey
22 天前
如果觉得数据较多,可以尝试将旧数据定时导出备份,使数据库的数据始终保持最近一段时间内的量,需要查询旧数据了,再把备份导入到本地的数据库查询
celaraze
22 天前
@iamtuzi3333 #56 很高兴看到大家的答案能解决你的问题。
但是还是要提醒一下,索引自然是会加快速度,但是也会加重写入负担(你可以理解为每次写入都会更新索引),一旦你写入数据量大、频次高,写入的开销会很大,记录的数据很难保证原子性(如果有连续的业务处理),我建议你当前尽速解决眼前的问题后,花点时间研究下 TSDB ,毕竟是针对 IoT 场景的特效药。比你给时间戳加索引靠谱的多。
wxiao333
21 天前
单个集合就存储一个传感器的数据, 相当于设计上已经将传感器分表了,加时间索引完全没有问题。
长久来看还是得时序数据库,除了速度快,还有很多优秀功能,比如 influxdb 的连续查询,你可以查任意时间间隔下的特征值,比如你现在时间间隔是 1s 的,我查一周的数据,点太多不好展示,我想切换为间隔为 1 分钟的,那 1 分钟的 60 个的点,是取第一个,最后一个,平均值,最大/小值,都很方便,秒出结果。
iamtuzi3333
21 天前
@rickiey 这个数据对 MongoDB 还算正常,只是比较吃内存。
@celaraze 是的大佬,目前我给单表的时间戳字段都建立了索引,写入过程需要一直维护这个索引,确实开销大,目前我的写入逻辑是批量写,异步定时每间隔 5 秒写入一批数据,数据接收村存到了 redis 的队列中,后面再从 list 中读取数据入库操作,这里采用了多线程,redis 的分布式锁,保证数据不重复不交叉,暂时应该还好,后续我想着继续把写入逻辑延迟,积累到一定数据量再写入,比如说几百条甚至上万条。小弟目前公司就我一个人,这个项目也是我全程一个人推进的,有点心累,接下来我花时间看看 TDengine 这个数据库,似乎这个还不错,非常谢谢各位大佬提供解决方案,小弟是真的感谢!
@wxiao333 是的,单集合存单传感器分表这个逻辑比较合理,索引就是维护需要系统开销。目前也在关注时序数据库,大佬说的查询那个功能确实比较优秀,接下来会重点花时间去了解时序数据库,目前物联网比较适合这类数据库,不过就是学习成本有点高,公司不等我哈哈哈。
wxf666
17 天前
用 SQLite 试了一下,亿级数据,上万并发,好像没啥问题?



- 单表数据:1.3 亿,100 GB

- 事务每秒:4.6 W 随机读,1 W 随机写

- 内存占用:16 MB ( Python 脚本,包括 SQLite 内存缓存)

- 测试硬件:六七年前轻薄本,SATA 低端固态

- 测试内容:模拟 500 设备,每秒各保存 200 浮点数据,连续三天



## 脚本使用方法

- 随机写入测试

```shell
# 从上次保存时间戳开始(不存在则为年初),每递增一秒,就写入 500 设备,各 200 浮点数据。直至写入 1W 记录为止
$ python3 test.py -w -d 设备数(默认 500 ) -n 写入行数(默认 1W )
```

- 随机读取测试

```shell
# 从 500 设备中,随机选一台,再随机选某个时间,取数据。直至读取 1W 记录为止
$ python3 test.py -r -d 设备数(默认 500 ) -n 读取行数(默认 1W )

# 最多运行 10 秒
$ timeout -s INT 10 python3 test.py -r

# 八进程同时测试
$ seq 8 | xargs -P 8 -I{} python3 test.py -r
```



## 测试脚本代码

```python
# V 站吞空格,缩进改为全角空格了

import time
import apsw
import random
import struct
import argparse
import itertools
from datetime import datetime, timezone

DEFAULT_DEVICES = 500
DEFAULT_RECORDS = 10000
SQLITE_MAX_WAL_PAGES = 10000
DB_PATH = '/数据库路径/文件名.db'
DEFAULT_START_TIME = int(datetime.strptime('2024-01-01 00:00:00', '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc).timestamp())

count = 0
db: apsw.Connection
devices = DEFAULT_DEVICES
records = DEFAULT_RECORDS
dev_time_range: list[range] = []


def parse_args():
   parser = argparse.ArgumentParser(description="SQLite 测试读写多传感器数据")
   group = parser.add_mutually_exclusive_group(required=True)
   group.add_argument('-r', action='store_true', help="随机读取")
   group.add_argument('-w', action='store_true', help="随机写入")
   parser.add_argument('-d', type=int, default=DEFAULT_DEVICES, help=f"设备数(默认 {DEFAULT_DEVICES})")
   parser.add_argument('-n', type=int, default=DEFAULT_RECORDS, help=f"要测试的记录数(默认 {DEFAULT_RECORDS})")

   global devices, records
   args = parser.parse_args()
   devices = args.d
   records = args.n
   return args


# 随机写的页面足够多时,确保落盘并重置 WAL 文件
def sqlite3_wal_hook(db: apsw.Connection, name: str, pages: int):
   if pages > SQLITE_MAX_WAL_PAGES:
     db.wal_checkpoint(mode=apsw.SQLITE_CHECKPOINT_RESTART)
   return apsw.SQLITE_OK


def init_db():
   global db
   db = apsw.Connection(DB_PATH)
   db.execute('PRAGMA journal_mode = WAL')
   db.execute('PRAGMA busy_timeout = 5000')
   db.execute('PRAGMA synchronous = NORMAL')
   db.setwalhook(sqlite3_wal_hook)

   db.execute('''
     CREATE TABLE IF NOT EXISTS device_data (
       id     INTEGER PRIMARY KEY,
       dev_id   AS (id >> 32),
       created AS (id & 0xFFFFFFFF),
       data    BLOB
    )
  ''')


# 获取每个设备,已保存数据的时间范围
def get_dev_time_range():
   rows = db.execute('''
     SELECT dev_min.created, dev_max.created + 1
     FROM (SELECT (max(id) >> 32) + 1 dev_count FROM device_data)
     JOIN json_each(REPLACE(FORMAT('[%*.*s]', dev_count, dev_count, '0'), ' ', '0,')) dev
     JOIN device_data dev_min ON dev_min.id = (SELECT min(id) FROM device_data WHERE id >= dev.key << 32)
     JOIN device_data dev_max ON dev_max.id = (SELECT max(id) FROM device_data WHERE id <= dev.key << 32 | 0xFFFFFFFF)
  ''').fetchall()

   dev_time_range.extend(list(itertools.starmap(range, rows))[:devices])
   dev_time_range.extend([range(DEFAULT_START_TIME, DEFAULT_START_TIME)] * max(devices - len(rows), 0))


def test_read():
   global count
   items = list(enumerate(dev_time_range))
   weights = list(itertools.accumulate(map(lambda i: i.stop - i.start, dev_time_range)))

   while count < records:
    # 以每设备时长为权重,随机抽取一个设备,再从其时间范围随机抽取时间点
     dev, time_range = random.choices(items, cum_weights=weights)[0]
     db.execute('''
       SELECT data
       FROM device_data
       WHERE id = ? << 32 | ?
    ''', (dev, random.choice(time_range))).fetchone()
     count += 1


def test_write():
   global count
   start_time = min(dev_time_range, key=lambda i: i.stop).stop

   for ts in itertools.count(start_time):
     for dev in range(devices):
       if count >= records:
         return
       elif ts in dev_time_range[dev]:
         continue

       floats = [random.random() for i in range(200)]
       data = struct.pack('200f', *floats)

       db.execute('BEGIN IMMEDIATE')
       db.execute('''
         INSERT INTO device_data (id, data)
         VALUES (? << 32 | ?, ?)
      ''', (dev, ts, data))
       db.execute('COMMIT')
       count += 1


def test(is_read: bool):
   init_db()
   get_dev_time_range()
   start_time = time.time()
   try:
     test_read() if is_read else test_write()
   except KeyboardInterrupt:
     pass
   finally:
     duration = time.time() - start_time
     print(f'在 {duration:6.2f} 秒内,随机{"写读"[is_read]} {count:6d} 行,平均 {count / duration:8.2f} 行/秒')


if __name__ == '__main__':
   args = parse_args()
   test(args.r)
```



## 1.3 亿 100 GB 数据库,文件结构信息分析

iamtuzi3333
10 天前
@wxf666 哇塞,首先谢谢大佬指点,大佬很强,不好意思,我假期没看论坛。目前我是建了索引,基本上解决问题。看了您的测试,太强了,我深感自己缺少这个精神,汗颜。不过我觉得单表不适合多传感器数据的存储,一开始我就 pass 了,数据太过分散,不方便后续读取维护。sqllite 我看到了乙方存的是记录,他们用文件存数据,然后有记录索引,用起始位置来标记数据,这个方案比较难,对我个人来说;所以就考虑用 mongoDB ,现在确实好用,有了索引查询效率瞬间上来了,之前占用内存大可能因为写入较频繁,每秒实时写入。我现在改成了异步延时写入,一开始存到了 redis 的 list ,然后再去 list 取数据写入到数据库,算是减少了内存消耗。不过大佬的实践很强,有时间我试试该数据库以及您说的方案,再次感谢大佬的指点!!!
wxf666
9 天前
@iamtuzi3333 #66

1. 《数据太过分散,不方便后续读取维护》是啥意思呢。。

这个测试的主键类似于(设备 ID ,时间戳),如果你要查今天所有设备的数据,可以:

```sql
SELECT *
FROM data
JOIN generate_series(1, 500) 设备
WHERE 设备 ID = 设备.value
AND 时间戳 BETWEEN '今天' AND '现在'
```

我觉得单表数据过大,需要担心的是 B+ 树层级过高,每次读取需要再耗费一个 IO 。。



2. 测试中,我也是用了类似《先缓存写入的行,一定数量后再刷写回数据库》方法,提高写事务速度的。

设定缓存最多 1W 行,平均每设备 20 行,每 5 行一个 4K 页,即每设备 16KB ,

所以就能参考 16KB 随机写速度了。。
iamtuzi3333
9 天前
@wxf666 我的意思是单表存一个传感器设备的数据,这样相对来说方便后续读取,数据字段可能还会添加。查询这个还好,现在加上了索引,即使我查询前几个月的数据也能够很快就响应了。写入这个问题暂时不管了,mongo 数据库就是吃内存,空间换时间了,还是谢谢大佬的指点,很强,这个测试能力。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1075881

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX