
Programmatic Event Handling

Overview

Triggers are static definitions in AutoKitteh projects to start sessions that run Python workflows when certain events are received.

In addition, workflows can receive events programmatically during runtime. This page describes how this works.

Initial Subscription

import autokitteh

subscription_id = autokitteh.subscribe(connection_or_trigger_name, filter)

Subscription ID

The subscribe function call returns a UUID string that represents a specific event source and the exact time of the subscribe call. AutoKitteh queues all the events from the given event source, starting at this point in time. This UUID is used later as a handle to consume these events.

Connection/Trigger Name

Connection and trigger names are defined in the AutoKitteh project. This name identifies the desired event source: a specific third-party connection, or an HTTP webhook.

Filter

The filter string is a single-line CEL (Common Expression Language) expression. This is identical to the filter field in project triggers.

CEL conditions may reference event_type (same as in project triggers). Unlike the event_type field in project triggers, filter conditions may check more than simple equality, and a filter may combine more than one condition. For example:

// Either 'issue_created' or 'issue_updated' events
event_type == 'issue_created' || event_type == 'issue_updated'

// Any issue-related events (e.g. issue_created / issue_updated),
// but not other entities (e.g. page_created / page_updated)
event_type.startsWith('issue_')

// Any event that relates to a created entity (e.g. issue_created,
// page_created), but not other categories (e.g. issue_updated)
event_type.endsWith('_created')

// More sophisticated string checks
event_type.contains('substring')
event_type.matches('regular expression')

In addition to event types, filters can also check event payloads. For example:

data.method in ['GET', 'HEAD'] && data.url.path.endsWith('/meow')

size(data.collection_value) < 5 || size(data.string_value) > 10

data.list_value[0].bar == 'bar value of first element in foo list'

data.dictionary_value['key'] != 'value'

Tip: Filtering data in triggers and subscriptions, whenever possible, is preferable to Python checks in handler functions, because it prevents the creation of unnecessary sessions.
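
When the set of event types is only known at runtime, the filter string can be assembled in Python before calling subscribe. The helper below is a minimal sketch: event_type_filter is a hypothetical name, not part of the autokitteh SDK, and it relies only on the CEL `in` operator already used in the payload examples above.

```python
def event_type_filter(*event_types: str) -> str:
    """Return a CEL expression that matches any of the given event types.

    Hypothetical helper for illustration; not part of the autokitteh SDK.
    """
    if not event_types:
        raise ValueError("at least one event type is required")
    for name in event_types:
        # Keep the sketch simple: refuse characters that would need escaping
        # inside a single-quoted, single-line CEL string literal.
        if "'" in name or "\\" in name or "\n" in name:
            raise ValueError(f"unsupported character in event type: {name!r}")
    quoted = ", ".join(f"'{name}'" for name in event_types)
    return f"event_type in [{quoted}]"


print(event_type_filter("issue_created", "issue_updated"))
# event_type in ['issue_created', 'issue_updated']
```

The returned string can be passed as the filter argument of autokitteh.subscribe.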

Consuming Events

The next_event function receives one or more subscription ID strings, which were generated by subscribe.

This function blocks until an event is available, then returns the data of a single event that was received after the subscribe call(s) that generated the given ID(s). You can call next_event any number of times.

Strict event ordering is not guaranteed: events are served in the order in which AutoKitteh received and processed them, which is not necessarily the order in which they originally occurred.

Example 1 - single subscription ID, without a timeout:

subscription_id = autokitteh.subscribe(connection_or_trigger_name, filter)

event_data = autokitteh.next_event(subscription_id)

Example 2 - single subscription ID, with a timeout:

from datetime import timedelta

subscription_id = autokitteh.subscribe(connection_or_trigger_name, filter)

duration = timedelta(seconds=10)
event_data = autokitteh.next_event(subscription_id, timeout=duration)

Example 3 - multiple subscription IDs, without a timeout:

sub_id_1 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub_id_2 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub_id_3 = autokitteh.subscribe(connection_or_trigger_name, filter)

event_data = autokitteh.next_event(sub_id_1, sub_id_2, sub_id_3)

Example 4 - multiple subscription IDs, with a timeout:

from datetime import timedelta

sub1 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub2 = autokitteh.subscribe(connection_or_trigger_name, filter)
sub3 = autokitteh.subscribe(connection_or_trigger_name, filter)

duration = timedelta(minutes=1)
event_data = autokitteh.next_event(sub1, sub2, sub3, timeout=duration)
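
Because next_event returns one event per call, a workflow that needs several events calls it in a loop. The following is a sketch only: collect_events is a hypothetical helper, and next_event is passed in as a parameter (for example autokitteh.next_event) so that the function can also be tried with a stand-in outside an AutoKitteh session.

```python
from typing import Any, Callable, List


def collect_events(
    next_event: Callable[..., Any], count: int, *subscription_ids: str
) -> List[Any]:
    """Call next_event repeatedly until `count` events have been collected.

    Hypothetical helper for illustration; not part of the autokitteh SDK.
    """
    events: List[Any] = []
    while len(events) < count:
        # Each call blocks until an event arrives on any given subscription.
        events.append(next_event(*subscription_ids))
    return events


# Inside a workflow, usage might look like this:
#   events = collect_events(autokitteh.next_event, 3, sub1, sub2)
```

Without a timeout, each iteration blocks until an event arrives, so the loop keeps the session running until all `count` events have been received.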

Timeout

This is an optional named parameter.

If you don't specify it, the next_event call will block forever and keep the session running until someone stops the session manually.

When specified, the expected type is a timedelta object.
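
A bounded wait is usually paired with a check for the case where no event arrived. The sketch below rests on an assumption that this page does not state: that next_event returns None when the timeout expires. Verify this against the SDK reference before relying on it. The helper name wait_or_give_up and the max_attempts parameter are hypothetical.

```python
from datetime import timedelta
from typing import Any, Callable, Optional


def wait_or_give_up(
    next_event: Callable[..., Any],
    subscription_id: str,
    timeout: timedelta,
    max_attempts: int = 3,
) -> Optional[Any]:
    """Wait for one event, retrying a bounded number of times.

    ASSUMPTION: next_event returns None when the timeout expires
    without an event. This is not confirmed by the text above.
    """
    for attempt in range(1, max_attempts + 1):
        event = next_event(subscription_id, timeout=timeout)
        if event is not None:
            return event
        print(f"No event after attempt {attempt} of {max_attempts}")
    return None


# Inside a workflow, usage might look like this:
#   event = wait_or_give_up(autokitteh.next_event, sub_id, timedelta(seconds=10))
```

Bounding the number of attempts avoids the indefinite blocking described above while still giving slow event sources more than one chance to deliver.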

Cleanup

When you're no longer interested in receiving events from a specific subscription, you may call this function:

autokitteh.unsubscribe(subscription_id)

Note: Calling unsubscribe is recommended, but not required. A reasonable number of unused event subscriptions does not burden AutoKitteh, especially when the sessions that created them have ended.
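
One way to make sure unsubscribe is called even when the code in between raises an exception is to wrap the subscription in a context manager. This is a sketch under assumptions: the subscription helper is hypothetical and not part of the autokitteh SDK, and the subscribe and unsubscribe functions are passed in as parameters so the pattern can be tried outside an AutoKitteh session.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def subscription(
    subscribe: Callable[[str, str], str],
    unsubscribe: Callable[[str], None],
    source_name: str,
    event_filter: str,
) -> Iterator[str]:
    """Yield a subscription ID and unsubscribe when the block exits.

    Hypothetical helper for illustration; not part of the autokitteh SDK.
    """
    sub_id = subscribe(source_name, event_filter)
    try:
        yield sub_id
    finally:
        # Runs on normal exit and also when the body raises.
        unsubscribe(sub_id)


# Inside a workflow, usage might look like this:
#   with subscription(autokitteh.subscribe, autokitteh.unsubscribe,
#                     "webhook_name", "data.method == 'GET'") as sub_id:
#       event_data = autokitteh.next_event(sub_id)
```

A plain try/finally around the next_event calls achieves the same result if a context manager feels like too much machinery for a single subscription.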

Example

from datetime import timedelta

import autokitteh


def on_trigger(_):
    print("Creating an event subscription")
    # CEL function names are case-sensitive: endsWith, not endswith.
    filter = "data.method == 'GET' && data.url.path.endsWith('/meow')"
    get_sub = autokitteh.subscribe("webhook_name", filter)

    print("Waiting for an HTTP GET request without a timeout")
    event_data = autokitteh.next_event(get_sub)
    print(event_data)

    print("Creating another event subscription")
    filter = "data.method == 'POST' && data.url.path.endsWith('/meow')"
    post_sub = autokitteh.subscribe("webhook_name", filter)

    print("Waiting for an HTTP GET or POST request with a 1-minute timeout")
    delta = timedelta(minutes=1)
    event_data = autokitteh.next_event(get_sub, post_sub, timeout=delta)
    print(f"Got an HTTP {event_data.method} request: {event_data}")

    print("Canceling all event subscriptions")
    autokitteh.unsubscribe(get_sub)
    autokitteh.unsubscribe(post_sub)

    print("Done")

See also this sample project: https://github.com/autokitteh/kittehub/tree/main/samples/runtime_events