# Advanced IO

> *Note:* This documentation is based on `Kedro 0.15.2`. If you spot anything that is incorrect, please create an [issue](https://github.com/quantumblacklabs/kedro/issues) or pull request.

In this tutorial, you will learn about advanced uses of the [Kedro IO](/kedro.io.rst) module and understand the underlying implementation.

Relevant API documentation: [AbstractDataSet](/kedro.io.AbstractDataSet), [DataSetError](/kedro.io.DataSetError)

## Error handling

Kedro provides custom exceptions for the main classes of errors, which you can catch to handle failures.

```python
from kedro.io import DataCatalog, DataSetError
```

```python
io = DataCatalog(data_sets=dict())  # empty catalog

try:
    cars_df = io.load('cars')
except DataSetError:
    print("Error raised.")
```


## AbstractDataSet

To understand what is going on behind the scenes, you should study the [AbstractDataSet interface](/kedro.io.AbstractDataSet). `AbstractDataSet` is the underlying interface that all datasets extend. It requires subclasses to override the `_load` and `_save` methods, and provides `load` and `save` methods that enrich the corresponding private methods with uniform error handling. It also requires subclasses to override `_describe`, which is used to log internal information about instances of your custom `AbstractDataSet` implementation.
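
The relationship between the public and private methods can be pictured with a minimal, self-contained sketch. It does not import Kedro and is not Kedro's actual implementation: `SketchDataSetError`, `SketchDataSet` and `InMemorySketch` are hypothetical names, and the only behaviour carried over from the description above is that `load` and `save` wrap `_load` and `_save` with uniform error handling, while `_describe` supplies the details used in messages.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SketchDataSetError(Exception):
    """Hypothetical stand-in for kedro.io.DataSetError."""


class SketchDataSet(ABC):
    """Hypothetical, simplified stand-in for AbstractDataSet."""

    def load(self) -> Any:
        try:
            return self._load()
        except SketchDataSetError:
            raise
        except Exception as exc:
            # Wrap any low-level failure in one uniform error type.
            message = "Failed while loading data from {}.\n{}".format(self, exc)
            raise SketchDataSetError(message) from exc

    def save(self, data: Any) -> None:
        try:
            self._save(data)
        except SketchDataSetError:
            raise
        except Exception as exc:
            message = "Failed while saving data to {}.\n{}".format(self, exc)
            raise SketchDataSetError(message) from exc

    def __str__(self) -> str:
        # _describe() feeds the human-readable description used in messages.
        args = ", ".join(
            "{}={!r}".format(key, value)
            for key, value in sorted(self._describe().items())
        )
        return "{}({})".format(type(self).__name__, args)

    @abstractmethod
    def _load(self) -> Any:
        ...

    @abstractmethod
    def _save(self, data: Any) -> None:
        ...

    @abstractmethod
    def _describe(self) -> Dict[str, Any]:
        ...


class InMemorySketch(SketchDataSet):
    """Hypothetical subclass that only implements the private methods."""

    def __init__(self, name: str):
        self._name = name
        self._data = None

    def _load(self) -> Any:
        if self._data is None:
            raise ValueError("nothing has been saved yet")
        return self._data

    def _save(self, data: Any) -> None:
        self._data = data

    def _describe(self) -> Dict[str, Any]:
        return dict(name=self._name)
```

In this sketch, calling `InMemorySketch("parts").load()` before any `save` raises `SketchDataSetError` rather than the underlying `ValueError`, so callers only need to handle one exception type.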

If you have a dataset called `parts`, you can make direct calls to it like so:

```python
parts_df = parts.load()
```

However, we recommend using a `DataCatalog` instead (for more details, see [this section](../04_user_guide/04_data_catalog.md) in the User Guide) as it has been designed to make all datasets available to project members.

For contributors, if you would like to submit a new dataset, you will have to extend `AbstractDataSet`.


## Versioning

To enable versioning, update the `catalog.yml` config file and set the `versioned` attribute to `true` for the given dataset. If this is a custom dataset, the implementation must also:
  1. extend `kedro.io.core.AbstractVersionedDataSet` AND
  2. add a `version` namedtuple as an argument to its `__init__` method AND
  3. call `super().__init__()` with the positional arguments `filepath` and `version` and, optionally, with `glob` and `exists` functions if it uses a non-local filesystem (see [kedro.io.CSVLocalDataSet](/kedro.io.CSVLocalDataSet) and [kedro.io.CSVS3DataSet](/kedro.io.CSVS3DataSet) for examples) AND
  4. modify its `_describe`, `_load` and `_save` methods to support versioning (see [`kedro.io.CSVLocalDataSet`](/kedro.io.CSVLocalDataSet) for an example implementation)

An example dataset could look like this:

```python
from pathlib import Path

import pandas as pd

from kedro.io import AbstractVersionedDataSet


class MyOwnDataSet(AbstractVersionedDataSet):
    def __init__(self, param1, param2, filepath, version=None):
        super().__init__(Path(filepath), version)
        self._param1 = param1
        self._param2 = param2

    def _load(self) -> pd.DataFrame:
        load_path = self._get_load_path()
        return pd.read_csv(load_path)

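    def _save(self, df: pd.DataFrame) -> None:
        save_path = Path(self._get_save_path())
        # create the versioned directory if it does not exist yet
        save_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(save_path)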

    def _describe(self):
        return dict(version=self._version, param1=self._param1, param2=self._param2)
```

With `catalog.yml` specifying:

```yaml
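my_dataset:
  type: <path-to-my-own-dataset>.MyOwnDataSet
  filepath: data/01_raw/my_data.csv
  versioned: true
  param1: <param1-value>  # required by MyOwnDataSet.__init__
  param2: <param2-value>  # required by MyOwnDataSet.__init__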
```

### `version` namedtuple

The `__init__` method of a versioned dataset must have an optional argument called `version` with a default value of `None`. If provided, this argument must be an instance of [`kedro.io.core.Version`](/kedro.io.Version). Its `load` and `save` attributes must either be `None` or contain string values representing exact load and save versions:

* If `version` is `None`, the dataset is considered *not versioned*.
* If `version.load` is `None`, the latest available version is used to load the dataset; otherwise, a string representing an exact load version must be provided.
* If `version.save` is `None`, a new save version string is generated by calling `kedro.io.core.generate_timestamp()`; otherwise, a string representing an exact save version must be provided.
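
The three rules above can be sketched in plain Python. This is an illustration under stated assumptions, not Kedro's code: `SketchVersion`, `sketch_timestamp`, `resolve_load_version` and `resolve_save_version` are hypothetical names, the list of available versions is passed in by hand, and the timestamp layout `YYYY-MM-DDThh.mm.ss.sssZ` is assumed for generated versions.

```python
from collections import namedtuple
from datetime import datetime, timezone
from typing import List, Optional

# Hypothetical stand-in for kedro.io.core.Version.
SketchVersion = namedtuple("SketchVersion", ["load", "save"])


def sketch_timestamp() -> str:
    """Return the current UTC time formatted as YYYY-MM-DDThh.mm.ss.sssZ."""
    now = datetime.now(tz=timezone.utc)
    millis = now.microsecond // 1000
    return now.strftime("%Y-%m-%dT%H.%M.%S.") + "{:03d}Z".format(millis)


def resolve_load_version(
    version: Optional[SketchVersion], available: List[str]
) -> Optional[str]:
    """Pick the version to load, or None if the dataset is not versioned."""
    if version is None:
        return None  # not versioned
    if version.load is not None:
        return version.load  # exact load version requested
    if not available:
        raise ValueError("no versions available to load")
    # Zero-padded timestamps sort chronologically as plain strings,
    # so the maximum is the latest version.
    return max(available)


def resolve_save_version(version: Optional[SketchVersion]) -> Optional[str]:
    """Pick the version to save under, or None if the dataset is not versioned."""
    if version is None:
        return None  # not versioned
    if version.save is not None:
        return version.save  # exact save version requested
    return sketch_timestamp()  # generate a fresh save version
```

For example, `resolve_load_version(SketchVersion(None, None), available)` returns the largest timestamp string in `available`, while passing an exact `load` value returns that value unchanged.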

### Versioning using the YAML API

The easiest way to version a specific dataset is to change the corresponding entry in the `catalog.yml`.

> *Note:* `catalog.yml` only lets you choose whether to version your datasets; it does not let you choose which version to load or save. In the rare cases where this is strictly required, you can instantiate your versioned datasets using the Code API and define the `version` parameter explicitly (see the [corresponding section](#versioning-using-the-code-api) below).

For example, if the following dataset was defined in the `catalog.yml`:

```yaml
cars.csv:
  type: CSVLocalDataSet
  filepath: data/01_raw/company/cars.csv
  versioned: true
```

the `DataCatalog` will create a versioned `CSVLocalDataSet` called `cars.csv`. The actual CSV file location will look like `data/01_raw/company/cars.csv/<version>/cars.csv`, where `<version>` corresponds to a global save version string formatted as `YYYY-MM-DDThh.mm.ss.sssZ`. Every time the `DataCatalog` is instantiated, it generates a new global save version, which is propagated to all versioned datasets it contains.
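
The path layout described above can be reproduced with `pathlib`. The helper below is a hypothetical illustration of the `<filepath>/<version>/<filename>` pattern, not a Kedro function:

```python
from pathlib import PurePosixPath


def versioned_path(filepath: str, version: str) -> str:
    """Build <filepath>/<version>/<filename> from a dataset filepath."""
    path = PurePosixPath(filepath)
    return str(path / version / path.name)


print(versioned_path("data/01_raw/company/cars.csv", "2019-02-13T14.35.36.518Z"))
# data/01_raw/company/cars.csv/2019-02-13T14.35.36.518Z/cars.csv
```

Note that the original file name becomes a directory, and each version lives in its own subdirectory beneath it.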

> **Important:** a `DataCatalog` instance does not regenerate its save version after instantiation. Therefore, if you call `catalog.save('cars.csv', some_data)` twice on the same instance, the second call will fail, since it tries to overwrite a versioned dataset using the same save version. This limitation does not apply to the `load` operation.
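
To see why a second save with an unchanged save version fails, consider the following simplified sketch. It does not use Kedro: `SketchVersionedWriter` and `save_twice` are hypothetical, and the sketch raises the built-in `FileExistsError` where Kedro would report a `DataSetError`.

```python
import tempfile
from pathlib import Path


class SketchVersionedWriter:
    """Hypothetical writer whose save version is fixed at construction time."""

    def __init__(self, filepath: str, save_version: str):
        path = Path(filepath)
        # The target path never changes for the lifetime of this instance.
        self._target = path / save_version / path.name

    def save(self, text: str) -> None:
        if self._target.exists():
            # Refuse to overwrite an existing version.
            raise FileExistsError(
                "Save path {} already exists".format(self._target)
            )
        self._target.parent.mkdir(parents=True, exist_ok=True)
        self._target.write_text(text)


def save_twice(base_dir: str) -> str:
    """Return 'ok' if both saves succeed, else the name of the error raised."""
    writer = SketchVersionedWriter(
        str(Path(base_dir) / "cars.csv"), "2019-02-13T14.35.36.518Z"
    )
    writer.save("first")
    try:
        writer.save("second")
    except FileExistsError as exc:
        return type(exc).__name__
    return "ok"


with tempfile.TemporaryDirectory() as tmp_dir:
    result = save_twice(tmp_dir)
```

Creating a new writer with a different save version would target a different directory, which mirrors how a freshly instantiated `DataCatalog` gets a new global save version.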

By default, the `DataCatalog` will load the latest version of the dataset. However, it is also possible to specify an exact load version. In order to do that, you can pass a dictionary with exact load versions to `DataCatalog.from_config`:

```python
load_versions = {'cars.csv': '2019-02-13T14.35.36.518Z'}
io = DataCatalog.from_config(catalog_config, credentials, load_versions=load_versions)
cars = io.load('cars.csv')
```

The last line in the example above attempts to load a CSV file from `data/01_raw/company/cars.csv/2019-02-13T14.35.36.518Z/cars.csv`.

> The `load_versions` configuration only has an effect if versioning has been enabled for the dataset in the catalog config file (see the example above).

> **Important:** we recommend that you do not override the `save_version` argument in `DataCatalog.from_config` unless it is strictly required, since doing so may lead to inconsistencies between the loaded and saved versions of the versioned datasets.

### Versioning using the Code API

Although we recommend enabling versioning using the `catalog.yml` config file as described in the section above, you may require more control over load and save versions of a specific dataset. To achieve this you can instantiate `Version` and pass it as a parameter to the dataset initialisation:

```python
from kedro.io import CSVLocalDataSet, DataCatalog, Version
import pandas as pd

data1 = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
data2 = pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})
version = Version(
    load=None,  # load the latest available version
    save=None,  # generate save version automatically on each save operation
)
test_data_set = CSVLocalDataSet(
    filepath="data/01_raw/test.csv",
    save_args={"index": False},
    version=version,
)
io = DataCatalog({"test_data_set": test_data_set})

# save the dataset to data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data1)
# save the dataset into a new file data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data2)

# load the latest version from data/01_raw/test.csv/*/test.csv
reloaded = io.load("test_data_set")
assert data2.equals(reloaded)
```

> *Note:* In the example above we did not fix any versions. If we do, then the behaviour of load and save operations becomes slightly different:

```python
version = Version(
    load="my_exact_version",   # load exact version
    save="my_exact_version",   # save to exact version
)
test_data_set = CSVLocalDataSet(
    filepath="data/01_raw/test.csv",
    save_args={"index": False},
    version=version,
)
io = DataCatalog({"test_data_set": test_data_set})

# save the dataset to data/01_raw/test.csv/my_exact_version/test.csv
io.save("test_data_set", data1)
# load from data/01_raw/test.csv/my_exact_version/test.csv
reloaded = io.load("test_data_set")
assert data1.equals(reloaded)

# raises DataSetError since the path
# data/01_raw/test.csv/my_exact_version/test.csv already exists
io.save("test_data_set", data2)
```

> **Important:** Passing exact load and/or save versions to the dataset instantiation is not recommended, since it may lead to inconsistencies between operations. For example, if the versions for load and save operations do not match, the save operation results in a `UserWarning` indicating that the save and load versions do not match. A load after a save may also raise an error if the corresponding load version is not found:

```python
version = Version(
    load="exact_load_version",  # load exact version
    save="exact_save_version"   # save to exact version
)
test_data_set = CSVLocalDataSet(
    filepath="data/01_raw/test.csv",
    save_args={"index": False},
    version=version,
)
io = DataCatalog({"test_data_set": test_data_set})

io.save("test_data_set", data1)  # emits a UserWarning due to version inconsistency

# raises DataSetError since the data/01_raw/test.csv/exact_load_version/test.csv
# file does not exist
reloaded = io.load("test_data_set")
```
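
The kind of consistency check that produces such a warning can be sketched with the standard `warnings` module. The function `warn_if_versions_differ` and its message text are hypothetical and simplified; the example only shows how a `UserWarning` is emitted and how calling code can capture it:

```python
import warnings
from typing import Optional


def warn_if_versions_differ(load_version: Optional[str], save_version: str) -> None:
    """Hypothetical check: warn if an exact load version differs from the save version."""
    if load_version is not None and load_version != save_version:
        warnings.warn(
            "Save version `{}` did not match load version `{}`".format(
                save_version, load_version
            ),
            UserWarning,
        )


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure no warning is suppressed
    warn_if_versions_differ("exact_load_version", "exact_save_version")
    # No exact load version requested, so this simplified check stays silent.
    warn_if_versions_differ(None, "exact_save_version")
```

After this runs, `caught` holds a single `UserWarning`, raised by the first call only.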

### Supported datasets

Currently the following datasets support versioning:

- `CSVLocalDataSet`
- `CSVS3DataSet`
- `HDFLocalDataSet`
- `HDFS3DataSet`
- `JSONLocalDataSet`
- `ParquetLocalDataSet`
- `PickleLocalDataSet`
- `PickleS3DataSet`
- `TextLocalDataSet`
- `ExcelLocalDataSet`
- `kedro.contrib.io.feather.FeatherLocalDataSet`
- `kedro.contrib.io.parquet.ParquetS3DataSet`
- `kedro.contrib.io.azure.CSVBlobDataSet`
- `kedro.contrib.io.pyspark.SparkDataSet`
