Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions helix-container/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use helixdb::helix_gateway::{
gateway::{GatewayOpts, HelixGateway},
router::router::{HandlerFn, HandlerSubmission},
};
use helixdb::helix_transport::tokio_transport::TokioTransport;
use helixdb::helix_runtime::tokio_runtime::TokioRuntime;
use inventory;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -72,7 +73,7 @@ async fn main() {

println!("Routes: {:?}", routes.keys());
// create gateway
let gateway = HelixGateway::new(
let gateway = HelixGateway::<TokioRuntime, TokioTransport>::new(
&format!("0.0.0.0:{}", port),
graph,
GatewayOpts::DEFAULT_POOL_SIZE,
Expand All @@ -81,8 +82,8 @@ async fn main() {
).await;
// start server
println!("Starting server...");
let a = gateway.connection_handler.accept_conns().await.unwrap();
let b = a.await.unwrap();
let handle = gateway.connection_handler.accept_conns().await.unwrap();
handle.await;

}

30 changes: 13 additions & 17 deletions helixdb/src/helix_gateway/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tokio::net::TcpListener;
use crate::helix_transport::Transport;
use crate::helix_runtime::AsyncRuntime;

use crate::helix_gateway::{router::router::HelixRouter, thread_pool::thread_pool::ThreadPool};

pub struct ConnectionHandler<R: AsyncRuntime + Clone + Send + Sync + 'static> {
pub struct ConnectionHandler<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> {
pub address: String,
pub active_connections: Arc<Mutex<HashMap<String, ClientConnection>>>,
pub thread_pool: ThreadPool<R>,
pub thread_pool: ThreadPool<R, T>,
pub runtime: R,
}

Expand All @@ -25,7 +25,7 @@ pub struct ClientConnection {
pub addr: SocketAddr,
}

impl<R: AsyncRuntime + Clone + Send + Sync + 'static> ConnectionHandler<R> {
impl<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> ConnectionHandler<R, T> {
pub fn new(
address: &str,
graph: Arc<HelixGraphEngine>,
Expand All @@ -36,37 +36,33 @@ impl<R: AsyncRuntime + Clone + Send + Sync + 'static> ConnectionHandler<R> {
Ok(Self {
address: address.to_string(),
active_connections: Arc::new(Mutex::new(HashMap::new())),
thread_pool: ThreadPool::new(size, graph, Arc::new(router), runtime.clone())?,
thread_pool: ThreadPool::<R, T>::new(size, graph, Arc::new(router), runtime.clone())?,
runtime,
})
}

pub async fn accept_conns(&self) -> Result<<R as AsyncRuntime>::JoinHandle<()>, GraphError> {
// Create a new TcpListener for each accept_conns call
let listener = TcpListener::bind(&self.address).await.map_err(|e| {
eprintln!("Failed to bind to address {}: {}", self.address, e);
GraphError::GraphConnectionError("Failed to bind to address".to_string(), e)
})?;
// Bind transport listener
let listener = T::bind(&self.address)
.await
.map_err(|e| {
eprintln!("Failed to bind to address {}: {}", self.address, e);
GraphError::GraphConnectionError("Failed to bind to address".to_string(), e)
})?;

// Log binding success to stderr since stdout might be buffered

let active_connections = Arc::clone(&self.active_connections);
let thread_pool_sender = self.thread_pool.sender.clone();
let address = self.address.clone();


let runtime = self.runtime.clone();
let handle = runtime.spawn(async move {

loop {
match listener.accept().await {
match T::accept(&listener).await {
Ok((stream, addr)) => {

// Configure TCP stream
if let Err(e) = stream.set_nodelay(true) {
eprintln!("Failed to set TCP_NODELAY: {}", e);
}

// Create a client connection record
let client_id = Uuid::new_v4().to_string();
let client = ClientConnection {
Expand Down
11 changes: 6 additions & 5 deletions helixdb/src/helix_gateway/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashMap, sync::Arc};

use super::connection::connection::ConnectionHandler;
use crate::helix_transport::Transport;
use crate::helix_runtime::AsyncRuntime;
use crate::helix_engine::graph_core::graph_core::HelixGraphEngine;
use super::router::router::{HandlerFn, HelixRouter};
Expand All @@ -11,21 +12,21 @@ impl GatewayOpts {
pub const DEFAULT_POOL_SIZE: usize = 1024;
}

pub struct HelixGateway<R: AsyncRuntime + Clone + Send + Sync + 'static> {
pub connection_handler: ConnectionHandler<R>,
pub struct HelixGateway<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> {
pub connection_handler: ConnectionHandler<R, T>,
pub runtime: R,
}

impl<R: AsyncRuntime + Clone + Send + Sync + 'static> HelixGateway<R> {
impl<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> HelixGateway<R, T> {
pub async fn new(
address: &str,
graph: Arc<HelixGraphEngine>,
size: usize,
routes: Option<HashMap<(String, String), HandlerFn>>,
runtime: R,
) -> HelixGateway<R> {
) -> HelixGateway<R, T> {
let router = HelixRouter::new(routes);
let connection_handler = ConnectionHandler::new(address, graph, size, router, runtime.clone()).unwrap();
let connection_handler = ConnectionHandler::<R, T>::new(address, graph, size, router, runtime.clone()).unwrap();
println!("Gateway created");
HelixGateway { connection_handler, runtime }
}
Expand Down
25 changes: 13 additions & 12 deletions helixdb/src/helix_gateway/thread_pool/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@ use crate::protocol::response::Response;

extern crate tokio;

use tokio::net::TcpStream;
use crate::helix_transport::Transport;

pub struct Worker<R: AsyncRuntime + Clone + Send + Sync + 'static> {
pub struct Worker<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> {
pub id: usize,
pub handle: <R as AsyncRuntime>::JoinHandle<()>,
pub runtime: R,
_marker: std::marker::PhantomData<T>,
}

impl<R: AsyncRuntime + Clone + Send + Sync + 'static> Worker<R> {
impl<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> Worker<R, T> {
fn new(
id: usize,
graph_access: Arc<HelixGraphEngine>,
router: Arc<HelixRouter>,
rx: Receiver<TcpStream>,
rx: Receiver<T::Stream>,
runtime: R,
) -> Worker<R> {
) -> Worker<R, T> {
let handle = runtime.spawn(async move {
loop {
let mut conn = match rx.recv_async().await {
Expand Down Expand Up @@ -68,31 +69,31 @@ impl<R: AsyncRuntime + Clone + Send + Sync + 'static> Worker<R> {
}
});

Worker { id, handle, runtime }
Worker { id, handle, runtime, _marker: std::marker::PhantomData }
}
}

pub struct ThreadPool<R: AsyncRuntime + Clone + Send + Sync + 'static> {
pub sender: Sender<TcpStream>,
pub struct ThreadPool<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> {
pub sender: Sender<T::Stream>,
pub num_unused_workers: Mutex<usize>,
pub num_used_workers: Mutex<usize>,
pub workers: Vec<Worker<R>>,
pub workers: Vec<Worker<R, T>>,
pub runtime: R,
}
impl<R: AsyncRuntime + Clone + Send + Sync + 'static> ThreadPool<R> {
impl<R: AsyncRuntime + Clone + Send + Sync + 'static, T: Transport> ThreadPool<R, T> {
pub fn new(
size: usize,
graph: Arc<HelixGraphEngine>,
router: Arc<HelixRouter>,
runtime: R,
) -> Result<ThreadPool<R>, RouterError> {
) -> Result<ThreadPool<R, T>, RouterError> {
assert!(
size > 0,
"Expected number of threads in thread pool to be more than 0, got {}",
size
);

let (tx, rx) = flume::unbounded::<TcpStream>();
let (tx, rx) = flume::unbounded::<T::Stream>();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&graph), Arc::clone(&router), rx.clone(), runtime.clone()));
Expand Down
4 changes: 3 additions & 1 deletion helixdb/src/helix_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::pin::Pin;
/// Production code uses a Tokio-backed implementation while tests can
/// provide deterministic schedulers by implementing this trait.
pub trait AsyncRuntime {
type JoinHandle<T>: Future<Output = T> + Send + 'static;
type JoinHandle<T>: Future<Output = T> + Send + 'static
where
T: Send + 'static;

/// Spawn a future onto the runtime.
fn spawn<F, T>(&self, fut: F) -> Self::JoinHandle<T>
Expand Down
23 changes: 21 additions & 2 deletions helixdb/src/helix_runtime/tokio_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,43 @@
use super::AsyncRuntime;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

/// Tokio based implementation of [`AsyncRuntime`].
#[derive(Clone, Default)]
pub struct TokioRuntime;

impl AsyncRuntime for TokioRuntime {
type JoinHandle<T> = tokio::task::JoinHandle<T>;
type JoinHandle<T> = TokioJoinHandle<T>
where
T: Send + 'static;

fn spawn<F, T>(&self, fut: F) -> Self::JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
tokio::spawn(fut)
TokioJoinHandle(tokio::spawn(fut))
}

fn sleep(&self, dur: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(tokio::time::sleep(dur))
}
}

/// Wrapper around Tokio's [`JoinHandle`] that unwraps the result.
pub struct TokioJoinHandle<T>(tokio::task::JoinHandle<T>);

impl<T> Future for TokioJoinHandle<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
match inner.poll(cx) {
Poll::Ready(Ok(val)) => Poll::Ready(val),
Poll::Ready(Err(err)) => panic!("Join error: {}", err),
Poll::Pending => Poll::Pending,
}
}
}
25 changes: 25 additions & 0 deletions helixdb/src/helix_transport/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
pub mod tokio_transport;

use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncWrite};

/// Abstraction over network transport for HelixDB.
///
/// The transport trait allows the gateway to be decoupled from a
/// concrete networking stack so that simulation tests can provide a
/// deterministic in-memory transport.
pub trait Transport {
type Listener: Send + Sync + 'static;
type Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static;

/// Bind a listener to the provided address.
fn bind<'a>(addr: &'a str) -> Pin<Box<dyn Future<Output = std::io::Result<Self::Listener>> + Send + 'a>>;

/// Accept the next incoming connection from a listener.
fn accept<'a>(listener: &'a Self::Listener) -> Pin<Box<dyn Future<Output = std::io::Result<(Self::Stream, SocketAddr)>> + Send + 'a>>;

/// Connect to a remote address returning a stream.
fn connect<'a>(addr: &'a str) -> Pin<Box<dyn Future<Output = std::io::Result<Self::Stream>> + Send + 'a>>;
}
27 changes: 27 additions & 0 deletions helixdb/src/helix_transport/tokio_transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use super::Transport;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};

/// Tokio based transport implementation using TCP sockets.
#[derive(Clone, Default)]
pub struct TokioTransport;

impl Transport for TokioTransport {
type Listener = TcpListener;
type Stream = TcpStream;

fn bind<'a>(addr: &'a str) -> Pin<Box<dyn Future<Output = std::io::Result<Self::Listener>> + Send + 'a>> {
Box::pin(async move { TcpListener::bind(addr).await })
}

fn accept<'a>(listener: &'a Self::Listener) -> Pin<Box<dyn Future<Output = std::io::Result<(Self::Stream, SocketAddr)>> + Send + 'a>> {
Box::pin(async move { listener.accept().await })
}

fn connect<'a>(addr: &'a str) -> Pin<Box<dyn Future<Output = std::io::Result<Self::Stream>> + Send + 'a>> {
Box::pin(async move { TcpStream::connect(addr).await })
}
}
1 change: 1 addition & 0 deletions helixdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub mod helixc;
pub mod ingestion_engine;
pub mod protocol;
pub mod helix_runtime;
pub mod helix_transport;