Reactive Programming Patterns: Examples from Fuse


Summary

Reactive programming requires new programming techniques and methods. This post describes common patterns in asynchronous reactive programming and gives examples from the open-source code base for the Fuse connected car platform. While the examples below are based on the pico platform, they are equally applicable to other Actor-based reactive programming platforms like Akka.

Microservices are hot and, consequently, so is reactive programming. Reactive programming, particularly the actor model, is a natural way to build systems that have the asynchrony and data isolation that good microservices demand. Reactive programming requires different skills from those most programmers learn on more traditional programming projects. One way to understand reactive programming is to study common reactive programming patterns.

As I was getting ready to teach my CS462 class about reactive programming patterns, I realized that I'd used each of the patterns I wanted to talk about as part of the Fuse Connected Car project we built at Kynetx in 2014. The Fuse API code is open-source and available on GitHub. So, Fuse is an excellent resource for showing off these important patterns.

After some background on the technical details of Fuse and the system it's built in, we'll review a selection of reactive programming patterns and show code examples from Fuse that illustrate the pattern for both intra-pico and pico-to-pico interactions.

Fuse Background

Fuse is a connected-car platform. I've written extensively about Fuse. For the purposes of this post, it's important to understand the following:

  • Fuse is built using a reactive programming system called picos (short for persistent-compute objects). Picos are based on the actor model and most pico interaction is event-driven.
  • Picos use a set of pre-built services called Wrangler to manage things like creating and destroying picos, pico-to-pico subscriptions, storing profiles, and so on.
  • Pico functionality is determined by the rules installed in the pico.
  • Rules are collected into rulesets that can share function definitions.
  • Each ruleset has a persistent key-value store that is separate from that of every other ruleset. This store is exposed as persistent variables. As a result, most pico-based systems have no need of an external database.
  • Rules are programmed in a language called KRL.
  • When we create a pico, it is automatically endowed with an event bus that connects all rules installed in the pico.
  • Fuse creates a separate pico for each vehicle.
  • Wrangler provides a ruleset that functions as a persistent data store for the entire pico called the PDS. The PDS provides a standard profile for each pico. Fuse stores all of the vehicle's configuration and identity information in the pico profile.
  • Other vehicle data is stored by the individual Fuse service. For example, the trip service stores information about trips, the fuel service stores information about fuel purchase, and so on.
  • Rules can raise events on the pico's internal event bus, send events to other picos, and use HTTP to interface with other Web-based APIs.

Not only do we create a pico for each vehicle, but we also create one for each owner, and one to represent the fleet. The following diagram shows schematically how these various picos are related to each other.

Figure: Fuse pico arrangement

Fuse uses a service called Carvoyant to manage devices and provide an API that is used to get vehicle data. The Carvoyant API is a well-designed RESTful API that uses OAuth for user authorization.

Carvoyant accounts correspond roughly to the concept of a fleet in Fuse. Each account has a collection of vehicles as well as information about the owner. Each vehicle pico in Fuse is linked through a set of event subscriptions (using a specialization of the webhook pattern called the Evented API) to the vehicle in Carvoyant as shown in the following diagram:

Figure: Correspondence between Fuse Fleet and Carvoyant Account

Whenever the device in a vehicle detects something interesting (e.g. a change in ignition status, entering or leaving a geofence, key parameters like fuel level going over or under a threshold, and so on), it communicates that state change to Carvoyant via a cellular network. Carvoyant is programmed to raise that event, via an event subscription, to the corresponding vehicle pico.

Intra-Pico Event Patterns

Every pico has an internal event bus. Any event sent to the pico via one of its event channels is raised on the internal bus. Any rule installed in the pico that selects on that event will run in response to the raised event. Rules in the pico can also raise events to the internal event bus.
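To make the idea of an internal event bus concrete, here is a minimal Python sketch. It is only a toy model and not how the pico engine is implemented: the Pico class, the rule decorator, raise_event, and the store_setting rule are all hypothetical names invented for this illustration.

```python
from collections import defaultdict


class Pico:
    """Toy stand-in for a pico: an internal event bus plus private state."""

    def __init__(self):
        self.rules = defaultdict(list)  # (domain, type) -> rule functions
        self.state = {}                 # visible only to this pico's rules

    def rule(self, domain, etype):
        """Register a function as a rule that selects on (domain, etype)."""
        def register(fn):
            self.rules[(domain, etype)].append(fn)
            return fn
        return register

    def raise_event(self, domain, etype, attrs=None):
        """Put an event on the internal bus; every selecting rule runs."""
        for fn in list(self.rules[(domain, etype)]):
            fn(self, attrs or {})


pico = Pico()


@pico.rule("fuse", "new_debug_value")
def set_debug_pref(p, attrs):
    # A rule can raise a follow-on event with computed attributes.
    p.raise_event("pds", "new_settings_attribute",
                  {"setAttr": "debugPreference",
                   "setValue": attrs["debug_value"]})


@pico.rule("pds", "new_settings_attribute")
def store_setting(p, attrs):
    # Hypothetical listener: stores the setting in the pico's own state.
    p.state[attrs["setAttr"]] = attrs["setValue"]


# An event arriving from outside is simply raised on the internal bus.
pico.raise_event("fuse", "new_debug_value", {"debug_value": "on"})
```

Each Pico instance has its own rules and state, which mirrors the isolation between picos.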

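There are several common patterns used by KRL rules to process events. Here's a list of the patterns discussed in this section:

  • Raising Events
  • Rule Chaining and Abstraction
  • Guard Rules and Idempotence
  • Self-Healing Systems
  • Event Splitting
  • Event Logging
  • Event Errors
  • Event Abstraction
  • Enriching Events
  • Semantic Translation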

Raising Events

Any rule can raise an event on the internal event bus using the raise command. Any rule installed in the pico can see that event and if it selects on that event, will run.

Figure: Raising Events

Here's the set_debug_pref rule from the Fuse Owner ruleset showing a raise command in the postlude:

rule set_debug_pref {
  select when fuse new_debug_value
  pre {
    new_val = event:attr("debug_value");
  }
  always {
    raise pds event "new_settings_attribute" 
          with setRID   = meta:rid() // this rid
           and setAttr  = "debugPreference"
           and setValue = new_val
  }
}

Commentary

  • This rule shows a new_settings_attribute event being raised with a specific set of event attributes.
  • The raised event can contain computed event attributes.
  • This rule postlude is introduced with the keyword always meaning the postlude statements will always execute if the rule is selected.

In addition to the with..and... syntax shown here, attributes can be sent with the event using the attributes keyword which accepts a map (hash) that gives the attribute names and values1. If we rewrote the preceding rule to use that format, it would look like this:

rule set_debug_pref {
  select when fuse new_debug_value
  pre {
    settings =  {"setRID"   : meta:rid(), // this rid
                 "setAttr"  : "debugPreference",
                 "setValue" : event:attr("debug_value")
                };
  }
  always {
    raise pds event "new_settings_attribute" attributes settings
  }
}

There's no functional difference between the two forms. The latter allows the entire set of attributes to be computed (including attribute names) rather than just the values.

As you read the following examples that use raise, keep in mind that raise puts events on the pico's internal event bus. Both the event bus and persistent storage of each pico are isolated from those of other picos.

Rule Chaining and Abstraction

Often when a rule raises an event, the goal is to signal a general state change in the pico. For example, a rule that is updating the profile might raise the profile_updated event. Any other rule in the pico that is interested in knowing when the profile has been updated can listen for that event and do any necessary processing.

Sometimes, however, the reason for raising an event is more tactical. We simply need to do something that takes two or more rules. This is called rule chaining.

Figure: Rule Chaining

In rule chaining, rule A is raising an event with the explicit goal of causing rule B to be selected.

As an example of this, consider the following two rules (slightly simplified) from the Fuse Fuel ruleset:

rule record_fuel_purchase {
  select when fuse new_fuel_purchase
  pre {
    // new records can't have id
    rec = event:attrs().delete(["id"]); 
  }  
  send_directive("Recording fill up") with rec = rec
  fired {
    raise fuse event "updated_fuel_purchase" attributes rec; 
  }
}
  
rule update_fuel_purchase {
  select when fuse updated_fuel_purchase
  pre {

    // if no id, assume new record and create one
    id = event:attr("id") || random:uuid();  
    // build a fuel purchase record
    ...
  }
  if( not volume.isnull() ...
   && not id.isnull()
    ) then {
      send_directive("Updating fill up") with rec = rec
  }
  fired {
    set ent:fuel_purchases{id} rec;
    raise fuse event "fuel_purchase_saved";
  }
}

Commentary:

  • The second rule, update_fuel_purchase, does all the work of storing a fuel purchase record. It's designed to either update an existing record or create a new record when the id attribute is null.
  • The first rule, record_fuel_purchase, is for new fuel purchase records, but all it does is ensure that the incoming record doesn't, for some reason, have an id.
  • record_fuel_purchase raises an explicit event to chain to update_fuel_purchase.

Rule chaining is one way for rules to avoid repeating logic (logical coupling). There's only one place where a fuel purchase record is created and stored even though there are two different events, new_fuel_purchase and updated_fuel_purchase, that signal it. We're using rule chaining to abstract the updated_fuel_purchase event.
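The shape of this chain can be sketched in a few lines of Python. This is a simplification under stated assumptions: a plain function call stands in for raising the updated_fuel_purchase event, and the purchases dictionary stands in for the ruleset's persistent variable.

```python
import uuid

purchases = {}  # stands in for the ruleset's persistent store


def update_fuel_purchase(attrs):
    """The single place where a record is created or updated."""
    rec_id = attrs.get("id") or str(uuid.uuid4())  # no id means a new record
    purchases[rec_id] = {k: v for k, v in attrs.items() if k != "id"}
    return rec_id


def record_fuel_purchase(attrs):
    """New records can't carry an id: strip it, then chain."""
    rec = {k: v for k, v in attrs.items() if k != "id"}
    return update_fuel_purchase(rec)  # stands in for the raised event


new_id = record_fuel_purchase({"id": "bogus", "volume": 10.5})
update_fuel_purchase({"id": new_id, "volume": 11.0})
```

Both entry points end up in update_fuel_purchase, so the storage logic lives in one place.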

Guard Rules and Idempotence

One of the most important reasons to chain rules is to guard the action a rule takes to ensure idempotence. Pico-based systems are easier to program when responses to an event are idempotent, meaning that they can run multiple times without cumulative effect.

Many operations are idempotent (e.g. installing a ruleset in a pico over and over only results in the ruleset being added once). For operations that aren't naturally idempotent, we can make the rule idempotent using the rule's guard condition. Using a guard condition, we can ensure the rule only fires when specific conditions are met.

There may be reasons why just using the guard condition in the rule isn't satisfactory. First, there are several library calls that are poorly designed and cause side effects in the prelude.2. Second, even when side-effecting code isn't an issue, we might not want to execute an entire, computationally heavy prelude before checking the condition.

Figure: The Guard Rule Pattern

A guard rule uses rule chaining so that the guard rule can test the guard condition and only raise an event for the second rule when it passes. The guard rule:

  1. responds to the triggering event
  2. tests a condition that ensures idempotence
  3. raises an explicit event in the postlude for which the second rule is listening

Here's an example from the Fuse Owner ruleset. This rule is called when a new Fuse owner account is created.

rule kickoff_new_fuse_instance {
  select when fuse need_fleet
  pre {
    fleet_channel = pds:get_item(common:namespace(),"fleet_channel");
  }
  if(fleet_channel.isnull()) then
    send_directive("requesting new Fuse setup");
  fired {
    raise explicit event "need_new_fleet" 
	  with _api = "sky"
	   and fleet = event:attr("fleet") || "My Fleet";
  }
}

Commentary:

  • When a new account is created, we want to create a fleet pico. But we want that action to be idempotent since any Fuse account should have one and only one fleet pico.
  • The rule that creates the fleet pico, create_fleet, is listening for the need_new_fleet event.
  • We cannot enforce idempotence with the conditional statement in the create_fleet rule because the common:factory() call has the side effect of creating a new child pico.
  • We use the existence of the fleet channel to test for whether or not the owner has already been initialized. If the fleet channel is null we assume the owner pico needs to be initialized.
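The guard can be sketched in Python as follows. This is a toy model, not the Fuse implementation: create_fleet here merely appends to a list to stand in for the side effect of creating a child pico, and the channel naming is invented for the example.

```python
state = {}    # stands in for the owner pico's persistent state
created = []  # records side effects so we can count them


def create_fleet(name):
    """Not idempotent on its own: every call creates another child."""
    created.append(name)
    state["fleet_channel"] = f"channel-{len(created)}"


def kickoff_new_fuse_instance(attrs):
    """Guard rule: chain to create_fleet only if no fleet exists yet."""
    if state.get("fleet_channel") is None:
        create_fleet(attrs.get("fleet") or "My Fleet")


# Receiving the triggering event twice creates only one fleet.
kickoff_new_fuse_instance({})
kickoff_new_fuse_instance({"fleet": "Second try"})
```

The side-effecting function stays simple because the guard decides whether it ever runs.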

Self-Healing Systems

Self-healing systems are a hallmark of robust system design. Rules can watch triggering events, check pico state, and raise an event that signals the problem. The trigger could be some regularly occurring event or a scheduled event. Regardless, we only want to signal a problem if one exists. A guard rule is a natural fit here.

Figure: Guard Rules and Self Healing

In this pattern, one or more guard rules listen for a triggering event, check the pico's state, and raise an event that signals the problem so other rules can take some action to fix the break.

Here's an example from Fuse that illustrates a check for a problem that needs to be corrected. Fuse subscribes to events from Carvoyant like ignition_status that signal some change in the vehicle. These subscriptions are set up when the vehicle is initialized. They can become dirty for various reasons:

  • The vehicle event channel could change and the subscriptions to Carvoyant need to be updated to send events to the pico over the new channel.
  • The design of Fuse might change necessitating new or different event subscriptions to Carvoyant.
  • Something else might go wrong in either Fuse or Carvoyant that requires the subscriptions be resynchronized.

Since picos are independent systems they cannot be updated en masse using, for example, an SQL statement. Rather we require that they fix themselves when a fix is needed. For example, adding new functionality to Fuse sometimes requires changing the event subscriptions that a Fuse vehicle pico has to the Carvoyant API. The Fuse system is designed to automatically fix subscriptions so that they match the specification of needed Carvoyant events.

rule check_subscriptions {
  select when fuse subscription_check
  pre {
    vid = carvoyant:vehicle_id(); 
    my_subs = carvoyant:getSubscription(vid);
  }
  if( not subscriptionsOk(my_subs) ) then
  {
    send_directive("subscriptions not OK")
      with my_subscriptions = my_subs
       and should_have = should_have
  }
  fired {
    raise carvoyant event dirty_subscriptions;
  }
}

Commentary:

  • The primary work is done by the subscriptionsOk() predicate. That predicate checks all the needed event subscriptions (from a list) against the existing subscriptions and ensures they're all there.
  • The rules that respond to the dirty_subscriptions event use the same list to find any missing or bad subscriptions and recreate them.
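A predicate like subscriptionsOk() could look something like the Python sketch below. The NEEDED list, the fuelLevel entry, and the shape of a subscription record are assumptions made for illustration; the real specification lives in the Fuse code.

```python
# Hypothetical specification of the subscriptions every vehicle should have.
NEEDED = [
    {"type": "ignitionStatus"},
    {"type": "fuelLevel"},
]


def missing_subscriptions(existing, needed=NEEDED):
    """Return the needed subscriptions that are not currently present."""
    have = {sub["type"] for sub in existing}
    return [sub for sub in needed if sub["type"] not in have]


def subscriptions_ok(existing, needed=NEEDED):
    return not missing_subscriptions(existing, needed)


def check_subscriptions(existing, raise_event):
    """Guard rule: signal a problem only when one actually exists."""
    if not subscriptions_ok(existing):
        raise_event("carvoyant", "dirty_subscriptions")


raised = []
check_subscriptions([{"type": "ignitionStatus"}],
                    lambda domain, etype: raised.append((domain, etype)))
```

The repair rules can reuse missing_subscriptions() with the same list, so the check and the fix never disagree about what is needed.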

Event Splitting

Event splitting is useful when a single event can signal different state changes based on its attributes or the current state of the receiving pico.

Figure: The Event Splitting Pattern

For example, in Fuse, the Carvoyant system sends an ignitionStatus event when the ignition key is turned on or off. The event includes an event attribute, status, that indicates how the ignition state changed.

Fuse treats these two states differently. The ignition being turned on indicates the start of a trip. The Carvoyant system manages trip tracking using telemetry data from the in-vehicle unit, so Fuse doesn't have to do anything with a trip when the ignition is turned on. Fuse does, however, use this as a signal to do a few clean-up and self-healing procedures. For example, the system checks for missed trips when the vehicle starts up and grabs the current vehicle status. We use the ignition being turned on as a convenient (and usually frequent) signal to accomplish tasks while the vehicle pico is not otherwise busy.

On the other hand, when the ignition is turned off, Fuse processes the trip that has just completed, saving it for later use. The ignitionStatus event doesn't contain all of the trip information; it merely signals the state change. Fuse uses Carvoyant's API to gather the detailed trip information, process it, and store the results.

The following two rules from the Fuse Carvoyant ruleset show how Fuse distinguishes and processes these two different state changes:

rule process_ignition_on  {  
  select when carvoyant ignitionStatus where status eq "ON"
  pre {
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  noop();
  always {
    raise fuse event "trip_check" with duration = 2; // recover lost trips
    raise fuse event ignition_processed attributes ignition_data;
  }
}

rule process_ignition_off {  
  select when carvoyant ignitionStatus where status eq "OFF"
  pre {
    tid = event:attr("tripId");
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  if not tid.isnull() then noop();
  fired {
    raise fuse event "new_trip" with tripId = tid;
    raise fuse event ignition_processed attributes ignition_data;
  } else {
    error warn "No trip ID " + ignition_data.encode();
  }
}

Commentary:

  • We use an attribute expression (signaled by the where keyword) in the event expression to split the event and process it differently.
  • The attribute expression creates an event expression that only selects when the specified condition is true.
  • In this case there are only the two options, but you could split an event multiple ways using this technique.

This example uses the event expression to split the incoming event, but you could do it in the rule body as well, computing different events to be raised based on the incoming event, the current state of the pico, or even external information from an API call.
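Splitting in the rule body might look like this Python sketch, where a single function inspects the attributes and decides which events to raise. Returning a list of (domain, type, attributes) tuples is a convention invented for this example.

```python
def split_ignition_status(attrs):
    """Return the events to raise for one incoming ignitionStatus event."""
    status = attrs.get("status")
    if status == "ON":
        # Ignition on: use the opportunity to recover lost trips.
        return [("fuse", "trip_check", {"duration": 2})]
    if status == "OFF" and attrs.get("tripId") is not None:
        # Ignition off with a trip ID: a trip has just finished.
        return [("fuse", "new_trip", {"tripId": attrs["tripId"]})]
    return []  # nothing to raise for unknown or incomplete events
```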

Event Logging

Event logging is a great example of the power of loose coupling in event-based systems. A logging rule can log important information for notification, monitoring, or support on the side without inserting the logging logic into the primary event flow. You can add multiple logging rules, change them as needed, or delete them without touching the base system.

Figure: The Event Logging Pattern

The finalize_new_users rule in the Fuse Owner ruleset demonstrates this:

rule finalize_new_users {
  select when fuse new_fleet_initialized
	  and pds profile_updated
  pre {
    me = pds:get_all_me();
    my_email =  me{"myProfileEmail"} || random:uuid();
    msg = <<
A new fleet was created for #{me.encode()} with ECI #{meta:eci()}
>>;
  }
  sendgrid:send("Kynetx Fleet Team", "fuse-support@kynetx.com",
                "New Fuse Fleet", msg);
  always {
    set app:fuse_users{my_email} makeAcctRecord(me)
  }
}

Commentary:

  • This rule is selected the first time a new Owner pico is set up (signalled by the new_fleet_initialized event).
  • The event expression also requires the profile_updated event (using the and event operator) so that it won't run until after initialization is complete.
  • The rule sends an email to fuse-support using Sendgrid and stores a record in an application persistent variable (app:fuse_users).

Note that because the pico's event bus is isolated, there's no danger that a new_fleet_initialized event will combine with a profile_updated event from another pico and mistakenly select. This rule selects only when both those events happen (in any order) in this pico.
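The behavior of the and operator can be modeled with a small state holder, as in the Python sketch below. This is my simplification rather than the KRL engine's algorithm, and resetting after a match is an assumption made for the example.

```python
class AndSelector:
    """Selects once both named events have been seen, in either order."""

    def __init__(self, first, second):
        self.needed = {first, second}
        self.seen = set()

    def observe(self, event):
        """Record an event; return True when the pair is complete."""
        if event in self.needed:
            self.seen.add(event)
        if self.seen == self.needed:
            self.seen.clear()  # assumed: the pair must occur again
            return True
        return False


FLEET = ("fuse", "new_fleet_initialized")
PROFILE = ("pds", "profile_updated")

selector = AndSelector(FLEET, PROFILE)
results = [selector.observe(e) for e in (PROFILE, FLEET, FLEET, PROFILE)]
```

Because each pico would hold its own selector state, events seen by one pico can never complete a pair in another.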

Event Errors

Rules can raise error events in the rule postlude using the error statement. Rules that detect an error condition raise an error event, and rules programmed to handle errors respond. A ruleset can declare a specific ruleset for handling errors using the errors to pragma in its meta block. Error events, unlike events generated by the raise statement, are routed directly to the designated error handling ruleset.

Figure: The Error Event Pattern

The Fuse error handling ruleset is fairly simple-minded. There is one rule, handle_error, that responds to all errors by mailing them to fuse-support. The message contains detailed information about what went wrong.

rule handle_error {
    select when system error 
    pre {
	genus = event:attr("genus");
	species = event:attr("species") || "none";
	level = event:attr("level");
	rid = event:attr("error_rid");
	rule_name = event:attr("rule_name");
	msg = event:attr("msg");
	eci = meta:eci();
	session = CloudOS:currentSession() || "none";
	ent_keys = rsm:entity_keys().encode();
	kre = meta:host();

	send_email=true;

	error_email_body = <<
A Fuse error occurred with the following details:
  Time: #{time:now()}

  RID: #{rid}
  Rule: #{rule_name}
  Host: #{kre}
  

  level: #{level}
  genus: #{genus}
  species: #{species}
  message: #{msg}

  eci: #{eci}
  txn_id: #{meta:txnId()}
  PCI Session Token: #{session}
  RSM Entity Keys: #{ent_keys}
>>;
    }
    if (send_email) then
	sendgrid:send(to_name, to_addr, subject, error_email_body);
}

Commentary:

  • Most of the information sent in the email comes from event attributes automatically generated when the error event is raised.
  • The email body is composed using extended quoting and beestings.

When Fuse was young, this was quite handy since I was told about things that went wrong and was able to track them down.

More sophisticated error handling can attempt to fix the problem and retry. For example, occasionally Carvoyant signals that the ignition has been turned off and doesn't include a trip ID. Without a trip ID, Fuse cannot query the Carvoyant API for the trip's parameters. Because this is caused by a race condition in the Carvoyant system, the trip can be recovered after waiting a short period. The no_trip_id rule in the Fuse error handling ruleset schedules a retry for recovering a trip in one minute:

rule no_trip_id {
  select when system error where msg.match(re/No trip ID/)
  sendgrid:send(to_name, to_addr, subject,
                "Scheduling retry for " + event:attr("msg"));
  always {
    schedule fuse event "trip_check" at time:add(time:now(),{"minutes" : 1}) 
       with duration = 1; 
  }
}

Commentary:

  • The event expression uses an attribute expression to select only when the error was caused by a missing trip ID. This is simply matching the text of the error message using a regular expression.
  • The schedule command is similar to the raise command except that it gives a time for raising the event instead of raising the event immediately.
  • In this case we're calculating the time for scheduling the event to be one minute from now.
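The retry logic can be modeled in Python with a small priority queue of scheduled events. This is a sketch under assumptions: the scheduled heap, schedule_event, and due_events are hypothetical helpers that stand in for the pico engine's scheduler.

```python
import heapq
import itertools
import re
from datetime import datetime, timedelta, timezone

_seq = itertools.count()  # tie-breaker so the heap never compares dicts
scheduled = []            # min-heap of (due, seq, domain, type, attrs)


def schedule_event(due, domain, etype, attrs):
    heapq.heappush(scheduled, (due, next(_seq), domain, etype, attrs))


def no_trip_id(now, error_attrs):
    """Select only on errors whose message mentions a missing trip ID."""
    if not re.search(r"No trip ID", error_attrs.get("msg", "")):
        return False
    # Retry one minute from now, once the race has resolved itself.
    schedule_event(now + timedelta(minutes=1),
                   "fuse", "trip_check", {"duration": 1})
    return True


def due_events(now):
    """Pop and return every scheduled event whose time has arrived."""
    ready = []
    while scheduled and scheduled[0][0] <= now:
        _, _, domain, etype, attrs = heapq.heappop(scheduled)
        ready.append((domain, etype, attrs))
    return ready


t0 = datetime(2015, 1, 1, 12, 0, tzinfo=timezone.utc)
no_trip_id(t0, {"msg": "No trip ID {...}"})
```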

Event Abstraction

Because of the nature of rule languages, you may often write several rules that have the same event expression. This goes against the grain of programmers of traditional programming languages where repeating yourself is not only wasteful but leads to code maintenance problems (logical coupling). The answer is to abstract the portions of those multiple rules that are repetitive and that are apt to be changed frequently.

Figure: The Rule Abstraction Pattern

The post_process_ignition rule from the Fuse Carvoyant ruleset is a good example of this. You'll recall from the Event Splitting pattern that there are separate rules for when the ignitionStatus event is raised with status equal to "ON" and when it's "OFF". Even so, there are some things we want to happen as part of both of those rules. The answer is to have a single rule that does these common activities and is selected after either of the rules for processing the ignitionStatus event has fired.

rule post_process_ignition {
  select when fuse ignition_processed
  pre {
    ignition_data = event:attrs();
  }
  noop();
  always {
    raise fuse event "need_vehicle_status";
    raise pds event "new_data_available"
	attributes {
	  "namespace": namespace(),
	  "keyvalue": "ignitionStatus_fired",
	  "value": ignition_data,
	  "_api": "sky"

	};
  }
}

Commentary:

  • This rule selects on the ignition_processed event which all the rules for processing the ignitionStatus event raise.
  • The rule raises several events in response to the incoming event, abstracting the multiple events into a single event using a rule.

Similar to the example in the Rule Chaining and Abstraction section above, we are avoiding logical coupling by not repeating logic that then has to be updated in multiple places. The key difference between this pattern and that one is that we're doing it specifically to abstract the events.
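Here is a Python sketch of the flow, with each rule modeled as a function that returns the events it raises. The RULES table and the run() loop are hypothetical scaffolding for the example, and the attribute maps are trimmed down.

```python
from collections import deque


def process_ignition_on(attrs):
    if attrs.get("status") != "ON":
        return []
    return [("fuse", "trip_check", {"duration": 2}),
            ("fuse", "ignition_processed", attrs)]


def process_ignition_off(attrs):
    if attrs.get("status") != "OFF":
        return []
    return [("fuse", "new_trip", {"tripId": attrs.get("tripId")}),
            ("fuse", "ignition_processed", attrs)]


def post_process_ignition(attrs):
    # Common follow-up work, written once for both ignition rules.
    return [("fuse", "need_vehicle_status", {}),
            ("pds", "new_data_available",
             {"keyvalue": "ignitionStatus_fired", "value": attrs})]


RULES = {
    ("carvoyant", "ignitionStatus"): [process_ignition_on,
                                      process_ignition_off],
    ("fuse", "ignition_processed"): [post_process_ignition],
}


def run(domain, etype, attrs):
    """Process an event and everything it raises; return raised names."""
    raised = []
    queue = deque([(domain, etype, attrs)])
    while queue:
        d, t, a = queue.popleft()
        for rule in RULES.get((d, t), []):
            for event in rule(a):
                raised.append(event[1])
                queue.append(event)
    return raised


on_events = run("carvoyant", "ignitionStatus", {"status": "ON"})
off_events = run("carvoyant", "ignitionStatus",
                 {"status": "OFF", "tripId": "t1"})
```

Changing what happens after ignition processing means editing only post_process_ignition.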

Enriching Events

Events are often quite sparse in the information they carry with them. Because events can be frequent, they are also usually lightweight. In addition, events are often broadcast on an event bus and event buses are usually kept simple for performance and administrative reasons. Consequently, events may be seen by systems not authorized for the full payload that could accompany the event.

Figure: The Event Enrichment Pattern

Event enrichment deals with these issues by adding information to an event before passing it along to other rules in the system. The save_trip rule in Fuse Trips is an example of this:

rule save_trip {
  select when fuse new_trip 
  pre {
    vid = carvoyant:vehicle_id();
    raw_trip_info = carvoyant:tripInfo(incoming{"tripId"}, vid);
    tid = mkTid(raw_trip_info{"id"}).klog(">>>>> trip ID >>>>>");

    trip_info = raw_trip_info.delete(["data"]);
    
    ...
    
    final_trip_info = trip_info
		 .put(["cost"], trip_summary{"cost"})
		 .put(["interval"], trip_summary{"interval"})
		 .put(["avgSpeed"], trip_summary{"avgSpeed"})
		 .put(["name"], trip_name)
		 .put(["category"], trip_category)
		 ;
  }
  if( end_time neq "ERROR_NO_TIMESTAMP_AVAILABLE" 
   && trip_info{"mileage"} > 0.01
    ) then
  {send_directive("Adding trip #{tid}") with 
    end_time = end_time and
    trip_summary = trip_summary
    ;
  }
  fired {
    raise fuse event trip_saved with 
      tripId = tid and
      tripSummary = trip_summary;
    set ent:trips_by_id{tid} final_trip_info;
    set ent:trip_summaries{tid} trip_summary;
  } else {
    ...
  }
}

Even though I've simplified this rule, it's still fairly complex. I've left in some of the complexity to illustrate the idea of enrichment.

  • The rule prelude makes a call to the Carvoyant API with the tripId to get details about the trip that was just completed.
  • The prelude computes things like trip cost and average speed and adds those to the trip information.
  • It also compares this trip to past trips to see if it should be categorized or named.
  • Finally, in addition to saving that information in a persistent entity variable, it raises a new event, trip_saved, that has been enriched with new information.

Note that the rule condition ensures that the enriched event is only raised if the length of the trip is greater than 0.01 miles as a way of ensuring that we don't record trips when the vehicle ignition was simply turned on and off without the vehicle moving. The condition also checks for other error conditions. Consequently, the enriched event is also a surer signal that a trip happened.
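Stripped of the Carvoyant details, enrichment amounts to the following Python sketch. The duration_seconds field, the cost_per_mile parameter, and the cost formula are assumptions for illustration; Fuse computes its trip summary differently.

```python
def enrich_trip(raw_trip, cost_per_mile):
    """Return enriched attributes for trip_saved, or None to skip the trip."""
    mileage = raw_trip.get("mileage", 0)
    seconds = raw_trip.get("duration_seconds", 0)  # hypothetical field
    if mileage <= 0.01 or seconds <= 0:
        return None  # ignition toggled without the vehicle moving

    hours = seconds / 3600
    # Drop the bulky raw telemetry and add the computed values.
    enriched = {k: v for k, v in raw_trip.items() if k != "data"}
    enriched["cost"] = round(mileage * cost_per_mile, 2)
    enriched["avgSpeed"] = round(mileage / hours, 1)
    return enriched


trip = enrich_trip({"id": "t1", "mileage": 30.0,
                    "duration_seconds": 1800, "data": [1, 2, 3]},
                   cost_per_mile=0.5)
```

Downstream rules that select on the enriched event get the cost and average speed without calling the API themselves.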

The update_vehicle_data rule in the Fuse Vehicle ruleset is another example of gathering information from various places to create a richer event, updated_vehicle, that contains significantly more information than the original event.

Semantic Translation

Semantic translation is a special kind of event enrichment. Semantic translation interprets an event in light of the context in which it is raised. The result is a new event that has a different, usually richer meaning.

For example, an event that says you're at the airport could be turned into an event that says you're leaving on a trip, if the rule can confirm you have a ticket or your calendar shows a trip. An event that signals leaving on a trip is more specific and thus more meaningful than one that merely indicates presence at the airport. There might be some rules that only care that you're at the airport, but leaving on a trip indicates a different intention.

Figure: The Semantic Translation Pattern

The process_ignition_off rule from the Fuse Carvoyant ruleset gives an example of this.

rule process_ignition_off {  
  select when carvoyant ignitionStatus where status eq "OFF"
  pre {
    tid = event:attr("tripId");
    ignition_data = normalize_carvoyant_attributes(event:attrs());
  }
  if not tid.isnull() then noop();
  fired {
    raise fuse event "new_trip" with tripId = tid;
    raise fuse event ignition_processed attributes ignition_data;
  } else {
    error warn "No trip ID " + ignition_data.encode();
  }
}

As we've seen, this rule selects when the ignitionStatus event is raised and the event attribute status is equal to "OFF". If the event also has a tripId, we interpret this as the signal that a trip has ended and raise the new_trip event.

Pico-to-Pico Event Patterns

Pico-to-pico events are the primary way that picos interact. Picos can send messages that contain events to other picos. Most interesting applications for picos are built from systems of picos cooperating to solve a problem. Carl Hewitt, the inventor of the Actor model of programming, said "One actor is no actor. Actors come in systems."

Picos send events to a specific pico on a specific channel. There is a difference between raising an event on the internal event bus and sending an event to another pico. Raising an event inside the pico effectively broadcasts the event to all of the rules installed in the pico. On the other hand, pico-to-pico events are point-to-point, from one pico to another. A pico can have as many incoming and outgoing event channels as are needed for receiving events from and sending events to other picos.

Figure: Pico To Pico Events

Once an event is received by a pico, it is raised on the pico’s internal event bus and any installed rule can select if its event expression is met. As we’ve seen, the state of one pico is completely isolated from that of any other.

As we discussed in the Background section, Fuse consists of at least three picos representing the owner, the fleet, and one for each vehicle. These picos interact to provide Fuse's functionality. The following sections describe some common event patterns with examples from the Fuse code base.

Correlating Events

When one pico sends an event to another, it often is expecting an asynchronous response. A correlation identifier can be used to associate these two events. A correlation identifier links conversational state in asynchronous interactions.

Figure: Event Correlation

Correlation identifiers are passed as event attributes and can be any string that is unique within the conversation.

Rules use a correlation identifier to ensure that two processes don't get confused with one another. For example, the correlation identifier can be used in the pico's persistent state to keep data about different conversations separate.

The following rule from the Fuse Fleet ruleset shows the calculation and use of a correlation number as part of creating Fuse fleet reports. The rule has been simplified to emphasize the idea of correlation numbers.

rule start_periodic_report {
  select when fuse periodic_report_start
  pre {
    new_rcn = genCorrelationNumber();
    rcn = event:attr("report_correlation_number")
	       .defaultsTo(new_rcn);
    ...
    report_data = {"period": period,
		   "start": start,
		   "end": end,
		   "timezone": tz};
    augmented_attrs = event:attrs()
                       .put(["report_correlation_number"], rcn);
  }
  fired {
    raise explicit event periodic_report_routable
      with attributes augmented_attrs;
    set ent:report_data{rcn} report_data;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  }
}

Commentary:

  • The correlation number is generated by a function that ensures that it's a unique string.
  • The implicit understanding is that any rule that sees the correlation number will pass it along so that every player can correlate their actions. Future versions of Wrangler, the pico operating system, will provide more automation for correlation.
  • The correlation number is used in internal events (the raise in the postlude).
  • The rule stores the correlation number for later use. In this case it's used as a key for storing other information in a persistent variable (the report_data) that will be used later for interactions involving this event.
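The Python sketch below shows how a correlation number keeps two concurrent conversations apart. The function names, the report_data layout, and the summary attribute are assumptions for this example and are not taken from the Fuse code.

```python
import uuid

report_data = {}  # conversation state, keyed by correlation number


def start_report(attrs):
    """Reuse an incoming correlation number or mint a fresh one."""
    rcn = attrs.get("report_correlation_number") or uuid.uuid4().hex
    report_data[rcn] = {"period": attrs.get("period"), "responses": {}}
    # Pass the number along so every later event can be correlated.
    return {**attrs, "report_correlation_number": rcn}


def receive_vehicle_report(attrs):
    """File an asynchronous response under the right conversation."""
    conversation = report_data.get(attrs["report_correlation_number"])
    if conversation is None:
        return False  # unknown or expired conversation
    conversation["responses"][attrs["vehicle_id"]] = attrs["summary"]
    return True


weekly = start_report({"period": "weekly"})
monthly = start_report({"period": "monthly"})
weekly_rcn = weekly["report_correlation_number"]
monthly_rcn = monthly["report_correlation_number"]

receive_vehicle_report({"report_correlation_number": weekly_rcn,
                        "vehicle_id": "v1", "summary": {"trips": 3}})
```

The response lands in the weekly report's state only, even though the monthly report is in flight at the same time.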

Event Recipient Lists

Event or message routing is a critical task in reactive systems. The simplest way to route events to other picos is to keep a list of picos. Sending events to each of the picos on a recipient list is a simple matter. In this pattern one rule sends the same event to each of a number of other picos. The recipient list is analogous to the To: list on an email message.

[Figure: Event Recipient List]

The recipient list can be static, based on a particular configuration of picos in the computation, or it can be calculated in various ways. Computed recipient lists allow a pico to act as an event router.

The following rule from Fuse uses an event recipient list that contains all the vehicles in the fleet. The list isn't static; it's computed by calling the activeVehicleSummary() function. This function could change the recipient list based on vehicles being added or removed from the fleet, or merely by vehicles being inactive (not connected to a device).

rule process_periodic_report_with_rcn {
  select when explicit periodic_report_routable
  foreach activeVehicleSummary() setting(vsum)
    pre {
      rcn = event:attr("report_correlation_number");
      channel = {"cid": vsum{"channel"}};
      ...
    }
    if(not rcn.isnull()) then {
      event:send(channel, "fuse", "periodic_vehicle_report")
	  with attrs = {
	    "report_correlation_number": rcn,
	    "vehicle_id": vsum{"deviceId"},
	    "start": common:convertToUTC(start),
	    "end": common:convertToUTC(end)
	  };
    }
    ...
}

Commentary:

  • The foreach statement runs the rule once for each vehicle returned by activeVehicleSummary().
  • Each iteration of the foreach loop sets the vsum variable with the vehicle summary for a specific vehicle.
  • The rule routes the periodic_vehicle_report event to each vehicle using event:send() and the event channel in vsum.
  • The correlation number is taken from an event attribute.
  • If the correlation number is missing, the rule doesn't fire.
  • The correlation number is sent with the event to each vehicle pico.
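
A minimal Python sketch of the same fan-out follows. It is an analogue rather than KRL: send_vehicle_reports() and the send callback are hypothetical, and the callback stands in for event:send().

```python
from typing import Callable, Dict, List, Optional

# Hypothetical transport: (channel, event_type, attributes) -> None
Send = Callable[[str, str, dict], None]


def send_vehicle_reports(recipients: List[Dict[str, str]],
                         rcn: Optional[str],
                         send: Send) -> int:
    """Send the same event to every recipient; return how many were sent."""
    if rcn is None:
        # Mirrors the rule's guard: without a correlation number, do nothing.
        return 0
    for vsum in recipients:
        send(vsum["channel"], "periodic_vehicle_report",
             {"report_correlation_number": rcn,
              "vehicle_id": vsum["deviceId"]})
    return len(recipients)
```

Passing the recipient list in as an argument keeps the sketch neutral about whether the list is static or computed at send time.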

Content-Based Event Routing

Another way to route messages is by content. In content-based event routing, the routing pico knows about some number of other picos and selects where to route the event based on the event domain, name, or attributes and other information such as the current state of the pico and external information from APIs.

The routing rule usually attaches a correlation identifier to the event before routing it.

[Figure: Event Routing]

The route_to_owner rule from the Fuse Fleet ruleset is a simple example of this idea. A Fuse fleet can have more than one owner, and the fleet generally needs to keep its owner aware of certain things by routing events.

rule route_to_owner {
  select when fuse new_fleet
           or fuse reminders_ready
           or fuse email_for_owner
  pre {
    owner_subs =
       CloudOS:subscriptionList(common:namespace(),"FleetOwner");
    // find the owner who contacted us (could be more than one)
    matching_owner = owner_subs.filter(
              function(sub){ sub{"backChannel"} eq meta:eci()
                           }
              );
    // use first owner if no match
    owner_list = matching_owner.length() > 0 => matching_owner
                                              | owner_subs;
    owner = owner_list.head().pick("$.eventChannel");
  }
  {
    send_directive("Routing to owner")
      with channel = owner 
       and attrs = event:attrs();
    event:send({"cid": owner}, "fuse", event:type())
      with attrs = event:attrs();
  }
}

Commentary:

  • The event expression is used to determine what events get routed to the owner. This method is static and bound early. KRL offers no mechanism at present for dynamically computing an event expression, but rule chaining with computed event types could be used to achieve a similar effect.
  • The rule is designed to route events to just one owner. The event is routed to the owner who sent the incoming event or to the first owner, if the incoming event didn't come from an owner.
  • If needed, this rule could be extended to route to all owners.
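
The owner-selection logic in the prelude can be sketched in Python as follows. This is a hypothetical analogue: choose_owner_channel() is not part of Fuse, and subscriptions are assumed to be plain dictionaries with backChannel and eventChannel keys, as in the rule above.

```python
from typing import Dict, List, Optional


def choose_owner_channel(owner_subs: List[Dict[str, str]],
                         incoming_eci: str) -> Optional[str]:
    """Pick the event channel of the owner who contacted us, else the first owner."""
    matching = [s for s in owner_subs if s["backChannel"] == incoming_eci]
    # Fall back to all owners when the event did not arrive from an owner.
    candidates = matching or owner_subs
    return candidates[0]["eventChannel"] if candidates else None
```

Returning None when there are no owners at all is a choice made for this sketch; the KRL rule does not spell out that case.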

Pico Registration

The most general way to route events is to create a service directory and allow picos to register for events based on specific criteria.

[Figure: Pico Registration]

In this pattern, picos send registration events that include information like their name, an event channel identifier, and attributes that are important in routing. The registration pico might be a special pico that serves as a directory in a large system, or the registration might just be a ruleset in a pico with other responsibilities. The registration pico might route events based on pico type, name, or other attributes.

One important feature of a directory is to allow picos to change their event channel for security reasons without losing service. A directory also allows picos to move to other hosting providers without loss of functionality.

A more complex example of this idea is to use a registrar. A registrar is a third party that manages the registry on behalf of the instances. The registrar watches for child_created or child_destroyed events to know when instances are created or destroyed and registers or deregisters them as appropriate. The registrar also periodically checks the health of instances and automatically deregisters those it deems incapacitated. The registrar decouples instances from the registry. They can be oblivious to the existence of the registry, so they never need to register explicitly.

There is no good example of this pattern in Fuse. However, the following example code shows how this could work. First let's look at a simple rule to process registration events:

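rule register_picos {
  select when system pico_registration
  pre {
    topic = event:attr("topic");
    pico_data = makeRegistrationRecord(event:attrs());
    // append so that several picos can register for the same topic
    registered = (ent:registrations{topic})
                   .defaultsTo([])
                   .append(pico_data);
  }
  if(not pico_data{"eci"}.isnull()) then noop();
  fired {
    set ent:registrations{topic} registered
  }
}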

Commentary:

  • The rule makes use of a topic event attribute to determine which topic the registering pico is interested in.
  • The rule uses a function, makeRegistrationRecord(), to process the incoming event attributes and create a record of anything important.
  • The rule only fires if the incoming registration event includes an event channel. This check could be extended to cover any other required information.
  • The rule ultimately stores the registration request in an entity variable called ent:registrations by topic.

The following code routes to registered picos by topic. This rule would be in the same pico as the register_picos rule shown above.

rule route_to_registered {
  select when fuse events_to_route
           or fuse another_event_to_route
  foreach ent:registrations{event:attr("topic")}
    setting(registered_pico)
    pre {
      channel = {"cid": registered_pico{"eci"}}
    }
    event:send(channel, "fuse", event:type()) with attrs = event:attrs();
}

Commentary:

  • This rule selects on any routable events that have been set in the rule's event expression.
  • The foreach loop will run once for each pico in the ent:registrations variable for the topic specified in the attributes of the incoming event. The topic could be computed rather than relying on it being in the incoming event.
  • All the incoming event attributes and the incoming event type are routed to the picos that are registered for the topic.
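
Registration and routing together can be sketched in Python like this. Like the KRL above, this is illustrative only: the Registry class, its method names, and the send callback are hypothetical, and each topic is assumed to map to a list of registered picos.

```python
from collections import defaultdict
from typing import Callable

# Hypothetical transport: (eci, event_type, attributes) -> None
Send = Callable[[str, str, dict], None]


class Registry:
    """Toy directory mapping a topic to the picos registered for it."""

    def __init__(self) -> None:
        self.registrations = defaultdict(list)  # topic -> list of records

    def register(self, attrs: dict) -> bool:
        eci = attrs.get("eci")
        if eci is None:
            # Like the rule's guard: ignore registrations without a channel.
            return False
        record = {"eci": eci, "name": attrs.get("name")}
        self.registrations[attrs.get("topic")].append(record)
        return True

    def route(self, topic: str, event_type: str, attrs: dict, send: Send) -> int:
        """Forward the event to every pico registered for the topic."""
        picos = self.registrations.get(topic, [])
        for pico in picos:
            send(pico["eci"], event_type, attrs)
        return len(picos)
```

A pico that changes its event channel would simply register again; removing the stale record is left out of the sketch.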

Aggregators

The aggregator pattern listens for incoming events and collects them. Once the required events have been collected, the aggregator raises an event or takes some other action.

[Figure: Event Aggregation]

The following two rules from the Fuse Fleet ruleset comprise an event aggregator. The first, catch_periodic_vehicle_reports, watches for periodic_vehicle_report_created events from one of the vehicles in the fleet, saves the information in the event as that vehicle's report, and raises an event that indicates the vehicle report was added.

rule catch_periodic_vehicle_reports {
  select when fuse periodic_vehicle_report_created

  pre {
    vehicle_id = event:attr("vehicle_id");
    rcn = event:attr("report_correlation_number");
    updated_vehicle_reports =
        (ent:vehicle_reports{[rcn,"reports"]})
		  .defaultsTo([])
	          .append(event:attr("vehicle_details").decode());

  }
  noop();
  always {
    set ent:vehicle_reports{[rcn,"reports"]} updated_vehicle_reports;
    raise explicit event periodic_vehicle_report_added with
      report_correlation_number = rcn
  }

}    

Commentary:

  • The report correlation number is sent with the incoming event as an event attribute.
  • The report correlation number is used to store the report in an entity variable. Thus multiple simultaneous reports could be in play at the same time without interfering with each other.
  • The same report correlation number is raised with the periodic_vehicle_report_added event as an attribute.

The second rule determines when sufficient reports have been collected. In this case, it compares the number of reports received with the number of active vehicles. So long as too few reports have been received, the rule does nothing.

rule check_periodic_report_status {
  select when explicit periodic_vehicle_report_added
  pre {
    rcn = event:attr("report_correlation_number");
    vehicles_in_fleet = activeVehicleSummary().length();
    number_of_reports_received = (ent:vehicle_reports{[rcn,"reports"]})
				   .length();
  }
  if ( vehicles_in_fleet <= number_of_reports_received ) then noop();
  fired {
    log "process vehicle reports ";
    raise explicit event periodic_report_ready with
      report_correlation_number = rcn;
  } else {
    log "we're still waiting for " +
        (vehicles_in_fleet - number_of_reports_received) +
	" reports on #{rcn}";
  }
}

Commentary:

  • The prelude calculates how many vehicle reports have been received using the report correlation number.
  • The report correlation number is passed along with the periodic_report_ready event so that any downstream rules can process the right report.
  • We use the log() statement in the else clause of the postlude to show information in the logs about how many reports have been received.
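
The two rules can be condensed into a small Python sketch. It is an analogue, not a translation: the ReportAggregator class is hypothetical, and the expected count is passed in rather than computed from activeVehicleSummary().

```python
from typing import Dict, List


class ReportAggregator:
    """Collects reports per correlation number until enough have arrived."""

    def __init__(self, expected_count: int) -> None:
        self.expected_count = expected_count
        self.reports: Dict[str, List[dict]] = {}  # rcn -> collected reports

    def add_report(self, rcn: str, report: dict) -> bool:
        """Store one report; return True once the set for rcn is complete."""
        self.reports.setdefault(rcn, []).append(report)
        # Same comparison as the rule: expected <= received.
        return self.expected_count <= len(self.reports[rcn])
```

The caller would raise the "ready" event when add_report() returns True, which corresponds to the fired clause of check_periodic_report_status.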

Scatter-Gather Pattern

The scatter-gather pattern is useful when you need to get some number of picos to do something and then aggregate the results to complete the computation. This is a common pattern for asynchronous processing in reactive systems. Events are sent asynchronously and, consequently, the sending pico does not block and is free to process other data while it's waiting for the result. Similarly, the picos that receive the event can process the event and respond when ready.

Fuse uses the scatter-gather pattern in creating weekly vehicle reports. We've already seen the process_periodic_report_with_rcn rule in the Event Recipient Lists pattern. This rule scatters events telling the vehicles that the fleet needs the vehicle report. The rules we just saw in the Aggregator pattern are the gathering part of this set of rules.

When we combine the pictures from those two patterns, we get a setup that looks like this:

[Figure: Scatter Gather]

The scatter-gather pattern and its use in Fuse are described in some detail in Using the Scatter-Gather Pattern to Asynchronously Create Fuse Reports. Here's a diagram showing the specific interactions in Fuse:

[Figure: Scatter-Gather Pattern in Fuse]

The owner pico kicks everything off by sending the request_periodic_report event to the fleet. The start_periodic_report rule in the fleet pico scatters the periodic_vehicle_report event to each vehicle in the fleet, whether there's 1 or 100. Of course, these events are asynchronous as well. Consequently the vehicle picos are not under time pressure to complete.

When each vehicle pico completes, it sends a periodic_vehicle_report_created event to the fleet pico. The catch_periodic_vehicle_reports rule is listening and gathers the reports. Once it has added the vehicle report, it raises the periodic_vehicle_report_added event. Another rule in the fleet pico, check_periodic_report_status, checks whether every vehicle has responded. When the number of reports reaches the number of active vehicles, it raises the periodic_report_ready event; the data is then turned into a report and the owner pico is notified that it's ready for emailing.

As we've seen, these rules make extensive use of the report correlation number to ensure that reports are not intermingled if a request_periodic_report event happens to be sent before the previous one finishes.
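
The overall shape of scatter-gather can be sketched with Python's asyncio. This is a loose analogue under stated assumptions: coroutines stand in for vehicle picos, whereas real picos exchange events over channels, and vehicle_report() and scatter_gather() are hypothetical names.

```python
import asyncio
from typing import Dict, List


async def vehicle_report(vehicle_id: str, rcn: str) -> Dict[str, str]:
    """Stand-in for a vehicle pico building its report asynchronously."""
    await asyncio.sleep(0)  # yield control, as a real vehicle would take time
    return {"deviceId": vehicle_id, "report_correlation_number": rcn}


async def scatter_gather(vehicle_ids: List[str], rcn: str) -> List[Dict[str, str]]:
    # Scatter: start one task per vehicle without waiting on any of them.
    tasks = [asyncio.create_task(vehicle_report(v, rcn)) for v in vehicle_ids]
    # Gather: collect each report as it completes, in whatever order.
    reports = []
    for finished in asyncio.as_completed(tasks):
        reports.append(await finished)
    return reports


if __name__ == "__main__":
    print(asyncio.run(scatter_gather(["v1", "v2", "v3"], "rcn-1")))
```

The sketch waits for every task, so it does not cover lost events; that case is the subject of the next section.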

Dealing with Failure

Because events may be lost, asynchronous systems have to be prepared to deal with failure.

[Figure: Event Failure]

In the case of Fuse, the start_periodic_report rule that we saw in the section on Correlating Events also schedules the periodic_report_timer_expired event for two minutes in the future:

schedule explicit event "periodic_report_timer_expired"
  at time:add(time:now(),{"minutes" : 2}) 
  attributes {"report_correlation_number": rcn, "timezone": tz}

Another rule, retry_from_expired_timer, listens for this event and retries missing vehicles:

rule retry_from_expired_timer {
  select when explicit periodic_report_timer_expired
  pre {
    max_retries = 2;
    rcn = event:attr("report_correlation_number");
    tz = event:attr("timezone");
    vehicle_summaries = vehicleSummary();
    vehicle_reports = ent:vehicle_reports{[rcn,"reports"]}
                        .defaultsTo([]);
    vehicle_summaries_keys = vehicle_summaries
                               .map(function(r){r{"deviceId"}});
    vehicle_reports_keys = vehicle_reports
                             .map(function(r){r{"deviceId"}});

    missing_vehicles = vehicle_summaries_keys
			   .difference(vehicle_reports_keys);
    in_array = function(k,a){
      a.filter(function(x){x eq k}).length() > 0;
    };
    needed = vehicle_summaries.filter(
                 function(s){
		   in_array(s{"deviceId"}, missing_vehicles)
		 });

    rcn_unprocessed = not ent:vehicle_reports{rcn}.isnull();
  }
  if ( needed.length() > 0
    && ent:retry_count < max_retries
     ) then {
    noop();
  }
  fired {
    log "Retrying for " + (needed.length()) + " vehicles";
    set ent:retry_count ent:retry_count+1;
    raise fuse event periodic_report_start attributes {
       "vehicle_summaries": needed,
       "timezone": tz,
       "report_correlation_number": rcn 
      } if rcn_unprocessed;
    schedule explicit event "periodic_report_timer_expired"
      at time:add(time:now(),{"minutes" : 2}) 
      attributes {"report_correlation_number": rcn, "timezone": tz}
  } else {
    clear ent:retry_count;
    raise explicit event periodic_report_ready with
      report_correlation_number = rcn if rcn_unprocessed;
  }
}

Commentary:

  • The retry in two minutes is somewhat arbitrary. Since reports are only generated once a week, waiting two minutes to retry does not seem overly long.
  • The logic in the prelude of this rule is primarily concerned with calculating needed, a list of the vehicles that have not yet sent reports.
  • If reports are needed, the same event, periodic_report_start, is raised, but with a vehicle_summaries attribute.
  • The scheduled event is also reset in case this retry fails as well.
  • The rule is designed to retry a maximum number of times. If the maximum is reached, the rule raises the periodic_report_ready event even if not all reports have been received. The report is processed without them.
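
The decision the rule makes when the timer expires can be sketched in Python. This is a hypothetical analogue: missing_vehicles() and on_timer_expired() are illustrative names, and the retry count is passed in and returned rather than kept in an entity variable.

```python
from typing import Dict, List, Tuple


def missing_vehicles(summaries: List[Dict[str, str]],
                     reports: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Return the summaries of vehicles that have not reported yet."""
    reported = {r["deviceId"] for r in reports}
    return [s for s in summaries if s["deviceId"] not in reported]


def on_timer_expired(summaries: List[Dict[str, str]],
                     reports: List[Dict[str, str]],
                     retry_count: int,
                     max_retries: int = 2) -> Tuple[str, List[Dict[str, str]], int]:
    """Decide whether to retry the missing vehicles or finish with what we have."""
    needed = missing_vehicles(summaries, reports)
    if needed and retry_count < max_retries:
        # Ask only the missing vehicles again and count the attempt.
        return ("retry", needed, retry_count + 1)
    # Either everything arrived or retries are exhausted: process the report.
    return ("ready", [], 0)
```

On "retry" the caller would resend the request to the needed vehicles and schedule the timer again; on "ready" it would process the report with whatever has arrived.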

Conclusions

Part of the challenge in building reactive systems is learning to program in a completely new style. Reactive systems are by nature asynchronous and loosely coupled. Furthermore, program flow doesn't proceed in the same manner as in object-oriented or imperative systems, but rather is based on the event flow. Rule-based systems may have any number of independent responses to a given event. Knowing patterns can help ease the transition to this new style of programming.

This post has demonstrated and explained a number of intra-pico and pico-to-pico event patterns. The list we have reviewed is by no means exhaustive, but it is indicative of the wide variety of event processing that happens in actor-based systems. The Fuse code base is open-source and thus represents an example of a large Internet of Things system programmed in a reactive style using the actor model.


  1. KRL data structures are just JSON.
  2. In general, KRL is designed so that declarations in the prelude cannot cause side effects to persistent variables.


Last modified: Tue Feb 4 15:15:13 2020.