Where is data, Lebowski

Description
Канал про разное в data-мире:
- от библиотек визуализации до data egineering
- от графиков до элементов разработки
- от .csv до API
Advertising
We recommend to visit

Бизнес блог #1
Выжимаю книги до самой сути.

👉 Реклама - @jaMasha

📇 Хотите свою книгу? Мы напишем её за вас и сделаем книгу бестселлером. Подробности в боте @Summary_library_bot

🏆 Оставьте след в истории с помощью книги
https://expert-book.pro

Фильмы и сериалы со всей планеты. Мы знаем, что посмотреть, где посмотреть и на что сходить в кино.

Last updated 3 days, 8 hours ago

Все материалы размещены по партнёрской програме ivi.ru | All materials are posted on the partner program ivi.ru

По всем вопросам: @kuzr103
Купить рекламу: https://telega.in/c/k1noxa103
Основной канал: https://t.me/kino_hd2

Last updated 1 month, 2 weeks ago

1 month ago
[​​](https://cdn30.notepost.ru/hqd1WTUHzNKlkT2w-1732030179980.jpeg)

​​
```
from models import ExtractorResource

class MockExtractor:
def init(self, integration_metadata: dict):
self.integration_metadata = integration_metadata

def get\_resources(self): for idx in range(5): yield ExtractorResource(path=f'mock\_s3\_file\_{idx}.csv')

```

ps: можно заматить, экстрактор реализован как генератор (yield) - в конкретном случае не так важно, тк экстрактор не отдает сами данные, а только ссылки (путь на S3 или url и тд)

Остальные классы, реализуем самостоятельно или заглядываем в репу

Каждый из классов имеет единственный аргумент = содержимое yml конфига и ничего более (но это не точно 😉), то есть все необходимые параметры должны быть описаны
в теле самого yml в нужной секции, например, у меня получилось так для экстрактора:

tasks: extractor: MockExtractor: src\_s3\_conection\_id: reddit\_s3\_connection\_id src\_s3\_bucket: raw\-public src\_s3\_prefix\_template: reddit/{dm\_date} src\_s3\_partition\_fmt: '%Y\-%m\-%d'

Осталось встроить наши классы в даг, используем TaskFlow API, пример для экстратора:

```
@task
def _extractor(extractor: t.Callable) -> t.List[str]:
extractor_obj = extractor(
intergation_metadata=intergation_metadata
)

return [resource.dict for resource in extractor\_obj.get\_resources()]

# извлекаем общую структуру тасок
tasks_meta = intergation_metadata.get("tasks", {})

# извлекаем extractor
extractor_name, extractor_params = list(tasks_meta.get("extractor").items())[0]

if not extractor_params:
extractor_params = {}

logging.info([extractor_name, extractor_params])
extractor = globals()[extractor_name]

# возвращаем список объектов
ext_resources = extractor.override(task_id=f"extractor{extractor_name}")(
extractor=extractor)
```

Комментарии:
- декораторная таска принимает только саму фукнцию или класс в нашем случае
- имя экстратора получаем из ямла
- все экстраторы импортируются from extractors import *
- из словаря globals() получаем объект нужного экстрактора по имени из ямла
- если параметров экстратора нет, то подставляют пустой словарь
- resource.dict - нужно, тк XComm не знает как сериализовать нашу модель ресурса, поэтому воспользуемся атрибутом dict. Соответственно внутри таски transform_and_save обратно создадим ресурс

Логика для трансформера и saver сохраняется, добавляется только обработка ситуации,
когда трансформер сохраняет сам объекты и отдает только пути (пустой TransformerResource.content). Как показала практика: такое нередко
встречается.

Остальное обдумываем сами или заглядываем в репу.

И после загрузки в AirFlow получаем красоту в UI - repo

#walle
#framework
#automate

1 month ago

WALLE - Выстраиваем структуру

В прошлом посте собрали фундамент:
- сформировали yml
- научились его читать
- научились создавать даг из yml метадаты

Пока в даге только пустые таски (start\end), сейчас будем исправлять это. Во вступительном посте
очертили архитектуру дага, она повторяет процессы E(xtract)T(ransform)L(Save).
Таски transform, save предлагаю объеднить, тк transformer отдает какие-то данные, а saver их сохраняет, то есть таски обмениваются данные, а делать это через
XComm неблагодарное дело, поэтому наши даги будут состоять из двух тасок (минимум):
- extractor
- transformer_and_saver

Исходные данные (например, выгрузка API или файл на S3) в единственном числе (в смысле, что данные ASIS могут быть только одни) поэтому
extractor всегда 1, но может возвращать несколько объектов (например, N путей до файлов)

А вот обработать данные мы уже можем несколькими способами, поэтому transformers может быть несколько, а saver всегда идет в комплекте к transformer.

Для описания тасок выделим секцию tasks в нашем yml:

version: 2 models: \- name: mock \# имя интеграции, оно же dag\_id description: Топ реддитов за последний час \# описание интеграции, оно же dag\_description dag: \# dag\_id: "" \# можно переопределить dag\_id != name schedule\_interval: 0 * * * * start\_date: '2024\-08\-01' \# end\_date: '2024\-08\-31' catchup: False owner: dwh tags: \- mock \- api\_integration tasks: extractor: MockExtractor: transformers: \- MockTransformer: saver: MockSaver: alerting\_chat\_id: \-987654321 alerting\_secret\_name: alerting\_bot\_token

Тут важно проверить, что ошибок в структуре нет и yaml_reader успешно читает такой конфиг (самостоятельно).

Структуру описали, теперь разбираемся что же это за MockExtractor, MockTransformer и MockSaver🤔. А этих товарищей нужно реализовать: то есть
это некие Python-классы, которые реализуют некоторый базовый интерфейс. На текущий момент мы знаем, что extractor что-то передает transformer, этот
в свою очередь передаёт уже данные в saver. Условимся называть то чем обмениваются классы ресурсом (Resource). Итого у нас будет 3 ресурса:
- ExtractorResource
- TransformerResource
- SaverResource

Ресурс - это объект Python, будем использовать датаклассы (dataclasses), но до них доберемся чуть позже. А сейчас про функции каждого объекта:
- Extractor (выгружает из API и складывает на S3 (это я называю RAW-слоем = данные ASIS) и отдает далее пути до файлов
или ищет наличие файлов на S3 или FTP и возвращает пути к нужным файлам)
- Transformer (читаем данные по полученным путям, перекодирует согласно нашей логике и отдаёт saver-у набор байтов для сохранения)
- Saver (просто сохраняет байтики на S3 в нужном нам формате\партицировании - это я называю ODS-слой)

Интерфейс у нас будет единообразный, поэтому опишем какие методы должны быть реализованы у каждого класса:
- Extractor - должен иметь метод get_resources - возвращает генератор объетов ExtractResource
- Transformer - должен иметь метод transform, который принимает ExtractResource, возвращает объект TransformResource
- Saver - имеет 1 метод save, который принимает TransformResource и возвращает SaveResource.

Все остальные внутренности каждого класса разрабатываются на усмотрение инженера - творческий процесс однако 😎.

Расчехляем Pycharm и кодируем, ресурсы:

```
@dataclasses.dataclass
class ExtractorResource:
path: str

@dataclasses.dataclass
class TransformerResource:
path: str
content: io.BytesIO

@dataclasses.dataclass
class SaverResource:
path: str
```

Для проверки идеи и работоспособности дага создадим Mock классы, для примера MockExtractor:

1 month ago
[​​](https://cdn30.notepost.ru/wa4F7ZJjA5qYvLAU-1732032303858.jpeg)**МАТЕМАРКЕТИНГ - 2024**

​​МАТЕМАРКЕТИНГ - 2024

Запоздалый пост о конфе матемаркетинг-2024. На фото наша банда на стенде

~~Постояли на стенде~~ Побывали на конференции, впечатления:
- масштабно
- с шиком\блеском
- с Себрантом
- шумно

Инженерная секция пока представлена слабо, согласно программе все рассказывали о том как
построить DataMesh. Не стало исключением и выступление нашего Head of DWH, рассказал:
- о том какие проблемы были и как решали
- как делили ~~апельсин~~ DWH
- где чья ответственность

Интересно было послушать, как это выглядит со стороны, когда сам являешься участником данных перемен🙃
Судя по кол-ву вопросов из зала - выступление получилось топ и заинтересовало многих.

Из интересных докладов можно отметить Рому Бунина о главном качестве аналитика. Это было не просто интересно, это было
визуально приятно, Рома в своем стиле с графиками, динамическим оформлением презы и даже вставками видео рассказал о чем не стоит
забывать аналитика (кажется, не только им):
- харды не главное
- чем выше ваш уровень (middle -> senior) тем важнее для вас софты

Посмотреть на фото Ромы тут

Кажется, это старо как мир: учитесь общаться, договариваться, задавать вопросы, говорить нет,
критически мыслить и ....

#matemarketing

We recommend to visit

Бизнес блог #1
Выжимаю книги до самой сути.

👉 Реклама - @jaMasha

📇 Хотите свою книгу? Мы напишем её за вас и сделаем книгу бестселлером. Подробности в боте @Summary_library_bot

🏆 Оставьте след в истории с помощью книги
https://expert-book.pro

Фильмы и сериалы со всей планеты. Мы знаем, что посмотреть, где посмотреть и на что сходить в кино.

Last updated 3 days, 8 hours ago

Все материалы размещены по партнёрской програме ivi.ru | All materials are posted on the partner program ivi.ru

По всем вопросам: @kuzr103
Купить рекламу: https://telega.in/c/k1noxa103
Основной канал: https://t.me/kino_hd2

Last updated 1 month, 2 weeks ago