Skip to content

infer_subc/workflows

Code for classes which interface with the sibling napari plugin repo organelle-segmenter-plugin documented at ndcn.github.io/organelle-segmenter-plugin

infer_subc/workflow for napari plugin

The workflow sub-module is designed to interface with this repo (infer_subc) and the sibling napari plugin repo organelle-segmenter-plugin

workflow sub-modules

Workflow

Represents an executable aics-segmentation workflow. This class provides the functionality to run a workflow using an image input according to the steps defined in its WorkflowDefinition.

Source code in infer_subc/workflow/workflow.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class Workflow:
    """
    Represents an executable aics-segmentation workflow.

    This class provides the functionality to run a workflow using an image input
    according to the steps defined in its WorkflowDefinition.
    """

    def __init__(self, workflow_definition: WorkflowDefinition, input_image: np.ndarray):
        """
        Params:
            workflow_definition (WorkflowDefinition): definition of the steps to run
            input_image (np.ndarray): image the workflow will be executed on

        Raises:
            ArgumentNullError: if either argument is None
        """
        if workflow_definition is None:
            raise ArgumentNullError("workflow_definition")
        if input_image is None:
            raise ArgumentNullError("input_image")
        self._definition: WorkflowDefinition = workflow_definition
        self._starting_image: np.ndarray = input_image
        self._next_step: int = 0  # Next step to execute
        self._results: List = list()  # Store most recent step results

    @property
    def workflow_definition(self) -> WorkflowDefinition:
        return self._definition

    def reset(self):
        """
        Reset the workflow so it can be run again from the first step.
        """
        self._next_step = 0
        self._results = list()

    def get_next_step(self) -> WorkflowStep:
        """
        Get the next step to be performed

        Returns
            (WorkflowStep): next WorkflowStep object to perform on image
            None if all steps have already been executed
        """
        if self._next_step >= len(self._definition.steps):
            return None
        return self._definition.steps[self._next_step]

    def execute_next(self, parameters: Dict[str, Any] = None) -> np.ndarray:
        """
        Execute the next workflow step.

        Params:
            parameters: Optional dictionary of parameter inputs to use when executing the step
                        If parameters are not provided, the step's default parameters will be used

        Returns
            result (np.ndarray): resultant image from running the next workflow step,
            or None if all steps have already been executed
        """
        step = self.get_next_step()

        # BUGFIX: previously the "done" case only logged a message, then fell
        # through and dereferenced the None step (step.step_number) and called
        # execute() with a None image, raising AttributeError.
        # Return gracefully instead.
        if step is None:
            log.info("No steps left to run")
            return None

        log.info(f"Executing step #{step.step_number}")

        # Pick which image(s) to perform the workflow step on
        if self._next_step == 0:
            # First step, so use the starting image as the input
            image = [self._starting_image]
        else:
            # Gather the results of this step's parent steps (parents are 1 indexed)
            image = [self.get_result(parent - 1) for parent in step.parent]

        result: np.ndarray = step.execute(image, parameters or step.parameter_values)
        self._results.append(result)

        # Only increment after running step
        self._next_step += 1
        return result

    # TODO maybe change this to match the step number instead?
    #      Review when we implement rerunning single workflow steps
    def get_result(self, step_index: int) -> np.ndarray:
        """
        Get the result image for a workflow step.

        You must call execute() on the workflow step in order to
        produce a result first before calling this function.

        Params:
            step_index (int): index of the WorkflowStep in the
            workflowengine to get the result image of.

        Returns
            (np.ndarray): Result of performing workflow step
                          on the given image.
                          The starting image if step_index is negative;
                          None if the step has not been executed yet.
        """
        if step_index < 0:
            return self._starting_image
        if step_index >= len(self._results):
            return None  # returns None if the WorkflowStep has not been executed.

        return self._results[step_index]

    def get_most_recent_result(self) -> np.ndarray:
        """
        Get the result from the last executed WorkflowStep.

        Returns
            (np.ndarray): Result of the last executed WorkflowStep,
                          returns the starting image if no WorkflowSteps have
                          been run.
        """
        if self._next_step == 0:
            return self._starting_image  # TODO does this behavior make sense? Return None instead?
        else:
            return self.get_result(self._next_step - 1)

    def execute_all(self) -> np.ndarray:
        """
        Execute all steps in the Workflow
        Note: default parameters will be used to execute the steps. To execute a step
              with user-provided parameters, use execute_next()

        Returns
            (np.ndarray): Result of the final WorkflowStep.
        """
        self.reset()
        while not self.is_done():
            self.execute_next()
        return self.get_most_recent_result()

    def execute_step(self, i: int, parameters: Dict[str, Any], selected_image: List[Image]) -> np.ndarray:
        """
        Execute a single workflow step on user-selected image layers.

        Args:
            i: step number (0 indexed) that you want to run
            parameters: parameter inputs for the step; the step's default
                        parameter values are used when this is falsy
            selected_image: image layers whose .data arrays are the step inputs

        Returns
            (np.ndarray): result of the executed step
        """
        # TODO: discuss using selected layer for all types
        step_to_run = self._definition.steps[i]

        # Renamed loop variable: the original comprehension shadowed the
        # parameter `i`, which was confusing (though harmless in Python 3).
        image = [layer.data for layer in selected_image]
        result: np.ndarray = step_to_run.execute(image, parameters or step_to_run.parameter_values)

        # BUGFIX: pad the results list so the result is always stored at
        # index i. Previously, running step i before steps 0..i-1 appended
        # the result at a smaller index, misaligning step numbers and results.
        while len(self._results) <= i:
            self._results.append(None)
        self._results[i] = result

        return result

    def is_done(self) -> bool:
        """
        Check if all WorkflowSteps have been executed.

        Returns
            (bool): True if all WorkflowSteps have been executed, False if not
        """
        return self._next_step >= len(self._definition.steps)

execute_all()

Execute all steps in the Workflow

default parameters will be used to execute the steps. To execute a step

with user-provided parameters, use execute_next()

Parameters:

Name Type Description Default
none

None

required

Returns (np.ndarray): Result of the final WorkflowStep.

Source code in infer_subc/workflow/workflow.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def execute_all(self) -> np.ndarray:
    """
    Execute all steps in the Workflow
    Note: default parameters will be used to execute the steps. To execute a step
          with user-provided parameters, use execute_next()

    Params:
        none: None

    Returns
        (np.ndarray): Result of the final WorkflowStep.
    """
    # Start from a clean slate so the whole pipeline runs from step 0,
    # even if some steps were executed previously.
    self.reset()
    while not self.is_done():
        self.execute_next()
    return self.get_most_recent_result()

execute_next(parameters=None)

Execute the next workflow step.

Parameters:

Name Type Description Default
parameters Dict[str, Any]

Optional dictionary of parameter inputs to use when executing the step If parameters are not provided, the step's default parameters will be used

None

Returns result (np.ndarray): resultant image from running the next workflow step

Source code in infer_subc/workflow/workflow.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def execute_next(self, parameters: Dict[str, Any] = None) -> np.ndarray:
    """
    Execute the next workflow step.

    Params:
        parameters: Optional dictionary of parameter inputs to use when executing the step
                    If parameters are not provided, the step's default parameters will be used

    Returns
        result (np.ndarray): resultant image from running the
        next workflow step
    """

    step = self.get_next_step()

    # NOTE(review): if all steps have already run, get_next_step() returns
    # None, so the f-string below raises AttributeError before the
    # is_done() branch is ever reached — confirm intended behavior.
    log.info(f"Executing step #{step.step_number}")
    # Pick which image to perform the workflow step on
    image: np.ndarray = None

    if self._next_step == 0:
        # First image, so use the starting image for the next workflow step
        image = [self._starting_image]
    elif self.is_done():
        # No more workflow steps to perform
        # TODO: what to do if done with workflow
        #  but execute_next is prompted?
        # printing message for now
        log.info("No steps left to run")
    else:
        # Collect the results of this step's parent steps as inputs
        image = list()
        for i in step.parent:
            res = self.get_result(i - 1)  # parents are 1 indexed
            image.append(res)

    result: np.ndarray = self.get_next_step().execute(image, parameters or step.parameter_values)
    self._results.append(result)

    # Only increment after running step
    self._next_step += 1
    return result

execute_step(i, parameters, selected_image)

Parameters:

Name Type Description Default
i int

step number (0 indexed) that you want to run

required

Returns

Source code in infer_subc/workflow/workflow.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def execute_step(self, i: int, parameters: Dict[str, Any], selected_image: List[Image]) -> np.ndarray:
    """
    Execute a single workflow step on the user-selected image layers.

    Args:
        i: step number (0 indexed) that you want to run
        parameters: parameter inputs for the step; the step's default
                    parameter values are used when this is falsy
        selected_image: image layers whose .data arrays are used as inputs

    Returns
        (np.ndarray): result of the executed step
    """
    # TODO: discuss using selected layer for all types
    step_to_run = self._definition.steps[i]

    # NOTE(review): the comprehension variable shadows the parameter `i`;
    # harmless in Python 3 (comprehensions have their own scope) but confusing.
    image = [i.data for i in selected_image]
    result: np.ndarray = step_to_run.execute(image, parameters or step_to_run.parameter_values)

    if len(self._results) <= i:
        # this is the first time running this step
        # NOTE(review): if step i runs before steps 0..i-1, this appends the
        # result at an index smaller than i, misaligning step numbers and
        # stored results — confirm intended usage.
        self._results.append(result)
    else:
        # this step has been run before so replacing old value
        self._results[i] = result

    return result

get_most_recent_result()

Get the result from the last executed WorkflowStep.

Parameters:

Name Type Description Default
none

None

required

Returns (np.ndarray): Result of the last executed WorkflowStep; returns the starting image if no WorkflowSteps have been run.

Source code in infer_subc/workflow/workflow.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def get_most_recent_result(self) -> np.ndarray:
    """
    Get the result from the last executed WorkflowStep.

    Params:
       none: None

    Returns
        (np.ndarray): Result of the last executed WorkflowStep,
                        returns the starting image if no Workflowsteps have
                        been run.
    """
    if self._next_step == 0:
        # Nothing has run yet, so fall back to the unmodified input image
        return self._starting_image  # TODO does this behavior make sense? Return None instead?
    else:
        return self.get_result(self._next_step - 1)

get_next_step()

Get the next step to be performed

Parameters:

Name Type Description Default
none

None

required

Returns (WorkflowStep): next WorkflowStep object to perform on image None if all steps have already been executed

Source code in infer_subc/workflow/workflow.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def get_next_step(self) -> WorkflowStep:
    """
    Get the next step to be performed

    Params:
        none: None

    Returns
        (WorkflowStep): next WorkflowStep object to perform on image
        None if all steps have already been executed
    """
    # _next_step is an index into the definition's ordered step list
    if self._next_step >= len(self._definition.steps):
        return None
    return self._definition.steps[self._next_step]

get_result(step_index)

Get the result image for a workflow step.

You must call execute() on the workflow step in order to produce a result first before calling this function.

Parameters:

Name Type Description Default
step_index int

index of the WorkflowStep in the workflow engine to get the result image of.

required

Returns self.image (np.ndarray): Result of performing workflow step on the given image None if step has not been executed yet.

Source code in infer_subc/workflow/workflow.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def get_result(self, step_index: int) -> np.ndarray:
    """
    Get the result image for a workflow step.

    You must call execute() on the workflow step in order to
    produce a result first before calling this function.

    Params:
        step_index (int): index of the WorkflowStep in the
        workflowengine to get the result image of.

    Returns
        self.image (np.ndarray): Result of performing workflow step
                                 on the given image
                                 None if step has not been executed yet.
    """
    # A negative index means "before the first step": hand back the
    # original input image (used when a step's parent is step 0).
    if step_index < 0:
        return self._starting_image
    if step_index >= len(self._results):
        return None  # returns None if the WorkflowStep has not been executed.

    return self._results[step_index]

is_done()

Check if all WorkflowSteps have been executed.

Parameters:

Name Type Description Default
none

None

required

Returns (bool): True if all WorkflowSteps have been executed, False if not

Source code in infer_subc/workflow/workflow.py
180
181
182
183
184
185
186
187
188
189
190
def is_done(self) -> bool:
    """
    Check if all WorkflowSteps have been executed.

    Params:
        none: None

    Returns
        (bool): True if all WorkflowSteps have been executed, False if not
    """
    # Done when the next-step cursor has moved past the last defined step
    return self._next_step >= len(self._definition.steps)

reset()

Reset the workflow so it can be run again

Source code in infer_subc/workflow/workflow.py
34
35
36
37
38
39
def reset(self):
    """
    Reset the workflow so it can be run again
    (clears stored results and rewinds the step cursor to 0).
    """
    self._next_step = 0
    self._results = list()

ConfigurationException

Bases: Exception

Raised when errors are encountered reading from Configuration files

Source code in infer_subc/workflow/workflow_config.py
11
12
13
14
15
16
class ConfigurationException(Exception):
    """
    Raised when errors are encountered reading from Configuration files
    (e.g. unreadable/malformed JSON or an unknown function identifier).
    """

    pass

WorkflowConfig

Provides access to structure workflow configuration

Source code in infer_subc/workflow/workflow_config.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class WorkflowConfig:
    """
    Provides access to structure workflow configuration
    (JSON files located in the structure config directory).
    """

    def __init__(self):
        # Lazily-populated caches, filled on first access by the getters below.
        self._all_functions = None
        self._available_workflow_names = None

    def get_available_workflows(self) -> List[str]:
        """
        Get the list of all workflows available through configuration

        Returns
            (List[str]): workflow names, derived from conf_*.json file names
        """
        if self._available_workflow_names is None:
            json_list = sorted(Directories.get_structure_config_dir().glob("conf_*.json"))
            # stem[5:] strips the "conf_" prefix, leaving just the workflow name
            self._available_workflow_names = [p.stem[5:] for p in json_list]

        return self._available_workflow_names

    def get_all_functions(self) -> List[SegmenterFunction]:
        """
        Get the list of all available Functions from configuration

        Raises
            ConfigurationException: if all_functions.json cannot be read or decoded
        """
        if self._all_functions is None:
            path = Directories.get_structure_config_dir() / "all_functions.json"

            try:
                with open(path) as file:
                    obj = json.load(file)
                    self._all_functions = self._all_functions_decoder(obj)
            except Exception as ex:
                raise ConfigurationException(f"Error reading json configuration from {path}") from ex

        return self._all_functions

    def get_workflow_definition(self, workflow_name: str) -> WorkflowDefinition:  # PrebuiltWorkflowDefinition:
        """
        Get a WorkflowDefinition for the given workflow from the corresponding
        prebuilt json structure config

        Raises
            ValueError: if workflow_name is empty or not among the available workflows
        """
        if workflow_name is None or len(workflow_name.strip()) == 0:
            raise ValueError("workflow_name cannot be empty")

        if workflow_name not in self.get_available_workflows():
            raise ValueError(f"No workflow configuration available for {workflow_name}")

        path = Directories.get_structure_config_dir() / f"conf_{workflow_name}.json"

        return self.get_workflow_definition_from_config_file(path, workflow_name, prebuilt=True)

    def get_workflow_definition_from_config_file(
        self, file_path: Path, workflow_name: str = None, prebuilt: bool = False
    ) -> WorkflowDefinition:
        """
        Get a WorkflowDefinition based off the given json configuration file

        Raises
            ValueError: if file_path does not have a .json extension
            ConfigurationException: if the file cannot be parsed or decoded
        """
        if file_path.suffix.lower() != ".json":
            raise ValueError("Workflow configuration file must be a json file with .json file extension.")

        with open(file_path) as file:
            try:
                obj = json.load(file)
                # print(obj)
                # Fall back to the file stem as the workflow name when none given
                return self._workflow_decoder(obj, workflow_name or file_path.stem, prebuilt)
            except Exception as ex:
                raise ConfigurationException(f"Error reading json configuration from {file_path}") from ex

    def save_workflow_definition_as_json(self, workflow_definition: WorkflowDefinition, output_file_path: Path):
        """
        Save a WorkflowDefinition as a json config file

        Raises
            ValueError: if output_file_path does not have a .json extension
        """
        if output_file_path.suffix.lower() != ".json":
            raise ValueError("Workflow configuration file save path must have a .json extension.")

        with open(output_file_path, "w") as file:
            json.dump(self._workflow_encoder(workflow_definition), file, indent=4, sort_keys=True)

    def _all_functions_decoder(self, obj: Dict) -> List[SegmenterFunction]:
        """
        Decode Functions config (all_functions.json)

        Params:
            obj (Dict): parsed JSON mapping of function name -> function spec

        Returns
            (List[SegmenterFunction]): one entry per configured function
        """

        def build_function_parameter(name: str, data: Dict):
            # Translate one parameter spec dict into a FunctionParameter;
            # min/max/increment/options are optional keys in the config.
            return FunctionParameter(
                name=name,
                widget_type=WidgetType.from_str(data["widget_type"]),
                data_type=data["data_type"],
                min_value=data.get("min", None),
                max_value=data.get("max", None),
                increment=data.get("increment", None),
                options=data.get("options", None),
            )

        functions = list()
        for function_k, function_v in obj.items():
            function = SegmenterFunction(
                name=function_k,
                display_name=function_v["name"],
                function=function_v["python::function"],
                module=function_v["python::module"],
            )

            if function_v.get("parameters") is not None and len(function_v["parameters"]) > 0:
                params = dict()

                for param_k, param_v in function_v["parameters"].items():
                    param_name = param_k
                    params[param_name] = list()

                    # A parameter spec may be a single dict or a list of dicts;
                    # normalize both shapes into a list of FunctionParameter.
                    if isinstance(param_v, dict):
                        params[param_name].append(build_function_parameter(param_name, param_v))
                    elif isinstance(param_v, list):
                        for item in param_v:
                            params[param_name].append(build_function_parameter(param_name, item))

                function.parameters = params

            functions.append(function)

        return functions

    def _workflow_decoder(self, obj: Dict, workflow_name: str, prebuilt: bool = False) -> WorkflowDefinition:
        """
        Decode Workflow config (conf_{workflow_name}.json)

        Params:
            obj (Dict): parsed JSON mapping of step number (as string) -> step spec
            workflow_name (str): name to give the resulting WorkflowDefinition

        Raises
            ConfigurationException: if a step references an unknown function identifier
        """
        functions = self.get_all_functions()
        steps: List[WorkflowStep] = list()
        for step_k, step_v in obj.items():
            # print(f"step_k{step_k}  - step_v{step_v}")

            step_number = int(step_k)
            function_id = step_v["function"]
            # Look up the full function spec by its identifier; None if missing
            function = next(filter(lambda f: f.name == function_id, functions), None)

            if function is None:
                raise ConfigurationException(
                    f"Could not find a Segmenter function matching the function identifier <{function_id}>."
                )

            # "parent" may be a scalar or a list in the config; normalize to a list
            if isinstance(step_v["parent"], list):
                parent = step_v["parent"]
            else:
                parent = [step_v["parent"]]

            step = WorkflowStep(
                category=WorkflowStepCategory.from_str(step_v["category"]),
                function=function,
                step_number=step_number,
                parent=parent,
            )

            if step_v.get("parameter_values") is not None and len(step_v["parameter_values"]) > 0:
                param_defaults = dict()

                for param_k, param_v in step_v["parameter_values"].items():
                    param_name = param_k
                    param_defaults[param_name] = param_v

                step.parameter_values = param_defaults
            # print(f"adding step {step_number}")
            steps.append(step)

        # JSON dict ordering is not guaranteed to be numeric; sort by step number
        steps.sort(key=lambda s: s.step_number)

        return WorkflowDefinition(workflow_name, steps, prebuilt=prebuilt)

    def _workflow_encoder(self, workflow_definition: WorkflowDefinition) -> Dict:
        """
        Encode a WorkflowDefinition to a json dictionary
        (inverse of _workflow_decoder).
        """

        # TODO add header / version ?
        result = dict()
        for step in workflow_definition.steps:
            step_number = str(step.step_number)
            # Collapse a single-element parent list back to a scalar for round-tripping
            parent = step.parent[0] if len(step.parent) == 1 else step.parent

            step_dict = {
                step_number: {"function": step.function.name, "category": step.category.value, "parent": parent}
            }
            if step.parameter_values is not None:
                step_dict[step_number].update({"parameter_values": step.parameter_values})

            result.update(step_dict)

        return result

get_all_functions()

Get the list of all available Functions from configuration

Source code in infer_subc/workflow/workflow_config.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def get_all_functions(self) -> List[SegmenterFunction]:
    """
    Get the list of all available Functions from configuration

    Raises
        ConfigurationException: if all_functions.json cannot be read or decoded
    """
    # Parse all_functions.json once and cache the decoded result
    if self._all_functions is None:
        path = Directories.get_structure_config_dir() / "all_functions.json"

        try:
            with open(path) as file:
                obj = json.load(file)
                self._all_functions = self._all_functions_decoder(obj)
        except Exception as ex:
            raise ConfigurationException(f"Error reading json configuration from {path}") from ex

    return self._all_functions

get_available_workflows()

Get the list of all workflows available through configuration

Source code in infer_subc/workflow/workflow_config.py
28
29
30
31
32
33
34
35
36
def get_available_workflows(self) -> List[str]:
    """
    Get the list of all workflows available through configuration

    Returns
        (List[str]): workflow names, derived from conf_*.json file names
    """
    if self._available_workflow_names is None:
        json_list = sorted(Directories.get_structure_config_dir().glob("conf_*.json"))
        # stem[5:] strips the "conf_" prefix, leaving just the workflow name
        self._available_workflow_names = [p.stem[5:] for p in json_list]

    return self._available_workflow_names

get_workflow_definition(workflow_name)

Get a WorkflowDefinition for the given workflow from the corresponding prebuilt json structure config

Source code in infer_subc/workflow/workflow_config.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def get_workflow_definition(self, workflow_name: str) -> WorkflowDefinition:  # PrebuiltWorkflowDefinition:
    """
    Get a WorkflowDefinition for the given workflow from the corresponding
    prebuilt json structure config

    Raises
        ValueError: if workflow_name is empty or not among the available workflows
    """
    if workflow_name is None or len(workflow_name.strip()) == 0:
        raise ValueError("workflow_name cannot be empty")

    if workflow_name not in self.get_available_workflows():
        raise ValueError(f"No workflow configuration available for {workflow_name}")

    # Prebuilt configs follow the conf_<workflow_name>.json naming convention
    path = Directories.get_structure_config_dir() / f"conf_{workflow_name}.json"

    return self.get_workflow_definition_from_config_file(path, workflow_name, prebuilt=True)

get_workflow_definition_from_config_file(file_path, workflow_name=None, prebuilt=False)

Get a WorkflowDefinition based off the given json configuration file

Source code in infer_subc/workflow/workflow_config.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def get_workflow_definition_from_config_file(
    self, file_path: Path, workflow_name: str = None, prebuilt: bool = False
) -> WorkflowDefinition:
    """
    Get a WorkflowDefinition based off the given json configuration file

    Raises
        ValueError: if file_path does not have a .json extension
        ConfigurationException: if the file cannot be parsed or decoded
    """
    if file_path.suffix.lower() != ".json":
        raise ValueError("Workflow configuration file must be a json file with .json file extension.")

    with open(file_path) as file:
        try:
            obj = json.load(file)
            # print(obj)
            # Fall back to the file stem as the workflow name when none given
            return self._workflow_decoder(obj, workflow_name or file_path.stem, prebuilt)
        except Exception as ex:
            raise ConfigurationException(f"Error reading json configuration from {file_path}") from ex

save_workflow_definition_as_json(workflow_definition, output_file_path)

Save a WorkflowDefinition as a json config file

Source code in infer_subc/workflow/workflow_config.py
86
87
88
89
90
91
92
93
94
def save_workflow_definition_as_json(self, workflow_definition: WorkflowDefinition, output_file_path: Path):
    """
    Save a WorkflowDefinition as a json config file

    Raises
        ValueError: if output_file_path does not have a .json extension
    """
    if output_file_path.suffix.lower() != ".json":
        raise ValueError("Workflow configuration file save path must have a .json extension.")

    # Pretty-print with sorted keys so saved configs diff cleanly
    with open(output_file_path, "w") as file:
        json.dump(self._workflow_encoder(workflow_definition), file, indent=4, sort_keys=True)

SegmentationWrap dataclass

Simple dataclass wrapper for segmentations of organelles + masks. TODO: make a nice repr

Source code in infer_subc/workflow/workflow_definition.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@dataclass
class SegmentationWrap:
    """
    Simple dataclass wrapper for segmentations of organelles  + masks
    TODO: make a nice repr
    """

    name: str
    image: np.ndarray
    meta: Dict[str, Any]
    raw_meta: Tuple[Dict[str, Any], Union[Dict[str, Any], List]]
    channel_names: List[str]
    channels: List[int]
    segmentations: List[np.ndarray]
    masks: List[np.ndarray]
    mask_names: List[int]

    def __init__(self, name: str, image: np.ndarray, meta: Dict[str, Any]):
        self.name = name
        self.image = image
        self.meta = meta
        # BUGFIX: initialize the collection fields. Previously they were only
        # declared as class-level annotations, so calling add_mask() or
        # add_segmentation() on a fresh instance raised AttributeError.
        self.channel_names = []
        self.channels = []
        self.segmentations = []
        self.masks = []
        self.mask_names = []
        # self.raw_meta = get_raw_meta_data(meta)

    def add_mask(self, name: str, mask: np.ndarray):
        """Record a mask array together with its name."""
        self.mask_names.append(name)
        self.masks.append(mask)

    def add_segmentation(self, name: str, segmentation: np.ndarray, channel: int):
        """Record a segmentation array, its channel name, and the channel index it came from."""
        self.channel_names.append(name)
        self.channels.append(channel)
        self.segmentations.append(segmentation)

WorkflowDefinition dataclass

Definition of a custom aics-segmentation Workflow loaded from file.

This class only defines the workflow (i.e. the workflow characteristics and steps) and is used either for building an executable Workflow object or to access information about the Workflow without needing to execute it

Source code in infer_subc/workflow/workflow_definition.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@dataclass
class WorkflowDefinition:
    """
    Definition of a custom aics-segmentation Workflow loaded from file.

    Holds only the workflow's characteristics and ordered steps; use it
    either to build an executable Workflow object, or to inspect workflow
    information without executing anything.
    """

    name: str
    steps: List[WorkflowStep]
    prebuilt: bool

    def __init__(self, name: str, steps: List[WorkflowStep], prebuilt: bool = True):
        # Assign the declared fields in one shot, then flag the origin.
        self.name, self.steps, self.prebuilt = name, steps, prebuilt
        # Definitions built through this constructor are always file-backed.
        self.from_file = True

WorkflowEngine

aicssegmentation workflow engine. Use this class to access and execute aicssegmentation structure workflows.

Source code in infer_subc/workflow/workflow_engine.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class WorkflowEngine:
    """
    aicssegmentation workflow engine.

    Use this class to access and execute aicssegmentation structure workflows.
    """

    def __init__(self, workflow_config: WorkflowConfig = None):
        # Fall back to the default configuration when none is supplied.
        self._workflow_config = workflow_config or WorkflowConfig()
        self._workflow_definitions = self._load_workflow_definitions()

    @property
    def workflow_definitions(self) -> List[WorkflowDefinition]:
        """
        List of all workflow definitions
        """
        return self._workflow_definitions

    def get_executable_workflow(self, workflow_name: str, input_image: np.ndarray) -> Workflow:
        """
        Get an executable workflow object

        inputs:
            workflow_name (str): Name of the workflow to load
            input_image (ndarray): input image for the workflow to execute on
        """
        if workflow_name is None:
            raise ArgumentNullError("workflow_name")
        if input_image is None:
            raise ArgumentNullError("input_image")

        definition = self._get_workflow_definition(workflow_name)

        return Workflow(definition, input_image)

    def add_workflow(self, file_path: Union[Path, str], workflow_name: Union[str, None] = None) -> WorkflowDefinition:
        """
        Add a WorkflowDefinition to the engine's list from a configuration file.

        inputs:
            file_path (str|Path): Path to the workflow configuration file
            workflow_name (str|None): Optional name to assign to the loaded workflow

        Returns the newly loaded WorkflowDefinition.
        """
        defn = self._workflow_config.get_workflow_definition_from_config_file(
            Path(file_path), workflow_name, prebuilt=False
        )
        self._workflow_definitions += [defn]
        # BUG FIX: the method is annotated to return a WorkflowDefinition but
        # previously returned None.
        return defn

    def get_executable_batch_workflow(
        self,
        workflow_name: str,
        input_dir: str,
        output_dir: str,
        segmentation_name: str,
        channel_index: int = -1,
    ):
        """
        Get an executable BatchWorkflow object

        inputs:
            workflow_name (str): Name of the workflow to load
            input_dir (str|Path): Directory containing input files for the batch processing
            output_dir (str|Path): Output directory for the batch processing
            segmentation_name (str): Name passed through to the BatchWorkflow
            channel_index (int): Index of the channel to process in each image (usually a structure channel)
        """
        if workflow_name is None:
            raise ArgumentNullError("workflow_name")
        if segmentation_name is None:
            raise ArgumentNullError("segmentation_name")
        if input_dir is None:
            raise ArgumentNullError("input_dir")
        if output_dir is None:
            raise ArgumentNullError("output_dir")

        definition = self._get_workflow_definition(workflow_name)

        return BatchWorkflow(definition, input_dir, output_dir, segmentation_name, channel_index)

    def get_executable_workflow_from_config_file(
        self, file_path: Union[str, Path], input_image: np.ndarray
    ) -> Workflow:
        """
        Get an executable workflow object from a configuration file

        inputs:
            file_path (str|Path): Path to the workflow configuration file
            input_image (ndarray): input image for the workflow to execute on
        """
        if input_image is None:
            raise ArgumentNullError("input_image")
        if file_path is None:
            raise ArgumentNullError("file_path")

        definition = self._workflow_config.get_workflow_definition_from_config_file(Path(file_path))
        return Workflow(definition, input_image)

    def get_executable_batch_workflow_from_config_file(
        self,
        file_path: Union[str, Path],
        input_dir: Union[str, Path],
        output_dir: Union[str, Path],
        segmentation_name: str,
        channel_index: int = -1,
    ):
        """
        Get an executable batch workflow object from a configuration file

        inputs:
            file_path (str|Path): Path to the workflow configuration file
            input_dir (str|Path): Directory containing input files for the batch processing
            output_dir (str|Path): Output directory for the batch processing
            segmentation_name (str): Name passed through to the BatchWorkflow
            channel_index (int): Index of the channel to process in each image (usually a structure channel)
        """
        if file_path is None:
            raise ArgumentNullError("file_path")
        if segmentation_name is None:
            raise ArgumentNullError("segmentation_name")
        if input_dir is None:
            raise ArgumentNullError("input_dir")
        if output_dir is None:
            raise ArgumentNullError("output_dir")

        definition = self._workflow_config.get_workflow_definition_from_config_file(Path(file_path))
        return BatchWorkflow(definition, input_dir, output_dir, segmentation_name, channel_index)

    def get_executable_batch_workflows_from_config_file(
        self,
        file_path: Union[List[str], List[Path]],
        input_dir: Union[str, Path],
        output_dir: Union[str, Path],
        segmentation_names: List[str],
        channel_index: int = -1,
    ):
        """
        Get an executable batch workflow object built from several configuration files.

        inputs:
            file_path (List[str]|List[Path]): Paths to the workflow configuration files
            input_dir (str|Path): Directory containing input files for the batch processing
            output_dir (str|Path): Output directory for the batch processing
            segmentation_names (List[str]): Names passed through to the BatchWorkflow
            channel_index (int): Index of the channel to process in each image (usually a structure channel)
        """
        if file_path is None:
            raise ArgumentNullError("file_path")
        if segmentation_names is None:
            # BUG FIX: the error previously named "segmentation_name",
            # which is not this method's parameter.
            raise ArgumentNullError("segmentation_names")
        if input_dir is None:
            raise ArgumentNullError("input_dir")
        if output_dir is None:
            raise ArgumentNullError("output_dir")

        definitions = [self._workflow_config.get_workflow_definition_from_config_file(Path(fn)) for fn in file_path]
        return BatchWorkflow(definitions, input_dir, output_dir, segmentation_names, channel_index)

    def save_workflow_definition(self, workflow_definition: WorkflowDefinition, output_file_path: Union[str, Path]):
        """
        Persist a WorkflowDefinition as JSON at the given path.
        """
        if workflow_definition is None:
            raise ArgumentNullError("workflow_definition")
        if output_file_path is None:
            # BUG FIX: the error previously named "file_path",
            # which is not this method's parameter.
            raise ArgumentNullError("output_file_path")

        self._workflow_config.save_workflow_definition_as_json(workflow_definition, output_file_path)

    def _load_workflow_definitions(self) -> List[WorkflowDefinition]:
        # Eagerly load a definition for every workflow the config knows about.
        available_workflows = self._workflow_config.get_available_workflows()
        return [self._workflow_config.get_workflow_definition(name) for name in available_workflows]

    def _get_workflow_definition(self, workflow_name: str) -> WorkflowDefinition:
        # Linear scan by name; raises if the workflow was never registered.
        definition = next(filter(lambda d: d.name == workflow_name, self._workflow_definitions), None)
        if definition is None:
            raise ValueError(
                f"No available workflow definition found for {workflow_name}. Specify a valid workflow name."
            )

        return definition

workflow_definitions: List[WorkflowDefinition] property

List of all workflow definitions

add_workflow(file_path, workflow_name=None)

add WorkflowDefinition to list from a configuration file

Source code in infer_subc/workflow/workflow_engine.py
47
48
49
50
51
52
53
54
def add_workflow(self, file_path: Union[Path, str], workflow_name: Union[str, None] = None) -> "WorkflowDefinition":
    """
    Add a WorkflowDefinition to the engine's list from a configuration file.

    inputs:
        file_path (str|Path): Path to the workflow configuration file
        workflow_name (str|None): Optional name to assign to the loaded workflow

    Returns the newly loaded WorkflowDefinition.
    """
    defn = self._workflow_config.get_workflow_definition_from_config_file(
        Path(file_path), workflow_name, prebuilt=False
    )
    self._workflow_definitions += [defn]
    # BUG FIX: previously returned None despite the return annotation.
    return defn

get_executable_batch_workflow(workflow_name, input_dir, output_dir, segmentation_name, channel_index=-1)

Get an executable BatchWorkflow object

inputs

workflow_name (str): Name of the workflow to load input_dir (str|Path): Directory containing input files for the batch processing output_dir (str|Path): Output directory for the batch processing channel_index (int): Index of the channel to process in each image (usually a structure channel)

Source code in infer_subc/workflow/workflow_engine.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def get_executable_batch_workflow(
    self,
    workflow_name: str,
    input_dir: str,
    output_dir: str,
    segmentation_name: str,
    channel_index: int = -1,
):
    """
    Build an executable BatchWorkflow for a named, pre-registered workflow.

    inputs:
        workflow_name (str): Name of the workflow to load
        input_dir (str|Path): Directory containing input files for the batch processing
        output_dir (str|Path): Output directory for the batch processing
        segmentation_name (str): Name passed through to the BatchWorkflow
        channel_index (int): Index of the channel to process in each image (usually a structure channel)
    """
    # Reject omitted arguments in the same order the original checks ran.
    required = (
        ("workflow_name", workflow_name),
        ("segmentation_name", segmentation_name),
        ("input_dir", input_dir),
        ("output_dir", output_dir),
    )
    for arg_label, arg_value in required:
        if arg_value is None:
            raise ArgumentNullError(arg_label)

    return BatchWorkflow(
        self._get_workflow_definition(workflow_name),
        input_dir,
        output_dir,
        segmentation_name,
        channel_index,
    )

get_executable_batch_workflow_from_config_file(file_path, input_dir, output_dir, segmentation_name, channel_index=-1)

Get an executable batch workflow object from a configuration file

inputs

file_path (str|Path): Path to the workflow configuration file input_dir (str|Path): Directory containing input files for the batch processing output_dir (str|Path): Output directory for the batch processing channel_index (int): Index of the channel to process in each image (usually a structure channel)

Source code in infer_subc/workflow/workflow_engine.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def get_executable_batch_workflow_from_config_file(
    self,
    file_path: Union[str, Path],
    input_dir: Union[str, Path],
    output_dir: Union[str, Path],
    segmentation_name: str,
    channel_index: int = -1,
):
    """
    Build an executable BatchWorkflow from a workflow configuration file.

    inputs:
        file_path (str|Path): Path to the workflow configuration file
        input_dir (str|Path): Directory containing input files for the batch processing
        output_dir (str|Path): Output directory for the batch processing
        segmentation_name (str): Name passed through to the BatchWorkflow
        channel_index (int): Index of the channel to process in each image (usually a structure channel)
    """
    # Guard clauses, preserving the original argument-check order.
    for arg_label, arg_value in (
        ("file_path", file_path),
        ("segmentation_name", segmentation_name),
        ("input_dir", input_dir),
        ("output_dir", output_dir),
    ):
        if arg_value is None:
            raise ArgumentNullError(arg_label)

    defn = self._workflow_config.get_workflow_definition_from_config_file(Path(file_path))
    return BatchWorkflow(defn, input_dir, output_dir, segmentation_name, channel_index)

get_executable_batch_workflows_from_config_file(file_path, input_dir, output_dir, segmentation_names, channel_index=-1)

Get an executable batch workflow object from a configuration file

inputs

file_path (str|Path): Path to the workflow configuration file input_dir (str|Path): Directory containing input files for the batch processing output_dir (str|Path): Output directory for the batch processing channel_index (int): Index of the channel to process in each image (usually a structure channel)

Source code in infer_subc/workflow/workflow_engine.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def get_executable_batch_workflows_from_config_file(
    self,
    file_path: Union[List[str], List[Path]],
    input_dir: Union[str, Path],
    output_dir: Union[str, Path],
    segmentation_names: List[str],
    channel_index: int = -1,
):
    """
    Get an executable batch workflow object built from several configuration files.

    inputs:
        file_path (List[str]|List[Path]): Paths to the workflow configuration files
        input_dir (str|Path): Directory containing input files for the batch processing
        output_dir (str|Path): Output directory for the batch processing
        segmentation_names (List[str]): Names passed through to the BatchWorkflow
        channel_index (int): Index of the channel to process in each image (usually a structure channel)
    """
    if file_path is None:
        raise ArgumentNullError("file_path")
    if segmentation_names is None:
        # BUG FIX: the error previously named "segmentation_name",
        # which is not this method's parameter.
        raise ArgumentNullError("segmentation_names")
    if input_dir is None:
        raise ArgumentNullError("input_dir")
    if output_dir is None:
        raise ArgumentNullError("output_dir")

    definitions = [self._workflow_config.get_workflow_definition_from_config_file(Path(fn)) for fn in file_path]
    return BatchWorkflow(definitions, input_dir, output_dir, segmentation_names, channel_index)

get_executable_workflow(workflow_name, input_image)

Get an executable workflow object

inputs

workflow_name (str): Name of the workflow to load input_image (ndarray): input image for the workflow to execute on

Source code in infer_subc/workflow/workflow_engine.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def get_executable_workflow(self, workflow_name: str, input_image: np.ndarray) -> Workflow:
    """
    Look up a registered workflow by name and wrap it in an executable Workflow
    bound to the given input image.

    inputs:
        workflow_name (str): Name of the workflow to load
        input_image (ndarray): input image for the workflow to execute on
    """
    # Guard clauses, preserving the original argument-check order.
    for arg_label, arg_value in (("workflow_name", workflow_name), ("input_image", input_image)):
        if arg_value is None:
            raise ArgumentNullError(arg_label)

    return Workflow(self._get_workflow_definition(workflow_name), input_image)

get_executable_workflow_from_config_file(file_path, input_image)

Get an executable workflow object from a configuration file

inputs

file_path (str|Path): Path to the workflow configuration file input_image (ndarray): input image for the workflow to execute on

Source code in infer_subc/workflow/workflow_engine.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_executable_workflow_from_config_file(
    self, file_path: Union[str, Path], input_image: np.ndarray
) -> Workflow:
    """
    Load a workflow definition from a configuration file and wrap it in an
    executable Workflow bound to the given input image.

    inputs:
        file_path (str|Path): Path to the workflow configuration file
        input_image (ndarray): input image for the workflow to execute on
    """
    # NOTE: input_image is validated before file_path, mirroring the original order.
    if input_image is None:
        raise ArgumentNullError("input_image")
    if file_path is None:
        raise ArgumentNullError("file_path")

    defn = self._workflow_config.get_workflow_definition_from_config_file(Path(file_path))
    return Workflow(defn, input_image)

WorkflowStep dataclass

Represents a single step in an aicssegmentation Workflow

Source code in infer_subc/workflow/workflow_step.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@dataclass
class WorkflowStep:
    """
    Represents a single step in an aicssegmentation Workflow
    """

    category: WorkflowStepCategory
    function: SegmenterFunction
    step_number: int
    parent: List[int]
    parameter_values: Dict[str, List] = None

    @property
    def name(self):
        # Human-readable name comes from the wrapped segmenter function.
        return self.function.display_name

    def execute(self, input_images: List[np.ndarray], parameters: Dict[str, Any] = None) -> np.ndarray:
        """
        Execute this workflow step on the given input image and return the result.

        Params:
            input_images (List[np.ndarray]): List of image inputs to perform this
                    workflow step on, generally parent image
            parameters (Dict): Dictionary of parameters to pass to the
                                underlying function

        Returns
            self.result (np.ndarray): Result of performing workflow step
                                        on the given image.
        """
        if not isinstance(input_images, list):
            raise ValueError("input_images must be a list")

        if parameters is not None and not self._check_parameters(parameters):
            raise ValueError(
                "Provided parameters are invalid. All keys in the parameters dictionary"
                "must correspond to existing parameter names defined for the underlying workflow function."
                "Note: parameter names are case sensitive"
            )

        # Resolve the segmenter callable dynamically from its dotted module path.
        target = getattr(importlib.import_module(self.function.module), self.function.function)
        kwargs = parameters if parameters is not None else {}

        try:
            # Most functions require unpacking the images
            return target(*input_images, **kwargs)
        except TypeError:
            # Some functions want it as a list
            return target(input_images, **kwargs)

    def _check_parameters(self, parameters: Dict[str, Any]) -> bool:
        # Every supplied key must be a known parameter of the wrapped function.
        return all(key in self.function.parameters for key in parameters)

execute(input_images, parameters=None)

Execute this workflow step on the given input image and return the result.

Parameters:

Name Type Description Default
input_images List[np.ndarray]

List of image inputs to perform this workflow step on, generally parent image

required
parameters Dict

Dictionary of parameters to pass to the underlying function

None

Returns self.result (np.ndarray): Result of performing workflow step on the given image.

Source code in infer_subc/workflow/workflow_step.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def execute(self, input_images: List[np.ndarray], parameters: Dict[str, Any] = None) -> np.ndarray:
    """
    Execute this workflow step on the given input image and return the result.

    Params:
        input_images (List[np.ndarray]): List of image inputs to perform this
                workflow step on, generally parent image
        parameters (Dict): Dictionary of parameters to pass to the
                            underlying function

    Returns
        self.result (np.ndarray): Result of performing workflow step
                                    on the given image.
    """
    if not isinstance(input_images, list):
        raise ValueError("input_images must be a list")

    if parameters is not None and not self._check_parameters(parameters):
        raise ValueError(
            "Provided parameters are invalid. All keys in the parameters dictionary"
            "must correspond to existing parameter names defined for the underlying workflow function."
            "Note: parameter names are case sensitive"
        )

    # Resolve the segmenter callable dynamically from its dotted module path.
    target = getattr(importlib.import_module(self.function.module), self.function.function)
    kwargs = parameters if parameters is not None else {}

    try:
        # Most functions require unpacking the images
        return target(*input_images, **kwargs)
    except TypeError:
        # Some functions want it as a list
        return target(input_images, **kwargs)