|
1 | | -//! JSON-RPC client implementation. |
2 | | -#![deny(missing_docs)] |
3 | | - |
4 | | -use failure::{format_err, Fail}; |
5 | | -use futures::sync::{mpsc, oneshot}; |
6 | | -use futures::{future, prelude::*}; |
7 | | -use jsonrpc_core::{Error, Params}; |
8 | | -use serde::de::DeserializeOwned; |
9 | | -use serde::Serialize; |
10 | | -use serde_json::Value; |
11 | | - |
12 | | -pub mod transports; |
13 | | - |
14 | | -#[cfg(test)] |
15 | | -mod logger; |
16 | | - |
17 | | -/// The errors returned by the client. |
18 | | -#[derive(Debug, Fail)] |
19 | | -pub enum RpcError { |
20 | | - /// An error returned by the server. |
21 | | - #[fail(display = "Server returned rpc error {}", _0)] |
22 | | - JsonRpcError(Error), |
23 | | - /// Failure to parse server response. |
24 | | - #[fail(display = "Failed to parse server response as {}: {}", _0, _1)] |
25 | | - ParseError(String, failure::Error), |
26 | | - /// Request timed out. |
27 | | - #[fail(display = "Request timed out")] |
28 | | - Timeout, |
29 | | - /// The server returned a response with an unknown id. |
30 | | - #[fail(display = "Server returned a response with an unknown id")] |
31 | | - UnknownId, |
32 | | - /// Not rpc specific errors. |
33 | | - #[fail(display = "{}", _0)] |
34 | | - Other(failure::Error), |
35 | | -} |
36 | | - |
37 | | -impl From<Error> for RpcError { |
38 | | - fn from(error: Error) -> Self { |
39 | | - RpcError::JsonRpcError(error) |
40 | | - } |
41 | | -} |
42 | | - |
43 | | -/// A message sent to the `RpcClient`. This is public so that |
44 | | -/// the derive crate can generate a client. |
45 | | -struct RpcMessage { |
46 | | - /// The rpc method name. |
47 | | - method: String, |
48 | | - /// The rpc method parameters. |
49 | | - params: Params, |
50 | | - /// The oneshot channel to send the result of the rpc |
51 | | - /// call to. |
52 | | - sender: oneshot::Sender<Result<Value, RpcError>>, |
53 | | -} |
54 | | - |
55 | | -/// A channel to a `RpcClient`. |
56 | | -#[derive(Clone)] |
57 | | -pub struct RpcChannel(mpsc::Sender<RpcMessage>); |
58 | | - |
59 | | -impl RpcChannel { |
60 | | - fn send( |
61 | | - &self, |
62 | | - msg: RpcMessage, |
63 | | - ) -> impl Future<Item = mpsc::Sender<RpcMessage>, Error = mpsc::SendError<RpcMessage>> { |
64 | | - self.0.to_owned().send(msg) |
65 | | - } |
66 | | -} |
67 | | - |
68 | | -impl From<mpsc::Sender<RpcMessage>> for RpcChannel { |
69 | | - fn from(sender: mpsc::Sender<RpcMessage>) -> Self { |
70 | | - RpcChannel(sender) |
71 | | - } |
72 | | -} |
73 | | - |
74 | | -/// The future returned by the rpc call. |
75 | | -pub struct RpcFuture { |
76 | | - recv: oneshot::Receiver<Result<Value, RpcError>>, |
77 | | -} |
78 | | - |
79 | | -impl RpcFuture { |
80 | | - /// Creates a new `RpcFuture`. |
81 | | - pub fn new(recv: oneshot::Receiver<Result<Value, RpcError>>) -> Self { |
82 | | - RpcFuture { recv } |
83 | | - } |
84 | | -} |
85 | | - |
86 | | -impl Future for RpcFuture { |
87 | | - type Item = Value; |
88 | | - type Error = RpcError; |
89 | | - |
90 | | - fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { |
91 | | - // TODO should timeout (#410) |
92 | | - match self.recv.poll() { |
93 | | - Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), |
94 | | - Ok(Async::Ready(Err(error))) => Err(error), |
95 | | - Ok(Async::NotReady) => Ok(Async::NotReady), |
96 | | - Err(error) => Err(RpcError::Other(error.into())), |
97 | | - } |
98 | | - } |
99 | | -} |
100 | | - |
101 | | -/// Client for raw JSON RPC requests |
102 | | -#[derive(Clone)] |
103 | | -pub struct RawClient(RpcChannel); |
104 | | - |
105 | | -impl From<RpcChannel> for RawClient { |
106 | | - fn from(channel: RpcChannel) -> Self { |
107 | | - RawClient(channel) |
108 | | - } |
109 | | -} |
110 | | - |
111 | | -impl RawClient { |
112 | | - /// Call RPC with raw JSON |
113 | | - pub fn call_method(&self, method: &str, params: Params) -> impl Future<Item = Value, Error = RpcError> { |
114 | | - let (sender, receiver) = oneshot::channel(); |
115 | | - let msg = RpcMessage { |
116 | | - method: method.into(), |
117 | | - params, |
118 | | - sender, |
119 | | - }; |
120 | | - self.0 |
121 | | - .send(msg) |
122 | | - .map_err(|error| RpcError::Other(error.into())) |
123 | | - .and_then(|_| RpcFuture::new(receiver)) |
124 | | - } |
125 | | -} |
126 | | - |
127 | | -/// Client for typed JSON RPC requests |
128 | | -#[derive(Clone)] |
129 | | -pub struct TypedClient(RawClient); |
130 | | - |
131 | | -impl From<RpcChannel> for TypedClient { |
132 | | - fn from(channel: RpcChannel) -> Self { |
133 | | - TypedClient(channel.into()) |
134 | | - } |
135 | | -} |
136 | | - |
137 | | -impl TypedClient { |
138 | | - /// Create new TypedClient |
139 | | - pub fn new(raw_cli: RawClient) -> Self { |
140 | | - TypedClient(raw_cli) |
141 | | - } |
142 | | - |
143 | | - /// Call RPC with serialization of request and deserialization of response |
144 | | - pub fn call_method<T: Serialize, R: DeserializeOwned + 'static>( |
145 | | - &self, |
146 | | - method: &str, |
147 | | - returns: &'static str, |
148 | | - args: T, |
149 | | - ) -> impl Future<Item = R, Error = RpcError> { |
150 | | - let args = |
151 | | - serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC"); |
152 | | - let params = match args { |
153 | | - Value::Array(vec) => Params::Array(vec), |
154 | | - Value::Null => Params::None, |
155 | | - _ => { |
156 | | - return future::Either::A(future::err(RpcError::Other(format_err!( |
157 | | - "RPC params should serialize to a JSON array, or null" |
158 | | - )))) |
159 | | - } |
160 | | - }; |
161 | | - |
162 | | - future::Either::B(self.0.call_method(method, params).and_then(move |value: Value| { |
163 | | - log::debug!("response: {:?}", value); |
164 | | - let result = |
165 | | - serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns.into(), error.into())); |
166 | | - future::done(result) |
167 | | - })) |
168 | | - } |
169 | | -} |
170 | | - |
171 | | -#[cfg(test)] |
172 | | -mod tests { |
173 | | - use super::*; |
174 | | - use crate::transports::local; |
175 | | - use crate::{RpcChannel, RpcError, TypedClient}; |
176 | | - use jsonrpc_core::{self, IoHandler}; |
177 | | - |
178 | | - #[derive(Clone)] |
179 | | - struct AddClient(TypedClient); |
180 | | - |
181 | | - impl From<RpcChannel> for AddClient { |
182 | | - fn from(channel: RpcChannel) -> Self { |
183 | | - AddClient(channel.into()) |
184 | | - } |
185 | | - } |
186 | | - |
187 | | - impl AddClient { |
188 | | - fn add(&self, a: u64, b: u64) -> impl Future<Item = u64, Error = RpcError> { |
189 | | - self.0.call_method("add", "u64", (a, b)) |
190 | | - } |
191 | | - } |
192 | | - |
193 | | - #[test] |
194 | | - fn test_client_terminates() { |
195 | | - let mut handler = IoHandler::new(); |
196 | | - handler.add_method("add", |params: Params| { |
197 | | - let (a, b) = params.parse::<(u64, u64)>()?; |
198 | | - let res = a + b; |
199 | | - Ok(jsonrpc_core::to_value(res).unwrap()) |
200 | | - }); |
201 | | - |
202 | | - let (client, rpc_client) = local::connect::<AddClient, _, _>(handler); |
203 | | - let fut = client |
204 | | - .clone() |
205 | | - .add(3, 4) |
206 | | - .and_then(move |res| client.add(res, 5)) |
207 | | - .join(rpc_client) |
208 | | - .map(|(res, ())| { |
209 | | - assert_eq!(res, 12); |
210 | | - }) |
211 | | - .map_err(|err| { |
212 | | - eprintln!("{:?}", err); |
213 | | - assert!(false); |
214 | | - }); |
215 | | - tokio::run(fut); |
216 | | - } |
217 | | -} |
| 1 | +//! JSON-RPC client implementation primitives. |
| 2 | +//! |
| 3 | +//! By default this crate does not implement any transports, |
| 4 | +//! use corresponding features (`tls`, `http` or `ws`) to opt-in for them. |
| 5 | +//! |
| 6 | +//! See documentation of [`jsonrpc-client-transports`](../jsonrpc_client_transports/) for more details. |
| 7 | +
|
| 8 | +pub use jsonrpc_client_transports::*; |
0 commit comments