1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::mpsc::Sender;
use core::{EndpointId, Message};
use core::socket::{Protocol, Reply};
use core::endpoint::Pipe;
use core::context::{Context, Event};
use super::pipes::PipeCollection;
use super::{Timeout, PUB, SUB};
use super::policy::broadcast;
use io_error::*;
pub struct Pub {
reply_tx: Sender<Reply>,
pipes: PipeCollection,
bc: HashSet<EndpointId>
}
impl From<Sender<Reply>> for Pub {
fn from(tx: Sender<Reply>) -> Pub {
Pub {
reply_tx: tx,
pipes: PipeCollection::new(),
bc: HashSet::new()
}
}
}
impl Protocol for Pub {
fn id(&self) -> u16 { PUB }
fn peer_id(&self) -> u16 { SUB }
fn add_pipe(&mut self, _: &mut Context, eid: EndpointId, pipe: Pipe) {
self.pipes.insert(eid, pipe);
}
fn remove_pipe(&mut self, ctx: &mut Context, eid: EndpointId) -> Option<Pipe> {
self.bc.remove(&eid);
if self.bc.is_empty() {
ctx.raise(Event::CanSend(false));
}
self.pipes.remove(&eid)
}
fn send(&mut self, ctx: &mut Context, msg: Message, timeout: Timeout) {
let msg = Rc::new(msg);
broadcast::send_to_all(&mut self.bc, &mut self.pipes, ctx, msg);
ctx.raise(Event::CanSend(false));
let _ = self.reply_tx.send(Reply::Send);
if let Some(sched) = timeout {
ctx.cancel(sched);
}
}
fn on_send_ack(&mut self, _: &mut Context, _: EndpointId) {
}
fn on_send_timeout(&mut self, _: &mut Context) {
}
fn on_send_ready(&mut self, ctx: &mut Context, eid: EndpointId) {
if self.bc.is_empty() {
ctx.raise(Event::CanSend(true));
}
self.bc.insert(eid);
}
fn on_send_not_ready(&mut self, ctx: &mut Context, eid: EndpointId) {
let was_empty = self.bc.is_empty();
self.bc.remove(&eid);
let is_empty = self.bc.is_empty();
if was_empty ^ is_empty {
ctx.raise(Event::CanSend(false));
}
}
fn recv(&mut self, ctx: &mut Context, timeout: Timeout) {
let error = other_io_error("Recv is not supported by pub protocol");
let _ = self.reply_tx.send(Reply::Err(error));
if let Some(sched) = timeout {
ctx.cancel(sched);
}
}
fn on_recv_ack(&mut self, _: &mut Context, _: EndpointId, _: Message) {
}
fn on_recv_timeout(&mut self, _: &mut Context) {
}
fn on_recv_ready(&mut self, _: &mut Context, _: EndpointId) {
}
fn on_recv_not_ready(&mut self, _: &mut Context, _: EndpointId) {
}
fn is_send_ready(&self) -> bool {
!self.bc.is_empty()
}
fn is_recv_ready(&self) -> bool {
false
}
fn close(&mut self, ctx: &mut Context) {
self.pipes.close_all(ctx)
}
}