Elixir ports: GenServer

Posted on Fri 29 May 2020 in Elixir

This is a draft!

In the previous post, we introduced ports as a way to communicate with external commands from Elixir. As you may remember, the connected process is the Elixir process that creates the port. Both the connected process and the port follow a protocol for sending and receiving messages. If you are familiar with Elixir, you may know that GenServer provides a convenient and robust abstraction for handling messages between processes, similar to a client-server scheme. In this post, we use a GenServer as a proxy between the connected process and the port. The GenServer manages the messages between them, which lets us add more features to our application, such as better logging and failure handling.

Overview

In the previous post, we created a port for adding two numbers. We talked to the port like this:

port = ...
Port.command(port, "1,2\n")
flush()
Port.command(port, ...)

Although this works, communicating with the port that way can be tedious. The approach also has other limitations (I may be exaggerating them, but bear with me):

  • The connected process is not notified when the port dies unless we monitor the port, and even then we cannot tell whether the port crashed. If we monitor the port, the connected process receives a {:DOWN, ...} message when the port goes away. A monitor alone, however, never delivers an {:EXIT, ...} message; that message arrives only when the connected process traps exits, and it is the one that tells us the external process crashed.
  • There is no logging support, making debugging harder.
  • It is a bit verbose to write Port.command -> flush -> Port.command ....
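
To make the first limitation concrete, here is a minimal sketch of monitoring a port by hand. It assumes a Unix-like system where `cat` is available; the module name `PortLifecycleSketch` and the one-second timeout are made up for this illustration.

```elixir
defmodule PortLifecycleSketch do
  # Opens `cat` as a port, monitors it, closes it, and returns the reason
  # carried by the resulting :DOWN message (or :timeout if none arrives).
  def down_reason do
    port = Port.open({:spawn, "cat"}, [:binary])
    ref = Port.monitor(port)

    # Closing the port terminates it, which triggers the monitor.
    Port.close(port)

    receive do
      {:DOWN, ^ref, :port, ^port, reason} -> reason
    after
      1_000 -> :timeout
    end
  end
end
```

The :DOWN message only says that the port is gone and carries a reason. It says nothing about the exit status of the external program, which is part of why the rest of this post moves the handling into a GenServer.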

We address these limitations by using GenServer for (1) handling messages with pattern matching, (2) adding log support, and (3) simplifying the API for port communication. By the end of this post, we will be able to interact with the port like this:

{:ok, pid} = PortGenServer.start_link()
:ok = PortGenServer.command(pid, [1, 2])     # async command (cast), so it returns :ok right away

GenServer recap

If you are unfamiliar with GenServer, I recommend checking this post before going any further.

Things to mention in the post:

  • A port is a child process that sends messages to the parent process (in this case, the shell). The parent process is the process that calls Port.open: it can be iex or it can be a GenServer process. Note that there is no need to set or send the parent's pid to the child for the child to send messages; it is similar to spawn_link. We can see the relationship between the port and the parent process with Port.info:
iex(1)> port = Port.open({:spawn, "cat"}, [:binary])
#Port<0.6>

iex(2)> Port.info(port)
[
  name: 'cat',
  links: [#PID<0.105.0>],
  id: 48,
  connected: #PID<0.105.0>,     # this is the pid of the parent process
  input: 0,
  output: 0,
  os_pid: 8674
]

iex(3)> self()                  # parent process
#PID<0.105.0>
  • Since the link is created automatically, Tony's code was not wrong (as I believed). At first, I thought the port had to be passed in the initial state of the GenServer, as Juric and Geoff do:
# Tony's code
def init(_args \\ []) do

    IO.puts "At init()..."

    port = Port.open({:spawn, @command}, [:binary, :exit_status])

    IO.puts "port: #{inspect port}"

    {:ok, %{latest_output: nil, exit_status: nil} }     # he does not keep the port in the state;
                                                        # it seemed odd to me, but it is not necessary
  end
  • When we talk about the GenServer, review the specification of handle_info(msg, state) and mention that msg has this shape when the port sends the message: {port, {:data, text_line}}. I do not remember where I read that there was also an old_state.
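
The message shapes mentioned in these notes can be checked without a GenServer. The following is a rough sketch that assumes a Unix-like system with `cat`; `PortProtocolSketch` and its `echo/1` function are names invented for the example. It sends one command with the raw protocol and waits for a {port, {:data, ...}} message, which is the same tuple that handle_info/2 receives once a GenServer owns the port.

```elixir
defmodule PortProtocolSketch do
  # Sends one line to `cat` through the raw port protocol and returns
  # the first chunk of data that comes back (or :timeout).
  def echo(line) do
    port = Port.open({:spawn, "cat"}, [:binary])

    # The process that calls Port.open/2 is the connected process.
    true = Port.info(port)[:connected] == self()

    # Raw request, equivalent to Port.command(port, line).
    send(port, {self(), {:command, line}})

    reply =
      receive do
        {^port, {:data, data}} -> data
      after
        1_000 -> :timeout
      end

    Port.close(port)
    reply
  end
end
```

Calling `PortProtocolSketch.echo("hello\n")` should hand back whatever `cat` wrote to its standard output. For a short line that is normally the line itself, although a port is free to deliver output in more than one chunk.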

  • We should structure the post like this:

  • Start with the toy example, showing that we can run a command (cat in this case) and send/receive messages between the shell and the port. Extend the example with a Python/Ruby script (as in Tony's example).

```
iex(1)> cmd = "./bin/long_running.rb"

iex(2)> port = Port.open({:spawn, cmd}, [:binary, :exit_status])
#Port<0.6>

iex(3)> flush()
{#Port<0.6>, {:data, "Starting up\n"}}
{#Port<0.6>, {:data, "Progress: step 1 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 2 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 3 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 4 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 5 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 6 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 7 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 8 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 9 of 10\n"}}
{#Port<0.6>, {:data, "Progress: step 10 of 10\n"}}
{#Port<0.6>, {:data, "Done\n"}}
{#Port<0.6>, {:exit_status, 0}}
:ok

iex(5)> port = Port.open({:spawn, "cat"}, [:binary])
#Port<0.7>
iex(6)> Port.info(port)
[
  name: 'cat',
  links: [#PID<0.105.0>],
  id: 56,
  connected: #PID<0.105.0>,
  input: 0,
  output: 0,
  os_pid: 7901
]
iex(7)> self()
#PID<0.105.0>
iex(8)> :inspect
:inspect
iex(9)> inspect port
"#Port<0.7>"
```

  • State/emphasize that communication between the parent process (the shell or, more precisely, the process that calls Port.open) and the child process happens through message passing. Given this, we can encapsulate the message passing in a GenServer to create an API, which saves us from writing the requests by hand, like this one: send(port, {self(), {:command, "hello"}}).
  • Create a simple example with GenServer, using either Tony's, Juric's or Geoff's code.
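
As a starting point for that simple example, here is one possible sketch of a GenServer that hides the message passing behind a small API. It is not Tony's, Juric's or Geoff's code. The module name `PortGenServerSketch`, the `latest_output/1` helper, and the use of `cat` as a stand-in for the external adder are all assumptions made for illustration, since the real external program is not shown here.

```elixir
defmodule PortGenServerSketch do
  use GenServer
  require Logger

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Asynchronous: returns :ok right away; the port's answer shows up
  # later in handle_info/2.
  def command(pid, numbers) when is_list(numbers) do
    GenServer.cast(pid, {:command, numbers})
  end

  # Hypothetical helper to read the last line the port produced.
  def latest_output(pid), do: GenServer.call(pid, :latest_output)

  # Server callbacks

  @impl true
  def init(:ok) do
    # Assumption: `cat` stands in for the external program, so it simply
    # echoes the request instead of adding the numbers.
    port = Port.open({:spawn, "cat"}, [:binary, :exit_status])
    {:ok, %{port: port, latest_output: nil}}
  end

  @impl true
  def handle_cast({:command, numbers}, %{port: port} = state) do
    # [1, 2] becomes "1,2\n", the line format used earlier in the post.
    Port.command(port, Enum.join(numbers, ",") <> "\n")
    {:noreply, state}
  end

  @impl true
  def handle_call(:latest_output, _from, state) do
    {:reply, state.latest_output, state}
  end

  @impl true
  def handle_info({port, {:data, data}}, %{port: port} = state) do
    Logger.info("Port output: #{inspect(data)}")
    {:noreply, %{state | latest_output: String.trim(data)}}
  end

  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    Logger.info("Port exited with status #{status}")
    {:noreply, state}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end
```

With this in place, `{:ok, pid} = PortGenServerSketch.start_link()` followed by `PortGenServerSketch.command(pid, [1, 2])` replaces the hand-written send. Because the command is a cast, the output has to be read afterwards with `PortGenServerSketch.latest_output(pid)`; a synchronous variant would need to keep track of the caller until the port replies.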

Interesting link

https://stackoverflow.com/questions/48416742/is-it-possible-to-send-a-message-to-all-child-processes-in-elixir-erlang

Tony's code

# port_example.ex
defmodule PortsExample do
  @moduledoc """
  Documentation for PortsExample.
  """

  @doc """
  Hello world.

  ## Examples

      iex> PortsExample.hello()
      :world

  """
  def hello do
    :world
  end
end
# basic_port.ex
defmodule PortsExample.BasicPort do
  use GenServer
  require Logger

  @command "./bin/long_running.rb"

  # GenServer API
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, args, opts)
  end

  def init(_args \\ []) do
    # The port is linked to this process automatically, so it is not kept in the state
    _port = Port.open({:spawn, @command}, [:binary, :exit_status])

    {:ok, %{latest_output: nil, exit_status: nil} }
  end

  # This callback handles data incoming from the command's STDOUT
  def handle_info({_port, {:data, text_line}}, state) do
    latest_output = text_line |> String.trim

    Logger.info "Latest output: #{latest_output}"

    {:noreply, %{state | latest_output: latest_output}}
  end

  # This callback tells us when the process exits
  def handle_info({_port, {:exit_status, status}}, state) do
    Logger.info "External exit: :exit_status: #{status}"

    new_state = %{state | exit_status: status}
    {:noreply, new_state}
  end

  # no-op catch-all callback for unhandled messages
  def handle_info(_msg, state), do: {:noreply, state}
end
# monitored_port.ex

defmodule PortsExample.MonitoredPort do
  use GenServer
  require Logger

  @command "./bin/long_running.rb"

  # GenServer API
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, args, opts)
  end

  def init(_args \\ []) do
    port = Port.open({:spawn, @command}, [:binary, :exit_status])
    Port.monitor(port)

    {:ok, %{port: port, latest_output: nil, exit_status: nil} }
  end

  # This callback handles data incoming from the command's STDOUT
  def handle_info({port, {:data, text_line}}, %{port: port} = state) do
    Logger.info "Data: #{inspect text_line}"
    {:noreply, %{state | latest_output: String.trim(text_line)}}
  end

  # This callback tells us when the process exits
  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    Logger.info "Port exit: :exit_status: #{status}"

    new_state = %{state | exit_status: status}

    {:noreply, new_state}
  end

  def handle_info({:DOWN, _ref, :port, port, :normal}, state) do
    Logger.info "Handled :DOWN message from port: #{inspect port}"
    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.info "Unhandled message: #{inspect msg}"
    {:noreply, state}
  end

end
# trap_process_crash.ex
defmodule PortsExample.TrapProcessCrash do
  use GenServer
  require Logger

  @command "./bin/long_running.rb"

  # GenServer API
  def start_link(args \\ [], opts \\ []) do
    GenServer.start_link(__MODULE__, args, opts)
  end

  def init(_args \\ []) do
    Process.flag(:trap_exit, true)

    port = Port.open({:spawn, @command}, [:binary, :exit_status])
    Port.monitor(port)

    {:ok, %{port: port, latest_output: nil, exit_status: nil} }
  end

  def terminate(reason, %{port: port} = state) do
    Logger.info "** TERMINATE: #{inspect reason}. This is the last chance to clean up after this process."
    Logger.info "Final state: #{inspect state}"

    port_info = Port.info(port)
    os_pid = port_info[:os_pid]

    Logger.warn "Orphaned OS process: #{os_pid}"

    :normal
  end

  # This callback handles data incoming from the command's STDOUT
  def handle_info({port, {:data, text_line}}, %{port: port} = state) do
    Logger.info "Data: #{inspect text_line}"
    {:noreply, %{state | latest_output: String.trim(text_line)}}
  end

  # This callback tells us when the process exits
  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    Logger.info "Port exit: :exit_status: #{status}"

    new_state = %{state | exit_status: status}

    {:noreply, new_state}
  end

  def handle_info({:DOWN, _ref, :port, port, :normal}, state) do
    Logger.info "Handled :DOWN message from port: #{inspect port}"
    {:noreply, state}
  end

  def handle_info({:EXIT, _port, :normal}, state) do
    Logger.info "handle_info: EXIT"
    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.info "Unhandled message: #{inspect msg}"
    {:noreply, state}
  end

end

References