django+celery+ansibleApi无返回

1. 用 Python 调用 Ansible API 远程执行任务:不使用 Celery 时能正确运行,使用 Celery 时返回结果为空。用 pdb 调试发现是调用 Ansible 时返回异常,但查了几天仍然找不出具体原因。

2. 复现代码如下:

  • tasks.py

from celery import shared_task

from .deploy_tomcat2 import django_process

@shared_task
def deploy(jira_num):
    """Celery task entry point for a deployment.

    Hands ``jira_num`` to ``django_process`` and returns its result
    unchanged, so the task result is whatever ``django_process`` produced.
    """
    outcome = django_process(jira_num)
    return outcome

  • deploy_tomcat2.py

from .AnsibleApi import CallApi

def django_process(jira_num):
    """Run the Tomcat deployment when ``jira_num`` is a purely numeric id.

    Args:
        jira_num: Ticket number. Normally a digit-only string; any other
            value is converted with ``str()`` before validation, so ints,
            ``None`` and (on Python 2) ``unicode`` no longer raise
            ``TypeError``.

    Returns:
        The value returned by ``CallApi.run_task()``, or ``None`` when
        ``jira_num`` does not consist solely of digits.
    """
    # The original ``str.isdigit(jira_num)`` is an unbound-method call and
    # raises TypeError for anything that is not exactly ``str``. Use the
    # value's own ``isdigit`` when it has one, otherwise validate its
    # string form.
    text = jira_num if hasattr(jira_num, 'isdigit') else str(jira_num)
    if not text.isdigit():
        return None

    # Deployment parameters are hard-coded in this reproduction.
    server = '10.10.10.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    call = CallApi(server, name, port, code, jdk, jvm)
    return call.run_task()

  • AnsibleApi.py

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import logging

from .Logger import Logger

from django.conf import settings

from collections import namedtuple

from ansible.parsing.dataloader import DataLoader

from ansible.vars import VariableManager

from ansible.inventory import Inventory

from ansible.playbook.play import Play

from ansible.executor.task_queue_manager import TaskQueueManager

from ansible.plugins.callback import CallbackBase

# Module-level logger shared by this file.
# NOTE(review): presumably Logger takes (log file path, level) -- confirm against .Logger.
Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)

class ResultCallback(CallbackBase):
    """Callback that remembers the latest task result for each host.

    Results are stored in three dicts keyed by host name:
    ``host_ok``, ``host_unreachable`` and ``host_failed``. A later result
    for the same host in the same category overwrites the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Record ``result`` under its host name as unreachable."""
        hostname = result._host.get_name()
        self.host_unreachable[hostname] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Record ``result`` under its host name as successful."""
        hostname = result._host.get_name()
        self.host_ok[hostname] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Record ``result`` under its host name as failed."""
        hostname = result._host.get_name()
        self.host_failed[hostname] = result

class CallApi(object):
    """Run a small Ansible play against a single host and collect results.

    The play copies the deploy script to the target host and prints the
    registered copy result with a ``debug`` task. Per-host results are
    gathered through ``ResultCallback`` and returned as plain dicts.

    Class attributes:
        user: SSH/remote user, read from ``settings.SSH_USER``.
        ssh_private_key_file: Key path from ``settings.SSH_PRIVATE_KEY_FILE``.
        Options: namedtuple type used to build the options object passed
            to ``TaskQueueManager``.
    """

    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    # Kept only for backward compatibility with code that reads the class
    # attribute; every instance (and every run) uses its own callback.
    results_callback = ResultCallback()
    Options = namedtuple(
        'Options',
        ['connection', 'module_path', 'private_key_file', 'forks', 'become',
         'become_method', 'become_user', 'check'],
    )

    def __init__(self, ip, name, port, code, jdk, jvm):
        """Store the target host and the deploy script parameters.

        Args:
            ip: Address of the host the play runs against.
            name, port, code, jdk, jvm: Parameters for the deploy script.
                They are stored but not used by the currently enabled
                tasks (see ``_gen_user_task``).
        """
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Build the task list for the play and store it in ``self.tasks``."""
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        copy_args = dict(src=deploy_script, dest=dst_script,
                         owner=self.user, group=self.user, mode='0755')

        # TODO: the step that actually runs the copied script
        # (command: <dst_script> name port code jdk 'jvm') was disabled in
        # the original code; only the copy and debug tasks are executed.
        self.tasks = [
            dict(action=dict(module='copy', args=copy_args), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))),
        ]

    def _set_option(self):
        """Create the loader, inventory, options and play for this run."""
        self._gen_user_task()

        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(
            connection='smart',
            module_path=None,
            private_key_file=self.ssh_private_key_file,
            forks=None,
            become=True,
            become_method='sudo',
            become_user='root',
            check=False,
        )
        self.inventory = Inventory(loader=self.loader,
                                   variable_manager=self.variable_manager,
                                   host_list=[self.ip])
        self.variable_manager.set_inventory(self.inventory)

        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks,
        )
        self.play = Play().load(play_source,
                                variable_manager=self.variable_manager,
                                loader=self.loader)

    def run_task(self):
        """Execute the play and return the collected per-host results.

        Returns:
            dict: ``{'success': {...}, 'failed': {...}, 'unreachable': {...}}``
            where each inner dict maps host name to the raw result
            (``result._result``) of the last task reported in that category.
        """
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        # Fresh callback per run so a second run_task() on the same instance
        # does not report stale results from the previous run.
        self.results_callback = ResultCallback()

        # The blocking ``celery.contrib.rdb.set_trace()`` breakpoint that used
        # to sit here was removed: it stalls the worker until a debugger
        # client connects.
        self._set_option()

        # NOTE(review): TaskQueueManager appears to start its workers through
        # multiprocessing; when this runs inside a daemonic Celery pool
        # process that may fail and leave the result empty -- confirm the
        # worker configuration.
        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            tqm.run(self.play)
        finally:
            if tqm is not None:
                tqm.cleanup()

        collected = (
            ('success', self.results_callback.host_ok),
            ('failed', self.results_callback.host_failed),
            ('unreachable', self.results_callback.host_unreachable),
        )
        for category, per_host in collected:
            for host, task_result in per_host.items():
                self.results_raw[category][host] = task_result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw

  • 复现方法

  • 启动celery worker:
    celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info

  • 另一窗口生产消息:
    deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')

回答:

有两种方法可以解决这个问题,本质上都是关闭 assert:
1. 在启动 Celery worker 的终端里先执行 export PYTHONOPTIMIZE=1,或者启动 worker 时加上 Celery 的 -O OPTIMIZATION 参数
2. 注释掉 Python 的 multiprocessing 包下 process.py 中第 102 行的 assert 语句,从而关闭该 assert

回答:

你可以改写一下 Ansible 中 TaskQueueManager 创建进程池的函数

clipboard.png

clipboard.png

回答:

既然都用django,CRUD看来是标配了,那么你不如试试 post_save 这个 signal
直接 deploy.delay(**params)

回答:

clipboard.png
请问解决了吗?我应该是遇到了同样的问题:用 delay 执行有输出,但实际上并没有执行到 Ansible 的操作。

以上是 django+celery+ansibleApi无返回 的全部内容, 来源链接: utcz.com/a/165891.html

回到顶部