API Reference

This section provides detailed API documentation for all PlanAI components. The framework is organized into several core modules:

Task: Base class for all units of work that flow through PlanAI workflows. All data in PlanAI is represented as Task objects with built-in provenance tracking.

TaskWorker: Abstract base class for processing tasks. All workers inherit from this class and implement the consume_work method.

Graph: The workflow orchestrator that manages task execution, dependencies, and parallelism.

LLM integration: Specialized workers for integrating Large Language Models into workflows, including LLMTaskWorker and CachedLLMTaskWorker.

  • llm_from_config: Factory function for creating LLM instances
  • Tool: Base class for LLM function calling/tools
  • CachedTaskWorker: Base class for workers with caching
  • CachedLLMTaskWorker: LLM worker with response caching
  • InitialTaskWorker: Entry point for workflows
  • JoinedTaskWorker: Aggregates multiple task results
  • SubGraphWorker: Encapsulates graphs as workers
  • PydanticDictWrapper: Utility for wrapping dictionaries as tasks
  • Provenance: Classes for tracking task lineage
  • WorkflowTestHelper: Utilities for testing workflows
  • MockLLM: Mock LLM for unit testing
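
To make "tracking task lineage" and "aggregates multiple task results" more concrete, here is a minimal sketch in plain Python. It does not use PlanAI: ToyTask, derive, and join_by_ancestor are hypothetical stand-ins invented for illustration, and PlanAI's actual Provenance and JoinedTaskWorker classes may work differently.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Hypothetical stand-ins for illustration only; these are NOT PlanAI classes.
ProvenanceEntry = Tuple[str, int]  # (worker name, sequence number)


@dataclass
class ToyTask:
    payload: str
    provenance: List[ProvenanceEntry] = field(default_factory=list)

    def derive(self, payload: str, worker: str, seq: int) -> "ToyTask":
        """Create a child task whose lineage extends this task's lineage."""
        return ToyTask(payload, self.provenance + [(worker, seq)])


def join_by_ancestor(
    tasks: List[ToyTask], depth: int
) -> Dict[Tuple[ProvenanceEntry, ...], List[ToyTask]]:
    """Group tasks that share the same first `depth` provenance entries."""
    groups: Dict[Tuple[ProvenanceEntry, ...], List[ToyTask]] = {}
    for task in tasks:
        key = tuple(task.provenance[:depth])
        groups.setdefault(key, []).append(task)
    return groups


# One upstream task fans out into three results, which are then grouped
# back together by their common ancestor.
root = ToyTask("query").derive("query", "Planner", 1)
children = [root.derive(f"result-{i}", "Searcher", i) for i in range(3)]
groups = join_by_ancestor(children, depth=1)
```

The idea is that each derived task carries the full chain of steps that produced it, so a joining step can collect every result that descends from the same upstream task.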

The main imports are available from the root module:

from planai import (
    # Core classes
    Task,
    TaskWorker,
    Graph,
    # LLM integration
    LLMTaskWorker,
    CachedLLMTaskWorker,
    llm_from_config,
    Tool,
    # Advanced workers
    InitialTaskWorker,
    JoinedTaskWorker,
    CachedTaskWorker,
    SubGraphWorker,
    # Utilities
    Dispatcher,
    InputProvenance,
)

PlanAI uses Python type hints extensively. All components are designed to work with type checkers like mypy:

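from typing import List, Type
from planai import TaskWorker, Task

class InputTask(Task):
    text: str  # illustrative field

class OutputTask(Task):
    text: str  # illustrative field

class MyWorker(TaskWorker):
    output_types: List[Type[Task]] = [OutputTask]
    def consume_work(self, task: InputTask) -> None:
        # Type-safe processing
        pass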
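
One way a framework can put such annotations to work at runtime is to read the declared input type back with typing.get_type_hints and compare it against an upstream worker's output_types. The sketch below uses plain Python stand-ins (ToyTask, ToyWorker, input_type_of, and can_connect are hypothetical names, not PlanAI APIs) and is not a description of PlanAI's internals.

```python
from typing import List, Type, get_type_hints


# Hypothetical stand-ins for illustration only; these are NOT PlanAI classes.
class ToyTask:
    """Base class for toy tasks."""


class InputTask(ToyTask):
    """Task type consumed by MyWorker."""


class OutputTask(ToyTask):
    """Task type produced by MyWorker."""


class ToyWorker:
    output_types: List[Type[ToyTask]] = []

    def consume_work(self, task: ToyTask) -> None:
        raise NotImplementedError


class Producer(ToyWorker):
    output_types = [InputTask]


class MyWorker(ToyWorker):
    output_types = [OutputTask]

    def consume_work(self, task: InputTask) -> None:
        pass


def input_type_of(worker_cls: Type[ToyWorker]) -> Type[ToyTask]:
    """Read the annotated type of the `task` parameter of consume_work."""
    return get_type_hints(worker_cls.consume_work)["task"]


def can_connect(upstream: Type[ToyWorker], downstream: Type[ToyWorker]) -> bool:
    """True if the upstream worker declares the downstream worker's input type."""
    return input_type_of(downstream) in upstream.output_types
```

With these definitions, can_connect(Producer, MyWorker) holds because Producer declares InputTask as an output, while can_connect(MyWorker, MyWorker) does not.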

PlanAI supports asynchronous execution for improved performance:

from planai import Graph

graph = Graph(name="Async Workflow")
# Workers run concurrently when possible
# (initial_tasks is assumed to be prepared elsewhere)
graph.run(initial_tasks, max_workers=10)
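
As a rough analogy for what a max_workers limit means, the sketch below uses Python's standard concurrent.futures.ThreadPoolExecutor to process independent items with a bounded number of threads. This is not PlanAI code and makes no claim about how PlanAI schedules workers; process and items are hypothetical names used only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process(item: int) -> int:
    """Simulate an I/O-bound unit of work, such as a remote API call."""
    time.sleep(0.05)
    return item * item


items = list(range(8))

# At most four items are processed at the same time; pool.map returns
# results in input order regardless of which call finishes first.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process, items))
```

Raising the limit helps most when the work spends its time waiting (network calls, LLM requests) rather than computing.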

All PlanAI components include comprehensive error handling:

try:
    graph.run(initial_tasks)
except WorkflowError as e:
    # Handle workflow-specific errors
    pass
except Exception as e:
    # Handle general errors
    pass
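
The order of the except clauses matters: Python tries them top to bottom, so the specific handler has to come before the general one. The self-contained sketch below shows this with a locally defined WorkflowError, which is a hypothetical stand-in here since this page does not say where the real class is imported from; run_safely is likewise an illustrative helper, not a PlanAI function.

```python
from typing import Callable


class WorkflowError(Exception):
    """Hypothetical stand-in for a workflow-specific exception."""


def run_safely(run: Callable[[], None]) -> str:
    """Call `run` and report which handler, if any, caught an error."""
    try:
        run()
    except WorkflowError as e:
        # Specific handler first: reached only for workflow errors
        return f"workflow error: {e}"
    except Exception as e:
        # General fallback for everything else
        return f"unexpected error: {type(e).__name__}: {e}"
    return "ok"


def failing_workflow() -> None:
    raise WorkflowError("worker failed")


def failing_other() -> None:
    raise ValueError("bad input")
```

If the two handlers were swapped, the except Exception clause would catch WorkflowError as well and the specific branch would never run.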

Next steps:

  • Explore specific component documentation in the subsections
  • See Examples for practical usage
  • Review the source code for implementation details