1. 程式人生 > 其它 >Go語言操作 Redis[garyburd/redigo/redis] Mysql[/jmoiron/sql] RabbitMQ[streadway/amqp]

Go語言操作 Redis[garyburd/redigo/redis] Mysql[/jmoiron/sql] RabbitMQ[streadway/amqp]

技術標籤:GO

Reids

安裝匯入

go get github.com/garyburd/redigo/redis
import "github.com/garyburd/redigo/redis"

github:https://github.com/antirez/redis

Doc:http://godoc.org/github.com/garyburd/redigo/redis

Redis全套使用:http://www.cnblogs.com/suoning/p/5807247.html

使用

連線

import "github.com/garyburd/redigo/redis"

func main() {
    c, err := redis.Dial("tcp", "localhost:6379")
    if err != nil {
        fmt.Println("conn redis failed, err:", err)
        return
    }
    defer c.Close()
}

set & get

    _, err = c.Do("Set", "name", "nick")
    if err != nil {
        fmt.Println(err)
        return
    }

    r, err := redis.String(c.Do("Get", "name"))
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(r)

mset & mget 批量設定

   _, err = c.Do("MSet", "name", "nick", "age", "18")
    if err != nil {
        fmt.Println("MSet error: ", err)
        return
    }

    r2, err := redis.Strings(c.Do("MGet", "name", "age"))
    if err != nil {
        fmt.Println("MGet error: ", err)
        return
    }
    fmt.Println(r2)

hset & hget hash操作

	_, err = c.Do("HSet", "names", "nick", "suoning")
	if err != nil {
		fmt.Println("hset error: ", err)
		return
	}

	r, err := redis.String(c.Do("HGet", "names", "nick"))
	if err != nil {
		fmt.Println("hget error: ", err)
		return
	}
	fmt.Println(r)

expire 設定過期時間

    _, err = c.Do("expire", "names", 5)
    if err != nil {
        fmt.Println("expire error: ", err)
        return
    }

lpush & lpop & llen 佇列

	// 佇列
	_, err = c.Do("lpush", "Queue", "nick", "dawn", 9)
	if err != nil {
		fmt.Println("lpush error: ", err)
		return
	}
	for {
		r, err := redis.String(c.Do("lpop", "Queue"))
		if err != nil {
			fmt.Println("lpop error: ", err)
			break
		}
		fmt.Println(r)
	}
	r3, err := redis.Int(c.Do("llen", "Queue"))
	if err != nil {
		fmt.Println("llen error: ", err)
		return
	}
	fmt.Println(r3)

連線池

各引數的解釋如下:

MaxIdle:最大的空閒連線數,表示即使沒有redis連線時依然可以保持N個空閒的連線,而不被清除,隨時處於待命狀態。

MaxActive:最大的啟用連線數,表示同時最多有N個連線

IdleTimeout:最大的空閒連線等待時間,超過此時間後,空閒連線將被關閉

    pool := &redis.Pool{
        MaxIdle:     16,
        MaxActive:   1024,
        IdleTimeout: 300,
        Dial: func() (redis.Conn, error) {
            return redis.Dial("tcp", "localhost:6379")
        },
    }

連線池例子

package main

import (
	"fmt"

	"github.com/garyburd/redigo/redis"
)

var pool *redis.Pool

func init() {
	pool = &redis.Pool{
		MaxIdle:     16,
		MaxActive:   1024,
		IdleTimeout: 300,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", "localhost:6379")
		},
	}
}

func main() {
	c := pool.Get()
	defer c.Close()

	_, err := c.Do("Set", "name", "nick")
	if err != nil {
		fmt.Println(err)
		return
	}

	r, err := redis.String(c.Do("Get", "name"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(r)
}

管道操作

請求/響應服務可以實現持續處理新請求,客戶端可以傳送多個命令到伺服器而無需等待響應,最後在一次讀取多個響應。

使用Send(),Flush(),Receive()方法支援管道化操作

Send向連線的輸出緩衝中寫入命令。

Flush將連線的輸出緩衝清空並寫入伺服器端。

Recevie按照FIFO順序依次讀取伺服器的響應。

func main() {
    c, err := redis.Dial("tcp", "localhost:6379")
    if err != nil {
        fmt.Println("conn redis failed, err:", err)
        return
    }
    defer c.Close()

    c.Send("SET", "name1", "sss1")
    c.Send("SET", "name2", "sss2")

    c.Flush()

    v, err := c.Receive()
    fmt.Printf("v:%v,err:%v\n", v, err)
    v, err = c.Receive()
    fmt.Printf("v:%v,err:%v\n", v, err)

    v, err = c.Receive()    // 夯住,一直等待
    fmt.Printf("v:%v,err:%v\n", v, err)
}

Mysql

安裝匯入

go get "github.com/go-sql-driver/mysql"
go get "github.com/jmoiron/sqlx"

import (
    _ "github.com/go-sql-driver/mysql"
    "github.com/jmoiron/sqlx"
)

連結:

github:

  https://github.com/go-sql-driver/mysql

  https://github.com/jmoiron/sqlx

Doc:

http://godoc.org/github.com/jmoiron/sqlx

  http://jmoiron.github.io/sqlx/

Mysql全套使用:http://www.cnblogs.com/suoning/p/5769141.html

連線

import (
    _ "github.com/go-sql-driver/mysql"
    "github.com/jmoiron/sqlx"
)

var Db *sqlx.DB

func init() {
    database, err := sqlx.Open("mysql", "root:[email protected](127.0.0.1:3306)/golang")
    if err != nil {
        fmt.Println("open mysql failed,", err)
        return
    }
    Db = database
}

例子建表

CREATE TABLE `person` (
  `user_id` int(128) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `sex` varchar(16) DEFAULT NULL,
  `email` varchar(128) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
type Person struct {
    UserId   int    `db:"user_id"`
    Username string `db:"username"`
    Sex      string `db:"sex"`
    Email    string `db:"email"`
}

例子(insert)

func main() {
	r, err := Db.Exec("insert into person(username, sex, email)values(?, ?, ?)", "suoning", "man", "[email protected]")
	if err != nil {
		fmt.Println("exec failed, ", err)
		return
	}
	id, err := r.LastInsertId()
	if err != nil {
		fmt.Println("exec failed, ", err)
		return
	}

	fmt.Println("insert succ:", id)
}

例子(update)

  _, err := Db.Exec("update person set user_id=? where username=?", 20170808, "suoning")
    if err != nil {
        fmt.Println("exec failed, ", err)
        return
    }

例子(select)

	var person []Person
	err := Db.Select(&person, "select user_id, username, sex, email from person where user_id=?", 3)
	if err != nil {
		fmt.Println("exec failed, ", err)
		return
	}
	fmt.Println("select succ:", person)

	people := []Person{}
	Db.Select(&people, "SELECT * FROM person ORDER BY user_id ASC")
	fmt.Println(people)
	jason, john := people[0], people[1]
	fmt.Printf("%#v\n%#v", jason, john)

例子(delete)

	_, err := Db.Exec("delete from person where username=? limit 1", "suoning")
	if err != nil {
		fmt.Println("exec failed, ", err)
		return
	}

	fmt.Println("delete succ")

事務

func main()  {
    conn, err := Db.Begin()
    if err != nil {
        logs.Warn("DB.Begin failed, err:%v", err)
        return
    }

    defer func() {
        if err != nil {
            conn.Rollback()
            return
        }
        conn.Commit()
    }()

    // do something
}

RabbitMQ

安裝

go get "github.com/streadway/amqp"

文件:

https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go

例子一(普通模式 生產者:消費者)

package main

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"strings"

	"time"

	"github.com/streadway/amqp"
)

/*
預設點對點模式
*/

func main() {
	Produce()
	Consume()
	var t string
	fmt.Scan(&t)
}
func Produce() {
	// 連線
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 開啟一個併發伺服器通道來處理訊息
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 申明一個佇列
	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // durable  永續性的,如果事前已經聲明瞭該佇列,不能重複宣告
		false,        // delete when unused
		false,        // exclusive 如果是真,連線一斷開,佇列刪除
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := bodyFrom(os.Args)

	// 釋出
	err = ch.Publish(
		"",     // exchange 預設模式,exchange為空
		q.Name, // routing key 預設模式路由到同名佇列,即是task_queue
		false,  // mandatory
		false,
		amqp.Publishing{
			// 永續性的釋出,因為佇列被宣告為持久的,釋出訊息必須加上這個(可能不用),但訊息還是可能會丟,如訊息到快取但MQ掛了來不及持久化。
			DeliveryMode: amqp.Persistent,
			ContentType:  "text/plain",
			Body:         []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s", body)
}

func Consume() {
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 指定佇列!
	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // durable
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// Fair dispatch 預取,每個工作方每次拿一個訊息,確認後才拿下一次,緩解壓力
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	// 消費根據佇列名
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack   設定為真自動確認訊息
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	//forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
			dot_count := bytes.Count(d.Body, []byte("."))
			t := time.Duration(dot_count)
			time.Sleep(t * time.Second)
			log.Printf("Done")

			// 確認訊息被收到!!如果為真的,那麼同在一個channel,在該訊息之前未確認的訊息都會確認,適合批量處理
			// 真時場景:每十條訊息確認一次,類似
			d.Ack(false)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	//<-forever
}
func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = fmt.Sprintf("%s-%v", "hello", time.Now())
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

例子二(訂閱模式 生產者:消費者)

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "os"
    "strings"
    "time"
)

/*
廣播模式
釋出方
*/

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
 
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = fmt.Sprintf("%s-%v","hello", time.Now())
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

訂閱 消費者:

package main

import (
	"fmt"
	"log"
	"os"
	"strings"

	"time"

	"github.com/streadway/amqp"
)

/*
預設點對點模式
*/

func main() {
	go Consume()
	Produce()
	var t string
	fmt.Scan(&t)
}
func Produce() {
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 預設模式有預設交換機,廣播自己定義一個交換機,交換機可與佇列進行繫結
	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type 廣播模式
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args)

	// 釋出
	err = ch.Publish(
		"logs", // exchange 訊息傳送到交換機,這個時候沒佇列繫結交換機,訊息會丟棄
		"",     // routing key  廣播模式不需要這個,它會把所有訊息路由到繫結的所有佇列
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)
}

func Consume() {
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 同樣要申明交換機
	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// 新建佇列,這個佇列沒名字,隨機生成一個名字
	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when usused
		true,  // exclusive  表示連線一斷開,這個佇列自動刪除
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// 佇列和交換機繫結,即是佇列訂閱了發到這個交換機的訊息
	err = ch.QueueBind(
		q.Name, // queue name  佇列的名字
		"",     // routing key  廣播模式不需要這個
		"logs", // exchange  交換機名字
		false,
		nil)
	failOnError(err, "Failed to bind a queue")

	// 開始消費訊息,可開多個訂閱方,因為佇列是臨時生成的,所有每個訂閱方都能收到同樣的訊息
	msgs, err := ch.Consume(
		q.Name, // queue  佇列名字
		"",     // consumer
		true,   // auto-ack  自動確認
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("received: [x] %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}
func bodyFrom(args []string) string {
	var s string
	if (len(args) < 2) || os.Args[1] == "" {
		s = fmt.Sprintf("%s-%v", "hello", time.Now())
	} else {
		s = strings.Join(args[1:], " ")
	}
	return s
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

例子三(RPC模式 請求:應答)

package main

import (
	"fmt"
	"log"
	"math/rand"
	"strconv"

	"time"

	"github.com/streadway/amqp"
)

/*
預設點對點模式
*/

func main() {
	go RPCAnswer()

	rand.Seed(time.Now().UTC().UnixNano())

	n := 10

	log.Printf(" [x] Requesting fib(%d)", n)
	res, err := fibonacciRPC(n)
	failOnError(err, "Failed to handle RPC request")

	log.Printf(" [.] Got %d", res)

	var t string
	fmt.Scan(&t)
}

func RPCAnswer() {
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"rpc_queue", // name
		false,       // durable
		false,       // delete when usused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// 公平分發 沒有這個則round-robbin
	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	// 消費,等待請求
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		//請求來了
		for d := range msgs {
			n, err := strconv.Atoi(string(d.Body))
			failOnError(err, "Failed to convert body to integer")

			log.Printf(" [.] fib(%d)", n)

			// 計算
			response := fib(n)

			// 回答
			err = ch.Publish(
				"",        // exchange
				d.ReplyTo, // routing key
				false,     // mandatory
				false,     // immediate
				amqp.Publishing{
					ContentType:   "text/plain",
					CorrelationId: d.CorrelationId, //序列號
					Body:          []byte(strconv.Itoa(response)),
				})
			failOnError(err, "Failed to publish a message")

			// 確認回答完畢
			d.Ack(false)
		}
	}()

	log.Printf(" [*] Awaiting RPC requests")
	<-forever
}
func fibonacciRPC(n int) (res int, err error) {
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 佇列宣告
	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when usused
		true,  // exclusive 為真即連線斷開就刪除
		false, // noWait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	corrId := randomString(32)

	err = ch.Publish(
		"",          // exchange
		"rpc_queue", // routing key
		false,       // mandatory
		false,       // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: corrId,
			ReplyTo:       q.Name,
			Body:          []byte(strconv.Itoa(n)),
		})
	failOnError(err, "Failed to publish a message")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive   這個為真,伺服器會認為這是該佇列唯一的消費者
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")
	for d := range msgs {
		if corrId == d.CorrelationId {
			res, err = strconv.Atoi(string(d.Body))
			failOnError(err, "Failed to convert body to integer")
			break
		}
	}

	return
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}
func fib(n int) int {
	if n == 0 {
		return 0
	} else if n == 1 {
		return 1
	} else {
		return fib(n-1) + fib(n-2)
	}
}
func randomString(l int) string {
	bytes := make([]byte, l)
	for i := 0; i < l; i++ {
		bytes[i] = byte(randInt(65, 90))
	}
	return string(bytes)
}

func randInt(min int, max int) int {
	return min + rand.Intn(max-min)
}