Steps

This page contains detailed reference documentation for working with Steps in Graphbook. You can create steps with decorators and functions, or by extending any of the following base classes.

See also

Decorators graphbook.step(), graphbook.batch(), graphbook.source(), and graphbook.prompt() to create steps in a functional way.

class graphbook.steps.Step(item_key=None)

Bases: object

The base class for the executable workflow node, the step. All other step classes should descend from this class.

forward_note(data: Any) str

Deprecated. Use graphbook.core.steps.Step.route() instead.

log(message: str, type: str = 'info')

Logs a message

Parameters:
  • message (str) – message to log

  • type (str) – type of log

on_after_item(data: Any)

Executes after processing items

Parameters:

data (Any) – The data input

on_clear()

Executes when a request to clear the step is made. This is useful for steps that have internal states that need to be reset.
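
As a minimal sketch of the kind of internal state on_clear() is meant to reset, the class below counts the data it has seen. CountingStep is a hypothetical plain-Python stand-in that only mirrors the method names documented here; it does not import or subclass graphbook.steps.Step.

```python
from typing import Any


class CountingStep:
    """Hypothetical stand-in mirroring Step's method names (not a Graphbook class)."""

    def __init__(self) -> None:
        self.seen = 0  # internal state that should not survive a clear

    def on_data(self, data: Any) -> None:
        # Count every piece of data that reaches the step.
        self.seen += 1

    def on_clear(self) -> None:
        # Reset the internal state when a clear is requested.
        self.seen = 0


step = CountingStep()
step.on_data({"id": 1})
step.on_data({"id": 2})
count_before_clear = step.seen
step.on_clear()
count_after_clear = step.seen
```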

on_data(data: Any)

Executes upon receiving data

Parameters:

data (Any) – The data input

on_end()

Executes upon end of graph execution

on_item(item: Any, data: Any)

Executes upon receiving an item. Is called after on_data() and before on_after_item().

Parameters:
  • item (Any) – The item to process

  • data (Any) – The data that the item belongs to

on_start()

Executes upon start of graph execution

route(data: Any) str | Dict[str, List[Any]] | None

Routes the item of data. Must return the corresponding output key or a dictionary of the form graphbook.core.steps.StepOutput. Is called after on_after_item().

Parameters:

data (Any) – The data input

Returns:

The output key (a string) that the data is routed to or, if multiple pieces of data are being processed at a time, a StepOutput.

Example

The examples below assume that this step has two output pins, “dog” and “cat”:

def route(self, data: dict) -> str:
    if data["is_dog"]:
        return "dog"
    return "cat"

def route(self, data: dict) -> StepOutput:
    dog_images = data["dog"]
    cat_images = data["cat"]
    return {
        "dog": dog_images,
        "cat": cat_images,
    }

set_context(**context)

Sets the context of the step. This is useful for setting the node_id and node_name of the step.

Parameters:

**context – The context to set
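
The call order described for the hooks above (on_data(), then on_item(), then on_after_item(), then route()) can be sketched in plain Python. TracingStep and process() are hypothetical stand-ins written for this illustration and are not Graphbook's implementation; in particular, treating item_key as the name of the field that holds the item is an assumption.

```python
from typing import Any, Dict, List, Optional, Union

StepOutput = Dict[str, List[Any]]


class TracingStep:
    """Hypothetical stand-in mirroring Step's hooks; records the order of calls."""

    def __init__(self, item_key: Optional[str] = None) -> None:
        self.item_key = item_key
        self.calls: List[str] = []

    def on_data(self, data: Any) -> None:
        self.calls.append("on_data")

    def on_item(self, item: Any, data: Any) -> None:
        self.calls.append("on_item")

    def on_after_item(self, data: Any) -> None:
        self.calls.append("on_after_item")

    def route(self, data: Any) -> Union[str, StepOutput, None]:
        self.calls.append("route")
        return "dog" if data["is_dog"] else "cat"


def process(step: TracingStep, data: dict) -> Union[str, StepOutput, None]:
    """Assumed driver: invokes the hooks in the order the reference describes."""
    step.on_data(data)
    if step.item_key is not None:
        # Assumption: item_key names the field of the data that holds the item.
        step.on_item(data[step.item_key], data)
    step.on_after_item(data)
    return step.route(data)


step = TracingStep(item_key="image")
output_key = process(step, {"image": "dog.png", "is_dog": True})
```

Here step.calls records the four hooks in the order they ran, and output_key holds the output pin chosen by route().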

graphbook.steps.StepOutput

alias of Dict[str, List[Any]]

class graphbook.steps.BatchStep(batch_size, item_key)

Bases: AsyncStep

A Step used for batch and parallel processing using custom function definitions. Override the load_fn and dump_fn methods with your custom logic to load and dump data, respectively.

dump_data(data: Any, output)

Dumps data to be processed by the worker pool. This is called after the step processes the batch of items. The class must have a dump_fn method that takes in the output data and returns a string path to the dumped data.

Parameters:
  • data (Any) – The data input

  • item_key (str) – The item key to dump

  • output (Any) – The output data to dump

static dump_fn(**args)

The dump function to be overridden by BatchSteps that write outputs to disk.

in_q(data: Any | None)

Enqueue an item of data to be processed by the step

Parameters:

data (Any) – The input

static load_fn(**args)

The load function to be overridden by BatchSteps that will forward preprocessed data to on_item_batch.

on_clear()

Executes when a request to clear the step is made. This is useful for steps that have internal states that need to be reset.

on_item_batch(outputs, items, data) List[Tuple[Any]] | None

Called when B items are loaded and ready to be processed, where B is batch_size. This is meant to be overridden by subclasses.

Parameters:
  • outputs (List[Any]) – The list of loaded outputs of length B

  • items (List[Any]) – The list of items of length B associated with outputs. This list has the same order as outputs along the batch dimension

  • data (List[Any]) – The list of data objects of length B associated with outputs. This list has the same order as outputs along the batch dimension

Returns:

The output data to be dumped as a list of parameters to be passed to dump_fn. If None is returned, nothing will be dumped.

Return type:

Optional[List[Tuple[Any]]]
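
The on_item_batch contract above (three parallel lists in, an optional list of tuples out) can be sketched without Graphbook. UppercaseBatch and run_batches() are hypothetical stand-ins; how Graphbook schedules load_fn in its worker pool, how it handles a final batch smaller than B, and how the returned tuples reach dump_fn are not modeled here.

```python
from typing import Any, List, Optional, Tuple


class UppercaseBatch:
    """Hypothetical stand-in mirroring the on_item_batch contract (not a BatchStep)."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size

    @staticmethod
    def load_fn(item: str) -> str:
        # Pretend that "loading" an item just means stripping whitespace.
        return item.strip()

    def on_item_batch(
        self, outputs: List[Any], items: List[Any], data: List[Any]
    ) -> Optional[List[Tuple[Any]]]:
        # outputs, items, and data share the same length and order.
        return [(out.upper(),) for out in outputs]


def run_batches(step: UppercaseBatch, items: List[str], data: List[dict]) -> list:
    """Assumed driver: chunks the inputs by batch_size and calls on_item_batch."""
    results = []
    for start in range(0, len(items), step.batch_size):
        chunk_items = items[start:start + step.batch_size]
        chunk_data = data[start:start + step.batch_size]
        outputs = [step.load_fn(item) for item in chunk_items]
        batch_result = step.on_item_batch(outputs, chunk_items, chunk_data)
        if batch_result is not None:  # None means there is nothing to dump
            results.extend(batch_result)
    return results


to_dump = run_batches(UppercaseBatch(batch_size=2), [" a", "b ", "c"], [{}, {}, {}])
```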

class graphbook.steps.PromptStep

Bases: AsyncStep

A Step that is capable of prompting the user for input. This is useful for interactive workflows where data labeling, model evaluation, or any other human input is required. Once the prompt is handled, the execution lifecycle of the Step will proceed normally.

get_prompt(data: Any) dict

Returns the prompt to be displayed to the user. This method can be overridden by the subclass. By default, it will return a boolean prompt. If None is returned, the prompt will be skipped for this data. A list of available prompts can be found in graphbook.prompts.

Parameters:

data (Any) – The data input to display to the user

on_clear()

Clears any awaiting prompts and the prompt queue. If you plan on overriding this method, make sure to call super().on_clear() to ensure the prompt queue is cleared.

on_prompt_response(data: Any, response: Any)

Called when the user responds to the prompt. This method must be overridden by the subclass.

Parameters:
  • data (Any) – The data input that was prompted

  • response (Any) – The user’s response

See also

graphbook.prompts for available user prompts.
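
The interplay of get_prompt() and on_prompt_response() can be sketched as follows. LabelingStep and run_prompts() are hypothetical stand-ins, the prompt dictionary is a placeholder rather than a real graphbook.prompts value, and the canned answers replace an actual user.

```python
from typing import Any, Iterable, List, Optional


class LabelingStep:
    """Hypothetical stand-in mirroring PromptStep's two hooks (not a Graphbook class)."""

    def __init__(self) -> None:
        self.labeled: List[dict] = []

    def get_prompt(self, data: Any) -> Optional[dict]:
        # Returning None skips the prompt for data that already has a label.
        if "label" in data:
            return None
        # Placeholder dict; real prompts would come from graphbook.prompts.
        return {"type": "bool", "msg": f"Is {data['name']} a dog?"}

    def on_prompt_response(self, data: Any, response: Any) -> None:
        data["label"] = "dog" if response else "cat"
        self.labeled.append(data)


def run_prompts(step: LabelingStep, batch: List[dict], answers: Iterable[bool]) -> None:
    """Assumed driver: asks for a prompt, then feeds back a canned answer."""
    answer_iter = iter(answers)
    for data in batch:
        prompt = step.get_prompt(data)
        if prompt is None:
            continue  # the prompt is skipped for this data
        step.on_prompt_response(data, next(answer_iter))


step = LabelingStep()
batch = [{"name": "a.png"}, {"name": "b.png", "label": "cat"}, {"name": "c.png"}]
run_prompts(step, batch, answers=[True, False])
labels = [d["label"] for d in batch]
```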

class graphbook.steps.SourceStep

Bases: Step

A Step that accepts no input but produces outputs. This class attempts to load all data at once, so GeneratorSourceStep is recommended, especially for large datasets.

load() Dict[str, List[Any]]

Function to load data. Must output a dictionary of outputs.

Example

return {
    "out": [{"images": [PIL.Image.open("image_of_dog.png")]}]
}

class graphbook.steps.GeneratorSourceStep

Bases: SourceStep

A Step that accepts no input but produces outputs.

load() Generator[Dict[str, List[Any]], None, None]

Function to load data. Must output a generator that yields dictionaries of outputs.

Example

yield {
    "out": [{"images": [PIL.Image.open("image_of_dog.png")]}]
}

on_clear()

Executes when a request to clear the step is made. This is useful for steps that have internal states that need to be reset.
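
The difference between the two load() styles can be sketched with plain functions. RECORDS, load_all(), load_lazily(), and chunk_size are hypothetical names used only for this illustration; real steps would define load() on a SourceStep or GeneratorSourceStep subclass.

```python
from typing import Any, Dict, Generator, List

# Hypothetical dataset standing in for files on disk.
RECORDS = [{"path": f"image_{i}.png"} for i in range(5)]


def load_all() -> Dict[str, List[Any]]:
    # SourceStep style: every record is materialized in one dictionary.
    return {"out": list(RECORDS)}


def load_lazily(chunk_size: int = 2) -> Generator[Dict[str, List[Any]], None, None]:
    # GeneratorSourceStep style: yield one small dictionary at a time.
    for start in range(0, len(RECORDS), chunk_size):
        yield {"out": RECORDS[start:start + chunk_size]}


eager = load_all()
chunks = list(load_lazily())
chunk_sizes = [len(chunk["out"]) for chunk in chunks]
```

The generator form keeps only one chunk in memory at a time, which is why it suits large datasets.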