1. 程式人生 > 其它 >原始碼實踐_實現一個mqtt(1)

原始碼實踐_實現一個mqtt(1)

目錄

原始碼實踐_實現一個mqtt(1)

https://gitee.com/maomaomaoge/opmq

寫這個原因

因為用開源的emqx,背後的語言是erlang,不是計算的語言,但是天生的分散式,單機測試吞吐非常容易卡死,部署分散式,說實話,我不會,小公司也沒有那個人力去部署,一般用一個單機搞,能多大,就多大,所以,我想實現一個單機最大效率的mq(當然這和機器的網絡卡和cpu有關),只是支援簡單的釋出訂閱即可,不正那麼多彎彎繞,小公司用不到的東西

設計topic

因為我沒臺關注topic的解析,但是可以為以後考慮到是topic主導,還是tcp連線主導。也就是用連線去找topic,還是topic去找tcp,最終敲定topic下面帶著tcp切片。資料結構用谷歌的Btree,因為這個資料結構在記憶體查詢很快。

下面是初步實現的topic和tcp的關係,結合了btree

package opmq

import (
	"github.com/google/btree"
	"net"
)

type Topic struct {
	Name string
	Conn []net.Conn
	Hash int64
}

func NewTopic(name string) *Topic {
	t :=  &Topic{
		Name: name,
		Conn: make([]net.Conn, 0),
	}
	t.CalcHash()

	return t
}

func (t *Topic) Less(b btree.Item) bool {
	return t.Hash < b.(*Topic).Hash
}

func (t *Topic) CalcHash()  {
	b := []byte(t.Name)
	for _, v := range b {
		t.Hash += int64(v)
	}
}

測試程式碼

package opmq

import (
	"flag"
	"fmt"
	"github.com/google/btree"
	"strconv"
	"testing"
)

// all extracts all items from a tree in order as a slice.
func all(t *btree.BTree) (out []btree.Item) {
	t.Ascend(func(a btree.Item) bool {
		out = append(out, a)
		return true
	})
	return
}

func hashHere(s string) int64 {
	h := int64(0)
	b := []byte(s)
	for _, v := range b {
		h += int64(v)
	}

	return h
}

func TestTopic_CalcHash(t *testing.T) {
	var btreeDegree = flag.Int("degree", 32, "B-Tree degree")
	tree := btree.New(*btreeDegree)

	for i := 0; i < 10; i++ {
		to := &Topic{
			Name: strconv.Itoa(i),
		}
		to.CalcHash()

		tree.ReplaceOrInsert(to)
	}

	for _, v := range all(tree) {
		fmt.Println(v)
	}

	fmt.Println("查詢")
	to2 := &Topic{
		Name: "1",
	}
	to2.CalcHash()
	fmt.Println(tree.Get(to2))
}

# 輸出程式碼
=== RUN   TestTopic_CalcHash
&{0 [] 48}
&{1 [] 49}
&{2 [] 50}
&{3 [] 51}
&{4 [] 52}
&{5 [] 53}
&{6 [] 54}
&{7 [] 55}
&{8 [] 56}
&{9 [] 57}
查詢
&{1 [] 49}
--- PASS: TestTopic_CalcHash (0.00s)
PASS

Process finished with the exit code 0

報文

直接用開源包的報文解析

測試報文

server.go

package opmq

import (
	"fmt"
	"gitee.com/maomaomaoge/opmq/packets"
	"log"
	"net"
)

func Serve() {
	ln, err := net.Listen("tcp", ":7000")
	if err != nil {
		return
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatalln(err)
		}

		go read(conn)
	}
}

func read(conn net.Conn)  {
	for {
		packet, err := packets.ReadPacket(conn)
		if err != nil {
			return
		}

		fmt.Println("收到的報文: ", packet.String())

		ack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
		ack.Write(conn)
	}
}

開源包的最簡單程式碼

/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Seth Hoenig
 *    Allan Stockdill-Mander
 *    Mike Robertson
 */

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
	mqtt.DEBUG = log.New(os.Stdout, "", 0)
	mqtt.ERROR = log.New(os.Stdout, "", 0)
	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:7000")

	c := mqtt.NewClient(opts)
	token := c.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("go-mqtt/sample", 0, false, text)
		token.Wait()
	}

	time.Sleep(6 * time.Second)

	if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	c.Disconnect(250)

	time.Sleep(1 * time.Second)
}

serve收到的報文

=== RUN   TestServe
收到的報文:  CONNECT: dup: false qos: 0 retain: false rLength: 12 protocolversion: 4 protocolname: MQTT cleansession: true willflag: false WillQos: 0 WillRetain: false Usernameflag: false Passwordflag: false keepalive: 30 clientId:  willtopic:  willmessage:  Username:  Password: 

nice, 初步完成