import uuid
from pyramid.httpexceptions import HTTPFound
from pyramid.security import NO_PERMISSION_REQUIRED
import requests
from ..api import (
AuthenticationComplete,
AuthenticationDenied,
register_provider,
)
from ..exceptions import CSRFError
from ..exceptions import ThirdPartyFailure
from ..settings import ProviderSettings
from ..utils import flat_url
GOOGLE_OAUTH2_DOMAIN = 'accounts.google.com'
[docs]class GoogleAuthenticationComplete(AuthenticationComplete):
"""Google OAuth 2.0 auth complete"""
[docs]def includeme(config):
"""Activate the ``google_oauth2`` Pyramid plugin via
``config.include('velruse.providers.google_oauth2')``. After included,
two new methods will be available to configure new providers.
``config.add_google_oauth2_login()``
See :func:`~velruse.providers.google_oauth2.add_google_login`
for the supported options.
``config.add_google_oauth2_login_from_settings()``
"""
config.add_directive('add_google_oauth2_login', add_google_login)
config.add_directive('add_google_oauth2_login_from_settings',
add_google_login_from_settings)
def add_google_login_from_settings(config, prefix='velruse.google.'):
settings = config.registry.settings
p = ProviderSettings(settings, prefix)
p.update('consumer_key', required=True)
p.update('consumer_secret', required=True)
p.update('scope')
p.update('login_path')
p.update('callback_path')
config.add_google_oauth2_login(**p.kwargs)
[docs]def add_google_login(config,
consumer_key=None,
consumer_secret=None,
scope=None,
login_path='/login/google',
callback_path='/login/google/callback',
name='google'):
"""
Add a Google login provider to the application supporting the new
OAuth2 protocol.
"""
provider = GoogleOAuth2Provider(
name,
consumer_key,
consumer_secret,
scope)
config.add_route(provider.login_route, login_path)
config.add_view(provider, attr='login', route_name=provider.login_route,
permission=NO_PERMISSION_REQUIRED)
config.add_route(provider.callback_route, callback_path,
use_global_views=True,
factory=provider.callback)
register_provider(config, name, provider)
class GoogleOAuth2Provider(object):
profile_scope = 'https://www.googleapis.com/auth/userinfo.profile'
email_scope = 'https://www.googleapis.com/auth/userinfo.email'
def __init__(self,
name,
consumer_key,
consumer_secret,
scope):
self.name = name
self.type = 'google_oauth2'
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.protocol = 'https'
self.domain = GOOGLE_OAUTH2_DOMAIN
self.login_route = 'velruse.%s-login' % name
self.callback_route = 'velruse.%s-callback' % name
self.scope = scope
if not self.scope:
self.scope = ' '.join((self.profile_scope, self.email_scope))
def login(self, request):
"""Initiate a google login"""
scope = ' '.join(request.POST.getall('scope')) or self.scope
request.session['velruse.state'] = state = uuid.uuid4().hex
approval_prompt = request.POST.get('approval_prompt', 'auto')
auth_url = flat_url(
'%s://%s/o/oauth2/auth' % (self.protocol, self.domain),
scope=scope,
response_type='code',
client_id=self.consumer_key,
redirect_uri=request.route_url(self.callback_route),
approval_prompt=approval_prompt,
access_type='offline',
state=state)
return HTTPFound(location=auth_url)
def callback(self, request):
"""Process the google redirect"""
sess_state = request.session.pop('velruse.state', None)
req_state = request.GET.get('state')
if not sess_state or sess_state != req_state:
raise CSRFError(
'CSRF Validation check failed. Request state {req_state} is '
'not the same as session state {sess_state}'.format(
req_state=req_state,
sess_state=sess_state
)
)
code = request.GET.get('code')
if not code:
reason = request.GET.get('error', 'No reason provided.')
return AuthenticationDenied(reason=reason,
provider_name=self.name,
provider_type=self.type)
# Now retrieve the access token with the code
r = requests.post(
'%s://%s/o/oauth2/token' % (self.protocol, self.domain),
dict(client_id=self.consumer_key,
client_secret=self.consumer_secret,
redirect_uri=request.route_url(self.callback_route),
code=code,
grant_type='authorization_code'),
)
if r.status_code != 200:
raise ThirdPartyFailure("Status %s: %s" % (
r.status_code, r.content))
token_data = r.json()
access_token = token_data['access_token']
refresh_token = token_data.get('refresh_token')
# Retrieve profile data if scopes allow
profile = {}
user_url = flat_url(
'%s://www.googleapis.com/oauth2/v1/userinfo' % self.protocol,
access_token=access_token)
r = requests.get(user_url)
if r.status_code == 200:
data = r.json()
profile['accounts'] = [{
'domain': self.domain,
'username': data['email'],
'userid': data['id']
}]
if 'name' in data:
profile['displayName'] = data['name']
else:
profile['displayName'] = data['email']
profile['preferredUsername'] = data['email']
profile['verifiedEmail'] = data['email']
profile['emails'] = [{'value': data['email']}]
cred = {'oauthAccessToken': access_token,
'oauthRefreshToken': refresh_token}
return GoogleAuthenticationComplete(profile=profile,
credentials=cred,
provider_name=self.name,
provider_type=self.type)