AutoKitteh SDK API Reference
This page documents all functions available in the autokitteh
Python SDK module.
Installation
The autokitteh
module is automatically installed in AutoKitteh's virtual environment.
For local development and testing, you can install it with:
pip install autokitteh
Installing the SDK locally is only useful for:
- Type hints and IDE autocompletion
- Testing individual handler functions in isolation
- Local syntax checking and linting
The SDK functions require the AutoKitteh runtime to work. When run outside of AutoKitteh (e.g., locally with plain Python), SDK functions like subscribe()
, next_event()
, start()
, http_outcome()
, etc., will not function properly as they depend on the AutoKitteh execution environment.
For actual workflow execution, you must run your code via ak deploy
on an AutoKitteh server.
Full API documentation is also available at: https://autokitteh.readthedocs.io/
See: Local Development for setting up your IDE and testing workflows.
Core Functions
autokitteh.activity
Decorator to mark a function as an activity. Activities are the unit of durable execution in AutoKitteh.
When to use:
- Functions with non-pickleable arguments or return values (e.g., file handles, database connections)
- Long-running operations
- Functions that perform I/O operations
- When you need to group multiple operations as a single atomic unit
Example:
import autokitteh
@autokitteh.activity
def fetch_data(url):
# This entire function runs as a single activity
response = requests.get(url)
return response.json()
def handler(event):
data = fetch_data("https://api.example.com/data")
print(data)
You cannot use event subscription functions (subscribe
, next_event
) or signals inside an activity.
See: Using the autokitteh.activity Decorator
Session Management
autokitteh.start()
Start a new child session.
Signature:
autokitteh.start(loc: str, data: dict = None, memo: dict = None) -> str
Parameters:
loc
(str): Code location in format"filename:function_name"
data
(dict, optional): Payload to pass to the function (accessible viaevent.data
)memo
(dict, optional): String-to-string dictionary for metadata displayed in the UI
Returns: Session ID (string) of the newly created session
Example:
import autokitteh
def on_webhook(event):
# Start a child session
session_id = autokitteh.start(
"worker.py:process_task",
data={"task_id": 123, "priority": "high"},
memo={"triggered_by": "webhook"}
)
print(f"Started session: {session_id}")
# In worker.py:
def process_task(event):
task_id = event.data.task_id
priority = event.data.priority
print(f"Processing task {task_id} with priority {priority}")
Use cases:
- Spawning parallel workflows
- Delegating work to separate sessions
- Creating background jobs
Event Subscription
autokitteh.subscribe()
Subscribe to events from a connection or trigger.
Signature:
autokitteh.subscribe(connection_or_trigger_name: str, filter: str = None) -> str
Parameters:
connection_or_trigger_name
(str): Name of the connection or trigger defined in your manifestfilter
(str, optional): CEL expression to filter events
Returns: Subscription ID (UUID string)
Example:
import autokitteh
def on_trigger(event):
# Subscribe to Slack messages
sub_id = autokitteh.subscribe(
"slack_conn",
"event_type == 'message_posted' && data.channel == 'C12345'"
)
print(f"Subscribed with ID: {sub_id}")
See: Programmatic Event Handling
autokitteh.next_event()
Wait for and retrieve the next event from one or more subscriptions.
Signature:
autokitteh.next_event(*subscription_ids: str, timeout: timedelta = None) -> Event
Parameters:
*subscription_ids
(str): One or more subscription IDs fromsubscribe()
timeout
(timedelta, optional): Maximum time to wait. If not specified, waits indefinitely
Returns: Event object containing the event data, or None
if timeout occurred
Example - Single subscription:
from datetime import timedelta
import autokitteh
def on_webhook(event):
sub_id = autokitteh.subscribe("webhook_trigger", "data.method == 'POST'")
# Wait up to 5 minutes
event = autokitteh.next_event(sub_id, timeout=timedelta(minutes=5))
if event:
print(f"Received event: {event.data}")
else:
print("Timeout - no event received")
Example - Multiple subscriptions:
def on_trigger(event):
# Wait for either GitHub or Slack event
github_sub = autokitteh.subscribe("github_conn", "event_type == 'push'")
slack_sub = autokitteh.subscribe("slack_conn", "event_type == 'message_posted'")
# Returns whichever event arrives first
event = autokitteh.next_event(github_sub, slack_sub, timeout=timedelta(hours=1))
print(f"Received event from: {event.connection_id}")
Without a timeout, next_event()
will block forever and keep the session running until manually stopped.
autokitteh.unsubscribe()
Cancel an event subscription.
Signature:
autokitteh.unsubscribe(subscription_id: str) -> None
Parameters:
subscription_id
(str): Subscription ID returned fromsubscribe()
Example:
def on_trigger(event):
sub_id = autokitteh.subscribe("slack_conn")
# ... process some events ...
# Clean up when done
autokitteh.unsubscribe(sub_id)
Calling unsubscribe()
is recommended but not required. Unused subscriptions don't burden AutoKitteh significantly.
HTTP Response
autokitteh.http_outcome()
Set the HTTP response for synchronous webhooks.
Signature:
autokitteh.http_outcome(status_code: int = 200, headers: dict = None, body: str = None) -> None
Parameters:
status_code
(int): HTTP status code (default: 200)headers
(dict, optional): Response headersbody
(str, optional): Response body
Example:
import autokitteh
def api_endpoint(event):
# Process the request
data = event.data.body.json
result = process_data(data)
# Return JSON response
autokitteh.http_outcome(
status_code=200,
headers={"Content-Type": "application/json"},
body=json.dumps({"result": result})
)
Use with synchronous webhooks:
triggers:
- name: api
type: webhook
is_sync: true # Enable synchronous mode
call: program.py:api_endpoint
See: Synchronous Webhooks Example
Project Value Store
autokitteh.set_value()
Store a value in the project-wide key-value store.
Signature:
autokitteh.set_value(key: str, value: Any) -> None
Parameters:
key
(str): Key namevalue
(Any): Value to store (must be pickleable)
autokitteh.get_value()
Retrieve a value from the project-wide store.
Signature:
autokitteh.get_value(key: str, default: Any = None) -> Any
Parameters:
key
(str): Key namedefault
(Any, optional): Default value if key doesn't exist
Returns: Stored value or default
autokitteh.del_value()
Delete a value from the store.
Signature:
autokitteh.del_value(key: str) -> None
Parameters:
key
(str): Key name to delete
autokitteh.list_values_keys()
List all keys in the store.
Signature:
autokitteh.list_values_keys() -> list[str]
Returns: List of all keys
Example:
from autokitteh import set_value, get_value, del_value, list_values_keys
def on_webhook(event):
# Store values
set_value("counter", 0)
set_value("last_user", event.data.user)
# Retrieve values
counter = get_value("counter", default=0)
counter += 1
set_value("counter", counter)
# List all keys
keys = list_values_keys()
print(f"Stored keys: {keys}")
# Delete a value
del_value("last_user")
Limitations:
- Values must be pickleable
- Maximum 64 values per project
- Maximum 64KB per value after serialization
- Values are project-scoped (not shared across projects)
See: Project Values Store
Signals
autokitteh.signal()
Send a signal that can be waited on by other sessions.
Signature:
autokitteh.signal(signal_name: str, data: dict = None) -> None
Parameters:
signal_name
(str): Name of the signaldata
(dict, optional): Data to send with the signal
autokitteh.wait_for_signal()
Wait for a signal to be received.
Signature:
autokitteh.wait_for_signal(signal_name: str, timeout: timedelta = None) -> dict
Parameters:
signal_name
(str): Name of the signal to wait fortimeout
(timedelta, optional): Maximum time to wait
Returns: Signal data or None
if timeout
Example:
from datetime import timedelta
import autokitteh
# Session 1: Wait for approval
def workflow(event):
print("Waiting for approval...")
signal_data = autokitteh.wait_for_signal("approval", timeout=timedelta(hours=24))
if signal_data:
print(f"Approved by: {signal_data.get('approver')}")
else:
print("Timeout - no approval received")
# Session 2: Send approval
def approve(event):
autokitteh.signal("approval", {"approver": "admin", "timestamp": "2024-01-01"})
Signals cannot be used inside activities.
See: Signals Module
Data Types
autokitteh.Event
Event object passed to handler functions.
Attributes:
data
: Event payload (dict-like object with attribute access)session_id
: ID of the current session
Example:
def handler(event: autokitteh.Event):
# Access via attributes
body = event.data.body
# Access via dict syntax
body = event['data']['body']
# Get session ID
print(f"Session: {event.session_id}")
autokitteh.AttrDict
Dictionary with attribute-style access used for event data.
Example:
from autokitteh import AttrDict
# For testing event handlers
event = AttrDict({
"data": {
"body": "hello",
"method": "POST"
},
"session_id": "ses_123"
})
handler(event)
Integration Helpers
The SDK provides helper functions for initializing clients for various integrations:
autokitteh.slack.slack_client()
Initialize Slack client.
from autokitteh.slack import slack_client
slack = slack_client("slack_conn_name")
slack.chat_postMessage(channel="#general", text="Hello!")
autokitteh.github.github_client()
Initialize GitHub client.
from autokitteh.github import github_client
github = github_client("github_conn_name")
repo = github.get_repo("owner/repo")
autokitteh.google.google_sheets_client()
Initialize Google Sheets client.
from autokitteh.google import google_sheets_client
sheets = google_sheets_client("sheets_conn_name")
See individual integration documentation for complete details.