Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `Xander.Query` support for retrying connection to node when it is not available.
- `Xander.Transport` module to abstract transport layer details for connecting
to Cardano nodes, providing a unified interface for both Unix socket
connections (via `:gen_tcp`) and SSL connections (via `:ssl`).

### Changed

- Refactored `Xander.Query` to use the new `Xander.Transport` module,
eliminating the need for mini-protocol implementations to handle
transport-specific logic.

### Fixed

Expand Down
81 changes: 24 additions & 57 deletions lib/xander/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ defmodule Xander.Query do
alias Xander.Handshake
alias Xander.Messages
alias Xander.Query.Response
alias Xander.Transport

require Logger

@basic_tcp_opts [:binary, active: false, send_timeout: 4_000]
@active_n2c_versions [9, 10, 11, 12, 13, 14, 15, 16, 17]

defstruct [:client, :path, :port, :socket, :network, queue: :queue.new()]
defstruct [:transport, :socket, :network, queue: :queue.new()]

##############
# Public API #
Expand Down Expand Up @@ -47,10 +47,10 @@ defmodule Xander.Query do
"""
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(network: network, path: path, port: port, type: type) do
transport = Transport.new(path: path, port: port, type: type)

data = %__MODULE__{
client: tcp_lib(type),
path: maybe_local_path(path, type),
port: maybe_local_port(port, type),
transport: transport,
network: network,
socket: nil
}
Expand Down Expand Up @@ -93,13 +93,9 @@ defmodule Xander.Query do
def disconnected(
:internal,
:connect,
%__MODULE__{client: client, path: path, port: port} = data
%__MODULE__{transport: transport} = data
) do
case client.connect(
maybe_parse_path(path),
port,
tcp_opts(client, path)
) do
case Transport.connect(transport) do
{:ok, socket} ->
data = %__MODULE__{data | socket: socket}
actions = [{:next_event, :internal, :establish}]
Expand Down Expand Up @@ -131,15 +127,16 @@ defmodule Xander.Query do
def connected(
:internal,
:establish,
%__MODULE__{client: client, socket: socket, network: network} = data
%__MODULE__{transport: transport, socket: socket, network: network} = data
) do
:ok =
client.send(
Transport.send(
transport,
socket,
Handshake.Proposal.version_message(@active_n2c_versions, network)
)

case client.recv(socket, 0, _timeout = 5_000) do
case Transport.recv(transport, socket, 0, _timeout = 5_000) do
{:ok, full_response} ->
{:ok, _handshake_response} = Handshake.Response.validate(full_response)

Expand All @@ -163,12 +160,12 @@ defmodule Xander.Query do
def established_no_agency(
{:call, from},
{:request, query_name},
%__MODULE__{client: client, socket: socket} = data
%__MODULE__{transport: transport, socket: socket} = data
) do
# Send acquire message and handle response
with :ok <- client.send(socket, Messages.msg_acquire()),
{:ok, _acquire_response} <- client.recv(socket, 0, _timeout = 5_000),
:ok <- setopts_lib(client).setopts(socket, active: :once) do
with :ok <- Transport.send(transport, socket, Messages.msg_acquire()),
{:ok, _acquire_response} <- Transport.recv(transport, socket, 0, _timeout = 5_000),
:ok <- Transport.setopts(transport, socket, active: :once) do
# Track the caller and query_name, then transition to
# established_has_agency state prior to sending the query.
data = update_in(data.queue, &:queue.in({from, query_name}, &1))
Expand Down Expand Up @@ -219,16 +216,16 @@ defmodule Xander.Query do
def established_has_agency(
:internal,
:send_query,
%__MODULE__{client: client, socket: socket} = data
%__MODULE__{transport: transport, socket: socket} = data
) do
# Get the current query_name from queue without removing it
{:value, {_from, query_name}} = :queue.peek(data.queue)

# Set socket to active mode so we can receive the next response
:ok = setopts_lib(client).setopts(socket, active: :once)
:ok = Transport.setopts(transport, socket, active: :once)

# Send query to node and remain in established_has_agency state
:ok = client.send(socket, build_query_message(query_name))
:ok = Transport.send(transport, socket, build_query_message(query_name))
{:keep_state, data}
end

Expand All @@ -241,16 +238,16 @@ defmodule Xander.Query do
def established_has_agency(
:info,
{_tcp_or_ssl, socket, bytes},
%__MODULE__{client: client, socket: socket} = data
%__MODULE__{transport: transport, socket: socket} = data
) do
# Parse query response (MsgResult)
case Response.parse_response(bytes) do
{:ok, query_response} ->
handle_query_result({:ok, query_response}, data, client, socket)
handle_query_result({:ok, query_response}, data, transport, socket)

{:error, reason} ->
Logger.error("Failed to parse response: #{inspect(reason)}")
handle_query_result({:error, :parse_failed}, data, client, socket)
handle_query_result({:error, :parse_failed}, data, transport, socket)
end
end

Expand All @@ -275,13 +272,13 @@ defmodule Xander.Query do
end
end

defp handle_query_result(result, data, client, socket) do
defp handle_query_result(result, data, transport, socket) do
case :queue.out(data.queue) do
{{:value, {caller, _query_name}}, rest_queue} ->
actions = [{:reply, caller, result}]

if :queue.is_empty(rest_queue) do
:ok = client.send(socket, Messages.msg_release())
:ok = Transport.send(transport, socket, Messages.msg_release())
new_data = %{data | queue: rest_queue}
{:next_state, :established_no_agency, new_data, actions}
else
Expand All @@ -290,38 +287,8 @@ defmodule Xander.Query do
end

{:empty, _} ->
:ok = client.send(socket, Messages.msg_release())
:ok = Transport.send(transport, socket, Messages.msg_release())
{:next_state, :established_no_agency, data, []}
end
end

defp maybe_local_path(path, :socket), do: {:local, path}
defp maybe_local_path(path, _), do: path

defp maybe_local_port(_port, :socket), do: 0
defp maybe_local_port(port, _), do: port

defp maybe_parse_path(path) when is_binary(path) do
uri = URI.parse(path)
~c"#{uri.host}"
end

defp maybe_parse_path(path), do: path

defp tcp_lib(:ssl), do: :ssl
defp tcp_lib(_), do: :gen_tcp

defp tcp_opts(:ssl, path),
do:
@basic_tcp_opts ++
[
verify: :verify_none,
server_name_indication: ~c"#{path}",
secure_renegotiate: true
]

defp tcp_opts(_, _), do: @basic_tcp_opts

defp setopts_lib(:ssl), do: :ssl
defp setopts_lib(_), do: :inet
end
154 changes: 154 additions & 0 deletions lib/xander/transport.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
defmodule Xander.Transport do
@moduledoc """
Abstracts transport layer details for connecting to Cardano nodes.

This module provides a unified interface for both Unix socket connections
(via `:gen_tcp`) and SSL connections (via `:ssl`), eliminating the need
for mini-protocol implementations to handle transport-specific logic.

## Usage

Create a transport from configuration:

config = Xander.Config.default_config!("/path/to/node.socket")
transport = Xander.Transport.new(config)

Then use the transport for all socket operations:

{:ok, socket} = Xander.Transport.connect(transport)
:ok = Xander.Transport.send(transport, socket, data)
{:ok, response} = Xander.Transport.recv(transport, socket, 0, 5_000)
:ok = Xander.Transport.setopts(transport, socket, active: :once)
:ok = Xander.Transport.close(transport, socket)
"""

@basic_opts [:binary, active: false, send_timeout: 4_000]

defstruct [:type, :client, :setopts_module, :path, :port, :connect_opts]

@type t :: %__MODULE__{
type: :socket | :ssl,
client: :gen_tcp | :ssl,
setopts_module: :inet | :ssl,
path: {:local, String.t()} | String.t(),
port: non_neg_integer(),
connect_opts: keyword()
}

@doc """
Creates a new Transport struct from configuration options.

## Options

* `:path` - The socket path (for Unix sockets) or URL (for SSL)
* `:port` - The port number (0 for Unix sockets, typically 9443 for SSL)
* `:type` - Either `:socket` for Unix sockets or `:ssl` for SSL connections

## Examples

# Unix socket
transport = Xander.Transport.new(path: "/tmp/node.socket", port: 0, type: :socket)

# SSL connection
transport = Xander.Transport.new(path: "https://node.demeter.run", port: 9443, type: :ssl)
"""
@spec new(keyword()) :: t()
def new(opts) do
type = Keyword.fetch!(opts, :type)
path = Keyword.fetch!(opts, :path)
port = Keyword.fetch!(opts, :port)

%__MODULE__{
type: type,
client: client_module(type),
setopts_module: setopts_module(type),
path: normalize_path(path, type),
port: normalize_port(port, type),
connect_opts: build_connect_opts(type, path)
}
end

@doc """
Connects to the Cardano node using the transport configuration.

Returns `{:ok, socket}` on success or `{:error, reason}` on failure.
"""
@spec connect(t()) :: {:ok, any()} | {:error, any()}
def connect(%__MODULE__{client: client, path: path, port: port, connect_opts: opts}) do
client.connect(parse_connect_address(path), port, opts)
end

@doc """
Sends data through the socket.

Returns `:ok` on success or `{:error, reason}` on failure.
"""
@spec send(t(), any(), iodata()) :: :ok | {:error, any()}
def send(%__MODULE__{client: client}, socket, data) do
client.send(socket, data)
end

@doc """
Receives data from the socket.

Returns `{:ok, data}` on success or `{:error, reason}` on failure.
"""
@spec recv(t(), any(), non_neg_integer(), timeout()) :: {:ok, binary()} | {:error, any()}
def recv(%__MODULE__{client: client}, socket, length, timeout) do
client.recv(socket, length, timeout)
end

@doc """
Sets socket options.

Returns `:ok` on success or `{:error, reason}` on failure.
"""
@spec setopts(t(), any(), keyword()) :: :ok | {:error, any()}
def setopts(%__MODULE__{setopts_module: setopts_module}, socket, opts) do
setopts_module.setopts(socket, opts)
end

@doc """
Closes the socket connection.

Returns `:ok`.
"""
@spec close(t(), any()) :: :ok
def close(%__MODULE__{client: client}, socket) do
client.close(socket)
end

# Private functions

defp client_module(:ssl), do: :ssl
defp client_module(_), do: :gen_tcp

defp setopts_module(:ssl), do: :ssl
defp setopts_module(_), do: :inet

defp normalize_path(path, :socket), do: {:local, path}
defp normalize_path(path, _), do: path

defp normalize_port(_port, :socket), do: 0
defp normalize_port(port, _), do: port

defp build_connect_opts(:ssl, path) do
@basic_opts ++
[
verify: :verify_none,
server_name_indication: ~c"#{path}",
secure_renegotiate: true
]
end

defp build_connect_opts(_, _), do: @basic_opts

defp parse_connect_address({:local, _path} = local_path), do: local_path

defp parse_connect_address(path) when is_binary(path) do
uri = URI.parse(path)
~c"#{uri.host}"
end

defp parse_connect_address(path), do: path
end
10 changes: 2 additions & 8 deletions test/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -397,21 +397,15 @@ defmodule Xander.Integration.TransactionTest do
IO.puts("Initial query successful - connection established")

# Get the current state to verify we're connected
{_state, %{socket: initial_socket, client: client}} =
{_state, %{socket: initial_socket, transport: transport}} =
:sys.get_state(query_pid)

assert initial_socket != nil
IO.puts("Verified initial connection state")

# Simulate connection loss by closing the socket
# This mimics what happens when the node goes down or network issues occur
case client do
:gen_tcp ->
:gen_tcp.close(initial_socket)

:ssl ->
:ssl.close(initial_socket)
end
Xander.Transport.close(transport, initial_socket)

IO.puts("Simulated connection loss by closing socket")

Expand Down
Loading