Source code for feste.compute
# This module contains modified code from Dask which is
# licensed under BSD 3-Clause License for the following holder:
# Copyright (c) 2014, Anaconda, Inc. and contributors.
from typing import Any, Callable
from feste.graph import FesteGraph
from feste.optimization import Optimizer
from feste.scheduler import get_multiprocessing
[docs]def compute(*args, scheduler_fn: Callable = get_multiprocessing, # type: ignore
optimize_graph: bool = True, **kwargs) -> Any:
"""This function will compute the given objects using the default
multiprocessing scheduler.
:param scheduler_fn: a scheduler (defaults to multiprocessing scheduler)
:param optimize_graph: if graph should be optimized
:return: computed objects
"""
feste_graph, collections, repack = FesteGraph.collect(*args)
if optimize_graph:
optimizer = Optimizer.from_backends()
feste_graph = optimizer.apply(feste_graph)
keys, postcomputes = [], []
for x in collections:
keys.append(x.__dask_keys__())
postcomputes.append(x.__dask_postcompute__())
results = scheduler_fn(dict(feste_graph), keys,
optimize_graph=optimize_graph, **kwargs)
return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)]) # type: ignore