Openstack Keystone 認證流程(六)--認證
1. 身份認證
在前一章中, 介紹了路由的過程, 這樣我們就能從URL中輕易地找到所對應的、需要執行的程式碼。在這一章中, 我們看看一個具體的認證請求是如何被處理的。
假設有如下一個請求:
# Request a token scoped to $OS_TENANT_NAME using password credentials and
# pretty-print the JSON response. The shell variables are spliced into the
# single-quoted JSON body by closing and reopening the quotes around each one.
$ curl -s -X POST http://8.21.28.222:35357/v2.0/tokens \
-H "Content-Type: application/json" \
-d '{"auth": {"tenantName": "'"$OS_TENANT_NAME"'", "passwordCredentials":
{"username": "'"$OS_USERNAME"'", "password": "'"$OS_PASSWORD"'"}}}' \
| python -m json.tool
從keystone-paste.ini中的 /v2.0 = admin_api 可以找出對應的流水線為admin_api, 然後找到流水線的最後一個節點如下:
[app:admin_service]
paste.app_factory = keystone.service:admin_app_factory
最後可以在token/routers.py中找到如下一條路由:
from keystone.token import controllers
...
token_controller = controllers.Auth()
mapper.connect('/tokens',
controller=token_controller,
action='authenticate',
conditions=dict(method=['POST']))
所以對應的controller為keystone.token.controllers.Auth, action為authenticate,找到對應的方法, 其程式碼如下:
def authenticate(self, context, auth=None):
    """Authenticate credentials and return a token.

    Accept auth as a dict that looks like::

        {
            "auth":{
                "passwordCredentials":{
                    "username":"test_user",
                    "password":"mypass"
                },
                "tenantName":"customer-x"
            }
        }

    In this case, tenant is optional, if not provided the token will be
    considered "unscoped" and can later be used to get a scoped token.

    Alternatively, this call accepts auth with only a token and tenant
    that will return a token that is scoped to that tenant.

    :param context: request context, handed unchanged to the
        ``_authenticate_*`` helpers
    :param auth: the ``auth`` dict taken from the request body
    :returns: the token data returned by
        ``token_provider_api.issue_v2_token``
    :raises exception.ValidationError: if ``auth`` is None
    """
    if auth is None:
        raise exception.ValidationError(attribute='auth',
                                        target='request body')
    # Placeholder only; reassigned below once the auth info is known.
    auth_token_data = None
    # Pick the authentication method: token first, then external, and
    # local (username/password) only when external auth does not apply.
    if "token" in auth:
        # Try to authenticate using a token
        auth_info = self._authenticate_token(
            context, auth)
    else:
        # Try external authentication
        try:
            auth_info = self._authenticate_external(
                context, auth)
        except ExternalAuthNotApplicable:
            # Try local authentication
            auth_info = self._authenticate_local(
                context, auth)
    # Whichever branch ran, its result is unpacked as this 5-tuple.
    user_ref, tenant_ref, metadata_ref, expiry, bind = auth_info
    core.validate_auth_info(self, user_ref, tenant_ref)
    # NOTE(morganfainberg): Make sure the data is in correct form since it
    # might be consumed external to Keystone and this is a v2.0 controller.
    # The user_ref is encoded into the auth_token_data which is returned as
    # part of the token data. The token provider doesn't care about the
    # format.
    user_ref = self.identity_api.v3_to_v2_user(user_ref)
    if tenant_ref:
        tenant_ref = self.filter_domain_id(tenant_ref)
    auth_token_data = self._get_auth_token_data(user_ref,
                                                tenant_ref,
                                                metadata_ref,
                                                expiry)
    # A catalog is only looked up when the token is scoped to a tenant;
    # otherwise an empty catalog is used.
    if tenant_ref:
        catalog_ref = self.catalog_api.get_catalog(
            user_ref['id'], tenant_ref['id'], metadata_ref)
    else:
        catalog_ref = {}
    # Temporary id value; presumably replaced when the token provider
    # issues the real token -- TODO confirm.
    auth_token_data['id'] = 'placeholder'
    if bind:
        auth_token_data['bind'] = bind
    # Resolve every role id in the metadata to a {'name': ...} entry.
    roles_ref = []
    for role_id in metadata_ref.get('roles', []):
        role_ref = self.identity_api.get_role(role_id)
        roles_ref.append(dict(name=role_ref['name']))
    # token_id is not used here; only the token data goes back to the caller.
    (token_id, token_data) = self.token_provider_api.issue_v2_token(
        auth_token_data, roles_ref=roles_ref, catalog_ref=catalog_ref)
    return token_data
因為請求中沒有token, 而且採用的是本地使用者名稱和密碼認證(外部認證不適用, 會丟擲ExternalAuthNotApplicable), 所以最終會進入_authenticate_local, 繼續看程式碼:
def _authenticate_local(self, context, auth):
    """Try to authenticate against the identity backend.

    Validates the ``passwordCredentials`` section of ``auth``, resolves the
    user (by ``username`` when one is given, otherwise by ``userId``),
    checks the password through ``identity_api.authenticate`` and collects
    the project and roles for the token.

    :param context: request context (not read in this method)
    :param auth: the ``auth`` dict from the request body
    :returns: the tuple
        ``(user_ref, tenant_ref, metadata_ref, expiry, None)``; the last
        element is always None here
    :raises exception.ValidationError: if ``passwordCredentials``,
        ``password``, or both ``userId`` and ``username`` are missing
    :raises exception.ValidationSizeError: if the password, ``userId`` or
        ``username`` exceeds its configured maximum length
    :raises exception.Unauthorized: if the user name is unknown or
        ``identity_api.authenticate`` raises AssertionError
    """
    # --- Request validation -------------------------------------------
    if 'passwordCredentials' not in auth:
        raise exception.ValidationError(
            attribute='passwordCredentials', target='auth')
    if "password" not in auth['passwordCredentials']:
        raise exception.ValidationError(
            attribute='password', target='passwordCredentials')
    password = auth['passwordCredentials']['password']
    if password and len(password) > CONF.identity.max_password_length:
        raise exception.ValidationSizeError(
            attribute='password', size=CONF.identity.max_password_length)
    # At least one way of identifying the user must be present.
    if ("userId" not in auth['passwordCredentials'] and
            "username" not in auth['passwordCredentials']):
        raise exception.ValidationError(
            attribute='username or userId',
            target='passwordCredentials')
    user_id = auth['passwordCredentials'].get('userId', None)
    if user_id and len(user_id) > CONF.max_param_size:
        raise exception.ValidationSizeError(attribute='userId',
                                            size=CONF.max_param_size)
    username = auth['passwordCredentials'].get('username', '')
    if len(username) > CONF.max_param_size:
        raise exception.ValidationSizeError(attribute='username',
                                            size=CONF.max_param_size)
    # --- User lookup ----------------------------------------------------
    # A non-empty username overrides any userId supplied in the request:
    # the id is taken from the user found by name in DEFAULT_DOMAIN_ID.
    if username:
        try:
            user_ref = self.identity_api.get_user_by_name(
                username, DEFAULT_DOMAIN_ID)
            user_id = user_ref['id']
        except exception.UserNotFound as e:
            raise exception.Unauthorized(e)
    # --- Password check -------------------------------------------------
    # An AssertionError from the identity API is reported to the client
    # as Unauthorized.
    try:
        user_ref = self.identity_api.authenticate(
            user_id=user_id,
            password=password)
    except AssertionError as e:
        raise exception.Unauthorized(e)
    # --- Project / roles / expiry ---------------------------------------
    metadata_ref = {}
    tenant_id = self._get_project_id_from_auth(auth)
    tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(
        user_id, tenant_id)
    expiry = core.default_expire_time()
    return (user_ref, tenant_ref, metadata_ref, expiry, None)
在這個方法中, 最開始是對傳入的引數進行有效性檢查。然後使用username獲得使用者資訊:
user_ref = self.identity_api.get_user_by_name(
username, DEFAULT_DOMAIN_ID)
user_id = user_ref['id']
這裡有個問題,就是identity_api並沒有定義。這個先暫時不去考慮,會在下一章中繼續討論這個問題。現在只需要知道這個對應的是identity/core.py中的Manager類就可以。
在獲取到使用者資訊之後,就會使用identity_api.authenticate來校驗使用者名稱和密碼:
user_ref = self.identity_api.authenticate(
user_id=user_id,
password=password)
開啟identity/core.py,然後找到authenticate方法
@domains_configured
def authenticate(self, user_id, password, domain_scope=None):
    """Authenticate a user by delegating to the identity driver.

    :param user_id: id of the user to authenticate
    :param password: password supplied for that user
    :param domain_scope: passed to ``_get_domain_id_and_driver`` to select
        the domain id and driver; defaults to None
    :returns: the user ref returned by the driver, passed through
        ``_set_domain_id`` when the driver is not domain aware
    """
    domain_id, driver = self._get_domain_id_and_driver(domain_scope)
    ref = driver.authenticate(user_id, password)
    # Only refs coming from a driver that is not domain aware are run
    # through _set_domain_id with the resolved domain_id.
    if not driver.is_domain_aware():
        ref = self._set_domain_id(ref, domain_id)
    return ref
這裡可以看出其繼續使用對應的driver來進行認證。這個driver可以從keystone.conf檔案中找到。
[identity]
# driver = keystone.identity.backends.sql.Identity
開啟該driver所對應的檔案(keystone/identity/backends/sql.py), 找到其對應的authenticate方法:
def authenticate(self, user_id, password):
    """Check a user's password against the stored user record.

    :param user_id: id of the user to load via ``_get_user``
    :param password: password to verify
    :returns: ``identity.filter_user`` applied to the user record's dict
    :raises AssertionError: with the message 'Invalid user / password'
        when the user does not exist or the password check fails
    """
    session = self.get_session()
    user_ref = None
    # An unknown user and a wrong password raise the same error and
    # message -- presumably so callers cannot tell the two cases apart.
    try:
        user_ref = self._get_user(session, user_id)
    except exception.UserNotFound:
        raise AssertionError('Invalid user / password')
    if not self._check_password(password, user_ref):
        raise AssertionError('Invalid user / password')
    return identity.filter_user(user_ref.to_dict())
這裡的session是實際連線MySQL資料庫的session。使用session取出user表中所對應的資訊, 然後使用_check_password進行密碼的校驗。在看密碼校驗前, 我們先看看user表所對應的結構:
MariaDB [keystone]> desc user;
Field | Type | Null | Key | Default | Extra |
---|---|---|---|---|---|
id | varchar(64) | NO | PRI | NULL | |
name | varchar(255) | NO | | NULL | |
extra | text | YES | | NULL | |
password | varchar(128) | YES | | NULL | |
enabled | tinyint(1) | YES | | NULL | |
domain_id | varchar(64) | NO | MUL | NULL | |
default_project_id | varchar(64) | YES | | NULL | |
可以看出裡面有使用者名稱及密碼等資訊。我們再回到_check_password
def _check_password(self, password, user_ref):
    """Return the result of checking ``password`` against the user's hash.

    Delegates to ``utils.check_password`` with the value stored in
    ``user_ref.password``.
    """
    return utils.check_password(password, user_ref.password)
裡面只有一句,然後繼續轉到utils.check_password
def check_password(password, hashed):
    """Verify a plaintext password against a stored hash.

    :param password: plaintext password, or None
    :param hashed: stored password hash, or None
    :returns: False when either argument is None; otherwise the result of
        ``passlib.hash.sha512_crypt.verify``
    """
    if password is None or hashed is None:
        return False
    # The password goes through trunc_password and is UTF-8 encoded
    # before being handed to passlib.
    password_utf8 = trunc_password(password).encode('utf-8')
    return passlib.hash.sha512_crypt.verify(password_utf8, hashed)
這裡的實現很簡單: 如果密碼或者hash值為None就直接返回假; 否則先把密碼經過trunc_password處理並編碼成utf-8, 然後使用passlib.hash.sha512_crypt.verify與資料庫中儲存的hash值進行比對。這時, 如果密碼正確, 就會返回真。
然後返回到_authenticate_local,從使用者資訊中再取出一些其它的資訊並返回到上一級
metadata_ref = {}
tenant_id = self._get_project_id_from_auth(auth)
tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(
user_id, tenant_id)
expiry = core.default_expire_time()
return (user_ref, tenant_ref, metadata_ref, expiry, None)
回到keystone.token.controllers.Auth.authenticate
user_ref, tenant_ref, metadata_ref, expiry, bind = auth_info
core.validate_auth_info(self, user_ref, tenant_ref)
# NOTE(morganfainberg): Make sure the data is in correct form since it
# might be consumed external to Keystone and this is a v2.0 controller.
# The user_ref is encoded into the auth_token_data which is returned as
# part of the token data. The token provider doesn't care about the
# format.
user_ref = self.identity_api.v3_to_v2_user(user_ref)
if tenant_ref:
tenant_ref = self.filter_domain_id(tenant_ref)
auth_token_data = self._get_auth_token_data(user_ref,
tenant_ref,
metadata_ref,
expiry)
if tenant_ref:
catalog_ref = self.catalog_api.get_catalog(
user_ref['id'], tenant_ref['id'], metadata_ref)
else:
catalog_ref = {}
auth_token_data['id'] = 'placeholder'
if bind:
auth_token_data['bind'] = bind
roles_ref = []
for role_id in metadata_ref.get('roles', []):
role_ref = self.identity_api.get_role(role_id)
roles_ref.append(dict(name=role_ref['name']))
(token_id, token_data) = self.token_provider_api.issue_v2_token(
auth_token_data, roles_ref=roles_ref, catalog_ref=catalog_ref)
return token_data
從已有的資訊中,生成token資訊,並且返回給上層。最後會在Application的__call__ 方法中呼叫render_response, 把body的結果轉換為json格式,並且返回到client.
def render_response(body=None, status=None, headers=None):
    """Forms a WSGI response.

    :param body: object to serialize as the JSON response body, or None
        for an empty response
    :param status: ``(code, reason)`` pair; defaults to
        ``(204, 'No Content')`` without a body and ``(200, 'OK')`` with one
    :param headers: list of ``(name, value)`` header tuples, or None
    :returns: a ``webob.Response``
    """
    # NOTE(review): `headers or []` reuses a non-empty list supplied by the
    # caller, so the appends below modify that list in place.
    headers = headers or []
    # Added to every response, with or without a body.
    headers.append(('Vary', 'X-Auth-Token'))
    if body is None:
        body = ''
        status = status or (204, 'No Content')
    else:
        body = jsonutils.dumps(body, cls=utils.SmarterEncoder)
        headers.append(('Content-Type', 'application/json'))
        status = status or (200, 'OK')
    # status is formatted as "<code> <reason>", so it must be a 2-tuple.
    return webob.Response(body=body,
                          status='%s %s' % status,
                          headerlist=headers)
至此, client會得到如第二章一樣的json格式的輸出。