OutOfMemoryError при использовании PySpark для чтения файлов в локальном режиме

У меня около десятка файлов, зашифрованных gpg, содержащих данные, которые я бы хотел проанализировать с помощью PySpark. Моя стратегия – применить функцию дешифрования как плоскую карту к каждому файлу, а затем продолжить обработку на уровне записи:

def read_fun_generator(filename): with gpg_open(filename[0].split(':')[-1], 'r') as f: for line in f: yield line.strip() gpg_files = sc.wholeTextFiles(/path/to/files/*.gpg) rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|')) rdd_from_gpg.count() # <-- For example... 

Этот подход работает очень хорошо при использовании одного потока в локальном режиме, то есть установки ведущего на local[1] . Однако использование любого более чем одного потока вызывает OutOfMemoryError . Я пробовал увеличивать spark.executor.memory и spark.driver.memory до 30g , но это, похоже, не поможет. Я могу подтвердить в пользовательском интерфейсе, что эти настройки застряли. (Моя машина имеет более 200 ГБ.) Однако я заметил в журналах, что блок-менеджер, похоже, начинает работать только с 265,4 МБ памяти. Интересно, связано ли это?

Вот полная конфигурация, с которой я начинаю:

 conf = (SparkConf() .setMaster("local[*]") .setAppName("pyspark_local") .set("spark.executor.memory", "30g") .set("spark.driver.memory", "30g") .set("spark.python.worker.memory", "5g") ) sc = SparkContext(conf=conf) 

Это трассировка стека из моих журналов:

 15/06/10 11:03:30 INFO SparkContext: Running Spark version 1.3.1 15/06/10 11:03:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/06/10 11:03:31 INFO SecurityManager: Changing view acls to: santon 15/06/10 11:03:31 INFO SecurityManager: Changing modify acls to: santon 15/06/10 11:03:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(santon); users with modify permissions: Set(santon) 15/06/10 11:03:31 INFO Slf4jLogger: Slf4jLogger started 15/06/10 11:03:31 INFO Remoting: Starting remoting 15/06/10 11:03:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:44347] 15/06/10 11:03:32 INFO Utils: Successfully started service 'sparkDriver' on port 44347. 15/06/10 11:03:32 INFO SparkEnv: Registering MapOutputTracker 15/06/10 11:03:32 INFO SparkEnv: Registering BlockManagerMaster 15/06/10 11:03:32 INFO DiskBlockManager: Created local directory at /tmp/spark-24dc8f0a-a89a-44f8-bb95-cd5514e5bf0c/blockmgr-85b6f082-ff5a-4a0e-b48a-1ec62715dda0 15/06/10 11:03:32 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/06/10 11:03:32 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7b2172ed-d658-4e11-bbc1-600697f3255e/httpd-5423f8bc-ec43-48c5-9367-87214dad54f4 15/06/10 11:03:32 INFO HttpServer: Starting HTTP Server 15/06/10 11:03:32 INFO Server: jetty-8.yz-SNAPSHOT 15/06/10 11:03:32 INFO AbstractConnector: Started SocketConnector@0.0.0.0:50366 15/06/10 11:03:32 INFO Utils: Successfully started service 'HTTP file server' on port 50366. 15/06/10 11:03:32 INFO SparkEnv: Registering OutputCommitCoordinator 15/06/10 11:03:32 INFO Server: jetty-8.yz-SNAPSHOT 15/06/10 11:03:32 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/06/10 11:03:32 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/06/10 11:03:32 INFO SparkUI: Started SparkUI at localhost:4040 15/06/10 11:03:32 INFO Executor: Starting executor ID <driver> on host localhost 15/06/10 11:03:32 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:44347/user/HeartbeatReceiver 15/06/10 11:03:33 INFO NettyBlockTransferService: Server created on 46730 15/06/10 11:03:33 INFO BlockManagerMaster: Trying to register BlockManager 15/06/10 11:03:33 INFO BlockManagerMasterActor: Registering block manager localhost:46730 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 46730) 15/06/10 11:03:33 INFO BlockManagerMaster: Registered BlockManager 15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(215726) called with curMem=0, maxMem=278302556 15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.7 KB, free 265.2 MB) 15/06/10 11:05:19 INFO MemoryStore: ensureFreeSpace(31533) called with curMem=215726, maxMem=278302556 15/06/10 11:05:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 265.2 MB) 15/06/10 11:05:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:46730 (size: 30.8 KB, free: 265.4 MB) 15/06/10 11:05:19 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/06/10 11:05:19 INFO SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2 15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16 15/06/10 11:05:22 INFO FileInputFormat: Total input paths to process : 16 15/06/10 11:05:22 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 71665121 15/06/10 11:05:22 INFO SparkContext: Starting job: count at <timed exec>:2 15/06/10 11:05:22 INFO DAGScheduler: Got job 0 (count at <timed exec>:2) with 2 output partitions (allowLocal=false) 15/06/10 11:05:22 INFO DAGScheduler: Final stage: Stage 0(count at <timed exec>:2) 15/06/10 11:05:22 INFO DAGScheduler: Parents of final stage: List() 15/06/10 11:05:22 INFO DAGScheduler: Missing parents: List() 15/06/10 11:05:22 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[1] at count at <timed exec>:2), which has no missing parents 15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(6264) called with curMem=247259, maxMem=278302556 15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.1 KB, free 265.2 MB) 15/06/10 11:05:23 INFO MemoryStore: ensureFreeSpace(4589) called with curMem=253523, maxMem=278302556 15/06/10 11:05:23 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.5 KB, free 265.2 MB) 15/06/10 11:05:23 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46730 (size: 4.5 KB, free: 265.4 MB) 15/06/10 11:05:23 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/06/10 11:05:23 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839 15/06/10 11:05:23 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at count at <timed exec>:2) 15/06/10 11:05:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 15/06/10 11:05:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1903 bytes) 15/06/10 11:05:23 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 3085 bytes) 15/06/10 11:05:23 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/06/10 11:05:23 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 15/06/10 11:05:26 INFO WholeTextFileRDD: Input split: Paths:[gpg_files] 15/06/10 11:05:40 ERROR Utils: Uncaught exception in thread stdout writer for /anaconda/python/bin/python java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.<init>(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:405) at org.apache.hadoop.io.Text.decode(Text.java:382) at org.apache.hadoop.io.Text.toString(Text.java:280) at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86) at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205) Exception in thread "stdout writer for /anaconda/python/bin/python" java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.<init>(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:405) at org.apache.hadoop.io.Text.decode(Text.java:382) at org.apache.hadoop.io.Text.toString(Text.java:280) at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:86) at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:421) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205) 15/06/10 11:05:47 INFO PythonRDD: Times: total = 24140, boot = 2860, init = 664, finish = 20616 15/06/10 11:05:47 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1873 bytes result sent to driver 15/06/10 11:05:47 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 24251 ms on localhost (1/2) 

Кто-нибудь сталкивается с этой проблемой? Есть ли параметр, о котором я не знаю, что я должен изменить? Кажется, это должно быть возможно …

  • Pandas-образное преобразование сгруппированных данных на pyspark DataFrame
  • Загрузить CSV-файл с помощью Spark
  • Вычислить режим колонки PySpark DataFrame?
  • Работает ли искривление предиката с JDBC?
  • Взорвать в PySpark
  • Выбор значений из непустых столбцов в элементе данных PySpark DataFrame
  • Как мы можем присоединиться к двум фреймворкам Spark SQL с использованием критерия «LIKE» SQL-esque?
  • Spark ALS predAll возвращает пустое
  • One Solution collect form web for “OutOfMemoryError при использовании PySpark для чтения файлов в локальном режиме”

    Вещь с sc.wholeTextFiles (/ path / to / files / *. Gpg) – для возврата PairRDD, ключ – имя файла и значение – это содержимое файла.

    Похоже, вы не используете часть содержимого файла, но все же сказали Спарку прочитать файлы с диска и отправить их рабочим.

    Если ваша цель состоит в том, чтобы обрабатывать только список имен файлов и их содержимое для чтения с помощью gpg_open, вы можете сделать это:

     def read_fun_generator(filename): with gpg_open(filename.split(':')[-1], 'r') as f: for line in f: yield line.strip() gpg_filelist = glob.glob("/path/to/files/*.gpg") # generate RDD with file name per record gpg_files = sc.parallelize(gpg_filelist) rdd_from_gpg = gpg_files.flatMap(read_fun_generator).map(lambda x: x.split('|')) rdd_from_gpg.count() # <-- For example... 

    Это уменьшит объем памяти, используемой JVM Spark.

    Python - лучший язык программирования в мире.