Manual
It may be worth familiarizing yourself with the concepts in the Asynchronous Programming chapter of the Julia manual before proceeding.
@topic & Topics
Time-varying states are represented by the Topic{T} type. A topic is a thread-safe container holding values of type T. The name is borrowed from ROS; they fulfill a similar role to ROS topics, but work quite differently.
Topics are like (mostly) thread-safe Observables. They are essentially a 2-element circular buffer with mutual exclusion enforced on writes, but allowing unlimited concurrent reads, which reuse the most recently written value. They can be thought of as a Last-In, Only-Out (LIOO?) queue, or simply a thread-safe box containing a variable.
The idea is that the most recent value written to the topic is the most valid representation of that state; older values are obsolete and are discarded. It is generally advised to have only a single task be the "source" of the topic.
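To make the "thread-safe box" picture concrete, here is a minimal sketch in plain Julia. LatestBox is a hypothetical stand-in written for this page, not ReactiveToolkit's Topic: it simply takes a lock on reads as well as writes, whereas topics are described above as allowing concurrent reads.

```julia
# Hypothetical illustration only; this is NOT ReactiveToolkit's Topic type.
mutable struct LatestBox{T}
    lock::ReentrantLock
    value::T
end

LatestBox(v::T) where {T} = LatestBox{T}(ReentrantLock(), v)

# Reads return whatever was written most recently.
Base.getindex(b::LatestBox) = lock(() -> b.value, b.lock)

# Writes replace the stored value; the previous value is discarded.
function Base.setindex!(b::LatestBox{T}, v) where {T}
    lock(b.lock) do
        b.value = convert(T, v)
    end
    return b
end

box = LatestBox(0.0)
for v in 1:5
    box[] = v      # each write overwrites the previous one
end
latest = box[]     # only the newest value (5.0) is left
```

The point of the sketch is the semantics: there is no queue of past values to drain, so a slow reader always sees the latest state rather than a backlog.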
T can be any type available in Julia itself: primitive types like UInt16, abstract types like Number, or even Any, dicts or structs encoding custom message types, variable length arrays, images, simulation models, symbolic differential equations, or even Julia source code.
They can be made using the @topic macro, which creates an automatically named topic with an inferred type, bound to the specified variable:
@topic x::Int = 10.0 # x holds Int64s, with initial value of 10
@topic y::Number = 10.0 # y holds Numbers, initially the Float64 10.0
@topic z::Number = 10 # z holds Numbers, initially the Int64 10
@topic a::Any = plot(rand(10)) # the world is your oyster
They can also be created manually, for example in loops or generators:
ys = [Topic(0; name="y$i") for i in 1:100]
zs = [Topic{Float64}(0; name="z$i") for i in 1:100]
The variable represents the topic itself. Its value can be accessed or set using []:
@topic x = 0.0
@topic y = 0.0
x[] = 1 # set the value of x
1 == x[] # use the value of x
typeof(x) # Topic{Float64}
typeof(x[]) # Float64
y[] = sin(x[])
Note that things get tricky when the topic holds a value of a mutable type. As a general rule, don't mutate the value of a topic; replace it with a new value instead:
@topic x = [1,2,3]
x[] = [1,2,3,4] # this is fine
push!(x[], 5) # this is not
Once I find an elegant way to automate mutation, I will add it. For now, consider:
let _x = x[]
    push!(_x, 5)
    x[] = _x
end
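The hazard with in-place mutation is aliasing: anyone who has already read the value holds the same array. The sketch below uses Base's Ref purely as a stand-in for a topic, and it assumes that reading a topic hands back the stored object rather than a copy, which this page does not state.

```julia
# Ref is only a stand-in for a topic here; no ReactiveToolkit calls are used.
x = Ref([1, 2, 3])
seen = x[]                    # a reader takes the current value
push!(x[], 4)                 # in-place mutation of the stored array
mutated_under_reader = (seen == [1, 2, 3, 4])   # the reader's array changed too

y = Ref([1, 2, 3])
kept = y[]                    # a reader takes the current value
y[] = vcat(y[], 4)            # build a new array and replace the stored one
reader_unaffected = (kept == [1, 2, 3])         # the reader's array is untouched
```

Replacing the value with a freshly built array (vcat, or copy followed by push!) keeps earlier readers' snapshots intact, at the cost of an allocation per update.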
@on
The @on macro builds a task which will run in response to a topic update. For example:
@topic x = 0.0
@topic y = 0.0
@on x y[] = sin(x[])
Now, whenever x is updated, y will be updated to sin(x).
It expects one of the general forms:
@on topic "name" loop_ex
@on topic "name" init_ex loop_ex final_ex
topic is the topic to react to
name is an optional string
init_ex is an expression to run once on task creation
loop_ex is the expression to run on each update to the topic
final_ex is an expression to run once on task destruction
Time
ReactiveToolkit uses the Nano type for its internal representation of time, which corresponds to the system clock in nanoseconds as a UInt64. This design choice was made to differentiate ReactiveToolkit's representation of time from the various notions of time provided by other packages, and to maintain compatibility between them by requiring explicit conversions to Nanos.
Nanos can be created using the constructors nanos, micros, millis, or seconds, and their operation should be self-explanatory. For convenience, the Hz, kHz, and MHz constructors are also provided, which return the period of the specified frequency in Nanos.
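The frequency constructors boil down to "period = 1 / frequency", expressed in nanoseconds. The arithmetic can be sketched as follows; period_ns is a hypothetical helper defined here for illustration, not part of ReactiveToolkit, and the rounding mode is an assumption.

```julia
# Hypothetical helper, not ReactiveToolkit API: period in whole nanoseconds
# for a frequency given in Hz. Rounding to nearest is an assumption.
period_ns(freq_hz::Real) = round(UInt64, 1e9 / freq_hz)

one_hz     = period_ns(1)        # 1 Hz     -> 1_000_000_000 ns
hundred_hz = period_ns(100)      # 100 Hz   -> 10_000_000 ns
one_khz    = period_ns(1e3)      # 1 kHz    -> 1_000_000 ns
one_mhz    = period_ns(1e6)      # 1 MHz    -> 1_000 ns
per_minute = period_ns(1 / 60)   # 1/60 Hz  -> 60_000_000_000 ns
```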
ReactiveToolkit also natively supports conversion from Dates.AbstractTime subtypes, such as Second, Minute, Hour, Day, etc. These can be used in place of the Nano constructors above.
# the following are equivalent:
@after Hz(1/60) do_the_thing()
@after seconds(60) do_the_thing()
@after Second(60) do_the_thing()
@after Minute(1) do_the_thing()
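The equivalence of the Dates-based forms can be checked with the Dates standard library alone. In this sketch, to_ns is a hypothetical helper name; it says nothing about how ReactiveToolkit performs its own conversion.

```julia
using Dates

# Hypothetical helper: express a fixed-length Dates period in nanoseconds.
to_ns(p::Period) = Dates.value(convert(Nanosecond, p))

sixty_seconds = to_ns(Second(60))   # 60_000_000_000
one_minute    = to_ns(Minute(1))    # 60_000_000_000
same_duration = (sixty_seconds == one_minute)
```

Only fixed-length periods convert this way; calendar periods such as Month have no fixed nanosecond length, and convert throws an error for them.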
In fact, it is possible to use any notion of time from any source by defining an appropriate constructor method for Nano:
struct MartianDay
    val::Number
end
Nano(d::MartianDay) = Nano(8.86426641e13*d.val)
@every MartianDay(1) println("and so, another day goes by on Mars")
ReactiveToolkit.now() returns the current timestamp in Nanos.
ReactiveToolkit.autosleep(t::Nanos) will cycle through various sleep strategies to minimize CPU usage while still maintaining the specified period far more accurately than the OS scheduler would otherwise allow.
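This page does not say which strategies autosleep uses. As a rough idea of what combining sleep strategies can look like, here is a generic two-phase sketch using only Base: a coarse OS sleep while the deadline is far away, then cooperative yielding for the last stretch. hybrid_sleep and the spin_ns threshold are hypothetical and are not ReactiveToolkit's implementation.

```julia
# Hypothetical sketch, NOT ReactiveToolkit.autosleep.
function hybrid_sleep(duration_ns::Integer; spin_ns::Integer = 2_000_000)
    deadline = time_ns() + UInt64(duration_ns)
    # Coarse phase: sleep via the OS while more than spin_ns remains.
    while true
        t = time_ns()
        t >= deadline && return
        remaining = deadline - t
        remaining <= spin_ns && break
        sleep((remaining - spin_ns) / 1e9)
    end
    # Fine phase: yield to other tasks until the deadline has passed.
    while time_ns() < deadline
        yield()
    end
    return
end

t0 = time_ns()
hybrid_sleep(20_000_000)          # wait for about 20 ms
elapsed_ns = time_ns() - t0
```

The coarse phase keeps CPU usage low, while the fine phase trades a little CPU time for a wake-up closer to the deadline than sleep alone usually gives.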
@after
The @after macro builds a task which will run once on any available thread after a delay. For example:
@after seconds(1) println("hello from thread $(Threads.threadid())")
It expects the general form:
@after delay "name" task_ex
delay is the duration of the delay
name is an optional string
task_ex is the expression to delay the execution of
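For comparison, the same "run once after a delay" idea can be expressed with Base's Timer. This is only an analogy in plain Julia; it is not how @after is implemented, and it makes no promise about which thread runs the callback.

```julia
# Plain-Julia analogy using Base.Timer; no ReactiveToolkit calls are used.
fired = Channel{Float64}(1)
t0 = time_ns()

# The callback runs once, roughly 0.1 s after the timer is created.
Timer(0.1) do _timer
    put!(fired, (time_ns() - t0) / 1e9)
end

elapsed_s = take!(fired)   # blocks until the callback has run
```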
A useful design pattern is to use @after to control task lifetime:
@after seconds(1) begin
    i = 0
    task = @every millis(10) println("hello! i is $(i+=1)")
    @after seconds(3) kill(task)
end
@every
The @every macro builds a task which will run repeatedly on any available thread at a specified interval. For example:
@every seconds(10) println("... is this annoying yet?")
@every Minute(15) println("remember to take a break!")
# generate a 1Hz sine wave, updated at 100Hz:
@topic x = 0.0
@every millis(10) x[] = sin(2π*now()*1e-9)
It expects one of the general forms:
@every interval "name" loop_ex
@every interval "name" init_ex loop_ex final_ex
interval is the duration of the delay between runs
name is an optional string
init_ex is an expression to run once on task creation
loop_ex is the expression to run on each interval
final_ex is an expression to run once on task destruction
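The init/loop/final lifecycle can be pictured as an ordinary task with a try/finally around its loop. The sketch below uses only Base; run_every and its keyword names are hypothetical, the stop flag is an assumption made for illustration, and the timing is a plain sleep rather than anything ReactiveToolkit does.

```julia
# Hypothetical helper, NOT how @every is implemented.
function run_every(loop_fn, interval_s::Real, stop::Threads.Atomic{Bool};
                   init = () -> nothing, final = () -> nothing)
    return Threads.@spawn begin
        init()                    # runs once on task creation
        try
            while !stop[]
                loop_fn()         # runs on each interval
                sleep(interval_s)
            end
        finally
            final()               # runs once on task destruction
        end
    end
end

events = String[]                 # only touched by the spawned task while it runs
stop = Threads.Atomic{Bool}(false)
task = run_every(0.01, stop;
                 init  = () -> push!(events, "init"),
                 final = () -> push!(events, "final")) do
    push!(events, "tick")
end

sleep(0.1)
stop[] = true                     # ask the loop to finish
wait(task)
```

Because the loop sleeps for a fixed time after each run, the real period here is the interval plus the run time of the loop body.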
@loop
The @loop macro builds a task which will run in response to an arbitrary event. This is the low-level primitive on which @on and @every are built. It is useful for interacting with hardware or other external processes. Importantly, the user is responsible for ensuring that the loop task expression includes a blocking call in order to work properly. For example, here is an Arduino-style serial monitor which can be defined directly in the REPL:
using ReactiveToolkit, LibSerialPort
function SerialMonitor(addr)
    # objects can be captured by the task
    # but kept out of global scope
    port = SerialPort(addr)
    @loop "$addr serial monitor" begin
        # initializer
        !isopen(port) && open(port)
    end begin
        # loop task
        println(readline(port))
    end begin
        # finalizer
        isopen(port) && close(port)
    end
end
tk = SerialMonitor("/dev/ttyACM0")
# do stuff
kill(tk)
This example shows the expanded syntax for including an optional initializer and finalizer in addition to the main loop expression. As mentioned above, the loop task expression must include a blocking call to work properly. In the example above, the task waits on readline(port); thus, it will run whenever a new packet arrives. For contrast, here is a more manual implementation:
using ReactiveToolkit, LibSerialPort
port = SerialPort("/dev/ttyACM0")
open(port)
tk = @loop "serial monitor" println(readline(port))
kill(tk)
close(port)
It is often useful to start with the manual version and build up to re-usable constructors as needed. Note that many older microcontrollers (which use a UART-based FTDI chip to implement USB communication) will also need a baud rate set as the second argument to the SerialPort constructor.
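The role of the blocking call can be seen without any hardware by letting a Channel play the part of the serial port. This is a plain-Julia sketch of the same event-driven shape, not ReactiveToolkit code; inbox and received are illustrative names.

```julia
# A Channel stands in for the external event source (e.g. a serial port).
inbox = Channel{String}(8)
received = String[]

worker = Threads.@spawn begin
    # Iterating a Channel blocks until an item arrives,
    # and the loop ends once the channel is closed and drained.
    for line in inbox
        push!(received, line)
    end
end

put!(inbox, "hello")
put!(inbox, "world")
close(inbox)      # no more events; lets the worker finish
wait(worker)
```

The loop body runs once per event because the blocking read paces it; the task sits idle between events instead of spinning.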
Killing Tasks
Tasks are stopped using kill(task):
tk = @every seconds(0.5) println("...is this annoying yet?")
# wait for it to get annoying
kill(tk)
If we forget to bind the task to a variable name (this happens often), it can be found and killed using rtk_tasks().
Note that kill only requests that the task stop. The task will continue to wait on its blocking call. If the task is waiting on an external event, it will continue to show as active until that event occurs. I intend to rework the task killing mechanisms to be more robust, transparent, and extensible in the future.
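The behaviour described here is typical of cooperative cancellation, and can be reproduced in plain Julia. In this sketch the stop flag and the Channel are assumptions chosen for illustration; it does not show how kill is implemented in ReactiveToolkit.

```julia
# Generic cooperative-stop sketch; no ReactiveToolkit calls are used.
stop = Threads.Atomic{Bool}(false)
events = Channel{Int}(1)

task = Threads.@spawn begin
    while true
        take!(events)        # blocking call: waits for an external event
        stop[] && break      # the stop request is only seen after it returns
    end
end

stop[] = true                          # request that the task stop
sleep(0.1)
still_running = !istaskdone(task)      # the task has not finished: no event yet

put!(events, 1)                        # the awaited event finally occurs
wait(task)
finished = istaskdone(task)
```

A stop request cannot interrupt the blocking call itself, so the task only finishes once the event it is waiting for has arrived.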