回撥函式ros::spin()與ros::spinOnce()
ROS的主迴圈中需要不斷呼叫ros::spin()或ros::spinOnce(),兩者區別在於前者呼叫後不會再返回,而後者在呼叫後還可以繼續執行之後的程式。
在使用ros::spin()的情況下,一般來說在初始化時已經設定好所有訊息的回撥,並且不需要其他背景程式執行。這樣一來,每次訊息到達時會執行使用者的回撥函式進行操作,相當於程式是訊息事件驅動的;而在使用ros::spinOnce()的情況下,一般來說僅僅使用回撥不足以完成任務,還需要其他輔助程式的執行:比如定時任務、資料處理、使用者介面等。
關於訊息接收回調機制在ROS官網上略有說明 (callbacks and spinning)。總體來說其原理是這樣的:除了使用者的主程式以外,ROS的socket連線控制程序會在後臺接收訂閱的訊息,所有接收到的訊息並不是立即處理,而是等到spin()或者spinOnce()執行時才集中處理。所以為了保證訊息可以正常接收,需要尤其注意spinOnce()函式的使用 (對於spin()來說則不涉及太多的人為因素)。
I. 對於速度較快的訊息,需要注意合理控制訊息佇列及spinOnce()的時間。例如,如果訊息到達的頻率是100Hz,而spinOnce()的執行頻率是10Hz,那麼就要至少保證訊息佇列中預留的大小大於10。
II. 如果對於使用者自己的週期性任務,最好和spinOnce()並列呼叫。即使該任務是週期性的對於資料進行處理,例如對接收到的IMU資料進行Kalman濾波,也不建議直接放在回撥函式中:因為存在通訊接收的不確定性,不能保證該回調執行在時間上的穩定性。
// 示例程式碼 ros::Rate r(100); while (ros::ok()) { libusb_handle_events_timeout(...);// Handle USB events ros::spinOnce(); // Handle ROS events r.sleep(); }
III. 最後說明一下將ROS整合到其他程式架構時的情況。有些圖形處理程式會將main()包裹起來,此時就需要找到一個合理的位置呼叫ros::spinOnce()。比如對於OpenGL來說,其中有一個方法就是採用設定定時器定時呼叫的方法:
// 示例程式碼 void timerCb(int value) { ros::spinOnce(); } glutTimerFunc(10, timerCb, 0); glutMainLoop(); // Never returns
訊息到來並不會立即執行訊息處理回撥函式,而是在呼叫ros::spin()之後,才進行訊息處理的輪轉,訊息回撥函式統一處理訂閱話題的訊息。
roscpp不會在你的應用中明確一個執行緒模型:也就是說即使roscpp會在幕後使用多執行緒管理網路連結,排程等,但它不會將自己的執行緒暴露在你的應用中。
roscpp允許你的回撥函式被任意多執行緒呼叫,如果你願意。
最後的結果可能是你的回撥函式將沒有機會被呼叫,最常用的方法是使用ros::spin()呼叫。
注意:回撥函式的排隊和輪轉,不會對內部的網路通訊造成影響,它們僅僅會影響到使用者的回撥函式何時發生。它們會影響到訂閱者佇列。因為處理你回撥函式的速度,你訊息到來的速度,將會決定以前的訊息會不會被丟棄。
1.單執行緒下的輪轉
最簡單的單執行緒spin的例子就是ros::spin()自己。
ros::init(argc, argv, "my_node"); //初始化節點 ros::NodeHandle nh; //建立節點控制代碼 ros::Subscriber sub = nh.subscribe(...); //建立訊息訂閱者 ... ros::spin(); //呼叫spin(),統一處理訊息
在這裡,所有的使用者回撥函式將在spin()呼叫之後被呼叫.
ros::spin()不會返回,直到節點被關閉,或者呼叫ros::shutdown(),或者按下ctrl+C
另一個常用的模式是週期性地呼叫ros::spinOnce():
ros::Rate r(10); // 10 hz while (should_continue) { //... do some work, publish some messages, etc. ... ros::spinOnce(); //輪轉一次,返回 r.sleep(); //休眠 }
ros::spinOnce()將會在被呼叫的那一時間點呼叫所有等待的回撥函式.
注意:ros::spin()和ros::spinOnce()函式對單執行緒應用很有意義,目前不會應用於多執行緒.
2.多執行緒輪轉
上面是單執行緒下的訊息回撥函式輪轉,那多執行緒下是什麼樣子?
roscpp庫提供了一些內嵌的支援來從多執行緒中呼叫回撥函式.
1) ros::MultiThreadedSpiner
它是一個阻塞型輪轉器,類似於ros::spin().
可以使用它的構造器來設定執行緒的個數,如果不設定或設成0,它將為每個cpu核心使用一個執行緒。
ros::MultiThreadedSpinner spinner(4); // Use 4 threads
spinner.spin(); // spin() will not return until the node has been shutdown
2)ros::AsyncSpinner
API : http://docs.ros.org/api/roscpp/html/classros_1_1AsyncSpinner.html
更實用的多執行緒輪轉是非同步輪轉器(AsyncSpiner),相對於阻塞的spin()呼叫,它有自己的start()和stop()呼叫
並且在銷燬後將自動停止。
對上述MultiThreadedSpiner等效的AsyncSpiner使用如下:
ros::AsyncSpinner spinner(4); // Use 4 threads spinner.start(); ros::waitForShutdown();
3.CallbackQueue::callAvailable() and callOne()
CallbackQueue API 回撥函式佇列類:
http://docs.ros.org/api/roscpp/html/classros_1_1CallbackQueue.html
可以建立一個回撥函式佇列類:
#include
...
ros::CallbackQueue my_queue;
回撥函式佇列類有兩種觸發其內部回撥函式的方法:callAvailable()方法和callOne()方法.
前者將獲取當前可以符合條件的回撥函式,並且全部觸發它們;後者將簡單地觸發佇列中最早的那個回撥函式.
這兩個方法都接受一個可選的timeout超時時間,它們將在此時間之內等待一個回撥函式變得符合條件。
如果這個值是0,那麼,如果佇列中沒有回撥函式,該方法立即返回.
4.高階主題:使用不同的回撥函式佇列
預設的是所有的訊息回撥函式都會被壓入全域性訊息回撥佇列.
roscpp允許使用自定義的訊息回撥函式佇列並分別服務。
這可以以兩種粒度實現:
1)每個subsceribe(),advertise(),advertiseService(),等
這部分可以使用高階版的方法呼叫原型,使用一個選項結構體指標引數.
2)每個節點控制代碼
這是常見的方法,使用節點控制代碼的setCallbackQueue()方法:
ros::NodeHandle nh;
nh.setCallbackQueue(&my_callback_queue);
這使所有的訊息訂閱者,服務,定時器等的回撥函式都進入my_callback_queue,而非roscpp的預設佇列.
這意味著,ros::spin()和ros::spinOnce()將不會觸發這些回撥函式。
使用者自己必須額外呼叫這些回撥函式,可以使用的是回撥函式佇列類物件的callAvailable()方法和callOne()方法
應用:
將不同的回撥函式分別壓進不同的回撥函式佇列有下面幾個優勢:
1)長時服務:對一個服務的回撥函式安排一個單獨的佇列,然後單獨地使用一個執行緒來呼叫它,可以保證不會阻塞其它回撥函式
2)計算消耗回撥函式:與長時服務相似,為一個費計算時間的回撥函式安排一個單獨的回撥佇列處理,能夠減輕應用的負擔.