Linkerd2 proxy destination 學習筆記
作者: 嘩啦啦 mesh團隊,熱衷於kubernetes、devops、apollo、istio、linkerd、openstack、calico 等領域技術。
linkerd2介紹
Linkerd由控制平面
和資料平面
組成:
-
控制平面
是在所屬的Kubernetes名稱空間
(linkerd預設情況下)中執行的一組服務,這些服務可以完成匯聚遙測資料
,提供面向使用者的API,並向資料平面
代理提供控制資料
等,它們共同驅動
資料平面。 -
資料平面
用Rust編寫的輕量級代理,該代理安裝在服務的每個pod
中,併成為資料平面的一部分,它接收Pod的所有接入
流量,並通過initContainer
配置iptables
傳入和傳出
流量,所以不需要更改程式碼,甚至可以將其新增到正在執行
的服務中。
借用官方的圖:
proxy由rust開發完成,其內部的非同步執行時採用了Tokio框架,服務元件用到了tower。
本文主要關注proxy與destination元件互動相關的整體邏輯,分析proxy內部的執行邏輯。
流程分析
初始化
proxy啟動後:
-
app::init
初始化配置 -
app::Main::new
建立主邏輯main
, -
main.run_until
內新加一任務ProxyParts::build_proxy_task
。
在ProxyParts::build_proxy_task
中會進行一系列的初始化工作,此處只關注dst_svc
,其建立程式碼為:
dst_svc = svc::stack(connect::svc(keepalive))
.push(tls::client::layer(local_identity.clone()))
.push_timeout(config.control_connect_timeout)
.push(control::client::layer())
.push(control::resolve::layer(dns_resolver.clone()))
.push(reconnect::layer({
let backoff = config.control_backoff.clone();
move |_| Ok(backoff.stream())
}))
.push(http_metrics::layer::<_,classify::Response>(
ctl_http_metrics.clone(),))
.push(proxy::grpc::req_body_as_payload::layer().per_make())
.push(control::add_origin::layer())
.push_buffer_pending(
config.destination_buffer_capacity,config.control_dispatch_timeout,)
.into_inner()
.make(config.destination_addr.clone())複製程式碼
dst_svc
一共有2處引用,一是crate::resolve::Resolver
的建立會涉及;另一個就是ProfilesClient
的建立。
Resolver
-
api_resolve::Resolve::new(dst_svc.clone())
建立resolver
物件 - 呼叫
outbound::resolve
建立map_endpoint::Resolve
型別物件,並當做引數resolve
傳入outbound::spawn
函式開啟出口執行緒
在outbound::spawn
中,resolve
被用於建立負載均衡控制層,並用於後續路由控制:
let balancer_layer = svc::layers()
.push_spawn_ready()
.push(discover::Layer::new(
DISCOVER_UPDATE_BUFFER_CAPACITY,resolve,))
.push(balance::layer(EWMA_DEFAULT_RTT,EWMA_DECAY));複製程式碼
在discover::Layer::layer
中:
let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint,from_resolve);
Buffer::new(self.capacity,make_discover)複製程式碼
Profiles
- 在
ProfilesClient::new
中呼叫api::client::Destination::new(dst_svc)
建立grpc的client端並存於成員變數service
- 接著
profiles_client
物件會被用於inbound
和outbound
的建立(省略無關程式碼):
let dst_stack = svc::stack(...)...
.push(profiles::router::layer(
profile_suffixes,profiles_client,dst_route_stack,))
...複製程式碼
其中profiles::router::layer
會建立一個Layer
物件,並將profiles_client
賦予get_routes
成員。然後在service
方法中,會調到Layer::layer
方法,裡面會建立一個MakeSvc
物件,其get_routes
成員的值即為profiles_client
。
執行
新的連線過來時,從listen
拿到連線物件後,會交給linkerd_proxy::transport::tls::accept::AcceptTls
的call
,然後是linkerd2_proxy::proxy::server::Server
的call
,並最終分別呼叫linkerd2_proxy_http::balance::MakeSvc::call
和linkerd2_proxy_http::profiles::router::MakeSvc::call
方法。
balance
在linkerd2_proxy_http::balance::MakeSvc::call
中:
- 呼叫
inner.call(target)
,此處的inner
即是前面Buffer::new
的結果。 - 生成一個新的
linkerd2_proxy_http::balance::MakeSvc
物件,當做Future
返回
先看inner.call
。它內部經過層層呼叫,依次觸發Buffer
、MakeEndpoint
、FromResolve
等結構的call
方法,最終會觸發最開始建立的resolve.resolve(target)
,其內部呼叫api_resolve::Resolve::call
。
在api_resolve::Resolve::call
中:
fn call(&mut self,target: T) -> Self::Future {
let path = target.to_string();
trace!("resolve {:?}",path);
self.service
// GRPC請求,獲取k8s的endpoint
.get(grpc::Request::new(api::GetDestination {
path,scheme: self.scheme.clone(),context_token: self.context_token.clone(),}))
.map(|rsp| {
debug!(metadata = ?rsp.metadata());
// 拿到結果stream
Resolution {
inner: rsp.into_inner(),}
})
}複製程式碼
將返回的Resolution
再次放入MakeSvc
中,然後看其poll:
fn poll(&mut self) -> Poll<Self::Item,Self::Error> {
// 這個poll會依次呼叫:
// linkerd2_proxy_api_resolve::resolve::Resolution::poll
// linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
// linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
// 最終獲得Poll<Change<SocketAddr,Endpoint>>
let discover = try_ready!(self.inner.poll());
let instrument = PendingUntilFirstData::default();
let loaded = PeakEwmaDiscover::new(discover,self.default_rtt,self.decay,instrument);
let balance = Balance::new(loaded,self.rng.clone());
Ok(Async::Ready(balance))
}複製程式碼
最終返回service Balance
。
當具體請求過來後,先會判斷Balance::poll_ready
:
fn poll_ready(&mut self) -> Poll<(),Self::Error> {
// 獲取Update<Endpoint>
// 將Remove的從self.ready_services中刪掉
// 將Insert的構造UnreadyService結構加到self.unready_services
self.poll_discover()?;
// 對UnreadyService,呼叫其poll,內部會呼叫到svc的poll_ready判斷endpoint是否可用
// 可用時,將其加入self.ready_services
self.poll_unready();
loop {
if let Some(index) = self.next_ready_index {
// 找到對應的endpoint,可用則返回
if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
return Ok(Async::Ready(()));
}
}
// 選擇負載比較低的endpoint
self.next_ready_index = self.p2c_next_ready_index();
if self.next_ready_index.is_none() {
//
return Ok(Async::NotReady);
}
}
}複製程式碼
就緒後,對請求req
呼叫call
:
fn call(&mut self,request: Req) -> Self::Future {
// 找到下一個可用的svc,並將其從ready_services中刪除
let index = self.next_ready_index.take().expect("not ready");
let (key,mut svc) = self
.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
// 將請求轉過去
let fut = svc.call(request);
// 加到unready
self.push_unready(key,svc);
fut.map_err(Into::into)
}複製程式碼
profiles
在linkerd2_proxy_http::profiles::router::MakeSvc::call
中:
// Initiate a stream to get route and dst_override updates for this
// destination.
let route_stream = match target.get_destination() {
Some(ref dst) => {
if self.suffixes.iter().any(|s| s.contains(dst.name())) {
debug!("fetching routes for {:?}",dst);
self.get_routes.get_routes(&dst)
} else {
debug!("skipping route discovery for dst={:?}",dst);
None
}
}
None => {
debug!("no destination for routes");
None
}
};複製程式碼
經過若干判斷後,會呼叫ProfilesClient::get_routes
並將結果存於route_stream
。
進入get_routes
:
fn get_routes(&self,dst: &NameAddr) -> Option<Self::Stream> {
// 建立通道
let (tx,rx) = mpsc::channel(1);
// This oneshot allows the daemon to be notified when the Self::Stream
// is dropped.
let (hangup_tx,hangup_rx) = oneshot::channel();
// 建立Daemon物件(Future任務)
let daemon = Daemon {
tx,hangup: hangup_rx,dst: format!("{}",dst),state: State::Disconnected,service: self.service.clone(),backoff: self.backoff,};
// 呼叫Daemon::poll
let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
// 將通道接收端傳出
spawn.ok().map(|_| Rx {
rx,_hangup: hangup_tx,})
}複製程式碼
接著看Daemon::poll
:
fn poll(&mut self) -> Poll<Self::Item,Self::Error> {
loop {
// 遍歷state成員狀態
self.state = match self.state {
// 未連線時
State::Disconnected => {
match self.service.poll_ready() {
Ok(Async::NotReady) => return Ok(Async::NotReady),Ok(Async::Ready(())) => {}
Err(err) => {
error!(
"profile service unexpected error (dst = {}): {:?}",self.dst,err,);
return Ok(Async::Ready(()));
}
};
// 構造grpc請求
let req = api::GetDestination {
scheme: "k8s".to_owned(),path: self.dst.clone(),};
debug!("getting profile: {:?}",req);
// 獲取請求任務
let rspf = self.service.get_profile(grpc::Request::new(req));
State::Waiting(rspf)
}
// 正在請求時,從請求中獲取回覆
State::Waiting(ref mut f) => match f.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),// 正常回復
Ok(Async::Ready(rsp)) => {
trace!("response received");
// 流式回覆
State::Streaming(rsp.into_inner())
}
Err(e) => {
warn!("error fetching profile for {}: {:?}",e);
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},// 接收回復
State::Streaming(ref mut s) => {
// 處理回覆流
// 注意此處,引數1是get_profile請求的回覆流,
// 引數2是之前建立的通道傳送端
match Self::proxy_stream(s,&mut self.tx,&mut self.hangup) {
Async::NotReady => return Ok(Async::NotReady),Async::Ready(StreamState::SendLost) => return Ok(().into()),Async::Ready(StreamState::RecvDone) => {
State::Backoff(Delay::new(clock::now() + self.backoff))
}
}
}
// 異常,結束請求
State::Backoff(ref mut f) => match f.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),Err(_) | Ok(Async::Ready(())) => State::Disconnected,},};
}
}複製程式碼
接著 proxy_stream
:
fn proxy_stream(
rx: &mut grpc::Streaming<api::DestinationProfile,T::ResponseBody>,tx: &mut mpsc::Sender<profiles::Routes>,hangup: &mut oneshot::Receiver<Never>,) -> Async<StreamState> {
loop {
// 傳送端是否就緒
match tx.poll_ready() {
Ok(Async::NotReady) => return Async::NotReady,Ok(Async::Ready(())) => {}
Err(_) => return StreamState::SendLost.into(),}
// 從grpc stream中取得一條資料
match rx.poll() {
Ok(Async::NotReady) => match hangup.poll() {
Ok(Async::Ready(never)) => match never {},// unreachable!
Ok(Async::NotReady) => {
// We are now scheduled to be notified if the hangup tx
// is dropped.
return Async::NotReady;
}
Err(_) => {
// Hangup tx has been dropped.
debug!("profile stream cancelled");
return StreamState::SendLost.into();
}
},Ok(Async::Ready(None)) => return StreamState::RecvDone.into(),// 正確取得profile結構
Ok(Async::Ready(Some(profile))) => {
debug!("profile received: {:?}",profile);
// 解析資料
let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
let routes = profile
.routes
.into_iter()
.filter_map(move |orig| convert_route(orig,retry_budget.as_ref()))
.collect();
let dst_overrides = profile
.dst_overrides
.into_iter()
.filter_map(convert_dst_override)
.collect();
// 構造profiles::Routes結構並推到傳送端
match tx.start_send(profiles::Routes {
routes,dst_overrides,}) {
Ok(AsyncSink::Ready) => {} // continue
Ok(AsyncSink::NotReady(_)) => {
info!("dropping profile update due to a full buffer");
// This must have been because another task stole
// our tx slot? It seems pretty unlikely,but possible?
return Async::NotReady;
}
Err(_) => {
return StreamState::SendLost.into();
}
}
}
Err(e) => {
warn!("profile stream failed: {:?}",e);
return StreamState::RecvDone.into();
}
}
}
}複製程式碼
回到MakeSvc::call
方法,前面建立的route_stream
會被用於建立一個linkerd2_proxy::proxy::http::profiles::router::Service
任務物件,並在其poll_ready
方法中通過poll_route_stream
從route_steam
獲取profiles::Routes
並呼叫update_routes
建立具體可用的路由規則linkerd2_router::Router
,至此,路由規則已建好,就等具體的請求過來然後在call
中呼叫linkerd2_router::call
進行對請求的路由判斷。
圖示
profile
總結
proxy採用的tower框架,每個處理邏輯都是其中的一個layer,開發時只需層層堆疊即可。不過,也正因如此,各層之間的介面都極其相似,須得小心不可調錯。 對於destination這部分邏輯,linkerd2的destination元件收到來自proxy的grpc請求後,每當endpoint或service profile有任何變動,都會立即通過stream傳送過去,proxy收到後根據endpoint調整負載均衡策略,根據service profile調整路由,然後通過它們來處理使用者服務的實際請求。
關於 ServiceMesher 社群
ServiceMesher 社群是由一群擁有相同價值觀和理念的志願者們共同發起,於 2018 年 4 月正式成立。
社群關注領域有:容器、微服務、Service Mesh、Serverless,擁抱開源和雲原生,致力於推動 Service Mesh 在中國的蓬勃發展。
社群官網:https://www.servicemesher.com