Openstack Cinder中建立volume過程的原始碼解析(9)
感謝朋友支援本部落格,歡迎共同探討交流,由於能力和時間有限,錯誤之處在所難免,歡迎指正!
如果轉載,請保留作者資訊。
部落格地址:http://blog.csdn.net/gaoxingnengjisuan
郵箱地址:[email protected]
我們在上一篇部落格中可以看到,在建立新卷之前,需要獲取建立卷的目標主機,經過分析原始碼我們可以知道,如果沒有指定建立新卷的目標主機,需要通過排程器演算法實現目標主機的確定,如果指定了建立新卷的目標主機,則直接獲取目標主機,無論是哪種情況,都需要呼叫方法self.volume_rpcapi.create_volume來實現在目標主機上新卷的建立。在這篇部落格中,我們就具體來分析這個方法的實現過程。
我們先來看方法create_volume的原始碼實現:
我們可以看到,這裡也是應用了廣播方法cast實現遠端呼叫方法create_volume,即/cinder/volume/manager.py----class VolumeManager----def create_volume,我們具體來看這個方法的實現原始碼:def create_volume(self, ctxt, volume, host, request_spec, filter_properties, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None): """ 遠端呼叫實現建立並匯出卷; """ request_spec_p = jsonutils.to_primitive(request_spec) self.cast(ctxt, self.make_msg('create_volume', volume_id=volume['id'], request_spec=request_spec_p, filter_properties=filter_properties, allow_reschedule=allow_reschedule, snapshot_id=snapshot_id, image_id=image_id, source_volid=source_volid), # queue_get_for:根據給定的topic和host獲取對應的佇列名稱; topic=rpc.queue_get_for(ctxt, self.topic, host), version='1.4')
可見,這裡再一次應用taskflow模式來實現建立並匯出卷的操作。我們具體來看方法get_manager_flow的原始碼實現:@utils.require_driver_initialized def create_volume(self, context, volume_id, request_spec=None, filter_properties=None, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None): """ Creates and exports the volume. 建立並匯出卷; """ # 構建並返回用於通過管理器建立卷的flow; flow = create_volume.get_manager_flow( self.db, self.driver, self.scheduler_rpcapi, self.host, volume_id, request_spec=request_spec, filter_properties=filter_properties, allow_reschedule=allow_reschedule, snapshot_id=snapshot_id, image_id=image_id, source_volid=source_volid, reschedule_context=context.deepcopy()) assert flow, _('Manager volume flow not retrieved') # 進行flow的執行操作; flow.run(context.elevated()) if flow.state != states.SUCCESS: raise exception.CinderException(_("Failed to successfully complete" " manager volume workflow")) self._reset_stats() return volume_id
def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
request_spec=None, filter_properties=None,
allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None,
reschedule_context=None):
"""
Constructs and returns the manager entrypoint flow.
構建並返回用於通過管理器建立卷的flow;
flow將會做以下的事情:
1. 首先要確定我們是否允許進行重新排程,因為這影響了我們如何對出現錯誤的情況進行處理;
2. 為相關的task注入keys和values;
3. 對於出錯的task進行處理,傳送錯誤通知,記錄錯誤資訊等;
4. 實現了從輸入的引數中提取建立卷的規範資訊的操作;
5. 通知已經開始進行卷的建立操作;
6. 根據所獲取的建立卷的規範資訊實現卷的建立操作;
7. 當成功的建立卷之後,完成卷建立之後的通知操作;
"""
# flow_name:volume_create_manager;
flow_name = ACTION.replace(":", "_") + "_manager"
# 獲取類Flow的例項化物件;
volume_flow = linear_flow.Flow(flow_name)
# Determine if we are allowed to reschedule since this affects how
# failures will be handled.
# 首先要確定我們是否允許進行重新排程,因為這影響了我們如何對出現錯誤的情況進行處理;
if not filter_properties:
filter_properties = {}
if not request_spec and allow_reschedule:
LOG.debug(_("No request spec, will not reschedule"))
allow_reschedule = False
if not filter_properties.get('retry', None) and allow_reschedule:
LOG.debug(_("No retry filter property or associated "
"retry info, will not reschedule"))
allow_reschedule = False
# 新增一個給定的task到flow;
# 這個類實現了注入字典資訊到flow中;
volume_flow.add(base.InjectTask({
'filter_properties': filter_properties,
'image_id': image_id,
'request_spec': request_spec,
'snapshot_id': snapshot_id,
'source_volid': source_volid,
'volume_id': volume_id,
}, addons=[ACTION]))
# 如果不允許進行重新排程的操作;
if not allow_reschedule:
# On failure ensure that we just set the volume status to error.
LOG.debug(_("Retry info not present, will not reschedule"))
# 新增一個給定的task到flow;
# 這個task實現了當出現錯誤時,設定指定id的卷的狀態為ERROR;
volume_flow.add(OnFailureChangeStatusTask(db))
# 如果允許進行重新排程的操作;
else:
# 新增一個給定的task到flow;
# 觸發一個傳送進行重新排程的請求,當進行task恢復回滾操作的時候;
volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, scheduler_rpcapi))
# 新增一個給定的task到flow;
# 提取一個用於建立卷的通用結構規範;
volume_flow.add(ExtractVolumeSpecTask(db))
# 新增一個給定的task到flow;
# 執行關於給定卷的相關通知操作,獲取指定卷的使用率資訊,並進行通知操作;
volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
# 新增一個給定的task到flow;
# 根據所提供的規範要求實現卷的建立操作;
volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
# 新增一個給定的task到flow;
# 當成功的建立卷之後,完成卷建立之後的通知操作;
volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
# 獲取flow的除錯資訊;
return flow_utils.attach_debug_listeners(volume_flow)
這裡最重要的一個task就是CreateVolumeFromSpecTask,它所實現的操作就是根據所提供的規範要求實現卷的建立。我們具體來看這個類的原始碼實現:
class CreateVolumeFromSpecTask(base.CinderTask):
"""
根據所提供的規範要求實現卷的建立操作;
"""
def __init__(self, db, host, driver):
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
self.db = db
self.driver = driver
self.requires.update(['volume_spec', 'volume_ref'])
self._create_func_mapping = {
'raw': self._create_raw_volume,
'snap': self._create_from_snapshot,
'source_vol': self._create_from_source_volume,
'image': self._create_from_image,
}
self.host = host
def __call__(self, context, volume_ref, volume_spec):
"""
根據所提供的規範要求實現卷的建立操作;
"""
if not self.driver.initialized:
LOG.error(_("Unable to create volume, driver not initialized"))
driver_name = self.driver.__class__.__name__
raise exception.DriverNotInitialized(driver=driver_name)
# 獲取建立卷的型別資訊;
create_type = volume_spec.pop('type', None)
# 根據具體的建立卷的型別,獲取對應的建立卷的方法;
# self._create_func_mapping = {
# 'raw': self._create_raw_volume,
# 'snap': self._create_from_snapshot,
# 'source_vol': self._create_from_source_volume,
# 'image': self._create_from_image,
# }
create_functor = self._create_func_mapping.get(create_type)
if not create_functor:
raise exception.VolumeTypeNotFound(volume_type_id=create_type)
volume_spec = dict(volume_spec)
volume_id = volume_spec.pop('volume_id', None)
if not volume_id:
volume_id = volume_ref['id']
LOG.info(_("Volume %(volume_id)s: being created using %(functor)s "
"with specification: %(volume_spec)s") %
{'volume_spec': volume_spec, 'volume_id': volume_id,
'functor': _make_pretty_name(create_functor)})
volume_ref['host'] = self.host
# 根據確定的要呼叫的建立卷的方法,呼叫這個方法實現指定型別的卷的建立操作;
model_update = create_functor(context, volume_ref=volume_ref,
**volume_spec)
try:
if model_update:
volume_ref = self.db.volume_update(context, volume_ref['id'], model_update)
except exception.CinderException as ex:
if model_update:
LOG.exception(_("Failed updating model of volume %(volume_id)s"
" with creation provided model %(model)s") %
{'volume_id': volume_id, 'model': model_update})
raise exception.ExportFailure(reason=ex)
model_update = None
try:
LOG.debug(_("Volume %s: creating export"), volume_ref['id'])
# 為邏輯卷建立匯出介面;
model_update = self.driver.create_export(context, volume_ref)
if model_update:
self.db.volume_update(context, volume_ref['id'], model_update)
except exception.CinderException as ex:
if model_update:
LOG.exception(_("Failed updating model of volume %(volume_id)s"
" with driver provided model %(model)s") %
{'volume_id': volume_id, 'model': model_update})
raise exception.ExportFailure(reason=ex)
我們在這個類的初始化方法中可以看到:
self._create_func_mapping = {
'raw': self._create_raw_volume,
'snap': self._create_from_snapshot,
'source_vol': self._create_from_source_volume,
'image': self._create_from_image,
}
這裡就指明瞭建立新卷的四種途徑,即直接建立raw格式的新卷、從快照建立新卷、從已有的卷建立新卷和從映象建立新卷。在上述類的__call__方法中,根據具體情況分別呼叫了不用的方法實現了新卷的建立,我們來看看這幾個建立新卷的方法的原始碼:
def _create_raw_volume(self, context, volume_ref, **kwargs):
"""
實現raw格式卷的建立;
"""
return self.driver.create_volume(volume_ref)
def _create_from_snapshot(self, context, volume_ref, snapshot_id,
**kwargs):
"""
實現從快照建立卷的操作,並根據具體情況實現對指定卷的glance元資料進行更新操作;
"""
volume_id = volume_ref['id']
# 獲取指定卷的快照;
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
# 呼叫具體驅動中的create_volume_from_snapshot方法,實現從快照建立卷;
model_update = self.driver.create_volume_from_snapshot(volume_ref,
snapshot_ref)
make_bootable = False
try:
# 根據volume_id獲取volume;
originating_vref = self.db.volume_get(context, snapshot_ref['volume_id'])
make_bootable = originating_vref.bootable
except exception.CinderException as ex:
LOG.exception(_("Failed fetching snapshot %(snapshot_id)s bootable"
" flag using the provided glance snapshot "
"%(snapshot_ref_id)s volume reference") %
{'snapshot_id': snapshot_id,
'snapshot_ref_id': snapshot_ref['volume_id']})
raise exception.MetadataUpdateFailure(reason=ex)
if make_bootable:
# 根據具體情況實現對指定卷的glance元資料進行更新操作;
self._handle_bootable_volume_glance_meta(context, volume_id,
snapshot_id=snapshot_id)
return model_update
def _create_from_source_volume(self, context, volume_ref,
source_volid, **kwargs):
"""
實現從源卷建立(實際上就是直接拷貝)卷的操作;
"""
# 根據source_volid獲取卷的資訊;
srcvol_ref = self.db.volume_get(context, source_volid)
# 建立指定卷的克隆;
model_update = self.driver.create_cloned_volume(volume_ref, srcvol_ref)
# 根據具體情況實現對指定卷的glance元資料進行更新操作;
if srcvol_ref.bootable:
self._handle_bootable_volume_glance_meta(context, volume_ref['id'],
source_volid=source_volid)
return model_update
def _create_from_image(self, context, volume_ref,
image_location, image_id, image_meta,
image_service, **kwargs):
"""
從映象實現卷的建立;
"""
LOG.debug(_("Cloning %(volume_id)s from image %(image_id)s "
" at location %(image_location)s") %
{'volume_id': volume_ref['id'],
'image_location': image_location, 'image_id': image_id})
# 從現有的映象有效的建立一個卷;
model_update, cloned = self.driver.clone_image(volume_ref, image_location, image_id)
# 如果沒有實現克隆,說明沒有指定的映象;
# 實現建立卷,並下載映象資料到卷中;
if not cloned:
# 實現建立卷,並下載映象資料到卷中;
model_update = self.driver.create_volume(volume_ref)
updates = dict(model_update or dict(), status='downloading')
# 更新卷的狀態;
try:
volume_ref = self.db.volume_update(context,
volume_ref['id'], updates)
except exception.CinderException:
LOG.exception(_("Failed updating volume %(volume_id)s with "
"%(updates)s") %
{'volume_id': volume_ref['id'],
'updates': updates})
# 下載glance映象資料到指定的卷;
self._copy_image_to_volume(context, volume_ref, image_id, image_location, image_service)
# 根據具體情況實現對指定卷的glance元資料進行更新操作;
self._handle_bootable_volume_glance_meta(context, volume_ref['id'],
image_id=image_id,
image_meta=image_meta)
return model_update
再來看這幾個方法的原始碼,不同的方法中會進一步呼叫不同的方法來實現新卷的建立,這就直接與/cinder/volume/drivers中的不同的塊儲存後端實現直接聯絡到一起了,具體呼叫的是那一種塊儲存器中的建立卷的方法,就是由self.driver所確定的。
OK!到此為止,cinder中建立新卷的整體流程的原始碼分析已經全部完成,其實我想說的一句話就是,如果真的把這個流程的實現過程搞清楚,那麼cinder模組的原始碼也就基本掌握了。
謝謝大家的支援!
相關推薦
Openstack Cinder中建立volume過程的原始碼解析(8)
感謝朋友支援本部落格,歡迎共同探討交流,由於能力和時間有限,錯誤之處在所難免,歡迎指正!如果轉載,請保留作者資訊。部落格地址:http://blog.csdn.net/gaoxingnengjisuan郵箱地址:[email protected] 在這篇部落格中,
Openstack Cinder中建立volume過程的原始碼解析(9)
感謝朋友支援本部落格,歡迎共同探討交流,由於能力和時間有限,錯誤之處在所難免,歡迎指正!如果轉載,請保留作者資訊。部落格地址:http://blog.csdn.net/gaoxingnengjisuan郵箱地址:[email protected] 我們在上一篇部落
Mybaits 原始碼解析 (二)----- 根據配置檔案建立SqlSessionFactory(Configuration的建立過程)
我們使用mybatis操作資料庫都是通過SqlSession的API呼叫,而建立SqlSession是通過SqlSessionFactory。下面我們就看看SqlSessionFactory的建立過程。 配置檔案解析入口 我們看看第一篇文章中的測試方法 1 public static void m
Mybaits 原始碼解析 (四)----- SqlSession的建立過程(看懂框架原始碼再也不用死記硬背面試題)
SqlSession是mybatis的核心介面之一,是myabtis介面層的主要組成部分,對外提供了mybatis常用的api。myabtis提供了兩個SqlSesion介面的實現,常用的實現類是DefaultSqlSession。它相當於一個數據庫連線物件,在一個SqlSession中可以執行多條SQL語句
Mybaits 原始碼解析 (六)----- 全網最詳細:Select 語句的執行過程分析(上篇)(Mapper方法是如何呼叫到XML中的SQL的?)
上一篇我們分析了Mapper介面代理類的生成,本篇接著分析是如何呼叫到XML中的SQL 我們回顧一下MapperMethod 的execute方法 public Object execute(SqlSession sqlSession, Object[] args) { Object res
EventBus原始碼解析(一)—訂閱過程
1.EventBus原始碼解析(一)—訂閱過程 2.EventBus原始碼解析(二)—釋出事件和登出流程 前言 最近發現EventBus用起來是真的方便,本來對於EventBus我對於這個框架的原始碼的閱讀的優先順序是比較低的,因為這個框架不像OkHttp,Gli
【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程
title: 【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程 date: 2018-12-03 21:12:42 tags: Hadoop categories: 大資料 toc: true 點選檢視我的部落格:Josonlee’
ElasticSearch原始碼解析(三):索引建立
我們先來看看索引建立的事例程式碼: Directory directory = FSDirectory.getDirectory("/tmp/testindex"); // Use standard analyzer Analyzer analyzer = new
Spring原始碼解析(4):IOC過程下
上文說到populateBean方法中,對被@Autowired註解的屬性方法進行注入。在這之後,BeanFactory執行applyPropertyValues方法,這個方法中,一個是把之前解析出來的屬性值設定到bean中去;一個是繼續解析出BeanDefinition中定
roaringbitmap 原始碼解析(3)底層容器相互add過程
今天主要講述roaringbitmap的add計算。 先判斷高位是否存在相同的,如果相同,再低位的容器相add。否則,直接跳過 public static RoaringBitmap and(final RoaringBitmap x1, final
Spring原始碼解析(七):Spring AOP中對攔截器呼叫的實現
前面我們分析了Spring AOP實現中得到Proxy物件的過程,下面我們看看在Spring AOP中攔截器鏈是怎樣被呼叫的,也就是Proxy模式是怎樣起作用的,或者說Spring是怎樣為我們提供AOP功能的; 在JdkDynamicAopProxy中生成Proxy物件的時
Mybaits 原始碼解析 (七)----- Select 語句的執行過程分析(下篇)全網最詳細,沒有之一
我們上篇文章講到了查詢方法裡面的doQuery方法,這裡面就是呼叫JDBC的API了,其中的邏輯比較複雜,我們這邊文章來講,先看看我們上篇文章分析的地方 SimpleExecutor 1 public <E> List<E> doQuery(MappedStatement m
HashMap原始碼解析(JDK8)
前言 這段時間有空,專門填補了下基礎,把常用的ArrayList、LinkedList、HashMap、LinkedHashMap、LruCache原始碼看了一遍,List相對比較簡單就不單獨介紹了,Map準備用兩篇的篇幅,分別介紹HashMap和(LruCache+LinkedHa
qt 中建立一個工作執行緒(例子)
當一個事件需要很長的處理時間,就建立一個工作執行緒,防止主介面卡死。 1.新建一個QT的gui專案,裡面包含main.cpp,mainwindow.h,mainwindow.cpp,mainwindow.ui檔案 2.新建一個頭檔案thread.h,派生一個執行緒類,重新寫一個執行緒的入口函式。
Spring原始碼解析(十三)——AOP原理——AnnotationAwareAspectJAutoProxyCreator註冊
* 2、 AnnotationAwareAspectJAutoProxyCreator: * AnnotationAwareAspectJAutoProxyCreator &nbs
Spring原始碼解析(四)——元件註冊4
/** * 給容器中註冊元件; * 1)、包掃描+元件標註註解(@Controller/@Service/@Repository/@Component)[自己寫的類] * 2)、@Bean[匯入的第三方包裡面的元件] * 3)、@Import[快速給容器中匯入一個
Spring原始碼解析(八)——生命週期——BeanPostProcessor在spring底層的使用
一、ApplicationContextAwareProcessor import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import or
Spring原始碼解析(七)——生命週期——BeanPostProcessor
https://blog.csdn.net/u011734144/article/details/72600932 http://www.cnblogs.com/lucas2/p/9430169.html BeanPostProcessor:bean的後置處理器。在bean
Spring原始碼解析(三)——元件註冊3
@Scope設定元件作用域 import com.ken.domain.Person; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Config
Spring原始碼解析(二)——元件註冊2
import com.ken.service.BookService; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.