Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions lib/xtb_client/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
defmodule XtbClient.Connection do
@moduledoc """
Module for handling connection to XTB Api.

`Connection` module is responsible for handling connection to XTB Api.
It connects to the main socket and streaming socket and provides
functions for sending and receiving messages.
"""
use GenServer

alias XtbClient.MainSocket
alias XtbClient.MainSocket.Config, as: MainSocketConfig
alias XtbClient.Messages
alias XtbClient.StreamingSocket

import XtbClient.Messages

defmodule State do

Check warning on line 18 in lib/xtb_client/connection.ex

View workflow job for this annotation

GitHub Actions / Build and test

Modules should have a @moduledoc tag.
defstruct mpid: nil,
spid: nil
end

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(args) do
{main_socket_config, opts} = Keyword.split(args, MainSocketConfig.keys())
{stream_session_config, opts} = Keyword.split(opts, [:module])

GenServer.start_link(
__MODULE__,
%{main: main_socket_config, streaming: stream_session_config},
opts
)
end

@impl GenServer
def init(args) do
with %{main: main_socket_config, streaming: stream_session_config} <- args,
{:ok, mpid} <- MainSocket.start_link(main_socket_config),
{:ok, stream_session_id} <- MainSocket.stream_session_id(mpid),
stream_session_config <-
Keyword.merge(stream_session_config, stream_session_id: stream_session_id),
stream_session_config <- Keyword.merge(main_socket_config, stream_session_config),
{:ok, spid} <-
StreamingSocket.start_link(stream_session_config) do
Process.flag(:trap_exit, true)

state = %State{
mpid: mpid,
spid: spid
}

{:ok, state}
else
{:error, reason} -> {:stop, reason}
end
end

@doc """
Sends a synchronous message to the main socket.
"""
@spec sync_call(GenServer.server(), Messages.sync_message()) ::
{:ok, struct()} | {:error, term()}
def sync_call(server, %struct{} = query) when is_sync_message(struct) do
GenServer.call(server, {:sync_call, query})
end

@spec subscribe(GenServer.server(), Messages.streaming_message()) ::
{:ok, String.t()} | {:error, term()}
def subscribe(server, %struct{} = command) when is_streaming_message(struct) do
GenServer.call(server, {:subscribe, command})
end

@impl true
def handle_call(
{:sync_call, query},
_from,
%State{mpid: mpid} = state
) do
result = MainSocket.handle_query(mpid, query)
{:reply, result, state}
end

def handle_call({:subscribe, command}, _from, %State{spid: spid} = state) do
result = StreamingSocket.subscribe(spid, command)
{:reply, result, state}
end

@impl true
def handle_info({:EXIT, _pid, _reason}, state) do
{:stop, :shutdown, state}
end
end
Loading
Loading