[譯]Golang應付百萬級請求/分鐘
我在不同公司從事反爬蟲、反病毒、反惡意程式已經有15年了,我知道,由於每天需要處理和應對的大量資料,這些系統最終會因此變得十分複雜。
目前我是smsjunk.com的CEO以及KnowBe4的首席架構師,兩家公司都是活躍與網路安全行業。
有趣的是在過去10年作為一名軟體工程師,幾乎所有我參與的後端開發專案裡面都是用Ruby on Rails來完成的。可是你不要誤會,我熱愛Ruby on Rails並且我認為它是一個非常出色的開發環境,但當你用ruby的思路在設計和開發系統一段時間以後,你往往會忘記,其實你還可以利用多執行緒,並行化,高速執行以及更小的記憶體開銷來開發系統。我是一名c/c++,Delphi以及c#的開發人員已經很多年了,然後我開始慢慢意識到,使用合適的工具讓系統變得更加簡單明瞭才是一件正確的事情。
程式設計界對於程式語言以及框架的爭論從未停歇,而我並不想參與到其中去。我相信效率高低,生產力大小以及程式碼的可維護性很大一部分取決於你所設計的架構是否足夠簡單。
要解決的問題
當我們開發一個匿名遙測以及資料分析系統的時候,其中一個需求是能夠處理和應付百萬數量級的POST請求,網路請求處理器會接收一個POST過來JSON,這個JSON裡面會包含許多需要寫入到Amazon S3的資料集合,以便我們的map-reduce系統可以在後續來處理這些資料。
一般情況下我們會考慮構建一個worker分層的結構,並且利用一些中介軟體,例如:
- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- 等等..
然後設立兩個不同的叢集,一個是給web客戶端,另一個是給worker,然後我們可以將worker擴容到我們處理業務時所需要的數量。
但在最開始的時候,我們的團隊就意識到可以用Go來實現所有這些,因為在討論期間我們認為這將會是一個非常高訪問量的系統。我利用Go來開發也已經有兩年了,用它來開發過一些系統,但是負載規模遠沒有此次的需求這麼大。
我們先定義一些struct來規定我們POST接收的請求體,以及定義一個上傳到S3 bucket的方法UploadToS3
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v",p.storageFolder,time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path,b,int64(b.Len()),contentType,acl,s3.Options{})
}
複製程式碼
幼稚地使用Go runtines
最開始的時候我們非常天真地實現一個POST的鉤子方法如下,只是簡單地將每個請求體的上傳動作放到Go rutinues中讓他們並行執行:
func payloadHandler(w http.ResponseWriter,r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type","application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _,payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
複製程式碼
在中等規模的負載情況下,這種方法對大部分人都是沒有問題的,但在應對更大規模的請求量時候,我們很快就招架不住了。當我們把這個版本的程式碼部署到生產環境以後,我們期待能有大量的請求進來但實際還不能達到百萬級別的數量級。我們完全低估了這個系統要處理的流量數。
但不管怎麼說上面的方法都是欠妥的。因為它沒有任何方法讓我們去控制Go runtinues啟動的數量。所以當我們的系統在面對每分鐘百萬級POST請求的時候很快就垮掉了。
再戰
我們需要找到另外的方法。在一開始我們就在討論如何讓我們的請求處理程式的生命週期儘可能地縮短以及上傳到S3的操作能在後臺或者非同步執行。當然,在Ruby on Rails裡面你必須這麼做,否則你將會阻塞到所有其他的網路請求處理程式。無論您使用的是美洲獅,獨角獸還是過路人(請不要參與JRuby討論)。然後我們想到使用訊息佇列這種比較常見的方法來處理來達到我們的目的,例如Resque,Sidekiq,SQS等等,還有數不清的工具因為實在有太多方法來實現這個功能。
所以在第二次迭代的時候,我們需要建立一個緩衝佇列,我們會將任務放入佇列裡面然後再一個個地上傳到S3上,但由於我們希望達到能夠控制這個佇列的最大容量的目的,並且我們有足夠的RAM來允許我們將請求體儲存到記憶體當中,所以我們認為直接使用了Go提供的channel,然後將我們的請求直接入隊到channel中處理就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload,MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter,r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _,payload := range content.Payloads {
Queue <- payload
}
...
}
複製程式碼
我們會從channel中獲取任務並且執行他們的上傳操作
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
複製程式碼
但說句老實話,我並不知道這是在幹嘛。肯定是因為那時已經太晚還有我們已經喝了太多的紅牛。??
這個改動並沒有讓我們的困境得到任何改善,我們將併發任務放到了佇列中執行僅僅是看上去好像解決了問題。但是我們的非同步程式一次只會上傳一個請求體到S3上面,但是我們的請求數此時遠遠大於我們上傳到S3的數量,可想而知我們的緩衝佇列很快就到達了他的極限爆滿了,然後它阻擋了其他網路請求的入隊操作。
相當於我們僅僅迴避了問題,並且讓我們的系統的崩潰時間進入了倒數。我們這個缺陷的版本釋出以後,整個系統的延遲率在持續性地每分鐘在上漲。
更加好的解決辦法
我們決定採用協同的方式來改進我們的Go channel,通過建立一個帶有2個的channel處理系統,一個用於將請求體入隊,另一個是負責控制worker
在JobQueue
中併發執行時的數量。
這個想法的核心是以一個相對穩定的頻率去並行上傳資料到S3,這樣的話既不會把我們的伺服器弄垮,也不會因為連線過多造成很多S3的連線錯誤。所以我們開始著手於Job/Worker模式。這個對於熟悉Java,c#開發 來說並不陌生,你可以理解為這是Go利用channel來實現worker執行緒池的方法。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,JobChannel: make(chan Job),quit: make(chan bool)}
}
// Start method starts the run loop for the worker,listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s",err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
複製程式碼
接下來修改我們網路請求的鉤子函式,負責建立一個Job的結構體的例項然後將其放入JobQueue channel中等待worker來獲取執行。
func payloadHandler(w http.ResponseWriter,r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body,MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type","application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _,payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
複製程式碼
在我們網路服務初始化的時候建立一個Dispather
並且呼叫Run()
建立一個裝有一定數量worker的執行緒池,用來接收和處理來自JobQueue
的Job
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
複製程式碼
下面是我們Dispather
的實現
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job,maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
複製程式碼
注意我們限制了worker線上程池的最大數量。我們的應用執行在一個docker化的Go環境中,部署在Amazon的Elasticbeanstalk上,並且儘量遵循12要素原則來配置我們的生產環境,在環境變數中獲取對應的引數值,這樣我們就可以控制worker的數量以及JobQueue
的最大容量通過直接修改對應的值而不需要重新去部署我們的應用。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
複製程式碼
當我們將這個版本釋出到生產環境以後我們的延遲率馬上有明顯的下降,我們處理請求的能力有一個質的飛躍。
在一分鐘以後等我們的負載均衡器完全啟動起來以後,可以看到ElasticBeanstalk上伺服器接收的請求數將近一百萬次每分鐘。通常我們早上都有幾個小時的流量高峰期,那時甚至會超過百萬請求次數每分鐘。而且當我們釋出完新程式碼以後伺服器的數量就從100臺下降到並穩定在了20臺。
當給叢集加上合適的配置以及設定自動伸縮以後,甚至可以降到僅僅用4臺c4.Large的EC2例項來處理日常業務。並且叢集會自動增加新的例項當CPU使用率持續5分鐘達到90%時。
總結
簡潔化設計永遠是我所追求的東西。我們可以設計一個複雜的系統用很多的佇列,後臺執行worker,複雜的部署等等,但取而代之我們決定利用Elasticbeanstalk強大的自動伸縮功能以及Go所提供開箱即用的併發特性。
總會有一個工具適合你的工作,在有的時候當你Ruby on Rails系統需要一個強大的網路請求處理功能的時候,可以試著考慮一下除了ruby生態圈以外的更加強大和簡潔的替代方案。
在你走之前
如果你能關注一下我的Twittwer並且分享給身邊的朋友的話,我會非常感謝的!我的Twitter是 twitter.com/mcastilho