原始碼實踐_實現一個mqtt(1)
阿新 • • 發佈:2021-09-01
目錄
原始碼實踐_實現一個mqtt(1)
寫這個原因
因為用開源的emqx,背後的語言是erlang,不是計算的語言,但是天生的分散式,單機測試吞吐非常容易卡死,部署分散式,說實話,我不會,小公司也沒有那個人力去部署,一般用一個單機搞,能多大,就多大,所以,我想實現一個單機最大效率的mq(當然這和機器的網絡卡和cpu有關),只是支援簡單的釋出訂閱即可,不正那麼多彎彎繞,小公司用不到的東西
設計topic
因為我沒臺關注topic的解析,但是可以為以後考慮到是topic主導,還是tcp連線主導。也就是用連線去找topic,還是topic去找tcp,最終敲定topic下面帶著tcp切片。資料結構用谷歌的Btree,因為這個資料結構在記憶體查詢很快。
下面是初步實現的topic和tcp的關係,結合了btree
package opmq import ( "github.com/google/btree" "net" ) type Topic struct { Name string Conn []net.Conn Hash int64 } func NewTopic(name string) *Topic { t := &Topic{ Name: name, Conn: make([]net.Conn, 0), } t.CalcHash() return t } func (t *Topic) Less(b btree.Item) bool { return t.Hash < b.(*Topic).Hash } func (t *Topic) CalcHash() { b := []byte(t.Name) for _, v := range b { t.Hash += int64(v) } }
測試程式碼
package opmq import ( "flag" "fmt" "github.com/google/btree" "strconv" "testing" ) // all extracts all items from a tree in order as a slice. func all(t *btree.BTree) (out []btree.Item) { t.Ascend(func(a btree.Item) bool { out = append(out, a) return true }) return } func hashHere(s string) int64 { h := int64(0) b := []byte(s) for _, v := range b { h += int64(v) } return h } func TestTopic_CalcHash(t *testing.T) { var btreeDegree = flag.Int("degree", 32, "B-Tree degree") tree := btree.New(*btreeDegree) for i := 0; i < 10; i++ { to := &Topic{ Name: strconv.Itoa(i), } to.CalcHash() tree.ReplaceOrInsert(to) } for _, v := range all(tree) { fmt.Println(v) } fmt.Println("查詢") to2 := &Topic{ Name: "1", } to2.CalcHash() fmt.Println(tree.Get(to2)) } # 輸出程式碼 === RUN TestTopic_CalcHash &{0 [] 48} &{1 [] 49} &{2 [] 50} &{3 [] 51} &{4 [] 52} &{5 [] 53} &{6 [] 54} &{7 [] 55} &{8 [] 56} &{9 [] 57} 查詢 &{1 [] 49} --- PASS: TestTopic_CalcHash (0.00s) PASS Process finished with the exit code 0
報文
直接用開源包的報文解析
測試報文
server.go
package opmq
import (
"fmt"
"gitee.com/maomaomaoge/opmq/packets"
"log"
"net"
)
func Serve() {
ln, err := net.Listen("tcp", ":7000")
if err != nil {
return
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatalln(err)
}
go read(conn)
}
}
func read(conn net.Conn) {
for {
packet, err := packets.ReadPacket(conn)
if err != nil {
return
}
fmt.Println("收到的報文: ", packet.String())
ack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
ack.Write(conn)
}
}
開源包的最簡單程式碼
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
*/
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:7000")
c := mqtt.NewClient(opts)
token := c.Connect()
if token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}
time.Sleep(6 * time.Second)
if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
serve收到的報文
=== RUN TestServe
收到的報文: CONNECT: dup: false qos: 0 retain: false rLength: 12 protocolversion: 4 protocolname: MQTT cleansession: true willflag: false WillQos: 0 WillRetain: false Usernameflag: false Passwordflag: false keepalive: 30 clientId: willtopic: willmessage: Username: Password:
nice, 初步完成