NOTE: This README is a Livebook, so you can run it! Keep an eye on the terminal output while you do.
# Fetch the sources and enter the project directory.
git clone https://github.com/anoma/engine.git
cd engine
# Download the dependencies declared by the project.
mix deps.get
# Presumably opens this README as a Livebook via the project's Makefile; confirm against the Makefile.
make livebook
# Notebook setup: depend on the engine_system project found in the current directory.
Mix.install([{:engine_system, path: "."}])
This library is a work-in-progress implementation of the Engine Model in Elixir, following the formal specification described in Dynamic Effective Timed Communication Systems and Mailbox-as-Actors (under review).
An engine is an actor-like entity that enables type-safe message passing and effectful actions through guarded actions. These actions can modify both the engine's state and its environment.
Among other things, this library implements the Engine Model with a DSL and runtime system. The DSL lets you easily define engines that follow the formal specification, including configuration, state, message handling, and behaviours. The runtime system manages engine lifecycles, message passing, monitoring, and introspection.
- Elixir 1.18.0 or higher
- Erlang/OTP 28 or higher
# Check your Elixir version: the call returns it as a string, which should
# satisfy the minimum Elixir version listed in the requirements.
System.version()
# Dependency list for mix.exs: engine_system, any release >= 0.1.0 and < 0.2.0.
def deps, do: [{:engine_system, "~> 0.1.0"}]
The package can be installed by adding `engine_system` to your list of dependencies in `mix.exs`, as shown in the `deps` snippet above.
A complete implementation of the actor model with explicit mailbox-as-actors separation, based on the formal specifications described in the research paper. This system implements the core innovation of promoting mailboxes to first-class processing engines that receive messages but verify message writing using linked processing engines.
The DSL features compile-time validation, clean simplified syntax, and unified import. Let's start the system and explore some examples:
# Presumably what makes `defengine`, `spawn_engine` and `send_message` usable
# unqualified in the rest of this notebook; confirm against EngineSystem.__using__.
use EngineSystem
# Start the runtime; the match raises if startup does not return {:ok, _}.
{:ok, _} = EngineSystem.start()
Let's start with a basic echo engine:
# SimpleEcho: a minimal processing engine that understands two messages.
#   :echo - prints a blue line on the terminal, then sends {:echo, payload}
#           back to the sender
#   :ping - answers the sender with :pong
defengine SimpleEcho do
  version("1.0.0")
  mode(:process)

  interface do
    message(:echo, text: :string)
    message(:ping)
  end

  behaviour do
    on_message :echo, payload, _config, _env, sender do
      banner = IO.ANSI.blue() <> "HoTT is better than Cold" <> IO.ANSI.reset()
      IO.puts(banner)

      {:ok, [{:send, sender, {:echo, payload}}]}
    end

    on_message :ping, _payload, _config, _env, sender do
      {:ok, [{:send, sender, :pong}]}
    end
  end
end
# Show the specification attached to SimpleEcho (presumably generated by
# `defengine`; confirm against the DSL). The accessor name follows the
# double-underscore convention on both sides; the previous spelling had a
# single trailing underscore.
SimpleEcho.__engine_spec__()
Now let's spawn an instance and test it:
# Spawn one SimpleEcho instance; the match raises unless spawning returns {:ok, address}.
{:ok, echo_address} = EngineSystem.spawn_engine(SimpleEcho)
IO.puts("Echo engine spawned at: " <> inspect(echo_address))
echo_address

# Ask the engine to echo a text payload.
send_message(echo_address, {:echo, %{text: "Hello Engine System!"}})
If you started the Livebook from a terminal, you can see the output and trace of the message passing there.
# Send a :ping with an empty payload to the echo engine spawned earlier.
send_message(echo_address, {:ping, %{}})
# Fetch the runtime's summary information for display.
EngineSystem.get_system_info()
Here's an example of a stateless processing engine:
# StatelessCalculator: a processing engine with no config and no environment.
# Every request carries both operands (:a and :b). The answer is sent straight
# back to the sender as {:result, value}, or as {:error, :division_by_zero}
# when the divisor of a :divide request equals zero.
defengine StatelessCalculator do
  version("1.0.0")
  mode(:process)

  interface do
    message(:add, a: :number, b: :number)
    message(:multiply, a: :number, b: :number)
    message(:divide, a: :number, b: :number)
    message(:result, value: :number)
  end

  behaviour do
    on_message :add, msg, _config, _env, sender do
      sum = msg[:a] + msg[:b]
      {:ok, [{:send, sender, {:result, sum}}]}
    end

    on_message :multiply, msg, _config, _env, sender do
      product = msg[:a] * msg[:b]
      {:ok, [{:send, sender, {:result, product}}]}
    end

    on_message :divide, msg, _config, _env, sender do
      dividend = msg[:a]
      divisor = msg[:b]

      # `==` treats 0 and 0.0 alike, so both are rejected before dividing.
      reply =
        if divisor == 0 do
          {:error, :division_by_zero}
        else
          {:result, dividend / divisor}
        end

      {:ok, [{:send, sender, reply}]}
    end
  end
end
Let's test the calculator. Spawn the calculator:
# Spawn a calculator instance and keep its address for the requests below.
{:ok, calc_address} = spawn_engine(StatelessCalculator)
# Exercise each operation once.
send_message(calc_address, {:add, %{a: 10, b: 5}})
send_message(calc_address, {:multiply, %{a: 7, b: 8}})
send_message(calc_address, {:divide, %{a: 15, b: 3}})
send_message(calc_address, {:divide, %{a: 10, b: 0}}) # Divisor is 0, so the engine replies with {:error, :division_by_zero}
Now let's create an engine with configuration and environment (state):
# SimpleCounter: a stateful processing engine.
#   config - :max_count (upper bound) and :step (amount added or removed)
#   env    - :count (current value) and :total_operations (increments and
#            decrements handled since the last reset)
# Every handler replies to the sender with {:count_response, value}.
defengine SimpleCounter do
  version "1.0.0"
  mode :process

  config do
    %{max_count: 100, step: 1}
  end

  env do
    %{count: 0, total_operations: 0}
  end

  interface do
    message :increment
    message :decrement
    message :get_count
    message :reset
    message :count_response, value: :integer
  end

  behaviour do
    on_message :increment, _msg, config, env, sender do
      # Never count past the configured ceiling.
      raised = min(env.count + config.step, config.max_count)
      updated = %{env | count: raised, total_operations: env.total_operations + 1}
      actions = [{:update_environment, updated}, {:send, sender, {:count_response, raised}}]

      {:ok, actions}
    end

    on_message :decrement, _msg, config, env, sender do
      # Never count below zero.
      lowered = max(env.count - config.step, 0)
      updated = %{env | count: lowered, total_operations: env.total_operations + 1}
      actions = [{:update_environment, updated}, {:send, sender, {:count_response, lowered}}]

      {:ok, actions}
    end

    on_message :get_count, _msg, _config, env, sender do
      current = env.count
      {:ok, [{:send, sender, {:count_response, current}}]}
    end

    on_message :reset, _msg, _config, _env, sender do
      # A reset replaces the whole environment, so the operation tally restarts too.
      pristine = %{count: 0, total_operations: 0}
      actions = [{:update_environment, pristine}, {:send, sender, {:count_response, 0}}]

      {:ok, actions}
    end
  end
end
Let's test the counter. Spawn the counter with default config:
# Spawn a counter with its default configuration (step 1, starting at 0).
{:ok, counter_address} = spawn_engine(SimpleCounter)

# Read, increment twice, read, decrement once, read again.
send_message(counter_address, {:get_count, %{}})
send_message(counter_address, {:increment, %{}})
send_message(counter_address, {:increment, %{}})
send_message(counter_address, {:get_count, %{}})
send_message(counter_address, {:decrement, %{}})
send_message(counter_address, {:get_count, %{}})

# Look the instance up by the address returned from spawn_engine/1. A
# hard-coded address tuple only points at the counter for one particular
# spawn order and breaks as soon as cells are re-run or reordered.
{:ok, counter_instance} = EngineSystem.lookup_instance(counter_address)
state = EngineSystem.Engine.Instance.get_state(counter_instance.engine_pid)
Here's a more complex example with configuration and sophisticated state management:
# MyKVStore: an in-memory key/value store engine.
#   config - access_mode, max_size, timeout, retries_enabled (only max_size is
#            read by the handlers below)
#   env    - store (the data), access_counts (number of :get requests per key),
#            last_accessed (key of the latest :get or :put), active_connections
# Reads answer with {:result, value}; writes answer with :ack, or with
# {:error, :store_full} when a new key would exceed max_size.
defengine MyKVStore do
  version("2.0.0")
  mode(:process)

  interface do
    message(:get, key: :atom)
    message(:put, key: :atom, value: :any)
    message(:delete, key: :atom)
    message(:list_keys)
    message(:clear)
    message(:size)
    message(:result, value: {:option, :any})
  end

  config do
    %{access_mode: :read_write, max_size: 1000, timeout: 30.5, retries_enabled: true}
  end

  env do
    %{store: %{}, access_counts: %{}, last_accessed: nil, active_connections: 0}
  end

  behaviour do
    on_message :get, msg, _config, env, sender do
      key = msg[:key]

      found =
        case Map.fetch(env.store, key) do
          {:ok, stored} -> stored
          :error -> :not_found
        end

      # Reads are tallied per key, including reads of keys that are absent.
      counts = Map.update(env.access_counts, key, 1, fn seen -> seen + 1 end)
      updated = %{env | access_counts: counts, last_accessed: key}

      {:ok, [{:update_environment, updated}, {:send, sender, {:result, found}}]}
    end

    on_message :put, msg, config, env, sender do
      key = msg[:key]
      value = msg[:value]

      # Overwriting an existing key is always allowed; only a new key is
      # refused once the store has reached max_size.
      accepted = Map.has_key?(env.store, key) or map_size(env.store) < config.max_size

      if accepted do
        updated = %{env | store: Map.put(env.store, key, value), last_accessed: key}
        {:ok, [{:update_environment, updated}, {:send, sender, :ack}]}
      else
        {:ok, [{:send, sender, {:error, :store_full}}]}
      end
    end

    on_message :delete, msg, _config, env, sender do
      key = msg[:key]

      # Drop both the value and its read tally.
      updated = %{
        env
        | store: Map.delete(env.store, key),
          access_counts: Map.delete(env.access_counts, key)
      }

      {:ok, [{:update_environment, updated}, {:send, sender, :ack}]}
    end

    on_message :list_keys, _msg, _config, env, sender do
      {:ok, [{:send, sender, {:result, Map.keys(env.store)}}]}
    end

    on_message :clear, _msg, _config, _env, sender do
      # Clearing replaces the whole environment with its initial shape.
      blank = %{store: %{}, access_counts: %{}, last_accessed: nil, active_connections: 0}

      {:ok, [{:update_environment, blank}, {:send, sender, :ack}]}
    end

    on_message :size, _msg, _config, env, sender do
      {:ok, [{:send, sender, {:result, map_size(env.store)}}]}
    end
  end
end
Let's test the KV store. Spawn with custom configuration:
# Only :access_mode and :max_size are given here.
# NOTE(review): whether the remaining defaults (timeout, retries_enabled) are
# merged in depends on spawn_engine/2 — confirm.
custom_config = %{access_mode: :read_write, max_size: 5}
{:ok, kv_address} = spawn_engine(MyKVStore, custom_config)
# Store three entries, well under the max_size of 5.
send_message(kv_address, {:put, %{key: :name, value: "Engine System"}})
send_message(kv_address, {:put, %{key: :version, value: "2.0.0"}})
send_message(kv_address, {:put, %{key: :language, value: "Elixir"}})
# Read one entry back, then ask for the key list and the entry count.
send_message(kv_address, {:get, %{key: :name}})
send_message(kv_address, {:list_keys, %{}})
send_message(kv_address, {:size, %{}})
IO.puts("KV Store tests sent!")
Let's explore the basic system management capabilities:
# Print one line per running instance: its address parts and its spec name/version.
instances = EngineSystem.list_instances()

for instance <- instances do
  {node_id, engine_id} = instance.address
  {spec_name, spec_version} = instance.spec_key
  location = " node_id=#{inspect(node_id)}, engine_id=#{inspect(engine_id)}"
  IO.puts(location <> " - #{spec_name}-#{spec_version}")
end

# Summarise the runtime as a whole.
system_info = EngineSystem.get_system_info()
IO.puts("System Information:")
IO.puts(" Running instances: " <> to_string(system_info.running_instances))
IO.puts(" Registered specs: " <> to_string(system_info.total_specs))

# Look up a single instance by the address returned when it was spawned.
case EngineSystem.lookup_instance(kv_address) do
  {:ok, found} ->
    IO.puts("Engine found:")
    {spec_name, spec_version} = found.spec_key
    IO.puts(" Name: " <> to_string(spec_name))
    IO.puts(" Version: " <> to_string(spec_version))
    IO.puts(" Status: " <> to_string(found.status))
    IO.puts(" Address: " <> inspect(found.address))

  failure ->
    IO.puts("Error looking up engine: " <> inspect(failure))
end

# Spawn a counter registered under a name, then reach it through that name
# instead of through the returned address.
{:ok, named_address} = spawn_engine(SimpleCounter, %{max_count: 50}, %{}, :my_counter)

case EngineSystem.lookup_address_by_name(:my_counter) do
  {:ok, resolved} ->
    IO.puts("Found named engine at: " <> inspect(resolved))
    send_message(resolved, {:increment, %{}})
    send_message(resolved, {:get_count, %{}})

  failure ->
    IO.puts("Error finding named engine: " <> inspect(failure))
end
The system provides utilities for introspecting engine interfaces:
# Ask the registered spec (by name and version) whether it declares a message.
if EngineSystem.has_message?(:MyKVStore, "2.0.0", :get),
  do: IO.puts("MyKVStore supports :get message")

# Fetch the declared fields of one message.
case EngineSystem.get_message_fields(:MyKVStore, "2.0.0", :put) do
  {:ok, declared} ->
    IO.puts("Fields for :put message: " <> inspect(declared))

  failure ->
    IO.puts("Error getting fields: " <> inspect(failure))
end

# List the message tags, first from the spec and then from a running instance.
tags = EngineSystem.get_message_tags(:MyKVStore, "2.0.0")
IO.puts("MyKVStore supports messages: " <> inspect(tags))

instance_tags = EngineSystem.get_instance_message_tags(kv_address)
IO.puts("Running instance supports: " <> inspect(instance_tags))
The system provides message validation before sending:
# Validate a message the KV store declares, and send it only when it passes.
case EngineSystem.validate_message(kv_address, {:get, %{key: :test}}) do
  {:error, why} ->
    IO.puts("Message validation failed: " <> inspect(why))

  :ok ->
    IO.puts("Message is valid!")
    send_message(kv_address, {:get, %{key: :test}})
end

# Validate a tag the KV store does not declare; a failure is the expected outcome.
case EngineSystem.validate_message(kv_address, {:invalid_message, %{}}) do
  {:error, why} ->
    IO.puts("Expected validation failure: " <> inspect(why))

  :ok ->
    IO.puts("Message is valid!")
end
You can create custom mailbox engines for specialized message handling:
# PriorityMailbox: a mailbox-mode engine that files each incoming payload into
# one of three Erlang queues kept in its environment, chosen by the message tag.
# No handler sends a reply; each one only updates the environment.
defengine PriorityMailbox do
  version("1.0.0")
  # This is a mailbox engine
  mode(:mailbox)

  interface do
    message(:high_priority, content: :any)
    message(:normal_priority, content: :any)
    message(:low_priority, content: :any)
  end

  env do
    %{high_queue: :queue.new(), normal_queue: :queue.new(), low_queue: :queue.new()}
  end

  behaviour do
    # Mailbox engines handle message queuing differently
    on_message :high_priority, payload, _config, env, _sender do
      queued = %{env | high_queue: :queue.in(payload, env.high_queue)}
      {:ok, [{:update_environment, queued}]}
    end

    on_message :normal_priority, payload, _config, env, _sender do
      queued = %{env | normal_queue: :queue.in(payload, env.normal_queue)}
      {:ok, [{:update_environment, queued}]}
    end

    on_message :low_priority, payload, _config, env, _sender do
      queued = %{env | low_queue: :queue.in(payload, env.low_queue)}
      {:ok, [{:update_environment, queued}]}
    end
  end
end