1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::io;
use std::net::SocketAddr;

use futures::Future;

use tokio_core::reactor::Handle;
use tokio_core::io::{Framed, Io};
use tokio_core::net::TcpStream;
use tokio_core::io::{Codec, EasyBuf};
use tokio_proto::TcpClient;
use tokio_proto::multiplex::{ClientProto, ClientService, NewRequestIdSource, RequestIdSource};
use tokio_service::Service;

use package::Package;
use codec::PackageCodec;

use uuid::Uuid;

/// `tokio_service::Service` implementation of the client.
///
/// Thin wrapper around a multiplexed `tokio_proto` `ClientService` running over a
/// `TcpStream`; requests and responses are both `Package` values.
pub struct EventStoreClient {
    // Underlying multiplexed client service; every `call` is forwarded to it.
    inner: ClientService<TcpStream, PackageProto>,
}

impl EventStoreClient {
    /// Connects to an EventStore database listening at `addr`, using the given
    /// `tokio_core::reactor::Handle` to drive the connection.
    ///
    /// Returns a boxed future that resolves to the connected client, which can then be
    /// used to send and receive `Package` values. The future fails with the `io::Error`
    /// reported by the underlying `TcpClient::connect`.
    pub fn connect(addr: &SocketAddr, handle: &Handle) -> Box<Future<Item = Self, Error = io::Error>> {
        // Start the TCP connection and protocol binding for `PackageProto`.
        let connecting = TcpClient::new(PackageProto).connect(addr, handle);

        // Once connected, wrap the resulting service in the public client type.
        let client = connecting.map(|service| EventStoreClient { inner: service });

        Box::new(client)
    }
}

/// Exposes the client as a `Service` that takes a `Package` request and yields a
/// `Package` response.
impl Service for EventStoreClient {
    type Request = Package;
    type Response = Package;
    type Error = io::Error;
    type Future = Box<Future<Item = Package, Error = io::Error>>;

    /// Forwards `req` to the wrapped client service and returns its response future,
    /// boxed to erase the concrete future type.
    fn call(&self, req: Self::Request) -> Self::Future {
        let pending = self.inner.call(req);
        Box::new(pending)
    }
}

/*
/// Simple middleware
struct Heartbeats<T> {
    inner: T,
}

impl<T> Stream for Heartbeats<T>
    where T: Service<Request = Package, Response = Package, Error = io::Error>,
          T::Future: 'static
{
    type Request = Package;
    type Response = Package;
    type Error = io::Error;
    type Future = Box<Future<Item = Package, Error = io::Error>>;

    fn call(&self, req: Package) -> Self::Future {
        if self.credentials.as_ref().is_some() && req.authentication.is_none() {
            req.authentication = self.credentials.clone();
        }

        self.inner.call(req)
    }
}
*/

/// Codec and request-id source used by the multiplexed transport.
///
/// As a codec it pairs every `Package` with its own `correlation_id`, producing and
/// consuming `(Uuid, Package)` frames; the actual byte-level work is delegated to
/// `PackageCodec`. It carries no state, so it is cheap to copy and default-construct.
#[derive(Debug, Clone, Copy, Default)]
pub struct Separator;

/// Frames the connection as `(Uuid, Package)` pairs on top of `PackageCodec`, where the
/// `Uuid` is always the package's own `correlation_id`.
impl Codec for Separator {
    type In = (Uuid, Package);
    type Out = (Uuid, Package);

    /// Decodes one package from `buf` via `PackageCodec` and tags it with its
    /// `correlation_id`.
    ///
    /// Errors and the `Ok(None)` case reported by `PackageCodec` are passed through
    /// unchanged.
    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>> {
        let decoded = PackageCodec.decode(buf)?;
        Ok(decoded.map(|package| (package.correlation_id, package)))
    }

    /// Encodes the package half of `msg` into `buf` via `PackageCodec`.
    ///
    /// # Panics
    ///
    /// Panics if the request id in `msg` differs from the package's `correlation_id`.
    fn encode(&mut self, msg: (Uuid, Package), buf: &mut Vec<u8>) -> io::Result<()> {
        let (request_id, package) = msg;
        // The request id is derived from the package itself, so the two must agree.
        assert_eq!(request_id, package.correlation_id);
        PackageCodec.encode(package, buf)
    }
}

/// Request ids are not generated: each request is identified by the `correlation_id`
/// already stored in the outgoing `Package`.
impl RequestIdSource<Uuid, Package> for Separator {
    /// Returns the `correlation_id` of `pkg` as the id for this request.
    fn next(&mut self, pkg: &Package) -> Uuid {
        pkg.correlation_id
    }
}

/// Declares `Separator` as the request-id source to use when `Uuid` is the request id
/// type and `Package` is the request type.
impl NewRequestIdSource<Uuid, Package> for Uuid {
    type RequestIdSource = Separator;

    /// Creates a fresh `Separator`; it is a unit struct, so there is no state to set up.
    fn requestid_source() -> Self::RequestIdSource {
        Separator
    }
}

/// Protocol marker handed to `TcpClient::new`; its `ClientProto` implementation frames
/// the connection with `Separator`.
struct PackageProto;

/// Client-side protocol definition: `Package` requests and responses, identified by
/// `Uuid` request ids, carried over any `Io` transport framed with `Separator`.
impl<T: Io + 'static> ClientProto<T> for PackageProto {
    type Request = Package;
    type Response = Package;
    type RequestId = Uuid;

    // The transport yields and accepts the `(Uuid, Package)` frames produced by `Separator`.
    type Transport = Framed<T, Separator>;
    // Binding is synchronous, so a plain `Result` is used.
    type BindTransport = Result<Self::Transport, io::Error>;

    /// Wraps `io` in a `Framed` transport using the `Separator` codec. Always returns `Ok`.
    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(Separator))
    }
}