1. 程式人生 > >skynet 原始碼解析 message queue

skynet 原始碼解析 message queue

#ifndef SKYNET_MESSAGE_QUEUE_H
#define SKYNET_MESSAGE_QUEUE_H

#include <stdlib.h>
#include <stdint.h>

struct skynet_message {
	uint32_t source;		//訊息源的控制代碼
	int session;			//	
	void * data;			//訊息指標
	size_t sz;				//訊息大小
};

// type is encoding in skynet_message.sz high 8bit
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)

struct message_queue;

void skynet_globalmq_push(struct message_queue * queue);
struct message_queue * skynet_globalmq_pop(void);

struct message_queue * skynet_mq_create(uint32_t handle);
void skynet_mq_mark_release(struct message_queue *q);

typedef void (*message_drop)(struct skynet_message *, void *);

void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud);
uint32_t skynet_mq_handle(struct message_queue *);

// 0 for success
int skynet_mq_pop(struct message_queue *q, struct skynet_message *message);
void skynet_mq_push(struct message_queue *q, struct skynet_message *message);

// return the length of message queue, for debug
int skynet_mq_length(struct message_queue *q);
int skynet_mq_overload(struct message_queue *q);

void skynet_mq_init();

#endif
#include "skynet.h"
#include "skynet_mq.h"
#include "skynet_handle.h"
#include "spinlock.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <stdbool.h>

#define DEFAULT_QUEUE_SIZE 64
#define MAX_GLOBAL_MQ 0x10000

// 0 means mq is not in global mq.
// 1 means mq is in global mq , or the message is dispatching.

#define MQ_IN_GLOBAL 1
#define MQ_OVERLOAD 1024


//訊息結構為先進先出,從tail壓入資料,從head拿出資料


//  ↑   
//|head|
//|	   |
//|	   |
//|tail|
//  ↑

//https://blog.csdn.net/zxm342698145/article/details/80847301
//訊息佇列可以看做一個圓,分別有兩個tail和head哨兵進行迴圈



//訊息佇列
struct message_queue {
	struct spinlock lock;		//訊息節點鎖
	uint32_t handle;			//控制代碼
	int cap;					//預分配大小
	int head;					//頭部
	int tail;					//尾部
	int release;                    //釋放標記
	int in_global;                  //是否在全域性佇列內
	int overload;                   //過載
	int overload_threshold;         //過載閾值
	struct skynet_message *queue;	//訊息
	struct message_queue *next;		//下一個訊息佇列
};

//全域性佇列
struct global_queue {				
	struct message_queue *head;
	struct message_queue *tail;
	struct spinlock lock;			//全域性佇列鎖
};
//訊息佇列
static struct global_queue *Q = NULL;


//將訊息佇列放入全域性佇列
void 
skynet_globalmq_push(struct message_queue * queue) {
	struct global_queue *q= Q;

	SPIN_LOCK(q) //上鎖
	assert(queue->next == NULL);
	if(q->tail) {			//如果當前全域性佇列中已有訊息佇列就將尾部置為當前佇列
		q->tail->next = queue;
		q->tail = queue;
	} else {
		q->head = q->tail = queue;//如果當前全域性佇列為空,則設定頭尾節點
	}
	SPIN_UNLOCK(q)//解鎖
}

//彈出全域性佇列頭部節點
struct message_queue * 
skynet_globalmq_pop() {
	struct global_queue *q = Q;	
	SPIN_LOCK(q)
	//獲取當前頭部節點
	struct message_queue *mq = q->head;
	if(mq) {
		//將頭部向下順移一個節點
		q->head = mq->next;
		if(q->head == NULL) {	    //如果head的下一個節點為空,則將全域性佇列的尾部節點置為空,
			assert(mq == q->tail);	//當尾部節點不等於尾部節點觸發斷言
			q->tail = NULL;
		}
		mq->next = NULL;	//將我們待取出的節點的下個節點欄位置為null,使其稱為一個獨立的節點
	}
	SPIN_UNLOCK(q)

	return mq;
}

//通過傳入的控制代碼建立一個訊息節點
struct message_queue * 
skynet_mq_create(uint32_t handle) {
	//聲請一個訊息節點
	struct message_queue *q = skynet_malloc(sizeof(*q));
	q->handle = handle;				//設定控制代碼
	q->cap = DEFAULT_QUEUE_SIZE;	//設定最大容量
	q->head = 0;					//頭部哨兵,指當前頭部的位置
	q->tail = 0;					//尾部哨兵,指當前尾部的位置
	SPIN_INIT(q)					//初始化鎖
	// When the queue is create (always between service create and service init) ,
	// set in_global flag to avoid push it to global queue .
	// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
	q->in_global = MQ_IN_GLOBAL;	//設為全域性節點	
	q->release = 0;					//釋放標記
	q->overload = 0;				//過載
	q->overload_threshold = MQ_OVERLOAD;//過載閾值
	q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);	//申請一個訊息記憶體
	q->next = NULL;														//暫無下個節點

	return q;
}


//釋放訊息佇列,從函式設計來看,函式的執行應在節點完全獨立之後.
static void 
_release(struct message_queue *q) {
	assert(q->next == NULL);
	SPIN_DESTROY(q)
	skynet_free(q->queue);
	skynet_free(q);
}

//獲取當前節點的控制代碼
uint32_t 
skynet_mq_handle(struct message_queue *q) {
	return q->handle;
}

//獲取當前佇列剩餘容量,由於佇列設計為一個圓,所以當尾節點小於頭節點,要加上當前整個圓的長度
int
skynet_mq_length(struct message_queue *q) {
	int head, tail,cap;

	SPIN_LOCK(q)
	head = q->head;
	tail = q->tail;
	cap = q->cap;
	SPIN_UNLOCK(q)
	
	if (head <= tail) {
		return tail - head;
	}
	return tail + cap - head;
}

//獲取當前節點超載資訊,並置為0
int
skynet_mq_overload(struct message_queue *q) {
	if (q->overload) {
		int overload = q->overload;
		q->overload = 0;
		return overload;
	} 
	return 0;
}

//從一個訊息佇列取出一個訊息,為引數返回值形式放入放入的skynet_message指標內
//返回0為取出訊息成功,返回1為取出訊息失敗
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
	int ret = 1;
	SPIN_LOCK(q)
	
	if (q->head != q->tail) {//當訊息佇列中有訊息時
		*message = q->queue[q->head++];	//從頭部開始取出訊息放入傳入的引數內
		ret = 0;						//取出成功
		int head = q->head;				
		int tail = q->tail;
		int cap = q->cap;
										//在取出成功後,對當前訊息佇列進行整理
		if (head >= cap) {				//當取出的訊息超過了預分配的最大容量,則置為0
			q->head = head = 0;
		}
		int length = tail - head;		//獲取當前長度
		if (length < 0) {				//
			length += cap;
		}
		while (length > q->overload_threshold) {	//當前訊息佇列長度大於過載閾值時,對過載大小和過載閾值進行重新計算分配
			q->overload = length;
			q->overload_threshold *= 2;
		}
	} else {
		// reset overload_threshold when queue is empty
		q->overload_threshold = MQ_OVERLOAD;
	}
	//當訊息佇列沒有訊息,則設為非全域性佇列
	if (ret) {
		q->in_global = 0;
	}
	
	SPIN_UNLOCK(q)
	
	return ret;
}

//擴大當前佇列
static void
expand_queue(struct message_queue *q) {
	struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2);
	int i;
	for (i=0;i<q->cap;i++) {
		new_queue[i] = q->queue[(q->head + i) % q->cap];
	}
	q->head = 0;
	q->tail = q->cap;
	q->cap *= 2;
	
	skynet_free(q->queue);
	q->queue = new_queue;
}

//向全域性佇列新增節點
void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
	assert(message);
	SPIN_LOCK(q)
	//向訊息佇列的尾部插入一個訊息
	q->queue[q->tail] = *message;
	//當尾節點和預分配大小重合,等於tail回到出發點,則置為0
	if (++ q->tail >= q->cap) {
		q->tail = 0;
	}
	//當尾部位置等於頭部位置,說明當前訊息佇列已經置滿,立即進行擴容
	if (q->head == q->tail) {
		expand_queue(q);
	}
	//如果當前訊息佇列為非全域性,則設為全域性
	if (q->in_global == 0) {
		q->in_global = MQ_IN_GLOBAL;
		skynet_globalmq_push(q);
	}
	
	SPIN_UNLOCK(q)
}

//全域性佇列初始化
void 
skynet_mq_init() {
	struct global_queue *q = skynet_malloc(sizeof(*q));
	memset(q,0,sizeof(*q));
	SPIN_INIT(q);
	Q=q;
}

//釋放訊息佇列
void 
skynet_mq_mark_release(struct message_queue *q) {
	SPIN_LOCK(q)
	assert(q->release == 0);
	q->release = 1;
	if (q->in_global != MQ_IN_GLOBAL) {
		skynet_globalmq_push(q);
	}
	SPIN_UNLOCK(q)
}

//對當前訊息佇列內的訊息進行drop_func操作,執行完畢後釋放當前訊息佇列
static void
_drop_queue(struct message_queue *q, message_drop drop_func, void *ud) {
	struct skynet_message msg;
	while(!skynet_mq_pop(q, &msg)) {
		drop_func(&msg, ud);
	}
	_release(q);
}

//檢測是否需要進行釋放,分別進行釋放操作和全域性佇列新增操作,地獄或天堂
void 
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
	SPIN_LOCK(q)
	
	if (q->release) {
		SPIN_UNLOCK(q)
		_drop_queue(q, drop_func, ud);
	} else {
		skynet_globalmq_push(q);
		SPIN_UNLOCK(q)
	}
}

相關推薦

skynet 原始碼解析 message queue

#ifndef SKYNET_MESSAGE_QUEUE_H #define SKYNET_MESSAGE_QUEUE_H #include <stdlib.h> #include <stdint.h> struct skynet_message

Loop,Handler,Message原始碼解析

一.Looper 1.Loop的概述 主要是講訊息從訊息佇列中取出來,然後交給與message相對應的Handler處理。 2.Loop的屬性 Thread<Local>:為每個執行緒建立一個副本  就是建立一個Loop線上程裡面。    

Handler,Message,Looper,MessageQueue原始碼解析

好久不看原始碼了,心生畏懼,所以這一週抽出時間來看看Handler相關原始碼 感謝下面幾位博主的部落格 我們是如何使用Handler的? public class HandlerActivity extends AppCompatActivity {

Android的訊息處理機制:Message、Handlerhe和Looper原始碼解析

android的訊息處理有三個核心類:Looper,Handler和Message。其實還有一個Message Queue(訊息佇列),但是MQ被封裝到Looper裡面了,我們不會直接與MQ打交道,因此我沒將其作為核心類。下面一一介紹: 執行緒的魔法師 Looper Loo

RocketMQ原始碼解析Message拉取&消費(下)

title: RocketMQ 原始碼分析 —— Message 拉取與消費(下) date: 2017-05-11 tags: categories: RocketMQ permalink: RocketMQ/message-pull-and-cons

Queue-PriorityQueue原始碼解析

Queue佇列通常是先進先出(FIFO),但也有特殊的非FIFO,如本文也分析的PriorityQueue。 ## Queue介面 Queue介面定義的方法: ![](https://img2020.cnblogs.com/blog/1306925/202005/1306925-202005131753

Windows Message Queue

col get pty most esc win use logs param Message queue is the basic fundamental of windows system. For each process, the system maintains

zoj 2724 Windows Message Queue(使用priority_queue容器模擬消息隊列)

return strong BE struct rom http PQ mco main 題目鏈接:   http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode=2724 題目描述:   Message qu

常用Message Queue對比

分布式系 AR con 出隊 很多 lower ado 簡單 角色 RabbitMQRedisZeroMQActiveMQKafka/Jafka相關概念RabbitMQ是使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP, STOM

Netty進階:Futrue&Promise原始碼解析

文章目錄 1. Future&Promise 2. AbstractFuture 3.Completefuture 4.Channelfuture&Completechannel

大資料基礎(1)zookeeper原始碼解析

五 原始碼解析   public enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING;}zookeeper伺服器狀態:剛啟動LOOKING,follower是FOLLOWING,leader是LEADING,observer是

Android框架原始碼解析之(四)Picasso

這次要分析的原始碼是 Picasso 2.5.2 ,四年前的版本,用eclipse寫的,但不影響這次我們對其原始碼的分析 地址:https://github.com/square/picasso/tree/picasso-parent-2.5.2 Picasso的簡單使用

Android框架原始碼解析之(三)ButterKnife

注:所有分析基於butterknife:8.4.0 原始碼目錄:https://github.com/JakeWharton/butterknife 其中最主要的3個模組是: Butterknife註解處理器https://github.com/JakeWharton/

Android框架原始碼解析之(二)OKhttp

原始碼在:https://github.com/square/okhttp 包實在是太多了,OKhttp核心在這塊https://github.com/square/okhttp/tree/master/okhttp 直接匯入Android Studio中即可。 基本使用:

Android框架原始碼解析之(一)Volley

前幾天面試CVTE,HR面掛了。讓內部一個學長幫我查看了一下面試官評價,發現二面面試官的評價如下: 廣度OK,但缺乏深究能力,深度與實踐不足 原始碼:只能說流程,細節程式碼不清楚,retrofit和volley都是。 感覺自己一方面:自己面試技巧有待提高吧(框

HashMap原始碼解析(JDK8)

前言 這段時間有空,專門填補了下基礎,把常用的ArrayList、LinkedList、HashMap、LinkedHashMap、LruCache原始碼看了一遍,List相對比較簡單就不單獨介紹了,Map準備用兩篇的篇幅,分別介紹HashMap和(LruCache+LinkedHa

原始碼解析--Long、long型別的比較遇到的問題

Long、long型別的比較遇到的問題: 1、long 是基本型別 Long是物件型別。 public static void main(String[] args) { Long A = 127l; Long B = 127l; long C = 127; l

CopyOnWriteArrayList實現原理以及原始碼解析

CopyOnWriteArrayList實現原理以及原始碼解析 1、CopyOnWrite容器(併發容器) Copy-On-Write簡稱COW,是一種用於程式設計中的優化策略。 其基本思路是,從一開始大家都在共享同一個內容,當某個人想要修改這個內容的時候,才

LinkedList實現原理以及原始碼解析(1.7)

LinkedList實現原理以及原始碼解析(1.7) 在1.7之後,oracle將LinkedList做了一些優化, 將1.6中的環形結構優化為了直線型了連結串列結構。 1、LinkedList定義: public class LinkedList<E>

ArrayList實現原理以及原始碼解析(補充JDK1.7,1.8)

ArrayList實現原理以及原始碼解析(補充JDK1.7,1.8) ArrayList的基本知識在上一節已經討論過,這節主要看ArrayList在JDK1.6到1.8的一些實現變化。 JDK版本不一樣,ArrayList類的原始碼也不一樣。 1、ArrayList類結構: