Golang的併發程式設計(2)
阿新 • • 發佈:2021-07-01
package main import ( "bufio" "fmt" "io" "os" "strings" "time" ) type Reader interface { Read(rc chan []byte) } type Writer interface { Write(wc chan string) } type LogProcess struct { rc chan []byte wc chan string read Reader write Writer } type ReadFromFIle struct { path string } type WriteToInfluxDB struct { influxDBDsn string } func (r *ReadFromFIle) Read(rc chan []byte) { //開啟檔案 f, err :=os.Open(r.path) if err != nil{ panic(fmt.Sprintf("open file error:%s",err.Error())) } //末尾逐行讀取 f.Seek(0,2) rd := bufio.NewReader(f) for { line, err := rd.ReadBytes('\n') if err == io.EOF { time.Sleep(500*time.Millisecond) continue }else if err != nil{ panic(fmt.Sprintf("ReadBytes error:%s",err.Error())) } rc <- line[:len(line)-1] } } func (r *WriteToInfluxDB) Write(wc chan string) { //寫入 for v := range wc{ fmt.Println(v) } } func (l *LogProcess) Process() { //解析 for v := range l.rc{ l.wc <- strings.ToUpper(string(v)) } } func main() { r := &ReadFromFIle{ path: "./access.log", } w := &WriteToInfluxDB{ influxDBDsn: "username&password..", } lp := &LogProcess{ rc: make(chan []byte), wc: make(chan string), read: r, write: w, } go lp.read.Read(lp.rc) go lp.Process() go lp.write.Write(lp.wc) time.Sleep(30*time.Second) }