Elixir ports: GenServer
Posted on Fri 29 May 2020 in Elixir
This is a draft!
In the previous post, we introduce ports as a way to communicate with external commands from Elixir. As you remember, the connected process is the Elixir process that creates the port. Both the connected process and the port follow a protocol for sending and receiving messages. If you are familiar with Elixir, you may know that GenServer
provides a convenient and robust abstraction for handling messages between processes, similar to a client-server scheme. In this post, we use GenServer
as a proxy between the connected process and the port. Here, GenServer
will manage the message between them and, allowing us to add more features to our application, like better logging and failure handling.
Overview
In the previous post, we create a port for adding two numbers. We talk with the port like this:
port = ....
Process.command(port, "1,2\n")..
flush
Process.
Although it is effective, it can be tedious to communicate with the port that way. Also, that approach has other limitations (I may exaggerate them, but just stick with me):
- The connected process is not notified when the port dies (unless we monitor the port but, even with that, we could not know whether the port crashed). In other words, if we monitor the port, the connected process can receive a
{:DOWN, ...}
message. However, we cannot receive a{:EXIT, ...}
message, even monitoring the port. Such a message tells us that the external process crashed. - There is no logging support, making debugging harder.
- It is a bit verbose to write
Process.command -> flush -> Process.command ...
.
We address these limitations by using GenServer
for (1) handling messages with pattern matching , (2) adding log support, and (3) simplifying the API for port communication. At the end of this post, we can interact with the port like this:
pid = PortGenServer.start_link()
result = PortGenServer.command(pid, [1, 2]) # async command (cast)
GenServer recap
If you are unfamiliar with GenServer, I recommend to check this post before going any further.
Cosas por mencionar en el post:
port
es un proceso child que envia mensajes al proceso padre (en este caso, el shell). El proceso padre es el proceso que invoca aPort.open
: puede seriex
o puede ser un proceso genserver. Nota que no hay necesidad de establecer o enviar el pid del proceso padre al child para que el child envie mensajes, es algo similar aspawn_link
. Podemos ver la relacion entreport
y el proceso padre conPort.info
:
iex(1)> port = Port.open({:spawn, "cat"}, [:binary])
#Port<0.6>
iex(2)> Port.info(port)
[
name: 'cat',
links: [#PID<0.105.0>],
id: 48,
connected: #PID<0.105.0>, # aqui esta el pid del proceso padre
input: 0,
output: 0,
os_pid: 8674
]
iex(3)> self() # proceso padre
#PID<0.105.0>
- Dado que el link se da automaticamente, el codigo de tony no estaba mal (como creia). Al inicio, creia que el
port
debia mandarse en el estado inicial del genserver, como lo hace juric y geoff:
# codigo de tony
def init(_args \\ []) do
IO.puts "At init()..."
port = Port.open({:spawn, @command}, [:binary, :exit_status])
IO.puts "port: #{inspect port}"
{:ok, %{latest_output: nil, exit_status: nil} } # el no incluye pid en el state,
# se me hacia raro, pero no es necesario
end
-
Cuando hablemos del genserver, revisar la especificacion de
handle_info(msg, state)
y decir quemsg
tiene esta forma cuandoport
envia el mensaje:{port, {:data, text_line}}
. No recuerdo donde lei que tmb habia unold_state
. -
Debemos estructurar el post asi:
-
Iniciar con el toy example, indicando que podemos ejecutar un comando (
cat
en este caso) y enviar/recibir mensajes entre el shell y el port. Extender el ejemplo con un script python/ruby (como en el ejemplo de tony).
``` iex(1)> cmd = "./bin/long_running.rb"
iex(2)> port = Port.open({:spawn, cmd}, [:binary, :exit_status])
#Port<0.6>
iex(3)> flush() {#Port<0.6>, {:data, "Starting up\n"}} {#Port<0.6>, {:data, "Progress: step 1 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 2 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 3 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 4 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 5 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 6 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 7 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 8 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 9 of 10\n"}} {#Port<0.6>, {:data, "Progress: step 10 of 10\n"}} {#Port<0.6>, {:data, "Done\n"}} {#Port<0.6>, {:exit_status, 0}} :ok
iex(5)> port = Port.open({:spawn, "cat"}, [:binary])
#Port<0.7>
iex(6)> Port.info(port)
[
name: 'cat',
links: [#PID<0.105.0>],
id: 56,
connected: #PID<0.105.0>,
input: 0,
output: 0,
os_pid: 7901
]
iex(7)> self()
#PID<0.105.0>
iex(8)> :inspect
:inspect
iex(9)> inspect port
"#Port<0.7>"
```
- Establecer/Remarcar que la comunicacion entre el proceso padre (shell, o mejor dicho, el proceso que invoca a
Port.open
) y el proceso hijo es mediante paso de mensajes. Dado esto, podemos encapsular el paso de mensjaes en un genserver para crear una api, asi nos ahorramos el escribir los requests a mano, como estesend(port, {self(), {:command, "hello"}})
. - Crear un ejemplo sencillo con genserver, ya sea usando el codigo de tony, juric o geoff.
interesting link
https://stackoverflow.com/questions/48416742/is-it-possible-to-send-a-message-to-all-child-processes-in-elixir-erlang
Codigo de tony
# port_example.ex
defmodule PortsExample do
@moduledoc """
Documentation for PortsExample.
"""
@doc """
Hello world.
## Examples
iex> PortsExample.hello()
:world
"""
def hello do
:world
end
end
# basic_port.ex
defmodule PortsExample.BasicPort do
use GenServer
require Logger
@command "./bin/long_running.rb"
# GenServer API
def start_link(args \\ [], opts \\ []) do
GenServer.start_link(__MODULE__, args, opts)
end
def init(_args \\ []) do
port = Port.open({:spawn, @command}, [:binary, :exit_status])
{:ok, %{latest_output: nil, exit_status: nil} }
end
# This callback handles data incoming from the command's STDOUT
def handle_info({port, {:data, text_line}}, state) do
latest_output = text_line |> String.trim
Logger.info "Latest output: #{latest_output}"
{:noreply, %{state | latest_output: latest_output}}
end
# This callback tells us when the process exits
def handle_info({port, {:exit_status, status}}, state) do
Logger.info "External exit: :exit_status: #{status}"
new_state = %{state | exit_status: status}
{:noreply, %{state | exit_status: status}}
end
# no-op catch-all callback for unhandled messages
def handle_info(_msg, state), do: {:noreply, state}
end
# monitored_port.ex
defmodule PortsExample.MonitoredPort do
use GenServer
require Logger
@command "./bin/long_running.rb"
# GenServer API
def start_link(args \\ [], opts \\ []) do
GenServer.start_link(__MODULE__, args, opts)
end
def init(_args \\ []) do
port = Port.open({:spawn, @command}, [:binary, :exit_status])
Port.monitor(port)
{:ok, %{port: port, latest_output: nil, exit_status: nil} }
end
# This callback handles data incoming from the command's STDOUT
def handle_info({port, {:data, text_line}}, %{port: port} = state) do
Logger.info "Data: #{inspect text_line}"
{:noreply, %{state | latest_output: String.trim(text_line)}}
end
# This callback tells us when the process exits
def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
Logger.info "Port exit: :exit_status: #{status}"
new_state = %{state | exit_status: status}
{:noreply, new_state}
end
def handle_info({:DOWN, _ref, :port, port, :normal}, state) do
Logger.info "Handled :DOWN message from port: #{inspect port}"
{:noreply, state}
end
def handle_info(msg, state) do
Logger.info "Unhandled message: #{inspect msg}"
{:noreply, state}
end
end
# trap_process_crash.ex
defmodule PortsExample.TrapProcessCrash do
use GenServer
require Logger
@command "./bin/long_running.rb"
# GenServer API
def start_link(args \\ [], opts \\ []) do
GenServer.start_link(__MODULE__, args, opts)
end
def init(args \\ []) do
Process.flag(:trap_exit, true)
port = Port.open({:spawn, @command}, [:binary, :exit_status])
Port.monitor(port)
{:ok, %{port: port, latest_output: nil, exit_status: nil} }
end
def terminate(reason, %{port: port} = state) do
Logger.info "** TERMINATE: #{inspect reason}. This is the last chance to clean up after this process."
Logger.info "Final state: #{inspect state}"
port_info = Port.info(port)
os_pid = port_info[:os_pid]
Logger.warn "Orphaned OS process: #{os_pid}"
:normal
end
# This callback handles data incoming from the command's STDOUT
def handle_info({port, {:data, text_line}}, %{port: port} = state) do
Logger.info "Data: #{inspect text_line}"
{:noreply, %{state | latest_output: String.trim(text_line)}}
end
# This callback tells us when the process exits
def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
Logger.info "Port exit: :exit_status: #{status}"
new_state = %{state | exit_status: status}
{:noreply, new_state}
end
def handle_info({:DOWN, _ref, :port, port, :normal}, state) do
Logger.info "Handled :DOWN message from port: #{inspect port}"
{:noreply, state}
end
def handle_info({:EXIT, port, :normal}, state) do
Logger.info "handle_info: EXIT"
{:noreply, state}
end
def handle_info(msg, state) do
Logger.info "Unhandled message: #{inspect msg}"
{:noreply, state}
end
end