# Source code for b2luigi

"""Task scheduling and batch running for basf2 jobs made simple"""
__version__ = "0.5.0"

# Pull luigi's public names into this namespace, so that e.g. the parameter
# classes can be addressed as ``b2luigi.IntParameter``.
from luigi import *
from luigi.util import inherits, copies

from b2luigi.core.parameter import wrap_parameter, BoolParameter

# NOTE(review): this call sits between the import groups, presumably because
# it has to take effect before the task modules below are imported -- confirm
# against b2luigi.core.parameter before reordering anything here.
wrap_parameter()

# Task classes, helpers and settings accessors exposed at package level.
from b2luigi.core.task import Task, ExternalTask, WrapperTask
from b2luigi.core.temporary_wrapper import on_temporary_files
from b2luigi.core.dispatchable_task import DispatchableTask, dispatch
from b2luigi.core.settings import get_setting, set_setting, clear_setting
from b2luigi.cli.process import process


class requires:
    """
    Class decorator that makes the decorated task require another task.

    This "hack" copies the luigi.requires functionality, except that we allow
    for additional keyword arguments when called.

    It can be used to require a certain task, but with some variables already
    set, e.g.

        class TaskA(b2luigi.Task):
            some_parameter = b2luigi.IntParameter()
            some_other_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("test.txt")

        @b2luigi.requires(TaskA, some_parameter=3)
        class TaskB(b2luigi.Task):
            another_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("out.dat")

    TaskB will now require TaskA, where some_parameter is already set to 3.
    This also means that TaskB only has the parameters another_parameter
    and some_other_parameter (because some_parameter is already fixed).
    """

    def __init__(self, task_to_require, **kwargs):
        """
        Store the task to require and the parameter values to fix.

        :param task_to_require: Task class the decorated class should require.
            It must offer ``get_params()`` yielding ``(name, parameter)``
            pairs.
        :param kwargs: Values that are handed to ``clone`` when the
            requirement is built. Parameters named here are not copied onto
            the decorated class.
        """
        super().__init__()
        self.kwargs = kwargs
        self.task_to_require = task_to_require

    def __call__(self, task_that_requires):
        """
        Decorate ``task_that_requires`` and return the same (modified) class.

        Every parameter of the required task that is neither fixed via the
        keyword arguments nor already an attribute of the decorated class is
        set on the decorated class. The ``requires`` method of the class is
        then replaced by a generator that first yields whatever the previous
        ``requires`` produced and afterwards a clone of the task instance
        with the required task as class and the fixed values as overrides.

        :param task_that_requires: Task class to decorate (modified in place).
        :return: ``task_that_requires`` itself.
        """
        # Get all parameter objects from the underlying task
        for param_name, param_obj in self.task_to_require.get_params():
            # Only add parameters which the inheriting task does not define
            # itself and which are not fixed by the decorator arguments.
            if not hasattr(task_that_requires, param_name) and param_name not in self.kwargs:
                setattr(task_that_requires, param_name, param_obj)

        # Remember the current implementation so it can still be chained.
        old_requires = task_that_requires.requires

        # Modify task_that_requires by replacing its requires method.
        # Note: inside this method the local name ``requires`` shadows the
        # decorator class; the name is kept so the method's __name__ stays
        # "requires".
        def requires(_self):
            yield from old_requires(_self)
            yield _self.clone(cls=self.task_to_require, **self.kwargs)

        task_that_requires.requires = requires

        return task_that_requires