Batch Data

One of the most important features offered by Graphbook is multiprocessing. Graphbook maintains a worker pool that can be used to parallelize the loading of data with your own custom-defined function. You can also use it to parallelize the writing of outputs, which is covered in the next section.
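
To give a rough intuition for what the worker pool does, here is a minimal, hypothetical sketch (not Graphbook's actual implementation) of parallelizing a custom load function over items with Python's standard multiprocessing pool:

from multiprocessing import Pool

def load(item: dict) -> str:
    # Stand-in for real loading work (e.g., reading and decoding a file)
    return item["value"].upper()

if __name__ == "__main__":
    items = [{"value": "a"}, {"value": "b"}, {"value": "c"}]
    with Pool(processes=4) as pool:
        # Each item is handed to a worker process; results come back in order
        results = pool.map(load, items)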

See also

Workers - Learn more about the workers behind your pipeline.

Load and Batch Data

In this section, we will cover the graphbook.batch() decorator and how to use it to parallelize the loading of data. For example, to create a batch step that loads images from the file system and converts them to PyTorch Tensors, you can use the following code:

custom_nodes/batch_steps.py
from graphbook import Note, step, batch
from PIL import Image
from typing import List
import torch
import torchvision.transforms.functional as F

# Custom defined function that will execute in parallel
def convert_to_tensor(item: dict) -> torch.Tensor:
    image_path = item["value"]
    pil_image = Image.open(image_path)
    return F.to_tensor(pil_image)

@step("LoadImages")
@batch(8, "image_paths", load_fn=convert_to_tensor)
@staticmethod
def on_load_images(tensors: List[torch.Tensor], items: List[dict], notes: List[Note]):
    for tensor, note in zip(tensors, notes):
        if note["tensor"] is None:
            note["tensor"] = []
        note["tensor"].append(tensor)

The same step can also be written as a class:

custom_nodes/batch_steps.py
from graphbook.steps import BatchStep
from graphbook import Note
from PIL import Image
from typing import List
import torch
import torchvision.transforms.functional as F

class LoadImages(BatchStep):
    RequiresInput = True
    Parameters = {
        "batch_size": {"type": "number", "default": 8},
        "item_key": {"type": "string", "default": "image_paths"}
    }
    Outputs = ["out"]
    Category = ""

    def __init__(self, batch_size, item_key):
        super().__init__(batch_size, item_key)

    # Custom defined function that will execute in parallel
    @staticmethod
    def load_fn(item: dict) -> torch.Tensor:
        image_path = item["value"]
        pil_image = Image.open(image_path)
        return F.to_tensor(pil_image)

    @staticmethod
    def on_item_batch(tensors: List[torch.Tensor], items: List[dict], notes: List[Note]):
        for tensor, note in zip(tensors, notes):
            if note["tensor"] is None:
                note["tensor"] = []
            note["tensor"].append(tensor)

The above step simply loads images from the file system and converts them to PyTorch Tensors, assuming that the notes containing the image paths come from another source step.
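
For illustration, here is a hypothetical example of the kind of note this step expects as input. Under the "image_paths" key, the note holds a list of items, each a dict whose "value" is a path on the file system; the paths shown here are made up:

from graphbook import Note

# Hypothetical input note; the item key "image_paths" matches the one given to @batch
note = Note({
    "image_paths": [
        {"value": "/data/images/0001.jpg"},
        {"value": "/data/images/0002.jpg"},
    ]
})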

Here is a breakdown of what we did:

  1. First, we defined a custom function convert_to_tensor that will execute in parallel. This function takes the input item specified by our batch step's item key.

  2. We give our step the name “LoadImages”.

  3. We use the graphbook.batch() decorator to specify that this step is a batch step. The first parameter is the default batch size, the second parameter is the key of the items to read from incoming notes, and the third parameter is the loading function that we defined in the first step.

    Note

    The first two parameters, batch_size and item_key, will be configurable in the UI. If you are writing the step as a class, you must define these parameters manually.

  4. We mark the decorated method as static, because we do not care about the underlying class instance.

  5. We define the graphbook.steps.BatchStep.on_item_batch() method, which is executed on each batch and simply assigns the output tensors to the notes they came from.

Tip

By default, the function decorated by graphbook.batch() serves as graphbook.steps.BatchStep.on_item_batch(). This method is executed with the following parameters, in order:

  • The tensors (or whatever our custom-defined function outputs)

  • The associated input items

  • The associated notes that the items came from

All three lists should be of size equal to the batch size.
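
As a rough, single-process illustration of how these lists line up (Graphbook's workers actually do the loading in parallel), assuming the convert_to_tensor function and a couple of hypothetical items from above:

# Not Graphbook internals; just shows how the three lists correspond
items = [{"value": "/data/images/0001.jpg"}, {"value": "/data/images/0002.jpg"}]
tensors = [convert_to_tensor(item) for item in items]  # one result per item
# on_item_batch (the decorated function) is then called with (tensors, items, notes),
# where tensors[i] was produced from items[i], which belongs to notes[i].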

Passing Data to an ML Model

Of course, if you’re batching inputs such as tensors, you are most likely preparing them to be loaded onto the GPU and passed into an ML model. By passing your tensors to the model immediately, you can avoid the large memory overhead of storing them in the notes. You can do so with the following example:

custom_nodes/batch_steps.py
from graphbook import Note, step, batch, param
from typing import List
import torch

# convert_to_tensor is the same function defined in the previous example
@step("MyMLModel")
@batch(8, "image_paths", load_fn=convert_to_tensor)
@param("model", type="resource")
@torch.no_grad()
def on_load_images(ctx, images: List[torch.Tensor], items: List[dict], notes: List[Note]):
    batch = torch.stack(images).to("cuda")
    outputs = ctx.model(batch)

    # (Option 1) Store the model's outputs in the items
    for output, item in zip(outputs, items):
        item["output"] = output

    # (Option 2) Store the model's outputs in the note
    for output, note in zip(outputs, notes):
        if note["output"] is None:
            note["output"] = []
        note["output"].append(output)

The same step written as a class:

custom_nodes/batch_steps.py
from graphbook.steps import BatchStep
from graphbook import Note
from typing import List
import torch

class MyMLModel(BatchStep):
    RequiresInput = True
    Parameters = {
        "batch_size": {"type": "number", "default": 8},
        "item_key": {"type": "string", "default": "image_paths"},
        "model": {"type": "resource"}
    }
    Outputs = ["out"]
    Category = ""

    def __init__(self, batch_size, item_key, model):
        super().__init__(batch_size, item_key)
        self.model = model

    ...

    @torch.no_grad()
    def on_item_batch(self, images: List[torch.Tensor], items: List[dict], notes: List[Note]):
        batch = torch.stack(images).to("cuda")
        outputs = self.model(batch)

        # (Option 1) Store the model's outputs in the items
        for output, item in zip(outputs, items):
            item["output"] = output

        # (Option 2) Store the model's outputs in the note
        for output, note in zip(outputs, notes):
            if note["output"] is None:
                note["output"] = []
            note["output"].append(output)

The example above assumes that there is already a resource providing a model, loaded onto the GPU, that can be used to process the images.
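
For reference, a minimal sketch of the kind of object such a model resource is assumed to provide: a torch.nn.Module that has already been moved to the GPU and set to eval mode. The specific torchvision model is only an illustrative choice:

import torchvision.models as models

# Illustrative only: any torch.nn.Module already on the GPU would work
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
model = model.to("cuda").eval()

# MyMLModel's "model" parameter would then resolve to an object like this,
# and self.model(batch) / ctx.model(batch) runs a forward pass on the GPU.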