1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::io;
use std::net::SocketAddr;
use futures::Future;
use tokio_core::reactor::Handle;
use tokio_core::io::{Framed, Io};
use tokio_core::net::TcpStream;
use tokio_core::io::{Codec, EasyBuf};
use tokio_proto::TcpClient;
use tokio_proto::multiplex::{ClientProto, ClientService, NewRequestIdSource, RequestIdSource};
use tokio_service::Service;
use package::Package;
use codec::PackageCodec;
use uuid::Uuid;
pub struct EventStoreClient {
inner: ClientService<TcpStream, PackageProto>,
}
impl EventStoreClient {
pub fn connect(addr: &SocketAddr, handle: &Handle) -> Box<Future<Item = Self, Error = io::Error>> {
let ret = TcpClient::new(PackageProto)
.connect(addr, handle)
.map(|client_service| {
EventStoreClient { inner: client_service }
});
Box::new(ret)
}
}
impl Service for EventStoreClient {
type Request = Package;
type Response = Package;
type Error = io::Error;
type Future = Box<Future<Item = Package, Error = io::Error>>;
fn call(&self, req: Package) -> Self::Future {
Box::new(self.inner.call(req))
}
}
pub struct Separator;
impl Codec for Separator {
type In = (Uuid, Package);
type Out = (Uuid, Package);
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
PackageCodec.decode(buf).map(|x| x.map(|x| (x.correlation_id, x)))
}
fn encode(&mut self, msg: (Uuid, Package), buf: &mut Vec<u8>) -> io::Result<()> {
let (id, pkg) = msg;
assert_eq!(id, pkg.correlation_id);
PackageCodec.encode(pkg, buf)
}
}
impl RequestIdSource<Uuid, Package> for Separator {
fn next(&mut self, pkg: &Package) -> Uuid {
pkg.correlation_id
}
}
impl NewRequestIdSource<Uuid, Package> for Uuid {
type RequestIdSource = Separator;
fn requestid_source() -> Self::RequestIdSource {
Separator
}
}
struct PackageProto;
impl<T: Io + 'static> ClientProto<T> for PackageProto {
type Request = Package;
type Response = Package;
type RequestId = Uuid;
type Transport = Framed<T, Separator>;
type BindTransport = Result<Self::Transport, io::Error>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
Ok(io.framed(Separator))
}
}