Programmatic Event Handling
Overview
Triggers are static definitions in AutoKitteh projects. They start sessions, which run Python workflows, when certain events are received.
In addition, workflows can receive events programmatically during runtime. This page describes how this works.
Initial Subscription
import autokitteh
subscription_id = autokitteh.subscribe(connection_or_trigger_name, filter)
Subscription ID
The subscribe function call returns a UUID string that represents a specific event source and the exact time of the subscribe call. AutoKitteh queues all the events from the given event source, starting at this point in time. This UUID is used later as a handle to consume these events.
Connection/Trigger Name
Connection and trigger names are defined in the AutoKitteh project. This name identifies the desired event source: a specific third-party connection, or an HTTP webhook.
Filter
The filter string is a single-line CEL (Common Expression Language) expression. It is identical to the filter field in project triggers.
Complete reference: https://github.com/google/cel-spec/blob/master/doc/langdef.md
CEL conditions may reference event_type (same as in project triggers).
Unlike the event_type field in project triggers, filter conditions may check more than simple equality, and a filter may contain more than one condition. For example:
// Either 'issue_created' or 'issue_updated' events
event_type == 'issue_created' || event_type == 'issue_updated'
// Any issue-related events (e.g. issue_created / issue_updated),
// but not other entities (e.g. page_created / page_updated)
event_type.startsWith('issue_')
// Any event that relates to a created entity (e.g. issue_created,
// page_created), but not other categories (e.g. issue_updated)
event_type.endsWith('_created')
// More sophisticated string checks
event_type.contains('substring')
event_type.matches('regular expression')
In addition to event types, filters can also check event payloads. For example:
data.method in ['GET', 'HEAD'] && data.url.path.endsWith('/meow')
size(data.collection_value) < 5 || size(data.string_value) > 10
data.list_value[0].bar == 'bar value of first element in foo list'
data.dictionary_value['key'] != 'value'
Data filtering in triggers and subscriptions, when possible, is preferable to Python checks in handler functions, because it prevents the creation of unnecessary sessions.
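Because a filter is just a string, it can be assembled in Python before it is passed to subscribe. The following is a minimal sketch and not part of the AutoKitteh SDK: cel_string and event_type_filter are hypothetical helper names, and the quoting only handles backslashes and single quotes.

```python
def cel_string(value: str) -> str:
    """Quote a Python string as a single-quoted CEL string literal."""
    # Escape backslashes first, so the escapes added for quotes stay intact.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def event_type_filter(event_types: list[str]) -> str:
    """Build a CEL filter that matches any of the given event types."""
    if not event_types:
        raise ValueError("at least one event type is required")
    literals = ", ".join(cel_string(t) for t in event_types)
    # CEL's "in" operator checks membership in a list literal.
    return f"event_type in [{literals}]"


print(event_type_filter(["issue_created", "issue_updated"]))
# prints: event_type in ['issue_created', 'issue_updated']
```

The resulting string is equivalent to chaining equality checks with ||, and it can be used as the filter argument of subscribe.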
Consuming Events
The next_event function receives one or more subscription ID strings, which were generated by subscribe.
This function is blocking: it returns the data of a single event that was received after the subscribe call(s) that generated the given ID(s). You can call next_event any number of times.
Events are served in the order in which AutoKitteh received and processed them; no other ordering is guaranteed.
Example 1 - single subscription ID, without a timeout:
subscription_id = autokitteh.subscribe(connection_or_trigger_name, filter)
event_data = autokitteh.next_event(subscription_id)
Example 2 - single subscription ID, with a timeout:
from datetime import timedelta
subscription_id = autokitteh.subscribe(connection_or_trigger_name, filter)
duration = timedelta(seconds=10)
event_data = autokitteh.next_event(subscription_id, timeout=duration)
Example 3 - multiple subscription IDs, without a timeout:
sub_id_1 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub_id_2 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub_id_3 = autokitteh.subscribe(connection_or_trigger_name, filter)
event_data = autokitteh.next_event(sub_id_1, sub_id_2, sub_id_3)
Example 4 - multiple subscription IDs, with a timeout:
from datetime import timedelta
sub1 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub2 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub3 = autokitteh.subscribe(connection_or_trigger_name, filter)
duration = timedelta(minutes=1)
event_data = autokitteh.next_event(sub1, sub2, sub3, timeout=duration)
Timeout
This is an optional named parameter.
If you don't specify it, the next_event call will block forever and keep the session running until someone stops the session manually.
When specified, the expected type is a timedelta object.
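A timeout makes it possible to wait for a bounded batch of events instead of blocking indefinitely. The sketch below is hedged: collect_events is a hypothetical helper, the next_event callable is passed in as a parameter (inside a workflow it would be autokitteh.next_event), and the code assumes that a timed-out call returns None, which this page does not state.

```python
from datetime import timedelta
from typing import Any, Callable


def collect_events(
    next_event: Callable[..., Any],
    *subscription_ids: str,
    timeout: timedelta,
    max_events: int,
) -> list[Any]:
    """Collect up to max_events events, stopping early on a timeout.

    Assumption: next_event returns None when the timeout elapses.
    """
    events: list[Any] = []
    while len(events) < max_events:
        event = next_event(*subscription_ids, timeout=timeout)
        if event is None:  # Assumed timeout signal; verify against the SDK.
            break
        events.append(event)
    return events
```

In a workflow, a call might look like collect_events(autokitteh.next_event, sub_id, timeout=timedelta(seconds=10), max_events=5). Note that the timeout applies to each next_event call separately, not to the whole batch.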
Cleanup
When you're no longer interested in receiving events from a specific subscription, you may call this function:
autokitteh.unsubscribe(subscription_id)
Calling unsubscribe is recommended, but not required. Reasonable amounts of unused event subscriptions do not burden AutoKitteh, especially when the sessions they were created in have ended.
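One way to make sure unsubscribe is always called, even when the code that consumes events raises an exception, is to wrap the subscription in a context manager. This is a sketch under stated assumptions rather than an SDK feature: subscription is a hypothetical helper, and the subscribe and unsubscribe callables are passed in (inside a workflow they would be autokitteh.subscribe and autokitteh.unsubscribe).

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def subscription(
    subscribe: Callable[[str, str], str],
    unsubscribe: Callable[[str], None],
    source_name: str,
    cel_filter: str,
) -> Iterator[str]:
    """Yield a subscription ID and always unsubscribe on exit."""
    subscription_id = subscribe(source_name, cel_filter)
    try:
        yield subscription_id
    finally:
        # Runs on normal exit and when the body raises an exception.
        unsubscribe(subscription_id)
```

Within a workflow, the body of a with subscription(autokitteh.subscribe, autokitteh.unsubscribe, "webhook_name", "data.method == 'GET'") as sub_id: block would then call autokitteh.next_event(sub_id) as usual.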
Example
from datetime import timedelta

import autokitteh


def on_trigger(_):
    print("Creating an event subscription")
    # CEL string functions are camelCase: endsWith, not Python's endswith.
    filter = "data.method == 'GET' && data.url.path.endsWith('/meow')"
    get_sub = autokitteh.subscribe("webhook_name", filter)

    print("Waiting for an HTTP GET request without a timeout")
    event_data = autokitteh.next_event(get_sub)
    print(event_data)

    print("Creating another event subscription")
    filter = "data.method == 'POST' && data.url.path.endsWith('/meow')"
    post_sub = autokitteh.subscribe("webhook_name", filter)

    print("Waiting for an HTTP GET or POST request with a 1-minute timeout")
    delta = timedelta(minutes=1)
    event_data = autokitteh.next_event(get_sub, post_sub, timeout=delta)
    print(f"Got an HTTP {event_data.method} request: {event_data}")

    print("Canceling all event subscriptions")
    autokitteh.unsubscribe(get_sub)
    autokitteh.unsubscribe(post_sub)
    print("Done")
See also this sample project: https://github.com/autokitteh/kittehub/tree/main/samples/runtime_events