SQLAlchemy批量更新策略
我目前正在使用SQLAlchemy(在GAE上连接到Google的云MySQL)编写Web应用程序(Flask)并需要对表进行批量更新。总之,进行了一些计算,导致需要在1000个对象上更新单个值。目前我正在做一切交易,但最后还是要花费很多时间。SQLAlchemy批量更新策略
该表的索引号为id,这些都是在单个事务中执行的。所以我相信我避免了通常的错误,但仍然非常缓慢。
INFO  2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s 2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656} 
从我的理解是没有办法做到在SQL实际上是一个批量更新,以上声明最终被多个UPDATE语句被发送到服务器。
我试过使用Session.bulk_update_mappings(),但似乎并没有真正做任何事情:(不知道为什么,但更新从未真正发生过,我看不到任何实际使用的方法的实例(包括在性能套件)所以不知道它是否打算使用。
One technique I've seen discussed正在做一个批量插入到另一个表,然后做一个UPDATE JOIN。我已经给它一个测试,就像下面,它似乎是明显更快。
wallets = db_session.query(Wallet).all() ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ] 
db_session.bulk_save_objects(ledgers) 
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount') 
db_session.execute('TRUNCATE ledger') 
但现在的问题是如何构建我的代码。我使用ORM和我需要以某种方式不“肮脏”原始Wallet对象,以便他们不会以旧的方式承诺。我可以创建这些Ledger对象,并保留它们的列表,然后在批量操作结束时手动插入它们。但是,这几乎闻起来像我复制ORM机制的一些工作。
有没有更聪明的方法来做到这一点?到目前为止,我的大脑正在下降,是这样的:正如我所说的
class Wallet(Base):     ... 
    _balance = Column(Float) 
    ... 
@property 
def balance(self): 
    # first check if we have a ledger of the same id 
    # and return the amount in that, otherwise... 
    return self._balance 
@balance.setter 
def balance(self, amount): 
    l = Ledger(id=self.id, amount=amount) 
    # add l to a list somewhere then process later 
# At the end of the transaction, do a bulk insert of Ledgers 
# and then do an UPDATE JOIN and TRUNCATE 
,这一切似乎是对抗我(可能)拥有的工具。有没有更好的方式来处理这个问题?我可以利用ORM机制来做到这一点吗?或者是否有更好的方式来进行批量更新?
编辑:或者是否有事情和会议巧妙的事情?也许before_flush?
编辑2:所以我试图进军事件机器,现在有这样的:
@event.listens_for(SignallingSession, 'before_flush') def before_flush(session, flush_context, instances): 
    ledgers = [] 
    if session.dirty: 
     for elem in session.dirty: 
      if (session.is_modified(elem, include_collections=False)): 
       if isinstance(elem, Wallet): 
        session.expunge(elem) 
        ledgers.append(Ledger(id=elem.id, amount=elem.balance)) 
    if ledgers: 
     session.bulk_save_objects(ledgers) 
     session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount') 
     session.execute('TRUNCATE ledger') 
这似乎很哈克和邪恶的我,但似乎工作确定。任何陷阱或更好的方法?
-Matt
回答:
你实质上是在绕过ORM来优化性能。因此,不要惊讶于“复制ORM正在做的工作”,因为这正是您需要做的。
除非你有很多地方需要做这样的批量更新,否则我会建议不要使用神奇事件方法;简单地写明确的查询要简单得多。
我的建议是使用SQLAlchemy的核心,而不是ORM做更新做:
ledger = Table("ledger", db.metadata,     Column("wallet_id", Integer, primary_key=True), 
    Column("new_balance", Float), 
    prefixes=["TEMPORARY"], 
) 
wallets = db_session.query(Wallet).all() 
# figure out new balances 
balance_map = {} 
for w in wallets: 
    balance_map[w.id] = calculate_new_balance(w) 
# create temp table with balances we need to update 
ledger.create(bind=db.session.get_bind()) 
# insert update data 
db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v} 
              for k, v in balance_map.items()]) 
# perform update 
db.session.execute(Wallet.__table__ 
         .update() 
         .values(balance=ledger.c.new_balance) 
         .where(Wallet.__table__.c.id == ledger.c.wallet_id)) 
# drop temp table 
ledger.drop(bind=db.session.get_bind()) 
# commit changes 
db.session.commit() 
回答:
一般来说是架构设计穷人需要频繁更新数千行。这且不说...
计划A:编写生成
START TRANSACTION; UPDATE wallet SET balance = ... WHERE id = ...; 
UPDATE wallet SET balance = ... WHERE id = ...; 
UPDATE wallet SET balance = ... WHERE id = ...; 
... 
COMMIT; 
B计划ORM代码:编写生成
CREATE TEMPORARY TABLE ToDo (    id ..., 
    new_balance ... 
); 
INSERT INTO ToDo -- either one row at a time, or a bulk insert 
UPDATE wallet 
    JOIN ToDo USING(id) 
    SET wallet.balance = ToDo.new_balance; -- bulk update 
ORM代码(检查语法,测试等等)
以上是 SQLAlchemy批量更新策略 的全部内容, 来源链接: utcz.com/qa/263805.html

