keystone獲取token程式碼分析
阿新 • • 發佈:2019-01-28
1. 入口 keystone->auth->controllers->authenticate_for_token
1.1 url:/auth/tokens
def authenticate_for_token(self, request, auth=None):
    """Authenticate user and issue a token.

    :param request: incoming request; ``request.params`` carries the URL
        query parameters.
    :param auth: the ``auth`` section of the request body.
    :returns: the response built by ``render_token_data_response``.
    :raises exception.InsufficientAuthMethods: the auth methods that
        succeeded do not satisfy the MFA rules for the user.
    :raises exception.Unauthorized: a ``TrustNotFound`` error was raised
        while authenticating or issuing the token.
    """
    # The catalog is included unless the caller explicitly passes
    # ``nocatalog`` in the URL query parameters.
    include_catalog = 'nocatalog' not in request.params

    # Check that the body parameters are well-formed before doing any work.
    validate_issue_token_auth(auth)

    try:
        auth_info = core.AuthInfo.create(auth=auth)
        auth_context = core.AuthContext(extras={},
                                        method_names=[],
                                        bind={})
        self.authenticate(request, auth_info, auth_context)
        if auth_context.get('access_token_id'):
            auth_info.set_scope(None, auth_context['project_id'], None)
        self._check_and_set_default_scoping(auth_info, auth_context)
        (domain_id, project_id, trust, unscoped) = auth_info.get_scope()

        # NOTE(notmorgan): only methods that actually run and succeed will
        # be in the auth_context['method_names'] list. Do not blindly take
        # the values from auth_info, look at the authoritative values. Make
        # sure the set is unique.
        method_names_set = set(auth_context.get('method_names', []))
        method_names = list(method_names_set)

        # Do MFA Rule Validation for the user
        if not self._mfa_rules_validator.check_auth_methods_against_rules(
                auth_context['user_id'], method_names_set):
            raise exception.InsufficientAuthMethods(
                user_id=auth_context['user_id'],
                methods='[%s]' % ','.join(auth_info.get_method_names()))

        expires_at = auth_context.get('expires_at')
        token_audit_id = auth_context.get('audit_id')
        is_domain = auth_context.get('is_domain')

        # Issue the token itself.
        (token_id, token_data) = self.token_provider_api.issue_token(
            auth_context['user_id'], method_names, expires_at, project_id,
            is_domain, domain_id, auth_context, trust, include_catalog,
            parent_audit_id=token_audit_id)

        # NOTE(wanghong): We consume a trust use only when we are using
        # trusts and have successfully issued a token.
        if trust:
            self.trust_api.consume_use(trust['id'])

        return render_token_data_response(token_id, token_data,
                                          created=True)
    except exception.TrustNotFound as e:
        LOG.warning(six.text_type(e))
        raise exception.Unauthorized(e)
2. validate_issue_token_auth
2.1 驗證body體引數的合法性、有效性。
def validate_issue_token_auth(auth=None):
    """Validate the ``auth`` section of a token-issue request body.

    Runs the basic schema validation, then checks that every user, project
    and domain reference carries at least one of ``id`` or ``name``.

    :param auth: the ``auth`` section of the request body; ``None`` skips
        validation entirely.
    :raises exception.SchemaValidationError: a user, project or domain
        reference has neither ``id`` nor ``name``.
    """
    if auth is None:
        return

    # Basic schema (syntax) validation.
    validation.lazy_validate(schema.token_issue, auth)

    # User information from the password section of the body, if any.
    user = auth['identity'].get('password', {}).get('user')
    if user is not None:
        # A user must be identified by at least one of 'id' or 'name'.
        if 'id' not in user and 'name' not in user:
            msg = _('Invalid input for field identity/password/user: '
                    'id or name must be present.')
            raise exception.SchemaValidationError(detail=msg)

        domain = user.get('domain')
        if domain is not None:
            if 'id' not in domain and 'name' not in domain:
                msg = _(
                    'Invalid input for field identity/password/user/domain: '
                    'id or name must be present.')
                raise exception.SchemaValidationError(detail=msg)

    scope = auth.get('scope')
    # Only a dict-shaped scope is inspected; None is not a dict, so a
    # separate None check is unnecessary.
    if isinstance(scope, dict):
        project = scope.get('project')
        if project is not None:
            if 'id' not in project and 'name' not in project:
                msg = _(
                    'Invalid input for field scope/project: '
                    'id or name must be present.')
                raise exception.SchemaValidationError(detail=msg)

            domain = project.get('domain')
            if domain is not None:
                if 'id' not in domain and 'name' not in domain:
                    msg = _(
                        'Invalid input for field scope/project/domain: '
                        'id or name must be present.')
                    raise exception.SchemaValidationError(detail=msg)

        domain = scope.get('domain')
        if domain is not None:
            if 'id' not in domain and 'name' not in domain:
                msg = _(
                    'Invalid input for field scope/domain: '
                    'id or name must be present.')
                raise exception.SchemaValidationError(detail=msg)
3. auth_info = core.AuthInfo.create(auth=auth)
3.1 構造AuthInfo物件auth_info
3.2 初始化auth成員變數為auth
3.3 初始化_scope_data成員變數為(None, None, None, None)
3.4 返回auth_info(需要認證的資料)
def create(auth=None, scope_only=False):
    """Build an ``AuthInfo`` from the request's ``auth`` data and validate it.

    :param auth: the ``auth`` section of the request body.
    :param scope_only: forwarded to ``_validate_and_normalize_auth_data``.
    :returns: the validated and normalized ``AuthInfo`` object.
    """
    # Construct the AuthInfo object from the auth data that was passed in.
    auth_info = AuthInfo(auth=auth)
    # Validate and normalize the auth data held by the new object.
    auth_info._validate_and_normalize_auth_data(scope_only)
    return auth_info
def _validate_and_normalize_auth_data(self, scope_only=False):
if not self.auth:
raise exception.ValidationError(attribute='auth',
target='request body')
if scope_only is False://此時傳入的為False, 如果為True則表示僅僅驗證scope
self._validate_auth_methods()//保證傳入的所有方法都已經設定到auth_info物件中,且方法都是支援的。
self._validate_and_normalize_scope_data()//驗證scope資訊有效性(必須只能scope一個,project/domain都啟用
def _validate_auth_methods(self):
    """Check that every requested auth method has a payload and is supported.

    :raises exception.ValidationError: a requested method has no entry
        under ``self.auth['identity']``.
    :raises exception.AuthMethodNotSupported: a requested method is not in
        ``AUTH_METHODS``.
    """
    # make sure all the method data/payload are provided
    for method_name in self.get_method_names():
        if method_name not in self.auth['identity']:
            raise exception.ValidationError(attribute=method_name,
                                            target='identity')

    # make sure auth method is supported
    # AUTH_METHODS holds the currently supported methods (per the original
    # annotation, a dict keyed by method name such as 'password', 'token').
    for method_name in self.get_method_names():
        if method_name not in AUTH_METHODS:
            raise exception.AuthMethodNotSupported()
def _validate_and_normalize_scope_data(self):
"""Validate and normalize scope data."""
if 'scope' not in self.auth://scope為空,則不需要驗證,返回
return
if sum(['project' in self.auth['scope'],
'domain' in self.auth['scope'],
'unscoped' in self.auth['scope'],
'OS-TRUST:trust' in self.auth['scope']]) != 1://sum[True,True,Flase] == 2, 在這裡如果不等於
//1就是有問題的,為啥?scope有且僅能scope一種.
raise exception.ValidationError(
attribute='project, domain, OS-TRUST:trust or unscoped',
target='scope')
if 'unscoped' in self.auth['scope']:
self._scope_data = (None, None, None, 'unscoped')//表示未scope
return
if 'project' in self.auth['scope']:
project_ref = self._lookup_project(self.auth['scope']['project'])//查詢project資訊
self._scope_data = (None, project_ref['id'], None, None)//儲存scope資訊
elif 'domain' in self.auth['scope']:
domain_ref = self._lookup_domain(self.auth['scope']['domain'])//查詢domain資訊
self._scope_data = (domain_ref['id'], None, None, None)//儲存scope資訊
elif 'OS-TRUST:trust' in self.auth['scope']:
if not CONF.trust.enabled://查詢配置檔案,是否開啟trust
raise exception.Forbidden('Trusts are disabled.')
trust_ref = self._lookup_trust(
self.auth['scope']['OS-TRUST:trust'])
# TODO(ayoung): when trusts support domains, fill in domain data
if trust_ref.get('project_id') is not None:
project_ref = self._lookup_project(
{'id': trust_ref['project_id']})
self._scope_data = (None, project_ref['id'], trust_ref, None)
else:
self._scope_data = (None, None, trust_ref, None)
def _lookup_project(self, project_info):
project_id = project_info.get('id')//從
project_name = project_info.get('name')
try:
if project_name:
if (CONF.resource.project_name_url_safe == 'strict' and//配置檔案中project_name_url_safe
utils.is_not_url_safe(project_name)):
msg = _('Project name cannot contain reserved characters.')
LOG.warning(msg)
raise exception.Unauthorized(message=msg)
if 'domain' not in project_info://project_info中,必須包含domain欄位
raise exception.ValidationError(attribute='domain',
target='project')
domain_ref = self._lookup_domain(project_info['domain'])//獲取domain資訊
project_ref = self.resource_api.get_project_by_name(
project_name, domain_ref['id'])//根據project名字,domainId獲取project資訊
else:
project_ref = self.resource_api.get_project(project_id)
# NOTE(morganfainberg): The _lookup_domain method will raise
# exception.Unauthorized if the domain isn't found or is
# disabled.
self._lookup_domain({'id': project_ref['domain_id']})
except exception.ProjectNotFound as e:
LOG.warning(six.text_type(e))
raise exception.Unauthorized(e)
self._assert_project_is_enabled(project_ref)//判斷project是否啟用
return project_ref //返回專案資訊
4.auth_context = core.AuthContext(extras={},method_names=[],bind={})
4.1. 獲取認證的上下文資訊
4.2. 此時auth_context是一個字典,當前值為 {'extras': {}, 'method_names': [], 'bind': {}}
class AuthContext(dict):
5.self.authenticate(request, auth_info, auth_context)
5.1 在這裡就要開始進行認證(核心之一)
def authenticate(self, request, auth_info, auth_context):
    """Run the requested auth methods and record results in auth_context.

    :param request: incoming request; ``request.remote_user`` triggers the
        ``external`` method when it is non-empty.
    :param auth_info: object exposing ``get_method_names()`` and
        ``get_method_data(name)``.
    :param auth_context: must be a ``core.AuthContext``; it is updated in
        place with the data returned by the methods that succeed.
    :raises exception.Unauthorized: ``auth_context`` has the wrong type, or
        no method supplied a ``user_id``.
    :raises exception.AdditionalAuthRequired: at least one method returned
        a response body instead of a success status.
    """
    if not isinstance(auth_context, core.AuthContext):
        LOG.error(
            '`auth_context` passed to the Auth controller '
            '`authenticate` method is not of type '
            '`keystone.auth.controllers.AuthContext`. For security '
            'purposes this is required. This is likely a programming '
            'error. Received object of type `%s`', type(auth_context))
        raise exception.Unauthorized(
            _('Cannot Authenticate due to internal error.'))

    # The 'external' method allows any 'REMOTE_USER' based authentication
    # In some cases the server can set REMOTE_USER as '' instead of
    # dropping it, so this must be filtered out
    if request.remote_user:
        try:
            external = core.get_auth_method('external')
            resp = external.authenticate(request,
                                         auth_info)
            if resp and resp.status:
                # NOTE(notmorgan): ``external`` plugin cannot be multi-step
                # it is either a plain success/fail.
                auth_context.setdefault(
                    'method_names', []).insert(0, 'external')
                # NOTE(notmorgan): All updates to auth_context is handled
                # here in the .authenticate method.
                auth_context.update(resp.response_data or {})
        except exception.AuthMethodNotSupported:
            # This will happen there is no 'external' plugin registered
            # and the container is performing authentication.
            # The 'kerberos' and 'saml' methods will be used this way.
            # In those cases, it is correct to not register an
            # 'external' plugin; if there is both an 'external' and a
            # 'kerberos' plugin, it would run the check on identity twice.
            LOG.debug("No 'external' plugin is registered.")
        except exception.Unauthorized:
            # If external fails then continue and attempt to determine
            # user identity using remaining auth methods
            LOG.debug("Authorization failed for 'external' auth method.")

    # need to aggregate the results in case two or more methods
    # are specified
    auth_response = {'methods': []}
    for method_name in auth_info.get_method_names():
        # Resolve the plugin for this method name (for 'password' the
        # original annotation names the Password plugin object).
        method = core.get_auth_method(method_name)
        resp = method.authenticate(request,
                                   auth_info.get_method_data(method_name))
        if resp:
            if resp.status:
                # Record the method that succeeded, e.g.
                # {..., 'method_names': ['password']}.
                auth_context.setdefault(
                    'method_names', []).insert(0, method_name)
                # NOTE(notmorgan): All updates to auth_context is handled
                # here in the .authenticate method. If the auth attempt was
                # not successful do not update the auth_context
                # Guard against a plugin returning no response_data, the
                # same way the update() call below already did.
                response_data = resp.response_data or {}
                resp_method_names = response_data.pop('method_names', [])
                auth_context['method_names'].extend(resp_method_names)
                # For the password plugin response_data carries user_id,
                # so this is where user_id enters auth_context.
                auth_context.update(response_data)
            elif resp.response_body:
                auth_response['methods'].append(method_name)
                auth_response[method_name] = resp.response_body

    if auth_response["methods"]:
        # authentication continuation required
        raise exception.AdditionalAuthRequired(auth_response)

    if 'user_id' not in auth_context:
        msg = _('User not found by auth plugin; authentication failed')
        LOG.warning(msg)
        raise exception.Unauthorized(msg)
def authenticate(self, request, auth_payload):
    """Try to authenticate against the identity backend.

    :param request: incoming request, forwarded to the identity API.
    :param auth_payload: the data for this auth method taken from the
        request body.
    :returns: a ``base.AuthHandlerResponse`` with ``status=True`` whose
        ``response_data`` holds the authenticated ``user_id``.
    :raises exception.Unauthorized: the identity API raised
        ``AssertionError``.
    """
    # Build the user information from the payload and this plugin's method
    # name (per the original annotation: domain, name and password, with
    # METHOD_NAME being 'password' here).
    user_info = auth_plugins.UserAuthInfo.create(auth_payload, METHOD_NAME)

    try:
        # Delegates to the identity manager's authenticate().
        self.identity_api.authenticate(
            request,
            user_id=user_info.user_id,
            password=user_info.password)
    except AssertionError:
        # authentication failed because of invalid username or password
        msg = _('Invalid username or password')
        raise exception.Unauthorized(msg)

    response_data = {'user_id': user_info.user_id}
    return base.AuthHandlerResponse(status=True, response_body=None,
                                    response_data=response_data)
def authenticate(self, request, user_id, password):
    """Authenticate a user through the driver that owns the user.

    :param request: incoming request (not used by the visible code).
    :param user_id: public id of the user to authenticate.
    :param password: password supplied by the caller.
    :returns: the user reference after domain/mapping and shadow-user
        processing.
    """
    domain_id, driver, entity_id = (
        self._get_domain_driver_and_entity_id(user_id))
    # The driver does the actual credential check (per the original
    # annotation this lands in the SQL identity backend,
    # keystone->identity->backends->sql.py).
    ref = driver.authenticate(entity_id, password)
    ref = self._set_domain_id_and_mapping(
        ref, domain_id, driver, mapping.EntityType.USER)
    ref = self._shadow_nonlocal_user(ref)
    self.shadow_users_api.set_last_active_at(ref['id'])
    return ref
keystone->identity->backends->sql.py
def authenticate(self, user_id, password):
    """Check a user's password and account state.

    :param user_id: id of the user to authenticate.
    :param password: password supplied by the caller.
    :returns: the filtered user dict.
    :raises AssertionError: the user does not exist or the password is
        wrong.
    :raises exception.AccountLocked: the account is locked.
    :raises exception.UserDisabled: the user is disabled.
    :raises exception.PasswordExpired: the password has expired.
    """
    # NOTE(review): the extent of the ``with`` block was reconstructed from
    # a flattened paste - confirm against upstream.
    with sql.session_for_read() as session:
        try:
            # Look the user up by id.
            user_ref = self._get_user(session, user_id)
            user_dict = base.filter_user(user_ref.to_dict())
        except exception.UserNotFound:
            raise AssertionError(_('Invalid user / password'))

    if self._is_account_locked(user_id, user_ref):
        # The account is locked.
        raise exception.AccountLocked(user_id=user_id)
    elif not self._check_password(password, user_ref):
        # Wrong password: record the failed attempt before rejecting.
        self._record_failed_auth(user_id)
        raise AssertionError(_('Invalid user / password'))
    elif not user_ref.enabled:
        # The user is disabled.
        raise exception.UserDisabled(user_id=user_id)
    elif user_ref.password_is_expired:
        # The password has expired.
        raise exception.PasswordExpired(user_id=user_id)

    # successful auth, reset failed count if present
    if user_ref.local_user.failed_auth_count:
        self._reset_failed_auth(user_id)
    return user_dict
(token_id, token_data) = self.token_provider_api.issue_token
keystone->token->provider.py
def issue_token(self, user_id, method_names, expires_at=None,
                project_id=None, is_domain=False, domain_id=None,
                auth_context=None, trust=None, include_catalog=True,
                parent_audit_id=None):
    """Issue a token through the driver, then persist and cache it.

    :returns: a ``(token_id, token_data)`` tuple as produced by the driver.
    """
    # Delegate token creation to the configured driver (per the original
    # annotation: token->providers->fernet->core.py).
    token_id, token_data = self.driver.issue_token(
        user_id, method_names, expires_at, project_id, domain_id,
        auth_context, trust, include_catalog, parent_audit_id)

    # Store the token only when this provider needs persistence.
    if self._needs_persistence:
        data = dict(key=token_id,
                    id=token_id,
                    expires=token_data['token']['expires_at'],
                    user=token_data['token']['user'],
                    tenant=token_data['token'].get('project'),
                    is_domain=is_domain,
                    token_data=token_data,
                    trust_id=trust['id'] if trust else None,
                    token_version=self.V3)
        self._create_token(token_id, data)

    if CONF.token.cache_on_issue:
        # NOTE(amakarov): here and above TOKENS_REGION is to be passed
        # to serve as required positional "self" argument. It's ignored,
        # so I've put it here for convenience - any placeholder is fine.
        self._validate_token.set(token_data, TOKENS_REGION, token_id)

    return token_id, token_data
def issue_token(self, *args, **kwargs):
    """Issue a token via the parent provider and add issued-at info.

    All arguments are passed through unchanged to the parent class.

    :returns: the ``(token_id, token_data)`` tuple from the parent class.
    """
    token_id, token_data = super(Provider, self).issue_token(
        *args, **kwargs)
    # Called for its effect only; its return value is not used here.
    self._build_issued_at_info(token_id, token_data)
    return token_id, token_data
keystone->token->providers->common.py
def issue_token(self, user_id, method_names, expires_at=None,
                project_id=None, domain_id=None, auth_context=None,
                trust=None, include_catalog=True,
                parent_audit_id=None):
    """Build the token data for a user and derive the token id from it.

    :returns: a ``(token_id, token_data)`` tuple.
    :raises exception.NotImplemented: bind data is present but the provider
        does not support bind authentication.
    :raises exception.Forbidden: trusts are enabled, a trust is given, and
        the user is not its trustee.
    """
    if auth_context and auth_context.get('bind'):
        # NOTE(lbragstad): Check if the token provider being used actually
        # supports bind authentication methods before proceeding.
        if not self._supports_bind_authentication:
            raise exception.NotImplemented(_(
                'The configured token provider does not support bind '
                'authentication.'))

    if CONF.trust.enabled and trust:
        if user_id != trust['trustee_user_id']:
            raise exception.Forbidden(_('User is not a trustee.'))

    token_ref = None
    if auth_context and self._is_mapped_token(auth_context):
        token_ref = self._handle_mapped_tokens(
            auth_context, project_id, domain_id)

    access_token = None
    if 'oauth1' in method_names:
        # NOTE(review): this assumes auth_context is not None whenever
        # 'oauth1' is among the methods - confirm with callers.
        access_token_id = auth_context['access_token_id']
        access_token = self.oauth_api.get_access_token(access_token_id)

    # Assemble the token data.
    token_data = self.v3_token_data_helper.get_token_data(
        user_id,
        method_names,
        domain_id=domain_id,
        project_id=project_id,
        expires=expires_at,
        trust=trust,
        bind=auth_context.get('bind') if auth_context else None,
        token=token_ref,
        include_catalog=include_catalog,
        access_token=access_token,
        audit_info=parent_audit_id)

    token_id = self._get_token_id(token_data)
    return token_id, token_data
keystone->token->providers->common.py //在這裡組裝token_data
//token_data包括audit_ids、catalog、expires_at、is_domain、issued_at、methods、project、roles、user
def get_token_data(self, user_id, method_names, domain_id=None,
                   project_id=None, expires=None, trust=None, token=None,
                   include_catalog=True, bind=None, access_token=None,
                   issued_at=None, audit_info=None):
    """Assemble the body of a token.

    Starts from the method names, copies selected sections from an
    existing ``token`` if one is given, then lets the ``_populate_*``
    helpers fill in the rest.

    :param token: optional existing token data; its ``roles``, ``user``,
        ``catalog``, ``project`` and ``domain`` entries are reused.
    :param include_catalog: when false the service catalog is not populated.
    :param bind: optional bind data stored under ``bind``.
    :returns: ``{'token': token_data}``.
    """
    token_data = {'methods': method_names}

    # We've probably already written these to the token
    if token:
        for x in ('roles', 'user', 'catalog', 'project', 'domain'):
            if x in token:
                token_data[x] = token[x]

    if bind:
        token_data['bind'] = bind

    self._populate_scope(token_data, domain_id, project_id)
    # The admin-project helper runs only when a project entry is present.
    if token_data.get('project'):
        self._populate_is_admin_project(token_data)
    self._populate_user(token_data, user_id, trust)
    self._populate_roles(token_data, user_id, domain_id, project_id, trust,
                         access_token)
    self._populate_audit_info(token_data, audit_info)

    if include_catalog:
        self._populate_service_catalog(token_data, user_id, domain_id,
                                       project_id, trust)
    self._populate_service_providers(token_data)
    self._populate_token_dates(token_data, expires=expires,
                               issued_at=issued_at)
    self._populate_oauth_section(token_data, access_token)
    return {'token': token_data}
keystone->token->providers->fernet->core.py //獲取token id.
def _get_token_id(self, token_data):
"""Generate the token_id based upon the data in token_data.
:param token_data: token information
:type token_data: dict
:rtype: six.text_type
"""
user_id = token_data['token']['user']['id']
expires_at = token_data['token']['expires_at']
audit_ids = token_data['token']['audit_ids']
methods = token_data['token'].get('methods')
domain_id = token_data['token'].get('domain', {}).get('id')
project_id = token_data['token'].get('project', {}).get('id')
trust_id = token_data['token'].get('OS-TRUST:trust', {}).get('id')
access_token_id = token_data['token'].get('OS-OAUTH1', {}).get(
'access_token_id')
federated_info = self._build_federated_info(token_data)
return self.token_formatter.create_token(
user_id,
expires_at,
audit_ids,
methods=methods,
domain_id=domain_id,
project_id=project_id,
trust_id=trust_id,
federated_info=federated_info,
access_token_id=access_token_id
)
keystone->token->providers->fernet->token_formatter.py
//建立token id
def create_token(self, user_id, expires_at, audit_ids, methods=None,
                 domain_id=None, project_id=None, trust_id=None,
                 federated_info=None, access_token_id=None):
    """Given a set of payload attributes, generate a Fernet token.

    :returns: the packed token produced by ``self.pack``.
    """
    # Pick the first payload class whose arguments apply. If none applies
    # the loop falls through and the last class in PAYLOAD_CLASSES is used.
    for payload_class in PAYLOAD_CLASSES:
        if payload_class.create_arguments_apply(
                project_id=project_id, domain_id=domain_id,
                trust_id=trust_id, federated_info=federated_info,
                access_token_id=access_token_id):
            break

    version = payload_class.version
    payload = payload_class.assemble(
        user_id, methods, project_id, domain_id, expires_at, audit_ids,
        trust_id, federated_info, access_token_id
    )

    # The payload is prefixed with the version of the class that built it.
    versioned_payload = (version,) + payload
    serialized_payload = msgpack.packb(versioned_payload)
    token = self.pack(serialized_payload)

    # NOTE(lbragstad): We should warn against Fernet tokens that are over
    # 255 characters in length. This is mostly due to persisting the tokens
    # in a backend store of some kind that might have a limit of 255
    # characters. Even though Keystone isn't storing a Fernet token
    # anywhere, we can't say it isn't being stored somewhere else with
    # those kind of backend constraints.
    if len(token) > 255:
        LOG.info('Fernet token created with length of %d '
                 'characters, which exceeds 255 characters',
                 len(token))

    return token