python多进程共享一个可操作的变量, 如何保证原子操作?
python多进程共享一个可操作的变量, 如何保证原子操作?
我的需求
目前有多组数据需要执行计算型任务, 每执行完一组数据就要给java通知并传递生成的结果文件, 并且我要在把所有任务都执行完的时候(也就是最后一组数据执行完的时候)告诉java本轮次数据全部执行完毕, java那边就要去整体做数据库入库的操作。
我的设想
多进程之间维护一个整型数值, 在执行完一组数据的时候自增1, 并在通知java的方法中去比较这个整型数值和总任务数量, 相等就代表全部完毕。(这中间不考虑计算任务执行失败的情况)
面临的情况
我通过 multiprocessing
模块中的 Manager
去声明了一个整型变量, 然后给每个进程传递了过去, 在执行自增的时候, 出现了多个进程读取到同一个值的情况(详细可以看下面的输出, 或者自行执行下面的demo), 无法满足数值的读写原子, 我尝试了加锁, 但是没有生效。
这是我抽象出来的demo
from concurrent.futures import ProcessPoolExecutorimport ctypes
from multiprocessing import Manager, Lock
from multiprocessing.managers import ValueProxy
import os
m = Manager().Value(ctypes.c_int, 0)
def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int):
"""模拟耗时任务函数"""
# 模拟耗时计算
res = x**y
# with Lock(): 加锁也不管用...
# 多进程共享变量, 用于比较总任务数量
with Lock():
_m.value += 1
# 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了
if _m.value == total_tasks:
print(True)
print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")
def main():
# 以下假设有8组任务
t1 = (100, 200, 300, 400, 500, 600, 700, 800)
t2 = (80, 70, 60, 50, 40, 30, 20, 10)
len_t = len(t1)
# 多进程执行cpu耗时性任务
with ProcessPoolExecutor(max_workers=len_t) as executor:
{executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)}
if __name__ == "__main__":
main()
这是我目前的demo,从我的业务代码中抽象出来的。
这是代码的输出打印(很明显是错误的):
m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000
m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000
m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000
m_value: 6, p_id: 14885, res: 107374182400000000000000000000
正确的输出应该 m_value
是 1,2,3,4,5,6,7,8
以此打印出来的。
希望对此颇有研究的大佬指点迷津, 或者给出其他可行方案。最好贴出代码, 不胜感激。
问题已解决: 锁多次创建没有保证是同一把锁是主因
回答:
from concurrent.futures import ProcessPoolExecutorimport ctypes
from multiprocessing import Manager, Lock
import os
# 创建 Manager 和 Lock
manager = Manager()
m = manager.Value(ctypes.c_int, 0)
lock = manager.Lock()
def calc_number(x: int, y: int, _m, total_tasks: int, _lock):
"""模拟耗时任务函数"""
# 模拟耗时计算
res = x ** y
# 用锁来保证原子操作
with _lock:
_m.value += 1
current_value = _m.value
# 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了
if current_value == total_tasks:
print(True)
print(f"m_value: {current_value}, p_id: {os.getpid()}, res: {res}")
def main():
# 任务参数
t1 = (100, 200, 300, 400, 500, 600, 700, 800)
t2 = (80, 70, 60, 50, 40, 30, 20, 10)
len_t = len(t1)
# 多进程执行任务
with ProcessPoolExecutor(max_workers=len_t) as executor:
for x, y in zip(t1, t2):
executor.submit(calc_number, x, y, m, len_t, lock)
if __name__ == "__main__":
main()
以上是 python多进程共享一个可操作的变量, 如何保证原子操作? 的全部内容, 来源链接: utcz.com/p/939064.html