1. 程式人生 > >我們如何用Go來處理每分鐘100萬復雜請求的場景

我們如何用Go來處理每分鐘100萬復雜請求的場景

lec listen image 由於 開發 resp 復雜 cal itl

Malwarebytes 我們經歷了顯著的增長,自從我一年前加入了矽谷的公司,一個主要的職責成了設計架構和開發一些系統來支持一個快速增長的信息安全公司和所有需要的設施來支持一個每天百萬用戶使用的產品。我在反病毒和反惡意軟件行業的不同公司工作了12年,從而我知道由於我們每天處理大量的數據,這些系統是多麽復雜。

有趣的是,在過去的大約9年間,我參與的所有的web後端的開發通常是通過Ruby on Rails技術實現的。不要錯怪我。我喜歡Ruby on Rails,並且我相信它是個令人驚訝的環境。但是一段時間後,你會開始以ruby的方式開始思考和設計系統,你會忘記,如果你可以利用多線程、並行、快速執行和小內存開銷,軟件架構本來應該是多麽高效和簡單。很多年期間,我是一個c/c++、Delphi和c#開發者,我剛開始意識到使用正確的工具可以把復雜的事情變得簡單些。

作為首席架構師,我不會很關心在互聯網上的語言和框架戰爭。我相信效率、生產力。代碼可維護性主要依賴於你如何把解決方案設計得很簡單。

問題

當工作在我們的匿名遙測和分析系統中,我們的目標是可以處理來自於百萬級別的終端的大量的POST請求。web處理服務可以接收包含了很多payload的集合的JSON數據,這些數據需要寫入Amazon S3中。接下來,map-reduce系統可以操作這些數據。

按照習慣,我們會調研服務層級架構,涉及的軟件如下:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • and so on…

搭建了2個不同的集群,一個提供web前端,另外一個提供後端處理,這樣我們可以橫向擴展後端服務的數量。

但是,從剛開始,在 討論階段我們的團隊就知道我們應該使用Go,因為我們看到這會潛在性地成為一個非常龐大( large traffic)的系統。我已經使用了Go語言大約2年時間,我們開發了幾個系統,但是很少會達到這樣的負載(amount of load)。

我們開始創建一些結構,定義從POST調用得到的web請求負載,還有一個上傳到S3 budket的函數。

type PayloadCollection struct {
    WindowsVersion  string    `json:"version"`
    Token           string    `json:"token"`
    Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
    // the storageFolder method ensures that there are no name collision in
    // case we get same timestamp in the key name
    storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

    bucket := S3Bucket

    b := new(bytes.Buffer)
    encodeErr := json.NewEncoder(b).Encode(payload)
    if encodeErr != nil {
        return encodeErr
    }

    // Everything we post to the S3 bucket should be marked ‘private‘
    var acl = s3.Private
    var contentType = "application/octet-stream"

    return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

本地Go routines方法

剛開始,我們采用了一個非常本地化的POST處理實現,僅僅嘗試把發到簡單go routine的job並行化:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON‘T DO THIS
    }

    w.WriteHeader(http.StatusOK)
}

對於中小負載,這會對大多數的人適用,但是大規模下,這個方案會很快被證明不是很好用。我們期望的請求數,不在我們剛開始計劃的數量級,當我們把第一個版本部署到生產環境上。我們完全低估了流量。

上面的方案在很多地方很不好。沒有辦法控制我們產生的go routine的數量。由於我們收到了每分鐘1百萬的POST請求,這段代碼很快就崩潰了。

再次嘗試

我們需要找一個不同的方式。自開始我們就討論過, 我們需要保持請求處理程序的生命周期很短,並且進程在後臺產生。當然,這是你在Ruby on Rails的世界裏必須要做的事情,否則你會阻塞在所有可用的工作 web處理器上,不管你是使用puma、unicore還是passenger(我們不要討論JRuby這個話題)。然後我們需要利用常用的處理方案來做這些,比如Resque、 Sidekiq、 SQS等。這個列表會繼續保留,因為有很多的方案可以實現這些。

所以,第二次叠代,我們創建了一個緩沖channel,我們可以把job排隊,然後把它們上傳到S3。因為我們可以控制我們隊列中的item最大值,我們有大量的內存來排列job,我們認為只要把job在channel裏面緩沖就可以了。

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

接下來,我們再從隊列中取job,然後處理它們。我們使用類似於下面的代碼:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            job.payload.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

說實話,我不知道我們在想什麽。這肯定是一個滿是Red-Bulls的夜晚。這個方法不會帶來什麽改善,我們用了一個 有缺陷的緩沖隊列並發,僅僅是把問題推遲了。我們的同步處理器同時僅僅會上傳一個數據到S3,因為來到的請求遠遠大於單核處理器上傳到S3的能力,我們的帶緩沖channel很快達到了它的極限,然後阻塞了請求處理邏輯的queue更多item的能力。

我們僅僅避免了問題,同時開始了我們的系統掛掉的倒計時。當部署了這個有缺陷的版本後,我們的延時保持在每分鐘以常量增長。

技術分享圖片

最好的解決方案

我們討論過在使用用Go channel時利用一種常用的模式,來創建一個二級channel系統,一個來queue job,另外一個來控制使用多少個worker來並發操作JobQueue。

想法是,以一個恒定速率並行上傳到S3,既不會導致機器崩潰也不好產生S3的連接錯誤。這樣我們選擇了創建一個Job/Worker模式。對於那些熟悉Java、C#等語言的開發者,可以把這種模式想象成利用channel以golang的方式來實現了一個worker線程池,作為一種替代。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

我們已經修改了我們的web請求handler,用payload創建一個Job實例,然後發到JobQueue channel,以便於worker來獲取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

    if r.Method != "POST" {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // Read the body into a string for json decoding
    var content = &PayloadCollection{}
    err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
        w.Header().Set("Content-Type", "application/json; charset=UTF-8")
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {

        // let‘s create a job with the payload
        work := Job{Payload: payload}

        // Push the work onto the queue.
        JobQueue <- work
    }

    w.WriteHeader(http.StatusOK)
}

在web server初始化時,我們創建一個Dispatcher,然後調用Run()函數創建一個worker池子,然後開始監聽JobQueue中的job。

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

下面是dispatcher的實現代碼:

type Dispatcher struct {
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 0; i < d.maxWorkers; i++ {
        worker := NewWorker(d.pool)
        worker.Start()
    }

    go d.dispatch()
}

func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            // a job request has been received
            go func(job Job) {
                // try to obtain a worker job channel that is available.
                // this will block until a worker is idle
                jobChannel := <-d.WorkerPool

                // dispatch the job to the worker job channel
                jobChannel <- job
            }(job)
        }
    }
}

註意到,我們提供了初始化並加入到池子的worker的最大數量。因為這個工程我們利用了Amazon Elasticbeanstalk帶有的docker化的Go環境,所以我們常常會遵守12-factor方法論來配置我們的生成環境中的系統,我們從環境變了讀取這些值。這種方式,我們控制worker的數量和JobQueue的大小,所以我們可以很快的改變這些值,而不需要重新部署集群。

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

直接結果

我們部署了之後,立馬看到了延時降到微乎其微的數值,並未我們處理請求的能力提升很大。

技術分享圖片

Elastic Load Balancers完全啟動後,我們看到ElasticBeanstalk 應用服務於每分鐘1百萬請求。通常情況下在上午時間有幾個小時,流量峰值超過每分鐘一百萬次。

我們一旦部署了新的代碼,服務器的數量從100臺大幅 下降到大約20臺。

技術分享圖片

我們合理配置了我們的集群和自動均衡配置之後,我們可以把服務器的數量降至4x EC2 c4.Large實例,並且Elastic Auto-Scaling設置為如果CPU達到5分鐘的90%利用率,我們就會產生新的實例。

技術分享圖片

總結

在我的書中,簡單總是獲勝。我們可以使用多隊列、後臺worker、復雜的部署設計一個復雜的系統,但是我們決定利用Elasticbeanstalk 的auto-scaling的能力和Go語言開箱即用的特性簡化並發。

我們僅僅用了4臺機器,這並不是什麽新鮮事了。可能它們還不如我的MacBook能力強大,但是卻處理了每分鐘1百萬的寫入到S3的請求。

處理問題有正確的工具。當你的 Ruby on Rails 系統需要更強大的web handler時,可以考慮下ruby生態系統之外的技術,或許可以得到更簡單但更強大的替代方案。

我們如何用Go來處理每分鐘100萬復雜請求的場景