1. 程式人生 > 其它 >goreplay~v1.3.0新增項

goreplay~v1.3.0新增項

--input-file-dry-run:預覽時間和請求資訊

--input-file-max-wait:允許跳過已記錄檔案中的長暫停(s)

--input-file-read-depth:預讀和緩衝請求(並排序)。預設值是100

--input-raw-timestamp-type go:當網路時間戳不可靠時,啟用應用程式級時間戳

原始碼解析,plugins.go

for _, options := range Settings.InputFile {
    plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun
) }

輸入檔案構造器

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
    i = new(FileInput)
    i.data = make(chan []byte, 1000)
    i.exit = make(chan bool)
    i.path = path
    i.speedFactor 
= 1 i.loop = loop i.readDepth = readDepth i.stats = expvar.NewMap("file-" + path) i.dryRun = dryRun i.maxWait = maxWait if err := i.init(); err != nil { return } go i.emit() return }

做檔案初始化

func (i *FileInput) init() (err error) {
    defer i.mu.Unlock()
    i.mu.Lock()

    var matches []
string if strings.HasPrefix(i.path, "s3://") { sess := session.Must(session.NewSession(awsConfig())) svc := s3.New(sess) bucket, key := parseS3Url(i.path) params := &s3.ListObjectsInput{ Bucket: aws.String(bucket), Prefix: aws.String(key), } resp, err := svc.ListObjects(params) if err != nil { Debug(0, "[INPUT-FILE] Error while retreiving list of files from S3", i.path, err) return err } for _, c := range resp.Contents { matches = append(matches, "s3://"+bucket+"/"+(*c.Key)) } } else if matches, err = filepath.Glob(i.path); err != nil { Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err) return } if len(matches) == 0 { Debug(0, "[INPUT-FILE] No files match pattern: ", i.path) return errors.New("No matching files") } i.readers = make([]*fileInputReader, len(matches)) for idx, p := range matches { i.readers[idx] = newFileInputReader(p, i.readDepth) } i.stats.Add("reader_count", int64(len(matches))) return nil }

內容讀取

func (i *FileInput) emit() {
    var lastTime int64 = -1

    var maxWait, firstWait, minWait int64
    minWait = math.MaxInt64

    i.stats.Add("negative_wait", 0)

    for {
        select {
        case <-i.exit:
            return
        default:
        }

        reader := i.nextReader()

        if reader == nil {
            if i.loop {
                i.init()
                lastTime = -1
                continue
            } else {
                break
            }
        }

        reader.queue.RLock()
        payload := heap.Pop(&reader.queue).(*filePayload)
        i.stats.Add("total_counter", 1)
        i.stats.Add("total_bytes", int64(len(payload.data)))
        reader.queue.RUnlock()

        if lastTime != -1 {
            diff := payload.timestamp - lastTime

            if firstWait == 0 {
                firstWait = diff
            }

            if i.speedFactor != 1 {
                diff = int64(float64(diff) / i.speedFactor)
            }

            if i.maxWait > 0 && diff > int64(i.maxWait) {
                diff = int64(i.maxWait)
            }

            if diff >= 0 {
                lastTime = payload.timestamp

                if !i.dryRun {
                    time.Sleep(time.Duration(diff))
                }

                i.stats.Add("total_wait", diff)

                if diff > maxWait {
                    maxWait = diff
                }

                if diff < minWait {
                    minWait = diff
                }
            } else {
                i.stats.Add("negative_wait", 1)
            }
        } else {
            lastTime = payload.timestamp
        }

        // Recheck if we have exited since last check.
        select {
        case <-i.exit:
            return
        default:
            if !i.dryRun {
                i.data <- payload.data
            }
        }
    }

    i.stats.Set("first_wait", time.Duration(firstWait))
    i.stats.Set("max_wait", time.Duration(maxWait))
    i.stats.Set("min_wait", time.Duration(minWait))

    Debug(0, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))

    if i.dryRun {
        fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",
            i.stats.Get("total_counter"),
            i.stats.Get("reader_count"),
            i.stats.Get("total_bytes"),
            i.stats.Get("max_wait"),
            i.stats.Get("min_wait"),
            i.stats.Get("first_wait"),
            time.Duration(i.stats.Get("total_wait").(*expvar.Int).Value()),
            i.stats.Get("negative_wait"),
        )
    }
}