kube-proxy原始碼分析
kube-proxy原始碼解析
ipvs相對於iptables模式具備較高的效能與穩定性, 本文講以此模式的原始碼解析為主,如果想去了解iptables模式的原理,可以去參考其實現,架構上無差別。
kube-proxy主要功能是監聽service和endpoint的事件,然後下放代理策略到機器上。 底層呼叫docker/libnetwork, 而libnetwork最終呼叫了netlink 與netns來實現ipvs的建立等動作
<!--more-->
初始化配置
程式碼入口:cmd/kube-proxy/app/server.go
Run() 函式
通過命令列引數去初始化proxyServer的配置
proxyServer, err := NewProxyServer(o)
type ProxyServer struct { // k8s client Client clientset.Interface EventClient v1core.EventsGetter // ipvs 相關介面 IptInterface utiliptables.Interface IpvsInterface utilipvs.Interface IpsetInterface utilipset.Interface // 處理同步時的處理器 Proxier proxy.ProxyProvider // 代理模式,ipvs iptables userspace kernelspace(windows)四種 ProxyMode string // 配置同步週期 ConfigSyncPeriod time.Duration // service 與 endpoint 事件處理器 ServiceEventHandler config.ServiceHandler EndpointsEventHandler config.EndpointsHandler }
Proxier是主要入口,抽象了兩個函式:
type ProxyProvider interface {
// Sync immediately synchronizes the ProxyProvider's current state to iptables.
Sync()
// 定期執行
SyncLoop()
}
ipvs 的interface 這個很重要:
type Interface interface { // 刪除所有規則 Flush() error // 增加一個virtual server AddVirtualServer(*VirtualServer) error UpdateVirtualServer(*VirtualServer) error DeleteVirtualServer(*VirtualServer) error GetVirtualServer(*VirtualServer) (*VirtualServer, error) GetVirtualServers() ([]*VirtualServer, error) // 給virtual server加個realserver, 如 VirtualServer就是一個clusterip realServer就是pod(或者自定義的endpoint) AddRealServer(*VirtualServer, *RealServer) error GetRealServers(*VirtualServer) ([]*RealServer, error) DeleteRealServer(*VirtualServer, *RealServer) error }
我們在下文再詳細看ipvs_linux是如何實現上面介面的
virtual server與realserver, 最重要的是ip:port,然後就是一些代理的模式如sessionAffinity等:
type VirtualServer struct {
Address net.IP
Protocol string
Port uint16
Scheduler string
Flags ServiceFlags
Timeout uint32
}
type RealServer struct {
Address net.IP
Port uint16
Weight int
}
建立apiserver client
client, eventClient, err := createClients(config.ClientConnection, master)
建立Proxier 這是僅僅關注ipvs模式的proxier
else if proxyMode == proxyModeIPVS {
glog.V(0).Info("Using ipvs Proxier.")
proxierIPVS, err := ipvs.NewProxier(
iptInterface,
ipvsInterface,
ipsetInterface,
utilsysctl.New(),
execer,
config.IPVS.SyncPeriod.Duration,
config.IPVS.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
config.ClusterCIDR,
hostname,
getNodeIP(client, hostname),
recorder,
healthzServer,
config.IPVS.Scheduler,
)
...
proxier = proxierIPVS
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
這個Proxier具備以下方法:
+OnEndpointsAdd(endpoints *api.Endpoints)
+OnEndpointsDelete(endpoints *api.Endpoints)
+OnEndpointsSynced()
+OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
+OnServiceAdd(service *api.Service)
+OnServiceDelete(service *api.Service)
+OnServiceSynced()
+OnServiceUpdate(oldService, service *api.Service)
+Sync()
+SyncLoop()
所以ipvs的這個Proxier實現了我們需要的絕大部分介面
小結一下:
+-----------> endpointHandler
|
+-----------> serviceHandler
| ^
| | +-------------> sync 定期同步等
| | |
ProxyServer---------> Proxier --------> service 事件回撥
| |
| +-------------> endpoint事件回撥
| | 觸發
+-----> ipvs interface ipvs handler <-----+
啟動proxyServer
- 檢查是不是帶了clean up引數,如果帶了那麼清除所有規則退出
- OOM adjuster貌似沒實現,忽略
- resouceContainer也沒實現,忽略
- 啟動metrics伺服器,這個挺重要,比如我們想監控時可以傳入這個引數, 包含promethus的 metrics. metrics-bind-address引數
- 啟動informer, 開始監聽事件,分別啟動協程處理。
1 2 3 4我們都不用太關注,細看5即可:
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)
serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// 註冊 service handler並啟動
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// 這裡面僅僅是把ServiceEventHandler賦值給informer回撥
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// 註冊endpoint
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)
go informerFactory.Start(wait.NeverStop)
serviceConfig.Run與endpointConfig.Run僅僅是給回撥函式賦值, 所以註冊的handler就給了informer, informer監聽到事件時就會回撥:
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}
那麼問題來了,註冊進去的這個handler是啥? 回顧一下上文的
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
所以都是這個proxierIPVS
handler的回撥函式, informer會回撥這幾個函式,所以我們在自己開發時實現這個interface註冊進去即可:
type ServiceHandler interface {
// OnServiceAdd is called whenever creation of new service object
// is observed.
OnServiceAdd(service *api.Service)
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
OnServiceUpdate(oldService, service *api.Service)
// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
OnServiceDelete(service *api.Service)
// OnServiceSynced is called once all the initial even handlers were
// called and the state is fully propagated to local cache.
OnServiceSynced()
}
開始監聽
go informerFactory.Start(wait.NeverStop)
這裡執行後,我們建立刪除service endpoint等動作都會被監聽到,然後回撥,回顧一下上面的圖,最終都是由Proxier去實現,所以後面我們重點關注Proxier即可
s.Proxier.SyncLoop()
然後開始SyncLoop,下文開講
Proxier 實現
我們建立一個service時OnServiceAdd方法會被呼叫, 這裡記錄一下之前的狀態與當前狀態兩個東西,然後發個訊號給syncRunner讓它去處理:
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
記錄service 資訊,可以看到沒做什麼事,就是把service存在map裡, 如果沒變直接刪掉map資訊不做任何處理:
change, exists := scm.items[*namespacedName]
if !exists {
change = &serviceChange{}
// 老的service資訊
change.previous = serviceToServiceMap(previous)
scm.items[*namespacedName] = change
}
// 當前監聽到的service資訊
change.current = serviceToServiceMap(current)
如果一樣,直接刪除
if reflect.DeepEqual(change.previous, change.current) {
delete(scm.items, *namespacedName)
}
proxier.syncRunner.Run() 裡面就傳送了一個訊號
select {
case bfr.run <- struct{}{}:
default:
}
這裡面處理了這個訊號
s.Proxier.SyncLoop()
func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
proxier.syncRunner.Loop(wait.NeverStop)
}
runner裡收到訊號執行,沒收到訊號會定期執行:
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
glog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
glog.V(3).Infof("%s Loop stopping", bfr.name)
return
case <-bfr.timer.C(): // 定期執行
bfr.tryRun()
case <-bfr.run:
bfr.tryRun() // 收到事件訊號執行
}
}
}
這個bfr runner裡我們最需要主意的是一個回撥函式,tryRun裡檢查這個回撥是否滿足被排程的條件:
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run, 這個回撥
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
}
// 傳入的proxier.syncProxyRules這個函式
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
這是個600行左右的搓逼函式,也是處理主要邏輯的地方。
syncProxyRules
- 設定一些iptables規則,如mark與comment
- 確定機器上有網絡卡,ipvs需要繫結地址到上面
- 確定有ipset,ipset是iptables的擴充套件,可以給一批地址設定iptables規則 ...(又臭又長,重複程式碼多,看不下去了,細節問題自己去看吧)
- 我們最關注的,如何去處理VirtualServer的
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress.IP),
Port: uint16(svcInfo.port),
Protocol: string(svcInfo.protocol),
Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
}
}
看下實現, 如果沒有就建立,如果已存在就更新, 給網絡卡繫結service的cluster ip:
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
if appliedVirtualServer == nil {
if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
return err
}
} else {
if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
return err
}
}
}
// bind service address to dummy interface even if service not changed,
// in case that service IP was removed by other processes
if bindAddr {
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
if err != nil {
return err
}
}
return nil
}
建立service實現
現在可以去看ipvs的AddVirtualServer的實現了,主要是利用socket與核心程序通訊做到的。
pkg/util/ipvs/ipvs_linux.go
裡 runner結構體實現了這些方法, 這裡用到了 docker/libnetwork/ipvs庫:
// runner implements Interface.
type runner struct {
exec utilexec.Interface
ipvsHandle *ipvs.Handle
}
// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
if err != nil {
glog.Errorf("IPVS interface can't be initialized, error: %v", err)
return nil
}
return &runner{
exec: exec,
ipvsHandle: ihandle,
}
}
New的時候建立了一個特殊的socket, 這裡與我們普通的socket程式設計無差別,關鍵是syscall.AF_NETLINK這個引數,代表與核心程序通訊:
sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
if err != nil {
return nil, err
}
s := &NetlinkSocket{
fd: int32(fd),
}
s.lsa.Family = syscall.AF_NETLINK
if err := syscall.Bind(fd, &s.lsa); err != nil {
syscall.Close(fd)
return nil, err
}
return s, nil
}
建立一個service, 轉換成docker service格式,直接呼叫:
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
eSvc, err := toBackendService(vs)
if err != nil {
return err
}
return runner.ipvsHandle.NewService(eSvc)
}
然後就是把service資訊打包,往socket裡面寫即可:
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
req := newIPVSRequest(cmd)
req.Seq = atomic.AddUint32(&i.seq, 1)
if s == nil {
req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages
req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
} else {
req.AddData(fillService(s))
} // 把service塞到請求中
if d == nil {
if cmd == ipvsCmdGetDest {
req.Flags |= syscall.NLM_F_DUMP
}
} else {
req.AddData(fillDestinaton(d))
}
// 給核心程序傳送service資訊
res, err := execute(i.sock, req, 0)
if err != nil {
return [][]byte{}, err
}
return res, nil
}
構造請求
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
return newGenlRequest(ipvsFamily, cmd)
}
在構造請求時傳入的是ipvs協議簇
然後構造一個與核心通訊的訊息頭
func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
return &NetlinkRequest{
NlMsghdr: syscall.NlMsghdr{
Len: uint32(syscall.SizeofNlMsghdr),
Type: uint16(proto),
Flags: syscall.NLM_F_REQUEST | uint16(flags),
Seq: atomic.AddUint32(&nextSeqNr, 1),
},
}
}
給訊息加Data,這個Data是個陣列,需要實現兩個方法:
type NetlinkRequestData interface {
Len() int // 長度
Serialize() []byte // 序列化, 核心通訊也需要一定的資料格式,service資訊也需要實現
}
比如 header是這樣序列化的, 一看愣住了,思考好久才看懂: 拆下看: ([unsafe.Sizeof(hdr)]byte) 一個[]byte型別,長度就是結構體大小 (unsafe.Pointer(hdr))把結構體轉成byte指標型別 加個取它的值 用[:]轉成byte返回
func (hdr *genlMsgHdr) Serialize() []byte {
return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}
傳送service資訊給核心
一個很普通的socket傳送接收資料
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
var (
err error
)
if err := s.Send(req); err != nil {
return nil, err
}
pid, err := s.GetPid()
if err != nil {
return nil, err
}
var res [][]byte
done:
for {
msgs, err := s.Receive()
if err != nil {
return nil, err
}
for _, m := range msgs {
if m.Header.Seq != req.Seq {
continue
}
if m.Header.Pid != pid {
return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
}
if m.Header.Type == syscall.NLMSG_DONE {
break done
}
if m.Header.Type == syscall.NLMSG_ERROR {
error := int32(native.Uint32(m.Data[0:4]))
if error == 0 {
break done
}
return nil, syscall.Errno(-error)
}
if resType != 0 && m.Header.Type != resType {
continue
}
res = append(res, m.Data)
if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
break done
}
}
}
return res, nil
}
Service 資料打包 這裡比較細,核心思想就是核心只認一定格式的標準資料,我們把service資訊按其標準打包傳送給核心即可。 至於怎麼打包的就不詳細講了。
func fillService(s *Service) nl.NetlinkRequestData {
cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
if s.FWMark != 0 {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
} else {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
// Port needs to be in network byte order.
portBuf := new(bytes.Buffer)
binary.Write(portBuf, binary.BigEndian, s.Port)
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
}
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
if s.PEName != "" {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
}
f := &ipvsFlags{
flags: s.Flags,
mask: 0xFFFFFFFF,
}
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
return cmdAttr
}
總結
Service總體來講程式碼比較簡單,但是覺得有些地方實現的有點繞,不夠簡單直接。 總體來說就是監聽apiserver事件,然後比對 處理,定期也會去執行同步策略.
掃碼關注sealyun 探討