SharedMemoryDataCatalog
kedro.io.SharedMemoryDataCatalog
SharedMemoryDataCatalog(datasets=None, config_resolver=None, load_versions=None, save_version=None)
Bases: DataCatalog
A specialized DataCatalog for managing datasets in a shared memory context.
The SharedMemoryDataCatalog extends the base DataCatalog to support multiprocessing
by ensuring that datasets are serializable and synchronized across threads or processes.
It provides additional functionality for managing shared memory datasets, such as setting
a multiprocessing manager and validating dataset compatibility with multiprocessing.
Attributes:
- default_runtime_patterns (ClassVar): A dictionary defining the default runtime pattern for datasets of type kedro.io.SharedMemoryDataset.
Example:
from multiprocessing.managers import SyncManager
from kedro.io import MemoryDataset
from kedro.io.data_catalog import SharedMemoryDataCatalog
# Create a shared memory catalog
catalog = SharedMemoryDataCatalog(
    datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
)
# Set a multiprocessing manager
manager = SyncManager()
manager.start()
catalog.set_manager_datasets(manager)
# Validate the catalog for multiprocessing compatibility
catalog.validate_catalog()
Source code in kedro/io/data_catalog.py
default_runtime_patterns (class-attribute, instance-attribute)
default_runtime_patterns = {'{default}': {'type': 'kedro.io.SharedMemoryDataset'}}
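To illustrate what a catch-all runtime pattern means in practice, the sketch below resolves a dataset name to a configuration. It is not Kedro's resolver: `resolve_dataset_config` and `explicit_configs` are hypothetical names, and the fallback rule (any name without its own entry receives the `{default}` configuration) is an assumption about how such a pattern is applied.

```python
# Value copied from the default_runtime_patterns attribute.
DEFAULT_RUNTIME_PATTERNS = {"{default}": {"type": "kedro.io.SharedMemoryDataset"}}


def resolve_dataset_config(name, explicit_configs, runtime_patterns=DEFAULT_RUNTIME_PATTERNS):
    """Hypothetical helper: return the config for `name`, falling back to the default."""
    if name in explicit_configs:
        return explicit_configs[name]
    # Assumption: "{default}" acts as a catch-all, so every unregistered
    # name receives a copy of the default configuration.
    return dict(runtime_patterns["{default}"])


# Illustrative catalog entries; the names and file path are made up.
explicit_configs = {"cars": {"type": "pandas.CSVDataset", "filepath": "cars.csv"}}

print(resolve_dataset_config("cars", explicit_configs)["type"])
print(resolve_dataset_config("intermediate", explicit_configs)["type"])
```

Under this assumption, a dataset that is declared explicitly keeps its own type, while an undeclared intermediate dataset falls back to the shared memory type.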
set_manager_datasets
set_manager_datasets(manager)
Associate a multiprocessing manager with all shared memory datasets in the catalog.
This method iterates through all datasets in the catalog and sets the provided
multiprocessing manager for datasets of type SharedMemoryDataset. This ensures
that these datasets are properly synchronized across threads or processes.
Parameters:
- manager (SyncManager): A multiprocessing manager to be associated with shared memory datasets.
Example:
from multiprocessing.managers import SyncManager
from kedro.io import MemoryDataset
from kedro.io.data_catalog import SharedMemoryDataCatalog
catalog = SharedMemoryDataCatalog(datasets={"shared_data": MemoryDataset(data=[1, 2, 3])})
manager = SyncManager()
manager.start()
catalog.set_manager_datasets(manager)
print(catalog)
# {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
Source code in kedro/io/data_catalog.py
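The iteration that set_manager_datasets describes can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not Kedro's implementation: `PlainDataset`, `ManagedDataset`, and `attach_manager` are hypothetical stand-ins, and using `manager.dict()` as the shared store is a choice made for this illustration.

```python
from multiprocessing.managers import SyncManager


class PlainDataset:
    """Hypothetical stand-in for a dataset that lives in a single process."""

    def __init__(self, data=None):
        self.data = data


class ManagedDataset:
    """Hypothetical stand-in for a dataset backed by a manager proxy."""

    def __init__(self):
        self.store = None  # no shared storage until a manager is attached

    def set_manager(self, manager):
        # Ask the manager for a proxy dict that other processes can reach.
        self.store = manager.dict()


def attach_manager(datasets, manager):
    """Attach `manager` to every ManagedDataset and return the names touched."""
    attached = []
    for name, dataset in datasets.items():
        if isinstance(dataset, ManagedDataset):
            dataset.set_manager(manager)
            attached.append(name)
    return attached


if __name__ == "__main__":
    datasets = {"raw": PlainDataset([1, 2, 3]), "shared": ManagedDataset()}
    manager = SyncManager()
    manager.start()  # launches the manager's server process
    try:
        print(attach_manager(datasets, manager))  # ['shared']
        datasets["shared"].store["answer"] = 42
        print(datasets["shared"].store["answer"])  # 42
    finally:
        manager.shutdown()
```

Datasets of other types are left untouched, which mirrors the type-based filtering described in the method's docstring. The manager is shut down explicitly so its server process does not outlive the script.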
validate_catalog
validate_catalog()
Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.
This method checks that all datasets in the catalog are serializable and do not include non-proxied memory datasets as outputs. Non-serializable datasets or datasets that rely on single-process memory cannot be used in a multiprocessing context. If any such datasets are found, an exception is raised with details.
Raises:
- AttributeError: If any datasets are found to be non-serializable or incompatible with multiprocessing.
Example:
from kedro.io import MemoryDataset
from kedro.io.data_catalog import SharedMemoryDataCatalog
catalog = SharedMemoryDataCatalog(datasets={"shared_data": MemoryDataset(data=[1, 2, 3])})
try:
    catalog.validate_catalog()
except AttributeError as e:
    print(f"Validation failed: {e}")
# No error
Source code in kedro/io/data_catalog.py
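The serializability check that validate_catalog describes can be approximated with the standard library. This is a minimal sketch, not Kedro's implementation: `find_unpicklable` and `validate` are hypothetical helpers, plain Python objects stand in for datasets, and `pickle.dumps` is assumed as the serializability test.

```python
import pickle
import threading


def find_unpicklable(datasets):
    """Return the names of entries that cannot be serialized with pickle."""
    bad = []
    for name, dataset in datasets.items():
        try:
            pickle.dumps(dataset)
        except (AttributeError, TypeError, pickle.PicklingError):
            bad.append(name)
    return bad


def validate(datasets):
    """Raise AttributeError listing every entry that failed the pickle check."""
    bad = find_unpicklable(datasets)
    if bad:
        raise AttributeError(f"These datasets cannot be serialized: {bad}")


datasets = {
    "numbers": [1, 2, 3],        # a plain list pickles without trouble
    "lock": threading.Lock(),    # thread locks cannot be pickled
}

print(find_unpicklable(datasets))  # ['lock']

try:
    validate(datasets)
except AttributeError as e:
    print(f"Validation failed: {e}")
```

Collecting every failing name before raising, rather than stopping at the first one, lets a single error message report all the entries that need attention.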