Бизнес блог #1
Выжимаю книги до самой сути.
? Реклама - @jaMasha
? Хотите свою книгу? Мы напишем её за вас и сделаем книгу бестселлером. Подробности в боте @Summary_library_bot
? Оставьте след в истории с помощью книги
https://expert-book.pro
Фильмы и сериалы со всей планеты. Мы знаем, что посмотреть, где посмотреть и на что сходить в кино.
Last updated 4 days, 10 hours ago
Все материалы размещены по партнёрской програме ivi.ru | All materials are posted on the partner program ivi.ru
По всем вопросам: @kuzr103
Купить рекламу: https://telega.in/c/k1noxa103
Основной канал: https://t.me/kino_hd2
Last updated 2 weeks, 3 days ago
WALLE - Готовим yml
Во вступительном посте оговорили, что фундамент фреймворка - yml конфиг.
Надо научиться читать его и формировать некую стркутуру для управления фреймворком.
Реализуем базу:
- все конфиги храним в директории metadata
- читаем директорию metadata
- данные из yml храним в переменной, например: integration_meta
- integration_meta
подается на вход функции создания дага create_dag(integration_meta)
Какие минимальные данные необходимы для формирования дага:
- dag_id
- расписание (но это не точно)
- дата старта\окончания (мы же хотим управлять процессом)
- catchup
- куда алертить (помним, что хотим единый алертинг: в нашем случае будет телеграмм, поэтому нужен channel_id + токен)
- общая информация (owner\tags\description)
С одной стороны конфиг должен знать что-то про нашу интеграцию, с другой у него должно быть что-то общее (про даг и про задачу),
попробуем выделять независимые секции в конфиге под эти задачи. Начнем с такого:
version: 2
models:
\- name: reddit \# имя интеграции, оно же dag\_id
description: Топ реддитов за последний час \# описание интеграции, оно же dag\_description
dag:
\# dag\_id: "" \# можно переопределить dag\_id != name
schedule\_interval: 0 * * * *
start\_date: '2024\-08\-01'
\# end\_date: '2024\-08\-31'
catchup: False
alerting\_chat\_id: \-987654321
alerting\_secret\_name: alerting\_bot\_token
owner: dwh
tags:
\- dwh
\- reddit
\- api\_integration
ps: по API будем забирать топ реддитов за последний час (данных там копейки, десятки строк, но нам не так важно кол-во)
Сохраняем наш первый yml ?
.
Учимся читать
.
Ничего необычного, Python библиотека pyyaml уже всё умеет.
От нас достаточно открыть файл, прочитать содержимое как текст и далее прогрузить через библиотеку pyyaml
. Немного шаблонного кода:
```
import yaml
def read_yaml(path_to_file: str):
with open(path_to_file, "r") as f_yaml:
try:
content = yaml.safe_load(f_yaml)
return content
except yaml.YAMLError as e:
print(e)
```
Добавил обработку ошибок try\except
- чуть-чуть защиты от сломанных файлов yml.
Проверяем работу реализованной функции (скрин в репе):
{'models': [{'dag': {'alerting\_chat\_id': \-987654321,
'alerting\_secret\_name': 'alerting\_bot\_token',
'catchup': False,
'owner': 'dwh',
'schedule\_interval': '0 * * * *',
'start\_date': '2024\-08\-01',
'tags': ['dwh', 'reddit', 'api\_integration']},
'description': 'Топ реддитов за последний час',
'name': 'reddit'}],
'version': 2}
yaml.safe_load
возвращает данные в виде python-словаря. Еще немного шаблонного кода для создания AirFlow дага и мы на финишной прямой:
- реализуем функцию create_dag
, она должна возвращать объект DAG
(from airflow import DAG
)
- объект даг нужно положить в globals()
, чтобы AirFlow смог узнать про наш даг
- пусть даг имеет следующую структуру:
- 1 таск группа
- таска start (EmptyOperator)
- таска end
Мои маленькие дата инженеры - это подлежит самостоятельной реализации ?. Мою реализацию посмотреть тут
.
Ну и плюс, код который будет читать все конфиги из директории metadata
(тут в помощь модуль из стандартной либы python os
) и формировать даги. Дерзаем?
У меня получилась такая структура папок:
\- src:
\- /common:
\- func.py
\- /metadata:
\- reddit.yml
\- /yaml\_reader:
\- reader.py
\- walle.py
Загружаем в AirFlow, наслаждаемся картиной (она в скринах)
На этом самая простая часть закончилась?
WALLE
Заботливо дадим красивое имя фреймворку Walle
, почему так:
- он строит фундамент DWH ( всё отгружаем на S3)
- построение фундамента - самая грязная работа (не считая подготовки данных от RAW до STAGE слоя)
- да, это просто красиво ? (не называть же его YetAnotherDagGenerator или FrameWork №Х)
К предыдущему посту вопросиков накидали:
А проблемы при этом возникают, что твоя автоматизация не умеет все, что надо, её надо дописывать,
тестить, рефакторить, иногда не работает… и это ради 10 строчек кода?
автоматизация не умеет всё
- справедливо, но всё и не надо. Применяем правило Парето: рефакторить
- Отвечу вопросом на вопрос: а неавтоматизированные даги рефакторить не надо? Написал один раз и забыл, нетиногда не работает
- справедливо, сделаю отсылку к п2: любой код может не работать =). Также вижу пользу: если не работает то, для всего - нет специфических проблем.тестить
- пффф, какие тесты. уяк - уяк и в продакшенОбщий посыл автоматизации такой:
Нет стремления покрыть 100% кейсов, есть желание написать код, в котором в одних и тех местах вызываются коннекшены, они имеют
одинаковые названия, код переиспользуется (для алертинга используется 1 и только 1 функция или стандартный набор, а не 8 различных фукнций ***?♂️***),
таски\даги \- имеют одинаковый нейминг и одинаковую структуру и тд...
FrameWorkStory1.0
.
Однажды был участником дискуссии: фреймворк или библиотека - одно и тоже или есть разница?. На тот момент объяснить оппоненту не удалось разницу, кажется, у меня не было правильных слов.
Обратимся к определению, на мой вкус эта часть ближе всего описывает что такое фреймворк:
программное обеспечение, облегчающее разработку и объединение разных компонентов большого программного проекта
.
Фреймворк объединяет строительные блоки, таким образом, что они подходят друг к другу. Блоками могут быть как раз библиотеки. Но что важнее фреймворк диктует правила построения архитектуры приложения, задавая на начальном этапе разработки поведение по умолчанию — «каркас», который нужно будет расширять и изменять согласно указанным требованиям.
.
Почему возникло желание:
- писать каждый раз шаблонный код дага для Airflow стало утомительно
- каждый из инженеров придумывает какие-то куски заново
- каждый переиспользует то, что сам видел или сам писал
- разнообразие функций с одинаковым функционалом
- поддержка кучи кастомных процессов
- ...
.
Почему вообще возникает множество кейсов:
- разные источники (API, S3, базы данных и др)
- API бывают сильно разные, как минимум по типу возвращаемого результата (json, csv, excel)
- формат и структура хранения данных во внешних S3 разнятся (однажды смежная команда разработчиков сказала, что они не знают что такое parquet, поэтому выгрузка в csv)
.
Что хочется:
1. Процесс доставки данных до dwh должен быть унифицированным (это L в ETL), например, в dwh стандартно загружаются файлы parquet со сжатием gzip. То есть всё интеграции должны быть приведены к этому формату
2. Процесс извлечения данных должен быть сведён к разработке только специфических адаптеров, например, клиент для работы с API яметрики или API ВыгрузиМеняПозже будут разными, но отдавать в какое-то единое место будут только parquet со сжатием gzip.
3. Создание дагов должно быть автоматизировано (соблюдай принцип DRY)
4. Структура Дага должна быть унифицирована
5. Управление из единого места
6. Сохранять исходники данные
.
Задачка масштабная, решать всё и сразу не получится, поэтому действуем от кейсов, в моем кармане было 2 штуки:
- загрузка csv из внешнего API
- выгрузка из API (API отдаёт json)
.
В результате нескольких подходов вышла такая архитектура:
- yml всему голова ( в yml описываются всё необходимые части пайплайна, как без боли читать yml это отдельная серия)
- конечной точкой процесса является S3
- два слоя: raw (данные as is), ods (gzip.parquet, партицирование по дням в формате YYYY-MM-DD)
- структура пайплайна совпадает с классикой etl (каждая буковка это таска) :
E = extractor ( возвращает коллекцию объектов ExtractorResourse, может быть только один)
T = transformers ( принимает Resourse, преобразует и возвращает TransformResourse, трансформеров может сколько угодно)
L = S = saver (принимает TransformResourse, сохраняет, может быть только один) по факту он неизменен, тк выходной формат стандартизирован и сохранение на S3
.
Зерно в ваши головы закинул, остальное будет дальше?
А пока расскажите:
- а как вам идея
- получится ли все автоматизировать
- и есть ли у вас примеры такой автоматизации/фреймворков
.
ps: у меня есть пример API с которым будут эксперименты, но можно в меня позакидывать что-то оригинальное?
.
#framework #automate
Colorized it!
.
Пост удобства, у коллеги заметил цветовое разделение коннектов к базам в DataGrip - показалось крайне интересным.
Но что, у меня-то Dbeaver, сходу решения не нашлось =(
.
Со второй попытки копания в настройках удалось найти нужную ?
.
Маленькая инструкция в репо.
.
#dbeaver #colorize
Выходим из затишья
,
Почти теория: чем выше рабочая нагрузка тем меньше постов в блогах =)
Станем разрушать эту теорию.
,
За время затишья удалось:
- погрузиться в методологию DV2.0 (datavault) ✊
- разобраться в рабочих фреймворках ?
- написать свои фреймворки ?
- стать teamlead (всё только началось)
.
Обо всем поговорим - планируется несколько серий постов, уххх ? А сейчас о нашем быстром друге Clichouse.
Парочка рабочих кейсов:
- проблема inodes
Что такое inodes: wiki, по-простому, файлы с метаинформацией и как оказалось их кол-во,
хоть и большое, но ограничено. И вот если вставлять очень активно и много, то они могут закончиться и тогда кластер переходит в read-only. Не слабо такое хватануть в понедельник утром, проблему решала поддержка YaCloud
- массовые вставки и отъезд по TTL
Хранение в клике может быть разным:
- на дисках кластера
- на дисках кластера и на внешних дисках (один из вариантов внутренний object_storage), почитать в доке - это гибридное хранилище
.
TTL - это возможность управлять жизненным циклом данных, возможности достаточно интересные, но сейчас нас интересует возможность
изменять тип хранения, то есть по достижении определенного момента времени хранение данных должно измениться, local -> object_storage.
При совокупности некоторых параметров и кейсов может наступить забавная ситуация:
- бекфилл данных
- данные настолько далеко в истории, что сразу наступает TTL
.
Что происходит: вставка идет на локальный диск и сразу данные отъезжают в object_storage и если данных много - получается два очень активных процесса. И это бомба замедленного действия, которая стреляет в ногу = поврежденные файлы, которые ломают процесс отъезда даннных -> накопление данных на локальных дисках -> место заканчивается -> read-only. Положение спасала поддержка YaCloud, так что
не создавайте таких массовых процессов ?
.
Вот такие рабочие кейсы, до встречи?
.
#work #clickhouse
Пусть это будет пятничным мемом во вторник?
.
Когда вы с коллегами решили ввести CI проверку на наличие delete-ов при работа с ClickHouse?
? Collab Practice
.
За время ревьюверства на курсах по Анализу данных и Data Science накопилось небольшое кол-во
Collab-ноутбуков с полезными штуками.
.
Делюсь, может кому-то пригодится:
1️⃣ Пропуски и их заполнение
2️⃣ Как не выстрелить себе в ногу при приведении типов данных
3️⃣ xticks или как самому накинуть на ось Х
4️⃣ Категоризация данных
.
Любые замечания или дополнения принимаются ?
#python #collab #practice
? Прокачай себя до уровня PRO или полезные лайфхаки
.
Воды не будет, только те, конструкции, которые сам нашел и часто использую в Python+Pandas:
1️⃣ Pandas метод assign
Когда нужно создать столбцы на лету:
```
import pandas as pd
df.assign(column_one = lambda row: row["existing_column"] // 1024,
column_two = 24,
....)
```
2️⃣ Pandas метод pipe
Как apply, только для всего датафрейма и принимает аргументы функции:
```
import numpy as np
import pandas as pd
def calculate(df: pd.DataFrame, thr: float = 2.56, columns: list = []):
for col in columns:
df[f"{col}_transformed"] = np.sqrt(df[col]) > 2.56
return df
df.pipe(calculate, thr=5, columns=["value_1", "value_2"])
```
3️⃣ Структуры данных из collections
Если не список и не датафрейм, то точно defaultdict. Обычно использую для сбора результатов в цикле:
```
from collections import defaultdict
res = defaultdict(list) # общая структура словарь, где для каждого ключа значением по-умолчанию будет пустой список
for idx, value enumerate([123, 999, 678]):
res["idx"].append(idx)
res["value"].append(value**2)
res["value"] # это список =)
```
Кейсы:
- подготовка данных для визуализации (например, отдать по API)
- сбор статистики при выполнении сложных SQL запросов
- ...
4️⃣ Модуль itertools
- batched
открыл для себя, когда сам сначала написал такую штуку (немножко изобрел ?)
- combinations
- итератор комбинаций из элементов списка
- zip_longest
- "длинный" брат zip
(если не знал, то zip проходится по самому короткому из списков)
5️⃣ Модуль functools - магия функционального программирования
- partial
- кажется, это самая часто используемая функцию из модуля (ну может после lru_cache()
)
```
import typing
import pandas as pd
import numpy as np
def func(df: pd.DataFrame, columns: list, method: t.Callable = np.mean, window_width: int = 2):
assert window_width == 2, "Ширина окна должна быть 2+!"
for col in columns:
df[f"{col}\_rolling"] = df[col].rolling(window\_width, min\_periods=2).apply(lambda window: method(window))
return df
func_mean = partial(func, columns=["value_1", "value_2"], method=np.median)
# теперь можно так, остальные параметры уже частично применились выше
func_mean(df=df)
```
Кейсы:
- ресерч методов или алгоритмов для решения одной и той же задачи
- отправка уведомлений только нужным адресам (чтобы каждый раз не прописывать их в аргументах)
6️⃣ reduce
- применяет функцию из 2 аргументов итерабельно к списку, на выходе 1 значение
```
l = [0, 1, -5, 7, 8]
# x - первый элемент списка, или результат применения функции к текущему элементу, y - текущий элемент
res = reduce(lambda x, y: x+y, l)
print(res) # 11
```
.
И последнее (но это не точно): не забывайте заглядывать в доки, чтобы не городить монструозные преобразования, которые решаются специальным параметром. Помните, в казалось бы, простой функции pandas.read_csv - ~50 параметров, скорее всего ваш кейс уже там есть?
.
#python #lifehack
? Сам себе Кандинский
.
Финалим серию постов про Clickhouse и наши разборки с погодой.
.
?Рецепт приготовления:
- данные о погоде (вьюха со среднедневными данными + last_point)
- Metabase BI
.
Естественно, пришлось поработать напильником ?, тк Metabase из коробки не умеет ходить в Clickhouse, но
сообщество не дремлет и выкатили версию с его поддержкой.
.
Ссылку на версию Metabase + Clickhouse и внешний вид дашборда найдете в репе (или ниже).
.
? Links:
- Metabase + clickhouse-driver
- Run Metabase in docker
- Get started in Metabase
Визуализация дело тонкое, проэтому если тебе нужно прокачать этот навык, то вот рекомендации:
- Бесплатный курс от DataYoga (Рома Бунин в создателях)
- Визуализация данных и введение в BI-инструменты от ЯПрактикум (Рома Бунин + Александр Богачёв - убийственное комбо) или можно почитать книгу Богачёва Графики, которые убеждают всех
.
#bi #clickhouse #dash #dashboard
Распараллель меня, если сможешь
.
TaskFlow и динамический маппинг тасок в AirFlow. Эти две возможности
также открыли для меня коллеги. Стоит отметить, что программирование дагов для меня
неPythonic way - странный синтаксис, странная логика, как будто неPython внутри Python?♂️
.
Две вышеобозначенные возможности не добавляют питонячности, хотя и можно найти какие-то сходства с
функциональным программированием.
.
TaskFlow - набор удобных декораторов для создания дагов, тасок, как Python-функций. Теперь, вместо:
```
def etl():
pass
with Dag(...) as dag:
etl = Python(task_id="etl", python_callable=etl, ...., da=dag)
```
Можно написать:
```
@dag(....)
def create_dag():
@task
def etl():
pass
create_dag()
```
.
Красивый код получается, когда используются подходы в чистом виде: или классический или декораторы, когда классический стиль и TaskFlow смешиваются получается каша ?. TaskFlow даёт одно очевидное преимущество: TaskFlow сам заботиться о перемещении данных между input\output tasks, как если бы вы использовали обычные функции - это очень круто? Пример ниже:
```
@task
def get_data_from_api() -> List[]:
...
return data
api_data = get_data_from_api()
@task
def etl(data: List[]) -> None:
# transfrom and save data
...
etl = etl(data=api_data)
```
.
Второй интересный паттерн: динамический маппинг тасок. Сегодня у тебя 1 хост, а завтра 7. Сегодня нужно 3 таски за разные дни, а завтра 8. Чтобы на зависеть от неизвестных входящих параметров следует брать на заметку динамический маппинг. Реализуется он довольно просто: у каждого оператора (таска) есть методы, например, partial
, expand
, что это нам дает:
1️⃣ Генерим несколько дат и выполняем etl за каждую
```
@task
def get_dates() -> List[str]:
return ['2024-02-01', '2024-02-04', '2024-02-03']
@task
def etl(dated_at: str) -> None:
# some code, which required from dated_at
dates = get_dates()
# метод expand - позволяет вызвать экземпляр таски для каждого элемента списка
etl = etl.expand(dated_at=dates)
```
2️⃣ На s3 N файлов, нужно однотипно все обработать или просто загрузить
```
@task
def find_s3_files() -> List[str]:
return [{'filepath': 's3://file_0.parquet'},
{'filepath': 's3://file_1.parquet'},
{'filepath': 's3://file_2.parquet'}]
files = find_s3_files()
# используем partial для задания аргументов одинаковых для каждой таски
# выполняем загрузку каждого файла отдельной таской
load = ClickhouseOperator.partial(
task_id="load",
connection_id="conn_id",
sql="""INSERT INTO stg.table SELECT * FROM s3('{{ params.filepath }})'""",
).expand(params=files)
```
.
Ранее такое приходилось реализовывать через циклы, а начиная с версии 2.3 у нас есть удобная возможность - используйте?
.
Самое интересное, что маппить можно не только 1 -> N, но и Nin -> Nout, то есть рельтута одного маппинга подавать на вход другого и работать это будет именно так как ожидается: не сначала N первых тасок, а после N последующих. Таски свяжутся в последовательные кусочки: N1 -> N1, N2 -> N2 и тд ?
Набросал небольшой пример, как можно работать с API при помощи динамического маппинга - покрутите, экспериментируйте.
? Links:
- TaskFlow
- Dynamic Task Mapping
- Create dynamic Airflow tasks (astronomer)
Бизнес блог #1
Выжимаю книги до самой сути.
? Реклама - @jaMasha
? Хотите свою книгу? Мы напишем её за вас и сделаем книгу бестселлером. Подробности в боте @Summary_library_bot
? Оставьте след в истории с помощью книги
https://expert-book.pro
Фильмы и сериалы со всей планеты. Мы знаем, что посмотреть, где посмотреть и на что сходить в кино.
Last updated 4 days, 10 hours ago
Все материалы размещены по партнёрской програме ivi.ru | All materials are posted on the partner program ivi.ru
По всем вопросам: @kuzr103
Купить рекламу: https://telega.in/c/k1noxa103
Основной канал: https://t.me/kino_hd2
Last updated 2 weeks, 3 days ago