python之sqlalchemy的使用

python

准备数据

 1 from sqlalchemy.ext.declarative import declarative_base

2 from sqlalchemy import Column

3 from sqlalchemy import Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Index

4 from sqlalchemy import create_engine

5 from sqlalchemy.orm import relationship

6

7 Base = declarative_base()

8

9

10 class Depart(Base):

11 __tablename__ = 'depart'

12 id = Column(Integer, primary_key=True)

13 title = Column(String(32), index=True, nullable=False)

14

15

16 class Users(Base):

17 __tablename__ = 'users'

18

19 id = Column(Integer, primary_key=True)

20 name = Column(String(32), index=True, nullable=False)

21 depart_id = Column(Integer, ForeignKey("depart.id"))

22

23 # 用于链表操作 与表的创建无关

24 dp = relationship("Depart", backref='pers')

25

26

27 class Student(Base):

28 __tablename__ = 'student'

29 id = Column(Integer, primary_key=True)

30 name = Column(String(32), index=True, nullable=False)

31

32 course_list = relationship('Course', secondary='student2course', backref='student_list')

33

34

35 class Course(Base):

36 __tablename__ = 'course'

37 id = Column(Integer, primary_key=True)

38 title = Column(String(32), index=True, nullable=False)

39

40

41 class Student2Course(Base):

42 __tablename__ = 'student2course'

43 id = Column(Integer, primary_key=True, autoincrement=True)

44 student_id = Column(Integer, ForeignKey('student.id'))

45 course_id = Column(Integer, ForeignKey('course.id'))

46

47 __table_args__ = (

48 UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引

49 # Index('ix_id_name', 'name', 'extra'), # 联合索引

50 )

51

52

53 def create_all():

54 engine = create_engine(

55 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

56 max_overflow=0, # 超过连接池大小外最多创建的连接

57 pool_size=5, # 连接池大小

58 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

59 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

60 )

61

62 Base.metadata.create_all(engine)

63

64

65 def drop_all():

66 engine = create_engine(

67 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

68 max_overflow=0, # 超过连接池大小外最多创建的连接

69 pool_size=5, # 连接池大小

70 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

71 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

72 )

73 Base.metadata.drop_all(engine)

74

75

76 if __name__ == '__main__':

77 # drop_all()

78 create_all()

models.py

基本操作

  1 from sqlalchemy.orm import sessionmaker

2 from sqlalchemy import create_engine

3 from models import Users, Student, Depart

4

5 engine = create_engine(

6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

7 max_overflow=0, # 超过连接池大小外最多创建的连接

8 pool_size=5, # 连接池大小

9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

11 )

12 SessionFactory = sessionmaker(bind=engine)

13

14 # 从连接池获取一个连接

15 session = SessionFactory()

16

17 # ############################## 基本增删改查 ###############################

18 # 1. 增加

19 obj = Users(name='tang')

20 session.add(obj)

21 session.commit()

22

23 # 批量增加

24 session.add_all([

25 Users(name='tang'),

26 Users(name='chen')

27 ])

28 session.commit()

29

30 # 2. 查

31 result = session.query(Users).all()

32 for row in result:

33 print(row.id,row.name)

34

35 # sqlalchemy 的语法跟Python很相似

36 result = session.query(Users).filter(Users.id >= 2)

37 for row in result:

38 print(row.id,row.name)

39

40

41 # 获取第一个

42 result = session.query(Users).filter(Users.id >= 2).first()

43 print(result)

44

45 # 3.删

46 session.query(Users).filter(Users.id >= 2).delete()

47 session.commit()

48

49 # 4.改 通过字典

50 session.query(Users).filter(Users.id == 4).update({Users.name:'tang'})

51 session.query(Users).filter(Users.id == 4).update({'name':'tang'})

52 session.query(Users).filter(Users.id == 4).update({'name':Users.name+"_lao"},synchronize_session=False)

53 session.commit()

54

55 # ############################## 其他常用 ###############################

56 # 1. 指定列 去别名

57 # 对应原生SQL:select id,name as cname from users;

58 result = session.query(Users.id,Users.name.label('cname')).all()

59 for item in result:

60 print(item[0],item.id,item.cname)

61

62

63 # 2. 默认条件and

64 session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()

65

66 # 3. between

67 session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()

68

69 # 4. in

70 session.query(Users).filter(Users.id.in_([1,3,4])).all()

71 # not in

72 session.query(Users).filter(~Users.id.in_([1,3,4])).all()

73

74 # 5. 子查询

75 session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='tang'))).all()

76

77 # 6. and 和 or

78 from sqlalchemy import and_, or_

79 session.query(Users).filter(Users.id > 3, Users.name == 'tang').all()

80 session.query(Users).filter(and_(Users.id > 3, Users.name == 'tang')).all()

81 session.query(Users).filter(or_(Users.id < 2, Users.name == 'tang')).all()

82 session.query(Users).filter(or_(Users.id < 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()

83

84 # 7. filter_by 只需字段名

85 session.query(Users).filter_by(name='alex').all()

86

87 # 8. 通配符

88 ret = session.query(Users).filter(Users.name.like('e%')).all()

89 ret = session.query(Users).filter(~Users.name.like('e%')).all()

90

91 # 9. 切片

92 result = session.query(Users)[1:2]

93

94 # 10.排序

95 ret = session.query(Users).order_by(Users.name.desc()).all()

96 ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

97

98 # 11. group by

99 from sqlalchemy.sql import func

100

101 ret = session.query(Users.depart_id,func.count(Users.id),).group_by(Users.depart_id).all()

102 for item in ret:

103 print(item)

104 #

105 # from sqlalchemy.sql import func

106 # 分组之后再进行查询

107 ret = session.query(

108 Users.depart_id,

109 func.count(Users.id),

110 ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()

111 for item in ret:

112 print(item)

113

114 # 12.union 和 union all

115 """

116 select id,name from users

117 UNION

118 select id,name from users;

119 """

120 """

121 select id,name from users

122 UNION ALL

123 select id,name from users;

124 """

125 q1 = session.query(Depart.title).filter(Depart.id > 2)

126 q2 = session.query(Student.name).filter(Student.id < 2)

127 ret = q1.union(q2).all()

128 #

129 # q1 = session.query(Users.name).filter(Users.id > 2)

130 # q2 = session.query(Favor.caption).filter(Favor.nid < 2)

131 # ret = q1.union_all(q2).all()

132

133 """

134 union 和 union_all 的区别

135 union 去重

136 union_all 不去重

137

138 相同点:合并的两张表的列要相同

139 """

140

141 """

142 union 和 join的区别

143 union是垂直合并成一张表

144 join是水平合并成一张表

145 """

146

147 """

148 查看原生sql 打印不获取结果的语句就可以

149 sql = session.query(Users).filter(Users.id==1)

150 print(sql)

151 """

152

153 session.close()

链表操作 与 外键relation字段的使用

 1 from sqlalchemy.orm import sessionmaker

2 from sqlalchemy import create_engine

3 from models import Users,Depart

4

5

6 engine = create_engine(

7 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

8 max_overflow=0, # 超过连接池大小外最多创建的连接

9 pool_size=5, # 连接池大小

10 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

11 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

12 )

13

14 SessionFactory = sessionmaker(bind=engine)

15 session = SessionFactory()

16

17

18 # 单表操作

19 ret = session.query(Users).all()

20 for row in ret:

21 print(row.id,row.name, row.depart_id)

22

23

24 # 链表操作

25 ret = session.query(Users.id, Users.name, Depart.title).join(Depart,Users.depart_id==Depart.id).all()

26 for row in ret:

27 print(row.id, row.name, row.title)

28

29 # isouter 表示 left join 没有right join 只能调换查询顺序

30 ret = session.query(Users.id, Users.name, Depart.title).join(Users,isouter=True).all()

31 # print(ret)

32 for row in ret:

33 print(row.id, row.name, row.title)

34

35

36 # 3. relation字段:查询所有用户+所属部门名称

37 ret = session.query(Users).all()

38 for row in ret:

39 # relation dp的作用

40 print(row.id,row.name,row.depart_id, row.dp.title)

41

42 # 4. relation字段:查询销售部所有的人员

43 ret = session.query(Depart).filter(Depart.title=='销售部').first()

44 for row in ret.pers:

45 print(row.id, row.name, ret.title)

46

47 # 5. 创建一个名称叫:IT部门,再在该部门中添加一个员工:tanglaoer

48 u1 = Users(name='tanglaoer',dp=Depart(title='IT'))

49 session.add(u1)

50 session.commit()

51

52 # 6. 创建一个名称叫:技术部,再在该部门中添加一个员工:tang lao san

53 d1 = Depart(title='技术部')

54 d1.pers = [Users(name='tang'),Users(name='lao'), Users(name='san')]

55 session.add(d1)

56 session.commit()

57

58 # 在已存在的技术部 添加几名员工

59 d1 = session.query(Depart).filter(Depart.title == '技术部').first()

60 d1.pers = [Users(name='LIN'), Users(name='WU'),Users(name='SEN')]

61 session.add(d1)

62 session.commit()

63

64 session.close()

Foreign and join

多对多操作

 1 from sqlalchemy.orm import sessionmaker

2 from sqlalchemy import create_engine

3 from models import Student, Course, Student2Course

4

5 engine = create_engine(

6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

7 max_overflow=0, # 超过连接池大小外最多创建的连接

8 pool_size=5, # 连接池大小

9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

11 )

12

13 SessionFactory = sessionmaker(bind=engine)

14

15 session = SessionFactory()

16 # 1. 录入数据

17 session.add_all([

18 Student(name='tang'),

19 Student(name='chen'),

20 Course(title='生物'),

21 Course(title='体育'),

22 ])

23 session.commit()

24

25 # 可批量增加多对多外键

26 session.add_all([

27 Student2Course(student_id=2,course_id=1),

28 Student2Course(student_id=1,course_id=1),

29 Student2Course(student_id=1,course_id=2),

30 ])

31

32 # 2. 三张表关联

33 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).order_by(Course.id.asc()).all()

34 print(ret)

35 session.commit()

36

37 # 3. “tang”选的所有课

38 ret = session.query(Student2Course.id, Student.name, Course.title, Course.id).join(Student,Student2Course.student_id==Student.id).join(Course,Student2Course.course_id==Course.id).filter(Student.name=='tang').all()

39 print(ret)

40

41 # relation 字段的使用

42 ret = session.query(Student).filter(Student.name== 'tang').first()

43 for row in ret.course_list:

44 print(row.title)

45

46

47 # 4. 选了“生物”的所有人

48 # relation 字段的方向使用

49 ret = session.query(Course).filter(Course.title == '生物').first()

50 for row in ret.student_list:

51 print(row.name, ret.title)

52

53 # 5. 创建一个课程,创建2学生,两个学生选新创建的课程。

54 obj = Course(title='英语')

55 obj.student_list = [Student(name='lin'), Student(name='wu')]

56 session.add(obj)

57 session.commit()

58

59 # 创建一个学生,加入多门新创建课程

60 stu = Student(name='tang')

61 stu.course_list = [Course(title='数学'), Course(title='地理')]

62 session.add(stu)

63 session.commit()

64

65 # 把tang添加到已存在的课程中

66 from sqlalchemy import or_

67 stu = session.query(Student).filter(Student.name=='tang').first()

68 stu.course_list = session.query(Course).filter(or_(Course.id == 1, Course.id ==3)).all()

69 print(stu.course_list)

70 session.add(stu)

71 session.commit()

72

73 session.close()

many2many

sqlalchemy 连接与多线程的操作

 1 from sqlalchemy.orm import sessionmaker

2 from sqlalchemy import create_engine

3 from models import Student

4 engine = create_engine(

5 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

6 max_overflow=0, # 超过连接池大小外最多创建的连接

7 pool_size=5, # 连接池大小

8 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

9 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

10 )

11 SessionFactory = sessionmaker(bind=engine)

12

13 def task():

14 # 去连接池中获取一个连接

15 # 第一版本

16 session = SessionFactory()

17

18 ret = session.query(Student).all()

19 print(ret)

20 # 将连接交还给连接池

21 session.close()

22

23

24 from threading import Thread

25

26 for i in range(20):

27 t = Thread(target=task)

28 t.start()

第一版本

 1 from sqlalchemy.orm import sessionmaker

2 from sqlalchemy import create_engine

3 from sqlalchemy.orm import scoped_session

4 from models import Student,Course,Student2Course

5

6 engine = create_engine(

7 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

8 max_overflow=0, # 超过连接池大小外最多创建的连接

9 pool_size=5, # 连接池大小

10 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

11 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

12 )

13 SessionFactory = sessionmaker(bind=engine)

14 session = scoped_session(SessionFactory)

15 # scoped_session 里面有threading.local

16 # 为每个线程赋予一个连接

17

18 def task():

19 ret = session.query(Student).all()

20 print(ret)

21 # 将连接交还给连接池

22 session.remove()

23

24

25 from threading import Thread

26

27 for i in range(20):

28 t = Thread(target=task)

29 t.start()

第二版本scoped_session

sqlalchemy 写原生SQL语句

 1 from sqlalchemy.orm import sessionmaker

2 from sqlalchemy import create_engine

3 from sqlalchemy.orm import scoped_session

4

5 engine = create_engine(

6 "mysql+pymysql://root:123456@192.168.226.150:3306/flask_demo?charset=utf8",

7 max_overflow=0, # 超过连接池大小外最多创建的连接

8 pool_size=5, # 连接池大小

9 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错

10 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)

11 )

12 SessionFactory = sessionmaker(bind=engine)

13 session = scoped_session(SessionFactory)

14

15

16 def task():

17 """"""

18 # 方式一:

19 # 查询

20 cursor = session.execute('select * from users')

21 result = cursor.fetchall()

22 print(result)

23

24 # 添加 参数通过"冒号"

25 cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'tanglaoer'})

26 session.commit()

27 print(cursor.lastrowid)

28

29 # 方式二:

30 # 与pymysql的链接一模一样

31 conn = engine.raw_connection()

32 cursor = conn.cursor()

33 cursor.execute(

34 "select * from users"

35 )

36 result = cursor.fetchall()

37 print(result)

38 cursor.close()

39 conn.close()

40

41 # 将连接交还给连接池

42 session.remove()

43

44

45 from threading import Thread

46

47 for i in range(20):

48 t = Thread(target=task)

49 t.start()

原生SQL

以上是 python之sqlalchemy的使用 的全部内容, 来源链接: utcz.com/z/388187.html

回到顶部