Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
[package]
name = "jsonrpc-ws"
name = "jsonrpc"
version = "0.1.0"
authors = ["tiannian <dtiannian@aliyun.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "jsonrpc"
path = "src/lib.rs"

[workspace]
members = [
".",
"jsonrpc-websocket",
]

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
jsonrpc-lite = "0.5.0"
fxhash = "0.2.1"
futures-util = "0.3.5"

[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
28 changes: 28 additions & 0 deletions jsonrpc-websocket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "jsonrpc-websocket"
version = "0.1.0"
authors = ["xujian <s1473561555@sina.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "jsonrpc_websocket"
path = "src/lib.rs"


[dependencies]
tokio = { version = "0.2", features = ["full"] }
chrono = "0.4.11"
tokio-tungstenite = "0.10.1"
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] }
url = "2.0.0"
jsonrpc-lite = "0.5.0"
jsonrpc = { path = "../../jsonrpc-ws" }
log = "0.4.8"


[dev-dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
env_logger = "0.7"
168 changes: 168 additions & 0 deletions jsonrpc-websocket/examples/example_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use std::env;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::TcpStream;
use tokio::time;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use url::Url;

static LOCAL_SERVER: &'static str = "ws://127.0.0.1:9000";

const RECONN_INTERVAL: u64 = 3000;

struct WebSockWriteHalf(pub Option<SplitSink<WebSocketStream<TcpStream>, Message>>);
struct WebSockReadHalf(pub Option<SplitStream<WebSocketStream<TcpStream>>>);

async fn set_conn_none(
lock_ws_receiver: Arc<Mutex<WebSockReadHalf>>,
lock_ws_sender: Arc<Mutex<WebSockWriteHalf>>,
) -> bool {
let mut ws_receiver = lock_ws_receiver.lock().unwrap();
let mut ws_sender = lock_ws_sender.lock().unwrap();

ws_receiver.0 = None;
ws_sender.0 = None;
return true;
}

async fn client_check_conn(
case_url: Url,
lock_ws_receiver: Arc<Mutex<WebSockReadHalf>>,
lock_ws_sender: Arc<Mutex<WebSockWriteHalf>>,
) -> bool {
let ws_receiver = lock_ws_receiver.lock().unwrap();

if let None = ws_receiver.0 {
drop(ws_receiver);

if let Ok((ws_stream, _)) = connect_async(case_url).await {
let (sender, receiver) = ws_stream.split();
let mut ws_receiver = lock_ws_receiver.lock().unwrap();
let mut ws_sender = lock_ws_sender.lock().unwrap();

ws_sender.0 = Some(sender);
ws_receiver.0 = Some(receiver);
log::info!("connect success");
return true;
} else {
log::info!("connect fail, reconning ...");
return false;
}
}
return true;
}

async fn receiver_loop(
case_url: Url,
lock_ws_receiver: Arc<Mutex<WebSockReadHalf>>,
lock_ws_sender: Arc<Mutex<WebSockWriteHalf>>,
) {
loop {
let mut ws_receiver = lock_ws_receiver.lock().unwrap();

let result: Result<String, bool> = match &mut ws_receiver.0 {
Some(ws_receiver) => match ws_receiver.next().await {
Some(Ok(msg)) => {
if msg.is_text() {
Ok(msg.into_text().unwrap())
} else {
log::warn!("Peer receive data format error, not text");
Err(false)
}
}
Some(Err(_)) => {
log::warn!("server close connect");
Err(true)
}
None => Err(true),
},
None => Err(true),
};
drop(ws_receiver);

match result {
Ok(msg) => {
println!("resp: {}", msg);
}
Err(is_reconn) => {
if is_reconn {
set_conn_none(lock_ws_receiver.clone(), lock_ws_sender.clone()).await;
if client_check_conn(
case_url.clone(),
lock_ws_receiver.clone(),
lock_ws_sender.clone(),
)
.await
{
log::info!("re_conn: {}", case_url);
continue;
} else {
time::delay_for(Duration::from_millis(RECONN_INTERVAL)).await;
}
}
}
}
}
}

async fn ws_send(str_cmd: String, lock_ws_sender: Arc<Mutex<WebSockWriteHalf>>) {
let mut ws_sender = match lock_ws_sender.try_lock() {
Ok(ws_sender) => ws_sender,
Err(_) => {
time::delay_for(Duration::from_millis(100)).await;

log::warn!("ws_stream close, skip send");
return;
}
};

if let Some(ws_sender) = &mut ws_sender.0 {
if let Err(err) = ws_sender.send(Message::Text(str_cmd)).await {
log::warn!("ws_stream send failed with err: {}", err);
}
} else {
log::warn!("ws_stream close, skip send");
}
}

#[tokio::main]
async fn main() {
use env_logger::Env;
env_logger::from_env(Env::default().default_filter_or("warn")).init();

let connect_transport = env::args()
.nth(1)
.unwrap_or_else(|| LOCAL_SERVER.to_string());

let case_url = Url::parse(&connect_transport).expect("Bad testcase URL");

let lock_ws_receiver = Arc::new(Mutex::new(WebSockReadHalf(None)));
let lock_ws_sender = Arc::new(Mutex::new(WebSockWriteHalf(None)));

let local = tokio::task::LocalSet::new();
local
.run_until(async move {
tokio::task::spawn_local(receiver_loop(
case_url,
lock_ws_receiver.clone(),
lock_ws_sender.clone(),
));

let mut reader = BufReader::new(tokio::io::stdin());
loop {
let mut str_cmd = String::new();
reader.read_line(&mut str_cmd).await.unwrap();
str_cmd.pop();

ws_send(str_cmd, lock_ws_sender.clone()).await;
}
})
.await;
}
105 changes: 105 additions & 0 deletions jsonrpc-websocket/examples/example_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
extern crate jsonrpc_websocket;

use jsonrpc::route::Route;
use jsonrpc::Data;
use jsonrpc_lite::Error as JsonRpcError;
use jsonrpc_websocket::WsServer;
use serde::{Deserialize, Serialize};
use std::env;
use std::sync::{Arc, RwLock};

#[derive(Serialize)]
pub enum ExampleError {
// websock 错误
ParamIsNone,
}

impl Into<JsonRpcError> for ExampleError {
fn into(self) -> JsonRpcError {
let (code, message) = match self {
ExampleError::ParamIsNone => (1000i64, "Param is none"),
};

JsonRpcError {
code,
message: message.to_string(),
data: None,
}
}
}

#[derive(Serialize, Deserialize)]
pub struct CurrencyDetail {
value: u64,
id: String,
dcds: String,
locked: bool,
owner: String,
}

pub struct CurrencyStore {}
impl CurrencyStore {
pub fn get_detail_by_ids(
&self,
req: GetDetailParam,
) -> Result<Vec<CurrencyDetail>, ExampleError> {
if req.ids.len() == 0 {
return Err(ExampleError::ParamIsNone);
}
Ok(Vec::<CurrencyDetail>::new())
}
}

#[derive(Serialize, Deserialize)]
pub struct GetDetailParam {
ids: Vec<String>,
}

pub async fn get_detail_by_ids(
wallet: Data<TSystem>,
req: GetDetailParam,
) -> Result<Vec<CurrencyDetail>, ExampleError> {
let store = wallet.get_ref().store.try_read().unwrap();
store.get_detail_by_ids(req)
}

pub struct TSystem {
pub store: RwLock<CurrencyStore>,
}

impl TSystem {
pub fn new() -> Self {
Self {
store: RwLock::new(CurrencyStore {}),
}
}
}

pub async fn start_ws_server(bind_transport: String) {
let route: Arc<Route> = Arc::new(
Route::new()
.data(TSystem::new())
.to("currency.ids.detail".to_string(), get_detail_by_ids),
);

let ws_server = match WsServer::bind(bind_transport).await {
Ok(ws_server) => ws_server,
Err(err) => panic!("{}", err),
};

ws_server.listen_loop(route).await;
}

static LOCAL_SERVER: &'static str = "127.0.0.1:9000";

#[tokio::main]
async fn main() {
use env_logger::Env;
env_logger::from_env(Env::default().default_filter_or("warn")).init();

let bind_transport = env::args()
.nth(1)
.unwrap_or_else(|| LOCAL_SERVER.to_string());

start_ws_server(bind_transport).await;
}
2 changes: 2 additions & 0 deletions jsonrpc-websocket/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod server;
pub use server::WsServer;
Loading