1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
//! Messaging Function Set
//!
//! This module is primarily an implementation of a Schedule for TextMessage events.
//!

use std::{
    sync::{atomic::AtomicI64, Arc},
    time::Duration,
};

use sep2_common::packages::{
    identification::ResponseStatus,
    messaging::{MessagingProgram, TextMessage},
    objects::EventStatusType as EventStatus,
    types::MRIDType,
};
use tokio::sync::{broadcast::Receiver, RwLock};

use crate::{
    client::{Client, SEPResponse},
    device::SEDevice,
    event::{EIStatus, EventCallback, EventInstance, Events, Schedule, Scheduler},
};

// Messaging Function Set
impl Schedule<TextMessage> {
    async fn msg_start_task(self, mut rx: Receiver<()>) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("TextMessageSchedule: Shutting down event start task...");
                    break
                },
            }
            let mut events = self.events.write().await;
            let mrid = match events.next_start() {
                Some((time, mrid)) if time < self.schedule_time().into() => mrid,
                _ => continue,
            };

            events.update_event(&mrid, EIStatus::Active);

            let events = events.downgrade();
            let target = events.get(&mrid).unwrap();
            let resp = (self.handler)(target).await;
            self.auto_msg_response(target.event(), resp).await;
        }
    }

    async fn msg_end_task(self, mut rx: Receiver<()>) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("TextMessageSchedule: Shutting down event end task...");
                    break
                },
            }
            let mut events = self.events.write().await;
            let mrid = match events.next_end() {
                Some((time, mrid)) if time < self.schedule_time().into() => mrid,
                _ => continue,
            };

            events.update_event(&mrid, EIStatus::Complete);

            let events = events.downgrade();
            let target = events.get(&mrid).unwrap();
            let resp = (self.handler)(target).await;
            self.auto_msg_response(target.event(), resp).await;
        }
    }

    /// Cancel an [`EventInstance<TextMessage>`] that has been previously added to the schedule.
    ///
    /// Update the internal [`EventInstance<TextMessage`] state.
    async fn cancel_textmessage(
        &mut self,
        target_mrid: &MRIDType,
        current_status: EIStatus,
        cancel_reason: EIStatus,
    ) {
        let mut events = self.events.write().await;
        events.cancel_event(target_mrid, cancel_reason, self.schedule_time().into());

        let events = events.downgrade();
        let ei = events.get(target_mrid).unwrap();
        let resp = if current_status == EIStatus::Active {
            // If the event was active, let the client know it is over
            (self.handler)(ei).await
        } else {
            // If it's not active, the client doesn't even know about this event
            ResponseStatus::EventCancelled
        };
        self.auto_msg_response(ei.event(), resp).await;
    }

    async fn auto_msg_response(&self, event: &TextMessage, status: ResponseStatus) {
        match self
            .client
            .send_msg_response(
                self.device.read().await.lfdi,
                event,
                status,
                self.schedule_time(),
            )
            .await
        {
            Ok(
                e @ (SEPResponse::BadRequest(_)
                | SEPResponse::NotFound
                | SEPResponse::MethodNotAllowed(_)),
            ) => log::warn!(
                "Client: Messaging response POST attempt failed with HTTP status code: {}",
                e
            ),
            Err(e) => log::warn!(
                "Client: Messaging response POST attempted failed with reason: {}",
                e
            ),
            Ok(r @ (SEPResponse::Created(_) | SEPResponse::NoContent)) => {
                log::info!(
                    "Client: Messaging response POST attempt succeeded with reason: {}",
                    r
                )
            }
        }
    }
}

/// Messaging is NOT a function set where events exhibit 'direct control'.
/// Unlike direct control function sets, multiple text messages can be active at any given time.
///
/// This schedule simply calls the given handler when the start time arrives, if and when the event is cancelled,
/// The supplied handler is given the primacy, and the priority of the text message, and it is up to them how they are displayed.
///
///
/// TODO: This implementation currently does not support manual acknowledgement of text messages, as while the client's handler is called, a lock is acquired on the scheduler, meaning it cannot progress.
#[async_trait::async_trait]
impl Scheduler<TextMessage> for Schedule<TextMessage> {
    type Program = MessagingProgram;
    fn new(
        client: Client,
        device: Arc<RwLock<SEDevice>>,
        handler: impl EventCallback<TextMessage>,
        tickrate: Duration,
    ) -> Self {
        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
        let out = Schedule {
            client,
            device,
            events: Arc::new(RwLock::new(Events::new())),
            handler: Arc::new(move |ei| {
                let handler = handler.clone();
                Box::pin(async move { handler.event_update(ei).await })
            }),
            bc_sd: tx.clone(),
            tickrate,
            time_offset: Arc::new(AtomicI64::new(0)),
        };
        tokio::spawn(out.clone().clean_events(rx));
        tokio::spawn(out.clone().msg_start_task(tx.subscribe()));
        tokio::spawn(out.clone().msg_end_task(tx.subscribe()));
        out
    }

    async fn add_event(&mut self, event: TextMessage, program: &Self::Program, server_id: u8) {
        let mrid = event.mrid;
        let incoming_status = event.event_status.current_status;

        // If the event already exists in the schedule
        // "Editing events shall NOT be allowed, except for updating status"
        let cur = { self.events.read().await.get(&mrid).map(|e| e.status()) };
        if let Some(current_status) = cur {
            match (current_status, incoming_status) {
                // Active -> (Cancelled || CancelledRandom || Superseded)
                (EIStatus::Active, EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded) => {
                    log::warn!("TextMessageSchedule: TextMessage ({mrid}) has been marked as superseded by the server, yet it is active locally. The event will be cancelled");
                    self.cancel_textmessage(&mrid, current_status, incoming_status.into()).await;
                },
                // Scheduled -> (Cancelled || CancelledRandom)
                (EIStatus::Scheduled, EventStatus::Cancelled | EventStatus::CancelledRandom) => {
                    log::info!("TextMessageSchedule: TextMessage ({mrid} has been marked as cancelled by the server. It will not be started");
                    self.cancel_textmessage(&mrid, current_status, incoming_status.into()).await;
                },
                // Scheduled -> Active
                (EIStatus::Scheduled, EventStatus::Active) => {
                    log::info!("TextMessageSchedule: TextMessage ({mrid}) has entered it's earliest effective start time.")
                }
                // Scheduled -> Superseded
                (EIStatus::Scheduled, EventStatus::Superseded) =>
                    log::warn!("TextMessageSchedule: TextMessage ({mrid}) has been marked as superseded by the server, which is not permissible. Ignoring."),
                // Active -> Scheduled
                (EIStatus::Active, EventStatus::Scheduled) =>
                    log::warn!("TextMessageSchedule: TextMessage ({mrid}) is active locally, and scheduled on the server. Is the client clock ahead?"),
                // Active -> Active
                (EIStatus::Active, EventStatus::Active) => (),
                // Complete -> Any
                (EIStatus::Complete, _) => (),
                // (Cancelled | CancelledRandom | Superseded) -> Any
                (EIStatus::Cancelled | EIStatus::CancelledRandom | EIStatus::Superseded, _) => (),
                // Scheduled -> Scheduled
                (EIStatus::Scheduled, EventStatus::Scheduled) => (),
            }
        } else {
            let mut events = self.events.write().await;

            self.auto_msg_response(&event, ResponseStatus::EventReceived)
                .await;

            // Event arrives cancelled or superseded
            if matches!(
                incoming_status,
                EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded
            ) {
                log::warn!("TextMessageSchedule: Told to schedule DERControl ({mrid}) which is already {:?}, sending server response and not scheduling.", incoming_status);
                self.auto_msg_response(&event, incoming_status.into()).await;
                return;
            }

            let ei = EventInstance::new(program.primacy, event, program.mrid, server_id);
            // The event may have expired already
            if ei.end_time() <= self.schedule_time().into() {
                log::warn!("TextMessageSchedule: Told to schedule TextMessage ({mrid}) which has already ended, ignoring.");
                // We do NOT send a response, as required by the spec
                return;
            }

            // Update `next_start` and `next_end`
            events.update_nexts();

            // Add it to our schedule
            events.insert(&mrid, ei);
        }
    }
}