AbstractVersionedDataset(filepath, version, exists_function=None, glob_function=None)
Bases: AbstractDataset[_DI, _DO], ABC
AbstractVersionedDataset is the base class for all versioned dataset
implementations.
All datasets that implement versioning should extend this
abstract class and implement the methods marked as abstract.
Example:
from pathlib import Path, PurePosixPath
import pandas as pd
from kedro.io import AbstractVersionedDataset
class MyOwnDataset(AbstractVersionedDataset):
    """Example versioned dataset that stores a pandas DataFrame as CSV."""

    def __init__(self, filepath, version, param1, param2=True):
        # The base class receives the location as a PurePosixPath.
        super().__init__(PurePosixPath(filepath), version)
        self._param1 = param1
        self._param2 = param2

    def load(self) -> pd.DataFrame:
        """Read the CSV found at the resolved load path."""
        return pd.read_csv(self._get_load_path())

    def save(self, df: pd.DataFrame) -> None:
        """Write ``df`` as CSV to the resolved save path."""
        target = str(self._get_save_path())
        df.to_csv(target)

    def _exists(self) -> bool:
        """Report whether the resolved load path exists on the local disk."""
        local_path = Path(self._get_load_path().as_posix())
        return local_path.exists()

    def _describe(self):
        """Return the attributes that describe this dataset instance."""
        return {
            "version": self._version,
            "param1": self._param1,
            "param2": self._param2,
        }
Example catalog.yml specification:
my_dataset:
  type: <path-to-my-own-dataset>.MyOwnDataset
  filepath: data/01_raw/my_data.csv
  versioned: true
  param1: <param1-value> # param1 is a required argument
  # param2 will be True by default
Parameters:
- filepath (PurePosixPath) – Filepath in POSIX format to a file.
- version (Version | None) – If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated.
- exists_function (Callable[[str], bool] | None, default: None) – Function that is used for determining whether a path exists in a filesystem.
- glob_function (Callable[[str], list[str]] | None, default: None) – Function that is used for finding all paths in a filesystem that match a given pattern.
Source code in kedro/io/core.py
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
def __init__(
    self,
    filepath: PurePosixPath,
    version: Version | None,
    exists_function: Callable[[str], bool] | None = None,
    glob_function: Callable[[str], list[str]] | None = None,
):
    """Creates a new instance of ``AbstractVersionedDataset``.

    Args:
        filepath: Filepath in POSIX format to a file.
        version: If specified, should be an instance of
            ``kedro.io.core.Version``. If its ``load`` attribute is
            None, the latest version will be loaded. If its ``save``
            attribute is None, save version will be autogenerated.
        exists_function: Function that is used for determining whether
            a path exists in a filesystem. Falls back to
            ``_local_exists`` when not supplied.
        glob_function: Function that is used for finding all paths
            in a filesystem, which match a given pattern. Falls back to
            ``iglob`` when not supplied.
    """
    self._filepath = filepath
    self._version = version

    # Any falsy value (not only None) selects the default helper.
    self._exists_function = exists_function if exists_function else _local_exists
    self._glob_function = glob_function if glob_function else iglob

    # 1 entry for load version, 1 for save version
    self._version_cache: Cache = Cache(maxsize=2)
|
_exists_function
instance-attribute
_exists_function = exists_function or _local_exists
_filepath
instance-attribute
_glob_function
instance-attribute
_glob_function = glob_function or iglob
_version
instance-attribute
_version_cache
instance-attribute
_version_cache = Cache(maxsize=2)
_fetch_latest_load_version
_fetch_latest_load_version()
Source code in kedro/io/core.py
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
@cachedmethod(cache=attrgetter("_version_cache"), key=partial(hashkey, "load"))
def _fetch_latest_load_version(self) -> str:
    """Find the most recent existing version for an unpinned load.

    Globs for every versioned path of this dataset, walks the matches in
    descending sort order and picks the first one that the configured
    exists function confirms. The result is cached in
    ``self._version_cache`` through ``cachedmethod``.

    Returns:
        The name of the parent directory of the selected path.

    Raises:
        VersionNotFoundError: if globbing fails or no existing path is found.
    """
    pattern = str(self._get_versioned_path("*"))

    try:
        version_paths = list(self._glob_function(pattern))
        version_paths.sort(reverse=True)
    except Exception as exc:
        raise VersionNotFoundError(
            f"Did not find any versions for {self}. This could be "
            f"due to insufficient permission. Exception: {exc}"
        ) from exc

    # First path, in descending order, that actually exists.
    most_recent = None
    for candidate in version_paths:
        if self._exists_function(candidate):
            most_recent = candidate
            break

    if not most_recent:
        raise VersionNotFoundError(f"Did not find any versions for {self}")

    # Layout is <filepath>/<version>/<filename>, so the version is the
    # name of the directory that holds the file.
    return PurePath(most_recent).parent.name
|
_fetch_latest_save_version
_fetch_latest_save_version()
Generate and cache the current save version
Source code in kedro/io/core.py
@cachedmethod(cache=attrgetter("_version_cache"), key=partial(hashkey, "save"))
def _fetch_latest_save_version(self) -> str:
    """Generate the current save version and cache it.

    The value comes from ``generate_timestamp()`` and is stored in
    ``self._version_cache`` through ``cachedmethod``.
    """
    save_version = generate_timestamp()
    return save_version
|
_get_load_path
Source code in kedro/io/core.py
780
781
782
783
784
785
def _get_load_path(self) -> PurePosixPath:
    """Return the path the dataset should be loaded from.

    With versioning enabled this is the versioned path for the resolved
    load version; otherwise it is the original filepath.
    """
    if self._version:
        resolved = self.resolve_load_version()
        return self._get_versioned_path(resolved)  # type: ignore[arg-type]

    # When versioning is disabled, load from original filepath
    return self._filepath
|
_get_save_path
Source code in kedro/io/core.py
796
797
798
799
800
801
802
803
804
805
806
807
808
809
def _get_save_path(self) -> PurePosixPath:
    """Return the path the dataset should be saved to.

    With versioning disabled this is the original filepath. Otherwise it
    is the versioned path for the resolved save version.

    Raises:
        DatasetError: if versioning is enabled and the versioned save path
            already exists according to the configured exists function.
    """
    # When versioning is disabled, return original filepath
    if not self._version:
        return self._filepath

    versioned_path = self._get_versioned_path(
        self.resolve_save_version()  # type: ignore[arg-type]
    )

    already_present = self._exists_function(str(versioned_path))
    if already_present:
        raise DatasetError(
            f"Save path '{versioned_path}' for {self!s} must not exist if "
            f"versioning is enabled."
        )
    return versioned_path
|
_get_versioned_path
_get_versioned_path(version)
Source code in kedro/io/core.py
def _get_versioned_path(self, version: str) -> PurePosixPath:
    """Build ``<filepath>/<version>/<filename>`` for the given version."""
    base = self._filepath
    return base.joinpath(version, base.name)
|
_release
Source code in kedro/io/core.py
def _release(self) -> None:
    """Run the parent release step, then empty the version cache."""
    super()._release()
    # Forget cached load/save versions so they are resolved again later.
    version_cache = self._version_cache
    version_cache.clear()
|
_save_wrapper
classmethod
Decorate save_func with logging and error handling code.
Source code in kedro/io/core.py
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
@classmethod
def _save_wrapper(
    cls, save_func: Callable[[Self, _DI], None]
) -> Callable[[Self, _DI], None]:
    """Decorate `save_func` with logging and error handling code.

    The returned function clears the version cache, resolves the save
    version, delegates to the parent class wrapper, and afterwards warns
    if the resolved load version differs from the save version.

    Raises (from the returned function):
        DatasetError: if the delegated save raises ``FileNotFoundError``
            or ``NotADirectoryError``.
    """

    @wraps(save_func)
    def save(self: Self, data: _DI) -> None:
        # Start from a clean cache, then make sure last save version is set.
        self._version_cache.clear()
        save_version = self.resolve_save_version()

        try:
            parent_wrapped = super()._save_wrapper(save_func)
            parent_wrapped(self, data)
        except (FileNotFoundError, NotADirectoryError) as err:
            # FileNotFoundError raised in Win, NotADirectoryError raised in Unix
            _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"
            message = (
                f"Cannot save versioned dataset '{self._filepath.name}' to "
                f"'{self._filepath.parent.as_posix()}' because a file with the same "
                f"name already exists in the directory. This is likely because "
                f"versioning was enabled on a dataset already saved previously. Either "
                f"remove '{self._filepath.name}' from the directory or manually "
                f"convert it into a versioned dataset by placing it in a versioned "
                f"directory (e.g. with default versioning format "
                f"'{self._filepath.as_posix()}/{_default_version}/{self._filepath.name}"
                f"')."
            )
            raise DatasetError(message) from err

        load_version = self.resolve_load_version()
        if load_version != save_version:
            warnings.warn(
                _CONSISTENCY_WARNING.format(save_version, load_version, str(self))
            )
            # NOTE(review): nesting reconstructed from flattened text; the
            # cache clear is assumed to belong to the mismatch branch - confirm.
            self._version_cache.clear()

    return save
|
exists
Checks whether a dataset's output already exists by calling
the provided _exists() method.
Returns:
- bool – Flag indicating whether the output already exists.
Raises:
- DatasetError – when the underlying exists method raises an error.
Source code in kedro/io/core.py
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
def exists(self) -> bool:
    """Checks whether a dataset's output already exists by calling
    the provided _exists() method.

    A ``VersionNotFoundError`` raised by ``_exists()`` is treated as
    "does not exist" rather than as a failure.

    Returns:
        Flag indicating whether the output already exists.

    Raises:
        DatasetError: when underlying exists method raises error.
    """
    self._logger.debug("Checking whether target of %s exists", str(self))

    try:
        found = self._exists()
    except VersionNotFoundError:
        return False
    except Exception as exc:  # SKIP_IF_NO_SPARK
        raise DatasetError(
            f"Failed during exists check for dataset {self!s}.\n{exc!s}"
        ) from exc
    return found
|
resolve_load_version
Compute the version the dataset should be loaded with.
Source code in kedro/io/core.py
772
773
774
775
776
777
778 | def resolve_load_version(self) -> str | None:
"""Compute the version the dataset should be loaded with."""
if not self._version:
return None
if self._version.load:
return self._version.load # type: ignore[no-any-return]
return self._fetch_latest_load_version()
|
resolve_save_version
Compute the version the dataset should be saved with.
Source code in kedro/io/core.py
788
789
790
791
792
793
794 | def resolve_save_version(self) -> str | None:
"""Compute the version the dataset should be saved with."""
if not self._version:
return None
if self._version.save:
return self._version.save # type: ignore[no-any-return]
return self._fetch_latest_save_version()
|