Dump Data

You can also use the worker pool to parallelize the dumping of data to disk/network with your own custom defined function.

See also

Workers - Learn more about the workers behind your pipeline.

To parallelize dumping, we still need to use the decorator graphbook.batch() because dumping is made available to graphbook.steps.BatchStep.

custom_nodes/batch_steps.py
from graphbook import Note, step, batch
from PIL import Image
from typing import List
import torch
import torchvision.transforms.functional as F

# Custom defined function that will execute in parallel
@staticmethod
def save_image(image: Image.Image, output_path: str):
    image.save(output_path)

@step("LoadImages")
@batch(8, "image_paths", dump_fn=save_image, load_fn=convert_to_tensor)
@staticmethod
def on_load_images(tensors: List[torch.Tensor], items: List[dict], notes: List[Note]):
    # Generate images
    ...

    args = []
    for image, item in zip(images, items):
        input_path = items['value']
        output_path = input_path.replace('.jpg', '_processed.jpg')
        args.append((image, output_path))
    return args
from graphbook.steps import BatchStep
from graphbook import Note
from PIL import Image
from typing import List
import torch
import torchvision.transforms.functional as F

class LoadImages(BatchStep):
    RequiresInput = True
    Parameters = {
        "batch_size": {"type": "number", "default": 8},
        "item_key": {"type": "string", "default": "image_paths"}
    }
    Outputs = ["out"]
    Category = ""

    def __init__(self, batch_size, item_key):
        super().__init__(batch_size, item_key)

    # Custom defined function that will execute in parallel
    @staticmethod
    def dump_fn(image: Image.Image, output_path: str):
        image.save(output_path)

    @staticmethod
    def on_item_batch(tensors: List[torch.Tensor], items: List[dict], notes: List[Note]):
        # Generate images
        ...

        args = []
        for image, item in zip(images, items):
            input_path = items['value']
            output_path = input_path.replace('.jpg', '_processed.jpg')
            args.append((image, output_path))
        return args

Here, we override the graphbook.steps.BatchStep.dump_fn() method to define our custom function to dump images to disk in parallel with the main process. The event graphbook.steps.BatchStep.on_item_batch() can return a list of parameters to pass to dump_fn(**args) for each element in the return output.