Merge branch 'feat/rosimage' into 'main'

rosimage plugin

See merge request gstreamer/gst-plugins-rs!1126
This commit is contained in:
jobafr 2024-04-26 21:01:06 +00:00
commit 16bec4dc9e
10 changed files with 571 additions and 0 deletions

View file

@ -55,6 +55,7 @@ members = [
"video/hsv",
"video/png",
"video/rav1e",
"video/rosimage",
"video/videofx",
"video/webp",
]

2
video/rosimage/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

44
video/rosimage/Cargo.toml Normal file
View file

@ -0,0 +1,44 @@
[package]
name = "gst-plugin-rosimage"
version = "0.2.0"
edition = "2021"
description = "Acts as a ROS1 node subscribing to an image topic and outputs a video/x-raw stream, or vice versa"
rust-version = "1.66"
authors = ["Johannes Barthel <johannes.barthel@farming-revolution.com>"]
license = "MIT OR MPL-2.0"
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/"
[dependencies]
crossbeam-channel = "0.5.7"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
once_cell = "1.0"
rosrust = "0.9"
rosrust_msg = "0.1"
[lib]
name = "rosimagebridge"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[features]
static = []
capi = []
doc = ["gst/v1_18"]
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0"

23
video/rosimage/README.md Normal file
View file

@ -0,0 +1,23 @@
# rosimage
This package contains a plugin that subscribes to a ROS1 image topic and acts as a gstreamer video source (`rosimagesrc`), and one for consuming a gstreamer stream and publishing it as a ROS topic (`rosimagesink`).
Related work:
- [ros-gst-bridge](https://github.com/BrettRD/ros-gst-bridge/): the same thing, but for ROS2 (and with more features)
- [gscam](https://github.com/ros-drivers/gscam): ROS1, but only the gstreamer → ROS part.
## Installation
If you have not already, install ROS on your system, according to the [official guide](http://wiki.ros.org/noetic/Installation). The only ROS dependency this plugin has is the `sensor_msgs` package, which is included in `ros-noetic-desktop`. After installing, add ROS to your environment (`source /opt/ros/noetic/setup.bash`) and build the plugin by running `cargo cbuild -p gst-plugin-rosimage` from the root of this repo.
## Hello world
Open four terminals, and run `source /opt/ros/noetic/setup.bash` in each. Then:
- In the first one, start `roscore`
- In the second one, start a gstreamer → ROS pipeline: `GST_PLUGIN_PATH="target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH" gst-launch-1.0 videotestsrc ! rosimagesink topic=/helloworld`
- In the third one, start `rqt_image_view` and select the `/helloworld` topic from the dropdown menu. You should now be seeing the gstreamer test video, but from the ROS side of things.
- In the fourth terminal, start the ROS → gstreamer pipeline `GST_PLUGIN_PATH="target/x86_64-unknown-linux-gnu/debug:$GST_PLUGIN_PATH" gst-launch-1.0 rosimagesrc topic=/helloworld ! videoconvert ! autovideosink`. This will produce a GStreamer window showing the same test video, but after it has been converted back from ROS.
## Copyright
© 2023 by [farming revolution GmbH](https://farming-revolution.com/), published under the [MIT License](../../LICENSE-MIT) or the [Mozilla Public License Version 2.0](../../LICENSE-MPL-2.0), at your choice.

3
video/rosimage/build.rs Normal file
View file

@ -0,0 +1,3 @@
fn main() {
gst_plugin_version_helper::info()
}

22
video/rosimage/src/lib.rs Normal file
View file

@ -0,0 +1,22 @@
use gst::glib;
mod rosimagesink;
mod rosimagesrc;
gst::plugin_define!(
rosimagebridge,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"Proprietary",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
rosimagesrc::register(plugin)?;
rosimagesink::register(plugin)?;
Ok(())
}

View file

@ -0,0 +1,202 @@
use gst::glib;
use gst::prelude::*;
use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use rosrust_msg::sensor_msgs::Image;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rosimagesink",
gst::DebugColorFlags::empty(),
Some("ROS1 Image topic sink"),
)
});
struct Dimensions {
width: i32,
height: i32,
}
#[derive(Default)]
pub struct RosImageSink {
publisher: once_cell::sync::OnceCell<rosrust::Publisher<Image>>,
topic: once_cell::sync::OnceCell<String>,
dimensions: once_cell::sync::OnceCell<Dimensions>,
}
impl RosImageSink {
fn ensure_publisher(&self) -> Result<(), gst::ErrorMessage> {
if self.publisher.get().is_some() {
return Ok(());
}
let topic = self.topic.get().ok_or_else(|| {
gst::error_msg!(
gst::CoreError::Negotiation,
["ROS topic has not been specified"]
)
})?;
rosrust::init(&format!("gst_to_ros__{}", topic.replace('/', "_")));
self.publisher
.set(
rosrust::publish(topic, 1)
.map_err(|e| gst::error_msg!(gst::CoreError::Failed, ["{:?}", e]))?,
)
.map_err(|_e| ())
.expect("tried to set publisher when it already existed, this is a bug");
return Ok(());
}
}
#[glib::object_subclass]
impl ObjectSubclass for RosImageSink {
const NAME: &'static str = "rosimagesink";
type Type = super::RosImageSink;
type ParentType = gst_base::BaseSink;
}
impl ObjectImpl for RosImageSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![{
let mut b = glib::ParamSpecString::builder("topic");
b.set_nick(Some("ROS Topic"));
b.set_blurb(Some("The Image topic on which to publish ROS messages (e.g. /video_stream/image_raw)"));
b.set_flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY);
b.build()
}]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"topic" => {
let value: String = value.get().expect("type checked upstream");
self.topic
.set(value)
.expect("could not save topic to self.topic");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"topic" => self
.topic
.get()
.unwrap_or(&String::from("(uninitialized)"))
.to_value(),
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for RosImageSink {}
impl BaseSinkImpl for RosImageSink {
fn start(&self) -> Result<(), gst::ErrorMessage> {
self.ensure_publisher()?;
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
rosrust::shutdown();
Ok(())
}
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::info!(
CAT,
"Received image for publishing on topic '{}'",
self.topic.get().map(String::as_str).unwrap_or("(unknown)")
);
self.ensure_publisher().unwrap();
let publisher = self.publisher.get().unwrap();
if publisher.subscriber_count() == 0 {
gst::info!(
CAT,
"no subscribers on topic '{}', dropping frame",
self.topic.get().map(String::as_str).unwrap_or("(unknown)")
);
return Ok(gst::FlowSuccess::Ok);
}
let mut msg = Image::default();
msg.data = vec![0; buffer.size()];
msg.encoding = "rgb8".to_owned();
let dims = self.dimensions.get_or_init(|| {
let caps = self.obj().pads()[0].current_caps().unwrap();
gst::info!(
CAT,
"Initializing caps for topic '{}': {:?}",
self.topic.get().map(String::as_str).unwrap_or("(unknown)"),
&caps
);
let width: i32 = caps.iter().next().unwrap().get("width").unwrap();
let height: i32 = caps.iter().next().unwrap().get("height").unwrap();
assert!(width > 0);
assert!(height > 0);
Dimensions { width, height }
});
msg.width = dims.width as u32;
msg.height = dims.height as u32;
msg.step = 3 * (dims.width as u32);
buffer.copy_to_slice(0, &mut msg.data).unwrap();
publisher.send(msg).unwrap();
Ok(gst::FlowSuccess::Ok)
}
fn caps(&self, filter: Option<&gst::Caps>) -> Option<gst::Caps> {
self.parent_caps(filter)
}
}
impl ElementImpl for RosImageSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"ROS Image topic source",
"Source/Video",
env!("CARGO_PKG_DESCRIPTION"),
"Johannes Barthel <johannes.barthel@farming-revolution.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("video/x-raw")
.field("format", "RGB")
.build();
let src_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}

View file

@ -0,0 +1,20 @@
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct RosImageSink(ObjectSubclass<imp::RosImageSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}
// Registers the type for our element, and then registers in GStreamer under
// the name "rosimagesink" for being able to instantiate it via e.g.
// gst::ElementFactory::make().
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rosimagesink",
gst::Rank::None,
RosImageSink::static_type(),
)
}

View file

@ -0,0 +1,234 @@
use gst::glib;
use gst::prelude::*;
use gst::ClockTime;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::{base_src, prelude::*};
use gst_base::traits::BaseSrcExt;
use gst_base::PushSrc;
use once_cell::sync::Lazy;
use rosrust_msg::sensor_msgs::Image;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rosimagesrc",
gst::DebugColorFlags::empty(),
Some("ROS1 Image topic source"),
)
});
#[derive(Default)]
pub struct RosImageSrc {
receiver: once_cell::sync::OnceCell<crossbeam_channel::Receiver<Image>>,
subscriber: once_cell::sync::OnceCell<rosrust::Subscriber>,
topic: once_cell::sync::OnceCell<String>,
create_called: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
fn encoding_to_gst(encoding: &str) -> Option<gst_video::VideoFormat> {
match encoding {
"rgb8" => Some(gst_video::VideoFormat::Rgb),
"bgr8" => Some(gst_video::VideoFormat::Bgr),
_ => None,
}
}
impl RosImageSrc {
fn ensure_subscriber(&self) -> Result<(), gst::ErrorMessage> {
if self.subscriber.get().is_some() {
return Ok(());
}
let (sender, receiver) = crossbeam_channel::bounded(1);
let topic = self.topic.get().ok_or_else(|| {
gst::error_msg!(
gst::CoreError::Negotiation,
["ROS topic has not been specified"]
)
})?;
let topic2 = topic.clone();
rosrust::init(&format!("ros_to_gst__{}", topic.replace('/', "_")));
let create_called1 = self.create_called.clone();
self.subscriber.set(
rosrust::subscribe(topic, 2, move |v: rosrust_msg::sensor_msgs::Image| {
gst::info!(CAT, "Received image on topic '{}' in frame '{}'", topic2, v.header.frame_id);
if encoding_to_gst(&v.encoding).is_none() {
gst::error!(CAT, "Unknown ROS image format '{}' on topic '{}'. Not forwarding frame to GStreamer pipeline.", v.encoding, topic2);
} else if let Err(e) = sender.try_send(v) {
let create_called = create_called1.load(std::sync::atomic::Ordering::SeqCst);
gst::error!(CAT, "error sending an image frame to gstreamer: {:#?} | create_called: {}", e, create_called);
}
}).map_err(|e| gst::error_msg!(gst::CoreError::Failed, ["{:?}", e]))?
).map_err(|_e| gst::error_msg!(gst::CoreError::Failed, ["Tried to start RosImageSrc, but Subscriber was already there."]))?;
self.receiver
.set(receiver)
.expect("RosImageSrc.receiver initialized a second time. This is a bug.");
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for RosImageSrc {
const NAME: &'static str = "rosimagesrc";
type Type = super::RosImageSrc;
type ParentType = gst_base::PushSrc;
}
impl ObjectImpl for RosImageSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![{
let mut b = glib::ParamSpecString::builder("topic");
b.set_nick(Some("ROS Topic"));
b.set_blurb(Some(
"The Image topic which to subscribe (e.g. /usb_cam/image_raw)",
));
b.set_flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY);
b.build()
}]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"topic" => {
let value: String = value.get().expect("type checked upstream");
self.topic
.set(value)
.expect("could not save topic to self.topic");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"topic" => self
.topic
.get()
.unwrap_or(&String::from("(uninitialized)"))
.to_value(),
_ => unimplemented!(),
}
}
fn constructed(&self) {
self.parent_constructed();
self.obj().set_do_timestamp(true);
self.obj().set_format(gst::Format::Time);
assert!(self.is_seekable() == false);
self.obj().set_live(true);
}
}
impl GstObjectImpl for RosImageSrc {}
impl BaseSrcImpl for RosImageSrc {
fn start(&self) -> Result<(), gst::ErrorMessage> {
self.ensure_subscriber()?;
for pad in self.obj().pads().iter_mut() {
pad.activate_mode(gst::PadMode::Push, true).unwrap();
}
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
rosrust::shutdown();
Ok(())
}
fn is_seekable(&self) -> bool {
false
}
fn caps(&self, _filter: Option<&gst::Caps>) -> Option<gst::Caps> {
self.ensure_subscriber().ok()?;
let frame = self
.receiver
.get()
.expect("receiver not initialized after call to ensure_subscriber, this is a bug.")
.recv()
.expect("error receiving frame while determining caps.");
let encoding = encoding_to_gst(&frame.encoding)?;
Some(
gst_video::VideoInfo::builder(encoding, frame.width, frame.height)
.fps(0)
.build()
.unwrap()
.to_caps()
.unwrap(),
)
}
}
impl PushSrcImpl for RosImageSrc {
fn create(
&self,
buffer: Option<&mut gst::BufferRef>,
) -> Result<base_src::CreateSuccess, gst::FlowError> {
self.create_called
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
gst::info!(CAT, "RosImageSrc::create() called");
self.ensure_subscriber()
.expect("create called when subscriber was not ready");
let obj = self.obj();
let t = obj.current_clock_time();
self.obj().suggest_next_sync();
let frame = self
.receiver
.get()
.expect("no receiver after call to ensure_subscriber, this is a bug")
.recv()
.expect("could not receive frame in RosImageSrc.create");
if let Some(buf) = buffer {
use std::io::Write;
buf.set_size(frame.data.len());
let mut buf = buf.as_cursor_writable().unwrap();
buf.write(&frame.data).unwrap();
return Ok(base_src::CreateSuccess::FilledBuffer);
};
let mut buf = gst::Buffer::from_slice(frame.data);
Ok(base_src::CreateSuccess::NewBuffer(buf))
}
}
impl ElementImpl for RosImageSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"ROS Image topic source",
"Source/Video",
env!("CARGO_PKG_DESCRIPTION"),
"Johannes Barthel <johannes.barthel@farming-revolution.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("video/x-raw").build();
let mut src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}

View file

@ -0,0 +1,20 @@
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct RosImageSrc(ObjectSubclass<imp::RosImageSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
// Registers the type for our element, and then registers in GStreamer under
// the name "rosimagesrc" for being able to instantiate it via e.g.
// gst::ElementFactory::make().
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rosimagesrc",
gst::Rank::None,
RosImageSrc::static_type(),
)
}