1. 程式人生 > 程式設計 >Linkerd2 proxy destination 學習筆記

Linkerd2 proxy destination 學習筆記

作者: 嘩啦啦 mesh團隊,熱衷於kubernetes、devops、apollo、istio、linkerd、openstack、calico 等領域技術。

linkerd2介紹

Linkerd由控制平面資料平面組成:

  • 控制平面是在所屬的Kubernetes名稱空間(linkerd預設情況下)中執行的一組服務,這些服務可以完成匯聚遙測資料,提供面向使用者的API,並向資料平面代理提供控制資料等,它們共同驅動資料平面。
  • 資料平面用Rust編寫的輕量級代理,該代理安裝在服務的每個pod中,併成為資料平面的一部分,它接收Pod的所有接入流量,並通過initContainer配置iptables
    正確轉發流量的攔截所有傳出流量,因為它是附加工具,並且攔截服務的所有傳入和傳出流量,所以不需要更改程式碼,甚至可以將其新增到正在執行的服務中。

借用官方的圖:

proxy-destination

proxy由rust開發完成,其內部的非同步執行時採用了Tokio框架,服務元件用到了tower

本文主要關注proxy與destination元件互動相關的整體邏輯,分析proxy內部的執行邏輯。

流程分析

初始化

proxy啟動後:

  1. app::init初始化配置
  2. app::Main::new建立主邏輯main
  3. main.run_until內新加一任務 ProxyParts::build_proxy_task

ProxyParts::build_proxy_task中會進行一系列的初始化工作,此處只關注dst_svc,其建立程式碼為:

    dst_svc = svc::stack(connect::svc(keepalive))
                .push(tls::client::layer(local_identity.clone()))
                .push_timeout(config.control_connect_timeout)
                .push(control::client::layer())
                .push(control::resolve::layer(dns_resolver.clone()))
                .push(reconnect::layer({
                    let backoff = config.control_backoff.clone();
                    move |_| Ok(backoff.stream())
                }))
                .push(http_metrics::layer::<_,classify::Response>(
                    ctl_http_metrics.clone(),))
                .push(proxy::grpc::req_body_as_payload::layer().per_make())
                .push(control::add_origin::layer())
                .push_buffer_pending(
                    config.destination_buffer_capacity,config.control_dispatch_timeout,)
                .into_inner()
                .make(config.destination_addr.clone())複製程式碼

dst_svc一共有2處引用,一是crate::resolve::Resolver的建立會涉及;另一個就是ProfilesClient的建立。

Resolver

  1. api_resolve::Resolve::new(dst_svc.clone())建立resolver物件
  2. 呼叫outbound::resolve建立 map_endpoint::Resolve型別物件,並當做引數resolve傳入outbound::spawn函式開啟出口執行緒

outbound::spawn中,resolve被用於建立負載均衡控制層,並用於後續路由控制:

let balancer_layer = svc::layers()
        .push_spawn_ready()
        .push(discover::Layer::new(
            DISCOVER_UPDATE_BUFFER_CAPACITY,resolve,))
        .push(balance::layer(EWMA_DEFAULT_RTT,EWMA_DECAY));複製程式碼

discover::Layer::layer中:

let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint,from_resolve);
Buffer::new(self.capacity,make_discover)複製程式碼

Profiles

  1. ProfilesClient::new中呼叫api::client::Destination::new(dst_svc)建立grpc的client端並存於成員變數service
  2. 接著profiles_client物件會被用於inboundoutbound的建立(省略無關程式碼):
    let dst_stack = svc::stack(...)...
        .push(profiles::router::layer(
            profile_suffixes,profiles_client,dst_route_stack,))
        ...複製程式碼

其中profiles::router::layer會建立一個Layer物件,並將profiles_client賦予get_routes成員。然後在service方法中,會調到Layer::layer方法,裡面會建立一個MakeSvc物件,其get_routes成員的值即為profiles_client

執行

新的連線過來時,從listen拿到連線物件後,會交給linkerd_proxy::transport::tls::accept::AcceptTlscall,然後是linkerd2_proxy::proxy::server::Servercall,並最終分別呼叫linkerd2_proxy_http::balance::MakeSvc::calllinkerd2_proxy_http::profiles::router::MakeSvc::call方法。

balance

linkerd2_proxy_http::balance::MakeSvc::call中:

  1. 呼叫inner.call(target),此處的inner即是前面Buffer::new的結果。
  2. 生成一個新的linkerd2_proxy_http::balance::MakeSvc物件,當做Future返回

先看inner.call。它內部經過層層呼叫,依次觸發BufferMakeEndpointFromResolve等結構的call方法,最終會觸發最開始建立的resolve.resolve(target),其內部呼叫api_resolve::Resolve::call

api_resolve::Resolve::call中:

    fn call(&mut self,target: T) -> Self::Future {
        let path = target.to_string();
        trace!("resolve {:?}",path);
        self.service
            // GRPC請求,獲取k8s的endpoint
            .get(grpc::Request::new(api::GetDestination {
                path,scheme: self.scheme.clone(),context_token: self.context_token.clone(),}))
            .map(|rsp| {
                debug!(metadata = ?rsp.metadata());
                // 拿到結果stream
                Resolution {
                    inner: rsp.into_inner(),}
            })
    }複製程式碼

將返回的Resolution再次放入MakeSvc中,然後看其poll:

    fn poll(&mut self) -> Poll<Self::Item,Self::Error> {
        // 這個poll會依次呼叫:
        //    linkerd2_proxy_api_resolve::resolve::Resolution::poll
        //    linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
        //    linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
        // 最終獲得Poll<Change<SocketAddr,Endpoint>> 
        let discover = try_ready!(self.inner.poll());
        let instrument = PendingUntilFirstData::default();
        let loaded = PeakEwmaDiscover::new(discover,self.default_rtt,self.decay,instrument);
        let balance = Balance::new(loaded,self.rng.clone());
        Ok(Async::Ready(balance))
    }複製程式碼

最終返回service Balance

當具體請求過來後,先會判斷Balance::poll_ready

    fn poll_ready(&mut self) -> Poll<(),Self::Error> {
        // 獲取Update<Endpoint>
        // 將Remove的從self.ready_services中刪掉
        // 將Insert的構造UnreadyService結構加到self.unready_services
        self.poll_discover()?;
        // 對UnreadyService,呼叫其poll,內部會呼叫到svc的poll_ready判斷endpoint是否可用
        // 可用時,將其加入self.ready_services
        self.poll_unready();
        
        loop {
            if let Some(index) = self.next_ready_index {
                // 找到對應的endpoint,可用則返回
                if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
                    return Ok(Async::Ready(()));
                }
            }
            // 選擇負載比較低的endpoint
            self.next_ready_index = self.p2c_next_ready_index();
            if self.next_ready_index.is_none() {
                // 
                return Ok(Async::NotReady);
            }
        }
    }複製程式碼

就緒後,對請求req呼叫call

    fn call(&mut self,request: Req) -> Self::Future {
        // 找到下一個可用的svc,並將其從ready_services中刪除
        let index = self.next_ready_index.take().expect("not ready");
        let (key,mut svc) = self
            .ready_services
            .swap_remove_index(index)
            .expect("invalid ready index");

        // 將請求轉過去
        let fut = svc.call(request);
        // 加到unready
        self.push_unready(key,svc);

        fut.map_err(Into::into)
    }複製程式碼

profiles

linkerd2_proxy_http::profiles::router::MakeSvc::call中:

        // Initiate a stream to get route and dst_override updates for this
        // destination.
        let route_stream = match target.get_destination() {
            Some(ref dst) => {
                if self.suffixes.iter().any(|s| s.contains(dst.name())) {
                    debug!("fetching routes for {:?}",dst);
                    self.get_routes.get_routes(&dst)
                } else {
                    debug!("skipping route discovery for dst={:?}",dst);
                    None
                }
            }
            None => {
                debug!("no destination for routes");
                None
            }
        };複製程式碼

經過若干判斷後,會呼叫ProfilesClient::get_routes並將結果存於route_stream

進入get_routes

    fn get_routes(&self,dst: &NameAddr) -> Option<Self::Stream> {
        // 建立通道
        let (tx,rx) = mpsc::channel(1);
        // This oneshot allows the daemon to be notified when the Self::Stream
        // is dropped.
        let (hangup_tx,hangup_rx) = oneshot::channel();
        // 建立Daemon物件(Future任務)
        let daemon = Daemon {
            tx,hangup: hangup_rx,dst: format!("{}",dst),state: State::Disconnected,service: self.service.clone(),backoff: self.backoff,};
        // 呼叫Daemon::poll
        let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
        // 將通道接收端傳出
        spawn.ok().map(|_| Rx {
            rx,_hangup: hangup_tx,})
    }複製程式碼

接著看Daemon::poll

    fn poll(&mut self) -> Poll<Self::Item,Self::Error> {
        loop {
            // 遍歷state成員狀態
            self.state = match self.state {
                // 未連線時
                State::Disconnected => {
                    match self.service.poll_ready() {
                        Ok(Async::NotReady) => return Ok(Async::NotReady),Ok(Async::Ready(())) => {}
                        Err(err) => {
                            error!(
                                "profile service unexpected error (dst = {}): {:?}",self.dst,err,);
                            return Ok(Async::Ready(()));
                        }
                    };
                    // 構造grpc請求
                    let req = api::GetDestination {
                        scheme: "k8s".to_owned(),path: self.dst.clone(),};
                    debug!("getting profile: {:?}",req);
                    // 獲取請求任務
                    let rspf = self.service.get_profile(grpc::Request::new(req));
                    State::Waiting(rspf)
                }
                // 正在請求時,從請求中獲取回覆
                State::Waiting(ref mut f) => match f.poll() {
                    Ok(Async::NotReady) => return Ok(Async::NotReady),// 正常回復
                    Ok(Async::Ready(rsp)) => {
                        trace!("response received");
                        // 流式回覆
                        State::Streaming(rsp.into_inner())
                    }
                    Err(e) => {
                        warn!("error fetching profile for {}: {:?}",e);
                        State::Backoff(Delay::new(clock::now() + self.backoff))
                    }
                },// 接收回復
                State::Streaming(ref mut s) => {
                    // 處理回覆流
                    // 注意此處,引數1是get_profile請求的回覆流,
                    //   引數2是之前建立的通道傳送端
                    match Self::proxy_stream(s,&mut self.tx,&mut self.hangup) {
                        Async::NotReady => return Ok(Async::NotReady),Async::Ready(StreamState::SendLost) => return Ok(().into()),Async::Ready(StreamState::RecvDone) => {
                            State::Backoff(Delay::new(clock::now() + self.backoff))
                        }
                    }
                }
                // 異常,結束請求
                State::Backoff(ref mut f) => match f.poll() {
                    Ok(Async::NotReady) => return Ok(Async::NotReady),Err(_) | Ok(Async::Ready(())) => State::Disconnected,},};
        }
    }複製程式碼

接著 proxy_stream

    fn proxy_stream(
        rx: &mut grpc::Streaming<api::DestinationProfile,T::ResponseBody>,tx: &mut mpsc::Sender<profiles::Routes>,hangup: &mut oneshot::Receiver<Never>,) -> Async<StreamState> {
        loop {
            // 傳送端是否就緒
            match tx.poll_ready() {
                Ok(Async::NotReady) => return Async::NotReady,Ok(Async::Ready(())) => {}
                Err(_) => return StreamState::SendLost.into(),}

            // 從grpc stream中取得一條資料
            match rx.poll() {
                Ok(Async::NotReady) => match hangup.poll() {
                    Ok(Async::Ready(never)) => match never {},// unreachable!
                    Ok(Async::NotReady) => {
                        // We are now scheduled to be notified if the hangup tx
                        // is dropped.
                        return Async::NotReady;
                    }
                    Err(_) => {
                        // Hangup tx has been dropped.
                        debug!("profile stream cancelled");
                        return StreamState::SendLost.into();
                    }
                },Ok(Async::Ready(None)) => return StreamState::RecvDone.into(),// 正確取得profile結構
                Ok(Async::Ready(Some(profile))) => {
                    debug!("profile received: {:?}",profile);
                    // 解析資料
                    let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
                    let routes = profile
                        .routes
                        .into_iter()
                        .filter_map(move |orig| convert_route(orig,retry_budget.as_ref()))
                        .collect();
                    let dst_overrides = profile
                        .dst_overrides
                        .into_iter()
                        .filter_map(convert_dst_override)
                        .collect();
                    // 構造profiles::Routes結構並推到傳送端
                    match tx.start_send(profiles::Routes {
                        routes,dst_overrides,}) {
                        Ok(AsyncSink::Ready) => {} // continue
                        Ok(AsyncSink::NotReady(_)) => {
                            info!("dropping profile update due to a full buffer");
                            // This must have been because another task stole
                            // our tx slot? It seems pretty unlikely,but possible?
                            return Async::NotReady;
                        }
                        Err(_) => {
                            return StreamState::SendLost.into();
                        }
                    }
                }
                Err(e) => {
                    warn!("profile stream failed: {:?}",e);
                    return StreamState::RecvDone.into();
                }
            }
        }
    }複製程式碼

回到MakeSvc::call方法,前面建立的route_stream會被用於建立一個linkerd2_proxy::proxy::http::profiles::router::Service任務物件,並在其poll_ready方法中通過poll_route_streamroute_steam獲取profiles::Routes並呼叫update_routes建立具體可用的路由規則linkerd2_router::Router,至此,路由規則已建好,就等具體的請求過來然後在call中呼叫linkerd2_router::call進行對請求的路由判斷。

圖示

profile

proxy-destination

總結

proxy採用的tower框架,每個處理邏輯都是其中的一個layer,開發時只需層層堆疊即可。不過,也正因如此,各層之間的介面都極其相似,須得小心不可調錯。 對於destination這部分邏輯,linkerd2的destination元件收到來自proxy的grpc請求後,每當endpoint或service profile有任何變動,都會立即通過stream傳送過去,proxy收到後根據endpoint調整負載均衡策略,根據service profile調整路由,然後通過它們來處理使用者服務的實際請求。

關於 ServiceMesher 社群

ServiceMesher 社群是由一群擁有相同價值觀和理念的志願者們共同發起,於 2018 年 4 月正式成立。

社群關注領域有:容器、微服務、Service Mesh、Serverless,擁抱開源和雲原生,致力於推動 Service Mesh 在中國的蓬勃發展。

社群官網:https://www.servicemesher.com

ServiceMesher 社群