# coding=utf-8
import sqlite3
import random
import threading
import time
db_path = "/Users/admin/test.sqlite"
connection = sqlite3.connect(db_path, check_same_thread=False)
CREATE_SQL = """
CREATE TABLE IF NOT EXISTS test_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT
)
"""
write_count = 0
def write_handle():
while True:
connection.cursor().execute("INSERT INTO test_table (name) VALUES ('%s')" % random.random())
connection.commit()
global write_count
write_count += 1
print(str(int(time.time())) + "Insert_" + str(write_count))
def read_handle():
while True:
res = connection.cursor().execute("SELECT * FROM test_table LIMIT 1").fetchone()
if res is None or res[0] is None:
continue
print(str(int(time.time())) + "Read")
if __name__ == '__main__':
c = connection.cursor()
c.execute(CREATE_SQL)
connection.commit()
time.sleep(1)
threading.Thread(target=write_handle).start()
threading.Thread(target=read_handle).start()
time.sleep(1000)
import sqlite3
import random
import threading
import time
db_path = "/Users/admin/test.sqlite"
connection = sqlite3.connect(db_path, check_same_thread=False)
CREATE_SQL = """
CREATE TABLE IF NOT EXISTS test_table (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT
)
"""
write_count = 0
def write_handle():
while True:
connection.cursor().execute("INSERT INTO test_table (name) VALUES ('%s')" % random.random())
connection.commit()
global write_count
write_count += 1
print(str(int(time.time())) + "Insert_" + str(write_count))
def read_handle():
while True:
res = connection.cursor().execute("SELECT * FROM test_table LIMIT 1").fetchone()
if res is None or res[0] is None:
continue
print(str(int(time.time())) + "Read")
if __name__ == '__main__':
c = connection.cursor()
c.execute(CREATE_SQL)
connection.commit()
time.sleep(1)
threading.Thread(target=write_handle).start()
threading.Thread(target=read_handle).start()
time.sleep(1000)