API доступа к данным через таблицы Data Lake без сложностей

Удобный доступ к данным через таблицы Data Lake с помощью API

Создание надежного службы API GraphQL поверх файлов озера данных S3 с DuckDB и Go

Фото Джошуа Сортино на Unsplash

1. Введение

Таблицы озер данных в основном используются командами инженерии данных с использованием вычислительных систем для обработки больших объемов данных, таких как Spark или Flink, а также аналитиками и учеными-исследователями, создающими модели и отчеты с помощью мощных SQL-движков запросов, таких как Trino или Redshift. Эти вычислительные системы стали стандартом для доступа к данным в озерах данных, потому что они были разработаны для эффективной обработки типовых задач по работе с большими данными: сканирование больших объемов данных, работу с облачным объектным хранилищем, чтение и запись файлов с оптимизированными запросами форматирования, таких как Parquet или ORC, и т. д.

Однако также обычно требуется наличие API для доступа к большим данным (или некоему агрегированному представлению), таким как внутренние микросервисы. Предположим, что у нас есть таблица озера данных, в которой хранятся данные о реальных временных статистиках наших клиентов, сгенерированные неким приложением Spark. Эти данные могут использоваться в основном для внутренней отчетности, но могут быть также ценными для других сервисов в нашей организации. Несмотря на то, что это обычное требование, оно далеко от простого, в основном потому, что для его выполнения требуется другой набор инструментов. Создание файлов Parquet в корзине S3, доступных для HTTP-базирующегося на API с низкой латентностью, не так просто (особенно когда файлы непрерывно обновляются и требуется выполнить некие преобразования, прежде чем сделать их доступными).

Для работы с такими сценариями мы обычно будем нуждаться в базе данных, способной обрабатывать запросы с низкой латентностью для клиентов. Также потребуются некоторые задания ETL, которые смогут обрабатывать и преобразовывать файлы данных в S3 и загружать их в базу данных. Наконец, нам придется создать правильную конечную точку API для обслуживания запросов клиента.

Действительно, как показывает иллюстрация, предоставление возможности тонким клиентам выполнять запросы файлов озера данных с высокой скоростью обычно происходит за счет добавления большего количества составляющих и процессов в нашу конвейерную линию, чтобы либо копировать и загружать данные в более дорогие склады, обслуживающие клиента, или агрегировать и преобразовывать их в соответствии с низколатентными базами данных.

Целью этой статьи является исследование и демонстрация другого и более простого подхода к решению этого требования с помощью легких внутренних утилит для запросов. В частности, я покажу, как мы можем использовать внутренние утилиты, такие как DuckDB и Arrow Data Fusion, чтобы создавать сервисы, которые могут обрабатывать файлы и объемы озера данных, а также действовать как быстрое хранилище в памяти, обслуживающее вызовы API с низкой латентностью. Используя этот подход, мы можем эффективно объединить необходимую функциональность в одну службу запросов, которую можно горизонтально масштабировать, которая будет загружать данные, агрегировать и хранить их в памяти, а также эффективно и быстро обслуживать вызовы API.

В следующем разделе № 2 будут изложены основные требования и строительные блоки для этой службы, и будет объяснено, как они помогают решить основные проблемы, с которыми она сталкивается. Раздел № 3 будет углубляться в основу службы – загрузку данных и функциональность запросов, и показывать, как это может быть реализовано с помощью DuckDB и Go. (Эта статья сосредоточена на DuckDB и Go, но вы можете найти реализацию этой концепции на Rust с использованием Arrow Data Fusion в связанном ниже репозитории). Раздел № 4 добавит слой обслуживания GraphQL API сверху. Раздел № 5 будет заключительным.

2. Главные строительные блоки

Следует отметить, что основное предположение в данном подходе состоит в том, что данные, которые мы хотим сделать доступными для нашего API, могут поместиться в память нашей службы или в машине, на которой она работает. Для некоторых случаев использования это может быть ограничение, вызывающее определенные проблемы, но я считаю, что оно менее ограничивающее, чем кажется. Например, я использовал этот подход с памятью на основе реляционной таблицы, состоящей из 2 миллионов записей и 10 столбцов с использованием памяти около 350 МБ. Мы часто забываем, что данные, которые мы фактически обслуживаем, часто намного меньше, чем данные, которые мы храним или обрабатываем. В любом случае, это важное обстоятельство, которое следует иметь в виду.

Независимый сервис, который будет работать как более простая и привлекательная альтернатива общей вышеописанной архитектуре, должен отвечать, по меньшей мере, следующим требованиям:

  • Он должен уметь удобно читать и преобразовывать файлы данных непосредственно из нашего «озера данных» или объектного хранилища.
  • Он должен быть способен сохранять реляционные данные в памяти и отвечать на запросы с низкой задержкой.
  • Он должен иметь горизонтальную масштабируемость
  • Он должен быть простым и декларативным для запросов, преобразования и загрузки данных – SQL будет наиболее удобным подходом.

Проще говоря, вместо расширения инфраструктуры нашего приложения с помощью базы данных и дополнительного процесса ETL, идеально было бы создать сервис, который может загружать и преобразовывать данные непосредственно из источника, хранить их эффективно и предоставлять возможность быстрого запроса.

На мой взгляд, сочетание этих трех функций является одним из величайших преимуществ DuckDB (на котором сосредоточена эта статья) и Arrow Data Fusion. Правда, базы данных в памяти – это не новинка, но переломным моментом для DuckDB и Arrow Data Fusion является их расширяемость, которая позволяет использовать расширения, которые легко добавляют возможности для прямого чтения и записи данных в разных форматах и местоположениях, а также делать это масштабируемо и быстро.

Следовательно, наш сервис будет состоять из 3 основных компонентов или слоев, которые будут оборачивать друг друга: компонента низкого уровня, которая будет инкапсулировать соединение с DuckDB (которое я буду называть DataDriver), компонента DAO, которая будет использовать драйвер для выполнения запросов и обработки запросов API, и API-резолвер, который будет обслуживать его.

Другими словами, в терминах зависимостей и их отношений у нас есть следующая структура:

API-Resolver инкапсулирует структуру DAO, которая инкапсулирует структуру DataDriver, которая инкапсулирует соединение DuckDB

Следующая секция начнет с фокусировки на более низких слоях (структура DAO и DataDriver), а в следующей секции будет обсуждаться верхний уровень API и то, как он объединяет все это вместе.

3. Загрузка данных и запросы с использованием DuckDB

В этом разделе мы собираемся создать драйвер, который обертывает соединение с DuckDB. Он будет отвечать за инициализацию DuckDB и предоставлять интерфейс для выполнения SQL-выражений и запросов. Мы будем использовать отличную библиотеку go-duckdb, которая предоставляет интерфейс database/sql для DuckDB, статически связываясь с его библиотекой C.

Инициализация подключения sql.DB к DuckDB

Как уже упоминалось, мы обернем интерфейс sql.DB, предоставленный библиотекой go-duckdb, с помощью структуры с именем DuckDBDriver, которая будет отвечать за правильную инициализацию. Мы инициализируем DuckDB, выполняя несколько инициализационных выражений (bootQueries) с использованием объекта Connector. Коннектор выполняет выражения, устанавливающие учетные данные AWS (так как мы хотим загружать данные из S3), а также загрузку и установку расширений, которые требуются нашему сервису: расширение parquet (для чтения файлов parquet) и httpfs (для прямого чтения из объектного хранилища, такого как S3 через HTTP).

Как показано в приведенном выше блоке кода, функция getBootQueries() просто возвращает набор инициализирующих выражений в виде строк (вы можете увидеть эти выражения здесь). Инициализационные выражения выполняются коннектором, чтобы при вызове OpenDB() мы получали DuckDB в качестве соединения sql.DB, загруженного с необходимыми расширениями и секретами. Поскольку go-duckdb предоставляет интерфейс database/sql для соединения с DuckDB, основная функциональность запросов может быть реализована и предоставлена достаточно просто:

Как уже упоминалось, структура драйвера данных DuckDB будет просто действовать как утилитарный класс, обертывающий соединение с базой данных DuckDB, тогда как все выполнения запросов будут эффективно управляться структурой DAO, в которой будут находиться функции логики бизнеса, использующие методы драйвера. Структура DAO в свою очередь будет обертывать структуру DuckDBDriver.

Загрузка кешированных данных

На последнем этапе инициализации бэкэнда данных сервиса мы загружаем данные, которые хотим предоставить, из файлов parquet в S3 в таблицу памяти. Для этого мы будем использовать функцию execute() нашего драйвера с запросом CTAS, который создаст именованную таблицу, используя любые преобразования, которые мы можем выразить на SQL, из функции read_parquet(). Пример прояснит это.

Предположим, у нас есть таблица parquet, состоящая из данных, описывающих наших пользователей. Мы хотим создать сервис, который будет предоставлять только 3 поля из этой таблицы для быстрого доступа к API: имя, фамилия и возраст. Мы также хотим убедиться, что поле возраст будет доступно как целое число, хотя сохранено в файле parquet как строка.

Для этого, после инициализации нашего драйвера DuckDB с необходимыми расширениями и установки необходимых учетных данных AWS, мы просто выполняем SQL-запрос, который выбирает нужные нам данные, непосредственно из файлов parquet в S3, в память, используя функцию read_parquet().

CREATE TABLE Users AS SELECT NAME, LAST_NAME, CAST(AGE as integer)FROM read_parquet('s3://гдето/данные/*')

В этом выражении мы создаем таблицу в памяти с именем Users, которая состоит из выбранных нами полей из файлов parquet в указанной в функции read_parquet() локации. Мы можем использовать любую SQL-функцию и синтаксис, поддерживаемый DuckDB, включая составные запросы и агрегацию. Вот более полный пример того, как мы используем этот подход для инициализации нашего сервиса

На самом деле это ядро этого сервиса — при создании и запуске сервиса он выполняет запрос, который позволяет загружать данные из S3 в память.

После инициализации сервиса и загрузки данных мы можем выполнять любой SQL-запрос, который нам нужен, непосредственно в памяти таблицы, представленной нашим драйвером данных, чтобы обслуживать наше API с задержкой менее одной секунды.

4. Обслуживание GraphQL

Теперь, когда у нас есть соединение с таблицей в памяти, загруженной данными из кэшированных файлов parquet, последний шаг — создать конечную точку GraphQL, чтобы эффективно отвечать на запросы данных. Для этого мы будем использовать библиотеку gqlgen от 99designs, что делает эту задачу довольно простой.

Глубокое погружение в gqlgen выходит за рамки данной статьи. Тем, кто мало знаком с GraphQL, рекомендуется посмотреть документацию, которая очень понятно представлена и объяснена. Однако я считаю, что некоторое знакомство с концепциями GraphQL достаточно, чтобы понять этот раздел и получить общую идею.

Создание конечной точки GraphQL с использованием gqlgen обычно включает 3 основных шага: (1) создание схемы, (2) генерацию кода резолвера и заглушек, и (3) добавление кода резолвера, который реализует функции API.

Мы начнем с этой схемы, описывающей пользователя в нашей таблице и 2 основные функции для получения данных о пользователях: определенного-getAll и по email.

scalar Timetype User {  name: String!  last_name: String!  email: String!  age: Int!}type Query {  users: [User!]!  getUsersByEmail(email: String!): [User!]!}

После создания схемы мы вызываем процедуру генерации кода gqlgen в нашем проектном каталоге:

go run github.com/99designs/gqlgen generate

Выполнение процедуры генерации сгенерирует много кода, включая фактическую структуру User (структура нашей модели данных), соответствующий шаблон декларации резолвера и его реализацию. Рассмотрим их по порядку.

Структура резолвера была сгенерирована в файле с именем resolver.go с двумя выражениями — объявление типа структуры без свойств или членов и конструктором (метод new()), который его инициализирует. Как мы скоро увидим, резолвер — это слой по обслуживанию нашего API, который реализует функцию для каждого метода API. Цель файла resolver.go заключается в возможности внедрения всех необходимых зависимостей в резолвер или добавления всего, что нам нужно для обеспечения выполнения запросов для нашего API. Напомним, что именно это цель нашей структуры DAO. Наша структура DAO оборачивает драйвер данных DuckDB, который содержит соединение с нашими таблицами в памяти и отвечает за “преобразование” запросов API для данных в SQL-запросы. Поэтому мы просто внедряем инициализированный объект DAO в резолвер, чтобы резолвер мог использовать его для выполнения запросов.

// resolver.gotype Resolver struct { dao *data.DAO // добавляем ссылку на наш DAO}func NewResolver(dao *data.DAO) *Resolver { return &Resolver{dao: dao} //инициализируем наш DAO}

Следующий файл, который был сгенерирован (и будет сгенерирован каждый раз при выполнении процедуры gqlgen generate), является schema.resolvers.go, который представляет собой реализацию методов резолвера. Сгенерированный файл schema.resolvers в основном содержит сигнатуры методов функций API, объявленных в схеме. В нашем случае он будет состоять из 2 методов.

// schema.resolvers.gofunc (r *queryResolver) GetUsersByEmail(ctx context.Context, email string) ([]*model.User, error) {}func (s *DAO) GetUsers() ([]*model.User, error) {}

Для реализации этих функций нам сначала нужно иметь соответствующий метод в нашей структуре DAO, но давайте сначала реализуем один для примера, а затем завершим необходимый код DAO.

// schema.resolvers.gofunc (r *queryResolver) Users(ctx context.Context) ([]*model.User, error) { res, err := r.dao.GetUsers() if err != nil {  log.Printf("Ошибка при получении пользователей: %v", err)  return nil, err } return res, nil}

Как вы можете видеть, потому что наш DAO был внедрен в структуру резолвера, мы можем просто вызывать его функции, используя наш резолвер. Эта структура делает код слоя API очень чистым и простым.

Теперь давайте напишем фактическую реализацию необходимой функции в структуре DAO. Как вы можете видеть ниже, требуемый код довольно прост. Хотя я использую некоторые вспомогательные функции (которые вы можете увидеть в сопровождающем репозитории GitHub), функция GetUsers() просто выполняет SQL-запрос к нашей таблице DuckDB в памяти и создает список пользователей (не забывайте о том, что структура model.User была сгенерирована gqlgen, используя нашу схему).

//dao.gofunc (s *DAO) GetUsers() ([]*model.User, error) {//QryAllUsers := "select * from users" rows, err := s.driver.Query(QryAllUsers) if err != nil {  return nil, err } defer rows.Close() resultset, err := sqlhelper.ResultSetFromRows(rows) if err != nil {  return nil, err } users := make([]*model.User, 0) for _, row := range resultset {  user := newUserFromRow(row) // заполняем структуру пользователя  users = append(users, user) } return users, nil}

Теперь у нас по сути есть все необходимые слои, которые нам нужно объединить. Это структура драйвера данных (которая инкапсулирует соединение с нашим DuckDB в памяти), которая добавляется в структуру DAO, которая реализует и служит интерфейсом для всех необходимых функций API, которые вызываются резолвером – нашим обработчиком API. Связь между компонентами и ролью очень четко выражена тем, как они связаны вместе в файле server.go, который запускает наш сервис и его зависимости.

// server.godataDriver := data.NewDuckDBDriver(awsCred)dataStore := data.NewStore(dataDriver) resolver := graph.NewResolver(dataStore)srv := handler.NewDefaultServer(graph.NewExecutableSchema(graph.Config{Resolvers: resolver}))http.Handle("/query", srv)

При инициализации нашего сервиса мы сначала инициализируем наш драйвер, который получает подключение к нашему хранилищу DuckDB в памяти. Затем мы инжектируем драйвер в метод NewStore, который создает DAO и использует драйвер для загрузки данных из файлов parquet в память. Наконец, мы инжектируем структуру DAO в обработчик API, который вызывает его функцию при обработке API-запросов.

5. Заключение

Цель данной статьи заключается в предложении альтернативного подхода для обеспечения доступа к таблицам озера данных через HTTP API для “тонких” клиентов. Этот случай использования становится все более распространенным и обычно требует добавления нескольких компонентов, мониторинга и ресурсов в нашу конвейерную работу. В этой статье я предложил более простую альтернативу, которая, как я считаю, может подходить для многих случаев использования. Я продемонстрировал, как мы можем использовать производительность запросов DuckDB и его расширения, чтобы включить загрузку данных из удаленного объектного хранилища, сохранить их в отношения в памяти и предоставить возможность выполнять запросы с под-секундной задержкой. Более общим образом, я постарался привести пример отличных возможностей расширений DuckDB, которые могут принести нашим сервисам, а также простоты его встраивания.

Надеюсь, это будет полезно!

  • Сопровождающий репозиторий GitHub с примером кода можно найти здесь
  • Репозиторий на GitHub с реализацией той же концепции, используя Rust и Arrow Data Fusion, можно найти здесь.

** Все изображения, если не указано иначе, принадлежат автору