Basic Usage

PlanAI is a powerful framework for creating complex, AI-enhanced workflows using a graph-based architecture. This guide will walk you through the basic concepts and provide examples of how to use PlanAI effectively.

Core Concepts

  • TaskWorker: The fundamental building block of a PlanAI workflow. It defines how to process input and produce output.
  • Graph: A structure that defines the workflow by connecting TaskWorkers.
  • Task: The unit of work that flows through the graph. It carries data and provenance information.
  • Input Provenance: A mechanism to track the history and origin of each Task as it moves through the workflow.
  • LLMTaskWorker: A special type of TaskWorker that integrates with Language Models.
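
Tasks are Pydantic models. As a minimal sketch, here is how the task types used throughout the examples below could be defined (the field names follow how the example code uses them):

from planai import Task

class FetchRequest(Task):
    url: str

class FetchedData(Task):
    data: str

class ProcessedData(Task):
    data: str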

Creating a Simple Workflow

Here’s a basic example of how to create and execute a simple workflow:

from typing import List, Type
from planai import Graph, Task, TaskWorker

# Define custom TaskWorkers
class DataFetcher(TaskWorker):
    output_types: List[Type[Task]] = [FetchedData]

    def consume_work(self, task: FetchRequest):
        # Fetch data from some source - needs to be implemented
        data = self.fetch_data(task.url)
        self.publish_work(FetchedData(data=data), input_task=task)

class DataProcessor(TaskWorker):
    output_types: List[Type[Task]] = [ProcessedData]

    def consume_work(self, task: FetchedData):
        # Process the fetched data
        processed_data = self.process(task.data)
        self.publish_work(ProcessedData(data=processed_data), input_task=task)

# Create a graph
graph = Graph(name="Data Processing Workflow")

# Initialize workers
fetcher = DataFetcher()
processor = DataProcessor()

# Add workers to the graph and set dependencies
graph.add_workers(fetcher, processor)
graph.set_dependency(fetcher, processor)

# Let the graph collect all tasks published
# by the processor with the type ProcessedData
graph.set_sink(processor, ProcessedData)

# Run the graph
initial_request = FetchRequest(url="https://example.com/data")
graph.run(initial_tasks=[(fetcher, initial_request)])

# Get the outputs
outputs = graph.get_output_tasks()
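
Since ProcessedData is a plain Pydantic model, the collected sink outputs can be inspected directly:

for output in outputs:
    print(output.data)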

Integrating AI Capabilities

PlanAI allows you to easily integrate AI capabilities into your workflow using LLMTaskWorker:

from planai import LLMTaskWorker, llm_from_config

class AIAnalyzer(LLMTaskWorker):
    prompt: str = "Analyze the processed data and provide insights."
    llm_input_type: Type[Task] = ProcessedData
    output_types: List[Type[Task]] = [AnalysisResult]

# Initialize LLM
llm = llm_from_config(provider="openai", model_name="gpt-4o")

# Add to workflow
ai_analyzer = AIAnalyzer(llm=llm)
graph.add_worker(ai_analyzer)
graph.set_dependency(processor, ai_analyzer)
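
The LLM's response is parsed into the worker's declared output type. A minimal sketch of the AnalysisResult model assumed above, with a result field matching its use in the provenance example below:

from planai import Task

class AnalysisResult(Task):
    result: str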

Input Provenance

PlanAI provides powerful input provenance tracking capabilities, allowing you to retrieve any data previously processed in the graph:

class AnalysisTask(TaskWorker):
    output_types: List[Type[Task]] = [AnalysisResult]

    def consume_work(self, task: ProcessedData):
        # Complete provenance - not usually required
        provenance = task.copy_provenance()

        # Find a specific input task - used frequently
        original_data = task.find_input_task(FetchedData)

        # Get the immediately previous input task
        previous_task = task.previous_input_task()

        # Get the provenance chain for a specific worker type
        fetch_provenance = task.prefix_for_input_task(DataFetcher)
        if fetch_provenance is None:
            raise ValueError("No fetch provenance found")

        # Perform analysis using the provenance information
        result = self.analyze(task.data, original_data)
        self.publish_work(AnalysisResult(result=result), input_task=task)

Input provenance allows you to:

  • Trace the full history of a Task
  • Find specific input tasks in the provenance chain
  • Access the immediately previous input task
  • Get the provenance chain for a specific worker type

This feature is particularly useful for complex workflows where understanding the origin and transformation of data is crucial.

Caching Results

Use CachedTaskWorker to avoid redundant computations:

from planai import CachedTaskWorker

class CachedProcessor(CachedTaskWorker):
    output_types: List[Type[Task]] = [ProcessedData]

    def consume_work(self, task: FetchedData):
        # Processing logic here; work published from consume_work is
        # cached, so a repeated invocation with the same input task
        # replays the cached result instead of recomputing it
        self.publish_work(ProcessedData(data=self.process(task.data)), input_task=task)
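
Caching composes with AI workers as well: the CachedLLMTaskWorker variant avoids re-querying the model for inputs it has already seen. A minimal sketch, mirroring the AIAnalyzer example above:

from typing import List, Type
from planai import CachedLLMTaskWorker, Task

class CachedAnalyzer(CachedLLMTaskWorker):
    prompt: str = "Analyze the processed data and provide insights."
    llm_input_type: Type[Task] = ProcessedData
    output_types: List[Type[Task]] = [AnalysisResult]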

Joining Multiple Tasks

JoinedTaskWorker allows you to combine results from multiple upstream tasks:

from planai import JoinedTaskWorker, InitialTaskWorker

class DataAggregator(JoinedTaskWorker):
    output_types: List[Type[Task]] = [AggregatedData]
    join_type: Type[TaskWorker] = InitialTaskWorker

    def consume_work_joined(self, tasks: List[ProcessedData]):
        # Aggregation logic here
        pass

When instantiating DataAggregator, you need to specify a TaskWorker class as join_type. The provenance prefix produced by the worker named in join_type serves as the key for the join operation: once all tasks carrying a given prefix have left the graph, consume_work_joined is called with every task that shares that prefix.
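
As a sketch of how this could be wired into the earlier example: because InitialTaskWorker is the implicit producer of the tasks handed to graph.run(), joining on it groups together every result that stems from the same initial request.

# Hypothetical wiring, reusing the workers from the earlier examples
aggregator = DataAggregator()
graph.add_workers(fetcher, processor, aggregator)
graph.set_dependency(fetcher, processor)
graph.set_dependency(processor, aggregator)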

SubGraphs

PlanAI allows you to create nested workflows by encapsulating an entire graph as a single TaskWorker using SubGraphWorker. This enables modular, reusable, and composable subgraphs within a larger graph. At the moment, a subgraph may have only one entry and one exit worker. Its expected input and output types need to be communicated via code and documentation; in particular, it must be possible to import the subgraph's input and output types from Python.

Example:

from planai import Graph
from planai.graph_task import SubGraphWorker

# Import or define your TaskWorker classes
from my_workers import Task1Worker, Task2Worker, Task3Worker, Task1WorkItem

# 1. Define a subgraph
sub_graph = Graph(name="SubGraphExample")
worker1 = Task1Worker()
worker2 = Task2Worker()
sub_graph.add_workers(worker1, worker2)
sub_graph.set_dependency(worker1, worker2)
sub_graph.set_entry(worker1)
sub_graph.set_exit(worker2)

# 2. Wrap the subgraph as a TaskWorker
subgraph_worker = SubGraphWorker(name="ExampleSubGraph", graph=sub_graph)

# 3. Integrate into the main graph
main_graph = Graph(name="MainWorkflow")
final_worker = Task3Worker()
main_graph.add_workers(subgraph_worker, final_worker)
main_graph.set_dependency(subgraph_worker, final_worker)
main_graph.set_entry(subgraph_worker)
main_graph.set_exit(final_worker)

# 4. Run the main graph
initial_input = Task1WorkItem(data="start")
main_graph.run(initial_tasks=[(subgraph_worker, initial_input)])
Best Practices

  1. Modular Design: Break down complex tasks into smaller, reusable TaskWorkers.
  2. Type Safety: Use Pydantic models for input and output types to ensure data consistency.
  3. Error Handling: Implement proper error handling in your TaskWorkers to make workflows robust (see the sketch after this list).
  4. Logging: Utilize PlanAI’s logging capabilities to monitor workflow execution.
  5. Testing: Write unit tests for individual TaskWorkers and integration tests for complete workflows.
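
To illustrate point 3, here is a hedged sketch of a worker that handles fetch failures explicitly (fetch_data and the caught exception types are placeholders; adapt them to your data source):

from typing import List, Type
from planai import Task, TaskWorker

class RobustFetcher(TaskWorker):
    output_types: List[Type[Task]] = [FetchedData]

    def consume_work(self, task: FetchRequest):
        try:
            data = self.fetch_data(task.url)  # hypothetical helper
        except (IOError, ValueError) as exc:
            # Re-raising surfaces the failure to the graph; alternatively,
            # publish a dedicated error task type for downstream handling.
            raise RuntimeError(f"Failed to fetch {task.url}") from exc
        self.publish_work(FetchedData(data=data), input_task=task)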

For more detailed examples and advanced usage, please refer to the examples directory in the PlanAI repository.