# t-1_Аль-Натор.ipynb
# Markdown:
# Алгоритмы и структуры данных в языке Python

02.09.2024, 11:50 - 13:20, 3-я пара<br>
В4/ауд. 3410 (кк) (4-й Вешняковский проезд, 4)<br>
ПМ22-2<br>
Семинар<br>
# Markdown:
# Параллельные вычисления
# Markdown:
Материалы:
* https://disk.yandex.ru/d/L0yk0mc0fYDtmA
* https://superfastpython.com/multiprocessing-pool-python/
* https://docs.python.org/3/library/multiprocessing.html
# Markdown:
### Пример 1

Создадим простой процесс и выполним его.
import time, multiprocessing
def do_smth():
    print('Задержка на 1 секунду ...')
    time.sleep(1)
    print('Конец задержки')
start = time.perf_counter()

do_smth()

finish = time.perf_counter()

print(f'Выполнено за {round(finish - start, 2)} сек.')
# Markdown:
Выполним этот процесс последовательно дважды.


start = time.perf_counter()

do_smth()
do_smth()

finish = time.perf_counter()

print(f'Выполнено за {round(finish - start, 2)} сек.')
# Markdown:
Время последовательного выполнения процессов увеличилось пропорционально их числу. Теперь выполним эти процессы параллельно. Для этого используем библиотеку *multiprocessing*.


import multiprocessing

start = time.perf_counter()

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=do_smth)
    p2 = multiprocessing.Process(target=do_smth)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

finish = time.perf_counter()

print(f'Выполнено за {round(finish - start, 2)} сек.')
# Markdown:
Мы увидели, что время уменьшилось. Сейчас процессов всего два, что если у нас будет на порядок больше процессов?
import multiprocessing
import time
start = time.perf_counter()

processes = []

for _ in range(20):
   p = multiprocessing.Process(target=do_smth)
   p.start()
   processes.append(p)

for process in processes:
   process.join()

finish = time.perf_counter()

print(f'Выполнено за {round(finish - start, 2)} сек.')
# Markdown:
Модернизируем нашу программу. Пусть время задержки станет параметром.
def do_smth(seconds):
    print('Задержка на {seconds} сек. ...')
    time.sleep(1)
    print('Конец задержки')
start = time.perf_counter()

if __name__ == '__main__':

    processes = []

    for _ in range(20):
        p = multiprocessing.Process(target=do_smth, args=[1.5])
        p.start()
        processes.append(p)

    for process in processes:
        process.join()

finish = time.perf_counter()

print(f'Выполнено за {round(finish - start, 2)} сек.')
# Markdown:
### Пример 2
%%file rand_string_.py

import random, string

def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)
import rand_string_
import random, string
import multiprocessing as mp

random.seed(123)

# Define an output queue
output = mp.Queue()

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string_.rand_string, args=(5, output)) \
             for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit (wait exit) the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)
# Markdown:
## Пул процессов

Другой подход - использование класса *Pool*.
# Markdown:
Пул процессов — это шаблон программирования для автоматического управления пулом рабочих процессов.

Пул отвечает за фиксированное количество процессов.

- контролирует, когда они создаются, например, когда они нужны.
- также контролирует, что они должны делать, когда они не используются, например, заставляя их ждать, не потребляя вычислительные ресурсы.
- может предоставлять универсальный интерфейс для выполнения специальных задач с переменным количеством аргументов, во многом похожий на свойство target объекта Process, но не требует, чтобы мы выбирали процесс для выполнения задачи, запускали процесс или ждали завершения задачи.

Python предоставляет пул процессов через класс *multiprocessing.Pool*.

Чтобы использовать пул процессов, необходимо сначала создать и настроить экземпляр класса.

```python
# создание пула процессов
pool = multiprocessing.pool.Pool(...)
```

После настройки можно отправлять задачи в пул для выполнения с использованием *apply()* и *map()*.

```python
# отправка задач на выполнение
results = pool.map(task, items)
```

После того, как мы закончим работу с пулом процессов, его можно закрыть, а используемые пулом ресурсы освободить.

```python
# завершение пула процессов
pool.close()
```
# Markdown:
В жизненном цикле использования класса multiprocessing.Pool есть четыре основных шага: создание, отправка, ожидание и завершение работы.

1. Создание: создание пула процессов путем вызова конструктора multiprocessing.Pool().
<br>
2. Отправка: отправка задач синхронно или асинхронно.

    * 2a. Отправка задач синхронно
    * 2b. Отправка задач асинхронно
<br>
3. Ожидание: ожидание и получение результатов по мере завершения задач (необязательно).

    * 3a. Ожидание завершения объектов AsyncResult
    * 3b. Ожидание результата объектов AsyncResult
<br>
4. Завершение работы: завершение работы пула процессов путем вызова shutdown().

    * 4a. Автоматическое завершение работы с помощью диспетчера контекста
# Markdown:

# Markdown:
**Шаг 1. Создание пула процессов**

```python
pool = multiprocessing.Pool()
```

Такой код создаст пул процессов, который будет использовать количество рабочих процессов, соответствующее количеству логических ядер ЦП в вашей системе.

<br>

**Шаг 2. Отправка задач**

**2а. Синхронная отправка задач**

Синхронная выдача задач означает, что вызывающая сторона будет заблокирована до тех пор, пока выданная задача или задачи не будут завершены. Реализация синхронной отправки задач возможна при помощи функций apply(), map() и starmap().

Функция *apply()* принимает имя функции, которую должен выполнить рабочий процесс. Вызов будет заблокирован до тех пор, пока функция не будет выполнена рабочим процессом, после чего произойдет возврат.

```python
# отправка задачи в пул процессов
pool.apply(task)
```

Пул процессов предоставляет параллельную версию встроенной функции map() для выдачи задач.

```python
# iterates return values from the issued tasks
for result in map(task, items):
    # ...
```

Функция *starmap()* такая же, как и параллельная версия функции *map()*, за исключением того, что она позволяет каждому вызову функции принимать несколько аргументов. В частности, она принимает итерируемый объект, где каждый элемент является итерируемым объектом аргументов для целевой функции.

```python
# iterates return values from the issued tasks
for result in starmap(task, items):
    # ...
```

**2b. Асинхронная отправка задач**

Асинхронная выдача задач пулу процессов означает, что вызывающий не будет блокироваться, что позволяет вызывающему продолжать выполнять другую работу, пока выполняются задачи.

Неблокирующие вызовы для выдачи задач пулу процессов возвращаются немедленно и предоставляют хук или механизм для проверки состояния задач и получения результатов позже. Вызывающий может выдавать задачи и продолжать работу с программой.

Неблокирующие вызовы пула процессов включают apply_async(), map_async() и starmap_async().

```python
...
# issue tasks to the process pool asynchronously
result = map_async(task, items)

...
# iterates results as tasks are completed in order
for result in imap(task, items):
    # ...

...
# iterates results as tasks are completed, in the order they are completed
for result in imap_unordered(task, items):
    # ...
```

** Шаг 3. Ожидание (необязательный)

Объект AsyncResult возвращается при выдаче задач в multiprocessing.Pool асинхронно. Этого можно добиться с помощью любого из следующих методов в пуле процессов:

Pool.apply_async() для выдачи одной задачи.

Pool.map_async() для выдачи нескольких задач.

Pool.starmap_async() для выдачи нескольких задач, которые принимают несколько аргументов.

AsyncResult предоставляет дескриптор для одной или нескольких выданных задач.

Он позволяет вызывающему проверять статус выданных задач, ждать завершения задач и получать результаты после их завершения.

Нам не нужно использовать возвращаемый AsyncResult, например, если выданные задачи не возвращают значения, и нас не интересует, когда задачи завершатся или будут ли они выполнены успешно. Вот почему этот шаг в жизненном цикле является необязательным.

**Шаг 4. Завершение работы пула процессов**

Multiprocessing.Pool можно закрыть, как только у нас не останется больше задач для выдачи.

Есть два способа завершить работу пула процессов.

Это:

Вызов Pool.close().

Вызов Pool.terminate().
Функция close() немедленно вернется, и пул не будет принимать никаких дальнейших задач.

```python
...
# close the process pool
pool.close()
```

В качестве альтернативы мы можем захотеть принудительно завершить все дочерние рабочие процессы, независимо от того, выполняют ли они задачи или нет.

Это можно сделать с помощью функции terminate().

```python
...
# forcefully close all worker processes
pool.terminate()
```

Затем мы можем захотеть дождаться завершения всех задач в пуле. Этого можно добиться, вызвав функцию join() в пуле.

Например:
```python
# wait for all issued tasks to complete
pool.join()
```
# Markdown:
## Задачи
# Markdown:
1. Посчитайте, сколько раз встречается каждый из символов (заглавные и строчные символы не различаются) в файле

`Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt`

и в файле

`Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt`
from collections import Counter

def count(file, output=None):
    with open(file, 'r', encoding='cp1251') as f:
        data = ''.join(f.readlines()).lower()
    counts = Counter(data)
    if output:
        output.put(counts)
    return counts

start = time.perf_counter()

file1 = './01_multiprocessing_data/Dostoevskiy Fedor. Prestuplenie i nakazanie - BooksCafe.Net.txt'
file2 = './01_multiprocessing_data/Dostoevskiy Fedor. Igrok - BooksCafe.Net.txt'
c1 = count(file1)
c2 = count(file2)

finish = time.perf_counter()
print(f'Выполнено за {round(finish - start, 2)} сек.')
c1.most_common(5), c2.most_common(5)
# Markdown:
2. Решить задачу 1, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создать свой собственный процесс.
%%file read_book.py

from collections import Counter

def count(file, output):
    with open(file, 'r', encoding='cp1251') as f:
        data = ''.join(f.readlines()).lower()
    counts = Counter(data)
    output.put(counts)
import read_book
import multiprocessing as mp

random.seed(123)

output = mp.Queue()

processes = [mp.Process(target=read_book.count, args=([file1, file2][i], output)) for i in range(2)]

for p in processes:
    p.start()

for p in processes:
    p.join()

results = [output.get() for p in processes]

results[0].most_common(5), results[1].most_common(5)
# Markdown:
## Лабораторная работа 10
import pandas as pd
import multiprocessing as mp
import time
import csv
import os
from collections import Counter
from collections import defaultdict
# Markdown:
1. Разбейте файл `recipes_full.csv` на несколько (например, 8) примерно одинаковых по объему файлов c названиями `id_tag_nsteps_*.csv`. Каждый файл содержит 3 столбца: `id`, `tag` и `n_steps`, разделенных символом `;`. Для разбора строк используйте `csv.reader`.

__Важно__: вы не можете загружать в память весь файл сразу. Посмотреть на первые несколько строк файла вы можете, написав код, который считывает эти строки.

Подсказка: примерное кол-во строк в файле - 2.3 млн.

```
id;tag;n_steps
137739;60-minutes-or-less;11
137739;time-to-make;11
137739;course;11
```

with open('recipes_full.csv', 'r') as f:
    reader = csv.reader(f)
    total_rows = sum(1 for row in reader) - 1
    print(f'Количество строк: {total_rows}')
rows_per_file = total_rows // 8
remainder = total_rows % 8
rows_per_file, remainder
file_index = 0
row_count = 0
current_file = None
current_writer = None
with open('recipes_full.csv', 'r', newline='', encoding='utf-8') as f:
    reader = csv.reader(f, delimiter=',')
    header = next(reader)
    for row in reader:
        if current_file is None or row_count >= rows_per_file + (1 if file_index <= remainder else 0):
            if current_file:
                current_file.close()

            file_index += 1
            row_count = 0
            current_file = open(f'id_tag_nsteps_{file_index}.csv', 'w', newline='', encoding='utf-8')
            current_writer = csv.writer(current_file, delimiter=';')
            current_writer.writerow(['id', 'tag', 'n_steps'])

        current_writer.writerow([row[1], row[5], row[6]])
        row_count += 1

    if current_file:
        current_file.close()
file_prefix = 'id_tag_nsteps_'
files = [f for f in os.listdir('.') if f.startswith(file_prefix) and f.endswith('.csv')]

for file in sorted(files):
    with open(file, 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter=';')
        print(f'Файл {file} содержит {sum(1 for row in reader) - 1 } строк')
# Markdown:
2. Напишите функцию, которая принимает на вход название файла, созданного в результате решения задачи 1, считает среднее значение количества шагов для каждого тэга и возвращает результат в виде словаря.
def calculate_avg_steps(file_name):
    tag_steps = defaultdict(int)
    tag_count = defaultdict(int)

    with open(file_name, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=';')
        for row in reader:
            tags = row['tag'].strip('[]').replace("'", "").split(', ')
            n_steps = int(row['n_steps'])

            for tag in tags:
                tag_steps[tag] += n_steps
                tag_count[tag] += 1

    avg_steps = {tag: tag_steps[tag] / tag_count[tag] for tag in tag_steps}

    return avg_steps

calculate_avg_steps('id_tag_nsteps_3.csv')
# Markdown:
3. Напишите функцию, которая считает среднее значение количества шагов для каждого тэга по всем файлам, полученным в задаче 1, и возвращает результат в виде словаря. Не используйте параллельных вычислений. При реализации выделите функцию, которая объединяет результаты обработки отдельных файлов. Модифицируйте код из задачи 2 таким образом, чтобы иметь возможность получить результат, имея результаты обработки отдельных файлов. Определите, за какое время задача решается для всех файлов.

def process_file(file_name):
    tag_steps = defaultdict(int)
    tag_count = defaultdict(int)

    with open(file_name, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=';')
        for row in reader:
            tags = row['tag'].strip('[]').replace("'", "").split(', ')
            n_steps = int(row['n_steps'])

            for tag in tags:
                tag_steps[tag] += n_steps
                tag_count[tag] += 1

    return tag_steps, tag_count

def merge_results(total_steps, total_counts, file_steps, file_counts):
    for tag, steps in file_steps.items():
        total_steps[tag] += steps
        total_counts[tag] += file_counts[tag]

def calculate_avg_steps_all_files(file_list):
    total_steps = defaultdict(int)
    total_counts = defaultdict(int)

    for file_name in file_list:
        file_steps, file_counts = process_file(file_name)
        merge_results(total_steps, total_counts, file_steps, file_counts)

    avg_steps = {tag: total_steps[tag] / total_counts[tag] for tag in total_steps}

    return avg_steps

def main(file_list):
    start_time = time.time()
    result = calculate_avg_steps_all_files(file_list)
    end_time = time.time()
    elapsed_time = end_time - start_time

    print(f"Задача решена за {elapsed_time:.2f} секунд.")
    return result

result = main(files)
result
# Markdown:
4. Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Для обработки каждого файла создайте свой собственный процесс. Определите, за какое время задача решается для всех файлов.
%%file process_file.py

import csv
from collections import defaultdict

def process_file(file_path):
    steps_data = defaultdict(lambda: {'n_steps': 0, 'count': 0})

    with open(file_path, 'r', encoding='utf-8') as f:
        csv_data = csv.DictReader(f, delimiter=';')

        for record in csv_data:
            steps = int(record['n_steps'])
            tags = record['tag'].strip('[]').replace("'", "").split(', ')
            for tag in tags:
                steps_data[tag]['n_steps'] += steps
                steps_data[tag]['count'] += 1

    tag_steps = {tag: data['n_steps'] for tag, data in steps_data.items()}
    tag_count = {tag: data['count'] for tag, data in steps_data.items()}

    return tag_steps, tag_count
import process_file

def calculate_avg_steps_all_files_mp(file_list):
    with mp.Pool() as pool:
        results = pool.map(process_file.process_file, file_list)

    total_steps, total_counts = defaultdict(int), defaultdict(int)
    for file_steps, file_counts in results:
        for tag, steps in file_steps.items():
            total_steps[tag] += steps
            total_counts[tag] += file_counts[tag]

    avg_steps = {tag: total_steps[tag] / total_counts[tag] for tag in total_steps}
    return avg_steps
%timeit -o result = calculate_avg_steps_all_files_mp(files)
# Markdown:
5. (*) Решите задачу 3, распараллелив вычисления с помощью модуля `multiprocessing`. Создайте фиксированное количество процессов (равное половине количества ядер на компьютере). При помощи очереди передайте названия файлов для обработки процессам и при помощи другой очереди заберите от них ответы.
# Markdown:


# t-2_Аль-Натор.ipynb
# Markdown:
# Dask Array
# Markdown:
Материалы:
* Макрушин С.В. Лекция 11: Dask
* https://docs.dask.org/en/latest/array.html
* JESSE C. DANIEL. Data Science with Python and Dask.
# Markdown:
## Задачи для совместного разбора
import dask.array as da
import dask
import h5py
import numpy as np
import time
# Markdown:
1. Создайте массив размерностью 1000 на 300000, заполненный числами из стандартного нормального распределения. Исследуйте основные характеристики полученного массива.
da_array = da.random.normal(0, 1, size=(1000, 300_000), chunks=(1000 // 10, 300_000 // 7))

print(f"Mean: {da_array.mean().compute():.10f}")
print(f"Std: {da_array.std().compute()}")
print(f"Min: {da_array.min().compute()}")
print(f"Max: {da_array.max().compute()}")
# Markdown:
2. Посчитайте сумму квадратов элементов массива, созданного в задаче 1. Создайте массив `np.array` такого же размера и сравните скорость решения задачи с использование `da.array` и `np.array`
np_array = np.random.randn(1000, 300_000)

print('NumPy Time:')
%timeit np_result = np.sum(np.square(np_array))

print('Dask Time:')
%timeit da_result = da.sum(da.square(da_array)).compute()
# Markdown:
3. Визуализируйте граф вычислений для задачи 12.
dask_graph = da.square(da_array)
dask_graph.visualize()
# Markdown:
## Лабораторная работа 11
# Markdown:
1. Считайте датасет `recipe` из файла `minutes_n_ingredients_full.hdf5` в виде `dask.array`. Укажите аргумент `chunks=(100_000, 3)` при создании массива. Выведите на экран основную информацию о массиве.
data_f  = h5py.File('./11_dask_array_data/minutes_n_ingredients_full.hdf5', 'r')
list(data_f.keys())
data_set = data_f['/recipe']
recipe = da.from_array(data_set, chunks=(100_000, 3))
recipe
# Markdown:
2. Вычислите среднее значение по каждому столбцу, кроме первого.
mean_values = recipe[:, 1:].mean(axis=0).compute()
mean_values
# Markdown:
3. Исследуйте, как влияет значение аргумента `chunks` при создании `dask.array` на скорость выполнения операции поиска среднего.
chunk_sizes = [(100_000, 3), (500_000, 3), (1000_000, 3), (10_000_000)]
for chunks in chunk_sizes:
    recipe = da.from_array(data_set, chunks=chunks)
    print(f'Chunks: {chunks}\nTime:')
    %timeit recipe[:, 1:].mean(axis=0).compute()
# Markdown:
4. Выберите рецепты, время выполнения которых меньше медианного значения
median_time = da.percentile(recipe[:, 1], 50)
filtered_recipes = recipe[recipe[:, 1] < median_time]
filtered_recipes.compute()
# Markdown:
5. Посчитайте количество каждого из возможных значений кол-ва ингредиентов
ingredient_counts = da.bincount(recipe[:, 2]).compute()
value_counts = {i: count for i, count in enumerate(ingredient_counts)}
value_counts
# Markdown:
6. Найдите максимальную продолжительность рецепта. Ограничьте максимальную продолжительность рецептов сверху значением, равному 75% квантилю.
max_duration = recipe[:, 1].max().compute()
max_duration
quantile_75 = da.percentile(recipe[:, 1], 75).compute()
quantile_75
limited_duration_recipes = recipe[recipe[:, 1] <= quantile_75]
limited_duration_recipes.compute().shape
# Markdown:
7. Создайте массив `dask.array` из 2 чисел, содержащих ваши предпочтения относительно времени выполнения рецепта и кол-ва ингредиентов. Найдите наиболее похожий (в смысле $L_1$) рецепт из имеющихся в датасете.
preferences = da.from_array(np.array([130, 29]))

differences = da.abs(recipe[:, 1:] - preferences).sum(axis=1)
most_similar_recipe_index = da.argmin(differences)
most_similar_recipe = recipe[most_similar_recipe_index].compute()
most_similar_recipe
# Markdown:
8. Работая с исходным файлом в формате `hdf5`, реализуйте алгоритм подсчета среднего значения в блочной форме и вычислите с его помощью среднее значение второго столбца в массиве.

Блочный алгоритм вычислений состоит из двух частей:
1. Загрузка фрагмента за фрагментом данных по `blocksize` элементов и проведение вычислений на этим фрагментом
2. Агрегация результатов вычислений на различных фрагментах для получения результата на уровне всего набора данных

Важно: при работе с `h5py` в память загружаются не все элементы, а только те, которые запрашиваются в данный момент
def blockwise_mean(file_name, dataset_name, column_index, blocksize):
    with h5py.File(file_name, 'r') as f:
        dataset = f[dataset_name]
        total_sum = 0
        total_count = 0

        for i in range(0, dataset.shape[0], blocksize):
            data_block = dataset[i:i+blocksize, column_index]
            total_sum += data_block.sum()
            total_count += data_block.shape[0]

        mean_value = total_sum / total_count

    return mean_value

file_name = './11_dask_array_data/minutes_n_ingredients_full.hdf5'
dataset_name = '/recipe'
column_index = 1
blocksize = 100_000

print('Время без блочного алгоритма:')
%timeit recipe[:, column_index].mean().compute()

print('Время с блочным алгоритмом:')
%timeit blockwise_mean(file_name, dataset_name, column_index, blocksize)
blockwise_mean(file_name, dataset_name, column_index, blocksize)


# t-3.ipynb
import sqlite3
import xlwings as xw
import os
import pandas as pd
x=13
y = ((((((((x+2)*x+3)*x+4)*x+5)*x+6)*x+7)*x+8)*x+9)*x+10
print(y)
m = [1,2,3,4,5,6,7,8,9,10]

def f1(m,x):
  return x*len(m)

y = f1(m,13)
print(y)

# Удаляем базу данных, если она существует
db_path = 't-3.db'

if os.path.exists(db_path):
    os.remove(db_path)
    print(f'База данных {db_path} была успешно удалена.')
else:
    print(f'Файл {db_path} не существует.')

# Markdown:
t-3 Создать в БД SQLite таблицу с несколькими колонками (3-5 колонок) из XLSX-таблицы.
   (см. пример SQLite*.ipynb в Материалах.
Заполнить их соответствующими данными, \
a) задавая в программы несколько INSERT (не менее 10) с конкретными значениями.
# Создаем таблицу 'products' с полями для категории, продукта, продаж и квартала
conn = sqlite3.connect('t-3.db')
cursor = conn.cursor()
cursor.execute('CREATE TABLE products (id INTEGER PRIMARY KEY, Category TEXT, Product TEXT, Sales REAL, Quarter INTEGER)')
conn.commit()
# Читаем данные с диапазона A1:D287 из Excel
wb = xw.Book('Excel-sample-data-for-pivot-tables.xlsx')
sheet = wb.sheets['Source Data']
values = sheet.range('A1:D287').value
values[:5]
# Добавляем 10 строк данных в таблицу 'products'
cursor.executemany('''
    INSERT INTO products (category, product, sales, quarter)
    VALUES (?, ?, ?, ?)
''', values[1:10])
conn.commit()
# Выводим данные из таблицы 'products'
cursor.execute('SELECT * FROM products')
for row in cursor.fetchall():
    print(row)
# Markdown:
б) загружая в цикле непосредственно из XLSX-таблицы (+3 балла)\
   Исходные данные согласно варианту (подпапка XSLX):\
   Excel-sample-data-for-pivot-tables.xlsx
# В цикле загружаем оставшиеся данные из Excel в базу данных
for row in range(11, len(values) + 1):
    cursor.execute('''
            INSERT INTO products (category, product, sales, quarter)
            VALUES (?, ?, ?, ?)
        ''', sheet.range(f'A{row}:D{row}').value)

conn.commit()
# Выводим все данные из таблицы 'products'
cursor.execute('SELECT * FROM products')
for row in cursor.fetchall():
    print(row)
conn.close()
wb.close()
# Markdown:
1) Для выборки записей по некоторому критерию одного (или 2-х) значений\
         SELECT a, b FROM t1 WHERE c>100 AND d=4

# Пример выборки данных: выводим продукты с продажами больше 10000 в категории 'Beverages'
conn = sqlite3.connect('t-3.db')
cursor = conn.cursor()
cursor.execute("SELECT Product, Sales FROM products WHERE Sales > 10000 AND Category = 'Beverages'")
for row in cursor.fetchall():
    print(row)
# Markdown:
2) Сгруппировать данные по одному из параметров, подсчитав сумму значений по другому параметру.\
         SELECT a, SUM(b) FROM t1 GROUP BY  c
# Пример группировки: суммируем продажи по категориям
cursor.execute('SELECT Category, SUM(Sales) FROM products GROUP BY Category')
for row in cursor.fetchall():
    print(row)
# Markdown:
3) Нормализовать БД и построить запрос по нескольким таблицам
# Нормализуем базу данных: создаем таблицы для категорий, продуктов и продаж
cursor.execute('''
    CREATE TABLE IF NOT EXISTS categories (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE
    )
''')

cursor.execute('''
    CREATE TABLE IF NOT EXISTS new_products (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        category_id INTEGER,
        FOREIGN KEY (category_id) REFERENCES categories(id)
    )
''')

cursor.execute('''
    CREATE TABLE IF NOT EXISTS sales (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        product_id INTEGER,
        sales REAL,
        quarter TEXT,
        FOREIGN KEY (product_id) REFERENCES new_products(id)
    )
''')

conn.commit()
# Загружаем данные из Excel в DataFrame для дальнейшего использования
df = pd.read_excel('Excel-sample-data-for-pivot-tables.xlsx')
df.head()
# Вставляем уникальные категории
df_categories = df['Category'].unique()
for category in df_categories:
    cursor.execute('INSERT OR IGNORE INTO categories (name) VALUES (?)', (category,))

# Вставляем продукты с ссылками на категорию
for _, row in df.iterrows():
    cursor.execute('SELECT id FROM categories WHERE name = ?', (row['Category'],))
    category_id = cursor.fetchone()[0]
    cursor.execute('INSERT INTO new_products (name, category_id) VALUES (?, ?)', (row['Product'], category_id))

# Вставляем продажи с ссылками на продукт
for _, row in df.iterrows():
    cursor.execute('SELECT id FROM new_products WHERE name = ?', (row['Product'],))
    product_id = cursor.fetchone()[0]
    cursor.execute('INSERT INTO sales (product_id, sales, quarter) VALUES (?, ?, ?)', (product_id, row['Sales'], row['Quarter']))

conn.commit()

# Выводим общие продажи по категориям
cursor.execute('''
    SELECT categories.name AS category, SUM(sales.sales) AS total_sales
    FROM sales
    JOIN new_products ON sales.product_id = new_products.id
    JOIN categories ON new_products.category_id = categories.id
    GROUP BY categories.name
''')

for row in cursor.fetchall():
    print(f"Category: {row[0]}, Total Sales: {row[1]}")

conn.close()


# t-4.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
# Dask DataFrames
# Markdown:
Dask DataFrames — это аналог Pandas DataFrames, который позволяет работать с таблицами данных, которые не помещаются в памяти. Dask DataFrames поддерживает множество операций, таких как фильтрация, сортировка, группировка и агрегация данных, а также объединение таблиц.
# Markdown:
### Отчет по заданию

#### Постановка задачи:
Цель задания — объединить два набора данных с различными атрибутами на основе общего ключевого поля с использованием Dask DataFrames, что позволяет работать с большими объемами данных, не загружая их полностью в память. Необходимо выполнить объединение данных по ключу, а также провести их обработку с применением не менее двух функций из предложенного списка.

#### Описание данных:
1. **Набор данных 1**:
   - Источник: сгенерирован с помощью библиотеки **Faker** с русской локализацией.
   - Поля:
     - **K1** – ФИО (ключевое поле, уникальный идентификатор).
     - **Возраст** – возраст человека.
     - **Город** – город проживания.
     - **Зарплата** – зарплата в рублях.
   
2. **Набор данных 2**:
   - Источник: сгенерирован с использованием **Faker** с русской локализацией.
   - Поля:
     - **K1** – ФИО (ключевое поле, уникальный идентификатор).
     - **Образование** – уровень образования.
     - **Опыт работы (лет)** – количество лет трудового стажа.
     - **Должность** – текущая должность.

#### Ключевое поле:
- Поле **K1** (ФИО) используется как ключ для объединения двух наборов данных. Оно присутствует в обоих наборах, однако списки значений могут не полностью совпадать (например, часть ФИО есть только в одном наборе данных).

#### Различия в наборах данных:
- Наборы данных различаются не только по содержимому (пересечение по ключу **K1** частичное), но и по атрибутам:
   - В первом наборе содержатся возраст, город и зарплата.
   - Во втором наборе содержатся образование, опыт работы и должность.
import pandas as pd
import numpy as np
from faker import Faker
# Инициализация Faker с русской локализацией
fake = Faker('ru_RU')

# Функция для генерации данных с общими K1
def generate_realistic_data(num_rows, common_fraction=0.5):
    # Генерация общего набора K1 (имена), который попадет в оба набора
    common_names = [fake.name() for _ in range(int(num_rows * common_fraction))]
    
    # Генерация уникальных имен для каждого набора данных
    unique_names_1 = [fake.name() for _ in range(int(num_rows * (1 - common_fraction)))]
    unique_names_2 = [fake.name() for _ in range(int(num_rows * (1 - common_fraction)))]
    
    # Создание первого набора данных
    data1 = {
        'K1': common_names + unique_names_1,  # Общие и уникальные имена
        'Возраст': np.random.randint(18, 70, size=num_rows),
        'Город': [fake.city() for _ in range(num_rows)],
        'Зарплата': np.random.randint(15000, 250000, size=num_rows)
    }
    
    # Создание второго набора данных
    data2 = {
        'K1': common_names + unique_names_2,  # Общие и уникальные имена
        'Образование': np.random.choice(['Среднее', 'Бакалавр', 'Магистр', 'Кандидат наук'], size=num_rows),
        'Опыт работы (лет)': np.random.randint(0, 50, size=num_rows),
        'Должность': np.random.choice(['Менеджер', 'Разработчик', 'Аналитик'], size=num_rows)
    }
    
    return pd.DataFrame(data1), pd.DataFrame(data2)

# Генерация 100 строк данных для каждого DataFrame, с 50% общих имен
df1, df2 = generate_realistic_data(100, common_fraction=0.5)

# Сохранение в CSV файлы
df1.to_csv('dataset_1.csv', index=False)
df2.to_csv('dataset_2.csv', index=False)

# Markdown:
1) Считая, что поле K1 имеет одинаковый набор значений (возможно в разном порядке), объединить данные в один DaskDataFrames, загрузив предварительно каждый csv в свой DataFrame c помощью dd.read_csv. (+7)
import dask.dataframe as dd

# Загрузка данных из CSV файлов в Dask DataFrames
df1 = dd.read_csv('dataset_1.csv')
df2 = dd.read_csv('dataset_2.csv')
# Объединяем два DataFrame по ключевому полю K1
merged_df = dd.merge(df1, df2, on='K1', how='inner')

# Вывод результата
merged_df.head()
# Markdown:
2) Наборы значений в файлах не совпадают. При объединении в строках, для которых полу K1 имеется только в одном файле, аттрибуты оставлять пустыми (или значениями по умолчанию). (+3)
merged_df = dd.merge(df1, df2, on='K1', how='outer')
# Вывод результата
merged_df.head()
# Markdown:
3) Обработать итоговый DataFrame, используя не менее 2-х разных функций, выбранных из 4 (5) по варианту (цель обработки выбрать самостоятельно). Список ф-й в док-те Dask_DataFrame_variant (+5)
# Применение align() - выравниваем DataFrame с самим собой по внешнему объединению
merged_df['Опыт работы (лет)'] = merged_df['Опыт работы (лет)'].add(1)
# Применение align() - выравниваем DataFrame с самим собой по внешнему объединению
all_salaries_positive = merged_df['Зарплата'] > 0
all_salaries_positive.all().compute()
merged_df.head()
# Markdown:
Оценить производительность работы при разных разбиениях DataFrame  (+5)
# Функция для замера времени выполнения при разном количестве разбиений
def evaluate_performance(npartitions):
    # Изменяем количество разбиений в DataFrame
    df_partitioned = merged_df.repartition(npartitions=npartitions)
    
    # Группировка и вычисление средней зарплаты по городам
    df_partitioned.groupby('Город')['Зарплата'].mean().compute()

    # Фильтрация сотрудников с зарплатой выше 50,000
    df_partitioned[df_partitioned['Зарплата'] > 50000].compute()

# Оценка производительности при разных разбиениях
%timeit evaluate_performance(2)   # 2 разбиения
%timeit evaluate_performance(4)   # 4 разбиения
%timeit evaluate_performance(8)   # 8 разбиений
%timeit evaluate_performance(16)   # 16 разбиений
%timeit evaluate_performance(64)   # 64 разбиений
%timeit evaluate_performance(128)   # 128 разбиений
# Markdown:
### Краткое описание результатов по всем заданиям:
1. **Задание 1: Объединение данных по ключевому полю**  
   Два набора данных были успешно объединены по ключевому полю **K1** (ФИО). Оба набора содержали различные атрибуты (возраст, зарплата, город в одном и образование, опыт работы, должность в другом).

2. **Задание 2: Обработка данных с неполным пересечением ключей**  
   Выполнено объединение наборов данных с использованием внешнего объединения (`outer join`), что позволило сохранить все строки из обоих наборов, даже если значения ключа **K1** не совпадали полностью. В результате для сотрудников, присутствующих только в одном из наборов, соответствующие поля заполнились значениями `NaN`.

3. **Задание 3: Применение функций для обработки данных**  
   Были применены следующие функции:
   - **`add()`** — для добавления одного года к столбцу "Опыт работы".
   - **`all()`** — для проверки, что все значения в столбце зарплат больше нуля.
   
   Эти функции позволили провести эффективную обработку данных и проверку на соответствие условиям.

4. **Оценка производительности**  
   Были проведены замеры производительности операций на DataFrame с разными разбиениями (partitions), что позволило оптимизировать процесс обработки больших объемов данных.

### Общий итог:
Выполненные задания продемонстрировали гибкость и мощность Dask DataFrames для обработки больших данных, особенно с точки зрения параллельной обработки, объединения данных, группировки и фильтрации, а также применения вычислительных функций.

# t-5.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
import dask.dataframe as dd
# Загрузка наборов данных в Dask DataFrames
df1 = dd.read_csv('dataset_1.csv')
df2 = dd.read_csv('dataset_2.csv')
# Объединение наборов данных по полю 'K1' (общее идентификаторное поле)
merged_df = dd.merge(df1, df2, on='K1')
# Вывод результата
merged_df.head()
# Markdown:
1) Преобразовать в Dask.Bag DataFrame из t-4
# Преобразование объединенного DataFrame в Dask Bag
bag = merged_df.to_bag().map(lambda row: dict(zip(merged_df.columns, row)))
bag.take(1)
# Markdown:
2) Выполнить операции map и filter (действия по выбору)
# Операция map: добавляем новое поле с общим опытом работы в месяцах
bag_mapped = bag.map(lambda record: {**record, 'Опыт работы (месяцев)': record['Опыт работы (лет)'] * 12})

# Операция filter: фильтруем записи, где зарплата указана и она не меньше 100,000 рублей
bag_filtered = bag_mapped.filter(lambda record: record['Зарплата'] >= 100_000)

# Выполнение вычислений
results = bag_filtered.compute()

# Вывод первых 5 записей для проверки результата
print(results[:5])


# t-6.ipynb
import pandas as pd
import numpy as np
import dask.dataframe as dd
import matplotlib.pyplot as plt
# Markdown:
Данные - акции NVIDIA.\
Date (Дата): даты, к которым относятся данные по ценам.\
Price (Цена): значения цен акций на определенные даты.
data = pd.read_csv('US1.NVDA_230415_240423.csv', delimiter = ';', parse_dates=['<DATE>'])[['<DATE>', '<CLOSE>']]
data.columns = ['Date', 'Price']
data.head()
# Markdown:
1) Рассчитать простую скользящую среднюю (SMA)
# Вычисляем SMA
data.set_index('Date', inplace=True)
data['SMA_5D'] = data['Price'].rolling(window='5D').mean()
data.tail()
# Markdown:
Результат показал сглаженные значения цен, которые помогают отслеживать тренды.
# Markdown:
2) Вариант 1 - рассчитать кумулятивную скользящую среднюю (CMA)
# Вычисляем CMA
data['CMA'] = data['Price'].expanding().mean()
data.tail()
# Markdown:
В отличие от SMA, CMA учитывает все предыдущие данные вплоть до текущего момента, предоставляя более сглаженный и стабильный тренд.
# Markdown:
3) Вычисление скользящего среднего временного ряда с помощью Dask DataFrame https://habr.com/ru/companies/otus/articles/759552/
dask_df  = dd.read_csv('US1.NVDA_230415_240423.csv', delimiter = ';', parse_dates=['<DATE>'])[['<DATE>', '<CLOSE>']]
dask_df.columns = ['Date', 'Price']
dask_df = dask_df.set_index('Date')

rolling_mean = dask_df['Price'].rolling(window='5D').mean().compute()
rolling_mean.tail()
# Markdown:
Было использовано Dask для вычисления скользящего среднего по тому же набору данных.
# Markdown:
4) И сравнить время выполнения в п.1 и п.3
print('Время pandas:')
%timeit data['Price'].rolling(5).mean()
print('Время dask:')
%timeit dask_df['Price'].rolling(window='5D').mean().compute()
# Markdown:
Это показывает, что для небольших наборов данных Pandas является более эффективным инструментом, из-за накладных расходов на параллельную обработку на небольших данных в Dask.
# Markdown:
5) На одних осях построить графики данных из 1) и 2)
plt.figure(figsize=(10, 6))
data['Price'].plot(label='Price', color='black')
data['SMA_5D'].plot(label='SMA (5)', color='blue')
data['CMA'].plot(label='CMA', color='green')
plt.title('Скользящие средние (SMA, CMA)')
plt.legend(loc='best')
plt.show()


# t-7.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
### Функции 19, 38, 58, 84, 110
import dask.dataframe as dd
import dask.datasets
# Markdown:
Возьмём синтетический набор данных из Dask, который генерируется функцией `dask.datasets.timeseries()`. Этот набор данных имитирует временные ряды и состоит из следующих колонок:

- **`id`**: Идентификатор строки.
- **`name`**: Случайные имена (например, 'Alice', 'Bob' и т.д.).
- **`x`**: Случайные значения с плавающей запятой (тип `float`).
- **`y`**: Другие случайные значения с плавающей запятой (тип `float`).
- **`timestamp`**: Временные метки (индекс данных).
df_dask = dask.datasets.timeseries()
df_dask.head()
# Markdown:
19. **`cummin`** (кумулятивный минимум):
   - Функция `cummin()` вычисляет кумулятивный минимум вдоль оси (по умолчанию ось 0, то есть по строкам).
   - **Пример**: если у нас есть столбец `x = [3, 1, 4, 2]`, то результат будет: `[3, 1, 1, 1]`. Как видно, каждое последующее значение — это минимальное из всех предыдущих и текущего.
   - **Применение**: В примере мы вычисляем кумулятивный минимум для столбца `x`.
# Пример использования cummin (Кумулятивный минимум по оси)
cummin_result = df_dask['x'].cummin().compute()
print("Cummin result:\n", cummin_result)
# Markdown:
38. **`gt`** (операция "больше чем"):
   - Функция `gt()` сравнивает два столбца и возвращает результат побитового сравнения: где одно значение больше другого.
   - **Пример**: если у нас есть два столбца `x = [1, 2, 3]` и `y = [2, 2, 2]`, результат будет: `[False, False, True]`, так как только последнее значение `x` больше соответствующего значения `y`.
   - **Применение**: Мы сравниваем столбцы `x` и `y` для выявления строк, где `x` больше, чем `y`.
# Пример использования gt (Сравнение больше ли значения, чем в переданной другой колонке)
gt_result = df_dask['x'].gt(df_dask['y']).compute()
print("Greater than result:\n", gt_result)
# Markdown:
58. **`mean`** (среднее значение):
   - Функция `mean()` вычисляет среднее арифметическое значение по указанной оси для всех числовых столбцов.
   - **Пример**: если у нас есть столбец `x = [1, 2, 3]`, то его среднее значение будет `(1+2+3)/3 = 2`.
   - **Применение**: В примере мы вычисляем среднее значение по столбцам `x` и `y`.
# Пример использования mean (Среднее значение по колонкам)
mean_result = df_dask[['x', 'y']].mean().compute()
print("Mean result:\n", mean_result)
# Markdown:
84. **`rdiv`** (обратное деление):
   - Функция `rdiv()` выполняет обратное деление: делит указанное значение на данные столбца.
   - **Пример**: если у нас есть столбец `x = [1, 2, 4]` и мы используем `rdiv(10)`, результат будет `[10/1, 10/2, 10/4] = [10, 5, 2.5]`.
   - **Применение**: Мы делим 10 на каждое значение в столбце `x`.
# Пример использования rdiv (Обратное деление значений столбца 'x' на 10)
rdiv_result = df_dask['x'].rdiv(10).compute()
print("rdiv result (10 / x):\n", rdiv_result)
# Markdown:
 110. `to_backend()` (смена бэкенда данных)
   - Функция `to_backend()` используется для смены бэкенда данных, например, с Dask на Pandas. Это полезно, если вы хотите сначала использовать Dask для работы с большими данными, а затем переключиться на Pandas для дальнейшей обработки.
   - **Пример**: Мы преобразуем Dask DataFrame в Pandas DataFrame с помощью `to_backend(backend='pandas')`.
   - **Применение**: В примере происходит преобразование данных для работы с Pandas DataFrame.
# Пример использования to_backend (смена бэкенда на pandas)
to_backend_result = df_dask.to_backend(backend='pandas')
print("To backend (Pandas) result:\n", to_backend_result.head())


# t-10.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
### Установил Redis через WSL
# Markdown:
t-10  Работа с СУБД Redis (10 = 5+5)
1) Провести сравнение производительности работы с обычной СУБД (sqlite8) и СУБД Redis.\
подготовить тестовые данные типа “ключ-значение” (возможно из данных ПР или случайных) в достаточном объеме (3-5 Гбайт) и разместить их в БД;\
2) Написать программу, которая обращается к БД по ключам, использую обычные запросы “SELECT … WHERE key=Ki”. Последовательность ключей (поряжка 100 тыс.? ) выбирается из сгенерированого случайным образом файла. Произвести замер времени работы программы. С выбранными значениями сделать какую-либо обработку;\
3) Установить Redis, создать БД и заполнить теми же данными;\
4) Запустить ту же программу, изменив способ обращения по ключу, и также замерить время.
# Markdown:
Результат:\
Redis работает медленнее на моих данных в данных заданиях из-за особенностей его структуры данных и архитектуры. В Redis ключи были сохранены как списки, поэтому для каждого ключа необходимо выполнять операции для извлечения всех элементов списка (например, `LRANGE`), что занимает больше времени по сравнению с простым выбором строки в SQLite. Также Redis хранит все данные в оперативной памяти, что может вызывать замедления при больших объёмах данных, особенно при использовании пайплайнов и обработки батчами.
import sqlite3
from datasets import load_dataset
from tqdm import tqdm
import os
import random
import time
import redis
# Markdown:
1) Подготовить тестовые данные типа “ключ-значение” (возможно из данных ПР или случайных) в достаточном объеме (3-5 Гбайт) и разместить их в БД
# Markdown:
### Описание Набора Данных CCNews

1. **Источник**: Common Crawl (общедоступный веб-архив)
2. **Описание**: Набор данных содержит текстовые статьи, включая категории, что позволяет применять его в задачах классификации и анализа текстов. Данные очищены и обработаны для использования в NLP-проектах.

### Структура Выбранных Данных

Набор выбранных данных включает следующие столбцы:

- **plain_text**: Основной текст статьи, содержащий полный текст новостного сообщения. Текст может быть достаточно длинным и включает в себя содержание статьи, что подходит для обучения моделей анализа текста и суммаризации.
- **categories**: Категории, к которым относится статья. Поле `categories` может включать в себя одну или несколько категорий, таких как 'Политика', 'Бизнес', 'Технологии' и др.
# Удаление файла базы данных, если он существует
if os.path.exists('ccnews_data.db'):
    os.remove('ccnews_data.db')
# Подключение к базе данных SQLite и создание таблицы
conn = sqlite3.connect('ccnews_data.db')
cursor = conn.cursor()

# Создание таблицы без ограничения уникальности для category
cursor.execute('''
    CREATE TABLE IF NOT EXISTS kv_store (
        category TEXT,
        plain_text TEXT
    )
''')

# Создание индекса для ускорения поиска по категории
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON kv_store (category)')
conn.commit()
# Загрузка набора данных в режиме стриминга
dataset = load_dataset('stanford-oval/ccnews', split='train', streaming=True)

# Выбор нужных столбцов
selected_data = dataset.map(lambda x: {'plain_text': x['plain_text'], 'categories': x['categories']})
# Определение лимита
limit = 1_000_000

# Параметры батча
batch_size = 10_000
batch = []
count = 0

# Загрузка в базу данных по батчам
for row in tqdm(selected_data, total=limit, desc='Processing and Saving Data'):
    category = row['categories']
    plain_text = row['plain_text']

    batch.append((category, plain_text))
    count += 1

    # Сохраняем батч в базу данных, когда достигаем batch_size
    if len(batch) >= batch_size:
        cursor.executemany('INSERT INTO kv_store (category, plain_text) VALUES (?, ?)', batch)
        conn.commit()
        batch = []  # Очищаем батч

    # Если достигли лимита, выходим из цикла
    if count >= limit:
        break

cursor.close()
# Подсчёт количества строк и размера файла
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM kv_store')
row_count = cursor.fetchone()[0]
conn.close()

# Получение размера файла в гигабайтах
file_size_gb = os.path.getsize('ccnews_data.db') / (1024 ** 3)  # Размер в гигабайтах

print(f'Количество строк в базе данных: {row_count}')
print(f'Размер файла базы данных: {file_size_gb:.2f} ГБ')
# Markdown:
2) Написать программу, которая обращается к БД по ключам, использую обычные запросы “SELECT … WHERE key=Ki”. Последовательность ключей (поряжка 100 тыс.? ) выбирается из сгенерированого случайным образом файла. Произвести замер времени работы программы. С выбранными значениями сделать какую-либо обработку
# Подключаемся к базе данных
conn = sqlite3.connect('ccnews_data.db')
cursor = conn.cursor()

# Извлекаем уникальные категории (ключи) из базы данных
cursor.execute('SELECT DISTINCT category FROM kv_store')
all_keys = [row[0] for row in cursor.fetchall()]

# Генерируем 100,000 случайных ключей
random_keys = random.choices(all_keys, k=100_000)

# Сохраняем случайные ключи в файл
with open('random_keys.txt', 'w', encoding='utf-8') as f:
    for key in random_keys:
        f.write(f'{key}\n')

conn.close()
# Загружаем случайные ключи из файла
with open('random_keys.txt', 'r', encoding='utf-8') as f:
    random_keys = [line.strip() for line in f]


# Обработка результатов по ключам в SQLite (подсчёт количества символов по random_keys)
def get_total_length_sqlite(random_keys):
    conn = sqlite3.connect('ccnews_data.db')
    cursor = conn.cursor()
    total_length = 0

    # Подсчёт длины только для ключей из random_keys
    for key in random_keys:
        cursor.execute('SELECT plain_text FROM kv_store WHERE category = ?', (key,))
        result = cursor.fetchone()
        if result:
            total_length += len(result[0])

    conn.close()
    return total_length
# Измерение времени выполнения
%timeit get_by_keys(random_keys)
total_length = get_total_length_sqlite(random_keys)
print(f'Общая длина текстов: {total_length} символов')
# Markdown:
3) Установить Redis, создать БД и заполнить теми же данными
# Подключение к серверу Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# Очистка базы Redis перед загрузкой
r.flushdb()

conn = sqlite3.connect('ccnews_data.db')
cursor = conn.cursor()

# Выбор всех данных из таблицы kv_store
cursor.execute('SELECT category, plain_text FROM kv_store')
data = cursor.fetchall()

# Параметры пайплайна
pipeline = r.pipeline()
batch_size = 10000
total_records = len(data)

# Запись базы данных в Redis с использованием пайплайнов и команды RPUSH
for i, (category, plain_text) in enumerate(tqdm(data, total=total_records, desc='Processing and Saving Data'), 1):
    pipeline.rpush(category, plain_text)  # Добавляем значение в список по ключу категории

    # Выполняем команды в пайплайне, когда достигаем batch_size
    if i % batch_size == 0:
        pipeline.execute()

# Выполнение оставшихся команд в пайплайне
pipeline.execute()

cursor.close()
conn.close()
total_entries = 0
for key in r.scan_iter():
    list_length = r.llen(key)
    total_entries += list_length

print(f'Общее количество записей в Redis: {total_entries}')
# Markdown:
4) Запустить ту же программу, изменив способ обращения по ключу, и также замерить время.
# Обработка результатов по ключам в Redis (подсчёт количества символов по random_keys)
def get_total_length_redis(random_keys, batch_size=1000):
    total_length = 0

    # Использование пайплайнов и подсчёт длины только для ключей из random_keys
    for i in range(0, len(random_keys), batch_size):
        batch_keys = random_keys[i:i + batch_size]
        pipeline = r.pipeline()
        for key in batch_keys:
            pipeline.lrange(key, 0, -1)
        results = pipeline.execute()

        # Подсчёт длины текстов в текущем батче
        for items in results:
            if items:
                total_length += sum(len(item.decode('utf-8')) for item in items)

    return total_length
# Измерение времени выполнения
%timeit get_total_length_redis(random_keys)
redis_total = get_total_length_redis(random_keys)
print(f'Общее количество символов в Redis: {redis_total}')
# Markdown:
Redis работает медленнее на моих данных в данных заданиях из-за особенностей его структуры данных и архитектуры. В Redis ключи были сохранены как списки, поэтому для каждого ключа необходимо выполнять операции для извлечения всех элементов списка (например, `LRANGE`), что занимает больше времени по сравнению с простым выбором строки в SQLite. Также Redis хранит все данные в оперативной памяти, что может вызывать замедления при больших объёмах данных, особенно при использовании пайплайнов и обработки батчами.


# t-11.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
## Работа с FAISS
На большом объеме больших векторов оценить время нахождения наименьшего расстояния для k других векторов
- а) алгоритм простого перебора
- б) алгоритм с помощью FAISS
# Markdown:
## Результаты
- Простой перебор: подходит для небольших объемов данных, но становится крайне медленным на больших наборах.
- FAISS: значительно ускоряет поиск за счет использования индексов и оптимизированных алгоритмов.
import numpy as np
import time
import faiss
from scipy.spatial import distance
# Markdown:
Генерация случайных данных
# Параметры
dim = 512     # Размерность векторов
nb = 1000   # Количество векторов в индексе
nq = 500     # Количество векторов для поиска

# Генерация случайных векторов
np.random.seed(42)
vectors = np.random.random((nb, dim)).astype('float32')
query_vectors = np.random.random((nq, dim)).astype('float32')

print(f"Сгенерировано {nb} векторов размерности {dim}")
print(f"Сгенерировано {nq} векторов для поиска")
# Markdown:
Алгоритм простого перебора
def brute_force_search(vectors, query_vectors, k=5):
    results = []
    for query in query_vectors:
        distances = [distance.euclidean(query, vector) for vector in vectors]
        nearest_indices = np.argsort(distances)[:k]
        results.append(nearest_indices)
    return results

# Оценка времени для простого перебора
start_time = time.time()
brute_force_results = brute_force_search(vectors, query_vectors, k=5)
brute_force_time = time.time() - start_time

print(f"Время выполнения алгоритма простого перебора: {brute_force_time:.4f} секунд")
# Markdown:
Использование FAISS для ускоренного поиска
# Создаем индекс FAISS
index = faiss.IndexFlatL2(dim)  # L2 = Евклидово расстояние
index.add(vectors)  # Добавляем векторы в индекс

# Оценка времени для поиска с помощью FAISS
start_time = time.time()
D, I = index.search(query_vectors, 5)  # Ищем 5 ближайших соседей для каждого вектора
faiss_time = time.time() - start_time

print(f"Время выполнения поиска с помощью FAISS: {faiss_time:.4f} секунд")
# Markdown:
Сравнение времени выполнения
print(f"Алгоритм простого перебора: {brute_force_time:.4f} секунд")
print(f"Алгоритм FAISS: {faiss_time:.4f} секунд")
print(f"Ускорение за счет FAISS: {brute_force_time / faiss_time:.0f} раз")


# t-12_Аль-Натор_Даниил_ПМ_22-1.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
t-12  Распараллеливание выполнения алгоритмов с помощью Dask

Разработать пример программы на распараллеливание, усложнив алгоритмы функций. Сравнить время выполнения программы без распараллеливания и с распараллеливанием. Добиться максимального коэффициент сокращения времени выполнения (не менее 10 раз)
!python -m pip install jupyter-server-proxy
import time
from dask import delayed, compute
from tqdm import tqdm
from dask import visualize
import matplotlib.pyplot as plt
from dask.distributed import Client
import numpy as np


client = Client(n_workers=8, threads_per_worker=2)

# Функция для вычисления чисел Фибоначчи (рекурсивно, неэффективно)
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)
# Последовательное выполнение задачи
def sequential_execution(numbers):
    results = []
    for num in tqdm(numbers):
        results.append(fibonacci(num))
    return results
# Параллельное выполнение задачи с использованием dask.delayed
def parallel_execution(numbers):
    tasks = [delayed(fibonacci)(num) for num in numbers]

    # Визуализируем граф вычислений
    visualize(tasks, filename='dask_graph', format='png')

    results = compute(*tasks)
    return results
# Список чисел для вычисления
numbers = [25, 30, 35, 20, 30, 35, 25, 35]
print(f"Числа для вычислений: {numbers}")
print("Последовательное выполнение:")
start_time = time.time()
sequential_results = sequential_execution(numbers)
sequential_duration = time.time() - start_time
print(f"Время выполнения (последовательно): {sequential_duration:.2f} секунд")
print("\nПараллельное выполнение с Dask:")
start_time = time.time()
parallel_results = parallel_execution(numbers)
parallel_duration = time.time() - start_time
print(f"Время выполнения (параллельно): {parallel_duration:.2f} секунд")
%%time
print("Последовательное выполнение:")
start_time = time.time()
sequential_results = sequential_execution(numbers)
sequential_duration = time.time() - start_time
print(f"Время выполнения (последовательно): {sequential_duration:.2f} секунд")
print("\nПараллельное выполнение с Dask:")
start_time = time.time()
parallel_results = parallel_execution(numbers)
parallel_duration = time.time() - start_time
print(f"Время выполнения (параллельно): {parallel_duration:.2f} секунд")
from IPython.display import Image

# Отображение сохраненного графа Dask из файла
Image(filename='dask_graph.png')
# Вычисление коэффициента ускорения
speedup = sequential_duration / parallel_duration
print(f"\nКоэффициент ускорения: {speedup:.2f} раз")


# t-13_Аль-Натор_Даниил_ПМ_22-1.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
t-13  Парсинг web-страниц и создание БД по данным из выбранной категории. Распараллеливание алгоритма обработки страниц помощью Dask

Выбрать одну из категорий в Википедии с достаточно большим числом страниц (500-1000 и более). Распарсить страницы по объектам, выбрав по каждому несколько атрибутов (не менее 3-х). Построить XML-файл и по нему создать БД SQlite3 (см. t-3)
import requests
from bs4 import BeautifulSoup
from dask import delayed, compute
from dask.distributed import Client
import xml.etree.ElementTree as ET
import sqlite3
import pandas as pd
def get_category_pages(category_name):
    S = requests.Session()

    URL = "https://ru.wikipedia.org/w/api.php"

    PARAMS = {
        "action": "query",
        "list": "categorymembers",
        "cmtitle": f"Категория:{category_name}",
        "cmlimit": "10",
        "format": "json"
    }

    pages = []
    while True:
        response = S.get(url=URL, params=PARAMS)
        data = response.json()
        pages.extend([f"https://ru.wikipedia.org/wiki/{item['title'].replace(' ', '_')}" for item in data['query']['categorymembers']])
        if 'continue' in data:
            PARAMS['cmcontinue'] = data['continue']['cmcontinue']
        else:
            break

        if len(pages) >= 500:
            break

    return pages
category_name = 'Писатели_Российской_империи'
pages = get_category_pages(category_name)
print(f"Количество страниц: {len(pages)}")
def parse_page(url):
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; Bot/1.0; +http://example.com/bot)"
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Извлечение имени
        name = soup.find('h1', {'id': 'firstHeading'}).text.strip()

        # Извлечение даты рождения
        birth_date = 'Неизвестно'
        bday = soup.find('span', {'class': 'bday'})
        if bday:
            birth_date = bday.text.strip()

        # Извлечение рода деятельности
        occupation = 'Неизвестно'
        infobox = soup.find('table', {'class': 'infobox'})
        if infobox:
            row = infobox.find(string='Род деятельности')
            if row:
                occupation = row.find_next('td').text.strip()

        return {'name': name, 'birth_date': birth_date, 'occupation': occupation}
    except Exception as e:
        print(f"Ошибка при обработке URL {url}: {e}")
        return {'name': 'Неизвестно', 'birth_date': 'Неизвестно', 'occupation': 'Неизвестно'}
import time
from tqdm import tqdm

start_time = time.time()
results = [parse_page(url) for url in tqdm(pages)]
end_time = time.time()
print(f"Потребовалось: {end_time - start_time} секунд")
# параллельное выполнение

start_time = time.time()
delayed_results = [delayed(parse_page)(url) for url in pages]
results = compute(*delayed_results)
end_time = time.time()
print(f"Потребовалось: {end_time - start_time} секунд")
root = ET.Element('persons', {'category': category_name})

for person in results:
    psn = ET.SubElement(root, 'psn', {
        'name': person['name'],
        'birth_date': person['birth_date'],
        'occupation': person['occupation']
    })

tree = ET.ElementTree(root)
tree.write('persons.xml', encoding='utf-8', xml_declaration=True)
# Парсинг XML-файла
tree = ET.parse('persons.xml')
root = tree.getroot()

data = []
for psn in root.findall('psn'):
    name = psn.get('name')
    birth_date = psn.get('birth_date')
    occupation = psn.get('occupation')
    data.append((name, birth_date, occupation))

# Создание базы данных
conn = sqlite3.connect('persons.db')
cursor = conn.cursor()
cursor.execute('''
    CREATE TABLE IF NOT EXISTS persons (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        birth_date TEXT,
        occupation TEXT
    )
''')

# Вставка данных
cursor.executemany('INSERT INTO persons (name, birth_date, occupation) VALUES (?, ?, ?)', data)
conn.commit()
conn.close()
conn = sqlite3.connect('persons.db')
df = pd.read_sql_query('SELECT * FROM persons', conn)
conn.close()
df.sample(5)


# t-14_Аль-Натор.ipynb
# Markdown:
# Аль-Натор Даниил ПМ22-1
# Markdown:
t-14 Привести XML-файл по категориям людей (t-13) в стандартный формат.
# Markdown:
Программа выполняет следующие задачи:

1. **Чтение XML-файла**:
   - Функция `read_xml` читает входной XML-файл с использованием библиотеки `xml.etree.ElementTree`.
   - Если файл успешно прочитан, возвращается корневой элемент дерева XML. В случае ошибки выводится сообщение об ошибке, и возвращается `None`.

2. **Обработка данных и запись в новый XML-файл**:
   - Функция `process_and_write_xml` выполняет обработку данных из входного XML-файла и записывает их в новый XML-файл.
   - Действия внутри функции:
     - Вызывает `read_xml` для чтения исходного XML-файла. Если чтение не удалось, выполнение прерывается.
     - Создает новый XML-документ с корневым элементом `<root>`.
     - Добавляет дочерний элемент `<persons>` с атрибутом `category`, значение которого копируется из исходного XML (или устанавливается в `'Unknown'`, если атрибут отсутствует).
     - Находит все элементы `<psn>` в исходном XML, сортирует их по значению атрибута `name`.
     - Для каждого элемента `<psn>` в исходном XML создается новый элемент `<psn>` с атрибутами:
       - `n` (имя) — обязательный, берется из атрибута `name` исходного элемента.
       - `b` (дата рождения) — добавляется только если значение атрибута `birth_date` не равно `"Неизвестно"`.
       - `a` (профессия) — добавляется только если значение атрибута `occupation` не равно `"Неизвестно"`.
     - Новый XML-документ сохраняется в файл с указанным именем (`output_file`) в кодировке `windows-1251`.

3. **Чтение и вывод нового XML-файла**:
   - Программа читает созданный XML-файл с помощью `ET.parse`.
   - Если файл успешно прочитан, его содержимое преобразуется в строку и форматируется для удобного вывода. Каждый открывающий или закрывающий тег выводится на новой строке.
   - Если чтение файла не удалось, выводится сообщение об ошибке.

4. **Итоговый результат**:
   - Программа считывает данные из входного XML-файла, преобразует их в структурированный и стандартизированный формат, а затем записывает результат в новый XML-файл. Она также позволяет отформатировано вывести содержимое результата.
# Markdown:
Результат:
# Markdown:
# Markdown:
# Markdown:
# ...
import xml.etree.cElementTree as ET

# Чтение XML-файла
def read_xml(file_name):
    try:
        tree = ET.parse(file_name)
        root = tree.getroot()
        print(f"Успешно прочитан файл: {file_name}")
        return root
    except Exception as e:
        print(f"Ошибка при чтении файла: {e}")
        return None

# Преобразование данных и запись в новый XML
def process_and_write_xml(input_file, output_file):
    root = read_xml(input_file)
    if root is None:
        return

    # Создание нового корневого элемента
    root1 = ET.Element("root")
    doc = ET.SubElement(root1, "persons", attrib={"category": root.attrib.get('category', 'Unknown')})

    # Сортировка по имени
    persons = sorted(root.findall('psn'), key=lambda x: x.attrib.get('name', 'Unknown'))

    for p in persons:
        name = p.attrib.get('name', 'Unknown')
        birth_date = p.attrib.get('birth_date')
        occupation = p.attrib.get('occupation')

        # Создание нового элемента
        pu = ET.SubElement(doc, "psn", n=name)
        if birth_date != "Неизвестно":
            pu.set("b", birth_date)
        if occupation != "Неизвестно":
            pu.set("a", occupation)

    # Запись в файл
    tree = ET.ElementTree(root1)
    tree.write(output_file, encoding="windows-1251", xml_declaration=True)
    print(f"XML преобразован и сохранен в '{output_file}'")

# Основной запуск
input_file = 'persons.xml'
output_file = 'standardized_persons.xml'

process_and_write_xml(input_file, output_file)
# Чтение XML-файла
file_name = 'standardized_persons.xml'  # Имя загруженного файла
try:
    tree = ET.parse(file_name)
    root = tree.getroot()
    print(f"Успешно прочитан файл: {file_name}")
except Exception as e:
    print(f"Ошибка при чтении файла: {e}")
    root = None

# Форматирование и вывод содержимого файла
if root is not None:
    xml_output = ET.tostring(root, encoding='unicode', method='xml')
    formatted_output = "\n".join(xml_output.split("><"))
    print(formatted_output)


# t-15_Аль-Натор_Даниил_ПМ22-1.ipynb
# Markdown:
# Аль_Натор Даниил ПМ22-1
# Markdown:
Формулировка задачи: имеется набор текстов. Необходимо для каждого слова, встречающегося в наборе текстов, посчитать, сколько раз встречается слово в наборе. В качестве текстов взять литературные произведения из папки Материалы..../Texts (чем больше, тем лучше). Предварительно тексты “очистить” и нормализовать слова.
# Markdown:
Код анализирует тексты из папки `./data/Texts`, подсчитывает количество вхождений каждого слова (исключая стоп-слова), и выводит таблицу с результатами. Итоги сохраняются в файл `word_count.csv` в папке `./results`. Таблица позволяет выявить наиболее часто встречающиеся слова в анализируемых текстах.
import os
import re
from collections import Counter
from dask import delayed, compute
from glob import glob
import pandas as pd
from nltk.corpus import stopwords
# Загрузка стоп-слов
stop_words = set(stopwords.words('russian') + stopwords.words('english'))
stop_words
# Функция для определение кодировки файла
def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    try:
        raw_data.decode('utf-8')
        return 'utf-8'
    except UnicodeDecodeError:
        return 'cp1251'
    
# Функция для чтения и нормализации текста
def read_and_normalize(file_path):
    encoding = detect_encoding(file_path)
    with open(file_path, 'r', encoding=encoding) as f:
        text = f.read().lower()
        text = re.sub(r"[^a-zA-Zа-яА-Я0-9\s]", "", text)
        words = text.split()
        words = [word for word in words if word not in stop_words]  # Удаление стоп-слов
        return words

# Функция для подсчета слов в отдельном тексте
def count_words_in_file(file_path):
    words = read_and_normalize(file_path)
    return Counter(words)

# Функция для объединения результатов подсчета
@delayed
def merge_counters(counter1, counter2):
    return counter1 + counter2
input_dir = "./data/Texts"
file_paths = glob(os.path.join(input_dir, "*.txt"))

# Список задач для Dask
word_counts = [delayed(count_words_in_file)(file_path) for file_path in file_paths]

# Объединяем результаты
total_word_count = word_counts[0]
for wc in word_counts[1:]:
    total_word_count = merge_counters(total_word_count, wc)

# Вычисляем финальный результат
final_word_count = compute(total_word_count)[0]

# Преобразуем результат в DataFrame
word_count_df = pd.DataFrame(final_word_count.items(), columns=['Word', 'Count']).sort_values(by='Count', ascending=False)

# Сохраняем результат
output_dir = "./results"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "word_count.csv")
word_count_df.to_csv(output_path, index=False, encoding="utf-8")

# Выводим результат
word_count_df

