Reference

API documentation

class graphbook.steps.base.AsyncStep(id, logger, item_key=None)

Asynchronous processing step that consumes everything in its in_queue so that the main thread can handle the outputs. Useful for parallel processing where the task can be spread across multiple processes while the main thread continues processing the rest of the graph.

class graphbook.steps.base.BatchStep(id, logger, batch_size, item_key)

A Step used for batch processing. By default, this step consumes PyTorch tensor batches loaded by the worker pool.

in_q(data_record: DataRecord)

Enqueues a DataRecord to be processed by the step

Parameters:

data_record (DataRecord) – The DataRecord input

on_item_batch(tensors, items, records)

Called when B items have been loaded into PyTorch tensors and are ready to be processed, where B is batch_size. This method is meant to be overridden by subclasses.

Parameters:
  • tensors (List[torch.Tensor]) – The list of loaded tensors of length B

  • items (List[DataItem]) – The list of DataItems of length B associated with the tensors. This list has the same order as tensors along the batch dimension

  • records (List[DataRecord]) – The list of DataRecords of length B associated with the tensors. This list has the same order as tensors along the batch dimension
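
A subclass would typically override on_item_batch to run model inference and write results back onto the records. The following is a minimal sketch of that pattern; FakeRecord and classify are stand-ins for graphbook's DataRecord and a real model call (the "prediction" annotation key is an assumption), so the snippet runs on its own:

```python
# Sketch of an on_item_batch override. Lightweight stand-ins replace
# graphbook's DataRecord and torch.Tensor so the snippet runs alone;
# real code would subclass graphbook.steps.base.BatchStep.

class FakeRecord:
    """Stand-in for graphbook.steps.base.DataRecord."""
    def __init__(self):
        self.annotation = {}

def classify(value):
    """Stand-in for a real model forward pass (returns one label)."""
    return "dog" if value > 0 else "cat"

def on_item_batch(tensors, items, records):
    # tensors, items, and records are parallel lists of length B,
    # aligned along the batch dimension.
    for tensor, item, record in zip(tensors, items, records):
        record.annotation["prediction"] = classify(tensor)

records = [FakeRecord(), FakeRecord()]
on_item_batch([1.0, -1.0], ["a.jpg", "b.jpg"], records)
```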

class graphbook.steps.base.DataItem(item, type=None, annotation={})

Data structure containing the text, audio, image, or video coupled with its annotation.

Parameters:
  • item (str) – A path to media, string of text, or the actual data item to store

  • type (str) – Optional string to specify the type of data this is

  • annotation (dict) – A dictionary of annotations for this item

Example

d = DataItem("path/to/image.jpg", "image", {"prediction": "dog"})
json()

Returns the DataItem in a serialized JSON format

class graphbook.steps.base.DataRecord(key: str = '', annotation: dict = {}, items: dict = {})

The unit that passes through workflow steps. A DataRecord contains a dictionary of DataItems related to the record and a dictionary of annotations. It also has the property “key”, which is useful to set to a unique id, such as the record’s id in its original database.

Parameters:
  • key (str) – An optional key or id

  • annotation (Dict[str, str]) – An optional dictionary of annotations for this item

  • items (Dict[str, List[DataItems]]) – An optional dictionary of DataItems

Example

d = DataRecord("0123456789", {"prediction": "dog"}, {"images": [DataItem("image_of_dog.png")]})
json()

Returns the DataRecord in a serialized JSON format

put_item(item_key: str, item_value: str)

A convenience function to add a DataItem to an item list

Parameters:
  • item_key (str) – The item key to append to

  • item_value (str) – The value of the item
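
The behavior described above can be mirrored in plain Python to make it concrete. This sketch models only the append-or-create semantics; a plain dict stands in for the record's items mapping, and the exact internals of DataRecord are not confirmed by this reference:

```python
# Hypothetical mirror of put_item's behavior: append a value to the
# list stored under item_key, creating the list on first use. A plain
# dict stands in for the record's items mapping so no import is needed.

def put_item(items, item_key, item_value):
    items.setdefault(item_key, []).append(item_value)

items = {}
put_item(items, "images", "image_of_dog.png")
put_item(items, "images", "image_of_cat.png")
```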

class graphbook.steps.base.SourceStep(id, logger)

A Step that accepts no input but produces outputs.

load() Dict[str, List[DataRecord]]

Function to load data and convert it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
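
A subclass implements load() to produce the initial records of a workflow. Below is a sketch of the shape of such an implementation; plain dicts stand in for DataRecord objects (and the file paths are invented for illustration), so the snippet is self-contained:

```python
# Sketch of a SourceStep.load() implementation. Plain dicts stand in
# for DataRecord objects so the sketch runs without graphbook; real
# code would subclass graphbook.steps.base.SourceStep and build
# DataRecords instead.

def load():
    paths = ["dog1.png", "dog2.png"]  # hypothetical data source
    records = [{"key": p, "items": {"images": [p]}} for p in paths]
    # load() must return a dict mapping an output slot name to a
    # list of records.
    return {"out": records}

output = load()
```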

class graphbook.steps.base.Split(id, logger, split_fn)

Routes incoming DataRecords into either of two output slots, A or B. If split_fn evaluates to True, the record will be forwarded to A, else the record will be forwarded to B.

Parameters:

split_fn (str) – A function given as a string of Python source. The string must contain the function header (def …). The function is evaluated in forward_record(record), where each record is fed into split_fn(record).

forward_record(record) str

Routes a DataRecord. Must return the corresponding output key or a dictionary that contains DataRecords. Is called after on_after_items().

Parameters:

record (DataRecord) – The DataRecord input

Returns:

The output key that the record is associated with; if multiple records are being processed at a time, a StepOutput may be returned instead.
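
Since split_fn is passed as a string of Python source, a concrete example helps. The "prediction" annotation key below is an assumption for illustration, and the FakeRecord class is a stand-in so the routing rule can be demonstrated without graphbook:

```python
# Example split_fn source string for Split. The "prediction"
# annotation key is an assumption for illustration.
split_fn = '''
def split_fn(record):
    # True routes the record to output A, False to output B.
    return record.annotation.get("prediction") == "dog"
'''

# Demonstrate the routing rule with a lightweight stand-in record:
class FakeRecord:
    def __init__(self, annotation):
        self.annotation = annotation

ns = {}
exec(split_fn, ns)
goes_to_a = ns["split_fn"](FakeRecord({"prediction": "dog"}))
```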

class graphbook.steps.base.SplitItemField(id, logger, split_fn, item_key, a_key, b_key, should_delete_original=True)

Associates items with a different item key based on split_fn(item). If split_fn evaluates to True, the item will transfer to the item_key specified by a_key, else the item will transfer to the item_key specified by b_key.

Parameters:
  • split_fn (str) – A function given as a string of Python source. The string must contain the function header (def …). The function is evaluated in on_after_items(record), where each selected item from item_key is fed into split_fn(item).

  • item_key (str) – Original item_key that the items come from

  • a_key (str) – Will append item to DataItem list associated with the a_key if split_fn(item) evaluates to True

  • b_key (str) – Will append item to DataItem list associated with the b_key if split_fn(item) evaluates to False

  • should_delete_original (bool) – If True, deletes the original DataItem key-value pair at item_key. Defaults to True

on_after_items(record: DataRecord) Dict[str, List[DataRecord]]

Executes upon receiving a DataRecord and after processing the selected DataItems

Parameters:

record (DataRecord) – The DataRecord input
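
For SplitItemField, split_fn is evaluated per item rather than per record. A sketch of such a function string follows; it assumes a DataItem exposes its stored value as the attribute `item` (an illustration, not confirmed by this reference), and FakeItem is a stand-in so the rule can be exercised directly:

```python
# Example split_fn source string for SplitItemField: route items by
# file extension. The attribute name "item" on DataItem is assumed
# for illustration.
split_fn = '''
def split_fn(item):
    # True moves the item to a_key, False moves it to b_key.
    return item.item.endswith(".png")
'''

class FakeItem:
    """Stand-in for graphbook.steps.base.DataItem."""
    def __init__(self, value):
        self.item = value

ns = {}
exec(split_fn, ns)
is_png = ns["split_fn"](FakeItem("photo.png"))
```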

class graphbook.steps.base.SplitRecordsByItems(id, logger, split_items_fn, item_key)

Routes incoming DataRecords into either of two output slots, A or B. If split_items_fn evaluates to True, the record will be forwarded to A, else the record will be forwarded to B.

Parameters:

split_items_fn (str) – A function given as a string of Python source. The string must contain the function header (def …). The function is evaluated in forward_record(record), where each record and its selected items are fed into split_items_fn(items, records).

forward_record(record: DataRecord) Dict[str, List[DataRecord]]

Routes a DataRecord. Must return the corresponding output key or a dictionary that contains DataRecords. Is called after on_after_items().

Parameters:

record (DataRecord) – The DataRecord input

Returns:

The output key that the record is associated with; if multiple records are being processed at a time, a StepOutput may be returned instead.

class graphbook.steps.base.Step(id, logger, item_key=None)

The base class of an executable workflow node. All other workflow nodes should inherit from Step.

Parameters:
  • id (str) – The unique id of the step

  • logger – The logger used to log messages from this step

  • item_key (str) – An optional item key used to select which DataItems this step processes

forward_record(data_record: DataRecord) str | Dict[str, List[DataRecord]]

Routes a DataRecord. Must return the corresponding output key or a dictionary that contains DataRecords. Is called after on_after_items().

Parameters:

data_record (DataRecord) – The DataRecord input

Returns:

The output key that the record is associated with; if multiple records are being processed at a time, a StepOutput may be returned instead.

on_after_items(data_record: DataRecord)

Executes upon receiving a DataRecord and after processing the selected DataItems

Parameters:

data_record (DataRecord) – The DataRecord input

on_before_items(data_record: DataRecord)

Executes upon receiving a DataRecord and before receiving the selected DataItems

Parameters:

data_record (DataRecord) – The DataRecord input

on_end()

Executes upon end of graph execution

on_item(item: DataItem, data_record: DataRecord)

Executes upon receiving a DataItem. Is called after on_before_items().

Parameters:
  • item (DataItem) – The DataItem input

  • data_record (DataRecord) – The DataRecord that the DataItem belongs to

on_start()

Executes upon start of graph execution

remove_children()

Removes all child steps

set_child(child: Step, slot_name: str = 'out')

Sets a child step

Parameters:
  • child (Step) – The child step

  • slot_name (str) – The slot to bind the child to
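
set_child is what links steps into a graph. The sketch below shows the wiring pattern only; FakeStep stands in for real Step subclasses (whose constructors also require an id and logger), and the internal children mapping is an assumed shape for illustration:

```python
# Sketch of graph wiring with set_child. FakeStep stands in for real
# graphbook Step subclasses so the snippet runs without graphbook.

class FakeStep:
    def __init__(self, name):
        self.name = name
        self.children = {}  # slot_name -> child step (assumed shape)

    def set_child(self, child, slot_name="out"):
        self.children[slot_name] = child

load = FakeStep("load")
split = FakeStep("split")
dump = FakeStep("dump")
load.set_child(split)         # bind to the default "out" slot
split.set_child(dump, "A")    # bind to a Split step's "A" slot
```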

graphbook.steps.base.StepOutput

A dict mapping each output slot to a list of DataRecords. Every Step outputs a StepOutput.

alias of Dict[str, List[DataRecord]]

class graphbook.steps.io.DumpJSONL(id, logger, jsonl_path)
on_after_items(data_record: DataRecord)

Executes upon receiving a DataRecord and after processing the selected DataItems

Parameters:

data_record (DataRecord) – The DataRecord input

class graphbook.steps.io.LoadImageDataset(id, logger, image_dir)
load() Dict[str, List[DataRecord]]

Function to load data and convert it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.

class graphbook.steps.io.LoadJSON(id, logger, json_path)
load() Dict[str, List[DataRecord]]

Function to load data and convert it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.

class graphbook.steps.io.LoadJSONAsDataRecords(id, logger, json_path)
load() Dict[str, List[DataRecord]]

Function to load data and convert it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.

class graphbook.steps.io.LoadJSONL(id, logger, jsonl_path, start_from=0)
load() Dict[str, List[DataRecord]]

Function to load data and convert it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
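
The JSONL format that LoadJSONL consumes is one JSON object per line, with start_from skipping leading lines. The sketch below mirrors that reading behavior in plain Python (the record contents are illustrative), without claiming to reproduce LoadJSONL's internals:

```python
import io
import json

# Sketch of what a JSONL loader does: parse one JSON object per line,
# skipping the first start_from lines (mirrors LoadJSONL's start_from
# parameter). The record contents are illustrative.
jsonl_file = io.StringIO('{"key": "0"}\n{"key": "1"}\n{"key": "2"}\n')

def load_jsonl(f, start_from=0):
    lines = f.read().splitlines()
    return [json.loads(line) for line in lines[start_from:]]

records = load_jsonl(jsonl_file, start_from=1)
```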

class graphbook.steps.hf_transformers.HFImageProcessorStep(id, logger, batch_size, item_key, model: AutoModel, processor: AutoImageProcessor)
on_item_batch(tensors, items, records) Dict[str, List[DataRecord]]

Called when B items have been loaded into PyTorch tensors and are ready to be processed, where B is batch_size. This method is meant to be overridden by subclasses.

Parameters:
  • tensors (List[torch.Tensor]) – The list of loaded tensors of length B

  • items (List[DataItem]) – The list of DataItems of length B associated with the tensors. This list has the same order as tensors along the batch dimension

  • records (List[DataRecord]) – The list of DataRecords of length B associated with the tensors. This list has the same order as tensors along the batch dimension