Reference
API documentation
- class graphbook.steps.base.AsyncStep(id, logger, item_key=None)
Asynchronous processing step that consumes everything in its input queue so that the main thread can handle the outputs. Useful for parallel processing where the task can be optimized across multiple processes while the main thread continues processing the rest of the graph.
- class graphbook.steps.base.BatchStep(id, logger, batch_size, item_key)
A Step used for batch processing. By default, this step consumes PyTorch tensor batches loaded by the worker pool.
- in_q(data_record: DataRecord)
Enqueue a data record to be processed by the step
- Parameters:
data_record (DataRecord) – The DataRecord input
- on_item_batch(tensors, items, records)
Called when B items are loaded into PyTorch tensors and are ready to be processed, where B is batch_size. This method is meant to be overridden by subclasses.
- Parameters:
tensors (List[torch.Tensor]) – The list of loaded tensors of length B
items (List[DataItem]) – The list of DataItems of length B associated with tensors. This list is in the same order as tensors along the batch dimension
records (List[DataRecord]) – The list of DataRecords of length B associated with tensors. This list is in the same order as tensors along the batch dimension
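A minimal sketch of an on_item_batch body. In graphbook this would be a method on a BatchStep subclass and tensors would hold torch.Tensors; plain floats stand in here so the sketch runs without dependencies, and the "score" annotation key is an assumption, not part of the API.

```python
# Sketch of an on_item_batch body. The three lists are parallel: index i of
# each refers to the same sample along the batch dimension.
def on_item_batch(tensors, items, records):
    for tensor, item in zip(tensors, items):
        # Annotate each item with a value derived from its tensor
        # (e.g. tensor.mean().item() with a real torch.Tensor)
        item.annotation["score"] = tensor
```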
- class graphbook.steps.base.DataItem(item, type=None, annotation={})
Data structure containing the text, audio, image, or video coupled with its annotation.
- Parameters:
item (str) – A path to media, string of text, or the actual data item to store
type (str) – Optional string to specify the type of data this is
annotation (dict) – A dictionary of annotations for this item
Example
d = DataItem("path/to/image.jpg", "image", {"prediction": "dog"})
- json()
Returns the DataItem in a serialized JSON format
- class graphbook.steps.base.DataRecord(key: str = '', annotation: dict = {}, items: dict = {})
The unit that passes through workflow steps. A DataRecord contains a dictionary of DataItems related to the record and a dictionary of annotations. It also has a "key" property, which is useful to set to a unique id, such as the record's id in its original database.
- Parameters:
key (str) – An optional key or id
annotation (Dict[str, str]) – An optional dictionary of annotations for this record
items (Dict[str, List[DataItem]]) – An optional dictionary of DataItem lists
Example
d = DataRecord("0123456789", {"prediction": "dog"}, {"images": [DataItem("image_of_dog.png")]})
- json()
Returns the DataRecord in a serialized JSON format
- put_item(item_key: str, item_value: str)
A convenience function to append a DataItem to an item list
- Parameters:
item_key (str) – The item key to append to
item_value (str) – The value of the item
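The behavior of put_item can be sketched with a plain dict standing in for the record's items mapping; with a real DataRecord you would call record.put_item(...) directly. The key and values below are illustrative.

```python
# Dict-based sketch of what put_item does: append a value to the list stored
# under item_key, creating the list if it does not exist yet. A real
# DataRecord stores DataItems; plain strings stand in here.
def put_item(items, item_key, item_value):
    items.setdefault(item_key, []).append(item_value)

record_items = {}
put_item(record_items, "images", "image_of_dog.png")
put_item(record_items, "images", "another_dog.png")
```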
- class graphbook.steps.base.SourceStep(id, logger)
A Step that accepts no inputs but produces outputs.
- load() → Dict[str, List[DataRecord]]
Loads data and converts it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
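A sketch of what a load implementation returns: a dictionary mapping output slot names to lists of records. The "out" slot name is an assumption, and plain dicts stand in for DataRecords so the sketch runs standalone.

```python
# Sketch of a SourceStep.load return value. In graphbook this is a method on
# a SourceStep subclass returning Dict[str, List[DataRecord]]; dicts stand in
# for DataRecords here, and "out" is an assumed slot name.
def load():
    records = [{"key": str(i), "annotation": {}, "items": {}} for i in range(3)]
    return {"out": records}
```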
- class graphbook.steps.base.Split(id, logger, split_fn)
Routes incoming DataRecords into one of two output slots, A or B. If split_fn evaluates to True, the record is forwarded to A; otherwise it is forwarded to B.
- Parameters:
split_fn (str) – A function given as Python source code. The string must contain the function header (def …). It is evaluated in forward_record(record), where each record is fed into split_fn(record).
- forward_record(record) → str
Routes a DataRecord. Must return the corresponding output key or a dictionary that contains DataRecords. Is called after on_after_items().
- Parameters:
data_record (DataRecord) – The DataRecord input
- Returns:
A string that the record is associated with, or if multiple records are being processed at a time, a StepOutput may be used.
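Because split_fn is passed as Python source text, it can be sketched and exercised standalone. The record attribute accessed below (an "annotation" dict with a "prediction" entry) mirrors the DataRecord example above but is still an assumption about what your records carry.

```python
# A split_fn for Split, given as source text containing the function header.
# Records for which it returns True go to output A; the rest go to B.
split_fn = """
def split_fn(record):
    # Route records predicted as "dog" to A, everything else to B
    return record.annotation.get("prediction") == "dog"
"""
```

This string would then be passed as the split_fn argument to Split(id, logger, split_fn).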
- class graphbook.steps.base.SplitItemField(id, logger, split_fn, item_key, a_key, b_key, should_delete_original=True)
Associates items with a different item key based on the split_fn(item). If split_fn evaluates to True, the item will transfer to the item_key specified by a_key, else the item will transfer to the item_key specified by b_key.
- Parameters:
split_fn (str) – A function given as Python source code. The string must contain the function header (def …). It is evaluated in on_after_items(record), where each selected item from item_key is fed into split_fn(item).
item_key (str) – Original item_key that the items come from
a_key (str) – Will append item to DataItem list associated with the a_key if split_fn(item) evaluates to True
b_key (str) – Will append item to DataItem list associated with the b_key if split_fn(item) evaluates to False
should_delete_original (bool) – If True, will delete the original DataItem key-value pair of item_key. Defaults to True
- on_after_items(record: DataRecord) → Dict[str, List[DataRecord]]
Executes upon receiving a DataRecord and after processing the selected DataItems
- Parameters:
data_record (DataRecord) – The DataRecord input
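The item-level split_fn can be sketched the same way. The item's type attribute matches the DataItem constructor above, and "image" is an illustrative value.

```python
# An item-level split_fn for SplitItemField: items for which it returns True
# are re-keyed under a_key, the rest under b_key.
split_fn = """
def split_fn(item):
    return item.type == "image"
"""
```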
- class graphbook.steps.base.SplitRecordsByItems(id, logger, split_items_fn, item_key)
Routes incoming DataRecords into one of two output slots, A or B. If split_items_fn evaluates to True, the record is forwarded to A; otherwise it is forwarded to B.
- Parameters:
split_items_fn (str) – A function given as Python source code. The string must contain the function header (def …). It is evaluated in forward_record(record), where each record and its selected items are fed into split_items_fn(items, records).
- forward_record(record: DataRecord) → Dict[str, List[DataRecord]]
Routes a DataRecord. Must return the corresponding output key or a dictionary that contains DataRecords. Is called after on_after_items().
- Parameters:
data_record (DataRecord) – The DataRecord input
- Returns:
A string that the record is associated with, or if multiple records are being processed at a time, a StepOutput may be used.
- class graphbook.steps.base.Step(id, logger, item_key=None)
The base class of an executable workflow node. All other workflow nodes should inherit from Step.
- Parameters:
id (str) – The unique id of the step
logger – The logger used to log messages from this step
item_key (str) – An optional key selecting which DataItems from incoming DataRecords to process
- forward_record(data_record: DataRecord) → str | Dict[str, List[DataRecord]]
Routes a DataRecord. Must return the corresponding output key or a dictionary that contains DataRecords. Is called after on_after_items().
- Parameters:
data_record (DataRecord) – The DataRecord input
- Returns:
A string that the record is associated with, or if multiple records are being processed at a time, a StepOutput may be used.
- on_after_items(data_record: DataRecord)
Executes upon receiving a DataRecord and after processing the selected DataItems
- Parameters:
data_record (DataRecord) – The DataRecord input
- on_before_items(data_record: DataRecord)
Executes upon receiving a DataRecord and before receiving the selected DataItems
- Parameters:
data_record (DataRecord) – The DataRecord input
- on_end()
Executes upon end of graph execution
- on_item(item: DataItem, data_record: DataRecord)
Executes upon receiving a DataItem. Is called after on_before_items().
- Parameters:
item (DataItem) – The DataItem input
data_record (DataRecord) – The DataRecord that the DataItem belongs to
- on_start()
Executes upon start of graph execution
- remove_children()
Removes all children steps
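The per-record hook order described above can be sketched with a dependency-free stub; the handle driver below is ours, not part of the Step API, and exists only to show the sequence the docs imply.

```python
# Stub illustrating the order in which Step's hooks fire for one DataRecord,
# per the docs above: on_before_items -> on_item (per item) -> on_after_items
# -> forward_record. The `handle` driver is illustrative, not graphbook API.
class StepStub:
    def __init__(self):
        self.calls = []

    def on_before_items(self, record):
        self.calls.append("on_before_items")

    def on_item(self, item, record):
        self.calls.append("on_item")

    def on_after_items(self, record):
        self.calls.append("on_after_items")

    def forward_record(self, record):
        self.calls.append("forward_record")
        return "out"  # the output slot key this record is routed to

    def handle(self, record, selected_items):
        self.on_before_items(record)
        for item in selected_items:
            self.on_item(item, record)
        self.on_after_items(record)
        return self.forward_record(record)
```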
- graphbook.steps.base.StepOutput
A dict mapping of output slot to DataRecord list. Every Step outputs a StepOutput.
alias of Dict[str, List[DataRecord]]
- class graphbook.steps.io.DumpJSONL(id, logger, jsonl_path)
- on_after_items(data_record: DataRecord)
Executes upon receiving a DataRecord and after processing the selected DataItems
- Parameters:
data_record (DataRecord) – The DataRecord input
- class graphbook.steps.io.LoadImageDataset(id, logger, image_dir)
- load() → Dict[str, List[DataRecord]]
Loads data and converts it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
- class graphbook.steps.io.LoadJSON(id, logger, json_path)
- load() → Dict[str, List[DataRecord]]
Loads data and converts it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
- class graphbook.steps.io.LoadJSONAsDataRecords(id, logger, json_path)
- load() → Dict[str, List[DataRecord]]
Loads data and converts it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
- class graphbook.steps.io.LoadJSONL(id, logger, jsonl_path, start_from=0)
- load() → Dict[str, List[DataRecord]]
Loads data and converts it into DataRecords. Must output a dictionary mapping output slot names to lists of DataRecords.
- class graphbook.steps.hf_transformers.HFImageProcessorStep(id, logger, batch_size, item_key, model: AutoModel, processor: AutoImageProcessor)
- on_item_batch(tensors, items, records) → Dict[str, List[DataRecord]]
Called when B items are loaded into PyTorch tensors and are ready to be processed, where B is batch_size. This method is meant to be overridden by subclasses.
- Parameters:
tensors (List[torch.Tensor]) – The list of loaded tensors of length B
items (List[DataItem]) – The list of DataItems of length B associated with tensors. This list is in the same order as tensors along the batch dimension
records (List[DataRecord]) – The list of DataRecords of length B associated with tensors. This list is in the same order as tensors along the batch dimension