通过Java与Django / Celery互操作

我们公司有一个基于Python的网站和一些基于Python的工作程序节点,它们通过Django /

Celery和RabbitMQ进行通信。我有一个基于Java的应用程序,需要将任务提交给基于Celery的工作人员。我可以将作业从Java发送到RabbitMQ很好,但是基于Celery的工作人员从来没有接过工作。从查看两种类型的作业提交的数据包捕获来看,存在差异,但是我无法理解如何解释它们,因为其中很多都是二进制文件,因此我无法找到有关解码的文档。这里有人对Java

/ RabbitMQ和Celery一起工作有任何参考或经验吗?

回答:

我找到了解决方案。RabbitMQ的Java库引用交换/队列/路由键。在Celery中,队列名称实际上是映射到Java库中引用的交换。默认情况下,芹菜的队列只是“芹菜”。如果您的Django设置使用以下语法定义了一个名为“

myqueue”的队列:

CELERY_ROUTES = {

'mypackage.myclass.runworker' : {'queue':'myqueue'},

}

然后,基于Java的代码需要执行以下操作:

        ConnectionFactory factory = new ConnectionFactory();

Connection connection = null ;

try {

connection = factory.newConnection(mqHost, mqPort);

} catch (IOException ioe) {

log.error("Unable to create new MQ connection from factory.", ioe) ;

}

Channel channel = null ;

try {

channel = connection.createChannel();

} catch (IOException ioe) {

log.error("Unable to create new channel for MQ connection.", ioe) ;

}

try {

channel.queueDeclare("celery", false, false, false, true, null);

} catch (IOException ioe) {

log.error("Unable to declare queue for MQ channel.", ioe) ;

}

try {

channel.exchangeDeclare("myqueue", "direct") ;

} catch (IOException ioe) {

log.error("Unable to declare exchange for MQ channel.", ioe) ;

}

try {

channel.queueBind("celery", "myqueue", "myqueue") ;

} catch (IOException ioe) {

log.error("Unable to bind queue for channel.", ioe) ;

}

// Generate the message body as a string here.

try {

channel.basicPublish(mqExchange, mqRouteKey,

new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),

messageBody.getBytes("ASCII"));

} catch (IOException ioe) {

log.error("IOException encountered while trying to publish task via MQ.", ioe) ;

}

事实证明,这只是术语上的差异。

以上是 通过Java与Django / Celery互操作 的全部内容, 来源链接: utcz.com/qa/414511.html

回到顶部