Как вытащить X количество предыдущих данных в строку в CSV

У меня очень большой CSV-данных, и мне нужно добавить предыдущие данные в каждую строку для каждого имени в столбце 2 для дат, предшествующих текущему, указанному в столбце2. Я думаю, что самый простой способ представить эту проблему – предоставить подробный пример, похожий на мои реальные данные, но значительно сократился:

Datatitle,Date,Name,Score,Parameter data,01/09/13,george,219,dataa,text data,01/09/13,fred,219,datab,text data,01/09/13,tom,219,datac,text data,02/09/13,george,229,datad,text data,02/09/13,fred,239,datae,text data,02/09/13,tom,219,dataf,text data,03/09/13,george,209,datag,text data,03/09/13,fred,217,datah,text data,03/09/13,tom,213,datai,text data,04/09/13,george,219,dataj,text data,04/09/13,fred,212,datak,text data,04/09/13,tom,222,datal,text data,05/09/13,george,319,datam,text data,05/09/13,fred,225,datan,text data,05/09/13,tom,220,datao,text data,06/09/13,george,202,datap,text data,06/09/13,fred,226,dataq,text data,06/09/13,tom,223,datar,text data,06/09/13,george,219,dataae,text 

Итак, для трех первых строк этого csv нет предыдущих данных. Поэтому, если бы мы сказали, что хотим вывести столбец 3 и 4 для последних трех событий джорджа (row1) в дату, предшествующую текущей, это было бы:

 data,01/09/13,george,219,dataa,text,x,y,x,y,x,y 

Однако, когда предыдущие данные начинают становиться avaialble, мы надеемся создать csv, например:

 Datatitle,Date,Name,Score,Parameter,LTscore,LTParameter,LTscore+1,LTParameter+1,LTscore+2,LTParameter+3, data,01/09/13,george,219,dataa,text,x,y,x,y,x,y data,01/09/13,fred,219,datab,text,x,y,x,y,x,y data,01/09/13,tom,219,datac,text,x,y,x,y,x,y data,02/09/13,george,229,datad,text,219,dataa,x,y,x,y data,02/09/13,fred,239,datae,text,219,datab,x,y,x,y data,02/09/13,tom,219,dataf,text,219,datac,x,y,x,y data,03/09/13,george,209,datag,text,229,datad,219,dataa,x,y data,03/09/13,fred,217,datah,text,239,datae,219,datab,x,y data,03/09/13,tom,213,datai,text,219,dataf,219,datac,x,y data,04/09/13,george,219,dataj,text,209,datag,229,datad,219,dataa data,04/09/13,fred,212,datak,text,217,datah,239,datae,219,datab data,04/09/13,tom,222,datal,text,213,datai,219,dataf,219,datac data,05/09/13,george,319,datam,text,219,dataj,209,datag,229,datad data,05/09/13,fred,225,datan,text,212,datak,217,datah,239,datae data,05/09/13,tom,220,datao,text,222,datal,213,datai,219,dataf data,06/09/13,george,202,datap,text,319,datam,219,dataj,209,datag data,06/09/13,fred,226,dataq,text,225,datan,212,datak,217,datah data,06/09/13,tom,223,datar,text,220,datao,222,datal,213,datai data,06/09/13,george,219,datas,text,319,datam,219,dataj,209,datag 

Вы заметите, что для 06/09/13 джордж происходит дважды, и оба раза он имеет ту же строку 319,datam,219,dataj,209,datag добавленную к его строке. Во второй раз, когда появляется Джордж, он добавляет эту же строку, потому что джордж 3 строки выше в одну дату. (Это просто подчеркивает «на дату, предшествующую текущей».

Как видно из заголовков столбцов, мы собираем последние 3 оценки и связанные 3 параметра и добавляем их в каждую строку. Обратите внимание, что это очень упрощенный пример. В действительности каждая дата будет содержать пару тысяч строк, в реальных данных также нет шаблона имен, поэтому мы не ожидаем увидеть fred, tom, george рядом друг с другом на повторяющемся шаблоне. Если кто-нибудь может помочь мне разобраться, как лучше всего достичь этого (наиболее эффективного), я был бы очень благодарен. Если что-то неясно, пожалуйста, дайте мне знать, я добавлю больше деталей. Любые конструктивные комментарии оценены. Благодарю SMNALLY

Кажется, ваш файл находится в порядке даты. Если мы берем последнюю запись за каждое имя за дату и добавляем ее к размеру deque для каждого имени при написании каждой строки, это должно сделать трюк:

 import csv from collections import deque, defaultdict from itertools import chain, islice, groupby from operator import itemgetter # defaultdict whose first access of a key will create a deque of size 3 # defaulting to [['x', 'y'], ['x', 'y'], ['x' ,'y']] # Since deques are efficient at head/tail manipulation, then an insert to # the start is efficient, and when the size is fixed it will cause extra # elements to "fall off" the end... names_previous = defaultdict(lambda: deque([['x', 'y']] * 3, 3)) with open('sample.csv', 'rb') as fin, open('sample_new.csv', 'wb') as fout: csvin = csv.reader(fin) csvout = csv.writer(fout) # Use groupby to detect changes in the date column. Since the data is always # asending, the items within the same data are contigious in the data. We use # this to identify the rows within the *same* date. # date=date we're looking at, rows=an iterable of rows that are in that date... for date, rows in groupby(islice(csvin, 1, None), itemgetter(1)): # After we've processed entries in this date, we need to know what items of data should # be considered for the names we've seen inside this date. Currently the data # is taken from the last occurring row for the name. to_add = {} for row in rows: # Output the row present in the file with a *flattened* version of the extra data # (previous items) that we wish to apply. eg: # [['x, 'y'], ['x', 'y'], ['x', 'y']] becomes ['x', 'y', 'x', 'y', 'x', y'] # So we're easily able to store 3 pairs of data, but flatten it into one long # list of 6 items... # If the name (row[2]) doesn't exist yet, then by trying to do this, defaultdict # will automatically create the default key as above. csvout.writerow(row + list(chain.from_iterable(names_previous[row[2]]))) # Here, we store for the name any additional data that should be included for the name # on the next date group. In this instance we store the information seen for the last # occurrence of that name in this date. eg: If we've seen it more than once, then # we only include data from the last occurrence. # NB: If you wanted to include more than one item of data for the name, then you could # utilise a deque again by building it within this date group to_add[row[2]] = row[3:5] for key, val in to_add.iteritems(): # We've finished the date, so before processing the next one, update the previous data # for the names. In this case, we push a single item of data to the front of the deck. # If, we were storing multiple items in the data loop, then we could .extendleft() instead # to insert > 1 set of data from above. names_previous[key].appendleft(val) 

Это сохраняет только имена и последние 3 значения в памяти во время прогона.

Может пожелать настроить, чтобы включить правильные / писать новые заголовки вместо того, чтобы просто пропускать их на входе.

Мои два цента:
– Python 2.7.5
– Я использовал defaultdict для хранения предыдущих строк для каждого имени .
– Я использовал ограниченную длину, чтобы удерживать предыдущие строки, потому что мне понравилось поведение fullo. Мне было легко подумать об этом – просто продолжайте втискивать в него вещи.
– Я использовал operator.itemgetter () для индексирования и нарезки, потому что он просто читается лучше.

 from collections import deque, defaultdict import csv from functools import partial from operator import itemgetter # use a 3 item deque to hold the # previous three rows for each name deck3 = partial(deque, maxlen = 3) data = defaultdict(deck3) name = itemgetter(2) date = itemgetter(1) sixplus = itemgetter(slice(6,None)) fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter', 'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1', 'LTscore+2', 'LTParameter+3'] with open('data.txt') as infile, open('processed.txt', 'wb') as outfile: reader = csv.reader(infile) writer = csv.writer(outfile) writer.writerow(fields) # comment out the next line if the data file does not have a header row reader.next() for row in reader: default = deque(['x', 'y', 'x', 'y', 'x', 'y'], maxlen = 6) try: previous_row = data[name(row)][-1] previous_date = date(previous_row) except IndexError: previous_date = None if previous_date == date(row): # use the xtra stuff from last time row.extend(sixplus(previous_row)) # discard the previous row because # there is a new row with the same date data[name(row)].pop() else: # add columns 3 and 4 from each previous row for deck in data[name(row)]: # adding new items to a full deque causes # items to drop off the other end default.appendleft(deck[4]) default.appendleft(deck[3]) row.extend(default) writer.writerow(row) data[name(row)].append(row) по from collections import deque, defaultdict import csv from functools import partial from operator import itemgetter # use a 3 item deque to hold the # previous three rows for each name deck3 = partial(deque, maxlen = 3) data = defaultdict(deck3) name = itemgetter(2) date = itemgetter(1) sixplus = itemgetter(slice(6,None)) fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter', 'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1', 'LTscore+2', 'LTParameter+3'] with open('data.txt') as infile, open('processed.txt', 'wb') as outfile: reader = csv.reader(infile) writer = csv.writer(outfile) writer.writerow(fields) # comment out the next line if the data file does not have a header row reader.next() for row in reader: default = deque(['x', 'y', 'x', 'y', 'x', 'y'], maxlen = 6) try: previous_row = data[name(row)][-1] previous_date = date(previous_row) except IndexError: previous_date = None if previous_date == date(row): # use the xtra stuff from last time row.extend(sixplus(previous_row)) # discard the previous row because # there is a new row with the same date data[name(row)].pop() else: # add columns 3 and 4 from each previous row for deck in data[name(row)]: # adding new items to a full deque causes # items to drop off the other end default.appendleft(deck[4]) default.appendleft(deck[3]) row.extend(default) writer.writerow(row) data[name(row)].append(row) 

Подумав об этом решении немного над стеклом порта, я понял, что это слишком сложно – это имеет тенденцию происходить, когда я стараюсь быть фантазией. Не совсем уверен в протоколе, поэтому я оставлю его – у него есть возможное преимущество в сохранении предыдущих трех строк для каждого имени.

Вот решение, использующее срезы и обычный словарь. Он сохраняет только ранее обработанную строку. Гораздо проще. Я сохранил товар, снова для удобства чтения.

 import csv from operator import itemgetter fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter', 'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1', 'LTscore+2', 'LTParameter+3'] name = itemgetter(2) date = itemgetter(1) cols_sixplus = itemgetter(slice(6,None)) cols34 = itemgetter(slice(3, 5)) cols6_9 = itemgetter(slice(6, 10)) data_alt = {} with open('data.txt') as infile, open('processed_alt.txt', 'wb') as outfile: reader = csv.reader(infile) writer = csv.writer(outfile) writer.writerow(fields) # comment out the next line if the data file does not have a header row reader.next() for row in reader: try: previous_row = data_alt[name(row)] except KeyError: # first time this name encountered row.extend(['x', 'y', 'x', 'y', 'x', 'y']) data_alt[name(row)] = row writer.writerow(row) continue if date(previous_row) == date(row): # use the xtra stuff from last time row.extend(cols_sixplus(previous_row)) else: row.extend(cols34(previous_row)) row.extend(cols6_9(previous_row)) data_alt[name(row)] = row writer.writerow(row) 

Я обнаружил, что для подобных типов обработки, что накопление строк и запись их в кусках, а не отдельно, может значительно повысить производительность. Кроме того, если возможно, чтение всего файла данных сразу помогает.

Вот пример кода, который должен продемонстрировать то, что вы ищете, по образцам, предоставленным с вашим вопросом. Я назвал свой входной файл «input.csv» и прочитал / написал из рабочего каталога, «output.csv» отправляется в ту же папку. Я использовал комментарии в коде, чтобы попытаться объяснить, сохранить предыдущие записи в словаре с поиском по имени и сохранить список оценок для каждого – сохранить текущие записи даты в новом словаре буфера и добавить его в основной словарь каждый раз, когда дата изменения ввода. Дайте мне знать, если у вас есть какие-либо вопросы, код немного грубый – просто быстрый пример. Слайд [: 6] дает последние 6 элементов списка (три предыдущих пары пар очков / параметров) для текущего имени.

 import csv myInput = open('input.csv','rb') myOutput = open('output.csv','wb') myFields = ['Datatitle','Date','Name','Score','Parameter','Text', 'LTscore','LTParameter','LTscore+1','LTParameter+1', 'LTscore+2','LTParameter+2'] inCsv = csv.DictReader(myInput,myFields) outCsv = csv.writer(myOutput) outCsv.writerow(myFields) # Write header row previous_dict = dict() # store scores from previous dates new_dict = dict() # buffer for records on current-date only def add_new(): # merge new_dict into previous_dict global new_dict, previous_dict for k in new_dict: if not previous_dict.has_key(k): previous_dict[k] = list() # put new items first previous_dict[k] = new_dict[k] + previous_dict[k] new_dict = dict() # reset buffer old_date = '00/00/00' # start with bogus *oldest* date string inCsv.next() # skip header row for row in inCsv: myTitle = row['Datatitle'] myDate = row['Date'] myName = row['Name'] myScore = row['Score'] myParameter = row['Parameter'] myText = row['Text'] if old_date != myDate: add_new() # store new_dict buffer with previous data old_date = myDate if not new_dict.has_key(myName): new_dict[myName] = [] # put new scores first new_dict[myName] = [myScore,myParameter] + new_dict[myName] if not previous_dict.has_key(myName): previous_dict[myName] = [] outCsv.writerow([myTitle,myDate,myName,myScore,myParameter,myText] \ + previous_dict[myName][:6]) # end loop for each row myInput.close() myOutput.close() 

Мое решение должно преуспеть для больших наборов данных. Если потребление памяти вызывает беспокойство, длина списков имен может быть ограничена тремя оценками – в настоящее время я сохраняю все предыдущие баллы и просто показываю три, если вам нужно больше в будущем. Если размер данных громоздкий, вы всегда можете использовать базу данных файлов sqlite вместо dict для временного поиска данных на диске, а не всего в памяти. С 8G оперативной памяти и 2G данных, вы должны быть в порядке с in-memory python dictionary, как используется здесь. Убедитесь, что вы используете 64-разрядную версию Python в 64-разрядной ОС. Мой пример не выводит ничего на экран, но для большого файла вы можете поместить инструкцию печати, которая показывает прогресс, каждый N строк (например, каждые 100, 1000, выбирают на основе вашей скорости системы). Обратите внимание, что выход экрана замедляет скорость обработки файлов.

Вот один из подходов: точная реализация зависит от ваших данных, но это должно дать вам хорошую отправную точку.

Вы выполняете два прохода через входные данные CSV.

  1. На первом проходе через вход сканируйте строки и создайте словарь. Имя может использоваться как ключ, например {'Tom' : [(date1, values),(date2, values)], 'George' : [(date1, values), (date2,values)]} . Может оказаться, что работать с вложенными словарями проще, например {'Tom' : {date1: values, date2: values}, 'George' : {date1: values, date2: values}} . Подробнее о структуре данных ниже.

  2. На втором проходе через вход вы объединяете исходные входные данные и исторические данные из словаря для создания выходных данных.

Способ выбора исторических данных зависит от регулярности входных данных. Например, если даты сортируются в порядке возрастания, и вы внедрили словарь списков, это может быть так же просто, как взять срезы из соответствующего списка, например dataDict['Tom'][i-3:i] . Но поскольку вы упоминаете, что на одну дату может быть несколько записей, вам, вероятно, потребуется дополнительная работа. Некоторые возможности:

  • Учитывая подход словаря списков, поддерживайте значения как список, чтобы не было повторяющихся записей даты, например {'Tom' :(date1, [val1, val2, val3]),(date2, values)], 'George' : [(date1, values),(date2,values)]} .

  • При использовании словаря словарей, найдите нужные диапазоны дат. В этом случае вам, вероятно, придется проверять исключения KeyError, если только каждая дата не доступна последовательно. Вы также можете сохранить дополнительный, отсортированный индекс доступных дат.

Я все время отбирал это, потому что у меня такой же маленький проект. Я отправлю второй ответ с улучшением, используя сопрограммы. Процесс похож на мой другой ответ, но он быстрее (хотя я не знаю почему). Есть три сопрограммы – читатель, процессор и писатель. В приведенном ниже коде показаны некоторые сокращенные статистические данные профилировщика.

 """uses coroutines. 2 gig file, 1M lines, 2K characters/line: - read and send one line at a time - process and send one line - accumulate 720 lines before write Wed Nov 13 08:04:34 2013 fooprof 10947682 function calls (9946973 primitive calls) in 82.147 seconds Ordered by: standard name ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 82.147 82.147 <string>:1(<module>) 1 59.896 59.896 82.147 82.147 optimizations.py:45(reader) 1000710 8.864 0.000 21.703 0.000 optimizations.py:57(processor) 1000710 1.506 0.000 6.137 0.000 optimizations.py:94(writer) 1002098 0.185 0.000 0.185 0.000 {len} 1000708 0.209 0.000 0.209 0.000 {method 'append' of 'list' objects} 2/1 0.073 0.036 0.078 0.078 {method 'close' of 'generator' objects} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects} 1937129 0.295 0.000 0.295 0.000 {method 'extend' of 'list' objects} 1002097 3.115 0.000 3.115 0.000 {method 'join' of 'str' objects} 2001416/1000708 0.839 0.000 22.172 0.000 {method 'send' of 'generator' objects} 1000708 4.305 0.000 4.305 0.000 {method 'split' of 'str' objects} 1000708 0.823 0.000 0.823 0.000 {method 'strip' of 'str' objects} 1390 2.033 0.001 2.033 0.001 {method 'write' of 'file' objects} 1 0.004 0.004 0.004 0.004 {method 'writelines' of 'file' objects} 2 0.001 0.001 0.001 0.001 {open} Running a few in a row helps: Fri Nov 15 22:12:02 2013 fooprof 10947671 function calls (9946963 primitive calls) in 69.237 seconds Fri Nov 15 22:13:44 2013 fooprof 10947671 function calls (9946963 primitive calls) in 64.330 seconds using a dummy reader that sends the same line 1M times Wed Nov 13 13:36:57 2013 fooprof 10004374 function calls (9004373 primitive calls) in 23.013 seconds using dummy reader AND writer --> processor time Wed Nov 13 13:45:08 2013 fooprof 10001730 function calls (9001729 primitive calls) in 10.523 seconds using a dummy processor and writer --> mostly reader time Wed Nov 13 22:45:24 2013 fooprof 6005839 function calls (5005131 primitive calls) in 24.502 seconds using a dummy reader and processor --> writer time Wed Nov 13 22:52:12 2013 fooprof 6004374 function calls (5004373 primitive calls) in 24.326 seconds """ import csv from operator import itemgetter # data,01/09/13,george,219,dataa,text # data,01/09/13,george,219,dataa,text,x,y,x,y,x,y # just keep the previous row fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter', 'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1', 'LTscore+2', 'LTParameter+3'] def reader(processor, filename = 'data.txt'): processor.next() with open(filename) as f: #skip the header f.next() for line in f: processor.send(line) processor.close() return 'done' def processor(writer): """Process line and send to writer. line --> str, a complete row of data sends str """ date = itemgetter(1) name = itemgetter(2) cols_sixplus = itemgetter(slice(6,None)) cols34 = itemgetter(slice(3, 5)) cols6_9 = itemgetter(slice(6, 10)) data = {} writer.next() try: while True: line = yield row = line.strip().split(',') try: previous_row = data[name(row)] except KeyError as e: # first time this name encountered row.extend(['x', 'y', 'x', 'y', 'x', 'y']) data[name(row)] = row writer.send(','.join(row) + '\n' ) continue if date(previous_row) == date(row): # use the xtra stuff from last time row.extend(cols_sixplus(previous_row)) else: row.extend(cols34(previous_row)) row.extend(cols6_9(previous_row)) data[name(row)] = row writer.send(','.join(row) + '\n') except GeneratorExit: writer.close() def writer(filename = 'processed.txt', accum = 1000): with open(filename, 'wb') as f: f.write('Datatitle,Date,Name,Score,Parameter,LTscore,LTParameter,LTscore+1,LTParameter+1,LTscore+2,LTParameter+3\n') try: while True: # dataout = list() dataout = list() while len(dataout) < accum: dataout.append((yield)) f.write(''.join(dataout)) except GeneratorExit: f.writelines(dataout) if __name__ == '__main__': import cProfile, pstats cProfile.run("reader(processor(writer(accum = 720)), filename = 'biggerdata.txt')", 'fooprof') p = pstats.Stats('fooprof') p.strip_dirs().sort_stats(-1).print_stats() 

Если вы посмотрите на время профайлера, используя фиктивные функции (mocks?), Они не добавляют времени для всех трех реальных функций – я тоже этого не понимаю.

Я попытался использовать linecache в читателе, но он был медленнее. Я пробовал mmap в читателе, читая 200M куски, но он был медленнее – вероятно, потому, что я использовал re.finditer (), чтобы выделить строки. Я, вероятно, вернусь к читателю mmap для своих целей.