RabbitMQ+Redis叢集+Quartz實現簡單高併發秒殺
花了兩天時間實現了一個使用rabbitMQ佇列和redis叢集存取資料以及使用Quartz觸發新增秒殺商品。
這一塊小功能很早就想做的,自從自學了redis的命令,發現了expire能夠設定自動消亡的時候,我就已經開始蠢蠢欲動了,接著在接觸rabbitMQ工作模式(多個消費者爭搶資料)的時候,我已經下決心要實現秒殺了。
上個專案是9月底和朋友做完的,一個高併發分散式的專案,開6臺centOS虛擬機器搭建nginx、主從伺服器、redis叢集,rabbitMQ佇列,amoeba實現讀寫分離和主從資料庫,以及Solr搜尋。這個專案是用來練手linux與分散式的,大多數精力都花在搭環境上了,基本步驟也都能百度到,不想寫到部落格。正好這個秒殺的功能不多不少,思路還有點意思,所以寫一下與大家分享。
秒殺的設計理念:
限流: 鑑於只有少部分使用者能夠秒殺成功,所以要限制大部分流量,只允許少部分流量進入服務後端。前臺頁面控制
削峰:對於秒殺系統瞬時會有大量使用者湧入,所以在搶購一開始會有很高的瞬間峰值。高峰值流量是壓垮系統很重要的原因,所以如何把瞬間的高流量變成一段時間平穩的流量也是設計秒殺系統很重要的思路。實現削峰的常用的方法有利用快取和訊息中介軟體(RabbitMQ)等技術。
非同步處理:秒殺系統是一個高併發系統,採用非同步處理模式可以極大地提高系統併發量,其實非同步處理就是削峰的一種實現方式。(RabbitMQ實現)
記憶體快取:秒殺系統最大的瓶頸一般都是資料庫讀寫,由於資料庫讀寫屬於磁碟IO,效能很低,如果能夠把部分資料或業務邏輯轉移到記憶體快取,效率會有極大地提升。
我的第一次嘗試:
純粹使用一臺redis實現秒殺,是有同步安全問題的
因為redis是支援高併發的,一秒可以承受10000次的請求,所以暫且使用一臺redis試試效果,畢竟單臺redis是單執行緒,併發安全問題會少一點。
首先建立秒殺商品表,習慣使用PowerDesigner畫表格,因為只是簡單的一個Demo,只需要id, title, price, num, KillTime 五個屬性,分別指代商品id,商品標題,商品價格,秒殺商品的數量,以及秒殺開始的時間。
這個我是模仿淘寶的整點搶購,每一個小時掃描一次秒殺商品表,將商品按搶購時間釋出出來,將id作為key,num作為value寫入redis,並設定消亡時間為1s(為了測試方便設了5秒)。
當用戶點選搶購按鈕,首先在前端進行控制,如果時間還沒到整點前後兩分鐘的區間,直接在前端攔截(沒寫),else才傳送請求,使用ajax與restful方式傳送請求的url,根據接收的引數,反饋不一樣的資訊。
後臺收到請求之後,首先根據id從redis中get對應的num,如果為null,返回”notbegin”,判斷num>0則decr自減,返回true,否則返回finished,如果catch到了錯誤,返回false。
前端function:
<script type="text/javascript">
function startKill(btn) {
var id = $(btn).attr("id");
$.ajax({
type : "GET",
url : "${app}/SecKill/startKill/" + id,
dataType : 'text',
success : function(data) {
if (data == "true") {
alert("恭喜,搶購成功");
} else if (data == "notbegin") {
alert("活動還沒開始哦!");
} else if (data == "finished") {
alert("商品已經搶完");
} else {
alert("抱歉,搶購失敗");
}
}
});
}
function tick(){
var today = new Date();
var timeString = today.toLocaleString();
$(".clock").innerHTML = timeString;
window.setTimeout("tick();", 100);
}
window.onload = tick;
Quartz的配置:
<!-- 定義任務bean -->
<bean name="secKillJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<!-- 指定具體的job類 -->
<property name="jobClass" value="com.jt.manage.job.SecKillJob" />
<!-- 指定job的名稱 -->
<property name="name" value="SecKillJob" />
<!-- 指定job的分組 -->
<property name="group" value="SecKillJob" />
<!-- 必須設定為true,如果為false,當沒有活動的觸發器與之關聯時會在排程器中刪除該任務 -->
<property name="durability" value="true"/>
<!-- 指定spring容器的key,如果不設定在job中的jobmap中是獲取不到spring容器的 ,目的是使用applicationContext獲取orderMapper-->
<property name="applicationContextJobDataKey" value="applicationContext"/>
</bean>
<!-- 定義排程器 -->
<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTrigger" />
</list>
</property>
</bean>
<!-- 定義觸發器 -->
<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="secKillJobDetail" />
<!-- 每30秒執行一次 -->
<property name="cronExpression" value="0/30 * * * * ?" />
</bean>
Quartz按時執行的job
public class SecKillJob extends QuartzJobBean{
private static Connection connection = null;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println("開始加入快取");
ApplicationContext applicationContext = (ApplicationContext) context.getJobDetail().getJobDataMap().get("applicationContext");
SecKillItemMapper mapper = applicationContext.getBean(SecKillItemMapper.class);
JedisCluster jedisCluster = applicationContext.getBean(JedisCluster.class);
SecKillItem secKillItem = new SecKillItem();
List<SecKillItem> list = mapper.select(secKillItem);
for (SecKillItem item : list) {
String id = item.getId().toString();
String num = item.getNum().toString();
jedisCluster.set(id, num);
jedisCluster.expire(id, 15);
//每種商品只新增一件
initConnection();
provider(token);
}
System.out.println("加入快取與Rabbitmq成功");
}
}
Controller層:
@Controller
@RequestMapping("/SecKill")
public class SecKillController {
@Autowired
private SecKillItemService secKillItemService;
@Autowired
private RabbitSecKillService rabbitSecKillService;
@RequestMapping("/list")
public String listSecKillItem(Model model) {
List<SecKillItem> secKillItemList = secKillItemService.findSecKillItemList();
model.addAttribute("secKillItemList", secKillItemList);
return "seckillitem-list";
}
@RequestMapping("/startKill/{id}")
@ResponseBody
public String startKill(@PathVariable Long id){
String result = secKillItemService.getSecKillResult(id);
System.out.println("result:"+result);
return result;
}
}
Service層
@Service
public class SecKillItemServiceImpl implements SecKillItemService {
@Autowired
private JedisCluster jedisCluster;
@Autowired
private SecKillItemMapper secKillItemMapper;
private static Connection connection = null;
@Override
public List<SecKillItem> findSecKillItemList() {
SecKillItem secKillItem = new SecKillItem();
List<SecKillItem> secKillItemList = secKillItemMapper.select(secKillItem);
return secKillItemList;
}
@Override
public String getSecKillResult(Long id) {
try {
System.out.println("id:"+id);
System.out.println("剩餘存活時間"+jedisCluster.ttl(id+""));
if(jedisCluster.get(id+"")==null){
System.out.println("活動還沒開始");
return "notbegin";
}
String num = jedisCluster.get(id+"");
Integer Num = Integer.parseInt(num);
System.out.println("商品"+id+"當前有"+Num+"個");
if(Num>0){
Num-=1;
jedisCluster.decr(id+"");
System.out.println("商品"+id+"剩餘"+jedisCluster.get(id+"")+"個");
return "true";
}else{
return "finished";
}
} catch (Exception e) {
return "false";
}
}
}
併發100次發現有3次為true,出現了安全問題,分析了一下,是因為service存在if判斷。
百度了下redis有自帶的getset方法可以進行同步操作,不過只用getset,還是要判斷返回的數量,這個不太靠譜。
我的第二次嘗試:
使用rabbitMQ的阻塞佇列和redis叢集來實現
因為rabbitMQ的排隊入庫,真的是很好的削峰技術,我覺得是可行的。
這裡有一個點:當rabbitMQ用於資料庫削峰的時候,在配置檔案中配置了接收資料的方法,這個方法預設開啟且被動接受;
然而秒殺的接收資料方是使用者,當用戶主動點選秒殺時,使用者才從佇列中獲取資料,所以不能使用配置檔案。。。可能是rabbitmq沒有這樣一個倒過來的處理邏輯,那我只能手寫佇列的生產者和消費者了。
而且鑑於第一次嘗試中的get 和 set 操作同時存在導致redis不安全的問題,這次不從redis中get資料進行判斷,將redis只作為資料的提供者。
還有一個點:單臺redis是單執行緒的,是安全的,只要你正確使用它的執行緒安全的方法;但是redis叢集是多執行緒的,是不安全的,即使有hash一致性演算法,即使redis叢集可以整體操作,但是底層還是單獨的redis在工作,所以事務也是不支援的。
這次我每次初始化provider的時候,只在佇列中儲存一個token,使用者點選後consumer方法和provider共用一個connection,注意不能使用ThreadLocal,因為他們是不同的執行緒的資料,會get到null。自己手寫一個普通的靜態類來維護connection就好,因為一件商品只需要在一個佇列中存取
新job:
public class SecKillJob extends QuartzJobBean{
private static Connection connection = null;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println("開始加入快取");
ApplicationContext applicationContext = (ApplicationContext) context.getJobDetail().getJobDataMap().get("applicationContext");
SecKillItemMapper mapper = applicationContext.getBean(SecKillItemMapper.class);
JedisCluster jedisCluster = applicationContext.getBean(JedisCluster.class);
SecKillItem secKillItem = new SecKillItem();
//只秒殺第一件商品
secKillItem.setId(1L);
List<SecKillItem> list = mapper.select(secKillItem);
for (SecKillItem item : list) {
//純使用redis存取資料,使用num,get和set會有執行緒安全問題
/**
* 使用rabbitMQ+redis,在redis中存入(key,value) id--token,token根據商品id計算,
* 並將其存入rabbitMQ,存入的數量表示可以被搶購的數量
*/
String id = item.getId().toString();
String token = id+"-token";
jedisCluster.set(id, token);
jedisCluster.expire(id, 15);
//每種商品只新增一件
initConnection();
provider(token);
}
System.out.println("加入快取與Rabbitmq成功");
}
public static void initConnection(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.196.137");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/SecKillHost");
connectionFactory.setUsername("SecKillAdmin");
connectionFactory.setPassword("626316");
try {
connection = connectionFactory.newConnection();
//ConnThreadLocal.setConn(connection);
Conn.setConn(connection);
System.out.println("這是SecKillJob的initConnection(), Connection==null??"+(connection==null));
} catch (IOException e) {
e.printStackTrace();
}
}
public static void provider(String token){
try {
//1.只有通過通道才能連線rabbitMQ
Channel channel = connection.createChannel();
//每次存入redis前先刪除這個佇列
channel.queueDelete("SecKillQueue");
//2.定義佇列的名稱
String queue = "SecKillQueue";
//3.宣告佇列
channel.queueDeclare(queue, false, false, false, null);
channel.basicPublish("", queue, null, token.getBytes());
System.out.println("一件商品已存入rabbitMq,token="+token);
channel = connection.createChannel();
} catch (Exception e) {
e.printStackTrace();
}
}
}
新service
public class SecKillItemServiceImpl implements SecKillItemService {
@Autowired
private JedisCluster jedisCluster;
@Autowired
private SecKillItemMapper secKillItemMapper;
private static Connection connection = null;
@Override
public List<SecKillItem> findSecKillItemList() {
SecKillItem secKillItem = new SecKillItem();
List<SecKillItem> secKillItemList = secKillItemMapper.select(secKillItem);
return secKillItemList;
}
public String getSecKillResult(Long id){
try {
String token = consumer();
Long tokenId = Long.parseLong(token.substring(0, token.indexOf("-")));
if(id.equals(tokenId)){
return "true";
}else{
System.out.println("活動還沒開始/已經搶購完畢");
return "notbegin";
}
} catch (Exception e) {
e.printStackTrace();
return "false";
}
}
public static String consumer() throws Exception{
//connection=ConnThreadLocal.getConn();
connection = Conn.getConn();
System.out.println("這是SecKillServiceImpl 的 consumer() , 從ConnThreadLocal獲取的Connection==null??"+(connection==null));
Channel channel = connection.createChannel();
String queue = "SecKillQueue";
channel.queueDeclare(queue, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, false, consumer);//當開啟autoAsk時,若處理出錯,則生產者還會將此訊息給別的消費者使用,所以,要關掉自動應答
System.out.println("準備好接收了");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String token = new String(delivery.getBody());
System.out.println("接收到的token"+token);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
return token;
}
}
用來維護connection的Conn類
public class Conn {
private static Connection connection;
public static void setConn(Connection connection){
Conn.connection = connection;
}
public static Connection getConn(){
return Conn.connection;
}
}
rabbitMQ的工作模式:
原理說明:
生產者為訊息佇列中生產訊息,多個消費者爭搶執行權利,誰搶到誰執行.
初步實現的效果:
rabbitMQ後臺:
併發一百個,的確只有一個獲取到了
有一個問題是:除第一個顯示搶購成功,其餘的在設定的15秒內是沒有反應的,因為一直在等待rabbitMQ的資料,直到我銷燬queue。在實際業務中,可以採用頁面動畫來緩解使用者情緒,比如大轉盤之類的,先轉個十幾秒哈哈。
還有,可以在前臺直接產生隨機數,比如100以內的隨機數,只有隨機數為1的可以訪問後臺,其餘直接搶購失敗,也是實際業務中可能使用的
其實還有分散式鎖的解決辦法,不難,而且可以不使用佇列
不過暫時不研究了,因為我要開始找實習了,在實踐中發掘更有用的知識。快12月了,校招都沒了,不好找啊。。。。
祝自己架構師之路順利^_^