1. 程式人生 > >php中Redis的應用--訊息傳遞

php中Redis的應用--訊息傳遞

訊息傳遞這一應用廣泛存在於各個網站中,這個功能也是一個網站必不可少的。本文主要介紹了php中Redis的應用--訊息傳遞。

閱讀目錄

1、摘要

2、實現方法

3、一對一訊息傳遞

4、多對多訊息傳遞

1、摘要

訊息傳遞這一應用廣泛存在於各個網站中,這個功能也是一個網站必不可少的。常見的訊息傳遞應用有,新浪微博中的@我呀、給你評論然後的提示呀、贊贊贊提示、私信呀、甚至是發微博分享的新鮮事;知乎中的私信呀、live傳送過來的訊息、知乎團隊訊息呀等等。

2、實現方法

訊息傳遞即兩個或者多個客戶端在相互發送和接收訊息。

通常有兩種方法實現:

第一種為訊息推送。

Redis內建有這種機制,publish往頻道推送訊息、subscribe訂閱頻道。這種方法有一個缺點就是必須保證接收者時刻線上(即是此時程式不能停下來,一直保持監控狀態,假若斷線後就會出現客戶端丟失資訊)

第二種為訊息拉取。所謂訊息拉取,就是客戶端自主去獲取儲存在伺服器中的資料。Redis內部沒有實現訊息拉取這種機制。因此我們需要自己手動編寫程式碼去實現這個功能。

在這裡我們,我們進一步將訊息傳遞再細分為一對一的訊息傳遞,多對多的訊息傳遞(群組訊息傳遞)。

【注:兩個類的程式碼相對較多,因此將其摺疊起來了】

3、一對一訊息傳遞

例子1:一對一訊息傳送與獲取

模組要求:

1、提示有多少個聯絡人發來新訊息

2、資訊包含傳送人、時間、資訊內容

3、能夠獲取之前的舊訊息

4、並且訊息能夠保持7天,過期將會被動觸發刪除

Redis實現思路:

1、新訊息與舊訊息分別採用兩個連結串列來儲存

2、原始訊息的結構採用陣列的形式存放,並且含有傳送人、時間戳、資訊內容

3、在推入redis的連結串列前,需要將資料轉換為json型別然後再進行儲存

4、在取出新資訊時應該使用rpoplpush來實現,將已讀的新訊息推入舊訊息連結串列中

5、取出舊訊息時,應該用舊訊息的時間與現在的時間進行對比,若超時,則直接刪除後面的全部資料(因為資料是按時間一個一個壓進連結串列中的,所以對於時間是有序排列的)

資料儲存結構圖:

PHP的實現程式碼:

#SinglePullMessage.class.php

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

<?php

#單接接收者接收訊息

class SinglePullMessage

{

 private $redis=''; #儲存redis物件

 /**

 * @desc 建構函式

 *

 * @param $host string | redis主機

 * @param $port int | 埠

 */

 public function __construct($host,$port=6379)

 {

 $this->redis=new Redis();

 $this->redis->connect($host,$port);

 }

 /**

 * @desc 傳送訊息(一個人)

 *

 * @param $toUser string | 接收人

 * @param $messageArr array | 傳送的訊息陣列,包含sender、message、time

 *

 * @return bool

 */

 public function sendSingle($toUser,$messageArr)

 {

 $json_message=json_encode($messageArr); #編碼成json資料

 return $this->redis->lpush($toUser,$json_message); #將資料推入連結串列

 }

 /**

 * @desc 使用者獲取新訊息

 *

 * @param $user string | 使用者名稱

 *

 * @return array 返回陣列,包含多少個使用者發來新訊息,以及具體訊息

 */

 public function getNewMessage($user)

 {

 #接收新資訊資料,並且將資料推入舊資訊資料鏈表中,並且在原連結串列中刪除

 $messageArr=array();

 while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user))

 {

  $temp=json_decode($json_message); #將json資料變成物件

  $messageArr[$temp->sender][]=$temp; #轉換成陣列資訊

 }

 if($messageArr)

 {

  $arr['count']=count($messageArr); #統計有多少個使用者發來資訊

  $arr['messageArr']=$messageArr;

  return $arr;

 }

 return false;

 }

 public function getPreMessage($user)

 {

 ##取出舊訊息

 $messageArr=array();

 $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1); #一次性將全部舊訊息取出來

 foreach ($json_pre as $k => $v)

 {

  $temp=json_decode($v);  #json反編碼

  $timeout=$temp->time+60*60*24*7; #資料過期時間 七天過期

  if($timeout<time())  #判斷資料是否過期

  {

  if($k==0)   #若是最遲插入的資料都過期了,則將所有資料刪除

  {

   $this->redis->del('preMessage_'.$user);

   break;

  }

  $this->redis->ltrim('preMessage_'.$user, 0, $k); #若檢測出有過期的,則將比它之前插入的所有資料刪除

  break;

  }

  $messageArr[$temp->sender][]=$temp;

 }

 return $messageArr;

 }

 /**

 * @desc 訊息處理,沒什麼特別的作用。在這裡這是用來處理陣列資訊,然後將其輸出。

 *

 * @param $arr array | 需要處理的資訊陣列

 *

 * @return 返回列印輸出

 */

 public function dealArr($arr)

 {

 foreach ($arr as $k => $v)

 {

  foreach ($v as $k1 => $v2)

  {

  echo '傳送人:'.$v2->sender.' 傳送時間:'.date('Y-m-d h:i:s',$v2->time).'<br/>';

  echo '訊息內容:'.$v2->message.'<br/>';

  }

  echo "<hr/>";

 }

 }

}

測試:

1、傳送訊息

#建立test1.php

?

1

2

3

4

5

6

7

8

9

include './SinglePullMessage.class.php';

$object=new SinglePullMessage('192.168.95.11');

#傳送訊息

$sender='boss'; #傳送者

$to='jane';  #接收者

$message='How are you'; #資訊

$time=time();

$arr=array('sender'=>$sender,'message'=>$message,'time'=>$time);

echo $object->sendSingle($to,$arr);

2、獲取新訊息

#建立test2.php

?

1

2

3

4

5

6

7

8

9

10

11

include './SinglePullMessage.class.php';

$object=new SinglePullMessage('192.168.95.11');

#獲取新訊息

$arr=$object->getNewMessage('jane');

if($arr)

{

 echo $arr['count']."個聯絡人發來新訊息<br/><hr/>";

 $object->dealArr($arr['messageArr']);

}

else

 echo "無新訊息";

訪問結果:

3、獲取舊訊息

#建立test3.php

?

1

2

3

4

5

6

7

8

9

10

include './SinglePullMessage.class.php';

$object=new SinglePullMessage('192.168.95.11');

#獲取舊訊息

$arr=$object->getPreMessage('jane');

if($arr)

{

 $object->dealArr($arr);

}

else

 echo "無舊資料";

4、多對多訊息傳遞

例子2:多對多訊息傳送與獲取(即是群組)

模組要求:

1、使用者能夠自行建立群組,併成為群主

2、群主可以拉人進來作為群組成員、並且可以踢人

3、使用者可以直接退出群組

4、可以傳送訊息,每一位成員都可以拉取訊息

5、群組的訊息最大容納量為5000條

6、成員可以拉取新訊息,並提示有多少新訊息

7、成員可以分頁獲取之前已讀的舊訊息

。。。。。功能就寫這幾個吧,有需要或者想練習的同學們可以增加其他功能,例如禁言、匿名訊息傳送、檔案傳送等等。

Redis實現思路:

1、群組的訊息以及群組的成員組成採用有序集合進行儲存。群組訊息有序集合的member儲存使用者傳送的json資料訊息,score儲存唯一值,將採用原子操作incr獲取string中的自增長值進行儲存;群組成員有序集合的member儲存user,score儲存非零數字(在這裡這個score意義不大,我的例子程式碼中使用數字1為群主的score,其他的儲存為2。當然這使用這個資料還可以擴充套件別的功能,例如群組中成員等級)可參考下面資料儲存結構簡圖。

2、使用者所加入的群組也是採用有序集合進行儲存。其中,member儲存群組ID,score儲存使用者已經獲取該群組的最大訊息分值(對應群組訊息的score值)

3、使用者建立群組的時候,通過原子操作incr從而獲取一個唯一ID

4、使用者在群中傳送訊息時,也是通過原子操作incr獲取一個唯一自增長有序ID

5、在執行incr時,為防止併發導致競爭關係,因此需要進行加鎖操作【redis詳細鎖的講解可以參考:Redis構建分散式鎖//www.jb51.net/article/109704.htm

6、建立群組方法簡要思路,任何一個使用者都可以建立群組聊天,在建立的同時,可以選擇時是否新增群組成員(引數通過陣列的形式)。建立過程將會為這個群組建立一個群組成員有序集合(群組資訊有序集合暫時不建立),接著將群主新增進去,再將群ID新增使用者所參加的群組有序集合中。

資料儲存結構圖:

PHP的程式碼實現:

#ManyPullMessage.class.php

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

<?php

class ManyPullMessage

{

 private $redis=''; #儲存redis物件

 /**

 * @desc 建構函式

 *

 * @param $host string | redis主機

 * @param $port int | 埠

 */

 public function __construct($host,$port=6379)

 {

 $this->redis=new Redis();

 $this->redis->connect($host,$port);

 }

 /**

 * @desc 用於建立群組的方法,在建立的同時還可以拉人進群組

 *

 * @param $user string | 使用者名稱,建立群組的主人

 * @param $addUser array | 其他使用者構成的陣列

 *

 * @param $lockName string | 鎖的名字,用於獲取群組ID的時候用

 * @return int 返回群組ID

 */

 public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock')

 {

 $identifier=$this->getLock($lockName); #獲取鎖

 if($identifier)

 {

  $id=$this->redis->incr('groupChatID'); #獲取群組ID

  $this->releaseLock($lockName,$identifier); #釋放鎖

 }

 else

  return false;

 $messageCount=$this->redis->set('countMessage_'.$id, 0); #初始化這個群組訊息計數器

 #開啟非事務型流水線,一次性將所有redis命令傳給redis,減少與redis的連線

 $pipe=$this->redis->pipeline();

 $this->redis->zadd('groupChat_'.$id, 1, $user); #建立群組成員有序集合,並新增群主

 #將這個群組新增到user所參加的群組有序集合中

 $this->redis->zadd('hasGroupChat_'.$user, 0, $id);

 foreach ($addUser as $v) #建立群組的同時需要新增的使用者成員

 {

  $this->redis->zadd('groupChat_'.$id, 2, $v);

  $this->redis->zadd('hasGroupChat_'.$v, 0, $id);

 }

 $pipe->exec();

 return $id; #返回群組ID

 }

 /**

 * @desc 群主主動拉人進群

 *

 * @param $user string | 群主名

 * @param $groupChatID int | 群組ID

 * @param $addMembers array | 需要拉進群的使用者

 *

 * @return bool

 */

 public function addMembers($user, $groupChatID, $addMembers=array())

 {

 $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #將groupChatName的群主取出來

 if($groupMasterScore==1) #判斷user是否是群主

 {

  $pipe=$this->redis->pipeline(); #開啟非事務流水線

  foreach ($addMembers as $v)

  {

  $this->redis->zadd('groupChat_'.$groupChatID, 2, $v);   #新增進群

  $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #新增群名到使用者的有序集合中

  }

  $pipe->exec();

  return true;

 }

 return false;

 }

 /**

 * @desc 群主刪除成員

 *

 * @param $user string | 群主名

 * @param $groupChatID int | 群組ID

 * @param $delMembers array | 需要刪除的成員名字

 *

 * @return bool

 */

 public function delMembers($user, $groupChatID, $delMembers=array())

 {

 $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user);

 if($groupMasterScore==1) #判斷user是否是群主

 {

  $pipe=$this->redis->pipeline(); #開啟非事務流水線

  foreach ($delMembers as $v)

  {

  $this->redis->zrem('groupChat_'.$groupChatID, $v);  

  $this->redis->zrem('hasGroupChat_'.$v, $groupChatID);

  }

  $pipe->exec();

  return true;

 }

 return false;

 }

 /**

 * @desc 退出群組

 *

 * @param $user string | 使用者名稱

 * @param $groupChatID int | 群組名

 */

 public function quitGroupChat($user, $groupChatID)

 {

 $this->redis->zrem('groupChat_'.$groupChatID, $user);

 $this->redis->zrem('hasGroupChat_'.$user, $groupChatID);

 return true;

 }

 /**

 * @desc 傳送訊息

 *

 * @param $user string | 使用者名稱

 * @param $groupChatID int | 群組ID

 * @param $messageArr array | 包含傳送訊息的陣列

 * @param $preLockName string | 群訊息鎖字首,群訊息鎖全名為countLock_群ID

 *

 * @return bool

 */

 public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_')

 {

 $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成員score

 if($memberScore)

 {

  $identifier=$this->getLock($preLockName.$groupChatID); #獲取鎖

  if($identifier) #判斷獲取鎖是否成功

  {

  $messageCount=$this->redis->incr('countMessage_'.$groupChatID);

  $this->releaseLock($preLockName.$groupChatID,$identifier); #釋放鎖

  }

  else

  return false;

  $json_message=json_encode($messageArr);

  $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message);

  $count=$this->redis->zcard('groupChatMessage_'.$groupChatID); #檢視資訊量大小

  if($count>5000) #判斷資料量有沒有達到5000條

  { #資料量超5000,則需要清除舊資料

  $start=5000-$count;

  $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count);

  }

  return true;

 }

 return false;

 }

 /**

 * @desc 獲取新資訊

 *

 * @param $user string | 使用者名稱

 *

 * @return 成功則放回json資料陣列,無新資訊返回false

 */

 public function getNewMessage($user)

 {

 $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores'); #獲取使用者擁有的群組ID

 $json_message=array(); #初始化

 foreach ($arrID as $k => $v) #遍歷迴圈所有群組,檢視是否有新訊息

 {

  $messageCount=$this->redis->get('countMessage_'.$k); #群組最大資訊分值數

  if($messageCount>$v) #判斷使用者是否存在未讀新訊息

  {

  $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount);

  $json_message[$k]['count']=count($json_message[$k]['message']); #統計新訊息數量

  $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k); #更新已獲取訊息

  }

 }

 if($json_message)

  return $json_message;

 return false;

 }

 /**

 * @desc 分頁獲取群組資訊

 *

 * @param $user string | 使用者名稱

 * @param $groupChatID int | 群組ID

 * @param $page int | 第幾頁

 * @param $size int | 每頁多少條資料

 *

 * @return 成功返回json資料,失敗返回false

 */

 public function getPartMessage($user, $groupChatID, $page=1, $size=10)

 {

 $start=$page*$size-$size; #開始擷取資料位置

 $stop=$page*$size-1; #結束擷取資料位置

 $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop);

 if($json_message)

  return $json_message;

 return false;

 }

 /**

 * @desc 加鎖方法

 *

 * @param $lockName string | 鎖的名字

 * @param $timeout int | 鎖的過期時間

 *

 * @return 成功返回identifier/失敗返回false

 */

 public function getLock($lockName, $timeout=2)

 {

 $identifier=uniqid(); #獲取唯一識別符號

 $timeout=ceil($timeout); #確保是整數

 $end=time()+$timeout;

 while(time()<$end)  #迴圈獲取鎖

 {

  /*

  #這裡的set操作可以等同於下面那個if操作,並且可以減少一次與redis通訊

  if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout)))

  return $identifier;

  */

  if($this->redis->setnx($lockName, $identifier)) #檢視$lockName是否被上鎖

  {

  $this->redis->expire($lockName, $timeout); #為$lockName設定過期時間

  return $identifier;    #返回一維識別符號

  }

  elseif ($this->redis->ttl($lockName)===-1)

  {

  $this->redis->expire($lockName, $timeout); #檢測是否有設定過期時間,沒有則加上

  }

  usleep(0.001);  #停止0.001ms

 }

 return false;

 }

 /**

 * @desc 釋放鎖

 *

 * @param $lockName string | 鎖名

 * @param $identifier string | 鎖的唯一值

 *

 * @param bool

 */

 public function releaseLock($lockName,$identifier)

 {

 if($this->redis->get($lockName)==$identifier) #判斷是鎖有沒有被其他客戶端修改

 {

  $this->redis->multi();

  $this->redis->del($lockName); #釋放鎖

  $this->redis->exec();

  return true;

 }

 else

 {

  return false; #其他客戶端修改了鎖,不能刪除別人的鎖

 }

 }

 

}

?>

測試:

1、建立createGroupChat.php(測試建立群組功能)

執行程式碼並建立568、569群組(群主為jack)

?

1

2

3

4

5

6

7

8

9

include './ManyPullMessage.class.php';

$object=new ManyPullMessage('192.168.95.11');

#建立群組

$user='jack';

$arr=array('jane1','jane2');

$a=$object->createGroupChat($user,$arr);

echo "<pre>";

print_r($a);

echo "</pre>";die;

2、建立addMembers.php(測試新增成員功能)

執行程式碼並新增新成員

?

1

2

3

4

5

6

include './ManyPullMessage.class.php';

$object=new ManyPullMessage('192.168.95.11');

$b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4'));

echo "<pre>";

print_r($b);

echo "</pre>";die;

3、建立delete.php(測試群主刪除成員功能)

?

1

2

3

4

5

6

7

include './ManyPullMessage.class.php';

$object=new ManyPullMessage('192.168.95.11');

#群主刪除成員

$c=$object->delMembers('jack', '568', array('jane1','jane4'));

echo "<pre>";

print_r($c);

echo "</pre>";die;

4、建立sendMessage.php(測試傳送訊息功能)

多執行幾遍,568、569都發幾條

?

1

2

3

4

5

6

7

8

9

10

11

include './ManyPullMessage.class.php';

$object=new ManyPullMessage('192.168.95.11');

#傳送訊息

$user='jane2';

$message='go go go';

$groupChatID=568;

$arr=array('sender'=>$user, 'message'=>$message, 'time'=>time());

$d=$object->sendMessage($user,$groupChatID,$arr);

echo "<pre>";

print_r($d);

echo "</pre>";die;

5、建立getNewMessage.php(測試使用者獲取新訊息功能)

?

1

2

3

4

5

6

7

include './ManyPullMessage.class.php';

$object=new ManyPullMessage('192.168.95.11');

#使用者獲取新訊息

$e=$object->getNewMessage('jane2');

echo "<pre>";

print_r($e);

echo "</pre>";die;

6、建立getPartMessage.php(測試使用者獲取某個群組部分訊息)

(多傳送幾條訊息,用於測試。568中共18條資料)

?

1

2

3

4

5

6

7

include './ManyPullMessage.class.php';

$object=new ManyPullMessage('192.168.95.11');

#使用者獲取某個群組部分訊息

$f=$object->getPartMessage('jane2', 568, 1, 10);

echo "<pre>";

print_r($f);

echo "</pre>";die;

page=1,size=10

page=2,size=10

測試完畢,還需要別的功能可以自己進行修改新增測試。

這次整理這篇文章相對比較趕,心裡已經想著快點整理完趕緊學習其他的技術啦,哈哈。各位大神請留步,懇請各位給點學習redis的指導意見,本人職業方向是PHP

以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支援指令碼之家!

轉載自:https://www.jb51.net/article/109695.htm