1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
//! gst-pai-streamer
//!
//! The purpose if this crate is to implement the ``periscopai`` sequencer.
//! The sequencer is responsible for creating the gstreamer pipeline, installing
//! the appropriate callbacks for messages and managing the pipeline state.
//!
//! It is meant to be a simple interface to gstreamer, implemented in rust and
//! which can be extended with Python.
//!
//! The python extension should be available [periscopai/dev](https://m.devpi.net/periscopai/dev)
//! on devpi.
//!
//! ```text
//! 
//!
//!         +--------------------------+
//!         |  Web Interface - React   |
//!         +--------------------------+
//!                     |
//!                     V 
//!         +--------------------------+
//!         |  Rest API     - Python   |
//!         +--------------------------+
//!                     |
//!                     V
//!         +--------------------------+
//!         |  Engine       - Python   |
//!         +--------------------------+
//!                     |  
//!                     V
//!     +=================================+
//!     |         pai-gst-sequencer       |
//!     |   +--------------------------+  |
//!     |   |   PAISequencer - Rust    |  |
//!     |   +--------------------------+  |
//!     |               |                 |
//!     |               V                 |
//!     |   +--------------------------+  |
//!     |   |  AI GST Pipeline  - C    |  |
//!     |   +--------------------------+  |
//!     |                                 |
//!     +=================================+
//! ```
//!
//! **Author:** Laurent Brack
//!

/// Representation of the Sequencer state
///
/// Note that to represent Enum in prints, you need to
/// mark them as derive(Debug) and then
/// use a print statement like this
/// ``` rust
/// # use pai_gst_sequencer::*;
/// # let mut sequencer = PAISequencer::new("video");
/// println!("sequencer state '{:?}'",sequencer.state());
/// ```
/// more on [stackoverflow](https://stackoverflow.com/questions/28024373/is-there-a-way-to-print-enum-values)
extern crate gstreamer as gst;
extern crate pyo3;
use gst::prelude::*;
use pyo3::prelude::*;

/// A Python module implemented in Rust.
#[pymodule]
#[allow(unused_variables)]
fn pai_gst_sequencer(py: Python, m: &PyModule) -> PyResult<()> {
    m.add_class::<PAISequencer>()?;
    Ok(())
}

#[derive(Debug, PartialEq)] // To be able to assert on enum value - see main.rs for details.
pub enum PAISequencerState {
    /// The pipeline was created by not initialized
    CREATED,
    /// The pipeline is in error state
    ERROR,
    /// pipeline is running
    RUNNING,
    /// pipeline is stopped
    STOPPED,
}

// Class Documentation
//
/// Periscopai sequence pipeline
///
/// # Arguments
///
#[pyclass]
pub struct PAISequencer {
    /// bla
    input: String,
    state: u8,
    source: gst::Element,
    sink: gst::Element,
    pipeline: gst::Pipeline,
}

#[pymethods]
impl PAISequencer {
    #[classattr]
    pub const CREATED: u8 = 10;
    #[classattr]
    pub const ERROR: u8 = 11;
    #[classattr]
    pub const RUNNING: u8 = 12;
    #[classattr]
    pub const STOPPED: u8 = 13;

    #[new]
    pub fn new(input: &str) -> PAISequencer {
        // Initialize GStreamer
        gst::init().unwrap();
        let instance = PAISequencer {
            input: input.to_string(),
            state: PAISequencer::CREATED,
            source: gst::ElementFactory::make("videotestsrc", Some("source"))
                .expect("Could not create source element."),
            sink: gst::ElementFactory::make("autovideosink", Some("sink"))
                .expect("Could not create sink element"),
            pipeline: gst::Pipeline::new(Some("test-pipeline")),
        };
        // Build the pipeline
        instance
            .pipeline
            .add_many(&[&instance.source, &instance.sink])
            .unwrap();
        instance
            .source
            .link(&instance.sink)
            .expect("Elements could not be linked.");

        // Modify the source's properties
        instance.source.set_property_from_str("pattern", "smpte");

        instance
    }

    /// starts the sequencer and sets the state to RUNNING
    ///
    /// # Returns:PAISequencerState::RUNNING
    ///
    /// Note that the reference to self is set as mutable as we
    /// want to be able to change the state.
    pub fn start(&mut self) -> u8 {
        self.state = PAISequencer::RUNNING;
        // Start playing
        self.pipeline
            .set_state(gst::State::Playing)
            .expect("Unable to set the pipeline to the `Playing` state");

        // // Wait until error or EOS
        // let bus = self.pipeline.get_bus().unwrap();
        // for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
        //     use gst::MessageView;

        //     match msg.view() {
        //         MessageView::Error(err) => {
        //             eprintln!(
        //                 "Error received from element {:?}: {}",
        //                 err.get_src().map(|s| s.get_path_string()),
        //                 err.get_error()
        //             );
        //             eprintln!("Debugging information: {:?}", err.get_debug());
        //             break;
        //         }
        //         MessageView::Eos(..) => break,
        //         _ => (),
        //     }
        // }

        self.state
    }
    pub fn stop(&mut self) -> u8 {
        self.state = PAISequencer::STOPPED;

        self.pipeline
            .set_state(gst::State::Null)
            .expect("Unable to set the pipeline to the `Null` state");

        self.state
    }
    ///
    pub fn state(&self) -> u8 {
        self.state
    }
    pub fn input(&self) -> &String {
        &self.input
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{thread, time};

    #[test]
    fn test_states() {
        let mut sequencer = PAISequencer::new("video");
        println!("internal state {:?}", sequencer.state());
        assert!(matches!(sequencer.state(), PAISequencer::CREATED));
        println!("sequencer state after start '{:?}'", sequencer.start());
        assert!(matches!(sequencer.state(), PAISequencer::RUNNING));
        println!("sleeping for 2 seconds");
        thread::sleep(time::Duration::from_millis(2000));
        println!("sequencer state after stop'{:?}'", sequencer.stop());
        assert!(matches!(sequencer.state(), PAISequencer::STOPPED));
    }
}