RabbitMQ закрывает соединение при обработке длинных запусков задач и тайм-аута создает ошибки

Я использую производителя RabbitMQ для отправки долгосрочных задач (30 минут +) для потребителя. Проблема в том, что потребитель все еще работает над задачей, когда соединение с сервером закрывается и запрашивается неподтвержденная задача.

Из исследования я понимаю, что для решения этой проблемы можно использовать либо биение сердца, либо увеличенный тайм-аут соединения . Оба эти решения вызывают ошибки при их попытке. При чтении ответов на аналогичные сообщения я также узнал, что многие изменения были внесены в RabbitMQ, поскольку ответы были опубликованы (например, таймаут по умолчанию по умолчанию изменился на 60 с 580 до RabbitMQ 3.5.5).

При указании биения и заблокированного таймаута соединения:

credentials = pika.PlainCredentials('user', 'password') parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000) connection = pika.BlockingConnection(parameters) channel = connection.channel() 

Отобразится следующая ошибка:

 TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout' 

При указании heartbeat_interval=1000 в параметрах соединения показана аналогичная ошибка: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

Аналогично для socket_timeout = 1000 отображается следующая ошибка: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

Я запускаю RabbitMQ 3.6.1, pika 0.10.0 и python 2.7 на Ubuntu 14.04.

  1. Почему вышеупомянутые подходы приводят к ошибкам?
  2. Можно ли использовать метод сердечного ритма, когда существует продолжительная непрерывная работа? Например, могут использоваться биения сердца при выполнении больших объединений баз данных, которые занимают 30+ минут? Я за то, что так много раз, много раз трудно судить о том, как долго будет выполняться такая задача, как объединение базы данных.

Я прочитал ответы на подобные вопросы

Обновление : запуск кода из документации pika приводит к той же ошибке.

Я столкнулся с той же проблемой с моими системами, которые вы видите, с отключенным соединением во время очень длинных задач.

Возможно, сердцебиение может помочь сохранить ваше соединение в живом состоянии, если ваша сетевая настройка такова, что простоя TCP / IP-соединений сильно удалены. Если это не так, изменение сердечного ритма не поможет.

Изменение таймаута соединения не поможет вообще. Этот параметр используется только при первоначальном создании соединения.

Я использую производителя RabbitMQ для отправки долгосрочных задач (30 минут +) для потребителя. Проблема в том, что потребитель все еще работает над задачей, когда соединение с сервером закрывается и запрашивается неподтвержденная задача.

для этого есть две причины, с которыми вы уже столкнулись:

  1. Соединения падают случайным образом, даже в лучших обстоятельствах
  2. Повторное начало процесса из-за сообщения с повторной очередью может вызвать проблемы

Развернув код RabbitMQ с задачами, которые варьируются от менее чем секунды, до нескольких часов во времени, я обнаружил, что мгновенное подтверждение сообщения и обновление системы сообщениями о состоянии лучше всего подходят для очень длинных задач, например.

Вам понадобится система записи (возможно, с базой данных), которая отслеживает статус заданного задания.

Когда потребитель берет сообщение и запускает процесс, он должен немедленно подтвердить сообщение и отправить сообщение о состоянии «запуска» в систему записи.

По завершении процесса отправьте другое сообщение, чтобы сказать, что это сделано.

Это не решит проблему с отключенным подключением, но ничто так не решит 100%. Вместо этого он предотвратит возникновение проблемы с перезагрузкой сообщения при удалении соединения.

Это решение действительно вводит еще одну проблему: хотя при длительном запуске процесса, как вы возобновляете работу?

Основной ответ заключается в том, чтобы использовать статус системы записи (вашей базы данных) для задания, чтобы сообщить вам, что вам нужно снова забрать эту работу. Когда приложение запустится, проверьте базу данных, чтобы убедиться, что есть незавершенная работа. Если есть, возобновить или перезапустить, что работа любым способом подходит.