Python threading with queue: как избежать использования соединения?

У меня сценарий с двумя потоками:

  1. поток, ожидающий сообщений из сокета (встроенный в библиотеку C – блокирующий вызов – «Barra.ricevi»), затем помещает элемент в очередь

  2. поток, ожидающий, чтобы получить элемент из очереди и сделать что-то

Образец кода

import Barra import Queue import threading posQu = Queue.Queue(maxsize=0) def threadCAN(): while True: canMsg = Barra.ricevi("can0") if canMsg[0] == 'ERR': print (canMsg) else: print ("Enqueued message"), canMsg posQu.put(canMsg) thCan = threading.Thread(target = threadCAN) thCan.daemon = True thCan.start() while True: posMsg = posQu.get() print ("Messagge from the queue"), posMsg 

В результате каждый раз, когда новое сообщение поступает из сокета, в очередь добавляется новый элемент, но основной поток, который должен получать элементы из очереди, никогда не проснулся.

Вывод следующий:

Сообщение о статусе

Сообщение о статусе

Сообщение о статусе

Сообщение о статусе

Я ожидал:

Сообщение о статусе

Messagge из очереди

Сообщение о статусе

Messagge из очереди

Единственный способ решить эту проблему швов, чтобы добавить строку:

 posQu.join() 

в конце потока, ожидающего сообщения из сокета, и строки:

 posQu.task_done() 

в конце основной нити.

В этом случае после этого из сокета получено новое сообщение, поток блокируется, ожидая, когда основной поток обработает выделенный объект.

К сожалению, это не является желаемым поведением, так как я хотел бы, чтобы поток всегда был готов получать сообщения из сокета и не дожидался завершения задания из другого потока.

Что я делаю неправильно? благодаря

Эндрю (Италия)

    Вероятно, это связано с тем, что ваша Barra не выпускает глобальную блокировку интерпретатора (GIL), когда Barra.ricevi . Однако вы можете проверить это.

    GIL гарантирует, что только один поток может работать в любой момент времени (ограничивая полезность потоков в многопроцессорной системе). GIL переключает потоки каждые 100 «тиков» – тик свободно привязывается к инструкциям байткода. Подробнее см. Здесь .

    В вашем потоке производителя не так много происходит за пределами вызова C-библиотеки. Это означает, что продюсерский поток получит вызов Barra.ricevi очень много раз, прежде чем GIL переключится на другой поток.

    Решения для этого – с точки зрения возрастающей сложности:

    • Вызовите time.sleep(0) после добавления элемента в очередь. Это дает поток, так что другой поток может работать.
    • Используйте sys.setcheckinterval() чтобы уменьшить количество «тиков», выполняемых перед переключением потоков. Это будет зависеть от того, что программа будет намного дороже вычислить.
    • Используйте multiprocessing а не threading . Это включает в себя использование multiprocessing.Queue вместо Queue.Queue .
    • Измените Barra так, чтобы он освобождал GIL, когда вызываются его функции.

    Пример использования multiprocessing . Имейте в виду, что при использовании многопроцессорности ваши процессы больше не имеют подразумеваемого общего состояния. Вам нужно будет взглянуть на многопроцессорность, чтобы узнать, как передавать информацию между процессами.

     import Barra import multiprocessing def threadCAN(posQu): while True: canMsg = Barra.ricevi("can0") if canMsg[0] == 'ERR': print(canMsg) else: print("Enqueued message", canMsg) posQu.put(canMsg) if __name__ == "__main__": posQu = multiprocessing.Queue(maxsize=0) procCan = multiprocessing.Process(target=threadCAN, args=(posQu,)) procCan.daemon = True procCan.start() while True: posMsg = posQu.get() print("Messagge from the queue", posMsg)