python之sqlalchemy的使用
准备数据
1 from sqlalchemy.ext.declarative import declarative_base2 from sqlalchemy import Column
3 from sqlalchemy import Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Index
4 from sqlalchemy import create_engine
5 from sqlalchemy.orm import relationship
6
7 Base = declarative_base()
8
9
10 class Depart(Base):
11 __tablename__ = 'depart'
12 id = Column(Integer, primary_key=True)
13 title = Column(String(32), index=True, nullable=False)
14
15
16 class Users(Base):
17 __tablename__ = 'users'
18
19 id = Column(Integer, primary_key=True)
20 name = Column(String(32), index=True, nullable=False)
21 depart_id = Column(Integer, ForeignKey("depart.id"))
22
23 # 用于链表操作 与表的创建无关
24 dp = relationship("Depart", backref='pers')
25
26
27 class Student(Base):
28 __tablename__ = 'student'
29 id = Column(Integer, primary_key=True)
30 name = Column(String(32), index=True, nullable=False)
31
32 course_list = relationship('Course', secondary='student2course', backref='student_list')
33
34
35 class Course(Base):
36 __tablename__ = 'course'
37 id = Column(Integer, primary_key=True)
38 title = Column(String(32), index=True, nullable=False)
39
40
41 class Student2Course(Base):
42 __tablename__ = 'student2course'
43 id = Column(Integer, primary_key=True, autoincrement=True)
44 student_id = Column(Integer, ForeignKey('student.id'))
45 course_id = Column(Integer, ForeignKey('course.id'))
46
47 __table_args__ = (
48 UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
49 # Index('ix_id_name', 'name', 'extra'), # 联合索引
50 )
51
52
53 def create_all():
54 engine = create_engine(
55 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
56 max_overflow=0, # 超过连接池大小外最多创建的连接
57 pool_size=5, # 连接池大小
58 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
59 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
60 )
61
62 Base.metadata.create_all(engine)
63
64
65 def drop_all():
66 engine = create_engine(
67 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
68 max_overflow=0, # 超过连接池大小外最多创建的连接
69 pool_size=5, # 连接池大小
70 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
71 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
72 )
73 Base.metadata.drop_all(engine)
74
75
76 if __name__ == '__main__':
77 # drop_all()
78 create_all()
models.py
基本操作
1 from sqlalchemy.orm import sessionmaker2 from sqlalchemy import create_engine
3 from models import Users, Student, Depart
4
5 engine = create_engine(
6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
7 max_overflow=0, # 超过连接池大小外最多创建的连接
8 pool_size=5, # 连接池大小
9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12 SessionFactory = sessionmaker(bind=engine)
13
14 # 从连接池获取一个连接
15 session = SessionFactory()
16
17 # ############################## 基本增删改查 ###############################
18 # 1. 增加
19 obj = Users(name='tang')
20 session.add(obj)
21 session.commit()
22
23 # 批量增加
24 session.add_all([
25 Users(name='tang'),
26 Users(name='chen')
27 ])
28 session.commit()
29
30 # 2. 查
31 result = session.query(Users).all()
32 for row in result:
33 print(row.id,row.name)
34
35 # sqlalchemy 的语法跟Python很相似
36 result = session.query(Users).filter(Users.id >= 2)
37 for row in result:
38 print(row.id,row.name)
39
40
41 # 获取第一个
42 result = session.query(Users).filter(Users.id >= 2).first()
43 print(result)
44
45 # 3.删
46 session.query(Users).filter(Users.id >= 2).delete()
47 session.commit()
48
49 # 4.改 通过字典
50 session.query(Users).filter(Users.id == 4).update({Users.name:'tang'})
51 session.query(Users).filter(Users.id == 4).update({'name':'tang'})
52 session.query(Users).filter(Users.id == 4).update({'name':Users.name+"_lao"},synchronize_session=False)
53 session.commit()
54
55 # ############################## 其他常用 ###############################
56 # 1. 指定列 去别名
57 # 对应原生SQL:select id,name as cname from users;
58 result = session.query(Users.id,Users.name.label('cname')).all()
59 for item in result:
60 print(item[0],item.id,item.cname)
61
62
63 # 2. 默认条件and
64 session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
65
66 # 3. between
67 session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
68
69 # 4. in
70 session.query(Users).filter(Users.id.in_([1,3,4])).all()
71 # not in
72 session.query(Users).filter(~Users.id.in_([1,3,4])).all()
73
74 # 5. 子查询
75 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='tang'))).all()
76
77 # 6. and 和 or
78 from sqlalchemy import and_, or_
79 session.query(Users).filter(Users.id > 3, Users.name == 'tang').all()
80 session.query(Users).filter(and_(Users.id > 3, Users.name == 'tang')).all()
81 session.query(Users).filter(or_(Users.id < 2, Users.name == 'tang')).all()
82 session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()
83
84 # 7. filter_by 只需字段名
85 session.query(Users).filter_by(name='alex').all()
86
87 # 8. 通配符
88 ret = session.query(Users).filter(Users.name.like('e%')).all()
89 ret = session.query(Users).filter(~Users.name.like('e%')).all()
90
91 # 9. 切片
92 result = session.query(Users)[1:2]
93
94 # 10.排序
95 ret = session.query(Users).order_by(Users.name.desc()).all()
96 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
97
98 # 11. group by
99 from sqlalchemy.sql import func
100
101 ret = session.query(Users.depart_id,func.count(Users.id),).group_by(Users.depart_id).all()
102 for item in ret:
103 print(item)
104 #
105 # from sqlalchemy.sql import func
106 # 分组之后再进行查询
107 ret = session.query(
108 Users.depart_id,
109 func.count(Users.id),
110 ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
111 for item in ret:
112 print(item)
113
114 # 12.union 和 union all
115 """
116 select id,name from users
117 UNION
118 select id,name from users;
119 """
120 """
121 select id,name from users
122 UNION ALL
123 select id,name from users;
124 """
125 q1 = session.query(Depart.title).filter(Depart.id > 2)
126 q2 = session.query(Student.name).filter(Student.id < 2)
127 ret = q1.union(q2).all()
128 #
129 # q1 = session.query(Users.name).filter(Users.id > 2)
130 # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
131 # ret = q1.union_all(q2).all()
132
133 """
134 union 和 union_all 的区别
135 union 去重
136 union_all 不去重
137
138 相同点:合并的两张表的列要相同
139 """
140
141 """
142 union 和 join的区别
143 union是垂直合并成一张表
144 join是水平合并成一张表
145 """
146
147 """
148 查看原生sql 打印不获取结果的语句就可以
149 sql = session.query(Users).filter(Users.id==1)
150 print(sql)
151 """
152
153 session.close()
链表操作 与 外键relation字段的使用
1 from sqlalchemy.orm import sessionmaker2 from sqlalchemy import create_engine
3 from models import Users,Depart
4
5
6 engine = create_engine(
7 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
8 max_overflow=0, # 超过连接池大小外最多创建的连接
9 pool_size=5, # 连接池大小
10 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
11 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
12 )
13
14 SessionFactory = sessionmaker(bind=engine)
15 session = SessionFactory()
16
17
18 # 单表操作
19 ret = session.query(Users).all()
20 for row in ret:
21 print(row.id,row.name, row.depart_id)
22
23
24 # 链表操作
25 ret = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id==Depart.id).all()
26 for row in ret:
27 print(row.id, row.name, row.title)
28
29 # isouter 表示 left join 没有right join 只能调换查询顺序
30 ret = session.query(Users.id, Users.name, Depart.title).join(Users,isouter=True).all()
31 # print(ret)
32 for row in ret:
33 print(row.id, row.name, row.title)
34
35
36 # 3. relation字段:查询所有用户+所属部门名称
37 ret = session.query(Users).all()
38 for row in ret:
39 # relation dp的作用
40 print(row.id,row.name,row.depart_id, row.dp.title)
41
42 # 4. relation字段:查询销售部所有的人员
43 ret = session.query(Depart).filter(Depart.title=='销售部').first()
44 for row in ret.pers:
45 print(row.id, row.name, ret.title)
46
47 # 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:tanglaoer
48 u1 = Users(name='tanglaoer',dp=Depart(title='IT'))
49 session.add(u1)
50 session.commit()
51
52 # 6. 创建一个名称叫:技术部,再在该部门中添加一个员工:tang lao san
53 d1 = Depart(title='技术部')
54 d1.pers = [Users(name='tang'),Users(name='lao'), Users(name='san')]
55 session.add(d1)
56 session.commit()
57
58 # 在已存在的技术部 添加几名员工
59 d1 = session.query(Depart).filter(Depart.title == '技术部').first()
60 d1.pers = [Users(name='LIN'), Users(name='WU'),Users(name='SEN')]
61 session.add(d1)
62 session.commit()
63
64 session.close()
Foreign and join
多对多操作
1 from sqlalchemy.orm import sessionmaker2 from sqlalchemy import create_engine
3 from models import Student, Course, Student2Course
4
5 engine = create_engine(
6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
7 max_overflow=0, # 超过连接池大小外最多创建的连接
8 pool_size=5, # 连接池大小
9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12
13 SessionFactory = sessionmaker(bind=engine)
14
15 session = SessionFactory()
16 # 1. 录入数据
17 session.add_all([
18 Student(name='tang'),
19 Student(name='chen'),
20 Course(title='生物'),
21 Course(title='体育'),
22 ])
23 session.commit()
24
25 # 可批量增加多对多外键
26 session.add_all([
27 Student2Course(student_id=2,course_id=1),
28 Student2Course(student_id=1,course_id=1),
29 Student2Course(student_id=1,course_id=2),
30 ])
31
32 # 2. 三张表关联
33 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Course.id.asc()).all()
34 print(ret)
35 session.commit()
36
37 # 3. “tang”选的所有课
38 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=='tang').all()
39 print(ret)
40
41 # relation 字段的使用
42 ret = session.query(Student).filter(Student.name== 'tang').first()
43 for row in ret.course_list:
44 print(row.title)
45
46
47 # 4. 选了“生物”的所有人
48 # relation 字段的方向使用
49 ret = session.query(Course).filter(Course.title == '生物').first()
50 for row in ret.student_list:
51 print(row.name, ret.title)
52
53 # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。
54 obj = Course(title='英语')
55 obj.student_list = [Student(name='lin'), Student(name='wu')]
56 session.add(obj)
57 session.commit()
58
59 # 创建一个学生,加入多门新创建课程
60 stu = Student(name='tang')
61 stu.course_list = [Course(title='数学'), Course(title='地理')]
62 session.add(stu)
63 session.commit()
64
65 # 把tang添加到已存在的课程中
66 from sqlalchemy import or_
67 stu = session.query(Student).filter(Student.name=='tang').first()
68 stu.course_list = session.query(Course).filter(or_(Course.id == 1, Course.id ==3)).all()
69 print(stu.course_list)
70 session.add(stu)
71 session.commit()
72
73 session.close()
many2many
sqlalchemy 连接与多线程的操作
1 from sqlalchemy.orm import sessionmaker2 from sqlalchemy import create_engine
3 from models import Student
4 engine = create_engine(
5 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
6 max_overflow=0, # 超过连接池大小外最多创建的连接
7 pool_size=5, # 连接池大小
8 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
9 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
10 )
11 SessionFactory = sessionmaker(bind=engine)
12
13 def task():
14 # 去连接池中获取一个连接
15 # 第一版本
16 session = SessionFactory()
17
18 ret = session.query(Student).all()
19 print(ret)
20 # 将连接交还给连接池
21 session.close()
22
23
24 from threading import Thread
25
26 for i in range(20):
27 t = Thread(target=task)
28 t.start()
第一版本
1 from sqlalchemy.orm import sessionmaker2 from sqlalchemy import create_engine
3 from sqlalchemy.orm import scoped_session
4 from models import Student,Course,Student2Course
5
6 engine = create_engine(
7 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
8 max_overflow=0, # 超过连接池大小外最多创建的连接
9 pool_size=5, # 连接池大小
10 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
11 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
12 )
13 SessionFactory = sessionmaker(bind=engine)
14 session = scoped_session(SessionFactory)
15 # scoped_session 里面有threading.local
16 # 为每个线程赋予一个连接
17
18 def task():
19 ret = session.query(Student).all()
20 print(ret)
21 # 将连接交还给连接池
22 session.remove()
23
24
25 from threading import Thread
26
27 for i in range(20):
28 t = Thread(target=task)
29 t.start()
第二版本scoped_session
sqlalchemy 写原生SQL语句
1 from sqlalchemy.orm import sessionmaker2 from sqlalchemy import create_engine
3 from sqlalchemy.orm import scoped_session
4
5 engine = create_engine(
6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",
7 max_overflow=0, # 超过连接池大小外最多创建的连接
8 pool_size=5, # 连接池大小
9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
11 )
12 SessionFactory = sessionmaker(bind=engine)
13 session = scoped_session(SessionFactory)
14
15
16 def task():
17 """"""
18 # 方式一:
19 # 查询
20 cursor = session.execute('select * from users')
21 result = cursor.fetchall()
22 print(result)
23
24 # 添加 参数通过"冒号"
25 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'tanglaoer'})
26 session.commit()
27 print(cursor.lastrowid)
28
29 # 方式二:
30 # 与pymysql的链接一模一样
31 conn = engine.raw_connection()
32 cursor = conn.cursor()
33 cursor.execute(
34 "select * from users"
35 )
36 result = cursor.fetchall()
37 print(result)
38 cursor.close()
39 conn.close()
40
41 # 将连接交还给连接池
42 session.remove()
43
44
45 from threading import Thread
46
47 for i in range(20):
48 t = Thread(target=task)
49 t.start()
原生SQL
以上是 python之sqlalchemy的使用 的全部内容, 来源链接: utcz.com/z/388187.html