1. 程式人生 > 其它 >golang ceph客戶端操作

golang ceph客戶端操作

首先需要匯入依賴

go get gopkg.in/amz.v1/aws
go get gopkg.in/amz.v1/s3

 

1. 初始化ceph連線

在初始化連線之前,我們需要建立一個使用者得到accessKey和secretKey,新增使用者的指令如下:

docker exec ceph-rgw radosgw-admin user create --uid="test" --display-name="test user"

 

下面就是初始化ceph客戶端的操作,和一些資料庫的連線一樣的,都是授權+地址:

func init() {
	auth := aws.Auth{
		AccessKey: accessKey,
		SecretKey: secretKey,
	}

	region := aws.Region{
		Name:                 "default",
		EC2Endpoint:          url,   // "http://<ceph-rgw ip>:<ceph-rgw port>"
		S3Endpoint:           url,
		S3BucketEndpoint:     "",    // Not needed by AWS S3
		S3LocationConstraint: false, // true if this region requires a LocationConstraint declaration
		S3LowercaseBucket:    false, // true if the region requires bucket names to be lower case
		Sign:                 aws.SignV2,
	}

	CephConn = s3.New(auth, region)
}

 

下面是針對ceph的一些簡單操作,因為專案原因只需要一些檔案上傳下載的操作,其他操作可以參見amz.v1的doc

 

2. 獲取一個桶

func GetCephBucket(bucket string) *s3.Bucket {
	return CephConn.Bucket(bucket)
}

 

3. 將本地檔案上傳到ceph的一個bucket中

func put2Bucket(bucket *s3.Bucket, localPath, cephPath string) (*s3.Bucket, error) {
	err := bucket.PutBucket(s3.PublicRead)
	if err != nil {
		log.Fatal(err.Error())
		return nil, err
	}

	bytes, err := ioutil.ReadFile(localPath)
	if err != nil {
		log.Fatal(err.Error())
		return nil, err
	}

	err = bucket.Put(cephPath, bytes, "octet-stream", s3.PublicRead)
	return bucket, err
}

 

4. 從ceph下載檔案

func downloadFromCeph(bucket *s3.Bucket, localPath, cephPath string) error {
	data, err := bucket.Get(cephPath)
	if err != nil {
		log.Fatal(err.Error())
		return err
	}
	return ioutil.WriteFile(localPath, data, 0666)
}

 

5. 刪除指定的檔案

func delCephData(bucket *s3.Bucket, cephPath string) error {
	err := bucket.Del(cephPath)
	if err != nil {
		log.Fatal(err.Error())
	}
	return err
}

 

6. 刪除桶

刪除桶時要保證桶內檔案已經被刪除

func delBucket(bucket *s3.Bucket) error {
	err := bucket.DelBucket()
	if err != nil {
		log.Fatal(err.Error())
	}
	return err
}

 

7. 批量獲取檔案資訊

func getBatchFromCeph(bucket *s3.Bucket, prefixCephPath string) []string {
	maxBatch := 100

	// bucket.List() 返回桶內objects的資訊,預設1000條
	resultListResp, err := bucket.List(prefixCephPath, "", "", maxBatch)
	if err != nil {
		log.Fatal(err.Error())
		return nil
	}

	keyList := make([]string, 0)
	for _, key := range resultListResp.Contents {
		keyList = append(keyList, key.Key)
	}

	return keyList
}

 

8. 測試

編寫一個main.go來測試這些介面,嘗試上傳和下載一個檔案

func main() {
	bucketName := "bucket_test"
	filename := "C:\\Users\\dell\\Desktop\\ieee.jpg"
	cephPath := "/static/default/bucket_test/V1/" + "ieee_ceph.jpg"

	// 獲取指定桶
	bucket := GetCephBucket(bucketName)

	// 上傳
	bucket, err := put2Bucket(bucket, filename, cephPath)
	if err != nil {
		return
	}

	// 下載
	localPath := "C:\\Users\\dell\\Desktop\\download.jpg"
	err = downloadFromCeph(bucket, localPath, cephPath)
	if err != nil {
		return
	}

	// 獲得url
	url := bucket.SignedURL(cephPath, time.Now().Add(time.Hour))
	fmt.Println(url)

	// 批量查詢
	prefixCephpath := "static/default/bucket_test/V1"
	lists := getBatchFromCeph(bucket, prefixCephpath)
	for _, list := range lists {
		fmt.Println(list)
	}

	// 刪除資料
	delCephData(bucket, cephPath)

	// 刪除桶
	delBucket(bucket)

}

 

9. 上傳ceph的簡單優化

選擇將本地伺服器檔案上傳ceph應該是非同步操作,無論是採用chan通知一個上傳的協程還是晚上的定時任務

非同步任務的操作可以選擇簡單的chan,或者使用訊息佇列,如果吞吐量大的情況下就要使用rabbitmq等訊息中介軟體了