# 01_1_tensors_Аль-Натор.ipynb
# Markdown:
# Знакомство с `torch.Tensor`

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы: 
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/docs/stable/torch.html

import torch as th
import random
import numpy as np
import time
import matplotlib.pyplot as plt
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Создайте тензор и исследуйте его основные характеристики
t = th.randint(10, (2, 2, 2)) 
t
t.shape, t.dtype, th.numel(t), t.device
# Markdown:
2\. Создайте трехмерный тензор и рассмотрите основные способы индексирования по нему
t = th.randint(10, (3, 3, 3)) 
t
t[0, 0, 0]
# Markdown:
3\. Создайте тензор (4х4) и модифицируйте следующим образом: ко всем четным столбцам прибавьте 1, из нечетных вычтите 1.
t = th.randint(10, (4, 4)) 
t
t[::, ::2] = t[::, ::2] - 1
t[::, 1::2] = t[::, 1::2] + 1
t
# Markdown:
4\. Обсудите совместимость `torch` с `numpy` и `sklearn`
th.randint(10, (2, 2, 2), device='cuda')
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Создайте двумерный тензор размера (100000, 10), заполненный нулями. Используя прихотливое индексирование, поставьте в каждой строке тензора ровно одну единицу в случайно выбранном столбце. Рассчитайте и выведите на экран вероятности $p_i$ того, что для случайно выбранной строки в столбце $i$ будет стоять единица.

- [x] Проверено на семинаре
t = th.zeros(100000, 10)
t
t[th.arange(100000), th.randint(0, 10, (100000,))] = 1
t
t.mean(dim=0)
# Markdown:
<p class="task" id="2"></p>

2\. При помощи прихотливого индексирования для двумерного тензора размерности (10, 10), состоящего из случайных целых чисел в пределах от 0 до 10, получите тензор элементов, находящихся сразу над  побочной диагональю.

- [x] Проверено на семинаре
t = th.randint(10, (10, 10))
t
t[th.arange(9), th.arange(8, -1, -1)]
# Markdown:
<p class="task" id="3"></p>

3\. Создайте двумерный тензор $t$ размерности (5, 5), состоящий из случайных чисел в пределах от 0 до 100. Обнулить все значения в массиве, расположенные вне квадрата размера 3х3 вокруг максимального элемента. Если максимумов несколько, обнулите элементы около любого из них.

- [x] Проверено на семинаре
t = th.randint(100, (5, 5))
t
max_index = th.where(t == t.max())[0][0], th.where(t == t.max())[1][0]
max_index
i, j = max_index
t_new = t.clone().detach()
t_new[0:max(0, i - 1), :] = 0
t_new[min(5, i + 2):, :] = 0
t_new[:, 0:max(0, j - 1)] = 0
t_new[:, min(5, j + 2):] = 0

t_new
# Markdown:
<p class="task" id="4"></p>

4\. Создайте трехмерный массив размерности (2, 5, 5) на основе решения задачи 3 (объедините исходный и результирущий тензор вдоль нулевой оси). Сохраните полученный трехмерный тензор в файл `tensor.pt`. Загрузите полученный тензор и покажите, что все элементы двух тензоров совпадают.

- [x] Проверено на семинаре
t_stacked = th.stack((t, t_new), dim=0)
t_stacked
t_stacked.shape
th.save(t_stacked, 'saved_tensor.pt')
t_loaded = th.load('saved_tensor.pt')
t_loaded
th.equal(t_loaded, t_stacked)
# Markdown:
<p class="task" id="5"></p>

5\. Создайте четырехмерный массив `t` размерности (2, 3, 5, 5), заполненный случайными целыми числами от 1 до 10 (сами значения должны быть представлены типом float32). Рассчитайте среднее значение для каждого двумерного тензора `t[i, j, :, :]`. Представьте результат в виде трехмерного тензора размера (2, 3, 1).

- [x] Проверено на семинаре
t = th.randint(10, (2, 3, 5, 5), dtype=th.float32)
t.mean(dim=(2, 3), keepdim=True)
# Markdown:
<p class="task" id="6"></p>

6\. Создайте одномерный тензор размера `N=100_000_000`, заполненный числами из экспоненциального распредления с параметром $\lambda=5$. Рассчитайте значения для построения гистограммы при помощи пакета `torch`. Визуализируйте гистограмму. Проверьте возможность использования GPU. При наличии GPU перенесите созданный тензор в память GPU, повторите вычисления. Сравните время расчетом с и без использования GPU.

- [x] Проверено на семинаре
t = th.distributions.Exponential(0.2).sample((100_000_000,))
start_time = time.time()

hist_values, bin_edges = th.histogram(t, bins=100)
%timeit plt.hist(hist_values, bins=100, density=True)

plt.show()
device = th.device("cuda" if th.cuda.is_available() else "cpu")
device
t = t.to(device)

%timeit hist_values = th.histc(t, bins=100)
# Markdown:
<p class="task" id="7"></p>

7\. Создайте четырехмерный тензор размера (10, 6, 6, 3), заполненный случайными целыми числами от 0 до 255. Считая, что данный тензор представляет собой батч из 10 картинок размера 6х6 в формате RGB, измените тензор следующим образом. Для оттенков красного обнулите все столбцы, кроме первых двух; для оттенков зеленого обнулите третий и четвертый столбцы; для оттенков синего обнулите пятый и шестой столбцы. Для выполнения задания используйте механизм распространения.

- [ ] Проверено на семинаре
def plot_images(tensor_batch, batch_size=10):
    t_np = tensor_batch.numpy()
    fig, axs = plt.subplots(1, batch_size, figsize=(20, 2))
    for i in range(batch_size):
        axs[i].imshow(t_np[i].astype(np.uint8))
        axs[i].axis('off')
    plt.show()

t = th.randint(256, (10, 6, 6, 3)) 
plot_images(t)
t[:, :, :, 0] *= th.tensor([1, 1, 0, 0, 0, 0]) 
t[:, :, :, 1] *= th.tensor([1, 1, 0, 0, 1, 1])  
t[:, :, :, 2] *= th.tensor([1, 1, 1, 1, 0, 0]) 
plot_images(t)


# 01_2_data_Аль-Натор.ipynb
# Markdown:
# Подготовка данных для обучения моделей

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://pytorch.org/docs/stable/data.html
* https://pytorch.org/tutorials/beginner/data_loading_tutorial.html
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann

# Markdown:
## Задачи для совместного разбора
# Markdown:
1. Создайте синтетический датасет для задачи регрессии и представьте его в виде `torch.utils.data.Dataset`
import torch as th
from torch.utils.data import Dataset
from sklearn.datasets import make_regression
X, y = make_regression(n_samples=1000, n_features=10)
X.shape, y.shape, type(X)
from typing import Callable

class RegressionDataset(Dataset):
  def __init__(self, transform: Callable | None = None, **kwargs):
    super().__init__()
    self.X, self.y = make_regression(**kwargs)
    self.transform = transform

  def custom_method(self):
    ...

  def __getitem__(self, idx):
    x = self.X[idx]
    if self.transform is not None:
      x = self.transform(x)
    y = self.y[idx]

    return x, y

  def __len__(self):
    return len(self.X)
def f(x):
  return x

f(5)
class MyCallable:
  def __call__(self, x):
    return x
c = MyCallable()
c(5)
dataset = RegressionDataset(n_samples=1000, n_features=10)
dataset[0]
import numpy as np


def my_transformer(x: np.ndarray) -> np.ndarray:
  return 1000 * x
class MyCallable:
  def __init__(self, coef: int) -> None:
    self._coef = coef

  def __call__(self, x: np.ndarray) -> np.ndarray:
    return self._coef * x
dataset = RegressionDataset(
    transform=my_transformer,
    n_samples=1000,
    n_features=10
)
dataset[0]
dataset = RegressionDataset(
    transform=MyCallable(coef=10000),
    n_samples=1000,
    n_features=10
)
dataset[0]
dataset[:2]
from torch.utils.data import random_split
train, val, test = random_split(dataset, lengths=[0.7, 0.15, 0.15])
train[0]
len(train)
from torch.utils.data import DataLoader
loader = DataLoader(train, 64, )
iter(loader)
loader = DataLoader(train, 64, )

it = iter(loader)
x, y = next(it)
x.shape, y.shape
for x, y in loader:
  print(x.shape, y.shape)
  # break
700 % 64
# Markdown:
## Задачи для самостоятельного решения
import pandas as pd
import torch as th
from torch.utils.data import DataLoader, Dataset, random_split
from typing import Callable
from sklearn.preprocessing import OrdinalEncoder
from typing import Any
# Markdown:
<p class="task" id="1"></p>

1\. Считайте файл `bank-full.csv` ([источник](https://www.kaggle.com/datasets/hariharanpavan/bank-marketing-dataset-analysis-classification)) в виде `pd.DataFrame`.

Опишите класс `BankDatasetBase`. Решение должно удовлетворять следующим критериям:

* класс наследуется от `torch.utils.data.Dataset`;
* при создании объекта в конструктор передается набор данных в виде `pd.DataFrame`;
* объекты класса имеют поля `X` и `y` с признаками и метками соответственно;
* класс реализует интерфейс последовательностей (`__getitem__` + `__len__`);
* `obj[i]` возвращает кортеж, содержащий `i`-ую строку из `obj.X` (серию) и `i`-ую строку из `obj.y` (строку).
    
Создайте объект класса `BankDatasetBase` и продемонстрируйте работоспособность.

- [ ] Проверено на семинаре
data = pd.read_csv('bank-full.csv')
data.head()
data.info()
class BankDatasetBase(Dataset):
    def __init__(self, data: pd.DataFrame) -> None:
        super().__init__()
        self.X, self.y = data.drop('y', axis=1), data.y
        
    def __getitem__(self, idx: int) -> tuple:
        return self.X.iloc[idx], self.y.iloc[idx]

    def __len__(self) -> int:
        return self.y.shape[0]
bdb = BankDatasetBase(data)
len(bdb)
bdb[0]
# Markdown:
<p class="task" id="2"></p>

2\. Опишите класс `BankDataset`. Решение должно удовлетворять всем критериям из предыдущего задания, а также:
* при создании объекта в конструктор может быть передан необязательные аргументы `transform` и `target_transform`;
* если аргумент `transform` был передан, то при получении `i`-го элемента, нужно вызвать `transform(x)` и вернуть полученный результат.
* если аргумент `target_transform` был передан, то при получении `i`-го элемента, нужно вызвать `target_transform(y)` и вернуть полученный результат.

Создайте объект класса `BankDataset` и продемонстрируйте работоспособность (без передачи `target_transform` и `transform`).

- [ ] Проверено на семинаре
class BankDataset(Dataset):
    def __init__(
            self,
            data: pd.DataFrame,
            transform: Callable | None = None,
            target_transform: Callable | None = None
    ) -> None:
        super().__init__()
        self.X, self.y = data.drop('y', axis=1), data.y
        self.transform = transform
        self.target_transform = target_transform

    def __getitem__(self, idx: int) -> tuple:
        x = self.X.iloc[idx]
        y = self.y.iloc[idx]
        if self.transform is not None:
            x, y  = self.transform(x, y)
        if self.target_transform is not None:
            x, y = self.target_transform(x, y)
        return x, y

    def __len__(self) -> int:
        return self.y.shape[0]
bd = BankDataset(data)
len(bd)
bd[0]
# Markdown:
<p class="task" id="3"></p>

3\. Опишите класс `OrdinalEncoderTransform`. Решение должно удовлетворять следующим критериям:

* при создании объекта в конструктор передаются названия нечисловых столбцов в датасете
* класс реализует интерфейс `Callable` (`__call__`); метод `__call__` имеет один параметр (признаки) и возвращает набор признаков, в котором нечисловые характеристики закодированы целыми числами;
* состояние объекта (индексы для кодирования) обновляется в момент очередного вызова `__call__` (т.е. все данные сразу никогда не передаются никакому методу объекта).
    
Продемонстрируйте работоспособность, создав объект `BankDataset` и передав при создании объект класса `OrdinalEncoderTransform`.

- [ ] Проверено на семинаре
class OrdinalEncoderTransform:
    def __init__(self, category_columns: list[str]) -> None:
        self.category_columns = category_columns
        self.category_indexers = {col: {} for col in category_columns}

    def __call__(self, x: pd.Series, y: str) -> pd.Series:
        for col in self.category_columns:
            if col in x:
                if x[col] not in self.category_indexers[col]:
                    self.category_indexers[col][x[col]] = len(self.category_indexers[col])
                x[col] = self.category_indexers[col][x[col]]
        return x, y
pd.options.mode.copy_on_write = True
cats = data.drop('y', axis=1).select_dtypes(include=['object', 'category']).columns
encoder = OrdinalEncoderTransform(cats)
bd = BankDataset(data, transform=encoder)
bd[0]
# Markdown:
<p class="task" id="4"></p>

4\. Опишите класс `LabelEncoderTransform`. Решение должно удовлетворять следующим критериям:

* класс реализует интерфейс `Callable` (`__call__`); метод `__call__` имеет один параметр (строку) и возвращает целое число, соответствующее этой строке;
* состояние объекта (индексы для кодирования) обновляется в момент очередного вызова `__call__` (т.е. все данные сразу никогда не передаются никакому методу объекта).
    
Продемонстрируйте работоспособность, создав объект `BankDataset` и передав при создании объекта в качестве аргумента `target_transform` объект класса `LabelEncoderTransform`.

- [ ] Проверено на семинаре
class LabelEncoderTransform:
    def __init__(self) -> None:
        self.label_to_index = {}
        
    def __call__(self, X: pd.Series, label: str) -> int:
        if label not in self.label_to_index:
            self.label_to_index[label] = len(self.label_to_index)
        return X, self.label_to_index[label]
label_encoder = LabelEncoderTransform()
bd = BankDataset(data, target_transform=label_encoder)
bd[0]
# Markdown:
<p class="task" id="5"></p>

5\. Опишите класс `ToTensor`.  Решение должно удовлетворять следующим критериям:
* класс реализует интерфейс `Callable` (`__call__`); метод `__call__` принимает на вход серию или фрейм и возвращает тензор.

Опишите класс `Compose`.  Решение должно удовлетворять следующим критериям:
* при создании объекта в конструктор передается список объектов `transforms`, каждый из которых имеет метод `__call__(x, y)`;
* класс реализует интерфейс `Callable` (`__call__`); метод `__call__` принимает имеет параметра (признаки и класс в числовом виде) и и возвращает кортеж, полученный путем последовательного вызова объектов из `transforms`.

Продемонстрируйте работоспособность, создав объект `BankDataset` и передав при создании преобразования `Compose` список из объектов LabelEncoderTransform и ToTensor.

- [ ] Проверено на семинаре
class ToTensor:
    def __call__(self, X: pd.Series | int, y: str) -> th.Tensor:
        return th.tensor(X.values.astype(float), dtype=th.float32), y

class Compose:
    def __init__(self, transforms: list[Transform]) -> None:
        self.transforms = transforms

    def __call__(self, X: Any, y: Any) -> Any:
        for transform in self.transforms:
            if isinstance(transform, LabelEncoderTransform):
                X, y = transform(X, y)
            else:
                X, y = transform(X, y)
        return X, y
comp = Compose([OrdinalEncoderTransform(cats), LabelEncoderTransform(), ToTensor()])
bd = BankDataset(data, transform=comp)
bd[0]
# Markdown:
<p class="task" id="6"></p>

6\. Разделите датасет из предыдущего задания на обучающую и тестовую выборку в соотношении 75% на 25%. Создайте объект `DataLoader` для получения пакетов размера 64, полученных из перемешанного обучающего датасета. Кастомизируйте `DataLoader` таким образом, чтобы пакет признаков был представлен в виде трехмерного тензора размера 64x2x8 (разделите 16 признаков на два тензора по 8). Получите один пакет и выведите на экран размерность тензоров пакета.

- [ ] Проверено на семинаре
def collate_fn(batch):
    x, y = zip(*batch)
    x = th.stack(x).view(-1, 2, 8)
    y = th.tensor(y)
    return x, y
train_data, test_data = random_split(bd, lengths=[0.75, 0.25])
train_loader = DataLoader(train_data, shuffle=True, batch_size=64, collate_fn=collate_fn)
it = iter(train_loader)
x, y = next(it)
for X_batch, y_batch in train_loader:
    print(X_batch.shape)  
    print(y_batch.shape)  


# 02_1_nn_derivatives (1) (2).ipynb
# Markdown:
# Дифференцирование

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.axhline.html#matplotlib.pyplot.axhline
* https://numpy.org/doc/stable/reference/generated/numpy.log1p.html#numpy.log1p
* https://docs.sympy.org/latest/tutorials/intro-tutorial/calculus.html
* https://en.wikipedia.org/wiki/Finite_difference
* https://pythonnumericalmethods.berkeley.edu/notebooks/chapter20.02-Finite-Difference-Approximating-Derivatives.html
* https://en.wikipedia.org/wiki/Gradient_descent
* https://pytorch.org/tutorials/beginner/blitz/autograd_tutorial.html
* https://zhang-yang.medium.com/the-gradient-argument-in-pytorchs-backward-function-explained-by-examples-68f266950c29
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Дана функция $f(x) = x^2$. Найдите производную этой функции различными способами
def f(x: float) -> float:
  return x**2
def dfdx(x: float) -> float:
  return 2 * x
f(2), dfdx(2)
from typing import Callable


def dfdx_finite(f: Callable, x: float, h: float) -> float:
  return (f(x+h) - f(x)) / h
dfdx_finite(f, 2, 1e-5)
import torch as th
x = th.tensor(10.0, requires_grad=True)
x
def g(x: th.Tensor) -> th.Tensor:
  return x ** 2
y = g(x)
y
y.backward()
x.grad # dy/dx
type(x)
th.tensor
x = th.tensor(10.0, requires_grad=True)
y = f(x)
y.backward()
grad = x.grad

gamma = 0.01
# x = x - gamma * grad
x -= gamma * grad
x = th.tensor([10.0, 20.0, 30.0], requires_grad=True)
y = g(x)
# y = [x1**2, x2**2, x3**2]
y.backward()
x1, x2, x3 = x
J = th.tensor(
    [[2*x1, 0, 0],
    [0, 2*x2, 0],
    [0, 0, 2*x3],]
)
J
z = th.ones((3, 1))
J @ z
x = th.tensor([10.0, 20.0, 30.0], requires_grad=True)
y = g(x)
# y = [x1**2, x2**2, x3**2]
y.backward(th.ones((3, )))
x.grad
# Markdown:
## Задачи для самостоятельного решения
import numpy as np
import matplotlib.pyplot as plt
import torch as th
import sympy as sp
sp.init_printing()
x = sp.symbols('x')
# Markdown:
<p class="task" id="1"></p>

1\. Дана функция $f(x)$. Найдите (аналитически) производную данной функции $f'(x)$ и реализуйте две этих функции. Постройте в одной системе координат графики $f(x)$, $f'(x)$ и $g(x) = 0$ на отрезке [1, 10]. Изобразите графики различными цветами и включите сетку.

$$f(x) = \frac{sin(x)}{\ln(x) + 1}$$

- [ ] Проверено на семинаре
f_x = sp.sin(x) / (sp.log(x) + 1)
df_x = sp.diff(f_x)
f_x, df_x
f = sp.lambdify(x, f_x, "numpy")
f_df = sp.lambdify(x, df_x, "numpy")
X = np.linspace(1.001, 10, 10000)
plt.figure(figsize=(10, 6))
plt.plot(X, f(X), label='$f(x)$', color='blue')
plt.plot(X, f_df(X), label="$f'(x)$", color='red')
plt.axhline(0, color='green', label='$g(x) = 0$', linestyle='--') 
plt.title("Графики $f(x)$, $f'(x)$ и $g(x) = 0$ на отрезке [1, 10]")
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.grid(True)
plt.show()
# Markdown:
<p class="task" id="2"></p>

2\. Дана функция $f(x)$. Найдите (численно) производную данной функции $f'(x)$ на отрезке [1, 10]. Постройте в одной системе координат график $f(x)$, $f'(x)$ и $g(x) = 0$. Изобразите графики различными цветами и включите сетку.

$$f(x) = \frac{sin(x)}{\ln(x) + 1}$$

- [ ] Проверено на семинаре
f = lambda x: np.sin(x) / (np.log(x) + 1)
X = np.linspace(1.001, 10, 10000)
y = f(X)
diff = np.array([(y[i] - y[i-1])/(X[i] - X[i-1]) for i in range(1, len(y))])

plt.figure(figsize=(10, 6))
plt.plot(X, f(X), label='$f(x)$', color='blue')
plt.plot(X[1:], diff, label="$f'(x)$ (численное дифференцирование)", color='red')
plt.axhline(0, color='green', label='$g(x) = 0$', linestyle='--') 
plt.title("Графики $f(x)$, $f'(x)$ (численно) и $g(x) = 0$ на отрезке [1.1, 10]")
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.grid(True)
plt.show()
t = th.tensor(X, requires_grad=True)
X = np.linspace(1.001, 10, 10000)
def f(x: th.Tensor) -> th.Tensor:
    return th.sin(x) / (th.log(x) + 1)

y = f(t)
y.backward(gradient=th.ones((t.shape[0],)))

plt.figure(figsize=(10, 6))
plt.plot(X, [float(i) for i in y], color='b', label='f(x)')
plt.plot(X, [float(i) for i in t.grad], color='g', label='f\'(x)')
plt.plot(X, [0 for _ in X], color='r', label='Ось X', linestyle='--')
plt.title("Графики $f(x)$, $f'(x)$ (численно) и $g(x) = 0$ на отрезке [1.1, 10]")
plt.xlabel('x')
plt.ylabel('y')
plt.grid(True)
plt.legend()
plt.show()
# Markdown:
<p class="task" id="3"></p>

3\. Найдите локальный минимум функции $f(x)$ при помощи метода градиентного спуска. В качестве начальной точки используйте $x_0 = 4$. Найдите локальный максимум этой же функции, используя в качестве начальной точки $x_0'=9$.

$$f(x) = \frac{sin(x)}{\ln(x) + 1}$$

- [ ] Проверено на семинаре
def f(x):
    return th.sin(x) / (th.log(x) + 1)

def gradient_descent_minimize(f, x_init, lr=0.01, epochs=1000):
    x = th.tensor(x_init, dtype=th.float32, requires_grad=True)  
    optimizer = th.optim.SGD([x], lr=lr)      
    for epoch in range(epochs):
        optimizer.zero_grad()  
        y = f(x)               
        y.backward()          
        optimizer.step()       
    return x.item()                 

gradient_descent_minimize(f, x_init=4, lr=0.01, epochs=1000)
def gradient_descent_maximize(f, x_init, lr=0.01, epochs=1000):
    x = th.tensor(x_init, dtype=th.float32, requires_grad=True)
    optimizer = th.optim.SGD([x], lr=lr)
    for epoch in range(epochs):
        optimizer.zero_grad()
        y = -f(x) 
        y.backward()
        optimizer.step()
    return x.item()

gradient_descent_maximize(f, x_init=9, lr=0.01, epochs=1000)
# Markdown:
<p class="task" id="4"></p>

4\. Дана функция $f(x)$. Найдите (используя возможности по автоматическому дифференцированию пакета `torch`) производную данной функции $f'(x)$ на отрезке [0, 10]. Постройте в одной системе координат график $f(x)$, $f'(x)$ и $g(x) = 0$ на полуинтервале (0, 10]. Изобразите графики различными цветами и включите сетку.

$$f(x) = \frac{sin(x)}{\ln(x) + 1}$$

- [ ] Проверено на семинаре
def f(x):
    return th.sin(x) / (th.log(x) + 1)

x_values = th.linspace(0.1, 10, 500, dtype=th.float32, requires_grad=True)
f_values = f(x_values)
f_values.sum().backward()  
f_prime_values_np = x_values.grad.numpy()
f_values_np = f_values.detach().numpy()
x_values_np = x_values.detach().numpy()

plt.figure(figsize=(10, 6))
plt.plot(x_values_np, f_values_np, label='$f(x)$', color='blue')
plt.plot(x_values_np, f_prime_values_np, label="$f'(x)$", color='red')
plt.plot(x_values_np, np.zeros_like(x_values_np), label='$g(x) = 0$', color='green', linestyle='--')
plt.title("Графики $f(x)$, $f'(x)$ и $g(x)=0$ на интервале (0, 10]")
plt.xlabel("x")
plt.ylabel("y")
plt.legend()
plt.grid(True)  
plt.xlim(0, 10)
plt.ylim(-2, 2) 
plt.show()
# Markdown:
<p class="task" id="5"></p>

5\. Дана функция $f(x)$. Найдите производную данной функции $f'(x)$ на отрезке [0, 10] при помощи формулы производной сложной функции. На этом же отрезке найдите, используя возможности по автоматическому дифференцированию пакета `torch`. Сравните результаты.

$$f(x) = sin(cos(x))$$

- [ ] Проверено на семинаре
def g(x: th.Tensor) -> th.Tensor:
    return th.cos(x)

def h(x: th.Tensor) -> th.Tensor:
    return th.sin(x)

def dfdg(x: th.Tensor) -> th.Tensor:
    return th.cos(g(x))

def dgdx(x: th.Tensor) -> th.Tensor:
    return -th.sin(x)

def dfdx(x: th.Tensor) -> th.Tensor:
    return dfdg(x) * dgdx(x)

x_values = th.linspace(0, 10, 500)
manual_derivative = dfdx(x_values).detach().numpy()
x_values_autograd = x_values.clone().detach().requires_grad_(True)
f_values_autograd = h(g(x_values_autograd))  
f_values_autograd.sum().backward()  
autograd_derivative = x_values_autograd.grad.numpy() 
x_values_np = x_values.detach().numpy()

plt.figure(figsize=(10, 6))
plt.plot(x_values_np, manual_derivative, label='Ручная производная', color='blue')
plt.plot(x_values_np, autograd_derivative, label='Автодифференцирование', color='red', linestyle='--')
plt.title("Сравнение производных на интервале [0, 10]")
plt.xlabel("x")
plt.ylabel("f'(x)")
plt.legend()
plt.grid(True)
plt.show()


# 02_4_torch_nn_regression (3).ipynb
# Markdown:
#  Решение задачи регрессии при помощи пакета `torch`. Метрики.

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/docs/stable/nn.html
* https://pytorch.org/docs/stable/optim.html
* https://github.com/Lightning-AI/torchmetrics
* https://pytorch.org/docs/stable/generated/torch.no_grad.html
* https://pytorch-lightning.readthedocs.io/en/2.1.2/pytorch/ecosystem/metrics.html#torchmetrics
# Markdown:
## Задачи для совместного разбора
from torchtyping import TensorType, patch_typeguard
from typeguard import typechecked
import torch as th

Scalar = TensorType[()]
patch_typeguard()
# Markdown:
1\. Используя реализацию из `torch.nn`, решите задачу регрессии. Для расчета градиентов воспользуйтесь возможностями по автоматическому дифференцированию `torch`. В качестве функции потерь используйте собственную реализацию MSE. Для настройки весов реализуйте пакетный градиентный спуск с использованием `torch.optim.SGD`.
from sklearn.datasets import make_regression
import torch as th

X, y, coef = make_regression(n_features=4, n_informative=4, coef=True, bias=0.5, random_state=42)
X = th.FloatTensor(X)
y = th.FloatTensor(y)
import torch.nn as nn

class SyntRegressionModel(nn.Module):
  def __init__(self, n_inputs: int, n_hidden: int) -> None:
    super().__init__()

    self.fc1 = nn.Linear(in_features=n_inputs, out_features=n_hidden)
    self.fc2 = nn.Linear(in_features=n_hidden, out_features=1)

    self.relu = nn.ReLU()

  def forward(self, X: th.Tensor) -> th.Tensor:
    out = self.fc1(X)
    # out = out.relu()
    out = self.relu(out)
    out = self.fc2(out)
    return out
model = SyntRegressionModel(n_inputs=4, n_hidden=2)
# y_pred = model.forward(X)
y_pred = model(X)
model.fc1.weight
y_pred.shape
n_inputs = 4
n_hidden = 1

model = nn.Sequential(
    nn.Linear(in_features=n_inputs, out_features=n_hidden),
    nn.ReLU(),
    nn.Linear(in_features=n_hidden, out_features=1),
)
y_pred = model(X)
criterion = nn.MSELoss()

loss = criterion(y_pred.flatten(), y)
loss
loss.backward()

import torch.optim as optim

optimizer = optim.SGD(model.parameters(), lr=0.01)
optimizer.step()
optimizer.zero_grad()
n_inputs = 4
n_hidden = 1

model = nn.Sequential(
    nn.Linear(in_features=n_inputs, out_features=n_hidden),
    nn.ReLU(),
    nn.Linear(in_features=n_hidden, out_features=1),
)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

for _ in range(100):
  # forward pass
  y_pred = model(X)
  loss = criterion(y_pred.flatten(), y)

  # backprop
  loss.backward()

  # gradient descend
  optimizer.step()
  optimizer.zero_grad()
loss
from torch.utils.data import TensorDataset, DataLoader

dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

n_inputs = 4
n_hidden = 1

model = nn.Sequential(
    nn.Linear(in_features=n_inputs, out_features=1),
    # nn.ReLU(),
    # nn.Linear(in_features=n_hidden, out_features=1),
)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)

epoch_losses = []
for epoch in range(100):
  epoch_loss = 0
  for X_batch, y_batch in loader:
    # forward pass
    y_pred = model(X_batch)
    loss = criterion(y_pred.flatten(), y_batch)
    epoch_loss += loss
    # backprop
    loss.backward()
    
    # gradient descend
    optimizer.step()
    optimizer.zero_grad()
  epoch_loss = epoch_loss / len(loader)
  epoch_losses.append( epoch_loss.item())
  print(epoch, epoch_loss.item())
import matplotlib.pyplot as plt

plt.plot(epoch_losses)
def train(model, loader, n_epochs, criterion, optimizer, print_every):
  model.train()
  pass
@th.no_grad()
def eval(model):
  model.eval()
with th.no_grad():
  y_pred = model(X)
  print(y_pred.flatten())
with th.no_grad():
  y_pred = model(X)
from sklearn.metrics import r2_score
r2_score(y, y_pred.flatten())
#!pip install torchmetrics
import torchmetrics as M

from torch.utils.data import TensorDataset, DataLoader

dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

n_inputs = 4
n_hidden = 1

model = nn.Sequential(
    nn.Linear(in_features=n_inputs, out_features=1),
    # nn.ReLU(),
    # nn.Linear(in_features=n_hidden, out_features=1),
)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)

epoch_losses = []
for epoch in range(100):
  epoch_loss = 0
  r2_metric = M.R2Score()
  for X_batch, y_batch in loader:
    # forward pass
    y_pred = model(X_batch)
    loss = criterion(y_pred.flatten(), y_batch)
    epoch_loss += loss
    r2_metric.update(y_pred.flatten(), y_batch)
    # backprop
    loss.backward()

    # gradient descend
    optimizer.step()
    optimizer.zero_grad()
  epoch_loss = epoch_loss / len(loader)
  epoch_losses.append( epoch_loss.item())
  r2_epoch = r2_metric.compute()
  print(epoch, epoch_loss.item(), r2_epoch)
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class='task' id='1'></p>

1\. Используя реализацию полносвязного слоя из `torch.nn` решите задачу регрессии. В качестве функции потерь используйте реализацию MSE из `torch.nn`. Для настройки весов реализуйте мини-пакетный градиентный спуск с использованием `torch.optim.SGD`. Для создания модели опишите класс `SineModel`.

Предлагаемая архитектура нейронной сети:
1. Полносвязный слой с 100 нейронами
2. Активация ReLU
3. Полносвязный слой с 1 нейроном

В процессе обучения сохраняйте промежуточные прогнозы моделей. Визуализируйте облако точек и прогнозы модели в начале, середине и после окончания процесса обучения (не обязательно три, можно взять больше промежуточных вариантов).

Выведите график изменения значения функции потерь в процессе обучения. Логику расчета значения функции потерь на уровне эпохи реализуйте самостоятельно.

- [ ] Проверено на семинаре
import numpy as np
X = th.linspace(0, 1, 100).view(-1, 1)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size())
class SineModel(nn.Module):
    def __init__(self, n_features: int, n_hidden: int, n_out: int) -> None:
        super(SineModel, self).__init__()
        self.fc1 = nn.Linear(in_features=n_features, out_features=n_hidden)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=n_out)
        self.relu = nn.ReLU()

    def forward(self, X: TensorType['batch', 'n_features']) -> TensorType['batch', 1]:
        out = self.fc1(X)
        out = self.relu(out)
        out = self.fc2(out)
        return out
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

model = SineModel(n_features=1, n_hidden=100, n_out=1)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
def train_model(model, criterion, optimizer, loader, X_full, n_epochs=10000, log_epochs=(0, 500, 1000, 10000), print_every=1000):
    epoch_losses = []
    predictions_at_epochs = []

    device = next(model.parameters()).device
    for epoch in range(n_epochs+1):
        accum_losses = []
        for X_batch, y_batch in loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            accum_losses.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = np.mean(accum_losses)
        epoch_losses.append(epoch_loss)

        if epoch in log_epochs:
            model.eval()
            with th.no_grad():
                y_pred_full = model(X_full)
                if device.type != 'cuda':
                    predictions_at_epochs.append((epoch, y_pred_full.detach().numpy().flatten()))

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}')

    return epoch_losses, predictions_at_epochs
def moving_average(data, window_size):
    return np.convolve(data, np.ones(window_size) / window_size, mode='valid')

def plot_loss_curve(epoch_losses, window_size=50):
    smoothed_losses = moving_average(epoch_losses, window_size)
    plt.figure(figsize=(10, 5))
    plt.plot(smoothed_losses, label=f'Скользящее среднее (размер окна {window_size})', linewidth=2)
    plt.title('График изменения значения функции потерь (сглаженный)')
    plt.xlabel('Эпоха')
    plt.ylabel('Значение функции потерь')
    plt.grid(True)
    plt.legend()
    plt.show()

def plot_predictions(X, y, predictions_at_epochs):
    for idx, (epoch, y_pred) in enumerate(predictions_at_epochs):
        plt.figure(figsize=(10, 6))
        plt.scatter(X.numpy(), y.numpy(), label='Исходные данные', color='b')
        plt.scatter(X.numpy(), y_pred, label=f'Прогноз на эпохе {epoch}', color='r', s=60)
        plt.title(f'Прогноз модели на эпохе {epoch}')
        plt.xlabel('X')
        plt.ylabel('y')
        plt.legend()
        plt.grid(True)
        plt.show()

epoch_losses, predictions_at_epochs = train_model(model, criterion, optimizer, loader, X)
plot_loss_curve(epoch_losses, window_size=50)
plot_predictions(X, y, predictions_at_epochs)
# Markdown:
<p class='task' id='2'></p>

2\. Повторите решение задачи 1, изменив модель. Для создания модели создайте объект класса `nn.Sequential`.

Предлагаемая архитектура нейронной сети:
1. Полносвязный слой с 50 нейронами
2. Активация Tanh
3. Полносвязный слой с 1 нейроном

- [ ] Проверено на семинаре
X = th.linspace(0, 1, 100).view(-1, 1)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size())
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

model = nn.Sequential(
    nn.Linear(in_features=1, out_features=50),
    nn.Tanh(),
    nn.Linear(in_features=50, out_features=1),
)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
epoch_losses, predictions_at_epochs = train_model(model, criterion, optimizer, loader, X, log_epochs=(0, 1000, 5000, 10000))
plot_loss_curve(epoch_losses, window_size=50)
plot_predictions(X, y, predictions_at_epochs)
# Markdown:
<p class='task' id='3'></p>

3\. Используя реализацию полносвязного слоя из `torch.nn`, решите задачу регрессии. В качестве функции потерь используйте реализацию MSE из `torch.nn`. Для настройки весов реализуйте мини-пакетный градиентный спуск с использованием `torch.optim.SGD`. Перенесите вычисления на GPU и сравните время обучения с и без использования GPU. Решение должно корректно работать в случае отсутствия GPU без дополнительных изменений в коде.

- [ ] Проверено на семинаре
from sklearn.datasets import make_regression
import torch as th
import time
import numpy as np

X, y, coef = make_regression(
    n_samples=10000,
    n_features=10,
    n_informative=6,
    coef=True,
    bias=0.5,
    random_state=42
)
X = th.FloatTensor(X)
y = th.FloatTensor(y).reshape(-1, 1)
device = th.device('cpu')
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=128, shuffle=True)

model = nn.Sequential(
    nn.Linear(in_features=10, out_features=50),
    nn.Tanh(),
    nn.Linear(in_features=50, out_features=1),
).to(device)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
start_time = time.time()
epoch_losses, predictions_at_epochs = train_model(model, criterion, optimizer, loader, X, n_epochs=500, print_every=50)
end_time = time.time()
print(f'Время обучения на CPU: {end_time - start_time:.2f} секунд')
device = th.device('cuda' if th.cuda.is_available() else 'cpu')
print(f'Используемое устройство: {device}')
X = X.to(device)
y = y.to(device)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=128, shuffle=True)

model = nn.Sequential(
    nn.Linear(in_features=10, out_features=50),
    nn.Tanh(),
    nn.Linear(in_features=50, out_features=1),
).to(device)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
start_time = time.time()
epoch_losses, predictions_at_epochs = train_model(model, criterion, optimizer, loader, X, n_epochs=500, print_every=50)
end_time = time.time()
print(f'Время обучения на GPU: {end_time - start_time:.2f} секунд')
# Markdown:
<p class='task' id='4'></p>

4\. Повторите решение задач 1-2, используя для расчета значения функции потерь за эпоху метрику `MeanMetric` из пакета `torchmetrics`. Добавьте в цикл обучения расчет метрики $R^2$ (воспользуйтесь реализацией из `torchmetrics`). Выведите на экран график изменения значения функции потерь и метрики $R^2$ по эпохам в процессе обучения.
device = th.device('cpu')
import numpy as np
import torchmetrics as M

X = th.linspace(0, 1, 100).view(-1, 1)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size())

class SineModel(nn.Module):
    def __init__(self, n_features: int, n_hidden: int, n_out: int) -> None:
        super(SineModel, self).__init__()
        self.fc1 = nn.Linear(in_features=n_features, out_features=n_hidden)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=n_out)
        self.relu = nn.ReLU()

    def forward(self, X: TensorType['batch', 'n_features']) -> TensorType['batch', 1]:
        out = self.fc1(X)
        out = self.relu(out)
        out = self.fc2(out)
        return out
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

model = SineModel(n_features=1, n_hidden=100, n_out=1)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
def train_model_with_metrics(model, criterion, optimizer, loader, X_full, y_full, n_epochs=10000, print_every=1000):
    epoch_losses = []
    r2_scores = []

    loss_metric = M.MeanMetric()
    r2_metric = M.R2Score()
    for epoch in range(n_epochs+1):
        loss_metric.reset()  
        r2_metric.reset()
        
        for X_batch, y_batch in loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss_metric.update(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)

        model.eval()
        with th.no_grad():
            y_pred_full = model(X_full)
            r2_metric.update(y_pred_full, y_full)
            r2_score = r2_metric.compute().item()
            r2_scores.append(r2_score)

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, R²: {r2_score:.4f}')

    return epoch_losses, r2_scores
def plot_r2_scores(r2_scores):
    smoothed_r2_scores = moving_average(r2_scores, window_size=50)
    epochs = range(len(smoothed_r2_scores))
    plt.figure(figsize=(8, 6))
    plt.plot(epochs, smoothed_r2_scores, label='Скользящее среднее (размер окна 50)', color='g', linewidth=2)
    plt.title('График изменения метрики R² по эпохам (сглаженный)')
    plt.xlabel('Эпоха')
    plt.ylabel('R²')
    plt.grid(True)
    plt.legend()
    plt.show()
epoch_losses, r2_scores = train_model_with_metrics(model, criterion, optimizer, loader, X, y)
plot_loss_curve(epoch_losses, window_size=50)
plot_r2_scores(r2_scores)
# Markdown:
<p class='task' id='5'></p>

5\. Повторите решение задач 1-2, изменив функцию потерь. Обучите модель, используя три функции потерь: `MSELoss`, `L1Loss` и `HuberLoss` - и выведите на одном графике динамику изменения метрики $R^2$ по эпохам для каждой модели в процессе обучения. Добавьте подписи полученных кривых.

- [ ] Проверено на семинаре
def plot_r2_comparison(r2_mse, r2_l1, r2_huber, window_size=50):
    r2_mse_smooth = moving_average(r2_mse, window_size)
    r2_l1_smooth = moving_average(r2_l1, window_size)
    r2_huber_smooth = moving_average(r2_huber, window_size)
    smooth_epochs = range(len(r2_mse_smooth))

    plt.figure(figsize=(10, 6))

    plt.plot(smooth_epochs, r2_mse_smooth, label='MSE Loss (сглаженный)', color='b', linewidth=2)
    plt.plot(smooth_epochs, r2_l1_smooth, label='L1 Loss (сглаженный)', color='g', linewidth=2)
    plt.plot(smooth_epochs, r2_huber_smooth, label='Huber Loss (сглаженный)', color='r', linewidth=2)

    plt.title('Динамика изменения метрики R² для разных функций потерь')
    plt.xlabel('Эпоха')
    plt.ylabel('R²')
    plt.grid(True)
    plt.legend()
    plt.show()

X = th.linspace(0, 1, 100).view(-1, 1)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size())

class SineModel(nn.Module):
    def __init__(self, n_features: int, n_hidden: int, n_out: int) -> None:
        super(SineModel, self).__init__()
        self.fc1 = nn.Linear(in_features=n_features, out_features=n_hidden)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=n_out)
        self.relu = nn.ReLU()

    def forward(self, X: TensorType['batch', 'n_features']) -> TensorType['batch', 1]:
        out = self.fc1(X)
        out = self.relu(out)
        out = self.fc2(out)
        return out
    

dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

model = SineModel(n_features=1, n_hidden=100, n_out=1)
optimizer = optim.SGD(model.parameters(), lr=0.005)



criterion_mse = nn.MSELoss()
_, r2_mse = train_model_with_metrics(model, criterion_mse, optimizer, loader, X, y)
criterion_l1 = nn.L1Loss()
model = SineModel(n_features=1, n_hidden=100, n_out=1)
optimizer = optim.SGD(model.parameters(), lr=0.005)

_, r2_l1 = train_model_with_metrics(model, criterion_l1, optimizer, loader, X, y)
criterion_huber = nn.HuberLoss()
model = SineModel(n_features=1, n_hidden=100, n_out=1)
optimizer = optim.SGD(model.parameters(), lr=0.005)

_, r2_huber = train_model_with_metrics(model, criterion_huber, optimizer, loader, X, y)
plot_r2_comparison(r2_mse, r2_l1, r2_huber)
# Markdown:
<p class='task' id='6'></p>

6\. Повторите решение задач 1-2, разделив датасет на обучающую и тестовую выборку в соотношении 80% на 20%. Обучите модель. Для тестовой выборки посчитайте и выведите на экран значения метрик:

- MAE;
- MAPE;
- MSE;
- MSLE (MeanSquaredLogError).


- [ ] Проверено на семинаре
from torch.utils.data import random_split
X = th.linspace(0, 1, 100).view(-1, 1)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size())
class SineModel(nn.Module):
    def __init__(self, n_features: int, n_hidden: int, n_out: int) -> None:
        super(SineModel, self).__init__()
        self.fc1 = nn.Linear(in_features=n_features, out_features=n_hidden)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=n_out)
        self.relu = nn.ReLU()

    def forward(self, X: TensorType['batch', 'n_features']) -> TensorType['batch', 1]:
        out = self.fc1(X)
        out = self.relu(out)
        out = self.fc2(out)
        return out
dataset = TensorDataset(X, y)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

model = SineModel(n_features=1, n_hidden=100, n_out=1)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
X = th.linspace(0, 1, 100).view(-1, 1)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size())

class SineModel(nn.Module):
    def __init__(self, n_features: int, n_hidden: int, n_out: int) -> None:
        super(SineModel, self).__init__()
        self.fc1 = nn.Linear(in_features=n_features, out_features=n_hidden)
        self.fc2 = nn.Linear(in_features=n_hidden, out_features=n_out)
        self.relu = nn.ReLU()

    def forward(self, X: TensorType['batch', 'n_features']) -> TensorType['batch', 1]:
        out = self.fc1(X)
        out = self.relu(out)
        out = self.fc2(out)
        return out
    

dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)

model = SineModel(n_features=1, n_hidden=100, n_out=1)
optimizer = optim.SGD(model.parameters(), lr=0.005)
def train_model_with_metrics_new(model, criterion, optimizer, loader, n_epochs=10000, print_every=1000):
    for epoch in range(n_epochs + 1):
        model.train()
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {loss.item():.4f}')
train_model_with_metrics_new(model, criterion, optimizer, train_loader)
model.eval()
y_pred_list, y_test_list = [], []
with th.no_grad():
    for X_batch, y_batch in test_loader:
        y_pred = model(X_batch)
        y_pred_list.append(y_pred)
        y_test_list.append(y_batch)

y_pred_test = th.cat(y_pred_list)
y_test = th.cat(y_test_list)

mae = M.MeanAbsoluteError()(y_pred_test, y_test).item()
mape = M.MeanAbsolutePercentageError()(y_pred_test, y_test).item()
mse = M.MeanSquaredError()(y_pred_test, y_test).item()
msle = M.MeanSquaredLogError()(y_pred_test, y_test).item()

print(f'MAE: {mae:.4f}')
print(f'MAPE: {mape:.4f}')
print(f'MSE: {mse:.4f}')
print(f'MSLE: {msle:.4f}')


# 02_5_torch_nn_classification (1).ipynb
# Markdown:
#  Решение задачи классификации при помощи пакета `torch`.

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/docs/stable/nn.html
* https://pytorch.org/docs/stable/optim.html
* https://lightning.ai/docs/torchmetrics/stable/
* https://pytorch.org/docs/stable/generated/torch.no_grad.html
* https://www.learnpytorch.io/02_pytorch_classification/
* https://pytorch.org/docs/stable/data.html#torch.utils.data.WeightedRandomSampler
* https://towardsdatascience.com/demystifying-pytorchs-weightedrandomsampler-by-example-a68aceccb45
* https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
* https://medium.com/@zergtant/use-weighted-loss-function-to-solve-imbalanced-data-classification-problems-749237f38b7
* https://pytorch.org/docs/stable/generated/torch.nn.BCELoss.html#torch.nn.BCELoss
* https://pytorch.org/docs/stable/generated/torch.nn.BCEWithLogitsLoss.html#torch.nn.BCEWithLogitsLoss52
# Markdown:
## Задачи для совместного разбора
# from torchtyping import TensorType, patch_typeguard
# from typeguard import typechecked
import torch as th

# Scalar = TensorType[()]
# patch_typeguard()
# Markdown:
1\. Обсудите подходы к решению задачи классификации на примере синтетического датасета.
num_samples = 1000
num_features = 10
num_classes = 3

X = th.randn(num_samples, num_features)
y = th.randint(0, num_classes, (num_samples, ))
import torch.nn as nn

class Classifier(nn.Module):
  def __init__(self, n_inputs: int, n_classes: int) -> None:
    super().__init__()
    self.fc1 = nn.Linear(n_inputs, n_classes)

  def forward(self, X: th.Tensor) -> th.Tensor:
    return self.fc1(X)
model = Classifier(num_features, num_classes)
preds = model(X)
preds.shape
preds[:5]
preds.argmax(dim=1)[:5]
y[:5]
criterion = nn.CrossEntropyLoss()
loss = criterion(preds, y)
num_samples = 1000
num_features = 10
num_classes = 2

X = th.randn(num_samples, num_features)
y = th.randint(0, num_classes, (num_samples, ))

import torch.nn as nn

class Classifier(nn.Module):
  def __init__(self, n_inputs: int, n_classes: int) -> None:
    super().__init__()
    self.fc1 = nn.Linear(n_inputs, n_classes)

  def forward(self, X: th.Tensor) -> th.Tensor:
    return self.fc1(X)
model = Classifier(num_features, 1)
preds = model(X)
preds.shape
preds.sigmoid()[:5]
(preds.sigmoid() >= 0.5)[:5]
y[:5]
criterion = nn.BCEWithLogitsLoss()

criterion(preds.flatten(), y.float())
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Используя реализацию полносвязного слоя из `torch.nn`, решите задачу классификации. Разделите датасет на обучающую и тестовую выборку в соотношении 80% на 20%. В качестве функции потерь используйте реализацию `CrossEntropyLoss` из `torch.nn`. Для настройки весов реализуйте мини-пакетный градиентный спуск с использованием `torch.optim.SGD`.

Используйте модель, состоящую из двух слоев:
1. Полносвязный слой с 10 нейронами;
2. Полносвязный слой с 2 нейронами.

Выведите график изменения значения функции потерь в процессе обучения. Выведите на экран значения Accuracy, Precision, Recall и F1 для обучающего и тестового множества.

Выведите на экран облако точек с цветом, соответствующим предсказаниям модели на всем датасете (и обучающей, и тестовой части).

- [X] Проверено на семинаре

import numpy as np
import matplotlib.pyplot as plt
import torchmetrics as M
def train_model(model, criterion, optimizer, loader, n_epochs=1000, print_every=500):
    epoch_losses = []
    for epoch in range(n_epochs + 1):
        model.train()
        accum_losses = []
        for X_batch, y_batch in loader:
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            accum_losses.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = np.mean(accum_losses)
        epoch_losses.append(epoch_loss)
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {loss.item():.4f}')
    return epoch_losses
def moving_average(data, window_size):
    return np.convolve(data, np.ones(window_size) / window_size, mode='valid')

def plot_loss_curve(epoch_losses, window_size=50):
    smoothed_losses = moving_average(epoch_losses, window_size)
    plt.figure(figsize=(10, 5))
    plt.plot(smoothed_losses, label=f'Скользящее среднее (размер окна {window_size})', linewidth=2)
    plt.title('График изменения значения функции потерь (сглаженный)')
    plt.xlabel('Эпоха')
    plt.ylabel('Значение функции потерь')
    plt.grid(True)
    plt.legend()
    plt.show()
from sklearn.datasets import make_circles
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, random_split

X, y = make_circles(n_samples=1000, noise=0.05, random_state=42)
X = th.FloatTensor(X)
y = th.LongTensor(y)
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=10),
    nn.Linear(in_features=10, out_features=2),
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
epoch_losses = train_model(model, criterion, optimizer, train_loader)
plot_loss_curve(epoch_losses)
y_pred_train = model(train_dataset[:][0]).argmax(dim=1)
y_true_train = train_dataset[:][1]
print(f"Train Accuracy: {M.Accuracy(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Recall: {M.Precision(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train F1Score: {M.Recall(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Precision: {M.F1Score(task='binary')(y_pred_train, y_true_train).item():.4f}")
y_pred_test = model(test_dataset[:][0]).argmax(dim=1)
y_true_test = test_dataset[:][1]
print(f"Test Accuracy: {M.Accuracy(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Recall: {M.Precision(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test F1Score: {M.Recall(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Precision: {M.F1Score(task='binary')(y_pred_test, y_true_test).item():.4f}")
plt.scatter(dataset[:][0][:, 0], dataset[:][0][:, 1], c=model(dataset[:][0]).argmax(dim=1), cmap='bwr')
plt.xlabel('Признак 1')
plt.ylabel('Признак 2')
plt.title('Scatter Plot для данных')
plt.grid(True)
plt.show()
# Markdown:
<p class="task" id="2"></p>

2\. Повторите задачу 1, используя другую архитектуру нейронной сети.

1. Полносвязный слой с 10 нейронами;
2. Функция активации ReLU;
3. Полносвязный слой с 2 нейронами.

- [X] Проверено на семинаре
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=10),
    nn.ReLU(),
    nn.Linear(in_features=10, out_features=2),
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
epoch_losses = train_model(model, criterion, optimizer, train_loader)
plot_loss_curve(epoch_losses)
y_pred_train = model(train_dataset[:][0]).argmax(dim=1)
y_true_train = train_dataset[:][1]
print(f"Train Accuracy: {M.Accuracy(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Recall: {M.Precision(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train F1Score: {M.Recall(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Precision: {M.F1Score(task='binary')(y_pred_train, y_true_train).item():.4f}")
y_pred_test = model(test_dataset[:][0]).argmax(dim=1)
y_true_test = test_dataset[:][1]
print(f"Test Accuracy: {M.Accuracy(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Recall: {M.Precision(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test F1Score: {M.Recall(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Precision: {M.F1Score(task='binary')(y_pred_test, y_true_test).item():.4f}")
plt.scatter(dataset[:][0][:, 0], dataset[:][0][:, 1], c=model(dataset[:][0]).argmax(dim=1), cmap='bwr')
plt.xlabel('Признак 1')
plt.ylabel('Признак 2')
plt.title('Scatter Plot для данных')
plt.grid(True)
plt.show()
# Markdown:
<p class="task" id="3"></p>

3\. `CrossEntropyLoss` может быть использована для задачи классификации на любое количество классов. Для задачи бинарной классификации существуют специфические функции потерь. Решите задачу 2, используя `BCEWithLogitsLoss` в качестве функции потерь.

- [X] Проверено на семинаре
def train_model(model, criterion, optimizer, loader, n_epochs=1000, print_every=500):
    epoch_losses = []
    for epoch in range(n_epochs + 1):
        model.train()
        accum_losses = []
        for X_batch, y_batch in loader:
            y_batch = y_batch.unsqueeze(1).float()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            accum_losses.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = np.mean(accum_losses)
        epoch_losses.append(epoch_loss)
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {loss.item():.4f}')
    return epoch_losses
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=10),
    nn.ReLU(),
    nn.Linear(in_features=10, out_features=1),
)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
epoch_losses = train_model(model, criterion, optimizer, train_loader)
plot_loss_curve(epoch_losses)
y_pred_train = (model(train_dataset[:][0]).sigmoid() > 0.5).squeeze(1).int()
y_true_train = train_dataset[:][1]
print(f"Train Accuracy: {M.Accuracy(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Recall: {M.Precision(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train F1Score: {M.Recall(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Precision: {M.F1Score(task='binary')(y_pred_train, y_true_train).item():.4f}")
y_pred_test = (model(test_dataset[:][0]).sigmoid() > 0.5).squeeze(1).int()
y_true_test = test_dataset[:][1]
print(f"Test Accuracy: {M.Accuracy(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Recall: {M.Precision(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test F1Score: {M.Recall(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Precision: {M.F1Score(task='binary')(y_pred_test, y_true_test).item():.4f}")
plt.scatter(dataset[:][0][:, 0], dataset[:][0][:, 1], c=(model(dataset[:][0]).sigmoid() > 0.5), cmap='bwr')
plt.xlabel('Признак 1')
plt.ylabel('Признак 2')
plt.title('Scatter Plot для данных')
plt.grid(True)
plt.show()
# Markdown:
<p class="task" id="4"></p>

4\. На практике часто задача классификации является несбалансированной. В файлах каталога `imb_task` содержится несбалансированный набор данных. Обучите модель без учета несбалансированности классов (аналогично предыдущим заданиям, можно использовать любую подходящую функцию потерь). Повысьте качество модели (в смысле F1) путем модификации функции потерь (указания специального аргумента, позволяющего учесть несбалансированность классов).

- [X] Проверено на семинаре
X = th.load('.\\imb_task\\imb_task\\imb_X.th').float()
y = th.load('.\\imb_task\\imb_task\\imb_y.th').long()
X.shape, y.shape
def train_model(model, criterion, optimizer, loader, n_epochs=1000, print_every=500):
    epoch_losses = []
    for epoch in range(n_epochs + 1):
        model.train()
        accum_losses = []
        for X_batch, y_batch in loader:
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            accum_losses.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = np.mean(accum_losses)
        epoch_losses.append(epoch_loss)
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {loss.item():.4f}')
    return epoch_losses
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=10),
    nn.ReLU(),
    nn.Linear(in_features=10, out_features=2),
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
epoch_losses = train_model(model, criterion, optimizer, train_loader)
plot_loss_curve(epoch_losses)
y_pred_train = model(train_dataset[:][0]).argmax(dim=1)
y_true_train = train_dataset[:][1]
print(f"Train Accuracy: {M.Accuracy(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Recall: {M.Precision(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train F1Score: {M.Recall(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Precision: {M.F1Score(task='binary')(y_pred_train, y_true_train).item():.4f}")
y_pred_test = model(test_dataset[:][0]).argmax(dim=1)
y_true_test = test_dataset[:][1]
print(f"Test Accuracy: {M.Accuracy(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Recall: {M.Precision(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test F1Score: {M.Recall(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Precision: {M.F1Score(task='binary')(y_pred_test, y_true_test).item():.4f}")
plt.scatter(dataset[:][0][:, 0], dataset[:][0][:, 1], c=model(dataset[:][0]).argmax(dim=1), cmap='bwr')
plt.xlabel('Признак 1')
plt.ylabel('Признак 2')
plt.title('Scatter Plot для данных')
plt.grid(True)
plt.show()
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=10),
    nn.ReLU(),
    nn.Linear(in_features=10, out_features=2),
)

class_counts = th.bincount(y)
class_weights = 1.0 / class_counts.float()
weights = class_weights / class_weights.sum() 
criterion = nn.CrossEntropyLoss(weight=weights)
optimizer = optim.SGD(model.parameters(), lr=0.005)
dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
epoch_losses = train_model(model, criterion, optimizer, train_loader)
plot_loss_curve(epoch_losses)
y_pred_train = model(train_dataset[:][0]).argmax(dim=1)
y_true_train = train_dataset[:][1]
print(f"Train Accuracy: {M.Accuracy(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Recall: {M.Precision(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train F1Score: {M.Recall(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Precision: {M.F1Score(task='binary')(y_pred_train, y_true_train).item():.4f}")
y_pred_test = model(test_dataset[:][0]).argmax(dim=1)
y_true_test = test_dataset[:][1]
print(f"Test Accuracy: {M.Accuracy(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Recall: {M.Precision(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test F1Score: {M.Recall(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Precision: {M.F1Score(task='binary')(y_pred_test, y_true_test).item():.4f}")
plt.scatter(dataset[:][0][:, 0], dataset[:][0][:, 1], c=model(dataset[:][0]).argmax(dim=1), cmap='bwr')
plt.xlabel('Признак 1')
plt.ylabel('Признак 2')
plt.title('Scatter Plot для данных')
plt.grid(True)
plt.show()
# Markdown:
<p class="task" id="5"></p>

5\. Повторите решение задачи 4, повысив качество модели за счет использования `WeightedRandomSampler` вместо модификации функции потерь.

- [X] Проверено на семинаре
from torch.utils.data import WeightedRandomSampler
model = nn.Sequential(
    nn.Linear(in_features=2, out_features=10),
    nn.ReLU(),
    nn.Linear(in_features=10, out_features=2),
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.005)
train_labels = train_dataset[:][1]
class_counts = th.bincount(train_labels)
class_weights = 1.0 / class_counts.float()
train_sample_weights = class_weights[train_labels]
sampler = WeightedRandomSampler(weights=train_sample_weights, num_samples=len(train_sample_weights), replacement=True)

dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=64, sampler=sampler)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
epoch_losses = train_model(model, criterion, optimizer, train_loader)
plot_loss_curve(epoch_losses)
y_pred_train = model(train_dataset[:][0]).argmax(dim=1)
y_true_train = train_dataset[:][1]
print(f"Train Accuracy: {M.Accuracy(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Recall: {M.Precision(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train F1Score: {M.Recall(task='binary')(y_pred_train, y_true_train).item():.4f}")
print(f"Train Precision: {M.F1Score(task='binary')(y_pred_train, y_true_train).item():.4f}")
y_pred_test = model(test_dataset[:][0]).argmax(dim=1)
y_true_test = test_dataset[:][1]
print(f"Test Accuracy: {M.Accuracy(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Recall: {M.Precision(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test F1Score: {M.Recall(task='binary')(y_pred_test, y_true_test).item():.4f}")
print(f"Test Precision: {M.F1Score(task='binary')(y_pred_test, y_true_test).item():.4f}")
plt.scatter(dataset[:][0][:, 0], dataset[:][0][:, 1], c=model(dataset[:][0]).argmax(dim=1), cmap='bwr')
plt.xlabel('Признак 1')
plt.ylabel('Признак 2')
plt.title('Scatter Plot для данных')
plt.grid(True)
plt.show()


# 03_1_monitoring.ipynb
# Markdown:
#  Мониторинг процесса обучения

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/tutorials/recipes/recipes/tensorboard_with_pytorch.html
* https://docs.wandb.ai/quickstart
* https://docs.wandb.ai/guides/track/log/log-summary#docusaurus_skipToContent_fallback
* https://docs.wandb.ai/guides/track/log/log-models
* https://www.youtube.com/playlist?list=PLD80i8An1OEGajeVo15ohAQYF1Ttle0lk
# Markdown:
## Задачи для совместного разбора
# !pip install wandb
import wandb

wandb.login()
# Markdown:
1\. Рассмотрите возможности пакета `wandb` по отслеживанию числовых значений, визуализации изображений и таблиц.
import torch as th
def train(num_epochs: int):
  for x in range(num_epochs):
    x = th.tensor(x)
    loss = th.exp(-x/num_epochs)
    r2 = th.randn(size=(1,))
    wandb.log({"train/loss": loss, "train/r2": r2})
  wandb.run.summary["test/r2"] = 1
with wandb.init(
    project="seminar-tutorial",
    # name="run2",
    tags=["demo"],
    config={"num_epochs": 100}
):
  train(100)
import seaborn as sns

sns.get_dataset_names()
with wandb.init(
    project="seminar-tutorial",
    # name="run2",
    tags=["demo"],
    config={"num_epochs": 100}
):
  dataset = sns.load_dataset("penguins")
  img = sns.pairplot(dataset)
  wandb.log({"train/pairplot": wandb.Image(img.figure)})
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Решите задачу регрессии, используя для мониторинга процесса обучения `wandb`.

Разделите набор данных на обучающее и тестовое множество. В процессе обучения отслеживайте динамику изменения значения функции потерь и метрики $R^2$ по эпохам. После завершения обучения рассчитайте значение метрик MSE, RMSE, MAE и MAPE и сохраните в виде summary данного запуска.

Обучите не менее трех моделей (с разной архитектурой или гиперпараметрами), отследите все запуски при помощи `wandb` и вставьте в текстовую ячейку скриншоты, демонстрирующие интерфейс `wandb` с графиками обучения. Для каждого запуска приложите также скриншот с описанием гиперпараметров модели и summary (страница overview).

- [ ] Проверено на семинаре
import torch as th
from torch.utils.data import TensorDataset, DataLoader
import torchmetrics as M
from torch.utils.data import random_split
import torch.nn as nn
import torch.optim as optim
import wandb
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

wandb.login()
device = th.device('cuda' if th.cuda.is_available() else 'cpu')
print(f'Используемое устройство: {device}')
X = th.linspace(0, 1, 100).view(-1, 1).to(device)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size()).to(device)

dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
def train_model_with_metrics(model, criterion, optimizer, train_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    r2_scores = []

    loss_metric = M.MeanMetric().to(device)
    r2_metric = M.R2Score().to(device)
    for epoch in range(n_epochs+1):
        loss_metric.reset()  
        r2_metric.reset()
        
        for X_batch, y_batch in train_loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss_metric.update(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)

        model.eval()
        with th.no_grad():
            for X_batch, y_batch in train_loader:
                y_pred_test = model(X_batch)
                r2_metric.update(y_pred_test, y_batch)
            r2_score = r2_metric.compute().item()
            r2_scores.append(r2_score)

        wandb.log({"epoch": epoch, "train loss": epoch_loss, "train R2": r2_score})
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, R²: {r2_score:.4f}')

    return epoch_losses, r2_scores

def model_eval(model, test_loader):
    model.eval()
    y_pred_list, y_test_list = [], []
    with th.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_pred_list.append(y_pred)
            y_test_list.append(y_batch)

    y_pred_test = th.cat(y_pred_list)
    y_test = th.cat(y_test_list)

    mae = M.MeanAbsoluteError().to(device)(y_pred_test, y_test).item()
    mape = M.MeanAbsolutePercentageError().to(device)(y_pred_test, y_test).item()
    mse = M.MeanSquaredError().to(device)(y_pred_test, y_test).item()
    rmse = mse ** 0.5
    r2 = M.R2Score().to(device)(y_pred_test, y_test).item()

    wandb.summary["MSE"] = mse
    wandb.summary["RMSE"] = rmse
    wandb.summary["MAE"] = mae
    wandb.summary["MAPE"] = mape
    wandb.summary["R2"] = r2
class SineActivation(nn.Module):
    def forward(self, x):
        return th.sin(x)

models = {
    "Model_1": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Tanh(),
        nn.Linear(16, 1),
    ).to(device),
    
    "Model_2": nn.Sequential(
        nn.Linear(X.shape[1], 32),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(32, 16),
        nn.ReLU(),
        nn.Linear(16, 1),
    ).to(device),

    "Model_3": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        SineActivation(),
        nn.Linear(16, 16),
        nn.Tanh(),
        nn.Linear(16, 1),
    ).to(device),
}
for model_name, model in models.items():
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.005)
    
    with wandb.init(
        project="03_1_monitoring",
        group="task_1",
        name=model_name,
        config={
            "learning_rate": 0.005,
            "batch_size": 16,
            "epochs": 1000,
            "model": model_name
        }
    ):

        epoch_losses, r2_scores = train_model_with_metrics(
            model, 
            criterion, 
            optimizer, 
            train_loader,
            n_epochs=1000
        )
        model_eval(model, test_loader)
# Markdown:
![image-2.png](attachment:image-2.png)
# Markdown:
![image.png](attachment:image.png)![image-3.png](attachment:image-3.png) ![image-5.png](attachment:image-5.png) \
![image-2.png](attachment:image-2.png) ![image-4.png](attachment:image-4.png) ![image-6.png](attachment:image-6.png)
# Markdown:
<p class="task" id="2"></p>

2\. Решите задачу классификации, используя для мониторинга процесса обучения `wandb`.

Разделите набор данных на обучающее и тестовое множество. В процессе обучения отслеживайте динамику изменения значения функции потерь и метрики `Accuracy` по эпохам. После завершения обучения рассчитайте значение метрик Accuracy, Precision, Recall и F1 и сохраните в виде summary данного запуска.

Отследите все запуски при помощи `wandb` и вставьте в текстовую ячейку скриншоты, демонстрирующие интерфейс `wandb` с графиками обучения. Для каждого запуска приложите также скриншот с описанием гиперпараметров модели и summary (страница overview).


- [ ] Проверено на семинаре
def train_model_with_metrics(model, criterion, optimizer, train_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    accuracy_scores = []

    loss_metric = M.MeanMetric().to(device)
    accuracy_metric = M.Accuracy('binary').to(device)
    for epoch in range(n_epochs+1):
        loss_metric.reset()  
        accuracy_metric.reset()
        
        for X_batch, y_batch in train_loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss_metric.update(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)

        model.eval()
        with th.no_grad():
            for X_batch, y_batch in train_loader:
                y_pred_test = model(X_batch)
                accuracy_metric.update(y_pred_test.argmax(dim=1), y_batch)
            accuracy = accuracy_metric.compute().item()
            accuracy_scores.append(accuracy)

        wandb.log({"epoch": epoch, "train loss": epoch_loss, "train accuracy": accuracy})
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, accuracy: {accuracy:.4f}')

    return epoch_losses, accuracy

def model_eval(model, test_loader):
    model.eval()
    y_pred_list, y_test_list = [], []
    with th.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_pred_list.append(y_pred)
            y_test_list.append(y_batch)

    y_pred_test = th.cat(y_pred_list).argmax(dim=1)
    y_test = th.cat(y_test_list)

    accuracy = M.Accuracy('binary').to(device)(y_pred_test, y_test).item()
    precision = M.Precision('binary').to(device)(y_pred_test, y_test).item()
    recall = M.Recall('binary').to(device)(y_pred_test, y_test).item()
    f1_score = M.F1Score('binary').to(device)(y_pred_test, y_test).item()

    wandb.summary["Accuracy"] = accuracy
    wandb.summary["Precision"] = precision
    wandb.summary["Recall"] = recall
    wandb.summary["F1Score"] = f1_score
from sklearn.datasets import make_circles

X, y = make_circles(n_samples=1000, noise=0.05, random_state=42)
X = th.FloatTensor(X).to(device)
y = th.LongTensor(y).to(device)

dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
models = {
    "Model_1": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Tanh(),
        nn.Linear(16, 2),
    ).to(device),
    
    "Model_2": nn.Sequential(
        nn.Linear(X.shape[1], 32),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(32, 16),
        nn.ReLU(),
        nn.Linear(16, 2),
    ).to(device),

    "Model_3": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Tanh(),
        nn.Linear(16, 16),
        nn.Tanh(),
        nn.Linear(16, 2),
    ).to(device),
}
for model_name, model in models.items():
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.005)
    
    with wandb.init(
        project="03_1_monitoring",
        group="task_2",
        name=model_name,
        config={
            "learning_rate": 0.005,
            "batch_size": 16,
            "epochs": 500,
            "model": model_name
        }
    ):

        epoch_losses, accuracy = train_model_with_metrics(
            model, 
            criterion, 
            optimizer, 
            train_loader,
            n_epochs=500
        )
        model_eval(model, test_loader)
# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png) ![image-3.png](attachment:image-3.png) ![image-5.png](attachment:image-5.png)

![image-2.png](attachment:image-2.png) ![image-4.png](attachment:image-4.png) ![image-6.png](attachment:image-6.png)
# Markdown:
<p class="task" id="3"></p>

3\. Повторите задачу 2, вычислив и визуализировав матрицу несоответствий (для обучающей и тестовой выборки) тремя способами при помощи `wandb`:
* используя `torchmetrics` и представив данные в виде объекта `wandb.Table`;
* используя готовую функцию `wandb.plot.confusion_matrix`;
* построив тепловую карту при помощи `seaborn` и представив данные в виде объекта `wandb.Image`.

Вставьте в текстовую ячейку скриншоты, демонстрирующие интерфейс `wandb` со всеми нужными визуализациями.


- [ ] Проверено на семинаре
def compute_and_log_confusion_matrix(y_pred, y_true, name_prefix="train"):
    conf_matrix = M.ConfusionMatrix(task="binary", num_classes=2).to(device)(y_pred, y_true).cpu().numpy()
    table_data = [[int(conf_matrix[i, j]) for j in range(conf_matrix.shape[1])] for i in range(conf_matrix.shape[0])]
    columns = [f"Pred_{i}" for i in range(conf_matrix.shape[1])]
    table = wandb.Table(data=table_data, columns=columns)
    wandb.log({f"{name_prefix}_confusion_matrix_table": table})

    wandb.log({f"{name_prefix}_confusion_matrix": wandb.plot.confusion_matrix(
        preds=y_pred.cpu().numpy(),
        y_true=y_true.cpu().numpy(),
        class_names=[str(i) for i in range(2)]
    )})

    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=columns, yticklabels=columns)
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.title(f"{name_prefix.capitalize()} Confusion Matrix")
    wandb.log({f"{name_prefix}_confusion_matrix_heatmap": wandb.Image(plt)})
    plt.close()

def train_model_with_metrics(model, criterion, optimizer, train_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    accuracy_scores = []

    loss_metric = M.MeanMetric().to(device)
    accuracy_metric = M.Accuracy('binary').to(device)
    for epoch in range(n_epochs+1):
        loss_metric.reset()  
        accuracy_metric.reset()
        
        for X_batch, y_batch in train_loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss_metric.update(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)

        model.eval()
        with th.no_grad():
            y_pred_train, y_train = [], []
            for X_batch, y_batch in train_loader:
                y_pred_batch = model(X_batch)
                y_pred_train.append(y_pred_batch)
                y_train.append(y_batch)
                accuracy_metric.update(y_pred_batch.argmax(dim=1), y_batch)

            y_pred_train = th.cat(y_pred_train).argmax(dim=1)
            y_train = th.cat(y_train)
            accuracy = accuracy_metric.compute().item()
            accuracy_scores.append(accuracy)

            compute_and_log_confusion_matrix(y_pred_train, y_train, name_prefix="train")

        wandb.log({"epoch": epoch, "train loss": epoch_loss, "train accuracy": accuracy})
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, accuracy: {accuracy:.4f}')

    return epoch_losses, accuracy

def model_eval(model, test_loader):
    model.eval()
    y_pred_list, y_test_list = [], []
    with th.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_pred_list.append(y_pred)
            y_test_list.append(y_batch)

    y_pred_test = th.cat(y_pred_list).argmax(dim=1)
    y_test = th.cat(y_test_list)

    compute_and_log_confusion_matrix(y_pred_test, y_test, name_prefix="test")

    accuracy = M.Accuracy('binary').to(device)(y_pred_test, y_test).item()
    precision = M.Precision('binary').to(device)(y_pred_test, y_test).item()
    recall = M.Recall('binary').to(device)(y_pred_test, y_test).item()
    f1_score = M.F1Score('binary').to(device)(y_pred_test, y_test).item()

    wandb.summary["Accuracy"] = accuracy
    wandb.summary["Precision"] = precision
    wandb.summary["Recall"] = recall
    wandb.summary["F1Score"] = f1_score
for model_name, model in models.items():
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.005)
    
    with wandb.init(
        project="03_1_monitoring",
        group="task_3",
        name=model_name,
        config={
            "learning_rate": 0.005,
            "batch_size": 16,
            "epochs": 50,
            "model": model_name
        }
    ):

        epoch_losses, accuracy = train_model_with_metrics(
            model, 
            criterion, 
            optimizer, 
            train_loader,
            n_epochs=50,
            print_every=10
        )
        model_eval(model, test_loader)
# Markdown:
![image.png](attachment:image.png)\
![image-2.png](attachment:image-2.png)\
![image-5.png](attachment:image-5.png) \
![image-6.png](attachment:image-6.png) \
![image-3.png](attachment:image-3.png) \
![image-4.png](attachment:image-4.png)
# Markdown:
<p class="task" id="4"></p>

4\. Повторите задачу 2, обучив две модели: линейную и нелинейную. Для каждой из моделей сделайте прогноз (по всей выборке) и визуализируйте облако точек в виде `wandb.Image` (раскрасьте точки в цвета, соответствующие прогнозам модели).

Вставьте в текстовую ячейку скриншоты, демонстрирующие интерфейс `wandb` со всеми нужными визуализациями.


- [ ] Проверено на семинаре
def train_model_with_metrics(model, criterion, optimizer, train_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    accuracy_scores = []

    loss_metric = M.MeanMetric().to(device)
    accuracy_metric = M.Accuracy('binary').to(device)
    for epoch in range(n_epochs+1):
        loss_metric.reset()  
        accuracy_metric.reset()
        
        for X_batch, y_batch in train_loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss_metric.update(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)

        model.eval()
        with th.no_grad():
            for X_batch, y_batch in train_loader:
                y_pred_test = model(X_batch)
                accuracy_metric.update(y_pred_test.argmax(dim=1), y_batch)
            accuracy = accuracy_metric.compute().item()
            accuracy_scores.append(accuracy)

        wandb.log({"epoch": epoch, "train loss": epoch_loss, "train accuracy": accuracy})
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, accuracy: {accuracy:.4f}')

    return epoch_losses, accuracy

def model_eval(model, test_loader):
    model.eval()
    y_pred_list, y_test_list = [], []
    with th.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_pred_list.append(y_pred)
            y_test_list.append(y_batch)

    y_pred_test = th.cat(y_pred_list).argmax(dim=1)
    y_test = th.cat(y_test_list)

    accuracy = M.Accuracy('binary').to(device)(y_pred_test, y_test).item()
    precision = M.Precision('binary').to(device)(y_pred_test, y_test).item()
    recall = M.Recall('binary').to(device)(y_pred_test, y_test).item()
    f1_score = M.F1Score('binary').to(device)(y_pred_test, y_test).item()

    wandb.summary["Accuracy"] = accuracy
    wandb.summary["Precision"] = precision
    wandb.summary["Recall"] = recall
    wandb.summary["F1Score"] = f1_score
    
def visualize_predictions(model, X, y, model_name, labels):
    model.eval()
    with th.no_grad():
        X = X.to(device)
        y_pred = model(X).argmax(dim=1).cpu().numpy()
    y = y.cpu().numpy()

    plt.figure(figsize=(8, 6))
    scatter = plt.scatter(X[:, 0].cpu(), X[:, 1].cpu(), c=y_pred, cmap='viridis', alpha=0.7)
    plt.title(f'Предсказания модели: {model_name}')
    plt.xlabel('Признак 1')
    plt.ylabel('Признак 2')
    plt.legend(handles=scatter.legend_elements()[0], labels=labels)
    plt.grid(True)

    image = wandb.Image(plt, caption=f'Предсказания модели: {model_name}')
    wandb.log({f"{model_name} Predictions": image})
    plt.close()
from sklearn.datasets import make_circles

X, y = make_circles(n_samples=1000, noise=0.05, random_state=42)
X = th.FloatTensor(X).to(device)
y = th.LongTensor(y).to(device)

dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
models = {
    "Linear_Model": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Linear(16, 2),
    ).to(device),
    
    "Nonlinear_Model": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Tanh(),
        nn.Linear(16, 2),
    ).to(device),
}
labels = ["Класс 0", "Класс 1"]

for model_name, model in models.items():
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.005)
    
    with wandb.init(
        project="03_1_monitoring",
        group="task_4",
        name=model_name,
        config={
            "learning_rate": 0.005,
            "batch_size": 16,
            "epochs": 500,
            "model": model_name
        }
    ):

        epoch_losses, accuracy = train_model_with_metrics(
            model, 
            criterion, 
            optimizer, 
            train_loader,
            n_epochs=500
        )
        model_eval(model, test_loader)
        visualize_predictions(model, X, y, model_name, labels)
# Markdown:
![image-3.png](attachment:image-3.png)\
![image.png](attachment:image.png) \
![image-2.png](attachment:image-2.png)
# Markdown:
<p class="task" id="5"></p>

5\. Повторите задачу 2, реализовав логику ранней остановки. Для этого разделите данные на три части: обучающую, валидационную и тестовую. Остановите процесс обучения, если целевая метрика (F1) на валидации не увеличивалась в течении последних $k$ ($k$ - гиперпараметр метода) эпох. В момент остановки выведите сообщение с текущим номером эпохи. Сохраните номер эпохи, на которой процесс обучения был прерван, в виде summary данного запуска.

Помимо отслеживания метрик на обучающей и тестовой выборке, также отслеживайте метрики на валидационной выборке в процессе обучения.

Постройте таблицу `wandb.Table`, в которой содержится информация о:
* признаках объекта;
* правильном ответе;
* прогнозе модели;
* принадлежности к обучающему, валидационному или тестовому множеству.

Визуализируйте данную таблицу при помощи `wandb`.

Вставьте в текстовую ячейку скриншоты, демонстрирующие интерфейс `wandb` со всеми нужными визуализациями.

- [ ] Проверено на семинаре

X, y = make_circles(n_samples=1000, noise=0.05, random_state=42)
X = th.FloatTensor(X)
y = th.LongTensor(y)

dataset = TensorDataset(X, y)
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
def train_model_with_metrics(model, criterion, optimizer, train_loader, val_loader, n_epochs=1000, print_every=100, patience=10):
    epoch_losses = []
    train_accuracy_scores = []
    val_accuracy_scores = []
    val_f1_scores = []
    
    loss_metric = M.MeanMetric().to(device)
    train_accuracy_metric = M.Accuracy(task='binary').to(device)
    val_accuracy_metric = M.Accuracy(task='binary').to(device)
    val_f1_metric = M.F1Score('binary').to(device)
    
    best_val_f1 = 0.0
    epochs_no_improve = 0
    early_stop_epoch = None
    
    for epoch in range(n_epochs + 1):
        model.train()
        loss_metric.reset()
        train_accuracy_metric.reset()
        
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            optimizer.step()
            
            loss_metric.update(loss)
            train_accuracy_metric.update(y_pred.argmax(dim=1), y_batch)
        
        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)
        train_accuracy = train_accuracy_metric.compute().item()
        train_accuracy_scores.append(train_accuracy)
        
        model.eval()
        val_accuracy_metric.reset()
        val_f1_metric.reset()
        
        with th.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                y_pred_val = model(X_val)
                val_accuracy_metric.update(y_pred_val.argmax(dim=1), y_val)
                val_f1_metric.update(y_pred_val.argmax(dim=1), y_val)
        
        val_accuracy = val_accuracy_metric.compute().item()
        val_f1 = val_f1_metric.compute().item()
        val_accuracy_scores.append(val_accuracy)
        val_f1_scores.append(val_f1)
        
        wandb.log({
            "epoch": epoch,
            "train loss": epoch_loss,
            "train accuracy": train_accuracy,
            "val accuracy": val_accuracy,
            "val F1 Score": val_f1
        })
        
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
        
        if epochs_no_improve >= patience:
            print(f"Ранняя остановка на эпохе {epoch}")
            wandb.summary["Early Stopped Epoch"] = epoch
            early_stop_epoch = epoch
            break
        
        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}, Val F1: {val_f1:.4f}')
    
    return epoch_losses, train_accuracy_scores, val_accuracy_scores, val_f1_scores, early_stop_epoch

def model_eval(model, test_loader):
    model.eval()
    y_pred_list, y_test_list = [], []
    with th.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_pred_list.append(y_pred)
            y_test_list.append(y_batch)
    
    y_pred_test = th.cat(y_pred_list).argmax(dim=1)
    y_test = th.cat(y_test_list)
    
    accuracy = M.Accuracy(task='binary').to(device)(y_pred_test, y_test).item()
    precision = M.Precision('binary').to(device)(y_pred_test, y_test).item()
    recall = M.Recall('binary').to(device)(y_pred_test, y_test).item()
    f1_score = M.F1Score('binary').to(device)(y_pred_test, y_test).item()
    
    wandb.summary["Test Accuracy"] = accuracy
    wandb.summary["Test Precision"] = precision
    wandb.summary["Test Recall"] = recall
    wandb.summary["Test F1Score"] = f1_score
    
    return y_test.cpu().numpy(), y_pred_test.cpu().numpy()

def log_predictions_table(train_dataset, val_dataset, test_dataset, model):
    all_features = th.cat([train_dataset.dataset[i][0].unsqueeze(0) for i in train_dataset.indices] +
                          [val_dataset.dataset[i][0].unsqueeze(0) for i in val_dataset.indices] +
                          [test_dataset.dataset[i][0].unsqueeze(0) for i in test_dataset.indices], dim=0).cpu().numpy()
    
    all_labels = th.cat([train_dataset.dataset[i][1].unsqueeze(0) for i in train_dataset.indices] +
                        [val_dataset.dataset[i][1].unsqueeze(0) for i in val_dataset.indices] +
                        [test_dataset.dataset[i][1].unsqueeze(0) for i in test_dataset.indices], dim=0).cpu().numpy()
    
    all_predictions = []
    model.eval()
    with th.no_grad():
        for X_batch, _ in DataLoader(train_dataset.dataset, batch_size=16, shuffle=False):
            X_batch = X_batch.to(device)
            y_pred_batch = model(X_batch).argmax(dim=1)
            all_predictions.extend(y_pred_batch.cpu().numpy())
    
    split = ['train'] * len(train_dataset) + ['val'] * len(val_dataset) + ['test'] * len(test_dataset)
    
    df = pd.DataFrame(all_features, columns=[f"feature_{i}" for i in range(all_features.shape[1])])
    df['True Label'] = all_labels
    df['Predicted Label'] = all_predictions
    df['Split'] = split
    
    table = wandb.Table(dataframe=df)
    wandb.log({"Data Table": table})

models = {
    "Model_1": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Tanh(),
        nn.Linear(16, 2),
    ).to(device),
    
    "Model_2": nn.Sequential(
        nn.Linear(X.shape[1], 32),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(32, 16),
        nn.ReLU(),
        nn.Linear(16, 2),
    ).to(device),

    "Model_3": nn.Sequential(
        nn.Linear(X.shape[1], 16),
        nn.Tanh(),
        nn.Linear(16, 16),
        nn.Tanh(),
        nn.Linear(16, 2),
    ).to(device),
}
for model_name, model in models.items():
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.005)
    
    with wandb.init(
        project="03_1_monitoring",
        group="task_5",
        name=model_name,
        config={
            "learning_rate": 0.005,
            "batch_size": 16,
            "epochs": 10000,
            "patience": 100,
            "model": model_name
        }
    ):
        config = wandb.config
        epoch_losses, train_acc, val_acc, val_f1, stopped_epoch = train_model_with_metrics(
            model, 
            criterion, 
            optimizer, 
            train_loader,
            val_loader,
            n_epochs=config.epochs,
            print_every=100,
            patience=config.patience
        )
        
        y_test, y_pred = model_eval(model, test_loader)
        
        log_predictions_table(train_dataset, val_dataset, test_dataset, model)
# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png) ![image-2.png](attachment:image-2.png)


# 03_2_init_optim_dropout_batchnorm.ipynb
# Markdown:
#  Инициализация весов нейронных сетей. Способы регуляризации нейронных сетей. Продвинутые алгоритмы градиентного спуска.

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/docs/stable/nn.init.html
* https://adityassrana.github.io/blog/theory/2020/08/26/Weight-Init.html
* https://machinelearningmastery.com/dropout-for-regularizing-deep-neural-networks/
* https://machinelearningmastery.com/batch-normalization-for-training-of-deep-neural-networks/
* https://pytorch.org/docs/stable/optim.html
* https://seaborn.pydata.org/examples/errorband_lineplots.html
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Инициализируйте веса полносвязного слоя единицами, а смещения - нулями.
import torch as th
import torch.nn as nn
fc = nn.Linear(in_features=5, out_features=3)
fc.weight
nn.init.ones_(fc.weight)
fc.weight
# Markdown:
2\. Изучите, как работает слой `nn.Dropout` в режиме обучения модели и в режиме использования модели.
model = nn.Sequential(
    # ...
    nn.Dropout(p=0.5)
)
X = th.randn(1, 5)
X
model(X) # scale: 1/(1-p)
model.eval()
model(X)
model.train()
# Markdown:
3\. Изучите, как работает слой `nn.BatchNorm1d` в режиме обучения модели и в режиме использования модели.
X = th.randn(100, 5)
X
X.mean(dim=0), X.std(dim=0), X.var(dim=0)
bn = nn.BatchNorm1d(num_features=5)
y = bn(X)
y.mean(dim=0), y.std(dim=0), y.var(dim=0)
bn.weight, bn.bias
bn.running_mean
bn.running_var
model.eval()
y = bn(X)
y.mean(dim=0), y.std(dim=0), y.var(dim=0)
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class='task' id='1'></p>

1\. Расширьте класс `torch.nn.Linear`, описав класс `InitializedLinear` и добавив возможность инициализировать веса слоя при помощи функций из пакета `torch.nn.init` (инициализацию bias оставьте по умолчанию). Обратите внимание, что данные функции имеют дополнительные параметры. Данные параметры должны передаваться в момент создания объекта класса `InitializedLinear`.

Пример создания слоя:
```
InitializedLinear(n_features, n_hidden, init_f=nn.init.uniform_, init_args={'a': 0.0, 'b': 1.0})
```

- [x] Проверено на семинаре
import torch as th
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import torchmetrics as M
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
class InitializedLinear(nn.Linear):
    def __init__(self, n_features, n_hidden, bias=True, init_f=nn.init.uniform_, init_args=None):
        super(InitializedLinear, self).__init__(n_features, n_hidden, bias)
        
        if init_args is None:
            init_args = {}
        
        init_f(self.weight, **init_args)

layer = InitializedLinear(10, 5, init_f=nn.init.uniform_, init_args={'a': 0.0, 'b': 1.0})
layer.weight
layer = InitializedLinear(10, 5, init_f=nn.init.xavier_uniform_)
layer.weight
# Markdown:
<p class='task' id='2'></p>

2\. Решите задачу регрессии несколько раз, изменяя способ инициализации весов. Рассмотрите следующие варианты:
* `nn.init.uniform_`
* `nn.init.normal_`
* `nn.init.constant_`
* `nn.xavier_uniform_`
* `nn.kaiming_uniform_`
* `nn.xavier_normal_`
* `nn.kaiming_normal_`

Визуализируйте график изменения значений MSE с ходом эпох. Дайте кривым, соответствующие разным способам инициализации, различные цвета и добавьте подписи. Обратите внимание, что от запуска к запуску результаты могут отличаться. Чтобы решить эту проблему, обучайте каждую модель несколько раз и визуализируйте доверительный интервал (можно воспользоваться `seaborn.lineplot`).



- [ ] Проверено на семинаре
device = th.device('cuda' if th.cuda.is_available() else 'cpu')
print(f'Используемое устройство: {device}')
X = th.linspace(0, 1, 100).view(-1, 1).to(device)
y = th.sin(2 * th.pi * X) + 0.1 * th.rand(X.size()).to(device)

dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)
initialization_functions = [(nn.init.uniform_, {'a': 0.0, 'b': 1.0}, 'uniform_'),
               (nn.init.normal_, {'mean': 0.0, 'std': 1.0}, 'normal_'),
               (nn.init.constant_, {'val': 1.0}, 'constant_'),
               (nn.init.xavier_uniform_, {'gain': 1.0}, 'xavier_uniform_'),
               (nn.init.kaiming_uniform_, {'a': 0.0, 'mode': 'fan_in', 'nonlinearity': 'leaky_relu'}, 'kaiming_uniform_'),
               (nn.init.xavier_normal_, {'gain': 1.0}, 'xavier_normal_'),
               (nn.init.kaiming_normal_, {'a': 0.0, 'mode': 'fan_in', 'nonlinearity': 'leaky_relu'}, 'kaiming_normal_')]

models = [
    nn.Sequential(
    InitializedLinear(n_features=1, n_hidden=100, init_f=init[0], init_args=init[1]),
    nn.ReLU(),
    InitializedLinear(n_features=100, n_hidden=1, init_f=init[0], init_args=init[1])
).to(device)
    for init in initialization_functions]

criterion = nn.MSELoss().to(device)
optimizer = optim.AdamW(models[0].parameters(), lr=0.005)
def train_model(model, criterion, optimizer, train_loader, n_epochs=1000):
    epoch_losses = []
    loss_metric = M.MeanMetric().to(device)
    
    for epoch in range(n_epochs+1):
        loss_metric.reset()  
        
        for X_batch, y_batch in train_loader:
            model.train()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss_metric.update(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)

    return epoch_losses
results = []
for initialization in initialization_functions:

    for run in range(5):
        model = nn.Sequential(
                    InitializedLinear(n_features=1, n_hidden=100, init_f=initialization[0], init_args=initialization[1]),
                    nn.Tanh(),
                    InitializedLinear(n_features=100, n_hidden=1, init_f=initialization[0], init_args=initialization[1])
                ).to(device)
        criterion = nn.MSELoss().to(device)
        optimizer = optim.AdamW(model[0].parameters(), lr=0.005)
        epoch_losses = train_model(model, criterion, optimizer, loader, n_epochs=100)
        for epoch, loss in enumerate(epoch_losses):
                    results.append({
                        'Initialization': str(initialization[2]),
                        'Epoch': epoch,
                        'Loss': loss,
                        'Run': run
                    })
results_df = pd.DataFrame(results)

plt.figure(figsize=(14, 7))
ax = plt.gca()
ax.set_yscale('log')
sns.lineplot(
    data=results_df,
    x='Epoch',
    y='Loss',
    hue='Initialization',
    errorbar='sd',
    linewidth=2
)

plt.title('График изменения MSE с разными инициализациями весов')
plt.xlabel('Эпоха')
plt.ylabel('Среднеквадратичная ошибка (MSE)')
plt.legend(title='Инициализация')
plt.tight_layout()
plt.show()
# Markdown:
<p class='task' id='3'></p>

3\. Исследуйте, как добавление дропаута влияет на процесс обучения модели. Решите задачу регрессии несколько раз, изменяя значения вероятности дропаута $p$ от 0 до 0.8. В качестве модели рассмотрите нейронную сеть с одним скрытым слоем.

Визуализируйте график изменения значений $R^2$ в зависимости от вероятности дропаута $p$ на обучающей и тестовой выборке. Визуализируйте на отдельном графике зависимости разности между $R^2$ на обучающей выборки и $R^2$ на тестовой выборке.



- [ ] Проверено на семинаре
def train_model_with_metrics(model, criterion, optimizer, train_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    r2_scores = []

    loss_metric = M.MeanMetric().to(device)
    r2_metric = M.R2Score().to(device)
    
    for epoch in range(n_epochs + 1):
        loss_metric.reset()
        r2_metric.reset()

        model.train()
        for X_batch, y_batch in train_loader:
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            loss_metric.update(loss)
            r2_metric.update(y_pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        r2_score = r2_metric.compute().item()
        epoch_losses.append(epoch_loss)
        r2_scores.append(r2_score)

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, R²: {r2_score:.4f}')

    return epoch_losses, r2_scores

def model_eval(model, test_loader):
    model.eval()
    y_pred_list, y_test_list = [], []
    with th.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            y_pred_list.append(y_pred)
            y_test_list.append(y_batch)

    y_pred_test = th.cat(y_pred_list)
    y_test = th.cat(y_test_list)
    
    r2 = M.R2Score().to(device)(y_pred_test, y_test).item()
    return r2

from sklearn.datasets import make_regression
import torch as th
from torch.utils.data import random_split

th.manual_seed(42)
X, y, coef = make_regression(
    n_samples=100,
    n_features=50,
    n_informative=20,
    noise=2,
    coef=True,
    random_state=42,

)
X = th.FloatTensor(X).to(device)
y = th.FloatTensor(y).reshape(-1, 1).to(device)
dataset = TensorDataset(X, y)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
models = [
    nn.Sequential(
        nn.Linear(X.shape[1], 32),
        nn.ReLU(),
        nn.Dropout(p),
        nn.Linear(32, 1)
    ).to(device)
    for p in np.arange(0, 0.9, 0.1)]
r2_train = []
r2_test = []
for model in models:
    criterion = nn.MSELoss().to(device)
    optimizer = optim.AdamW(model.parameters(), lr=0.005)

    epoch_losses, r2_scores = train_model_with_metrics(
            model, 
            criterion, 
            optimizer, 
            train_loader,
            n_epochs=1000,
            print_every=500
        )
    r2_train.append(r2_scores[-1])
    r2_test.append(model_eval(model, test_loader))
r2_test
r2_difference = [train - test for train, test in zip(r2_train, r2_test)]

plt.figure(figsize=(10, 5))
plt.plot(np.arange(0, 0.9, 0.1), r2_train, marker='o', label='R^2 Train')
plt.plot(np.arange(0, 0.9, 0.1), r2_test, marker='o', label='R^2 Test')
plt.xlabel('Dropout')
plt.ylabel('R^2 Score')
plt.title('R^2 и Dropout на Train и Test')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 5))
plt.plot(np.arange(0, 0.9, 0.1), r2_difference, marker='o', color='purple')
plt.xlabel('Dropout')
plt.ylabel('Разница в R^2')
plt.title('Разница в R^2 (Train - Test) и Dropout')
plt.grid(True)
plt.show()
# Markdown:
<p class='task' id='4'></p>

4\. Решите задачу регрессии с и без использования пакетной нормализации. Покажите, как меняется результат обучения моделей при различных значениях скорости обучения (0.001, 0.01, 0.1) за одно и то же количество эпох.

Визуализируйте график изменения значений $R^2$ в зависимости от эпохи при различных значениях скорости обучения с- и без использования пакетной нормализации.



- [ ] Проверено на семинаре
from sklearn.datasets import load_diabetes

X, y = load_diabetes(return_X_y=True)
X = th.FloatTensor(X).to(device)
y = th.FloatTensor(y).reshape(-1, 1).to(device)
y = (y - y.mean())/y.std()
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)
def create_model(use_batch_norm=True):
    if use_batch_norm:
        return nn.Sequential(
            nn.Linear(X.shape[1], 4),
            nn.BatchNorm1d(4),
            nn.ReLU(),
            nn.Linear(4, 4),
            nn.BatchNorm1d(4),
            nn.ReLU(),
            nn.Linear(4, 1)
        )
    else:
        return nn.Sequential(
            nn.Linear(X.shape[1], 4),
            nn.ReLU(),
            nn.Linear(4, 4),
            nn.ReLU(),
            nn.Linear(4, 1)
        )
r2_scores_all = []
for norm in [True, False]:
    for lr in [0.001, 0.01, 0.1]:
        model = create_model(use_batch_norm=norm).to(device)
        criterion = nn.MSELoss().to(device)
        optimizer = optim.AdamW(model.parameters(), lr=lr)

        epoch_losses, r2_scores = train_model_with_metrics(
                model, 
                criterion, 
                optimizer, 
                loader,
                n_epochs=250,
                print_every=250
            )
        r2_scores_all.append(r2_scores)
learning_rates = [0.001, 0.01, 0.1]
normalization_status = ["Нормализация", "Без нормализации"]
x = range(251)
plt.figure(figsize=(12, 8)) 
for i, y in enumerate(r2_scores_all):
    status = normalization_status[0] if i < 3 else normalization_status[1]
    lr = learning_rates[i % 3]
    plt.plot(x, y, label=f'{status}, lr={lr}')

plt.xlabel('Эпоха')
plt.ylabel('$R^2$')
plt.title('Изменение значений $R^2$')
plt.legend()  
plt.grid(True)

plt.show()
# Markdown:
<p class='task' id='5'></p>

5\. Решите задачу регрессии c использованием различных алгоритмов градиентного спуска. Покажите, как меняется результат обучения моделей при использовании различных алгоритмов (Adam, Adagrad, RMSProp, SGD) за одно и то же количество эпох с одной и той же скоростью обучения. Используйте модель с архитектурой, аналогичной модели из предыдущей задачи.

Визуализируйте график изменения значений MAPE в зависимости от эпохи при использовании различных алгоритмов градиентного спуска.

- [ ] Проверено на семинаре
def train_model_with_metrics(model, criterion, optimizer, train_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    mape_scores = []

    loss_metric = M.MeanMetric().to(device)
    mape_metric = M.MeanAbsolutePercentageError().to(device)
    
    for epoch in range(n_epochs + 1):
        loss_metric.reset()  
        mape_metric.reset()
        
        model.train()
        for X_batch, y_batch in train_loader:
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            
            loss_metric.update(loss)
            mape_metric.update(y_pred, y_batch)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_loss = loss_metric.compute().item()
        epoch_losses.append(epoch_loss)
        
        mape_score = mape_metric.compute().item()
        mape_scores.append(mape_score)

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, MAPE: {mape_score:.4f}')

    return epoch_losses, mape_scores
def load_boston():
    import pandas as pd
    import numpy as np

    data_url = 'http://lib.stat.cmu.edu/datasets/boston'
    raw_df = pd.read_csv(data_url, sep='\s+', skiprows=22, header=None)
    data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
    target = raw_df.values[1::2, 2]
    return data, target
X, y = load_boston()
X = th.FloatTensor(X).to(device)
y = th.FloatTensor(y).reshape(-1, 1).to(device)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=16, shuffle=True)
import torch.optim as optim  

mape_all = []

optimizers = [
    optim.Adam,
    optim.Adagrad,
    optim.RMSprop,
    optim.SGD
]

for opt in optimizers:
    model = nn.Sequential(
                nn.Linear(X.shape[1], 4),
                nn.BatchNorm1d(4),
                nn.ReLU(),
                nn.Linear(4, 4),
                nn.BatchNorm1d(4),
                nn.ReLU(),
                nn.Linear(4, 1)
            ).to(device)
    
    criterion = nn.MSELoss().to(device)
    optimizer = opt(model.parameters(), lr=0.005)

    epoch_losses, mape_scores = train_model_with_metrics(
                model, 
                criterion, 
                optimizer, 
                loader,
                n_epochs=250,
                print_every=250
            )
    mape_all.append(mape_scores)
x = range(251)
plt.figure(figsize=(12, 8)) 
for i, y in enumerate(mape_all):
    opt = optimizers[i]
    plt.plot(x, y[:251], label=f'оптимизатор: {opt.__name__}')

plt.xlabel('Эпоха')
plt.ylabel('MAPE')
plt.title('Изменение значений MAPE')
plt.legend()  
plt.grid(True)

plt.show()


# 04_1_cnn_image_classification.ipynb
# Markdown:
#  Классификация изображений с помощью сверточных нейронных сетей

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/docs/stable/nn.html
* https://pytorch.org/vision/0.16/transforms.html#v2-api-reference-recommended
* https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html
* https://pytorch.org/vision/main/generated/torchvision.datasets.ImageFolder.html
* https://kozodoi.me/blog/20210308/compute-image-stats
* https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.matshow.html
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Рассмотрите основные возможности по созданию датасетов из `torchvision` и примеры работы основных слоев для создания сверточных нейронных сетей для анализа изображений.

import torch
import torchvision
import torchvision.transforms.v2 as T
trainset = torchvision.datasets.CIFAR10(
    root="./cifar10",
    download=True,
    train=True,
)
trainset[0][0]
transform = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

trainset = torchvision.datasets.CIFAR10(
    root="./cifar10",
    download=True,
    train=True,
    transform=transform
)
trainset[0][0].shape
trainset[0][0]
# Markdown:
2\. Реализуйте типовую архитектуру CNN для классификации изображений.
import torch.nn as nn

class CNN(nn.Module):
  def __init__(self):
    super().__init__()

    self.conv_block1 = nn.Sequential(
      nn.Conv2d(
          in_channels=3,
          out_channels=6,
          kernel_size=3,
      ),
      nn.ReLU(),
      nn.MaxPool2d(2, 2),
    )

    self.conv_block2 = nn.Sequential(
      nn.Conv2d(
          in_channels=6,
          out_channels=2,
          kernel_size=3,
      ),
      nn.ReLU(),
      nn.MaxPool2d(2, 2),
    )
    self.classifier = nn.Linear(72, 10)


  def forward(self, X):
    out = self.conv_block1(X)
    out = self.conv_block2(out)

    out = out.flatten(start_dim=1)
    out = self.classifier(out)

    return out
from torch.utils.data import DataLoader

loader = DataLoader(trainset, batch_size=16)
X, y = next(iter(loader))
X.shape
model = CNN()
out = model(X)
out.shape
conv1 = nn.Conv2d(3, 6, (5, 7))
conv1.weight.shape
# Markdown:

# Markdown:
<p class="task" id="1"></p>

1\. Создайте датасет `CatBreeds` на основе данных из архива `cat_breeds_4.zip`. Используя преобразования `torchvision`, приведите картинки к размеру 300х300 и нормализуйте значения интенсивности пикселей (рассчитайте статистику для нормализации отдельно). Выведите на экран количество картинок в датасете,  размер одной картинки, количество уникальных классов. Разбейте датасет на обучающее и тестовое множество в соотношении 80 на 20%.

При расчете статистики для нормализации считайте, что вы можете загрузить весь набор данных в память сразу. Однако рекомендуется реализовать подход для получения статистики на основе батчей, так как такое решение в перспективе может позволить обработать датасет, который не помещается в память целиком.

- [ ] Проверено на семинаре
import torchvision.transforms.v2 as T
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch as th
import numpy as np
import torchmetrics as M
import torch.optim as optim
import torch.nn as nn
import matplotlib.pyplot as plt
import random
device = th.device('cuda' if th.cuda.is_available() else 'cpu')
device
transform = T.Compose([
    T.Resize((300, 300)), 
    T.ToTensor()
])

dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4', transform=transform)
loader = DataLoader(dataset, batch_size=32)
mean = 0.0
std = 0.0
total_images = 0

for images, _ in loader:
    batch_samples = images.size(0)  
    total_images += batch_samples
    mean += images.mean([0, 2, 3]) * batch_samples
    std += images.std([0, 2, 3]) * batch_samples

mean /= total_images
std /= total_images

mean, std
transform = T.Compose([
    T.Resize((300, 300)), 
    T.ToTensor(),
    T.Normalize(mean, std)
])

dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4', transform=transform)
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)
print(f"Количество изображений в датасете: {len(dataset)}")
print(f"Размер одной картинки: {dataset[0][0].shape}")
print(f"Количество уникальных классов: {len(dataset.classes)}")
# Markdown:
<p class="task" id="2"></p>

2\. Решите задачу классификации на основе датасета из предыдущего задания, не используя сверточные слои. Постройте график изменения значения функции потерь на обучающем множестве в зависимости от номера эпохи, графики изменения метрики accuracy на обучающем и тестовом множестве в зависимости от эпохи. Выведите на экран итоговое значение метрики accuracy на обучающем и тестовом множестве. Выведите на экран количество параметров модели.   

- [ ] Проверено на семинаре
def train_model_with_metrics(model, criterion, optimizer, train_loader, test_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    accuracy_scores = []
    test_accuracies = []

    loss_metric = M.MeanMetric().to(device)  
    accuracy_metric = M.Accuracy(task='multiclass', num_classes=len(dataset.classes)).to(device)  

    for epoch in range(n_epochs + 1):
        loss_metric.reset()
        accuracy_metric.reset()

        model.train()
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            loss_metric.update(loss)
            accuracy_metric.update(y_pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        
        epoch_loss = loss_metric.compute().item()
        accuracy = accuracy_metric.compute().item()
        epoch_losses.append(epoch_loss)
        accuracy_scores.append(accuracy)

        model.eval()
        test_accuracy_metric = M.Accuracy(task='multiclass', num_classes=len(dataset.classes)).to(device)
        with th.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                y_pred = model(X_batch)
                test_accuracy_metric.update(y_pred, y_batch)
        test_accuracies.append(test_accuracy_metric.compute().item())

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, Train Accuracy: {accuracy:.4f}, Test Accuracy: {test_accuracies[-1]:.4f}')

    return epoch_losses, accuracy_scores, test_accuracies
model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(np.prod(dataset[0][0].shape), 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 4),
        ).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

epoch_losses, accuracy_scores, test_accuracies = train_model_with_metrics(model, criterion, optimizer, train_loader, test_loader, n_epochs=15, print_every=1)
epochs = range(1, len(epoch_losses) + 1)

plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, epoch_losses, label="Train Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epochs, accuracy_scores, label="Train Accuracy")
plt.plot(epochs, test_accuracies, label="Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Train & Test Accuracy")
plt.legend()

plt.tight_layout()
plt.show()
accuracy_scores[-1], test_accuracies[-1]
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Количество параметров модели: {count_parameters(model)}")
# Markdown:
<p class="task" id="3"></p>

3\. Напишите функцию, которая выбирает несколько изображений из переданного набора данных и выводит их на экран в виде сетки с указанием над ними названия правильного класса и класса, предсказанного моделью. Воспользовавшись данной функцией, выведите прогнозы итоговой модели из предыдущей задачи по 6 случайным картинкам.

```
def show_examples(model, dataset, k=6):
    pass
```

- [ ] Проверено на семинаре
def denormalize(image, mean, std):
    mean = mean.clone().detach().view(3, 1, 1)
    std = std.clone().detach().view(3, 1, 1)
    return image * std + mean

def show_examples(model, test_loader, classes, k=6):
    model.eval()

    images, true_labels = [], []
    for X_batch, y_batch in test_loader:
        images.extend(X_batch)
        true_labels.extend(y_batch)
    
    
    indices = random.sample(range(len(images)), k)

    selected_images = [images[i].to(device) for i in indices]
    selected_labels = [true_labels[i] for i in indices]
    selected_images_tensor = th.stack(selected_images)

    with th.no_grad():
        outputs = model(selected_images_tensor)
        pred_labels = th.argmax(outputs, dim=1).cpu()

    fig, axes = plt.subplots(1, k, figsize=(15, 5))
    for i, idx in enumerate(indices):
        ax = axes[i]
        image = denormalize(images[idx].cpu(), mean, std).permute(1, 2, 0)  # Денормализуем и меняем размерности
        true_label = classes[selected_labels[i]]
        pred_label = classes[pred_labels[i]]
        
        ax.imshow(image)
        ax.set_title(f"True: {true_label}\nPred: {pred_label}")
        ax.axis("off")

    plt.tight_layout()
    plt.show()
show_examples(model, test_loader, dataset.classes)
# Markdown:
<p class="task" id="4"></p>

4\. Решите задачу классификации на основе датасета из первого задания, используя сверточные слои. Постройте график изменения значения функции потерь на обучающем множестве в зависимости от номера эпохи, графики изменения метрики accuracy на обучающем и тестовом множестве в зависимости от эпохи. Выведите на экран итоговое значение метрики accuracy на обучающем и тестовом множестве. Выведите на экран количество параметров модели. Воспользовавшись функцией из предыдущего задания, выведите прогнозы итоговой модели по 6 случайным картинкам.

Сохраните веса обученной модели на диск.

- [ ] Проверено на семинаре
class CNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(
                in_channels=3,
                out_channels=6,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(
                in_channels=6,
                out_channels=16,
                kernel_size=3,
            ),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        
        self.classifier = nn.Linear(16 * 73 * 73, num_classes) 

    def forward(self, X):
        out = self.conv_block1(X)
        out = self.conv_block2(out)
        out = out.flatten(start_dim=1)  
        out = self.classifier(out)
        return out
model = CNN(num_classes=len(dataset.classes)).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = th.optim.AdamW(model.parameters(), lr=0.001)

epoch_losses, accuracy_scores, test_accuracies = train_model_with_metrics(
    model, criterion, optimizer, train_loader, test_loader, n_epochs=15, print_every=1)
epochs = range(1, len(epoch_losses) + 1)

plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, epoch_losses, label="Train Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epochs, accuracy_scores, label="Train Accuracy")
plt.plot(epochs, test_accuracies, label="Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Train & Test Accuracy")
plt.legend()

plt.tight_layout()
plt.show()
accuracy_scores[-1], test_accuracies[-1]
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Количество параметров модели: {count_parameters(model)}")
show_examples(model, test_loader, dataset.classes)
# Markdown:
<p class="task" id="5"></p>

5\. Проанализируйте обученную в предыдущей задаче модель, исследовав обученные ядра сверточных слоев. Выберите одно изображение из тестового набора данных и пропустите через первый сверточный слой модели. Визуализируйте полученные карты признаков.

- [ ] Проверено на семинаре
conv1_weights = model.conv_block1[0].weight.data.cpu()  

fig, axes = plt.subplots(1, min(6, conv1_weights.shape[0]), figsize=(15, 5))
for i in range(min(6, conv1_weights.shape[0])):
    ax = axes[i]
    normalized_filter = (conv1_weights[i] - conv1_weights[i].min()) / (conv1_weights[i].max() - conv1_weights[i].min())
    ax.imshow(normalized_filter.permute(1, 2, 0).numpy())
    ax.axis('off')
    ax.axis('off')
    ax.set_title(f"Filter {i+1}")
plt.show()
test_image, _ = next(iter(test_loader))
test_image = test_image[0].unsqueeze(0).to(device)

with th.no_grad():
    conv1_output = model.conv_block1(test_image)

conv1_output = conv1_output.squeeze(0).cpu().numpy() 

num_features = conv1_output.shape[0]
fig, axes = plt.subplots(1, min(6, num_features), figsize=(15, 5))
for i in range(min(6, num_features)):
    ax = axes[i]
    ax.imshow(conv1_output[i], cmap='viridis')
    ax.axis('off')
    ax.set_title(f"Feature Map {i+1}")
plt.show()


# 04_2_cnn_pretrained (1).ipynb
# Markdown:
#  Использование предобученных моделей для классификации изображений

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Deep Learning with PyTorch (2020) Авторы: Eli Stevens, Luca Antiga, Thomas Viehmann
* https://pytorch.org/vision/0.16/transforms.html#v2-api-reference-recommended
* https://pytorch.org/vision/main/generated/torchvision.datasets.ImageFolder.html
* https://pytorch.org/vision/stable/models.html
* https://albumentations.ai/docs/getting_started/image_augmentation/
* https://www.neurotec.uni-bremen.de/drupal/node/30
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Загрузите предобученную модель из `torchvision`. Познакомьтесь с ее архитектурой. Заморозьте веса нескольких слоев.

import torchvision.models as models
model = models.efficientnet_b1(
    weights=models.EfficientNet_B1_Weights.IMAGENET1K_V2
)
model
list(model.parameters())[0].requires_grad_(False)
list(model.named_parameters())
ts = models.EfficientNet_B1_Weights.IMAGENET1K_V1.transforms()
ts
import torch as th

images = th.randint(0, 255, size=(16, 3, 500, 500))
images.shape
ts(images).shape
import os
os.chdir('c:\\Users\\Danya\\Downloads')
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Используя реализацию из `torchvision`, cоздайте модель `vgg16` и загрузите предобученные веса `IMAGENET1K_V1`. Выведите на экран структуру модели, количество слоев и количество настраиваемых (`requires_grad==True`) параметров модели.

- [X] Проверено на семинаре
import torchvision.models as models
import torchvision.transforms.v2 as T
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import torch.optim as optim
import torch as th
import torchmetrics as M
import time
import matplotlib.pyplot as plt
model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
model
num_layers = sum(1 for _ in model.parameters())
num_layers
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
trainable_params
# Markdown:
<p class="task" id="2"></p>

2\. Создайте датасет `CatBreeds` на основе данных из архива `cat_breeds_4.zip`. Разбейте датасет на обучающее и тестовое множество в соотношении 80 на 20%.

К обучающему датасету примените следующее преобразование: приведите картинки к размеру 256x256, затем обрежьте по центру с размером 224х224, затем переведите изображения в тензор и нормализуйте значения интенсивности пикселей (`mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)`).

К тестовому датасету примените преобразование `VGG16_Weights.IMAGENET1K_V1.transforms`.

- [X] Проверено на семинаре
device = th.device('cuda' if th.cuda.is_available() else 'cpu')
device
train_transform = T.Compose([
    T.Resize((256, 256)), 
    T.CenterCrop(224),
    T.ToTensor(),
    T.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])
transform_test = models.VGG16_Weights.IMAGENET1K_V1.transforms()

dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4')
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])

train_dataset.dataset.transform = train_transform
test_dataset.dataset.transform = transform_test

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
train_dataset[0][0].shape, test_dataset[0][0].shape
# Markdown:
<p class="task" id="3"></p>

3\. Заморозьте все веса модели из предыдущего задания. Замените последний слой `Linear` классификатора на новый слой, соответствующий задаче. После изменения последнего слоя выведите на экран количество настраиваемых (`requires_grad==True`) параметров модели. Решите задачу, используя модель с замороженными весами и изменнным последним слоем.

Постройте график изменения значения функции потерь на обучающем множестве в зависимости от номера эпохи, графики изменения метрики accuracy на обучающем и тестовом множестве в зависимости от эпохи. Выведите на экран итоговое значение метрики accuracy на обучающем и тестовом множестве.

- [X] Проверено на семинаре
for param in model.parameters():
    param.requires_grad = False
    
num_features = model.classifier[-1].in_features
model.classifier[-1] = nn.Linear(num_features, 4)
model = model.to(device)
def train_model_with_metrics(model, criterion, optimizer, train_loader, test_loader, n_epochs=1000, print_every=100):
    epoch_losses = []
    accuracy_scores = []
    test_accuracies = []

    loss_metric = M.MeanMetric().to(device)  
    accuracy_metric = M.Accuracy(task='multiclass', num_classes=len(dataset.classes)).to(device)  

    for epoch in range(n_epochs + 1):
        loss_metric.reset()
        accuracy_metric.reset()

        model.train()
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            loss_metric.update(loss)
            accuracy_metric.update(y_pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        
        epoch_loss = loss_metric.compute().item()
        accuracy = accuracy_metric.compute().item()
        epoch_losses.append(epoch_loss)
        accuracy_scores.append(accuracy)

        model.eval()
        test_accuracy_metric = M.Accuracy(task='multiclass', num_classes=len(dataset.classes)).to(device)
        with th.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                y_pred = model(X_batch)
                test_accuracy_metric.update(y_pred, y_batch)
        test_accuracies.append(test_accuracy_metric.compute().item())

        if epoch % print_every == 0:
            print(f'Epoch [{epoch}/{n_epochs}], Loss: {epoch_loss:.4f}, Train Accuracy: {accuracy:.4f}, Test Accuracy: {test_accuracies[-1]:.4f}')

    return epoch_losses, accuracy_scores, test_accuracies
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
trainable_params
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

start_time = time.time()
epoch_losses, accuracy_scores, test_accuracies = train_model_with_metrics(model, criterion, optimizer, train_loader, test_loader, n_epochs=5, print_every=1)
end_time = time.time()
end_time - start_time
accuracy_scores[-1], test_accuracies[-1]
epochs = range(1, len(epoch_losses) + 1)

plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, epoch_losses, label="Train Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epochs, accuracy_scores, label="Train Accuracy")
plt.plot(epochs, test_accuracies, label="Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Train & Test Accuracy")
plt.legend()

plt.tight_layout()
plt.show()
# Markdown:
<p class="task" id="4"></p>

4\. Повторите решение предыдущей задачи, заморозив все сверточные слои, кроме последнего (слои классификатора не замораживайте). Сравните качество полученного решения и решения из предыдущей задачи, а также время, затраченное на обучения моделей. Перед началом работы создайте модель заново.

- [X] Проверено на семинаре
model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)

for param in model.features[:-1].parameters():
    param.requires_grad = False
    
num_features = model.classifier[-1].in_features
model.classifier[-1] = nn.Linear(num_features, 4)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

start_time = time.time()
epoch_losses, accuracy_scores, test_accuracies = train_model_with_metrics(model, criterion, optimizer, train_loader, test_loader, n_epochs=5, print_every=1)
end_time = time.time()
end_time - start_time
accuracy_scores[-1], test_accuracies[-1]
epochs = range(1, len(epoch_losses) + 1)

plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, epoch_losses, label="Train Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epochs, accuracy_scores, label="Train Accuracy")
plt.plot(epochs, test_accuracies, label="Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Train & Test Accuracy")
plt.legend()

plt.tight_layout()
plt.show()
# Markdown:
<p class="task" id="5"></p>

5\. Повторите решение задачи 3, расширив обучающий набор данных при помощи преобразований из `torchvision`, изменяющих изображение (повороты, изменение интенсивности пикселей, обрезание и т.д.). При оценке модели на тестовой выборке данные преобразования применяться не должны. Решение о том, сколько и каких слоев модели будет обучаться, примите самостоятельно. Перед началом работы создайте модель заново.

- [ ] Проверено на семинаре
from torch.utils.data import ConcatDataset
augmentation = T.Compose([
        T.Resize((256, 256)), 
        T.CenterCrop(224),    
        T.RandomHorizontalFlip(p=0.1),
        T.RandomRotation(degrees=10),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

transform_test = models.VGG16_Weights.IMAGENET1K_V1.transforms()

dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4')
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])

test_dataset.dataset.transform = transform_test

augmented_datasets = [ImageFolder(root='./cat_breeds_4/cat_breeds_4', transform=augmentation)]
extended_train_dataset = ConcatDataset([train_dataset] + augmented_datasets)

train_loader = DataLoader(extended_train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)
model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)

for param in model.features[:-1].parameters(): 
    param.requires_grad = False
    
num_features = model.classifier[-1].in_features
model.classifier[-1] = nn.Linear(num_features, 4)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

start_time = time.time()
epoch_losses, accuracy_scores, test_accuracies = train_model_with_metrics(model, criterion, optimizer, train_loader, test_loader, n_epochs=5, print_every=1)
end_time = time.time()
end_time - start_time
accuracy_scores[-1], test_accuracies[-1]
epochs = range(1, len(epoch_losses) + 1)

plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, epoch_losses, label="Train Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train Loss")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(epochs, accuracy_scores, label="Train Accuracy")
plt.plot(epochs, test_accuracies, label="Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Train & Test Accuracy")
plt.legend()

plt.tight_layout()
plt.show()


# 04_3_deploy_publishing (1).ipynb
# Markdown:
#  Разворачивание и публикация моделей. Streamlit.

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://docs.streamlit.io/get-started
* https://colab.research.google.com/github/mrm8488/shared_colab_notebooks/blob/master/Create_streamlit_app.ipynb#scrollTo=meJ36PefNftd
* https://www.youtube.com/playlist?list=PLtqF5YXg7GLmCvTswG32NqQypOuYkPRUE
* https://docs.streamlit.io/develop/api-reference/widgets/st.slider
* https://docs.streamlit.io/develop/api-reference/media/st.image
* https://docs.streamlit.io/develop/api-reference/widgets/st.file_uploader
* https://docs.streamlit.io/develop/api-reference/caching-and-state/st.session_state
# Markdown:
## Задачи для совместного разбора
# !pip install -q streamlit
# !npm install localtunnel
# %%writefile app.py
# !streamlit run /content/app.py &>/content/logs.txt &
# !npx localtunnel --port 8501
# Markdown:
1\. Обсудите базовые возможности по созданию веб-приложения при помощи `streamlit` на примере построения графика функции $y=x^p, x\in[-x_{min}, x_{max}]$.
import streamlit as st
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image


def plot(x_min, x_max, power):
  x = np.linspace(x_min, x_max, 200)
  y = x ** power
  plt.figure(figsize=(10, 5))
  plt.plot(x, y)
  return plt


def main():
  st.title("Пример визуализации")

  x_min = st.sidebar.slider("Минимум", min_value=-5, max_value=5)
  x_max = st.sidebar.slider("Максимум", min_value=-5, max_value=5)
  power = st.sidebar.slider("Степень", min_value=-5, max_value=5)

  fig = plot(x_min, x_max, power)
  st.pyplot(fig)


# def main():
#   img_file = st.file_uploader(
#     label="Изображение"
#   )



  if img_file is not None:
    img = Image.open(img_file)
    st.image(img)


if __name__ == "__main__":
  main()
# Markdown:
2\. Обсудите способ загрузки изображений и хранения переменных в сессии пользователя.
import os
os.chdir('c:\\Users\\Danya\\Downloads')
os.getcwd()
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Напишите функцию `load_model`, которая восстанавливает модель предсказания категорий животных на основе пути к файлу с весами этой модели и любой другой дополнительной информации, которая требуется для восстановления модели. Загрузите модель и выведите ее архитектуру на экран.

- [ ] Проверено на семинаре
from pathlib import Path
import torch as th
import torch.nn as nn
import torch.optim as optim
device = th.device('cuda') if th.cuda.is_available() else th.device('cpu')


def load_model(weights_path: Path, freeze_layers=None, *args, **kwargs) -> nn.Module:    
    model = th.load(weights_path)
    
    return model
model = load_model(Path("vgg16_cat_classifier.pth"), freeze_layers=slice(0, -1)).to(device)
model
# Markdown:
<p class="task" id="2"></p>

2\. Напишите функцию `preprocess_image`, которая принимает на вход изображение в виде `PIL.Image.Image` и предобрабатывает его таким образом, чтобы результат можно было пропустить через модель. Преобразования, применяемые к изображению, должны соответствовать тому, как данная модель была обучена.

Протестируйте работу функции, скачав картинку с котиком при помощи готовой функции `get_cat_image`.

- [ ] Проверено на семинаре
from torchvision import transforms as T
import requests
from PIL import Image
from io import BytesIO
import matplotlib.pyplot as plt

def preprocess_image(image: Image.Image):
    preprocess = T.Compose([
        T.Resize((256, 256)), 
        T.CenterCrop(224),    
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    preprocessed_image = preprocess(image).unsqueeze(0)
    return preprocessed_image
def get_cat_image(url: str) -> Image.Image:
    response = requests.get(url)
    if response.status_code == 200:
        image = Image.open(BytesIO(response.content))
        return image
    else:
        return None

urls = ["https://www.catster.com/wp-content/uploads/2023/11/Fawn-Sphynx_sophiecat_Shutterstock-800x592.jpg",
       "https://www.catster.com/wp-content/uploads/2024/07/tabby-cat-resting-indoor_Esin-Deniz-Shutterstock.jpg",
       'https://i.ytimg.com/vi/tseIInlZ56A/maxresdefault.jpg']

images = [get_cat_image(i).convert('RGB') for i in urls]
    
plt.figure(figsize=(15, 5))
for i, img in enumerate(images):
    plt.subplot(1, len(images), i + 1) 
    plt.imshow(img)
    plt.axis('off') 
    plt.title(f'Картинка {i + 1}')
plt.show()
preprocess_image(images[0])
# Markdown:
<p class="task" id="3"></p>

3\. Напишите функцию `predict`, при помощи которой можно получить прогноз для изображения. Продемонстируйте работу функции.

- [ ] Проверено на семинаре
from torchvision.datasets import ImageFolder
def predict(model, image_tensor, classes):
    with th.no_grad():
        output = model(image_tensor.to(device))
        probabilities = nn.Softmax(dim=1)(output)
        pred_label_idx = th.argmax(probabilities, dim=1).item()
    
    pred_label = classes[pred_label_idx]
    return pred_label
[predict(model, preprocess_image(img), ['American Shorthair', 'Persian', 'Russian Blue', 'Tiger']) for img in images]
# Markdown:
<p class="task" id="4"></p>

4\. Реализуйте веб-приложение, которое позволяет загрузить изображение и получить прогноз для него при помощи обученной модели. На странице должны располагаться следующие визуальные элементы:
- кнопка для загрузки изображения;
- само изображение (после загрузки);
- кнопка для получения прогнозов;
- таблица с вероятностями каждого класса (после нажатия на кнопку): должны быть видны названия классов.

Продемонстрируйте работу, вставив в ячейку скриншоты, подтверждающие корректность решения.

В этом и следующем задании использование `streamlit` является опциональным. Если вы владеете любым другим инструментом для создания веб-приложения, вы можете использовать его.

- [ ] Проверено на семинаре
# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png)
# Markdown:
<p class="task" id="5"></p>

5\. Расширьте возможности приложения, добавив возможность отобразить информацию о топ-k наиболее вероятных классов в виде столбчатой диаграммы. Значение k должно выбираться при помощи визуального элемента "слайдер".

Продемонстрируйте работу, вставив в ячейку скриншоты, подтверждающие корректность решения.

- [ ] Проверено на семинаре

# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png)


# 05_1_hf_transformers (1).ipynb
# Markdown:


__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://huggingface.co/docs/transformers/index
* https://huggingface.co/docs/transformers/main_classes/pipelines
* https://huggingface.co/docs/transformers/preprocessing
* https://habr.com/ru/articles/704592/
* https://lightning.ai/docs/torchmetrics/stable/text/bleu_score.html
# Markdown:

# Markdown:
1\. Обсудите основные возможности и экосистему пакета 🤗 Transformers на примере задачи поиска ответа на вопрос в тексте.
text = """The seminars on Deep Learning and Natural Language Processing were truly captivating,
providing a deep dive into the intricacies of these disciplines.
The wealth of knowledge and insights gained during the sessions was commendable.
However, it's disheartening to note the scarcity of homework assignments.
Anastasia, in particular, is quite concerned that the limited number of assignments might
fall short of even reaching 30. While the seminars were intellectually stimulating,
the desire for more hands-on practice through assignments remains strong,
as it is crucial for reinforcing the theoretical understanding acquired during the classes."""
question1 = "What would be the ideal number of homework assignments for Anastasia"
question2 = "What are the shortcomings of the course?"
from transformers import pipeline
answerer = pipeline(
    "question-answering",
    model="distilbert/distilbert-base-uncased-distilled-squad"
)
answerer(question=question1, context=text)
from transformers import AutoModelForQuestionAnswering, AutoTokenizer
import torch
model_name = "distilbert/distilbert-base-uncased-distilled-squad"
model = AutoModelForQuestionAnswering.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
inputs = tokenizer(question1, text, return_tensors="pt")
inputs
with torch.no_grad():
  outputs = model(**inputs)
ans_start = outputs.start_logits.argmax()
ans_end = outputs.end_logits.argmax()
ans_end
token_ids = inputs["input_ids"][0][ans_start: ans_end+1]
tokens = tokenizer.convert_ids_to_tokens(token_ids)
tokenizer.convert_tokens_to_string(tokens)


# Markdown:

# Markdown:
<p class="task" id="1"></p>

1\. Среди предобученных моделей найдите модель для перевода текста с русского языка на английский. Протестируйте данную модель на нескольких предложениях, используя `transformers.pipeline`. Выведите результаты работы в следующем виде:

```
sentence1_ru -> sentence1_en
sentence2_ru -> sentence2_en
```

Получите перевод для всех текстов из файла `RuBQ_2.0_test.json` и посчитайте BLEU-score.

- [ ] Проверено на семинаре
from transformers import pipeline
import torch as th
from sacrebleu import corpus_bleu
import json

device = 0 if th.cuda.is_available() else -1
device
translator  = pipeline(
    "translation",
    model="Helsinki-NLP/opus-mt-ru-en",
    device=device
)
sentences = [
    "Блажен, кто с смолоду был молод, Блажен, кто вовремя созрел",
    "Мы почитаем всех нулями, А единицами — себя",
    "Чем меньше женщину мы любим, Тем легче нравимся мы ей",
    "Мы все учились понемногу Чему-нибудь и как-нибудь",
]

for sentence in sentences:
    translation = translator(sentence)[0]['translation_text']
    print(f"{sentence} -> {translation}")
with open('RuBQ_2.0_test.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

russian_questions = [item['question_text'] for item in data]
english_translations = [item['question_eng'] for item in data]

translated_texts = [translator(question)[0]['translation_text'] for question in russian_questions]

bleu_score = corpus_bleu(translated_texts, [english_translations])
print(f"BLEU Score: {bleu_score.score}")
# Markdown:
<p class="task" id="2"></p>

2\. Среди предобученных моделей найдите модель для классификации фотографий людей по полу.

Протестируйте данную модель на нескольких фотографиях, используя `AutoFeatureExtractor` и `AutoModelForImageClassification`.

Выведите на экран сетку 2х2 из изображений, где над каждым изображением добавлена подпись, содержащая прогноз модели и правильный ответ.

- [ ] Проверено на семинаре
import torch
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
from PIL import Image
import matplotlib.pyplot as plt
model_name = "rizvandwiki/gender-classification"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
model = AutoModelForImageClassification.from_pretrained(model_name)
def predict_gender(image_path):
    image = Image.open(image_path).convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")
    outputs = model(**inputs)
    logits = outputs.logits
    predicted_class_idx = logits.argmax(-1).item()
    return model.config.id2label[predicted_class_idx]

image_paths = [
    "C:\\Users\\Danya\\Pictures\\person_1.jpg",
    "C:\\Users\\Danya\\Pictures\\person_2.jpg",
    "C:\\Users\\Danya\\Pictures\\person_3.jpg",
    "C:\\Users\\Danya\\Pictures\\person_4.jpg"
]

true_labels = ["Male", "Female", "Male", "Male"]

fig, axes = plt.subplots(2, 2, figsize=(10, 10))
axes = axes.flatten()

for idx, image_path in enumerate(image_paths):
    predicted_label = predict_gender(image_path)
    true_label = true_labels[idx]

    image = Image.open(image_path)
    axes[idx].imshow(image)
    axes[idx].axis('off')
    axes[idx].set_title(f"Pred: {predicted_label}\nTrue: {true_label}")

plt.tight_layout()
plt.show()
# Markdown:
<p class="task" id="3"></p>

3\. Среди предобученных моделей найдите модель для генерации аудио по тексту. Используя [данный сервис](https://geek-jokes.sameerkumar.website/api?format=json), получите текст случайной шутки. Сгенерируйте аудио с озвучкой данной шутки.

Для прослушивания полученного аудио воспользуйтесь встроенным виджетом `IPython.display.Audio`

- [ ] Проверено на семинаре
import requests
import torch as th
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset
from IPython.display import Audio
response = requests.get("https://geek-jokes.sameerkumar.website/api?format=json")
joke = response.json()["joke"]
print(f"Joke: {joke}")
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
speaker_embedding = th.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)

inputs = processor(text=joke, return_tensors="pt")
speech = model.generate_speech(inputs["input_ids"], speaker_embedding, vocoder=vocoder)

Audio(speech, rate=16000)
# Markdown:
<p class="task" id="4"></p>

4\. Разработайте решение для поиска ответа на голосовой вопрос по тексту, используя готовые модели `transformers`. Решение должно включать себя следующие модели:
- модель распознавания текста из аудио;
- модель поиска ответа на вопрос в тексте;
- модель генерации аудио по тексту.

В качестве входных данных запишите небольшой аудиофрагмент в формате на русском языке. Для записи вы можете воспользоваться любым устройством: мобильным телефоном, веб-приложением (например, [этим](https://vocalremover.org/ru/voice-recorder)) и т.д.

В качестве контекста для поиска ответа используйте предложенный текст.

Продемонстируйте все промежуточные результаты, полученные в процессе работы конвейера моделей.

- [ ] Проверено на семинаре
import torch as th
from transformers import pipeline, WhisperProcessor, WhisperForConditionalGeneration
from IPython.display import Audio
import soundfile as sf
import torchaudio
text = """
Машинное обучение — это раздел искусственного интеллекта, который позволяет компьютерным системам улучшать свою работу на основе опыта без явного программирования.
Основная идея заключается в том, что алгоритмы могут учиться на данных, выявляя закономерности и принимая решения с минимальным вмешательством человека.
Существует несколько типов машинного обучения: обучение с учителем, где алгоритм учится на размеченных данных; обучение без учителя, работающее с неразмеченными данными;
и обучение с подкреплением, где алгоритм учится через взаимодействие с окружающей средой.
"""
question = 'Какие типы машинного обучения существуют?'
Audio('question.mp3')
model = pipeline('automatic-speech-recognition', model='openai/whisper-tiny')
outputs = model('question.mp3')
question = outputs.get('text', '').strip()
question
file_path = "question.mp3"

with open(file_path, "rb") as f:
    audio_data = f.read()

Audio(audio_data)
answerer = pipeline(
    "question-answering",
    model="timpal0l/mdeberta-v3-base-squad2",
    max_answer_len=512,
    device=0
)
answer = answerer(question=question, context=text)['answer']
answer
tts_model = pipeline('text-to-speech', model='suno/bark-small', device=0)
outputs = tts_model(answer)
audio = Audio(outputs.get('audio'), rate=outputs.get('sampling_rate'))
audio


# 05_2_hf_transformers_finetuning (1).ipynb
# Markdown:


__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы: 
* https://huggingface.co/docs/transformers/training
* https://huggingface.co/docs/datasets/main/en/repository_structure
* https://huggingface.co/docs/datasets/main/en/package_reference/loading_methods
* https://huggingface.co/docs/transformers/v4.35.2/en/training
* https://huggingface.co/docs/datasets/v2.0.0/en/image_process
* https://huggingface.co/docs/datasets/v3.0.1/en/package_reference/main_classes
* https://huggingface.co/docs/datasets/process
* https://huggingface.co/docs/evaluate/index
* https://huggingface.co/docs/transformers/main_classes/trainer
* https://huggingface.co/docs/transformers/v4.35.2/en/main_classes/trainer
* https://albumentations.ai/docs/getting_started/image_augmentation/
* https://wandb.ai/ayush-thakur/huggingface/reports/Examples-of-Early-Stopping-in-HuggingFace-Transformers--Vmlldzo0MzE2MTM
* https://colab.research.google.com/github/wandb/examples/blob/master/colabs/huggingface/Optimize_Hugging_Face_models_with_Weights_&_Biases.ipynb
# Markdown:

# Markdown:
1\. Обсудите основные шаги по дообучению моделей из экосистемы 🤗 Transformers.
# Markdown:


# Markdown:
<p class="task" id="1"></p>

1\. Создайте набор данных для обучения модели классификации пород кошек, используя пакет 🤗 Datasets. Разделите датасет на обучающее и тестовое множество в соотношении 80/20. 

К обучающему множеству примените следующие преобразования из пакета `albumentations`:
- Resize до 256х256;
- CenterCrop до размера 224х224;
- минимум одно преобразование, случайным образом изменяющее изображение.

К тестовому множеству примените аналогичное преобразование, но без добавления случайных изменений.

Создайте два `DataLoader` на основе обучающего и валидационного множества. Получите батч из обучающего множества и форму признаков и меток на экран. Признаки в батче должны быть уложены в четырехмерный тензор.



- [ ] Проверено на семинаре
import torchvision.models as models
import torchvision.transforms.v2 as T
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import torch.optim as optim
import torch as th
import torchmetrics as M
import time
import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
from transformers import AutoModelForImageClassification, AutoFeatureExtractor, Trainer, TrainingArguments
feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/resnet-50")

train_transform  = A.Compose([
    A.Resize(256, 256), 
    A.CenterCrop(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Normalize(mean=feature_extractor.image_mean, std=feature_extractor.image_std),
    ToTensorV2()
])

test_transform = A.Compose([
    A.Resize(256, 256),
    A.CenterCrop(224, 224),
    A.Normalize(mean=feature_extractor.image_mean, std=feature_extractor.image_std),
    ToTensorV2()
])

class AlbumentationsTransform:
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, image):
        image = np.array(image)
        augmented = self.transform(image=image)
        return augmented['image']
dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4')
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])

train_dataset.dataset.transform = AlbumentationsTransform(train_transform)
test_dataset.dataset.transform = AlbumentationsTransform(test_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
data_iter = iter(train_loader)
images, labels = next(data_iter)
images.shape
# Markdown:
<p class="task" id="2"></p>

2\. Создайте модель при помощи класса `AutoModelForImageClassification`, заменив голову модели в соответствии с решаемой задачей  классификации. Используя стандартный цикл обучения `torch` (или `pytorch_lightning`), настройте модель. Во время обучения выводите на экран значение функции потерь и значение F1 на обучающем множестве, а также F1 на валидационном множестве. 

- [ ] Проверено на семинаре
import torchvision.models as models
import torchvision.transforms.v2 as T
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import torch.optim as optim
import torch as th
import torchmetrics as M
import time
import wandb
import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
from transformers import AutoModelForImageClassification, AutoFeatureExtractor, Trainer, TrainingArguments
from tqdm import tqdm
train_transform  = A.Compose([
    A.Resize(256, 256), 
    A.CenterCrop(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

test_transform = A.Compose([
    A.Resize(256, 256),
    A.CenterCrop(224, 224),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

class AlbumentationsTransform:
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, image):
        image = np.array(image)
        augmented = self.transform(image=image)
        return augmented['image']
dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4')
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])

train_dataset.dataset.transform = AlbumentationsTransform(train_transform)
test_dataset.dataset.transform = AlbumentationsTransform(test_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
model_name = "microsoft/resnet-50"
num_classes = 4 
device = th.device("cuda" if th.cuda.is_available() else "cpu")

model = AutoModelForImageClassification.from_pretrained(
    model_name,
    num_labels=num_classes, 
    ignore_mismatched_sizes=True
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=5e-5)

train_f1 = M.F1Score(task='multiclass', num_classes=num_classes, average='weighted').to(device)
val_f1 = M.F1Score(task='multiclass', num_classes=num_classes, average='weighted').to(device)


def train_one_epoch(model, train_loader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    train_f1.reset()
    for images, labels in tqdm(train_loader, desc="Training", leave=False):
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images).logits
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        preds = th.argmax(outputs, dim=1)
        train_f1.update(preds, labels)

        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)
    f1_score = train_f1.compute()
    return avg_loss, f1_score

def evaluate(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    val_f1.reset()
    with th.no_grad():
        for images, labels in tqdm(val_loader, desc="Validation", leave=False):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images).logits
            loss = criterion(outputs, labels)

            preds = th.argmax(outputs, dim=1)
            val_f1.update(preds, labels)

            running_loss += loss.item()

    avg_loss = running_loss / len(val_loader)
    f1_score = val_f1.compute()
    return avg_loss, f1_score
for epoch in range(5):
    print(f"Epoch {epoch + 1}/{5}")

    train_loss, train_f1_score = train_one_epoch(model, train_loader, optimizer, criterion, device)
    print(f"Training Loss: {train_loss:.4f}, Training F1: {train_f1_score:.4f}")

    val_loss, val_f1_score = evaluate(model, test_loader, criterion, device)
    print(f"Validation Loss: {val_loss:.4f}, Validation F1: {val_f1_score:.4f}")
# Markdown:
<p class="task" id="3"></p>

3\. Создайте модель при помощи класса `AutoModelForImageClassification`, заменив голову модели в соответствии решаемой задачей  классификации. Используя `transformers.Trainer`, настройте модель для решения задачи. Во время обучения выводите на экран значение функции потерь на обучающем и валидационном множества, а также F1 на валидационном множестве. 

Настройте `Trainer` таким образом, чтобы логирование процесса обучения осуществлялось при помощи `wandb`. Прикрепите скриншоты интерфейса `wandb` с результатами. 

- [ ] Проверено на семинаре

from transformers import TrainingArguments, Trainer
import wandb
from torch.utils.data import Dataset
import evaluate
from transformers import AutoImageProcessor
train_transform  = A.Compose([
    A.Resize(256, 256), 
    A.CenterCrop(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    ToTensorV2()
])

test_transform = A.Compose([
    A.Resize(256, 256),
    A.CenterCrop(224, 224),
    ToTensorV2()
])

class AlbumentationsTransform:
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, image):
        image = np.array(image)
        augmented = self.transform(image=image)
        return augmented['image']
    

dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4')
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2])

train_dataset.dataset.transform = AlbumentationsTransform(train_transform)
test_dataset.dataset.transform = AlbumentationsTransform(test_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
wandb.init(project="cat_breeds_classification", name="resnet50_trainer")

processor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")
class ImageClassificationDataset(Dataset):
    def __init__(self, dataset, processor):
        self.dataset = dataset
        self.processor = processor

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]
        encoding = self.processor(image, return_tensors='pt')
        pixel_values = encoding['pixel_values'].squeeze()
        return {'pixel_values': pixel_values, 'labels': label}

train_dataset = ImageClassificationDataset(train_dataset, processor)
eval_dataset = ImageClassificationDataset(test_dataset, processor)
num_labels = len(dataset.classes)
label_names = dataset.classes
id2label = {str(i): label for i, label in enumerate(label_names)}
label2id = {label: i for i, label in enumerate(label_names)}

model = AutoModelForImageClassification.from_pretrained(
    "microsoft/resnet-50",
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True
)
accuracy_metric = evaluate.load('accuracy')
f1_metric = evaluate.load('f1')

def compute_metrics(p):
    preds = np.argmax(p.predictions, axis=1)
    labels = p.label_ids
    accuracy = accuracy_metric.compute(predictions=preds, references=labels)['accuracy']
    f1 = f1_metric.compute(predictions=preds, references=labels, average='weighted')['f1']
    return {'accuracy': accuracy, 'f1': f1}

training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    save_strategy='epoch',
    eval_strategy='epoch',
    logging_strategy='steps',
    logging_steps=100,
    metric_for_best_model='f1',
    report_to='wandb',
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)
trainer.train()

wandb.finish()
# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png)
# Markdown:
<p class="task" id="4"></p>

4\. Повторите решение задачи 3, настроив процедуру ранней остановки (используйте механизм callback для Trainer). Логика ранней остановки следующая: если метрика F1 не увеличивалась на валидационном множестве в течение 3 последних эпох, то процесс обучения останавливается.

- [ ] Проверено на семинаре

from transformers import EarlyStoppingCallback, TrainingArguments, Trainer, AutoImageProcessor, AutoModelForImageClassification
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, random_split
import evaluate
from torchvision.datasets import ImageFolder
from PIL import Image
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
import wandb
from tqdm import tqdm


wandb.init(project="cat_breeds_classification", name="resnet50_trainer_early_stopping")

train_transform = A.Compose([
    A.Resize(256, 256), 
    A.CenterCrop(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2)
])

test_transform = A.Compose([
    A.Resize(256, 256),
    A.CenterCrop(224, 224),
])


class AlbumentationsTransform:
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, image):
        image = np.array(image)
        augmented = self.transform(image=image)
        return augmented['image']


dataset = ImageFolder(root='./cat_breeds_4/cat_breeds_4')
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_subset, test_subset = random_split(dataset, [train_size, test_size])


train_subset.dataset.transform = AlbumentationsTransform(train_transform)
test_subset.dataset.transform = AlbumentationsTransform(test_transform)

processor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")


class ImageClassificationDataset(Dataset):
    def __init__(self, subset, processor):
        self.subset = subset
        self.processor = processor

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        image, label = self.subset[idx]
        
        if not isinstance(image, Image.Image):
            image = Image.fromarray(image)
        
        
        encoding = self.processor(images=image, return_tensors="pt")
        pixel_values = encoding["pixel_values"].squeeze()
        
        return {"pixel_values": pixel_values, "labels": torch.tensor(label, dtype=torch.long)}


train_dataset = ImageClassificationDataset(train_subset, processor)
eval_dataset = ImageClassificationDataset(test_subset, processor)
num_labels = len(dataset.classes)
label_names = dataset.classes
id2label = {str(i): label for i, label in enumerate(label_names)}
label2id = {label: i for i, label in enumerate(label_names)}


model = AutoModelForImageClassification.from_pretrained(
    "microsoft/resnet-50",
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True
)


accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")


def compute_metrics(p):
    preds = np.argmax(p.predictions, axis=1)
    labels = p.label_ids
    accuracy = accuracy_metric.compute(predictions=preds, references=labels)["accuracy"]
    f1 = f1_metric.compute(predictions=preds, references=labels, average="weighted")["f1"]
    return {"accuracy": accuracy, "f1": f1}


early_stopping_callback = EarlyStoppingCallback(
    early_stopping_patience=3,
)


training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=30,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    save_strategy='epoch',
    eval_strategy='epoch',
    logging_strategy='steps',
    logging_steps=100,
    load_best_model_at_end=True,
    metric_for_best_model='f1',
    report_to='wandb',
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,  
    compute_metrics=compute_metrics,
    callbacks=[early_stopping_callback]
)


trainer.train()


wandb.finish()

# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png)
# Markdown:
![image.png](attachment:image.png)


# 06_1_object_detection (2).ipynb
# Markdown:
#  Обнаружение объектов

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html
* https://pyimagesearch.com/2021/11/01/training-an-object-detector-from-scratch-in-pytorch/
* https://pyimagesearch.com/2021/08/02/pytorch-object-detection-with-pre-trained-networks/
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Рассмотрите простейшую архитектуру для решения задачи object detection и процесс настройки модели.
import torch as th
import torch.nn as nn
imgs = th.rand(size=(16, 3, 100, 100))
bboxes_true = th.rand(size=(16, 4))
labels_true = th.randint(0, 2, size=(16, ))
class Detector(nn.Module):
  def __init__(self):
    super().__init__()
    self.backbone = nn.Sequential(
        nn.Conv2d(3, 64, kernel_size=3),
        nn.ReLU(),
        nn.MaxPool2d(2)
    )

    self.regressor = nn.Sequential(
        nn.Linear(153664, 64),
        nn.ReLU(),
        nn.Linear(64, 4)
    )

    self.classifier = nn.Sequential(
        nn.Linear(153664, 64),
        nn.ReLU(),
        nn.Linear(64, 2)
    )

  def forward(self, X):
    features = self.backbone(X).flatten(start_dim=1)
    bbox = self.regressor(features)
    labels = self.classifier(features)

    return bbox, labels
model = Detector()
bboxes_pred, labels_pred = model(imgs)
mse_criterion = nn.MSELoss()
ce_criterion = nn.CrossEntropyLoss()
loss = (
    0.5*mse_criterion(bboxes_pred, bboxes_true) +
    0.5*ce_criterion(labels_pred, labels_true)
)
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Напишите функцию `parse_xml`, которая читает xml-файл с разметкой изображения из архива `animals.zip` и возвращает словарь, содержащий три ключа:
```
{
        "raw": # словарь с ключами xmin, ymin, xmax, ymax
        "scaled": # словарь с ключами xmin, ymin, xmax, ymax
        "obj_name": # строка
}
```
В этом словаре `row` - абсолютные значения координат вершин bounding box, а `scaled` - относительные (нормированные на ширину и высоту изображения). Примените функцию к файлу `cat.0.xml` и выведите результат на экран.


- [ ] Проверено на семинаре
import os
import zipfile
from pathlib import Path
import xml.etree.ElementTree as ET
from PIL import Image
import torch
from torch.utils.data import Dataset
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torchvision.transforms import ToPILImage, Compose, ToTensor, Normalize, Resize
from torchvision.models import resnet50
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
def parse_xml(xml_filename):
    with zipfile.ZipFile('animals.zip') as z:        
        with z.open(f'Asirra: cat vs dogs/{xml_filename}') as f:
            tree = ET.parse(f)
            root = tree.getroot()
            
            size = root.find("size")
            width = int(size.find("width").text)
            height = int(size.find("height").text)
            
            obj = root.find("object")
            obj_name = obj.find("name").text
            
            bndbox = obj.find("bndbox")
            xmin = float(bndbox.find("xmin").text)
            ymin = float(bndbox.find("ymin").text)
            xmax = float(bndbox.find("xmax").text)
            ymax = float(bndbox.find("ymax").text)
            
            raw = {
                "xmin": xmin,
                "ymin": ymin,
                "xmax": xmax,
                "ymax": ymax
            }
            
            scaled = {
                "xmin": xmin / width,
                "ymin": ymin / height,
                "xmax": xmax / width,
                "ymax": ymax / height
            }
            
            return {
                "raw": raw,
                "scaled": scaled,
                "obj_name": obj_name
            }

parse_xml('cat.0.xml')
# Markdown:
<p class="task" id="2"></p>

2\. Опишите датасет `AnimalDetectionDataset` на основе архива `animals.zip`. Реализуйте `__getitem__` таким образом, чтобы он возвращал три элемента: тензор с изображением, словарь с координатами bounding box и метку объекта. Предусмотрите возможность передавать извне при создании датасета набор преобразований для изображений, преобразование для метки объекта (для кодирования) и флаг, показывающий, нужно ли возвращать исходные или нормированные координаты bounding box.

- [ ] Проверено на семинаре
class AnimalDetectionDataset(Dataset):
    def __init__(self, 
                 zip_path, 
                 transforms=None, 
                 target_transform=None, 
                 return_scaled_coords=True):

        self.zip_path = zip_path
        self.transforms = transforms
        self.target_transform = target_transform
        self.return_scaled_coords = return_scaled_coords

        with zipfile.ZipFile(self.zip_path) as z:
            self.file_list = z.namelist()
        
        self.image_files = [f for f in self.file_list if f.endswith('.jpg')]
        self.annotation_files = [f.replace('.jpg', '.xml') for f in self.image_files]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        with zipfile.ZipFile(self.zip_path) as z:
            image_path = self.image_files[idx]
            annotation_path = self.annotation_files[idx]

            with z.open(image_path) as img_file:
                image = Image.open(img_file).convert("RGB")

            with z.open(annotation_path) as xml_file:
                tree = ET.parse(xml_file)
                root = tree.getroot()

                size = root.find("size")
                width = int(size.find("width").text)
                height = int(size.find("height").text)

                obj = root.find("object")
                label = obj.find("name").text
                bndbox = obj.find("bndbox")
                xmin = float(bndbox.find("xmin").text)
                ymin = float(bndbox.find("ymin").text)
                xmax = float(bndbox.find("xmax").text)
                ymax = float(bndbox.find("ymax").text)

                raw_box = {
                    "xmin": xmin,
                    "ymin": ymin,
                    "xmax": xmax,
                    "ymax": ymax
                }
                scaled_box = {
                    "xmin": xmin / width,
                    "ymin": ymin / height,
                    "xmax": xmax / width,
                    "ymax": ymax / height
                }
                bounding_box = scaled_box if self.return_scaled_coords else raw_box

                if self.target_transform:
                    label = self.target_transform(label)

            if self.transforms:
                image = self.transforms(image)

            if not isinstance(image, torch.Tensor):
                image = torch.from_numpy(np.array(image)).permute(2, 0, 1) / 255.0

            return image, bounding_box, label
# Markdown:
<p class="task" id="3"></p>

3\. Создайте объект класса `AnimalDetectionDataset` без применения преобразований и со значением `return_scaled=False`. Напишите функцию `show_image_with_bounding_box` для визуализации изображения с добавлением на него bounding box и подписи объекта. Продемонстрируйте работу функцию на изображении собаки и кошки.

- [ ] Проверено на семинаре
def show_image_with_bounding_box(image, bounding_box, label):
    if isinstance(image, torch.Tensor):
        image = ToPILImage()(image)
    
    fig, ax = plt.subplots(1)
    ax.imshow(image)

    xmin, ymin, xmax, ymax = bounding_box['xmin'], bounding_box['ymin'], bounding_box['xmax'], bounding_box['ymax']
    width, height = xmax - xmin, ymax - ymin
    rect = patches.Rectangle((xmin, ymin), width, height, linewidth=2, edgecolor='red', facecolor='none')
    ax.add_patch(rect)

    ax.text(xmin, ymin - 10, label, color='red', fontsize=12, weight='bold', backgroundcolor='white')

    plt.axis('off')
    plt.show()
dataset = AnimalDetectionDataset(
    zip_path='animals.zip',
    transforms=None,  
    target_transform=None, 
    return_scaled_coords=False 
)

image, bounding_box, label = dataset[0]
print(f"Label: {label}, Bounding Box: {bounding_box}")
show_image_with_bounding_box(image, bounding_box, label)

image, bounding_box, label = dataset[-1]
print(f"Label: {label}, Bounding Box: {bounding_box}")
show_image_with_bounding_box(image, bounding_box, label)
# Markdown:
<p class="task" id="4"></p>

4\. Напишите модель для решения задачи выделения объектов. Реализуйте двухголовую сеть, одна голова которой предсказывает метку объекта (задача классификации), а вторая голова предсказывает 4 координаты вершин bounding box (задача регрессии). В качестве backbone используйте модель resnet50 из пакета `torchvision`.

- [ ] Проверено на семинаре
class Detector(nn.Module):
  def __init__(self):
    super().__init__()
    backbone = resnet50(pretrained=True)
    self.backbone = nn.Sequential(*list(backbone.children())[:-2])

    self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
    
    self.regressor = nn.Sequential(
        nn.Flatten(),
        nn.Linear(2048, 512),
        nn.ReLU(),
        nn.Linear(512, 4)
    )

    self.classifier = nn.Sequential(
        nn.Flatten(),
        nn.Linear(2048, 512),
        nn.ReLU(),
        nn.Linear(512, 2)
    )

  def forward(self, X):
    features = self.backbone(X)

    pooled_features = self.global_pool(features)

    bbox = self.regressor(pooled_features)
    labels = self.classifier(pooled_features)

    return bbox, labels
# Markdown:
<p class="task" id="5"></p>

5\. Разбейте набор данных на обучающее и валидационное множество. Обучите модель, описанную в задаче 4. При создании датасета не забудьте указать преобразования, соответствующие модели ResNet.

Используйте сумму MSELoss (для расчета ошибки на задаче регрессии) и CrossEntropyLoss (для расчета ошибки на задачи классификации) для настройки весов модели. Для ускорения процесса обучения слои backbone можно заморозить. Во время обучения выводите на экран значения функции потерь на обучающем и валидационном множестве. Используя обученную модель, получите предсказания для изображения кошки и собаки и отрисуйте их. Выполните процедуру, обратную нормализации, чтобы корректно отобразить фотографии.

- [ ] Проверено на семинаре
dataset = AnimalDetectionDataset(
    zip_path="animals.zip",
    transforms=Compose([
        Resize((224, 224)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ]),
    target_transform=lambda label: 0 if label == "cat" else 1,
    return_scaled_coords=True  
)
train_dataset, val_dataset = random_split(dataset, [0.8, 0.2])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

len(train_dataset), len(val_dataset)
model = Detector()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = optim.AdamW(model.parameters(), lr=1e-5)
criterion_class = nn.CrossEntropyLoss()
criterion_bbox = nn.MSELoss()

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for images, bboxes, labels in train_loader:
        images = images.to(device)
        bboxes_tensor = torch.stack(
            [bboxes[key].to(torch.float32) for key in ['xmin', 'ymin', 'xmax', 'ymax']], dim=1
        ).to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        pred_bboxes, pred_labels = model(images)
        
        loss_class = criterion_class(pred_labels, labels)
        loss_bbox = criterion_bbox(pred_bboxes, bboxes_tensor)
        loss = 0.5 * loss_class +  0.5 * loss_bbox

        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, bboxes, labels in val_loader:
            images = images.to(device)
            bboxes_tensor = torch.stack(
                [bboxes[key].to(torch.float32) for key in ['xmin', 'ymin', 'xmax', 'ymax']], dim=1
            ).to(device)
            labels = labels.to(device)

            pred_bboxes, pred_labels = model(images)
            
            loss_class = criterion_class(pred_labels, labels)
            loss_bbox = criterion_bbox(pred_bboxes, bboxes_tensor)
            loss =  0.5 * loss_class +  0.5 * loss_bbox

            val_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss/len(train_loader):.4f} - Val Loss: {val_loss/len(val_loader):.4f}")
def denormalize(image, mean, std):
    mean = torch.tensor(mean).view(3, 1, 1)
    std = torch.tensor(std).view(3, 1, 1)
    image = image * std + mean
    return image

def visualize_prediction(image, bbox, label, label_map):
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    image = denormalize(image, mean, std).permute(1, 2, 0).cpu().numpy()
    image = np.clip(image, 0, 1) 

    xmin, ymin, xmax, ymax = bbox
    xmin *= image.shape[1] 
    ymin *= image.shape[0]
    xmax *= image.shape[1]
    ymax *= image.shape[0]

    plt.figure(figsize=(6, 6))
    plt.imshow(image)
    plt.gca().add_patch(plt.Rectangle(
        (xmin, ymin), xmax - xmin, ymax - ymin,
        edgecolor='red', fill=False, linewidth=2
    ))
    plt.title(f"Predicted: {label_map[label]}")
    plt.axis("off")
    plt.show()

label_map = {0: "cat", 1: "dog"}

model.eval()  
with torch.no_grad():
    for i in (0, -1):
        image, bbox, label = dataset[i]
        image_tensor = image.unsqueeze(0).to(device) 
        pred_bboxes, pred_labels = model(image_tensor)

        pred_bbox = pred_bboxes[0].cpu().numpy()  
        pred_label = pred_labels[0].argmax().item() 

        visualize_prediction(image, pred_bbox, pred_label, label_map)
# Markdown:
<p class="task" id="6"></p>

6\. Найдите в сети несколько изображений котов и собак. Используя любой инструмент для разметки (например, [CVAT](https://www.cvat.ai/)), выделите котов и собак на изображениях. Вставьте скриншот экспортированного файла с разметкой. Используя полученные изображения, визуализируйте разметку и bounding boxes, полученные при помощи модели.

- [ ] Проверено на семинаре
class AnimalDetectionDataset(Dataset):
    def __init__(self, 
                 zip_path, 
                 transforms=None, 
                 target_transform=None, 
                 return_scaled_coords=True):
        self.zip_path = zip_path
        self.transforms = transforms
        self.target_transform = target_transform
        self.return_scaled_coords = return_scaled_coords

        with zipfile.ZipFile(self.zip_path) as z:
            self.file_list = z.namelist()
        
        self.image_files = [f for f in self.file_list if f.endswith('.jpg')]
        self.annotation_file = [f for f in self.file_list if f.endswith('.xml')][0] 
    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        with zipfile.ZipFile(self.zip_path) as z:
            image_path = self.image_files[idx]
            with z.open(image_path) as img_file:
                image = Image.open(img_file).convert("RGB")

            with z.open(self.annotation_file) as xml_file:
                tree = ET.parse(xml_file)
                root = tree.getroot()

                image_name = os.path.basename(image_path)
                image_annotation = root.find(f".//image[@name='{image_name}']")

                if image_annotation is None:
                    raise ValueError(f"No annotation found for image: {image_name}")

                width = int(image_annotation.attrib['width'])
                height = int(image_annotation.attrib['height'])

                box = image_annotation.find("box")
                label = box.attrib['label']
                xmin = float(box.attrib['xtl'])
                ymin = float(box.attrib['ytl'])
                xmax = float(box.attrib['xbr'])
                ymax = float(box.attrib['ybr'])

                raw_box = {
                    "xmin": xmin,
                    "ymin": ymin,
                    "xmax": xmax,
                    "ymax": ymax
                }
                scaled_box = {
                    "xmin": xmin / width,
                    "ymin": ymin / height,
                    "xmax": xmax / width,
                    "ymax": ymax / height
                }
                bounding_box = scaled_box if self.return_scaled_coords else raw_box

                if self.target_transform:
                    label = self.target_transform(label)

            if self.transforms:
                image = self.transforms(image)

            if not isinstance(image, torch.Tensor):
                image = torch.from_numpy(np.array(image)).permute(2, 0, 1) / 255.0

            return image, bounding_box, label
dataset = AnimalDetectionDataset(
    zip_path="Dogs vs Cats.zip",
    transforms=Compose([
        Resize((224, 224)),
        ToTensor(),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ]),
    target_transform=lambda label: 0 if label == "Cat" else 1,
    return_scaled_coords=True
)

def visualize_annotation(image, bbox, label, label_map):
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    image = denormalize(image, mean, std).permute(1, 2, 0).cpu().numpy()
    image = np.clip(image, 0, 1)

    xmin, ymin, xmax, ymax = bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']
    xmin *= image.shape[1]
    ymin *= image.shape[0]
    xmax *= image.shape[1]
    ymax *= image.shape[0]

    plt.figure(figsize=(6, 6))
    plt.imshow(image)
    plt.gca().add_patch(plt.Rectangle(
        (xmin, ymin), xmax - xmin, ymax - ymin,
        edgecolor='green', fill=False, linewidth=2
    ))
    plt.title(f"Annotation: {label_map[label]}")
    plt.axis("off")
    plt.show()

label_map = {0: "cat", 1: "dog"}

model.eval()  
with torch.no_grad():
    for i in range(len(dataset)):
        image, bbox, label = dataset[i]
        image_tensor = image.unsqueeze(0).to(device) 
        
        pred_bboxes, pred_labels = model(image_tensor)
        pred_bbox = pred_bboxes[0].cpu().numpy()  
        pred_label = pred_labels[0].argmax().item() 

        visualize_annotation(image, bbox, label, label_map)
        visualize_prediction(image, pred_bbox, pred_label, label_map)



# 06_2_image_segmentation (1).ipynb
# Markdown:


__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://www.kaggle.com/datasets/rajkumarl/people-clothing-segmentation/data
* https://pyimagesearch.com/2021/11/08/u-net-training-image-segmentation-models-in-pytorch/
* https://amaarora.github.io/posts/2020-09-13-unet.html
* https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html
* https://medium.com/apache-mxnet/transposed-convolutions-explained-with-ms-excel-52d13030c7e8
* https://github.com/vdumoulin/conv_arithmetic/blob/master/README.md
* https://huggingface.co/docs/transformers/model_doc/segformer
* https://www.kaggle.com/code/damianpanek/segformerb0-people-clothing-segmentation
# Markdown:

# Markdown:
1\. Обсудите постановку задачи сегментации изображений.
# Markdown:
2\. Рассмотрите пример работы слоя `ConvTranspose2d`.
import torch
import torch.nn as nn
img = torch.arange(0, 4).reshape(1, 1, 2, 2).float()
img
layer = nn.ConvTranspose2d(
    in_channels=1,
    out_channels=1,
    kernel_size=3,
    bias=False,
)

layer(img)
res = torch.zeros(4, 4)
w = layer.weight.squeeze()
x = img.squeeze()

res[:3, :3] += w * x[0, 0]
res[:3, 1:4] += w * x[0, 1]
res[1:4, :3] += w * x[1, 0]
res[1:4, 1:4] += w * x[1, 1]
res
# Markdown:

# Markdown:
<p class="task" id="1"></p>

1\. Опишите датасет `ClothesSegmentationDataset`. Реализуйте `__getitem__` таким образом, чтобы он возвращал два элемента: тензор с изображением и тензор с маской. Маска должна быть представлена трехмерным тензором целых чисел. Предусмотрите возможность передавать извне при создании датасета набор преобразований для изображений и масок. Создайте объект датасета и выведите на экран форму и типы одного изображения и его маски.

- [ ] Проверено на семинаре
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision.transforms import functional as F
from torchvision.transforms import Compose
import numpy as np
from torchvision.transforms import Compose, Resize, Normalize, ToTensor
import matplotlib.pyplot as plt
class ClothesSegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, image_transform=None, mask_transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.image_transform = image_transform
        self.mask_transform = mask_transform
        
        self.image_files = sorted([f for f in os.listdir(image_dir) if f.endswith(('.jpg', '.png', '.jpeg'))])
        self.mask_files = sorted([f for f in os.listdir(mask_dir) if f.endswith(('.jpg', '.png', '.jpeg'))])

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        
        image_path = os.path.join(self.image_dir, self.image_files[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_files[idx])

        
        image = Image.open(image_path).convert("RGB")
        mask = Image.open(mask_path)

        
        if self.image_transform:
            image = self.image_transform(image)
        else:
            image = F.to_tensor(image)  

        
        mask = np.array(mask, dtype=np.int64)  

        if self.mask_transform:
            mask = Image.fromarray(mask.astype(np.uint8))  
            mask = self.mask_transform(mask)  
            mask = torch.from_numpy(np.array(mask))  
        else:
            mask = torch.from_numpy(mask).long()  

        
        if mask.ndim == 2:
            mask = mask.unsqueeze(0)  

        return image, mask
image_dir = "./Clothes/jpeg_images/IMAGES"  
mask_dir = "./Clothes/jpeg_masks/MASKS"    

image_transform = Compose([
    Resize((256, 256)),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
mask_transform = Compose([
    Resize((256, 256))
])

dataset = ClothesSegmentationDataset(image_dir=image_dir, mask_dir=mask_dir, 
                                     image_transform=image_transform, mask_transform=mask_transform)

image, mask = dataset[0]

print(f"Image shape: {image.shape}, dtype: {image.dtype}")
print(f"Mask shape: {mask.shape}, dtype: {mask.dtype}")
# Markdown:
<p class="task" id="2"></p>

2\. Напишите функцию `show_image_with_mask`, которая выводит рядом два изображения: фотографию и маску. Продемонстрируйте работу функции, взяв один пример из созданного датасета.

- [ ] Проверено на семинаре
def denormalize(image, mean, std):
    mean = torch.tensor(mean).view(3, 1, 1)
    std = torch.tensor(std).view(3, 1, 1)
    return image * std + mean

def show_image_with_mask(image, mask):  
    mean = [0.485, 0.456, 0.406]  
    std = [0.229, 0.224, 0.225]
    image = denormalize(image, mean, std).clamp(0, 1)  

    
    image_np = image.permute(1, 2, 0).cpu().numpy()  
    mask_np = mask[0].cpu().numpy()  

    
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))
    
    
    axes[0].imshow(image_np)
    axes[0].set_title("Image")
    axes[0].axis("off")
    
    
    axes[1].imshow(mask_np, cmap="nipy_spectral", vmin=0, vmax=59)
    axes[1].set_title("Mask")
    axes[1].axis("off")
    
    
    plt.tight_layout()
    plt.show()
image, mask = dataset[-1]

show_image_with_mask(image, mask)
# Markdown:
<p class="task" id="3"></p>

3\. Реализуйте архитектуру U-Net. Реализуйте модель таким образом, чтобы на выходе для каждого изображения получался тензор размера `n_classes x h x w`, где `n_classes` - количество уникальных значений в масках, а `h` и `w` - размер исходного изображения. Возьмите один пример из набора данных и пропустите его через сеть. Выведите форму полученного результата на экран.

- [ ] Проверено на семинаре
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
import numpy as np

class UNet(nn.Module):
    def __init__(self, in_channels, n_classes):
        super(UNet, self).__init__()
        
        def conv_block(in_c, out_c):
            return nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_c, out_c, kernel_size=3, padding=1),
                nn.ReLU(inplace=True)
            )
        
        self.encoder1 = conv_block(in_channels, 64)
        self.encoder2 = conv_block(64, 128)
        self.encoder3 = conv_block(128, 256)
        self.encoder4 = conv_block(256, 512)
        
        self.bottleneck = conv_block(512, 1024)
        
        self.upconv4 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.decoder4 = conv_block(1024, 512)
        
        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.decoder3 = conv_block(512, 256)
        
        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder2 = conv_block(256, 128)
        
        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder1 = conv_block(128, 64)
        
        self.output = nn.Conv2d(64, n_classes, kernel_size=1)
        
    def forward(self, x):
        e1 = self.encoder1(x)
        e2 = self.encoder2(F.max_pool2d(e1, kernel_size=2))
        e3 = self.encoder3(F.max_pool2d(e2, kernel_size=2))
        e4 = self.encoder4(F.max_pool2d(e3, kernel_size=2))
        
        b = self.bottleneck(F.max_pool2d(e4, kernel_size=2))
        
        d4 = self.upconv4(b)
        d4 = torch.cat((e4, d4), dim=1)
        d4 = self.decoder4(d4)
        
        d3 = self.upconv3(d4)
        d3 = torch.cat((e3, d3), dim=1)
        d3 = self.decoder3(d3)
        
        d2 = self.upconv2(d3)
        d2 = torch.cat((e2, d2), dim=1)
        d2 = self.decoder2(d2)
        
        d1 = self.upconv1(d2)
        d1 = torch.cat((e1, d1), dim=1)
        d1 = self.decoder1(d1)
        
        out = self.output(d1)
        return out

model = UNet(in_channels=3, n_classes=59)

image, mask = dataset[32]  
image = image.unsqueeze(0)  

output = model(image)

print(f"Input image shape: {image.shape}")  
print(f"Output shape: {output.shape}")    
# Markdown:
<p class="task" id="4"></p>

4\.  Разбейте набор данных на обучающее и валидационное множество. Обучите модель U-Net для сегментации изображения. Во время обучения выводите на экран значения функции потерь и точности прогнозов на обучающем и валидационном множестве. Обратите внимание, что выборка является несбалансированной. При расчете функции потерь примените любую известную вам технику для работы с несбалансированными выборками.

При создании датасета допускается использовать преобразования, уменьшающие размер изображений (для ускорения процесса обучения).

Используя обученную модель, получите предсказания для нескольких изображений и отрисуйте их.
- [ ] Проверено на семинаре
from torch.utils.data import random_split, DataLoader
from torchvision.transforms import Compose, Resize, Normalize, ToTensor

image_transform = Compose([
    Resize((128, 128)),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
mask_transform = Compose([
    Resize((128, 128))
])

dataset = ClothesSegmentationDataset(
    image_dir="./Clothes/jpeg_images/IMAGES",
    mask_dir="./Clothes/jpeg_masks/MASKS",
    image_transform=image_transform,
    mask_transform=mask_transform
)

train_dataset, val_dataset = random_split(dataset, [0.8, 0.2])

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from tqdm import tqdm

class IoULoss(nn.Module):
    def __init__(self, n_classes, smooth=1e-6):
        super(IoULoss, self).__init__()
        self.n_classes = n_classes
        self.smooth = smooth

    def forward(self, preds, targets):
        preds = torch.softmax(preds, dim=1)  

        
        targets_one_hot = torch.nn.functional.one_hot(targets, num_classes=self.n_classes)  
        targets_one_hot = targets_one_hot.permute(0, 3, 1, 2).float()  

        intersection = (preds * targets_one_hot).sum(dim=(2, 3))
        union = preds.sum(dim=(2, 3)) + targets_one_hot.sum(dim=(2, 3)) - intersection
        iou = (intersection + self.smooth) / (union + self.smooth)  

        return 1 - iou.mean()

class DiceLoss(nn.Module):
    def __init__(self, n_classes, smooth=1e-6):
        super(DiceLoss, self).__init__()
        self.n_classes = n_classes
        self.smooth = smooth

    def forward(self, preds, targets):
        preds = torch.softmax(preds, dim=1)  

        
        targets_one_hot = torch.nn.functional.one_hot(targets, num_classes=self.n_classes)  
        targets_one_hot = targets_one_hot.permute(0, 3, 1, 2).float()  

        intersection = (preds * targets_one_hot).sum(dim=(2, 3))
        dice = (2 * intersection + self.smooth) / (preds.sum(dim=(2, 3)) + targets_one_hot.sum(dim=(2, 3)) + self.smooth)  

        return 1 - dice.mean()

class CombinedLoss(nn.Module):
    def __init__(self, n_classes, weight_ce=1.0, weight_iou=1.0, smooth=1e-6):
        super(CombinedLoss, self).__init__()
        self.ce = nn.CrossEntropyLoss()
        self.iou = IoULoss(n_classes, smooth)
        self.weight_ce = weight_ce
        self.weight_iou = weight_iou

    def forward(self, preds, targets):
        loss_ce = self.ce(preds, targets)
        loss_iou = self.iou(preds, targets)
        return self.weight_ce * loss_ce + self.weight_iou * loss_iou
n_classes = 59


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=3, n_classes=n_classes).to(device)


criterion = CombinedLoss(n_classes=n_classes, weight_ce=1.0, weight_iou=1.0)  

optimizer = optim.AdamW(model.parameters(), lr=1e-4)


num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for images, masks in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        images = images.to(device)
        masks = masks.to(device, dtype=torch.long)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks.squeeze(1))  
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)

    
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, masks in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
            images = images.to(device)
            masks = masks.to(device, dtype=torch.long)

            outputs = model(images)
            loss = criterion(outputs, masks.squeeze(1))  
            val_loss += loss.item()

    val_loss /= len(val_loader)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}")
    print(f"Val Loss: {val_loss:.4f}")
import matplotlib.pyplot as plt
import torch

def denormalize(image, mean, std):
    mean = torch.tensor(mean).view(3, 1, 1)
    std = torch.tensor(std).view(3, 1, 1)
    return image * std + mean

def visualize_predictions(model, dataset, indices, device, n_classes):
    model.eval() 
    mean = [0.485, 0.456, 0.406]  
    std = [0.229, 0.224, 0.225]   

    for idx in indices:
        image, true_mask = dataset[idx]
        image_tensor = image.unsqueeze(0).to(device)  
        
        with torch.no_grad():
            pred_mask = model(image_tensor)
            pred_mask = torch.argmax(pred_mask, dim=1).squeeze(0).cpu().numpy()

        image_np = denormalize(image, mean, std).permute(1, 2, 0).cpu().numpy()  
        true_mask_np = true_mask.squeeze(0).cpu().numpy()

        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        axes[0].imshow(image_np)
        axes[0].set_title("Original Image")
        axes[0].axis("off")
        
        axes[1].imshow(true_mask_np, cmap="nipy_spectral", vmin=0, vmax=n_classes - 1)
        axes[1].set_title("True Mask")
        axes[1].axis("off")
        
        axes[2].imshow(pred_mask, cmap="nipy_spectral", vmin=0, vmax=n_classes - 1)
        axes[2].set_title("Predicted Mask")
        axes[2].axis("off")
        
        plt.tight_layout()
        plt.show()

indices = [0, 10, -1]
visualize_predictions(model, dataset, indices, device, n_classes=60)
# Markdown:
<p class="task" id="5"></p>

5\.  Обучите модуль `SegformerForSemanticSegmentation` из пакета `transformers` для сегментации изображения. Во время обучения выводите на экран значения функции потерь и точности прогнозов на обучающем и валидационном множестве. Для оптимизации используйте значение функции потерь, которое возвращает вам модель.

Используя обученную модель, получите предсказания для нескольких изображений и отрисуйте их.
- [ ] Проверено на семинаре
import torch
from torch.utils.data import DataLoader, random_split
from transformers import SegformerForSemanticSegmentation, SegformerImageProcessor
import matplotlib.pyplot as plt
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_name = "nvidia/segformer-b0-finetuned-ade-512-512" 
model = SegformerForSemanticSegmentation.from_pretrained(
    model_name,
    num_labels=60,
    ignore_mismatched_sizes=True
).to(device)

processor = SegformerImageProcessor.from_pretrained(model_name)
from torch.utils.data import random_split, DataLoader
from torchvision.transforms import Compose, Resize, Normalize, ToTensor

image_transform = Compose([
    Resize((256, 256)),
    ToTensor(),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
mask_transform = Compose([
    Resize((256, 256))
])

dataset = ClothesSegmentationDataset(
    image_dir="C:\\Users\\Danya\\Downloads\\Clothes\\jpeg_images\\IMAGES",
    mask_dir="C:\\Users\\Danya\\Downloads\\Clothes\\jpeg_masks\\MASKS",
    image_transform=image_transform,
    mask_transform=mask_transform
)

train_dataset, val_dataset = random_split(dataset, [0.8, 0.2])

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
from torch.optim import AdamW

optimizer = AdamW(model.parameters(), lr=5e-5)

num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for pixel_values, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        pixel_values = pixel_values.to(device)
        labels = labels.to(device, dtype=torch.long).squeeze(1)  

        outputs = model(pixel_values=pixel_values, labels=labels)
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for pixel_values, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
            pixel_values = pixel_values.to(device)
            labels = labels.to(device, dtype=torch.long).squeeze(1) 
            outputs = model(pixel_values=pixel_values, labels=labels)
            val_loss += outputs.loss.item()

    val_loss /= len(val_loader)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}")
    print(f"Val Loss: {val_loss:.4f}")
def visualize_predictions(model, dataset, indices, processor):
    model.eval()
    for idx in indices:
        pixel_values, labels = dataset[idx]
        pixel_values = pixel_values.unsqueeze(0).to(device)

        with torch.no_grad():
            outputs = model(pixel_values=pixel_values)
            preds = outputs.logits.argmax(dim=1).squeeze(0).cpu().numpy()

        image = pixel_values.squeeze(0).permute(1, 2, 0).cpu().numpy()
        image = (image - image.min()) / (image.max() - image.min())  

        true_mask = labels.squeeze(0).cpu().numpy()

        fig, axes = plt.subplots(1, 3, figsize=(15, 5))

        axes[0].imshow(image)
        axes[0].set_title("Original Image")
        axes[0].axis("off")

        axes[1].imshow(true_mask, cmap="nipy_spectral", vmin=0, vmax=59)
        axes[1].set_title("True Mask")
        axes[1].axis("off")

        axes[2].imshow(preds, cmap="nipy_spectral", vmin=0, vmax=59)
        axes[2].set_title("Predicted Mask")
        axes[2].axis("off")

        plt.tight_layout()
        plt.show()

visualize_predictions(model, val_dataset, indices=[0, 10, -1], processor=processor)


# 07_1_autoencoders (1).ipynb
# Markdown:
#  Автоэнкодеры

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://www.eecs.qmul.ac.uk/~sgg/_ECS795P_/papers/WK07-8_PyTorch_Tutorial2.html
* https://www.youtube.com/watch?v=zp8clK9yCro
* https://medium.com/@rekalantar/variational-auto-encoder-vae-pytorch-tutorial-dce2d2fe0f5f
* https://towardsdatascience.com/conditional-variational-autoencoders-with-learnable-conditional-embeddings-e22ee5359a2a
* https://pytorch.org/vision/stable/auto_examples/others/plot_visualization_utils.html#sphx-glr-auto-examples-others-plot-visualization-utils-py
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Обсудите основные шаги в обучении автокодировщиков.
# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Загрузите набор данных MNIST из пакета `torchvision` (данный набор уже разбит на обучающее и тестовое множество).

Создайте и обучите модель автокодировщика, используя только полносвязные слои и функции активации.

Кодировщик - это функция вида
$z = f_\theta(x)$
,где $\theta$ - это параметры кодировщика.

Декодировщик - это функция вида
$\hat{x} = g_\phi(z)$
,где $\phi$ - это параметры декодировщика.

В нашем случае оба компонента представляют собой нейронные сети. Скрытое представление, полученное после части-кодировщика, должно иметь размерность 2. Последним слоем части-декодеровщика сделайте сигмоиду.

В качестве функции потерь используйте `MSELoss` между исходным и восстановленным изображением $MSE(x, \hat{x})$.

Обратите внимание, что во время обучения метки классов не используются.


- [ ] Проверено на семинаре
# Markdown:
from torchvision import datasets, transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torch as th
from tqdm import tqdm
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
transform = transforms.Compose([
    transforms.ToTensor(),
])

mnist_train = datasets.MNIST(root='./data/mnist', train=True, download=True, transform=transform)
mnist_test = datasets.MNIST(root='./data/mnist', train=False, download=True, transform=transform)
mnist_train, mnist_test
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 14*14),
            nn.ReLU(),
            nn.Linear(14*14, 7*7),
            nn.ReLU(),
            nn.Linear(7*7, 2)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(2, 7*7),
            nn.ReLU(),
            nn.Linear(7*7, 14*14),
            nn.ReLU(),
            nn.Linear(14*14, 28*28),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
model = Autoencoder().cuda()
optimizer = optimizer = optim.AdamW(model.parameters(), lr=2e-3)
criterion = nn.MSELoss()

train_loader = DataLoader(mnist_train, batch_size=8, shuffle=True)
test_loader = DataLoader(mnist_test, batch_size=8, shuffle=True)
for epoch in range(10):
    model.train()
    train_loss = 0
    for img, _ in tqdm(train_loader):
        img = img.flatten(start_dim=1).cuda()
    
        output = model(img)
        loss = criterion(output, img)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        
    model.eval()
    test_loss = 0.0
    with th.no_grad():
        for img, _ in tqdm(test_loader):
            img = img.flatten(start_dim=1).cuda()

            outputs = model(img)
            loss = criterion(outputs, img)
        
            test_loss += loss.item()
        
    train_loss /= len(train_loader)
    test_loss /= len(test_loader)
    
    print(f"Epoch [{epoch+1}/10], Train Loss: {train_loss:.4f} Test Loss: {test_loss:.4f}")
# Markdown:
<p class="task" id="2"></p>

2\. Получите один батч из тестового множества. Используя модель, обученную в предыдущем задании, получите скрытые представления для всех изображений из этого батча и визуализируйте на плоскости (они должны иметь размерность 2!). Раскрасьте точки в цвета, соответствующие меткам класса изображений (цифрам).

Возьмите одно изображение из тестового множества и пропустите его через обученный автокодировщик. Визуализируйте рядом (по горизонтали) два изображения: исходное и после восстановления автокодировщиком.


- [ ] Проверено на семинаре
data_iter = iter(test_loader)
images, labels = next(data_iter)

images = images.flatten(start_dim=1).cuda() 
labels = labels.cuda()

with th.no_grad(): 
    encoder = model.encoder 
    hidden_representations = encoder(images)
    
hidden_representations = hidden_representations.cpu().numpy()
hidden_representations
plt.scatter(hidden_representations[:, 0], hidden_representations[:, 1], c=labels.cpu(), cmap='tab10', s=25)
plt.colorbar() 
plt.show()
image = images[5]
with th.no_grad(): 
    reconstructed = model(image)
    
reconstructed  = reconstructed.cpu().numpy().reshape((28, 28))
image = image.cpu().numpy().reshape((28, 28))
reconstructed.shape
fig, axes = plt.subplots(1, 2, figsize=(10, 5))

axes[0].imshow(image, cmap='gray')  
axes[0].set_title("Original Image")
axes[0].axis('off')

axes[1].imshow(reconstructed, cmap='gray')  
axes[1].set_title("Reconstructed Image")
axes[1].axis('off')

plt.show()
# Markdown:
<p class="task" id="3"></p>

3\. Напишите функцию для генерации изображения на основе случайного шума. Функция должна генерировать случайный шум из стандартного нормального распределения и пропускать его через часть-декодировщик. Сгенерируйте несколько изображений и визуализируйте в виде сетки из картинок.

- [ ] Проверено на семинаре
def make_some_noise():
    tensor = th.randn(2).cuda()
    with th.no_grad():  
        decoder = model.decoder 
        generated = decoder(tensor)
    return generated
fig, axes = plt.subplots(3, 3, figsize=(10, 10))
axes = axes.flatten()

for i in range(9):
    image = make_some_noise()
    image = image.cpu().numpy().reshape((28, 28))
    axes[i].imshow(image, cmap='gray') 
    axes[i].axis('off') 

plt.tight_layout()  
plt.show()
# Markdown:
<p class="task" id="4"></p>

4\. Создайте и обучите модель условного автокодировщика, используя только полносвязные слои и функции активации.

Отличие от предыдущего варианта заключается в том, что теперь функции кодировщика и декодировщика принимают на вход также метку класса:
$$z = f_\theta(x, c)$$
$$\hat{x} = g_\phi(z, c)$$

Таким образом, теперь во теперь время обучения метки классов используются. Задействуйте их следующим образом: представьте метки классов в виде one-hot кодировки и объедините с пикселями изображения (для этого адаптируйте размерность слоев).

Скрытое представление, полученное после части-кодировщика, должно иметь размерность 2. Последним слоем части-декодеровщика сделайте сигмоиду. В качестве функции потерь используйте `MSELoss` между исходным и восстановленным изображением $MSE(x, \hat{x})$.


- [ ] Проверено на семинаре
# Markdown:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        
        self.encoder = nn.Sequential(
            nn.Linear(28*28 + 10, 14*14),
            nn.ReLU(),
            nn.Linear(14*14, 7*7),
            nn.ReLU(),
            nn.Linear(7*7, 2)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(2 + 10, 7*7),
            nn.ReLU(),
            nn.Linear(7*7, 14*14),
            nn.ReLU(),
            nn.Linear(14*14, 28*28),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
model = Autoencoder().cuda()
optimizer = optimizer = optim.AdamW(model.parameters(), lr=2e-3)
criterion = nn.MSELoss()

train_loader = DataLoader(mnist_train, batch_size=8, shuffle=True)
test_loader = DataLoader(mnist_test, batch_size=8, shuffle=True)
for epoch in range(10):
    model.train()
    train_loss = 0
    for img, labels in tqdm(train_loader):
        img = img.flatten(start_dim=1).cuda()
        labels = labels.cuda()
        one_hot_labels = F.one_hot(labels, num_classes=10).float()
        x = th.cat((img.reshape((8, 28*28)), one_hot_labels), dim=1)
        
        x = model.encoder(x)
        x = th.cat((x, one_hot_labels), dim=1)
        x = model.decoder(x)
        loss = criterion(x, img)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        
    model.eval()
    test_loss = 0.0
    with th.no_grad():
        for img, labels in tqdm(test_loader):
            img = img.flatten(start_dim=1).cuda()
            labels = labels.cuda()
            one_hot_labels = F.one_hot(labels, num_classes=10).float()
            x = th.cat((img.reshape((8, 28*28)), one_hot_labels), dim=1)
            
            x = model.encoder(x)
            x = th.cat((x, one_hot_labels), dim=1)
            x = model.decoder(x)
            loss = criterion(x, img)
        
            test_loss += loss.item()
        
    train_loss /= len(train_loader)
    test_loss /= len(test_loader)
    
    print(f"Epoch [{epoch+1}/10], Train Loss: {train_loss:.4f} Test Loss: {test_loss:.4f}")
# Markdown:
<p class="task" id="5"></p>

5\. Напишите функцию для генерации изображения на основе случайного шума. Функция должна генерировать случайный шум из стандартного нормального распределения и one-hot представление цифры. Далее объединенный вектор пропускается его через часть-декодировщик. Сгенерируйте несколько изображений и визуализируйте в виде сетки из картинок.

- [ ] Проверено на семинаре
labels = th.zeros(10).cuda()
labels[5] = 1
labels
def make_some_noise():
    tensor = th.randn(2).cuda()
    tensor = th.cat((tensor, labels), dim=0)
    
    with th.no_grad():  
        decoder = model.decoder 
        generated = decoder(tensor)
    return generated
fig, axes = plt.subplots(3, 3, figsize=(10, 10))
axes = axes.flatten()

for i in range(9):
    image = make_some_noise()
    image = image.cpu().numpy().reshape((28, 28))
    axes[i].imshow(image, cmap='gray') 
    axes[i].axis('off') 

plt.tight_layout()  
plt.show()
# Markdown:
<p class="task" id="6"></p>

6\. Создайте и обучите модель вариационного автокодировщика, используя только полносвязные слои и функции активации.

Кодировщик - это функция следующего вида:
$$q_\phi(z|x) = \mathcal{N}(\mu_\phi(x), \sigma_\phi^2(x))$$

Здесь $\phi$ - параметры кодировщика, а $\mu_\phi(x)$ и $\sigma_\phi^2(x)$ - это обучаемые функции (в нашем случае - полносвязные слои).

Чтобы иметь возможность обучить такую модель, используется т.н. reparametrization trick: на основе функций $\mu$ и $ \sigma$ считаем значение:

$$z = \mu_\phi(x) + \sigma_\phi(x) \odot \epsilon, \quad \epsilon \sim \mathcal{N}(0, I)$$

Декодировщик пытается восстановить исходное изображение из полученного вектора:

$$p_\theta(x|z) = f(z; \theta)$$

В качестве функции потерь обычно используется следующая:
$$\mathcal{L}_{total} = \mathcal{L}_{recon} + D_{KL}$$
$$\mathcal{L}_{recon} = -\sum_{i=1}^D [x_i \log \hat{x}_i + (1 - x_i) \log (1 - \hat{x}_i)]$$
$$D_{KL} = -\frac{1}{2} \sum_{j=1}^J (1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2)$$


- [ ] Проверено на семинаре
# Markdown:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()
        
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 22*22),
            nn.ReLU(),
            nn.Linear(22*22, 16*16),
            nn.ReLU(),
            nn.Linear(16*16, 200)
        )

        self.mu = nn.Linear(200, 20)
        self.logvar = nn.Linear(200, 20)
        
        self.decoder = nn.Sequential(
            nn.Linear(20, 7*7),
            nn.ReLU(),
            nn.Linear(7*7, 14*14),
            nn.ReLU(),
            nn.Linear(14*14, 28*28),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.encoder(x)
        mu = self.mu(x)
        logvar = self.logvar(x)
        
        std = th.exp(0.5 * logvar)
        eps = th.randn_like(std)
        z = mu + eps * std

        recon = self.decoder(z)

        return recon, mu, logvar
def loss_function(recon, x, mu, logvar):
    recon_loss = th.nn.functional.binary_cross_entropy(recon, x, reduction='mean')
    
    kl_loss = -0.5 * th.sum(1 + logvar - mu.pow(2) - logvar.exp())
    
    total_loss = recon_loss + kl_loss
    return total_loss
model = VAE().cuda()
optimizer = optim.AdamW(model.parameters(), lr=2e-4)

train_loader = DataLoader(mnist_train, batch_size=8, shuffle=True)
test_loader = DataLoader(mnist_test, batch_size=8, shuffle=True)
for epoch in range(10):
    model.train()
    train_loss = 0
    for img, _ in tqdm(train_loader):
        img = img.flatten(start_dim=1).cuda()
    
        recon, mu, sigma = model(img)
        loss = loss_function(recon, img, mu, sigma)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        
    model.eval()
    test_loss = 0.0
    with th.no_grad():
        for img, _ in tqdm(test_loader):
            img = img.flatten(start_dim=1).cuda()

            recon, mu, sigma = model(img)
            loss = loss_function(recon, img, mu, sigma)
        
            test_loss += loss.item()
        
    train_loss /= len(train_loader)
    test_loss /= len(test_loader)
    
    print(f"Epoch [{epoch+1}/10], Train Loss: {train_loss:.4f} Test Loss: {test_loss:.4f}")
# Markdown:
<p class="task" id="7"></p>

7\. Напишите функцию для генерации изображения на основе случайного шума. Функция должна генерировать случайный шум из стандартного нормального распределения. Далее вектор пропускается его через часть-декодировщик. Сгенерируйте несколько изображений и визуализируйте в виде сетки из картинок.

- [ ] Проверено на семинаре
def make_some_noise():
    tensor= th.randn(20).cuda()
    with th.no_grad():  
        generated = model.decoder(tensor)
    return generated
fig, axes = plt.subplots(3, 3, figsize=(10, 10))
axes = axes.flatten()

for i in range(9):
    image = make_some_noise()
    image = image.cpu().numpy().reshape((28, 28))
    axes[i].imshow(image, cmap='gray') 
    axes[i].axis('off') 

plt.tight_layout()  
plt.show()


# 08_1_gymnasium_env (2).ipynb
# Markdown:
# Введение в RL и пакет Gymnasium

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* https://gymnasium.farama.org/
* https://pypi.org/project/ufal.pybox2d/
* https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/
* https://gymnasium.farama.org/api/spaces/fundamental/
* https://gymnasium.farama.org/environments/toy_text/blackjack/
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Рассмотрите пример создания окружения `gymnasium` и основные этапы взаимодействия с этим окружением.


# !pip install gymnasium
# !pip install swig
# !pip install "gymnasium[box2d]"
import gymnasium as gym
env = gym.make("LunarLander-v3", render_mode="human")

def make_action(obs, env):
  # тут вы должны исопльзовать состояние
  return env.action_space.sample()

# ---
obs, _ = env.reset()
is_done = False
rewards = []
while not is_done:
  action = make_action(obs, env)
  obs, reward, terminated, truncated, info = env.step(action)

  is_done = terminated or truncated
  rewards.append(reward)
# --

env.close()
sum(rewards)
# Markdown:
## Задачи для самостоятельного решения
import gymnasium as gym
from gymnasium import spaces
from gymnasium.wrappers import RecordVideo
import os
import numpy as np
import random
# Markdown:
<p class="task" id="1"></p>

1\. Создайте окружение `Blackjack-v1`. Сыграйте `N=10000` игр, выбирая действие случайным образом. Посчитайте и выведите на экран долю выигранных игр.

- [ ] Проверено на семинаре
env = gym.make("Blackjack-v1")

def make_action(obs, env):
    return env.action_space.sample()

N = 10000
wins = 0

def is_win(reward):
    return reward > 0

for _ in range(N):
    obs = env.reset()
    is_done = False
    while not is_done:
        action = make_action(obs, env)
        obs, reward, terminated, truncated, info = env.step(action)

        is_done = terminated or truncated

    if is_win(reward):
        wins += 1

env.close()

win_ratio = wins / N
win_ratio
# Markdown:
<p class="task" id="2"></p>

2\. Создайте окружение `Blackjack-v1`. Предложите стратегию, которая позволит, в среднем, выигрывать чаще, чем случайный выбор действия. Реализуйте эту стратегию и сыграйте `N=10000` игр, выбирая действие согласно этой стратегии. Посчитайте и выведите на экран долю выигранных игр.

- [ ] Проверено на семинаре
env = gym.make("Blackjack-v1")

def make_action(obs, env):
    return 0 if obs[0] >= 12 else 1

N = 10000
wins = 0

def is_win(reward):
    return reward > 0

for _ in range(N):
    obs = env.reset()[0]
    is_done = False
    while not is_done:
        action = make_action(obs, env)
        obs, reward, terminated, truncated, info = env.step(action)

        is_done = terminated or truncated

    if is_win(reward):
        wins += 1

env.close()

win_ratio = wins / N
win_ratio
# Markdown:
<p class="task" id="3"></p>

3\. Создайте окружение для игры в крестики-нолики, реализовав интерфейс `gym.Env`. Решение должно удовлетворять следующим условиям:
* для создания пространства состояний используется `spaces.Box`;
* для создания пространства действий используется `spaces.MultiDiscrete`;
* игра прекращается, если:
    - нет возможности сделать ход;
    - игрок пытается отметить уже выбранную ячейку.
* после каждого хода игрок получает награду:
    - 0, если игра не закончена;
    - 1, если игрок выиграл;
    - -1, если игрок проиграл.
* стратегию выбора действия для второго игрока (машины) определите самостоятельно.

Стратегия поведения машины является частью окружения и должна быть реализована внутри него. Сделайте все соответствующие переменные и методы приватными (названия всех переменных начинаются с `__`), подчеркнув, что у пользователя не должно быть к ним доступа извне.

Сыграйте одну игру, выбирая действия случайным образом. Выведите на экран состояние окружения после каждого хода и итоговую награду пользователя за сессию.

- [ ] Проверено на семинаре
class TicTacToeEnv(gym.Env):
    def __init__(self):
        super(TicTacToeEnv, self).__init__()
        self.__state_shape = (3, 3)
        self.observation_space = spaces.Box(low=-1, high=1, shape=self.__state_shape, dtype=np.int8)
        
        self.action_space = spaces.MultiDiscrete([3, 3])

        self.__state = None
        self.__done = False
        self.__current_player = 1  

    def reset(self):
        self.__state = np.zeros(self.__state_shape, dtype=np.int8)
        self.__done = False
        self.__current_player = 1 
        return self.__state

    def step(self, action):
        if self.__done:
            raise ValueError("Игра уже завершена.")

        row, col = action
        if self.__state[row, col] != 0:  
            self.__done = True
            return self.__state, -1, self.__done, {}

        self.__state[row, col] = self.__current_player

        if self.__check_winner(self.__current_player):
            self.__done = True
            return self.__state, 1 if self.__current_player == 1 else -1, self.__done, {}

        if not self.__is_move_possible():
            self.__done = True
            return self.__state, 0, self.__done, {}

        self.__current_player *= -1

        if self.__current_player == -1:
            self.__machine_move()

            if self.__check_winner(self.__current_player):
                self.__done = True
                return self.__state, -1, self.__done, {}

            if not self.__is_move_possible():
                self.__done = True
                return self.__state, 0, self.__done, {}

            self.__current_player *= -1

        return self.__state, 0, self.__done, {}

    def render(self, mode='human'):
        symbols = {0: ".", 1: "X", -1: "O"}
        for row in self.__state:
            print(" ".join(symbols[cell] for cell in row))
        print()

    def __check_winner(self, player):
        return (
            any(np.all(self.__state[row, :] == player) for row in range(3)) or
            any(np.all(self.__state[:, col] == player) for col in range(3)) or
            np.all(np.diag(self.__state) == player) or
            np.all(np.diag(np.fliplr(self.__state)) == player)
        )

    def __is_move_possible(self):
        return np.any(self.__state == 0)

    def __machine_move(self):
        empty_cells = np.argwhere(self.__state == 0)
        if empty_cells.size > 0:
            row, col = random.choice(empty_cells)
            self.__state[row, col] = self.__current_player

env = TicTacToeEnv()
obs = env.reset()

done = False
while not done:
    env.render()
    action = env.action_space.sample() 
    obs, reward, done, _ = env.step(action)

env.render()
print(f"Итоговая награда: {reward}")
N = 10000
wins = 0

env = TicTacToeEnv()
for _ in range(N):
    obs = env.reset()

    done = False
    while not done:
        action = env.action_space.sample() 
        obs, reward, done, _ = env.step(action)
    if reward > 0:
        wins += 1

win_ratio = wins / N
win_ratio
# Markdown:
<p class="task" id="4"></p>

4\. Предложите стратегию (в виде алгоритма без использования методов машинного обучения), которая позволит, в среднем, выигрывать в крестики-нолики чаще, чем случайный выбор действия. Реализуйте эту стратегию и сыграйте игру, выбирая действия согласно этой стратегии. Выведите на экран состояние окружения после каждого хода и итоговую награду пользователя за сессию.

- [ ] Проверено на семинаре
def get_available_actions(state):
    return list(zip(*np.where(state == 0)))

def check_winner(state, player):
    for i in range(3):
        if np.all(state[i, :] == player):
            return True
        if np.all(state[:, i] == player):
            return True
    if np.all(np.diag(state) == player):
        return True
    if np.all(np.diag(np.fliplr(state)) == player):
        return True
    return False

def choose_action(state, player=1):
    for action in get_available_actions(state):
        row, col = action
        temp_state = state.copy()
        temp_state[row, col] = player
        if check_winner(temp_state, player):
            return action

    opponent = -player
    for action in get_available_actions(state):
        row, col = action
        temp_state = state.copy()
        temp_state[row, col] = opponent
        if check_winner(temp_state, opponent):
            return action

    if state[1, 1] == 0:
        return (1, 1)

    corners = [(0,0), (0,2), (2,0), (2,2)]
    available_corners = [corner for corner in corners if state[corner] == 0]
    if available_corners:
        return random.choice(available_corners)

    sides = [(0,1), (1,0), (1,2), (2,1)]
    available_sides = [side for side in sides if state[side] == 0]
    if available_sides:
        return random.choice(available_sides)

    return None
N = 10000
wins = 0
env = TicTacToeEnv()

for _ in range(N):
    obs = env.reset()
    done = False
    while not done:
        action = choose_action(obs, player=1)
        if action is None:
            _, reward, done, _ = env.step((-1, -1)) 
            break

        obs, reward, done, _ = env.step(action)

        if done:
            if reward > 0:
                wins += 1
            break

win_ratio = wins / N
win_ratio
# Markdown:
<p class="task" id="5"></p>

5\. Создайте окружение `MountainCar-v0`. Проиграйте 10 эпизодов и сохраните на диск файл с записью каждого пятого эпизода. Для записи видео воспользуйтесь обёрткой `RecordVideo`. Вставьте скриншот, на котором видно, что файлы были созданы.

- [ ] Проверено на семинаре
env = gym.make('MountainCar-v0', render_mode='rgb_array')
env = RecordVideo(env, video_folder='mountaincar_videos', episode_trigger=lambda x: x % 5 == 0)
for episode in range(1, 11):
    obs, _ = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        obs, reward, done, truncated, info = env.step(action)
        done = done or truncated
env.close()
# Markdown:
![image.png](attachment:image.png)


# 08_2_q_learning (1).ipynb
# Markdown:


__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Саттон Р.	С.,	Барто Э. Дж. Обучение с подкреплением: Введение. 2-е изд.
* https://gymnasium.farama.org/tutorials/training_agents/blackjack_tutorial/
* https://en.wikipedia.org/wiki/Q-learning
* https://www.baeldung.com/cs/epsilon-greedy-q-learning
* https://pythonprogramming.net/q-learning-reinforcement-learning-python-tutorial/
* https://www.datacamp.com/tutorial/introduction-q-learning-beginner-tutorial
* https://rubikscode.net/2021/07/20/introduction-to-double-q-learning/
* https://gymnasium.farama.org/api/wrappers/misc_wrappers/
# Markdown:

# Markdown:
1\. Рассмотрите понятие Q-функции, ее применение для формирования политики агента и способов ее создания.
import numpy as np

states = [0, 1, 2]
actions = [0, 1]

q_table = np.random.uniform(0, 100, size=(len(states), len(actions)))
current_state = 0
q_table[current_state]
q_table[current_state].argmax()
states = np.array([
    [0.5, 0.7],
    [1.2, 0.3],
    [-5.2, 0.1],
    [2.0, -3.0],
])

actions = [0, 1]
np.digitize(states[:, 0], bins=[-6, 0, 2])
np.digitize(states, bins=[-6, 0, 4])
import numpy as np

actions = [0, 1]

q_table = np.random.uniform(0, 100, size=(3, 3, len(actions)))
state = [1, 1]
q_table[1, 1]
# Markdown:

# Markdown:
<p class="task" id="1"></p>

1\. Обучите агента для игры в блэкджек (окружение `Blackjack-v1`), используя алгоритм Q-learning. Для создания таблицы Q-функции выясните размеры пространства состояния игры и количество возможных действий игрока и выведите эти значения на экран. Во время обучения несколько раз вычислите статистику за `print_every` последних эпизодов: количество выигранных и проигранных сессий. После завершения обучения визуализируйте полученные данные. Изучите, как выглядит Q-функция (в каких состояниях игрок будет брать карту, в каких - нет). Cыграйте `N=10000` игр, применяя стратегию, выведенную из обученной Q-функции, посчитайте и выведите на экран долю выигранных игр.

Cтратегия для выбора действия:
$$a_{t+1}(s_t) = argmax_aQ(s_t, a)$$

Правило обновления Q-функции:

![q-learning](https://wikimedia.org/api/rest_v1/media/math/render/svg/d247db9eaad4bd343e7882ec546bf3847ebd36d8)

- [x] Проверено на семинаре
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

env = gym.make('Blackjack-v1')

env.observation_space[0].n, env.observation_space[1].n, env.observation_space[2].n, env.action_space
from dataclasses import dataclass

@dataclass
class Config:
    discount: float = 0.95
    lr: float = 0.005
    n_episodes: float = 100_000
    print_every: int = 5000
class Agent:
    def __init__(self, env: gym.Env, config: Config) -> None:
        self.env = env
        self.cfg = config
        self._create_q_table()

    def _create_q_table(self):
        self.q_table = np.zeros((32, 11, 2, 2))

    def get_action(self, state: np.ndarray | tuple) -> int:
        player_sum, dealer_card, usable_ace = state
        q_values = self.q_table[player_sum, dealer_card, usable_ace]
        max_actions = np.flatnonzero(q_values == q_values.max())
        action = np.random.choice(max_actions)
        return action

    def update_q_table(
        self,
        state: np.ndarray | tuple,
        new_state: np.ndarray | tuple,
        reward: float, action: int,
        done: bool
    ) -> None:
        player_sum, dealer_card, usable_ace = state
        current_q_value = self.q_table[player_sum, dealer_card, usable_ace, action]     
        if done:
            future_q_value = 0
        else:
            new_player_sum, new_dealer_card, new_usable_ace = new_state
            future_q_value = np.max(self.q_table[new_player_sum, new_dealer_card, new_usable_ace])
        self.q_table[player_sum, dealer_card, usable_ace, action] = (
            (1 - self.cfg.lr) * current_q_value
            + self.cfg.lr * (reward + self.cfg.discount * future_q_value)
        )

    def run_episode(self) -> float:
        done = False
        state, info = self.env.reset()
        while not done:
            action = self.get_action(state)
            new_state, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated
            self.update_q_table(state, new_state, reward, action, done)
            state = new_state

            if done:
                return reward


def train(self):
    ep_rewards = []  
    stats = []  

    wins = 0
    losses = 0
    draws = 0

    for ep in tqdm(range(self.cfg.n_episodes)):
        
        reward = self.run_episode()
        ep_rewards.append(reward)

        if reward > 0:
            wins += 1
        elif reward < 0:
            losses += 1
        else:
            draws += 1
        
        if (ep + 1) % self.cfg.print_every == 0:
            stats.append({
                "episode": ep + 1,
                "wins": wins,
                "losses": losses,
                "draws": draws,
                "win_rate": wins / self.cfg.print_every
            })
            print(stats[-1])
            wins = 0
            losses = 0
            draws = 0

    return ep_rewards, stats
env = gym.make('Blackjack-v1')

config = Config(
    discount=0.95,
    lr=0.005,
    n_episodes=10_000,
    print_every=1000
)

agent = Agent(env, config)

ep_rewards, stats = train(agent)
win_rates = [stat.get("win_rate", 0) for stat in stats]
losses_rates = [stat.get("losses", 0) / agent.cfg.print_every for stat in stats]
losses_rates = [stat.get("losses", 0) / agent.cfg.print_every for stat in stats]
draws_rates = [stat.get("draws", 0) / agent.cfg.print_every for stat in stats]

episodes = [stat.get("episode", 0) for stat in stats]

plt.ylim(0, 1)
plt.xlabel("Игры")
plt.ylabel("Процент")
plt.plot(episodes, win_rates, label="Win Rate")
plt.plot(episodes, losses_rates, label="Loss Rate")
plt.plot(episodes, draws_rates, label="Draw Rate")
plt.legend()
plt.show()
# Markdown:
<p class="task" id="2"></p>

2\. Повторите решение предыдущей задачи, используя алгоритм $\epsilon$-greedy Q-learning. Исследуйте, как гиперпараметры и способ инициализации значений Q-функции влияют на результат.

Cтратегия для выбора действия:
1. Сгенерировать число $p$ из $U(0, 1)$;
2. Если $p < \epsilon$, то выбрать действие случайным образом;
3. В противном случае $a_{t+1}(s_t) = argmax_aQ(s_t, a)$.

Правило обновления Q-функции:
![q-learning](https://wikimedia.org/api/rest_v1/media/math/render/svg/d247db9eaad4bd343e7882ec546bf3847ebd36d8)

- [x] Проверено на семинаре
from dataclasses import dataclass

@dataclass
class Config:
    discount: float = 0.95
    lr: float = 0.005
    n_episodes: float = 100_000
    epsilon: float = 1.0
    final_epsilon: float = 0.3
    print_every: int = 5000
class Agent:
    def __init__(self, env: gym.Env, config: Config, how) -> None:
        self.env = env
        self.cfg = config
        self._create_q_table(how)

    def _create_q_table(self, how):
        if how == 'random':
            self.q_table = np.random.uniform(low=0, high=1, size=(32, 11, 2, 2))
        else:
            self.q_table = np.zeros((32, 11, 2, 2))
        

    def get_action(self, state: np.ndarray | tuple) -> int:
        p = np.random.rand()
        if p < self.cfg.epsilon:
            return self.env.action_space.sample()
        player_sum, dealer_card, usable_ace = state
        q_values = self.q_table[player_sum, dealer_card, usable_ace]
        return np.argmax(q_values)
    
    def update_q_table(
        self,
        state: np.ndarray | tuple,
        new_state: np.ndarray | tuple,
        reward: float, action: int,
        done: bool
    ) -> None:
        player_sum, dealer_card, usable_ace = state
        current_q_value = self.q_table[player_sum, dealer_card, usable_ace, action]     
        if done:
            future_q_value = 0
        else:
            new_player_sum, new_dealer_card, new_usable_ace = new_state
            future_q_value = np.max(self.q_table[new_player_sum, new_dealer_card, new_usable_ace])
        self.q_table[player_sum, dealer_card, usable_ace, action] = (
            (1 - self.cfg.lr) * current_q_value
            + self.cfg.lr * (reward + self.cfg.discount * future_q_value)
        )

    def run_episode(self) -> float:
        done = False
        state, info = self.env.reset()
        while not done:
            action = self.get_action(state)
            new_state, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated
            self.update_q_table(state, new_state, reward, action, done)
            state = new_state

            if done:
                return reward


def train(self):
    ep_rewards = []  
    stats = []  

    wins = 0
    losses = 0
    draws = 0

    self.cfg.epsilon_decay = self.cfg.epsilon / self.cfg.n_episodes
    for ep in tqdm(range(self.cfg.n_episodes)):
        reward = self.run_episode()
        ep_rewards.append(reward)

        if reward > 0:
            wins += 1
        elif reward < 0:
            losses += 1
        else:
            draws += 1
        
        self.cfg.epsilon -= self.cfg.epsilon_decay

        if (ep + 1) % self.cfg.print_every == 0:
            stats.append({
                "episode": ep + 1,
                "wins": wins,
                "losses": losses,
                "draws": draws,
                "win_rate": wins / self.cfg.print_every
            })
            print(stats[-1])
            wins = 0
            losses = 0
            draws = 0

    return ep_rewards, stats
env = gym.make('Blackjack-v1')

config = Config(
    discount = 0.95,
    lr = 0.005,
    n_episodes = 10_000,
    epsilon = 1.0,
    final_epsilon = 0.3,
    print_every = 1000
)

agent = Agent(env, config, how='zeros')

ep_rewards, stats = train(agent)
env = gym.make('Blackjack-v1')

config = Config(
    discount = 0.95,
    lr = 0.005,
    n_episodes = 10_000,
    epsilon = 1.0,
    final_epsilon = 0.3,
    print_every = 1000
)

agent = Agent(env, config, how='random')


ep_rewards, stats = train(agent)
env = gym.make('Blackjack-v1')

config = Config(
    discount = 0.95,
    lr = 0.05,
    n_episodes = 10_000,
    epsilon = 0.3,
    final_epsilon = 0.05,
    print_every = 1000
)

agent = Agent(env, config, how='zeros')


ep_rewards, stats = train(agent)

win_rates = [stat.get("win_rate", 0) for stat in stats]
losses_rates = [stat.get("losses", 0) / agent.cfg.print_every for stat in stats]
losses_rates = [stat.get("losses", 0) / agent.cfg.print_every for stat in stats]
draws_rates = [stat.get("draws", 0) / agent.cfg.print_every for stat in stats]

episodes = [stat.get("episode", 0) for stat in stats]

plt.ylim(0, 1)
plt.xlabel("Игры")
plt.ylabel("Процент")
plt.plot(episodes, win_rates, label="Win Rate")
plt.plot(episodes, losses_rates, label="Loss Rate")
plt.plot(episodes, draws_rates, label="Draw Rate")
plt.legend()
plt.show()


# 08_3_dqn (1).ipynb
# Markdown:
#  DQN

__Автор задач: Блохин Н.В. (NVBlokhin@fa.ru)__

Материалы:
* Саттон Р.	С.,	Барто Э. Дж. Обучение с подкреплением: Введение. 2-е изд.
* https://en.wikipedia.org/wiki/Q-learning
* https://pythonprogramming.net/q-learning-reinforcement-learning-python-tutorial/
* https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
* https://github.com/pylSER/Deep-Reinforcement-learning-Mountain-Car/tree/master
* https://valohai.com/blog/reinforcement-learning-tutorial-basic-deep-q-learning/
# Markdown:
## Задачи для совместного разбора
# Markdown:
1\. Обсудите основные отличия DQN от классических вариантов Q-learning.






# Markdown:
## Задачи для самостоятельного решения
# Markdown:
<p class="task" id="1"></p>

1\. Допишите класс `ReplayMemory` для хранения переходов между состояниями.

- [ ] Проверено на семинаре
import torch as th
import torch.optim as optim
import torch.nn as nn
import matplotlib.pyplot as plt
from dataclasses import dataclass
import gymnasium as gym
from gymnasium.wrappers import TransformObservation, RecordVideo
from ipywidgets import Video
from collections import namedtuple, deque
import random
import shutil
import os

device = th.device('cpu')
device
Transition = namedtuple(
    'Transition',
    ('state', 'action', 'next_state', 'reward', 'done')
)

class ReplayMemory(object):
    def __init__(self, capacity):
        """capacity - максимальный размер хранилища"""
        self.capacity = capacity
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Сохраняет переход. При нехватке места в хранилище самые старые записи удаляются."""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        """Возвращает batch_size случайно выбранных переходов"""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
# Markdown:
<p class="task" id="2"></p>

2\. Допишите класс `DQN` для моделирования Q-функции.

- [ ] Проверено на семинаре
class DQN(nn.Module):
    """Нейронная сеть для моделирования Q-функции."""
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        
        self.model = nn.Sequential(
            nn.Linear(n_observations, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions) 
        )

    def forward(self, x):
        """Для каждого состояния должны получать n_actions чисел."""
        return self.model(x)
# Markdown:
<p class="task" id="3"></p>

3\. Допишите классы `PolicyConfig` для настроек политики агента и `Policy` для реализации политики.

- [ ] Проверено на семинаре
@dataclass
class PolicyConfig:
    """Содержит настройки для Policy: размерность пространства наблюдений, кол-во действий,
    устройство, на котором будет располагаться модели; ε и т.д."""
    n_observations: int
    n_actions: int
    device: th.device
    epsilon_start: float = 1.0
    epsilon_end: float = 0.05
    epsilon_decay: int = 5000  
class Policy:
    def __init__(self, policy_cfg: PolicyConfig):
        self.policy_network = DQN(policy_cfg.n_observations, policy_cfg.n_actions).to(policy_cfg.device)
        self.target_network = DQN(policy_cfg.n_observations, policy_cfg.n_actions).to(policy_cfg.device)
        self.sync_models()

        self.policy_cfg = policy_cfg
        self.epsilon = policy_cfg.epsilon_start
        self.epsilon_start = policy_cfg.epsilon_start
        self.epsilon_end = policy_cfg.epsilon_end
        self.epsilon_decay = policy_cfg.epsilon_decay
        self.n_actions = policy_cfg.n_actions
        self.device = policy_cfg.device
        self.steps_done = 0
        self.best_max_x = float('-inf')

    def sync_models(self):
        self.target_network.load_state_dict(self.policy_network.state_dict())

    def get_best_action(self, state: th.Tensor) -> int:
        sample = random.random()
        self.steps_done += 1

        self.epsilon = max(
            self.epsilon_end,
            self.epsilon_start - (self.epsilon_start - self.epsilon_end) * self.steps_done / self.epsilon_decay
        )
        if sample < self.epsilon:
            return random.randrange(self.n_actions)
        else:
            with th.no_grad():
                return self.policy_network(state).argmax(dim=1).item()
        
    def save(self, path_policy="results/policy_network.pth", path_target="results/target_network.pth"):
        th.save(self.policy_network.state_dict(), path_policy)
        th.save(self.target_network.state_dict(), path_target)

    def load(self, path_policy="results/policy_network.pth", path_target="results/target_network.pth"):
        self.policy_network.load_state_dict(th.load(path_policy))
        self.target_network.load_state_dict(th.load(path_target))
# Markdown:
<p class="task" id="4"></p>

4\. Напишите функцию `plot_metrics`, которая будет использоваться для визуализации процесса обучения: суммарной награды за каждый эпизод и максимальное значение x-координаты машины за эпизод. Для реализации можете воспользоваться `wandb` или любым другим удобным инструментом.

- [ ] Проверено на семинаре
def plot_metrics(episode_rewards, episode_max_x, window=100):
    def moving_average(data, window):
        return [sum(data[i:i+window]) / window for i in range(len(data) - window + 1)]

    smoothed_rewards = moving_average(episode_rewards, window)
    smoothed_max_x = moving_average(episode_max_x, window)

    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(range(len(episode_rewards)), episode_rewards, label="Суммарная награда", alpha=0.5)
    plt.plot(range(window - 1, len(smoothed_rewards) + window - 1), smoothed_rewards, label="Скользящее среднее", color='orange')
    plt.xlabel("Эпизоды")
    plt.ylabel("Суммарная награда")
    plt.title("Награда за эпизод")
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(range(len(episode_max_x)), episode_max_x, label="Макс. x-координата", alpha=0.5)
    plt.plot(range(window - 1, len(smoothed_max_x) + window - 1), smoothed_max_x, label="Скользящее среднее", color='orange')
    plt.xlabel("Эпизоды")
    plt.ylabel("Макс. x-координата")
    plt.title("Максимальная x-координата за эпизод")
    plt.legend()

    plt.tight_layout()
    plt.show()
# Markdown:
<p class="task" id="5"></p>

5\. Допишите классы `TrainConfig` для настроек обучения и `Trainer` для реализации процесса обучения.

- [ ] Проверено на семинаре
@dataclass
class TrainConfig:
    """Содержит настройки для процесса обучения: к-т дисконтирования, скорость обучения,
    количество эпизодов для обучения, размер батча и т.д."""
    gamma: float = 0.99
    learning_rate: float = 1e-3
    num_episodes: int = 5000
    batch_size: int = 64
    target_sync: int = 100  
    memory_capacity: int = 10000
    max_steps_per_episode: int = 200
class Trainer:
    def __init__(self, env: gym.Env, train_config: TrainConfig, policy: Policy):
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(policy.policy_network.parameters(), lr=train_config.learning_rate)

        self.memory = ReplayMemory(train_config.memory_capacity)
        self.env = env
        self.train_config = train_config
        self.policy = policy
        self.episode_rewards = []
        self.max_x_positions = []
        self.best_max_x = float('-inf')


    def train(self):
        metrics = {
            "episode_rewards": [],
            "episode_max_x": []
        }

        for episode in range(1, self.train_config.num_episodes + 1):
            state, _ = self.env.reset()
            state = th.tensor(state, dtype=th.float32, device=self.policy.device).unsqueeze(0)

            episode_reward, max_x = self.run_episode(state)

            metrics["episode_rewards"].append(episode_reward)
            metrics["episode_max_x"].append(max_x)

            if max_x > self.best_max_x:
                self.best_max_x = max_x
                self.policy.save()

            if episode % 100 == 0:
                print(f"Episode {episode}: Reward = {episode_reward}, Max X = {max_x}")

            if episode % self.train_config.target_sync == 0:
                self.policy.sync_models()

        plot_metrics(metrics["episode_rewards"], metrics["episode_max_x"])

    def run_episode(self, start_state: th.Tensor):
        state = start_state
        episode_reward = 0
        max_x = -float('inf')

        for _ in range(self.train_config.max_steps_per_episode):
            action = self.policy.get_best_action(state)
            next_state, reward, done, truncated, _ = self.env.step(action)
            
            reward += max(0, next_state[0] - state[0][0].item()) * 10
            if next_state[0] >= 0.5:
                reward += 100.0

            next_state_tensor = th.tensor(next_state, dtype=th.float32, device=self.policy.device).unsqueeze(0)
            position = next_state_tensor[0, 0].item()
            max_x = max(max_x, position)

            done_flag = done or truncated

            self.memory.push(state, action, next_state_tensor, reward, done_flag)

            state = next_state_tensor
            episode_reward += reward

            self.generate_batch_and_fit()

            if done_flag:
                break

        return episode_reward, max_x    
    
    def generate_batch_and_fit(self):
        if len(self.memory) < self.train_config.batch_size:
            return

        transitions = self.memory.sample(self.train_config.batch_size)
        batch = Transition(*zip(*transitions))

        state_batch = th.cat(batch.state).to(self.policy.device)
        action_batch = th.tensor(batch.action, dtype=th.long, device=self.policy.device).unsqueeze(1)
        reward_batch = th.tensor(batch.reward, dtype=th.float32, device=self.policy.device).unsqueeze(1)
        next_state_batch = th.cat(batch.next_state).to(self.policy.device)
        done_batch = th.tensor(batch.done, dtype=th.float32, device=self.policy.device).unsqueeze(1)

        current_q_values = self.policy.policy_network(state_batch).gather(1, action_batch)

        with th.no_grad():
            next_q_values = self.policy.target_network(next_state_batch).max(1)[0].unsqueeze(1)
        expected_q_values = reward_batch + (self.train_config.gamma * next_q_values * (1 - done_batch))

        self.fit_policy_network(current_q_values, expected_q_values)

    def fit_policy_network(self, X, y):
        self.optimizer.zero_grad()
        loss = self.criterion(X, y)
        loss.backward()
        self.optimizer.step()
# Markdown:
<p class="task" id="6"></p>

6\. Настройте модель для управления машиной в окружении `MountainCar-v0`. Для преобразования векторов состояний в тензоры используйте обертку `TransformObservation`. Выведите на экран график с информацией о процессе обучения. При необходимости вставьте скриншоты этих графиков.

- [ ] Проверено на семинаре
def clear_folder(folder_path):
    shutil.rmtree(folder_path, ignore_errors=True) 
    os.makedirs(folder_path, exist_ok=True)

clear_folder("results/mountaincar")
env = gym.make("MountainCar-v0", render_mode='rgb_array', max_episode_steps=500)
env = RecordVideo(env, video_folder="results/mountaincar", episode_trigger=lambda x: x % 100 == 0) 
env = TransformObservation(env, lambda obs: th.tensor(obs, dtype=th.float32), env.observation_space)


n_observations = env.observation_space.shape[0]
n_actions = env.action_space.n

train_config = TrainConfig(
        gamma=0.99,
        learning_rate=1e-3,
        num_episodes=5000,
        batch_size=64,
        target_sync=100,
        memory_capacity=100000,
        max_steps_per_episode=500 
    )


policy_config = PolicyConfig(
    n_observations=n_observations,
    n_actions=n_actions,  
    device=device,
    epsilon_start=1.0,
    epsilon_end=0.05,
    epsilon_decay=5000 
)

policy = Policy(policy_cfg=policy_config)
trainer = Trainer(env=env, train_config=train_config, policy=policy)
trainer.train()
env.close()
Video.from_file('results/mountaincar/rl-video-episode-4900.mp4')

