From: <jfa...@us...> - 2009-06-30 23:35:38
|
Revision: 18028 http://personalrobots.svn.sourceforge.net/personalrobots/?rev=18028&view=rev Author: jfaustwg Date: 2009-06-30 23:35:22 +0000 (Tue, 30 Jun 2009) Log Message: ----------- 2nd pass Subscriber filter which replaces SyncHelper, and conforms to our latest discussions. Also updated MsgCache/Consumer to conform. Modified Paths: -------------- pkg/trunk/sandbox/message_filters/include/message_filters/consumer.h pkg/trunk/sandbox/message_filters/include/message_filters/msg_cache.h pkg/trunk/sandbox/message_filters/src/filter_example.cpp Added Paths: ----------- pkg/trunk/sandbox/message_filters/include/message_filters/subscriber.h Removed Paths: ------------- pkg/trunk/sandbox/message_filters/include/message_filters/sync_helper.h Modified: pkg/trunk/sandbox/message_filters/include/message_filters/consumer.h =================================================================== --- pkg/trunk/sandbox/message_filters/include/message_filters/consumer.h 2009-06-30 23:34:18 UTC (rev 18027) +++ pkg/trunk/sandbox/message_filters/include/message_filters/consumer.h 2009-06-30 23:35:22 UTC (rev 18028) @@ -40,26 +40,29 @@ #define MESSAGE_FILTERS_CONSUMER_H_ #include <boost/thread.hpp> -#include "sensor_msgs/CamInfo.h" +#include <boost/signals.hpp> +#include <ros/console.h> + namespace message_filters { // A really easy 'filter' that simply consumes data +template<class M> class Consumer { public: + typedef boost::shared_ptr<M const> MPtr; - template<class T> + template<class A> + Consumer(A& a) + { + conn_ = a.connect(boost::bind(&Consumer<M>::processData, this, _1)); + } - /** - * Links Consumer's input to some provider's output. - * \param provider The filter from which we want to receive data - */ - void subscribe(T& provider) + ~Consumer() { - printf("Called Subscribe\n") ; - provider.addOutputCallback(boost::bind(&Consumer::processData, this, _1)) ; + conn_.disconnect(); } /** @@ -67,14 +70,14 @@ * involve pushing data onto queues, and pushing it along the pipeline into * another filter */ - void processData(const sensor_msgs::CamInfoConstPtr& msg) + void processData(const MPtr& msg) { - printf("%u - Called Consumer Callback!\n", (*msg).header.seq) ; + ROS_INFO("Called Consumer Callback!") ; } private: - + boost::signals::connection conn_; } ; } Modified: pkg/trunk/sandbox/message_filters/include/message_filters/msg_cache.h =================================================================== --- pkg/trunk/sandbox/message_filters/include/message_filters/msg_cache.h 2009-06-30 23:34:18 UTC (rev 18027) +++ pkg/trunk/sandbox/message_filters/include/message_filters/msg_cache.h 2009-06-30 23:35:22 UTC (rev 18028) @@ -42,6 +42,7 @@ #include <deque> #include "boost/thread.hpp" #include "boost/shared_ptr.hpp" +#include <boost/signals.hpp> #include "ros/time.h" namespace message_filters @@ -57,13 +58,22 @@ class MsgCache { public: - typedef boost::shared_ptr<M const> MsgPtr ; + typedef boost::shared_ptr<M const> MConstPtr ; + typedef boost::function<void(const MConstPtr&)> Callback; + typedef boost::signal<void(const MConstPtr&)> Signal; - MsgCache(unsigned int cache_size = 1) + template<class A> + MsgCache(A& a, unsigned int cache_size = 1) { setCacheSize(cache_size) ; + incoming_connection_ = a.connect(boost::bind(&MsgCache::addToCache, this, _1)); } + ~MsgCache() + { + incoming_connection_.disconnect(); + } + /** * Set the size of the cache. * \param cache_size The new size the cache should be. Must be > 0 @@ -79,58 +89,45 @@ cache_size_ = cache_size ; } - - template<class T> - /** - * Links Consumer's input to some provider's output. - * \param provider The filter from which we want to receive data - */ - void subscribe(T& provider) + boost::signals::connection connect(const Callback& callback) { - //printf("Called MsgCache Subscribe\n") ; - provider.addOutputCallback(boost::bind(&MsgCache::addToCache, this, _1)) ; + boost::mutex::scoped_lock lock(signal_mutex_); + return signal_.connect(callback); } /** * Add the most recent message to the cache, and pop off any elements that are too old. * This method is registered with a data provider when subscribe is called. */ - void addToCache(const MsgPtr& msg) + void addToCache(const MConstPtr& msg) { //printf(" Cache Size: %u\n", cache_.size()) ; - cache_lock_.lock() ; + { + boost::mutex::scoped_lock lock(cache_lock_); - while (cache_.size() >= cache_size_) // Keep popping off old data until we have space for a new msg - cache_.pop_front() ; // The front of the deque has the oldest elem, so we can get rid of it + while (cache_.size() >= cache_size_) // Keep popping off old data until we have space for a new msg + cache_.pop_front() ; // The front of the deque has the oldest elem, so we can get rid of it - cache_.push_back(msg) ; // Add the newest message to the back of the deque - cache_lock_.unlock() ; + cache_.push_back(msg) ; // Add the newest message to the back of the deque + } - // Sequentially call each registered call - for (unsigned int i=0; i<output_callbacks_.size(); i++) - output_callbacks_[i]() ; + { + boost::mutex::scoped_lock lock(signal_mutex_); + // Sequentially call each registered call + signal_(msg); + } } /** - * Called by another filter that wants the output of this filter - * \param callback The function that is called when data is available - */ - void addOutputCallback(const boost::function<void()>& callback) - { - output_callbacks_.push_back(callback) ; - } - - - /** * Receive a vector of messages that occur between a start and end time (inclusive). * This call is non-blocking, and only aggregates messages it has already received. * It will not wait for messages have not yet been received, but occur in the interval. * \param start The start of the requested interval * \param end The end of the requested interval */ - std::vector<MsgPtr> getInterval(const ros::Time& start, const ros::Time& end) + std::vector<MConstPtr> getInterval(const ros::Time& start, const ros::Time& end) { - cache_lock_.lock() ; + boost::mutex::scoped_lock lock(cache_lock_); // Find the starting index. (Find the first index after [or at] the start of the interval) unsigned int start_index = 0 ; @@ -148,15 +145,13 @@ end_index++ ; } - std::vector<MsgPtr> interval_elems ; + std::vector<MConstPtr> interval_elems ; interval_elems.reserve(end_index - start_index) ; for (unsigned int i=start_index; i<end_index; i++) { interval_elems.push_back(cache_[i]) ; } - cache_lock_.unlock() ; - return interval_elems ; } @@ -165,11 +160,12 @@ * \param time Time that must occur right after the returned elem * \returns shared_ptr to the newest elem that occurs before 'time'. NULL if doesn't exist */ - MsgPtr getElemBeforeTime(const ros::Time& time) + MConstPtr getElemBeforeTime(const ros::Time& time) { - MsgPtr out ; + boost::mutex::scoped_lock lock(cache_lock_); - cache_lock_.lock() ; + MConstPtr out ; + unsigned int i=0 ; int elem_index = -1 ; while (i<cache_.size() && @@ -182,8 +178,6 @@ if (elem_index >= 0) out = cache_[elem_index] ; - cache_lock_.unlock() ; - return out ; } @@ -192,11 +186,11 @@ * \param time Time that must occur right before the returned elem * \returns shared_ptr to the oldest elem that occurs after 'time'. NULL if doesn't exist */ - MsgPtr getElemAfterTime(const ros::Time& time) + MConstPtr getElemAfterTime(const ros::Time& time) { - MsgPtr out ; + boost::mutex::scoped_lock lock(cache_lock_); - cache_lock_.lock() ; + MConstPtr out ; int i=cache_.size()-1 ; int elem_index = -1 ; @@ -212,8 +206,6 @@ else out.reset() ; - cache_lock_.unlock() ; - return out ; } @@ -222,24 +214,25 @@ */ ros::Time getLatestTime() { + boost::mutex::scoped_lock lock(cache_lock_); + ros::Time latest_time(0, 0) ; - cache_lock_.lock() ; if (cache_.size() > 0) latest_time = cache_.back()->header.stamp ; - cache_lock_.unlock() ; return latest_time ; } private: boost::mutex cache_lock_ ; //!< Lock for cache_ - std::deque<MsgPtr > cache_ ; //!< Cache for the messages + std::deque<MConstPtr > cache_ ; //!< Cache for the messages unsigned int cache_size_ ; //!< Maximum number of elements allowed in the cache. - //! Array of callbacks to be called whenever new data is received - std::vector<boost::function<void()> > output_callbacks_ ; -} ; + boost::signals::connection incoming_connection_; + Signal signal_; + boost::mutex signal_mutex_; +}; } Added: pkg/trunk/sandbox/message_filters/include/message_filters/subscriber.h =================================================================== --- pkg/trunk/sandbox/message_filters/include/message_filters/subscriber.h (rev 0) +++ pkg/trunk/sandbox/message_filters/include/message_filters/subscriber.h 2009-06-30 23:35:22 UTC (rev 18028) @@ -0,0 +1,89 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2009, Willow Garage, Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of the Willow Garage nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*********************************************************************/ + +#ifndef MESSAGE_FILTERS_SUBSCRIBER_H +#define MESSAGE_FILTERS_SUBSCRIBER_H + +#include <ros/ros.h> + +#include <boost/signals.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/noncopyable.hpp> + +namespace message_filters +{ + +template<class M> +class Subscriber : public boost::noncopyable +{ +public: + typedef boost::shared_ptr<M const> MConstPtr; + typedef boost::function<void(const MConstPtr&)> Callback; + typedef boost::signal<void(const MConstPtr&)> Signal; + + Subscriber(ros::NodeHandle& nh, const std::string& topic, uint32_t queue_size, const ros::TransportHints& transport_hints = ros::TransportHints(), ros::CallbackQueueInterface* callback_queue = 0) + { + ros::SubscribeOptions ops; + ops.init<M>(topic, queue_size, boost::bind(&Subscriber<M>::cb, this, _1)); + ops.callback_queue = callback_queue; + ops.transport_hints = transport_hints; + sub_ = nh.subscribe(ops); + } + + ~Subscriber() + { + sub_.shutdown(); + } + + boost::signals::connection connect(const Callback& callback) + { + boost::mutex::scoped_lock lock(signal_mutex_); + return signal_.connect(callback); + } + +private: + void cb(const MConstPtr& m) + { + boost::mutex::scoped_lock lock(signal_mutex_); + signal_(m); + } + + ros::Subscriber sub_; + Signal signal_; + boost::mutex signal_mutex_; +}; + +} + +#endif Deleted: pkg/trunk/sandbox/message_filters/include/message_filters/sync_helper.h =================================================================== --- pkg/trunk/sandbox/message_filters/include/message_filters/sync_helper.h 2009-06-30 23:34:18 UTC (rev 18027) +++ pkg/trunk/sandbox/message_filters/include/message_filters/sync_helper.h 2009-06-30 23:35:22 UTC (rev 18028) @@ -1,98 +0,0 @@ -/********************************************************************* -* Software License Agreement (BSD License) -* -* Copyright (c) 2008, Willow Garage, Inc. -* All rights reserved. -* -* Redistribution and use in source and binary forms, with or without -* modification, are permitted provided that the following conditions -* are met: -* -* * Redistributions of source code must retain the above copyright -* notice, this list of conditions and the following disclaimer. -* * Redistributions in binary form must reproduce the above -* copyright notice, this list of conditions and the following -* disclaimer in the documentation and/or other materials provided -* with the distribution. -* * Neither the name of the Willow Garage nor the names of its -* contributors may be used to endorse or promote products derived -* from this software without specific prior written permission. -* -* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE -* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, -* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN -* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -* POSSIBILITY OF SUCH DAMAGE. -*********************************************************************/ - -/*! \mainpage - * \htmlinclude manifest.html - */ - -#ifndef MESSAGE_FILTERS_SYNC_HELPER_H_ -#define MESSAGE_FILTERS_SYNC_HELPER_H_ - -#include <boost/thread.hpp> -#include <vector> - -namespace message_filters -{ - -template <class M> -class SyncHelper -{ -public: - - /** - * Used to link ROS messages to a filter system - * \param topic The topic that we want to subscribe to - * \param queue_size Defines the queue size in the subscribe call - * \param nh Node handle - */ - SyncHelper(const std::string& topic, uint32_t queue_size, ros::NodeHandle& nh) - { - sub_ = nh.subscribe(topic, queue_size, &SyncHelper<M>::msgHandler, this) ; - } - - /** - * Ros subscribe callback. Receives messages, and then pushes them to all the - * output callbacks - */ - void msgHandler(const boost::shared_ptr<M const>& msg) - { - printf("%u - Called SyncHelper Callback!\n", msg->header.seq) ; - - // Sequentially call each registered call - for (unsigned int i=0; i<output_callbacks_.size(); i++) - output_callbacks_[i](msg) ; - } - - /** - * Called by another filter that wants the output of this filter - * \param callback The function that is called when data is available - */ - void addOutputCallback(const boost::function<void(const boost::shared_ptr<M const>&)>& callback) - { - output_callbacks_.push_back(callback) ; - } - -private: - ros::Subscriber sub_ ; - - //! Vector of callbacks that need to be called, whenever data is ready. - std::vector<boost::function<void(const boost::shared_ptr<M const>&)> > output_callbacks_ ; - -} ; - -} - - - -#endif // MESSAGE_FILTERS_SYNC_HELPER_H_ Modified: pkg/trunk/sandbox/message_filters/src/filter_example.cpp =================================================================== --- pkg/trunk/sandbox/message_filters/src/filter_example.cpp 2009-06-30 23:34:18 UTC (rev 18027) +++ pkg/trunk/sandbox/message_filters/src/filter_example.cpp 2009-06-30 23:35:22 UTC (rev 18028) @@ -33,17 +33,20 @@ *********************************************************************/ #include "ros/ros.h" -#include "robot_msgs/PointStamped.h" -#include "robot_msgs/PointCloud.h" -#include "sensor_msgs/CamInfo.h" +#include "std_msgs/String.h" #include "message_filters/topic_synchronizer_filter.h" -#include "message_filters/sync_helper.h" +#include "message_filters/subscriber.h" #include "message_filters/consumer.h" #include "message_filters/msg_cache.h" using namespace std ; using namespace message_filters ; +void callback(const std_msgs::String::ConstPtr& msg) +{ + ROS_INFO("Received message [%s]", msg->data.c_str()); +} + int main(int argc, char** argv) { ros::init(argc, argv, "sync_test") ; @@ -53,15 +56,11 @@ //TopicSynchronizerFilter<robot_msgs::PointStamped, robot_msgs::PointCloud> filter ; // Define the source 'node' - SyncHelper<sensor_msgs::CamInfo> sync_helper("stereo/left/cam_info/", 10, nh) ; + Subscriber<std_msgs::String> sub(nh, "chatter", 10); + Consumer<std_msgs::String> consumer(sub); + MsgCache<std_msgs::String> cache(sub, 10); + cache.connect(callback); - Consumer consumer ; - // Link the consumer to the output of sync_helper - consumer.subscribe(sync_helper) ; - - MsgCache<sensor_msgs::CamInfo> cache(10) ; - cache.subscribe(sync_helper) ; - ros::spin() ; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |