Dump Data
You can also use the worker pool to parallelize the dumping of data to disk/network with your own custom defined function.
See also
Workers - Learn more about the workers behind your pipeline.
To parallelize dumping, we still need to use the graphbook.batch() decorator, because dumping is a feature of graphbook.steps.BatchStep.
from graphbook import step, batch
from PIL import Image
from typing import List
import torch
import torchvision.transforms.functional as F
# Custom defined dump function; the worker pool executes it in parallel
@staticmethod
def save_image(image: Image.Image, output_path: str):
    """Write ``image`` to ``output_path`` by calling ``image.save``."""
    image.save(output_path)
# NOTE(review): convert_to_tensor is not defined in this snippet; it must be
# defined or imported before this decorator runs.
@step("LoadImages")
@batch(8, "image_paths", dump_fn=save_image, load_fn=convert_to_tensor)
@staticmethod
def on_load_images(tensors: List[torch.Tensor], items: List[dict], data: List[dict]):
    """Build the arguments used to dump each processed image.

    Returns:
        A list with one ``(image, output_path)`` tuple per item. Each tuple
        holds the arguments for one call of the dump function
        (``save_image``).
    """
    # Generate images
    ...  # placeholder: the elided code must bind ``images``, one per item

    args = []
    for image, item in zip(images, items):
        # Read the path from the current item, not from the ``items`` list.
        input_path = item['value']
        output_path = input_path.replace('.jpg', '_processed.jpg')
        args.append((image, output_path))
    return args
from graphbook.steps import BatchStep
from PIL import Image
from typing import List
import torch
import torchvision.transforms.functional as F
class LoadImages(BatchStep):
    """Batch step that dumps processed images to disk through ``dump_fn``.

    ``on_item_batch`` returns one ``(image, output_path)`` tuple per item;
    each tuple holds the arguments for one call of ``dump_fn``.
    """

    RequiresInput = True
    Parameters = {
        "batch_size": {"type": "number", "default": 8},
        "item_key": {"type": "string", "default": "image_paths"}
    }
    Outputs = ["out"]
    Category = ""

    def __init__(self, batch_size, item_key):
        super().__init__(batch_size, item_key)

    # Custom defined function that will execute in parallel
    @staticmethod
    def dump_fn(image: Image.Image, output_path: str):
        """Write ``image`` to ``output_path`` by calling ``image.save``."""
        image.save(output_path)

    @staticmethod
    def on_item_batch(tensors: List[torch.Tensor], items: List[dict], data: List[dict]):
        """Return the ``(image, output_path)`` argument tuples for ``dump_fn``."""
        # Generate images
        ...  # placeholder: the elided code must bind ``images``, one per item

        args = []
        for image, item in zip(images, items):
            # Read the path from the current item, not from the ``items`` list.
            input_path = item['value']
            output_path = input_path.replace('.jpg', '_processed.jpg')
            args.append((image, output_path))
        return args
Here, we override the graphbook.steps.BatchStep.dump_fn() method to define our custom function to dump images to disk in parallel with the main process.
The event graphbook.steps.BatchStep.on_item_batch() can return a list of argument tuples; each element of the returned list is unpacked and passed to the dump function as dump_fn(*args).