Как вызвать функцию задержки задачи сельдерея из языков, отличных от python, таких как Java?

У меня есть установка celery + rabbitmq для 3-х кластерной машины. Я также создал задачу, которая генерирует регулярное выражение на основе данных из файла и использует эту информацию для анализа текста.

from celery import Celery celery = Celery('tasks', broker='amqp://localhost//') import re @celery.task def add(x, y): return x + y def get_regular_expression(): with open("text") as fp: data = fp.readlines() str_re = "|".join([x.split()[2] for x in data ]) return str_re @celery.task def analyse_json(tw): str_re = get_regular_expression() re.match(str_re,tw.text) 

Я могу сделать вызов этой задачи очень легко, используя следующий код python:

 from tasks import analyse_tweet_json x = tweet ## load from a file (x is a json) analyse_tweet_json.delay(x) 

Однако теперь я хочу сделать тот же вызов с Java, а не с python. Я не уверен, что это самый простой способ сделать то же самое.

Я написал этот код для отправки сообщения брокеру AMQP. Код работает нормально, но задача не выполняется. Я не уверен, как указать имя задачи, которая должна быть выполнена.

 import com.rabbitmq.client.AMQP; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; class try1 { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "celery", "celery"); String messageBody = "{\"text\":\"i am good\"}" ; byte[] msgBytes = messageBody.getBytes("ASCII") ; channel.basicPublish(queueName, queueName, new AMQP.BasicProperties ("application/json", null, null, null, null, null, null, null, null, null, null, "guest", null, null),messageBody.getBytes("ASCII")) ; connection.close(); 

}}

это результат в журнале ошибок rabbitMq: –

 connection <0.14627.0>, channel 1 - error: {amqp_error,not_found, "no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'", 'basic.publish'} 

Любая помощь будет оценена.

спасибо, Амит

2 Solutions collect form web for “Как вызвать функцию задержки задачи сельдерея из языков, отличных от python, таких как Java?”

Было несколько вопросов.

1) String queueName = channel.queueDeclare (). Команда getQueue () возвращала неправильное имя очереди. Я изменил queuename на «сельдерей», и он отлично работал. 2) Формат json должен быть такого типа: – {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", "task": "celery.task.PingTask", "args": [], "kwargs": {}, "retries": 0, "eta": "2009-11-17T12: 30: 56.527191"}

как показано в http://docs.celeryproject.org/en/latest/internals/protocol.html

После этих двух изменений он отлично справился.

-Amit

сельдерей неявно объявляет обмен, используя Java, вам придется объявить его самостоятельно.

см. « Взаимодействие с Django / Celery from Java»

Python - лучший язык программирования в мире.