Бизнес блог #1
Выжимаю книги до самой сути.
👉 Реклама - @jaMasha
📇 Хотите свою книгу? Мы напишем её за вас и сделаем книгу бестселлером. Подробности в боте @Summary_library_bot
🏆 Оставьте след в истории с помощью книги
https://expert-book.pro
Фильмы и сериалы со всей планеты. Мы знаем, что посмотреть, где посмотреть и на что сходить в кино.
Last updated 3 days, 8 hours ago
Все материалы размещены по партнёрской програме ivi.ru | All materials are posted on the partner program ivi.ru
По всем вопросам: @kuzr103
Купить рекламу: https://telega.in/c/k1noxa103
Основной канал: https://t.me/kino_hd2
Last updated 1 month, 2 weeks ago
```
from models import ExtractorResource
class MockExtractor:
def init(self, integration_metadata: dict):
self.integration_metadata = integration_metadata
def get\_resources(self):
for idx in range(5):
yield ExtractorResource(path=f'mock\_s3\_file\_{idx}.csv')
```
ps: можно заматить, экстрактор реализован как генератор (yield) - в конкретном случае не так важно, тк экстрактор не отдает сами данные, а только ссылки (путь на S3 или url и тд)
Остальные классы, реализуем самостоятельно или заглядываем в репу
Каждый из классов имеет единственный аргумент = содержимое yml конфига и ничего более (но это не точно 😉), то есть все необходимые параметры должны быть описаны
в теле самого yml в нужной секции, например, у меня получилось так для экстрактора:
tasks:
extractor:
MockExtractor:
src\_s3\_conection\_id: reddit\_s3\_connection\_id
src\_s3\_bucket: raw\-public
src\_s3\_prefix\_template: reddit/{dm\_date}
src\_s3\_partition\_fmt: '%Y\-%m\-%d'
Осталось встроить наши классы в даг, используем TaskFlow API, пример для экстратора:
```
@task
def _extractor(extractor: t.Callable) -> t.List[str]:
extractor_obj = extractor(
intergation_metadata=intergation_metadata
)
return [resource.dict for resource in extractor\_obj.get\_resources()]
# извлекаем общую структуру тасок
tasks_meta = intergation_metadata.get("tasks", {})
# извлекаем extractor
extractor_name, extractor_params = list(tasks_meta.get("extractor").items())[0]
if not extractor_params:
extractor_params = {}
logging.info([extractor_name, extractor_params])
extractor = globals()[extractor_name]
# возвращаем список объектов
ext_resources = extractor.override(task_id=f"extractor{extractor_name}")(
extractor=extractor)
```
Комментарии:
- декораторная таска принимает только саму фукнцию или класс в нашем случае
- имя экстратора получаем из ямла
- все экстраторы импортируются from extractors import *
- из словаря globals()
получаем объект нужного экстрактора по имени из ямла
- если параметров экстратора нет, то подставляют пустой словарь
- resource.dict
- нужно, тк XComm не знает как сериализовать нашу модель ресурса, поэтому воспользуемся атрибутом dict
. Соответственно внутри таски transform_and_save обратно создадим ресурс
Логика для трансформера и saver сохраняется, добавляется только обработка ситуации,
когда трансформер сохраняет сам объекты и отдает только пути (пустой TransformerResource.content
). Как показала практика: такое нередко
встречается.
Остальное обдумываем сами или заглядываем в репу.
И после загрузки в AirFlow получаем красоту в UI - repo
WALLE - Выстраиваем структуру
В прошлом посте собрали фундамент:
- сформировали yml
- научились его читать
- научились создавать даг из yml метадаты
Пока в даге только пустые таски (start\end
), сейчас будем исправлять это. Во вступительном посте
очертили архитектуру дага, она повторяет процессы E(xtract)T(ransform)L(Save).
Таски transform, save предлагаю объеднить, тк transformer отдает какие-то данные, а saver их сохраняет, то есть таски обмениваются данные, а делать это через
XComm неблагодарное дело, поэтому наши даги будут состоять из двух тасок (минимум):
- extractor
- transformer_and_saver
Исходные данные (например, выгрузка API или файл на S3) в единственном числе (в смысле, что данные ASIS могут быть только одни) поэтому
extractor всегда 1, но может возвращать несколько объектов (например, N путей до файлов)
А вот обработать данные мы уже можем несколькими способами, поэтому transformers может быть несколько, а saver всегда идет в комплекте к transformer.
Для описания тасок выделим секцию tasks
в нашем yml:
version: 2
models:
\- name: mock \# имя интеграции, оно же dag\_id
description: Топ реддитов за последний час \# описание интеграции, оно же dag\_description
dag:
\# dag\_id: "" \# можно переопределить dag\_id != name
schedule\_interval: 0 * * * *
start\_date: '2024\-08\-01'
\# end\_date: '2024\-08\-31'
catchup: False
owner: dwh
tags:
\- mock
\- api\_integration
tasks:
extractor:
MockExtractor:
transformers:
\- MockTransformer:
saver:
MockSaver:
alerting\_chat\_id: \-987654321
alerting\_secret\_name: alerting\_bot\_token
Тут важно проверить, что ошибок в структуре нет и yaml_reader успешно читает такой конфиг (самостоятельно).
Структуру описали, теперь разбираемся что же это за MockExtractor
, MockTransformer
и MockSaver
🤔. А этих товарищей нужно реализовать: то есть
это некие Python-классы, которые реализуют некоторый базовый интерфейс. На текущий момент мы знаем, что extractor что-то передает transformer, этот
в свою очередь передаёт уже данные в saver. Условимся называть то чем обмениваются классы ресурсом (Resource
). Итого у нас будет 3 ресурса:
- ExtractorResource
- TransformerResource
- SaverResource
Ресурс - это объект Python, будем использовать датаклассы (dataclasses
), но до них доберемся чуть позже. А сейчас про функции каждого объекта:
- Extractor (выгружает из API и складывает на S3 (это я называю RAW-слоем = данные ASIS) и отдает далее пути до файлов
или ищет наличие файлов на S3 или FTP и возвращает пути к нужным файлам)
- Transformer (читаем данные по полученным путям, перекодирует согласно нашей логике и отдаёт saver-у набор байтов для сохранения)
- Saver (просто сохраняет байтики на S3 в нужном нам формате\партицировании - это я называю ODS-слой)
Интерфейс у нас будет единообразный, поэтому опишем какие методы должны быть реализованы у каждого класса:
- Extractor - должен иметь метод get_resources
- возвращает генератор объетов ExtractResource
- Transformer - должен иметь метод transform
, который принимает ExtractResource, возвращает объект TransformResource
- Saver - имеет 1 метод save
, который принимает TransformResource и возвращает SaveResource.
Все остальные внутренности каждого класса разрабатываются на усмотрение инженера - творческий процесс однако 😎.
Расчехляем Pycharm и кодируем, ресурсы:
```
@dataclasses.dataclass
class ExtractorResource:
path: str
@dataclasses.dataclass
class TransformerResource:
path: str
content: io.BytesIO
@dataclasses.dataclass
class SaverResource:
path: str
```
Для проверки идеи и работоспособности дага создадим Mock классы, для примера MockExtractor:
МАТЕМАРКЕТИНГ - 2024
Запоздалый пост о конфе матемаркетинг-2024. На фото наша банда на стенде
~~Постояли на стенде~~ Побывали на конференции, впечатления:
- масштабно
- с шиком\блеском
- с Себрантом
- шумно
Инженерная секция пока представлена слабо, согласно программе все рассказывали о том как
построить DataMesh. Не стало исключением и выступление нашего Head of DWH, рассказал:
- о том какие проблемы были и как решали
- как делили ~~апельсин~~ DWH
- где чья ответственность
Интересно было послушать, как это выглядит со стороны, когда сам являешься участником данных перемен🙃
Судя по кол-ву вопросов из зала - выступление получилось топ и заинтересовало многих.
Из интересных докладов можно отметить Рому Бунина о главном качестве аналитика. Это было не просто интересно, это было
визуально приятно, Рома в своем стиле с графиками, динамическим оформлением презы и даже вставками видео рассказал о чем не стоит
забывать аналитика (кажется, не только им):
- харды не главное
- чем выше ваш уровень (middle -> senior) тем важнее для вас софты
Посмотреть на фото Ромы тут
Кажется, это старо как мир: учитесь общаться, договариваться, задавать вопросы, говорить нет,
критически мыслить и ....
Бизнес блог #1
Выжимаю книги до самой сути.
👉 Реклама - @jaMasha
📇 Хотите свою книгу? Мы напишем её за вас и сделаем книгу бестселлером. Подробности в боте @Summary_library_bot
🏆 Оставьте след в истории с помощью книги
https://expert-book.pro
Фильмы и сериалы со всей планеты. Мы знаем, что посмотреть, где посмотреть и на что сходить в кино.
Last updated 3 days, 8 hours ago
Все материалы размещены по партнёрской програме ivi.ru | All materials are posted on the partner program ivi.ru
По всем вопросам: @kuzr103
Купить рекламу: https://telega.in/c/k1noxa103
Основной канал: https://t.me/kino_hd2
Last updated 1 month, 2 weeks ago