Реализация многопроцессорной обработки .Queue и queue.Queue

Я ищу более подробные сведения о реализации Queues в Python, чем я могу найти в документации.

Из того, что я понял, и извините мое невежество, если я ошибаюсь в этом:

queue.Queue() : реализуется с помощью базовых массивов в памяти и поэтому не может использоваться совместно несколькими процессами, но может использоваться совместно между потоками. Все идет нормально.

multiprocessing.Queue() : реализуется через трубы ( man 2 pipes ), которые имеют ограничение по размеру (довольно крошечное: на Linux, man 7 pipe говорит 65536 untweaked):

Начиная с Linux 2.6.35, емкость по умолчанию составляет 65536 байт, но емкость может быть запрошена и задана с помощью операций fcntl(2) F_GETPIPE_SZ и F_SETPIPE_SZ

Но, в Python, всякий раз, когда я пытаюсь записать в трубу данные размером более 65536 байтов, он работает без исключения – я мог бы наводнить мою память следующим образом:

 import multiprocessing from time import sleep def big(): result = "" for i in range(1,70000): result += ","+str(i) return result # 408888 bytes string def writequeue(q): while True: q.put(big()) sleep(0.1) if __name__ == '__main__': q = multiprocessing.Queue() p = multiprocessing.Process(target=writequeue, args=(q,)) p.start() while True: sleep(1) # No pipe consumption, we just want to flood the pipe 

Итак, вот мои вопросы:

  • Python настраивает ограничение на канал? если да, то насколько? Исходный код Python приветствуется.

  • Связываются ли Python сообщения, взаимодействующие с другими процессами, отличными от Python? Если да, то приветствуются рабочие примеры (желательно JS) и ссылки на ресурсы.

One Solution collect form web for “Реализация многопроцессорной обработки .Queue и queue.Queue”

Почему q.put () не блокирует?

mutiprocessing.Queue создает канал, который блокирует, если труба уже заполнена. Разумеется, запись больше, чем пропускная способность канала, вызовет вызов write до тех пор, пока на считывающем конце не будет достаточно данных. Итак, если труба блокируется при достижении ее пропускной способности, почему q.put() не блокирует и после того, как канал заполнен? Даже первый вызов q.put() в примере должен заполнить трубу, и все должно блокироваться там, нет?

Нет, он не блокируется, потому что реализация multiprocessing.Queue отделяет метод .put() от записи в канал. Метод .put() захватывает данные, переданные ему во внутреннем буфере, и есть отдельный поток, который заряжается чтением из этого буфера и записывается в трубу. Этот поток будет блокироваться, когда канал будет заполнен, но он не будет препятствовать .put() чтобы .put() задерживал большее количество данных во внутреннем буфере.

Реализация .put () сохраняет данные в self._buffer и замечает, как он self._buffer поток, если его еще нет:

 def put(self, obj, block=True, timeout=None): assert not self._closed if not self._sem.acquire(block, timeout): raise Full with self._notempty: if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() 

Метод ._feed() – это то, что читается из self._buffer и self._buffer данные в канал. И ._start_thread() – это то, что устанавливает поток, который запускает ._feed() .

Как я могу ограничить размер очереди?

Если вы хотите ограничить, сколько данных может быть записано в очередь, я не вижу способа сделать это, указав количество байтов, но вы можете ограничить количество элементов, которые хранятся во внутреннем буфере в любой момент времени передавая число для multiprocessing.Queue :

 q = multiprocessing.Queue(2) 

Когда я использую указанный выше параметр и использую ваш код, q.put() будет выставлять в очередь два элемента и будет блокировать третью попытку.

Связываются ли Python сообщения, взаимодействующие с другими процессами, отличными от Python?

Это зависит. Средства, предоставляемые модулем multiprocessing , нелегко взаимодействуют с другими языками. Я ожидаю, что можно будет сделать multiprocessing взаимодействие с другими языками, но достижение этой цели было бы крупным предприятием. Модуль написан с ожиданием, что задействованные процессы запускают код Python.

Если вы посмотрите на более общие методы, тогда ответ будет да. Вы можете использовать сокет как коммуникационный канал между двумя разными процессами. Например, процесс JavaScript, который читается из именованного сокета:

 var net = require("net"); var fs = require("fs"); sockPath = "/tmp/test.sock" try { fs.unlinkSync(sockPath); } catch (ex) { // Don't care if the path does not exist, but rethrow if we get // another error. if (ex.code !== "ENOENT") { throw ex; } } var server = net.createServer(function(stream) { stream.on("data", function(c) { console.log("received:", c.toString()); }); stream.on("end", function() { server.close(); }); }); server.listen(sockPath); 

И процесс Python, который пишет ему:

 import socket import time sockfile = "/tmp/test.sock" conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) conn.connect(sockfile) count = 0 while True: count += 1 conn.sendall(bytes(str(count), "utf-8")) time.sleep(1) 

Если вы хотите попробовать выше, сначала нужно запустить сторону JavaScript, чтобы на стороне Python было что-то писать. Это доказательство концепции. Для полного решения потребуется больше пользы.

Чтобы передать сложные структуры из Python на другие языки, вам нужно будет найти способ сериализации данных в формате, который можно прочитать с обеих сторон. Соленые огурцы, к сожалению, специфичны для Python. Обычно я выбираю JSON всякий раз, когда мне нужно сериализовать между языками или использовать специальный формат, если JSON этого не сделает.

  • Возможно ли перемещать / объединять сообщения между очередями RabbitMQ?
  • Как создать задержанную очередь в RabbitMQ?
  • Настройка задачи Python asyncio
  • Python - лучший язык программирования в мире.