After calling start() on a multiprocessing.Process, there is a small chance that the child process never executes its run method

I implemented a worker class by subclassing multiprocessing.Process. The parent process enforces a cap of at most N concurrent workers (N is 30 in the environment where the problem occurs).
In production, roughly 2 out of every 10,000 runs (the job currently runs 46,000+ times per day and hits this about 11 times) create the child process but never execute its run method.
The code and logs are below; note the logging statements.
Parent process code that starts a child (the parent also contains the logic that limits the number of concurrent processes; I can paste it if needed):

...

    def run_task(self, task):
        logging.info('execute monitor %s' % task['id'])
        worker = execute_worker.ExecuteWorkerProcess(task)
        logging.debug('execute process start %s' % task['id'])
        worker.start()
        logging.info('worker pid is %s (%s)' % (worker.pid, task['id']))
        logging.debug('execute process started %s' % task['id'])
        self.worker_pool.append(worker)

...

The child process's run method:

class ExecuteWorkerProcess(multiprocessing.Process):

    ...

    def __init__(self, task):
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        self.worker = ExecuteWorker(task)
        if 'task' in task:
            self.routine = False
        else:
            self.routine = True
        self.zk = None
        logging.debug('process created %s' % self.task['id'])

    ...

    def run(self):
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except:
            logging.exception('')
            title = u'监控执行进程报错'
            text = u'监控项id:%s\n错误信息:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])

...

Log from a run that hit the problem:
[log screenshot]

Log from a normal run:
[log screenshot]

You can see that in both the normal and the failing case the parent process logged the child's pid, but in the failing case the first log line of the child's run method never appeared.
Has anyone run into this? Is it a known pitfall of multiprocessing.Process, and is there a way to avoid it?

Answer:

The cause has been found: the parent process mixes threads with multiprocessing (fork-based), which can deadlock the logging module. The symptom is that the child hangs on the very first logging call inside run, so run appears never to execute. The problem only occurs on Linux.
I found the way to reproduce it from an answer on Stack Overflow; another answer there describes a workaround.
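The underlying mechanism can be shown without logging at all. Below is a minimal sketch (my own illustration, not part of the original answer; Linux/Unix only): fork() copies the parent's memory, including a lock that some other thread happens to hold at that instant, and because that thread does not exist in the child, nothing will ever release the lock there.

import os
import signal
import threading
import time

lock = threading.Lock()            # stands in for logging's internal handler lock

def hold_lock():
    with lock:
        time.sleep(5)              # keep the lock held while the fork happens

holder = threading.Thread(target=hold_lock)
holder.daemon = True
holder.start()
time.sleep(0.1)                    # make sure the other thread owns the lock

pid = os.fork()
if pid == 0:
    # Child: the lock was copied in the "locked" state and its owner thread
    # does not exist here, so this acquire blocks forever.
    lock.acquire()
    print('never reached')
    os._exit(0)
else:
    time.sleep(2)
    os.kill(pid, signal.SIGKILL)   # the child is stuck on the inherited lock; clean it up
    os.waitpid(pid, 0)
    print('child dead-locked on the inherited lock')

The reproduction demo below triggers the same race through the logging module itself.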
Reproduction demo:

#coding=utf-8
import os
import time
import logging
import threading
import multiprocessing
import logging.handlers


def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
             format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S"):
    formatter = logging.Formatter(format, datefmt)
    logger = logging.getLogger()
    logger.setLevel(level)
    dir = os.path.dirname(log_path)
    if not os.path.isdir(dir):
        os.makedirs(dir)
    handler = logging.handlers.TimedRotatingFileHandler(log_path + ".log",
                                                        when=when,
                                                        backupCount=backup)
    handler.setLevel(level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)


stop = False


class Worker(multiprocessing.Process):
    u"""Execute tasks as separate processes (CPU-bound style)."""

    def __init__(self):
        super(Worker, self).__init__()
        self.reset_ts = time.time() + 3
        self.stopping = False

    def run(self):
        # If the child inherited a held logging lock, this first call hangs forever.
        logging.info('Process pid is %s' % os.getpid())

    def check_timeout(self, now):
        u"""
        If the current time is past the reset deadline, mark this worker as hung
        and ask the main loop to stop.
        :param now:
        :return:
        """
        global stop
        if now > self.reset_ts and not self.stopping:
            self.stopping = True
            logging.info('error pid is %s' % os.getpid())
            stop = True


def main():
    # Keep forking workers from a non-main thread while another thread logs,
    # which is exactly the fork-while-a-lock-is-held race.
    global stop
    worker_pool = []
    while 1:
        if worker_pool:
            now = time.time()
            for worker in worker_pool:
                worker.check_timeout(now)
            if stop:
                logging.error('Process not run, exit!')
                exit(-1)
            alive_workers = [worker for worker in worker_pool if worker.is_alive()]
            over_workers = list(set(alive_workers) ^ set(worker_pool))
            for over_worker in over_workers:
                over_worker.join()
            worker_pool = alive_workers
        if len(worker_pool) < 1000:
            logging.info('create worker')
            worker = Worker()
            worker.start()
            logging.info('worker pid is %s' % worker.pid)
            worker_pool.append(worker)
        time.sleep(0.001)


class ExecuteThread(threading.Thread):
    def run(self):
        main()


class ExecuteThread2(threading.Thread):
    def run(self):
        global stop
        while 1:
            logging.info('main thread')
            time.sleep(0.001)
            if stop:
                exit(-1)


if __name__ == '__main__':
    init_log('/yourpath/timeout')
    # main()
    thread = ExecuteThread()
    thread2 = ExecuteThread2()
    thread.start()
    thread2.start()
    thread.join()
    thread2.join()

After running the reproduction, remember to kill the hung processes it leaves behind.
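The answer above mentions that another Stack Overflow answer describes a workaround. One commonly cited fix, sketched below on the assumption that you can move to Python 3.4+ (the code in this post is Python 2 style), is to switch multiprocessing to the "spawn" start method: the child then starts as a fresh interpreter and never inherits a logging lock that another thread in the parent happens to hold.

import logging
import multiprocessing
import os

def child():
    # With "spawn" the child does not inherit the parent's handlers (or their
    # possibly-locked internal locks); it configures logging itself.
    logging.basicConfig(level=logging.INFO)
    logging.info('child pid %s ran fine', os.getpid())

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    logging.basicConfig(level=logging.INFO)
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()

On Python 2, where only fork is available on Linux, the usual alternatives are to start all worker processes before creating any threads, or to have forked children send log records to a single dedicated logging process over a queue instead of logging directly.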
