Coverage for keystone.common.controller : 86%
data:image/s3,"s3://crabby-images/c28ee/c28ee9bb428b64c48dda6edbeeaacaae15092402" alt=""
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
'action': action, 'kwargs': ', '.join(['%s=%s' % (k, kwargs[k]) for k in kwargs])})
context=context, token_id=context['token_id'])
#V3 Tokens except AttributeError: LOG.warning(_('RBAC: Invalid user')) raise exception.Unauthorized()
else:
else: #v2 Tokens except AttributeError: LOG.warning(_('RBAC: Invalid user')) raise exception.Unauthorized() # NOTE(vish): this is pretty inefficient for role in creds.get('roles', [])]
"""Flatten a nested dictionary
Converts a dictionary with nested values into a single-level, flat dictionary, using dotted notation for each key.
""" else:
"""Wraps API calls with role based access controls (RBAC).""" def wrapper(self, context, *args, **kwargs): else: context, kwargs) # Simply use the passed kwargs as the target dict, which # would typically include the prime key of a get/update/delete # call.
"""Wraps filtered API calls with role based access controls (RBAC)."""
def wrapper(self, context, **kwargs): context, kwargs) # Now, build the target dict for policy check. We include: # # - Any query filter parameters # - Data from the main url (which will be in the kwargs # parameter) and would typically include the prime key # of a get/update/delete call # # First any query filter parameters
', '.join(['%s=%s' % (filter, target[filter]) for filter in target])))
# Now any formal url parameters
flatten(target))
else: LOG.warning(_('RBAC: Bypassing authorization'))
'trust_api', 'catalog_api', 'credential_api') """Base controller class for Identity API v2."""
trust_id=trust_id)
#First delete tokens that could get other tokens. user_id, tenant_id=project_id)
#delete tokens generated from trusts self._delete_tokens_for_trust(context, trust['trustee_user_id'], trust['id'])
"""Ensures the reference contains the specified attribute."""
"""Fill in domain_id since v2 calls are not domain-aware.
This will overwrite any domain_id that was inadvertently specified in the v2 call.
"""
"""Remove domain_id since v2 calls are not domain-aware."""
"""Base controller class for Identity API v3.
Child classes should set the ``collection_name`` and ``member_name`` class attributes, representing the collection of entities they are exposing to the API. This is required for supporting self-referential links, pagination, etc.
"""
# allow a missing trailing slash in the config endpoint += '/'
else:
def _add_self_referential_link(cls, ref):
def wrap_member(cls, context, ref):
'next': None, 'self': cls.base_url(path=context['path']), 'previous': None}
def paginate(cls, context, refs): """Paginates a list of references by page & per_page query strings.""" # FIXME(dolph): client needs to support pagination first
page = context['query_string'].get('page', 1) per_page = context['query_string'].get('per_page', 30) return refs[per_page * (page - 1):per_page * page]
def filter_by_attribute(cls, context, refs, attr): """Filters a list of references by query string value."""
"""Matches attributes allowing for booleans as strings.
We test explicitly for a value that defines it as 'False', which also means that the existence of the attribute with no value implies 'True'.
""" val_attr == '0'): else: else:
"""Ensures the value matches the reference's ID, if any.""" raise exception.ValidationError('Cannot change ID')
"""Generates and assigns a unique identifer to a reference."""
"""Fill in domain_id if not specified in a v3 call."""
if context['is_admin']: ref['domain_id'] = DEFAULT_DOMAIN_ID else: # Fish the domain_id out of the token # # We could make this more efficient by loading the domain_id # into the context in the wrapper function above (since # this version of normalize_domain will only be called inside # a v3 protected call). However, given that we only use this # for creating entities, this optimization is probably not # worth the duplication of state try: token_ref = self.token_api.get_token( context=context, token_id=context['token_id']) except exception.TokenNotFound: LOG.warning(_('Invalid token in normalize_domain_id')) raise exception.Unauthorized()
if 'domain' in token_ref: ref['domain_id'] = token_ref['domain']['id'] else: # FIXME(henry-nash) Revisit this once v3 token scoping # across domains has been hashed out ref['domain_id'] = DEFAULT_DOMAIN_ID
"""Override v2 filter to let domain_id out for v3 calls.""" return ref |