python分散式事務方案(二)基於訊息最終一致性
python分散式事務方案(二)基於訊息最終一致性
上一章採用的是tcc方案,但是在進行批量操作時,比如說幾百臺主機一起分配策略時,會執行很長時間,這時體驗比較差。 由於zabbix隱藏域後臺,而這個慢主要是集中在呼叫zabbix介面,這裡我們就基於訊息最終一致性來進行優化 訊息一致性方案是通過訊息中介軟體保證上、下游應用資料操作的一致性。基本思路是將本地操作和傳送訊息放在一個事務中,保證本地操作和訊息傳送要麼兩者都成功或者都失敗。下游應用向訊息系統訂閱該訊息,收到訊息後執行相應操作。
本地訊息表是一種簡化版的方案,將資料庫中的表來作為訊息中介軟體。 本地訊息表這種實現方式應該是業界使用最多的,其核心思想是將分散式事務拆分成本地事務進行處理,這種思路是來源於ebay。我們可以從下面的流程圖中看出其中的一些細節:
基本思路就是:
訊息生產方,需要額外建一個訊息表,並記錄訊息傳送狀態。訊息表和業務資料要在一個事務裡提交,也就是說他們要在一個數據庫裡面。然後訊息會經過MQ傳送到訊息的消費方。如果訊息傳送失敗,會進行重試傳送。
訊息消費方,需要處理這個訊息,並完成自己的業務邏輯。此時如果本地事務處理成功,表明已經處理成功了,如果處理失敗,那麼就會重試執行。如果是業務上面的失敗,可以給生產方傳送一個業務補償訊息,通知生產方進行回滾等操作。
生產方和消費方定時掃描本地訊息表,把還沒處理完成的訊息或者失敗的訊息再發送一遍。如果有靠譜的自動對賬補賬邏輯,這種方案還是非常實用的。
這種方案遵循BASE理論,採用的是最終一致性,筆者認為是這幾種方案裡面比較適合實際業務場景的,即不會出現像2PC那樣複雜的實現(當呼叫鏈很長的時候,2PC的可用性是非常低的),也不會像TCC那樣可能出現確認或者回滾不了的情況。
-
優點: 一種非常經典的實現,避免了分散式事務,實現了最終一致性。在 .NET中 有現成的解決方案。
-
缺點: 訊息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理。
下面是實現步驟:
1、先建立本地訊息表
MESSAGE_STATUS={ 'active':'active', 'fail':'fail', 'success':'success' } class Message(models.Model): topic = models.CharField(max_length=50, blank=True) event_module = models.CharField(max_length=50, blank=True,null=True) event_fun= models.CharField(max_length=30, blank=True,null=True) params=models.TextField(null=True) remark=models.CharField(max_length=300, blank=True,null=True) status = models.CharField(max_length=20, blank=True) exec_count=models.SmallIntegerField(null=True) error_msg = models.TextField(null=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) index_together = ('status','exec_count') #聯合索引 def __unicode__(self): return '%s' % self.remark def __str__(self): return '%s' % self.remark
2、定義生產者api 這裡提前定義了mysql和rabbix兩種訊息儲存方式
from models import Message
from serializers import MessageSerializer
def event_add(message):
MysqlQueue().add(message)
class MessageQueue():
def __init__(self):
pass
def add(self,message):
pass
class MysqlQueue(MessageQueue):
def add(self,message):
message["status"]="active"
message["exec_count"]=0
serializer=MessageSerializer(data=message)
serializer.is_valid(raise_exception=True)
serializer.save()
class RabbitQueue(MessageQueue):
def add(self,message):
pass
3、定義消費者api,這裡使用定時任務框架celery
import json
from models import Message
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
class MessageConsumer:
def receive(self, topic=None):
queryset = Message.objects.filter(exec_count__lt=5).exclude(status='success')
if topic:
queryset = queryset.filter(topic=topic)
messages = queryset.order_by('id').all()
for message in messages:
try:
m = __import__(message.event_module, fromlist=True)
if hasattr(m, message.event_fun):
target_func = getattr(m, message.event_fun)
logger.info(message.params)
target_func(json.loads(message.params))
message.status='success'
message.exec_count=message.exec_count + 1
message.save()
else:
logger.error("can not find function " + message.event_fun)
message.status='fail'
message.exec_count=message.exec_count + 1
message.error_msg="can not find function" + message.event_fun
message.save()
except Exception ,e:
logger.error("exec message fail,id:" + str(message.id)+",cause by "+e.message)
logger.exception(e)
message.status='fail'
message.exec_count=message.exec_count + 1
message.error_msg=e.message
message.save()
4、定義定時任務,這裡如果已經有一個定時任務在跑,則直接跳過
exec_flag=False
@shared_task(ignore_result=True)
def reveive_event_message():
global exec_flag
if exec_flag:
logger.warning("exists a tast exec reveive_event_message")
return
exec_flag=True
logger.info("reveive_event_message start")
MessageConsumer().receive()
logger.info("reveive_event_message end")
exec_flag=False
5、下面定義業務呼叫
def add_message(event_fun,params,remark):
event_message = dict()
event_message["topic"] = "topci"
event_message["event_module"] = "callback path"
event_message["event_fun"] =event_fun
event_message["params"] = json.dumps(params)
event_message["remark"] =remark
logger.debug(event_message)
event_add(event_message)
def create(self, request, *args, **kwargs):
'''
policy add
'''
assets = request.data["data"]
try:
with transaction.atomic():
#save policy
for ;;
#傳送訊息
add_message("async_update_zabbix_trigger",params,"update trigger ")
except rest_framework_serializers.ValidationError, e:
logger.exception(e)
raise
6、定義回撥方法,這裡由於是使用python可以直接傳方法名,就可以進行回撥 比如說建立定時器
def async_create_zabbix_trigger(params):
client = ZabbixClientProxy()
host_id = get_zabbix_host_by_uuid(uuid)
zabbix_items = get_zabbix_items(host_id)
if zabbix_items is None or len(zabbix_items) == 0:
return
condition = alert_models.ConditionItem.objects.get(id=params["condition_id"])
condition.alert_duration = params["alert_duration"]
condition.item_threshold = params["item_threshold"]
triggers = create_zabbix_trigger(client, asset, zabbix_items, condition, uuid)
serializer = policy_serializers.ConditionTriggerSerializer(data=triggers, many=True)
serializer.is_valid(raise_exception=True)
serializer.save()
這裡可以配置一個最大重試次數,如果超過就不會進行重試,這時就會發送郵件通知管理員進行手工重試,來達到最終一致性