Search code examples
python oauth-2.0 pyside2 spotipy

Spotipy OAuth2 Access Token without pasting URL to console


I am currently working on a Python Spotify Managing Application. I am using PySide2 (Qt) to create the GUI. Qt has a Browser feature to visit websites. I am using this code to authenticate:

import spotipy
from spotipy.oauth2 import SpotifyOAuth

# Build the OAuth manager. `scope`, `client_id`, `client_secret` and
# `redirect_uri` are not defined in this snippet and must be set beforehand.
auth = SpotifyOAuth(scope=scope, cache_path='user_cache',
                    client_id=client_id, client_secret=client_secret,
                    redirect_uri=redirect_uri)

# API client that authenticates through the manager created above.
sp = spotipy.Spotify(auth_manager=auth)

# Run the interactive authorization step and print whatever it returns.
print(auth.get_auth_response())

When I run this code it opens a browser window in Chrome and asks me to login to my Spotify account. Then it redirects me to my redirect_uri. I have to paste this link into the console.

Console Output

My problem is that I don't want to paste the URI into the console. I want the App to get the url from inside the PySide2 (Qt) browser (I know how to get the current link and so on) and paste it to the console automatically.

My questions are:

  1. Does Spotipy have a way to perform the OAuth2 flow without pasting the link into the console? I want to bypass the input prompt and pass the redirect link directly to Spotipy.

  2. Is it possible to select the browser it opens manually?

I'd like to do it without Flask, using just PySide2 (PyQt, Qt, etc.). Best case: just get the token from the query string and use it for API requests.


Solution

  • A possible solution is to implement a new scheme in Qt WebEngine where the request is redirected.

    In addition, Spotipy uses Requests, whose calls are blocking: running them in the GUI thread blocks the Qt event loop and freezes the GUI. I have therefore wrapped the Spotipy calls so that they run asynchronously in background threads.

    from functools import cached_property, partial
    import threading
    import types
    
    import spotipy
    from spotipy.oauth2 import SpotifyOAuth, SpotifyClientCredentials
    
    from PySide2 import QtCore, QtWidgets, QtWebEngineCore, QtWebEngineWidgets
    
    
    class ReplySpotify(QtCore.QObject):
        """Run a blocking callable on a background thread and signal completion.

        The callable is started on a daemon thread from the constructor.  Once
        it returns or raises, the outcome is stored on the instance and
        ``finished`` is emitted.

        Note: because the worker is already running when the constructor
        returns, a very fast callable may complete before the caller has
        connected to ``finished``; ``is_finished()`` can be used to detect that.

        Args:
            func: Callable executed on the worker thread.
            args: Positional arguments forwarded to ``func``.
            kwargs: Keyword arguments forwarded to ``func``; ``None`` means none.
            parent: Optional parent ``QObject``.
        """

        finished = QtCore.Signal()

        def __init__(self, func, args=(), kwargs=None, parent=None):
            super().__init__(parent)
            self._results = None
            self._is_finished = False
            self._error_str = ""
            # Start the worker last so every attribute it writes already exists.
            threading.Thread(
                target=self._execute, args=(func, args, kwargs), daemon=True
            ).start()

        @property
        def results(self):
            """Return value of the callable, or ``None`` if it failed or is still running."""
            return self._results

        @property
        def error_str(self):
            """Description of the exception raised by the callable, or ``""``."""
            return self._error_str

        def is_finished(self):
            """Return ``True`` once the callable has returned or raised."""
            return self._is_finished

        def has_error(self):
            """Return ``True`` if the callable raised an exception."""
            return bool(self._error_str)

        def _execute(self, func, args, kwargs):
            """Worker-thread body: call ``func``, record the outcome, emit ``finished``."""
            if kwargs is None:
                kwargs = {}
            try:
                self._results = func(*args, **kwargs)
            except Exception as e:
                # Thread boundary: the failure is recorded instead of propagated.
                # Some exceptions stringify to "", which would make has_error()
                # report success, so fall back to repr() in that case.
                self._error_str = str(e) or repr(e)
            self._is_finished = True
            self.finished.emit()
    
    
    def convert_to_reply(func, *args, **kwargs):
        """Start ``func(*args, **kwargs)`` in the background and return its ``ReplySpotify``."""
        return ReplySpotify(func, args, kwargs)
    
    
    class ConvertToReply(type):
        """Metaclass that wraps the public bound methods of each new instance.

        After an instance is constructed, every attribute that is a bound method
        and whose name does not start with an underscore is shadowed on the
        instance by ``partial(convert_to_reply, method)``.
        """

        def __call__(cls, *args, **kw):
            instance = super().__call__(*args, **kw)
            for name in dir(instance):
                attribute = getattr(instance, name)
                if not isinstance(attribute, types.MethodType):
                    continue
                if name.startswith("_"):
                    continue
                # Stored on the instance, so the class itself stays untouched.
                setattr(instance, name, partial(convert_to_reply, attribute))
            return instance
    
    
    class QSpotify(spotipy.Spotify, metaclass=ConvertToReply):
        """``spotipy.Spotify`` client created through the ``ConvertToReply`` metaclass.

        The metaclass rebinds the public bound methods of each instance, so
        calling one of them goes through ``convert_to_reply`` instead of running
        the method directly.
        """

        pass
    
    
    class QOauthHandler(QtWebEngineCore.QWebEngineUrlSchemeHandler):
        """URL scheme handler that captures the OAuth redirect inside Qt WebEngine.

        Requests whose host is ``oauth`` are treated as the redirect: their query
        items are emitted through ``authenticated`` together with the URL path,
        and the page stored in ``html`` is sent back as the response.  Any other
        request for the scheme is rejected.
        """

        # Emitted with (url path, {query key: query value}).
        authenticated = QtCore.Signal(str, dict)

        def __init__(self, parent=None):
            super().__init__(parent)
            self._html = ""

        @property
        def html(self):
            """HTML document returned to the browser after the redirect."""
            return self._html

        @html.setter
        def html(self, html):
            self._html = html

        def requestStarted(self, request):
            """Handle one request made with the scheme this handler is installed for.

            Args:
                request: The ``QWebEngineUrlRequestJob`` to answer.
            """
            request_url = request.requestUrl()
            if request_url.host() != "oauth":
                # Answer unknown hosts explicitly; otherwise the job would be
                # left without any reply.
                request.fail(QtWebEngineCore.QWebEngineUrlRequestJob.UrlNotFound)
                return

            query = QtCore.QUrlQuery(request_url.query())
            values = dict(query.queryItems())
            self.authenticated.emit(request_url.path(), values)

            # The buffer must outlive this call, so it is parented to the handler
            # and released when the request job is destroyed.
            buf = QtCore.QBuffer(parent=self)
            request.destroyed.connect(buf.deleteLater)
            buf.open(QtCore.QIODevice.WriteOnly)
            buf.write(self.html.encode())
            buf.seek(0)
            buf.close()
            request.reply(b"text/html", buf)
    
    
    class QSpotifyOAuth(QtCore.QObject, SpotifyOAuth):
        """``SpotifyOAuth`` variant that asks the application for the code via signals.

        Instead of prompting on the console, ``get_auth_response`` emits
        ``authenticationRequired`` with the authorize URL and waits in a local
        event loop until ``authenticate`` supplies the authorization code.
        """

        # Emitted with the authorize URL that the user has to visit.
        authenticationRequired = QtCore.Signal(QtCore.QUrl)
        # Emitted after a new authorization code has been stored.
        codeChanged = QtCore.Signal()

        def __init__(
            self,
            client_id=None,
            client_secret=None,
            redirect_uri=None,
            state=None,
            scope=None,
            cache_path=None,
            username=None,
            proxies=None,
            show_dialog=False,
            requests_session=True,
            requests_timeout=None,
            parent=None,
        ):
            """Initialise both base classes.

            All arguments except ``parent`` are forwarded unchanged, in this
            order, to ``SpotifyOAuth.__init__``.

            Args:
                parent: Optional parent ``QObject``.
            """
            # Forward the caller's parent (it used to be discarded in favour of None).
            QtCore.QObject.__init__(self, parent=parent)
            SpotifyOAuth.__init__(
                self,
                client_id,
                client_secret,
                redirect_uri,
                state,
                scope,
                cache_path,
                username,
                proxies,
                show_dialog,
                requests_session,
                requests_timeout,
            )
            self._code = ""

        def get_auth_response(self, state=None):
            """Request authorization and block until a code has been delivered.

            Emits ``authenticationRequired`` with the authorize URL, then runs a
            local ``QEventLoop`` that quits when ``codeChanged`` is emitted.

            Args:
                state: State value to return; defaults to ``self.state``.

            Returns:
                A ``(state, code)`` tuple.
            """
            url = QtCore.QUrl.fromUserInput(self.get_authorize_url())
            self.authenticationRequired.emit(url)
            loop = QtCore.QEventLoop()
            self.codeChanged.connect(loop.quit)
            loop.exec_()
            if state is None:
                state = self.state
            return state, self.code

        @property
        def code(self):
            """Most recently delivered authorization code, or ``""``."""
            return self._code

        def authenticate(self, values):
            """Store the authorization code and wake up ``get_auth_response``.

            Args:
                values: Mapping of redirect query items; its ``"code"`` entry is
                    used, defaulting to ``""`` when absent.
            """
            self._code = values.get("code", "")
            self.codeChanged.emit()

        # Original (misspelled) name, kept so existing callers continue to work.
        autenticate = authenticate
    
    
    class Widget(QtWidgets.QWidget):
        """Demo window: an embedded browser for the login next to a log of saved tracks.

        On construction it requests the current user's saved tracks and then
        follows the pagination, appending one line per track to the log.
        """

        def __init__(self, parent=None):
            super().__init__(parent)

            lay = QtWidgets.QHBoxLayout(self)
            lay.addWidget(self.view, stretch=1)
            lay.addWidget(self.log, stretch=1)

            # The browser is only shown while an authorization is pending.
            self.view.hide()

            client_id = ""
            client_secret = ""
            self.qauth = QSpotifyOAuth(
                cache_path="user_cache",
                client_id=client_id,
                client_secret=client_secret,
                redirect_uri="qt://oauth/spotify",
                scope="user-library-read",
            )
            self.qclient = QSpotify(auth_manager=self.qauth)
            self.qauth.authenticationRequired.connect(self.view.load)
            self.qauth.authenticationRequired.connect(self.view.show)

            self._watch(self.qclient.current_user_saved_tracks())

        @cached_property
        def view(self):
            """Embedded browser used for the Spotify login page."""
            return QtWebEngineWidgets.QWebEngineView()

        @cached_property
        def log(self):
            """Read-only text area listing the fetched tracks."""
            return QtWidgets.QTextEdit(readOnly=True)

        def handle(self, path, values):
            """Pass the redirect's query items to the auth manager and hide the browser.

            Args:
                path: Path of the redirect URL (unused).
                values: Query items of the redirect URL.
            """
            self.qauth.autenticate(values)
            self.view.hide()

        def _watch(self, reply):
            """Take ownership of ``reply`` and route its completion to ``on_finished``."""
            reply.setParent(self)
            reply.finished.connect(partial(self.on_finished, reply))

        def on_finished(self, reply):
            """Log the tracks of a finished page and request the next page, if any.

            Args:
                reply: The finished reply object.
            """
            from html import escape

            reply.deleteLater()

            if reply.has_error():
                # A failed request leaves results as None; report it instead of
                # crashing on the subscript below.
                self.log.append("<b>Error:</b> %s" % escape(reply.error_str))
                return

            results = reply.results
            if not results:
                # Nothing was returned, e.g. there was no further page.
                return

            items = results.get("items") or []
            for item in items:
                track = item["track"]
                artists = track.get("artists") or []
                artist = artists[0]["name"] if artists else ""
                # The log renders rich text, so API-provided text is escaped.
                text = "<b>%s</b> %s" % (escape(artist), escape(track["name"]))
                self.log.append(text)

            # Only ask for another page when the response links to one.
            if items and results.get("next"):
                self._watch(self.qclient.next(results))
    
    
    def main():
        """Register the ``qt`` scheme, wire the OAuth handler to the window and run the app."""
        import sys

        # The scheme is registered before the QApplication is created.
        qt_scheme = QtWebEngineCore.QWebEngineUrlScheme(b"qt")
        QtWebEngineCore.QWebEngineUrlScheme.registerScheme(qt_scheme)

        application = QtWidgets.QApplication(sys.argv)
        QtCore.QCoreApplication.setOrganizationName("qtspotify")
        QtCore.QCoreApplication.setApplicationName("Qt Spotify")

        oauth_handler = QOauthHandler()

        default_profile = QtWebEngineWidgets.QWebEngineProfile.defaultProfile()
        # Disabled in the original; enable to stop cookies from being persisted:
        # default_profile.setPersistentCookiesPolicy(
        #     QtWebEngineWidgets.QWebEngineProfile.NoPersistentCookies
        # )
        default_profile.installUrlSchemeHandler(b"qt", oauth_handler)

        window = Widget()
        window.resize(640, 480)
        window.show()

        # Deliver the redirect's query items to the window.
        oauth_handler.authenticated.connect(window.handle)

        exit_code = application.exec_()
        sys.exit(exit_code)
    
    
    # Start the demo only when this file is executed as a script.
    if __name__ == "__main__":
        main()
    

    Note: You must add the redirect URI "qt://oauth/spotify" to the Redirect URIs in your application's settings in the Spotify Developer Dashboard:

    enter image description here