Coverage for patatmo/api/authentication.py : 99%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#!/usr/bin/env python3
# constants "read_tokens": 0, "write_tokens": 0, "update_tokens": 0, "change_tmpfile": 0, "request_tokens": 0, "refresh_tokens": 0, }
"password": "", "username": "", "client_id": "", "client_secret": "" }
"grant_type": "password", "password": "", "username": "", "client_id": "", "client_secret": "", }
"grant_type": "refresh_token", "client_id": "", "client_secret": "", "refresh_token": "", }
""" Netatmo api Oauth2 authentication client
Args: credentials (dict of str, optional): developer account credentials. Required keys: username, password, client_id, client_secret tokens (dict of str, optional): the access and refresh token. Required keys: access_token, refresh_token tmpfile (str, optional): temporary file to use for tokens """
credentials=EMPTY_CREDENTIALS, tokens=EMPTY_TOKENS, tmpfile=None ):
# only set the (empty) tokens if you didn't already read them from file
################## ### Properties ### ################## def logger(self): """ the logging.Logger used for logging. Defaults to logging.getLogger(__name__). """
def logger(self, logger):  # pragma: no cover
    """
    Set the logging.Logger used for logging.

    Args:
        logger (logging.Logger): the logger to store on this object

    Raises:
        AssertionError: if ``logger`` is not a logging.Logger
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # removed when Python runs with -O, which would silently disable this
    # check. Exception type and message are the same as before.
    if not isinstance(logger, logging.Logger):
        raise AssertionError("logger property has to be a logging.Logger")
    self._logger = logger
def credentials(self): """ dict of str: The developer account credentials
If you set this property, it is checked for valid content first. """ except AttributeError: # pragma: no cover return EMPTY_CREDENTIALS
def credentials(self, value): # check if all needed credentials are there "credentials '{}' not defined in new credentials".format(key) # set new credentials
def tokens(self): """ dict of str: The OAuth2 tokens
If you set this property, it is checked for valid content first. Also, automatic saving to the tmpfile occurs if necessary. """ # if there are no tokens yet, set them to empty tokens
"tokens getter: to prevent recursion, don't make " "sure tokens are up to date.") else: "tokens are up to date.")
def tokens(self, value): # set the new values
# update the update time
# do tmpfile IO else: "Using empty default.")
def token_getter_in_progress(self): """ This is an internal property to prevent recursion in the tokens getter. It is always boolean. When you set it, it is converted to bool. """ else:
def token_getter_in_progress(self, value):
def tmpfile(self): """ str: The temporary file for the tokens.
If you set this property to something DIFFERENT than before, an attempt to read tokens from the tmpfile is made. If that fails, the current tokens are written to the tmpfile. """
def tmpfile(self, value): # set the value "Request to set the tmpfile to the same value of '{}'." " Not doing anything.").format(value))
oldtmpfile, value))
# update the tmpfile change time
# do tmpfile IO
### property checkers ### def tokens_defined(self): """ check if the 'tokens' property contains non-empty tokens
Returns: bool: True if tokens are complete, False otherwise """
def credentials_defined(self): """ check if the 'credentials' property contains full credentials
Returns: bool: True if credentials are complete, False otherwise """
def last_request_time(self): """ [read only] The UNIX timestamp of the last new token request """ self.tokens.get("token_request_time", # try saved time self.token_times.get("request_tokens", # try internal time 0 # use default ))
def last_refresh_time(self): """ [read only] The UNIX timestamp of the last token refresh """ self.tokens.get("token_refresh_time", # try saved time self.token_times.get("refresh_tokens", # try internal time 0 # use default ))
def expire_time(self): """ [read only] The token expire time """ self.tokens.get("expires_in", # else, try expires_in DEFAULT_EXPIRE_TIME # else, use default ))
def tokens_are_up_to_date(self): """ check if the tokens are up to date
Returns: bool: True if the tokens are still valid, False if they need to be updated. """ # if the validity time span has been expired self.last_request_time + self.expire_time < now: else: # not expired
### the specific times ### def token_times(self): """ Internal time counter for token-related actions
Defaults to all times equal to 0. """
def last_token_action(self): """ Determine the last token-related action from token_times property
Returns the name of the most recent token-related action and None if no action has happened yet. """ # if nothing was done, return None else: # return the most recent action self.token_times, # the token_times key=self.token_times.get, # according to the times reverse=True # in reversed order (newest first) )[0] # and return the newest element action))
### context managers ### def no_token_getter_recursion(self): finally: "recursion-preventer: resetting variable to {}".format( old_token_getter_in_progress))
############### ### Methods ### ############### """ Check given tokens for completeness and syntax
Args: tokens (dict of str): The tokens to check
Returns: bool: True if tokens are complete, False otherwise """ # check type # check if all needed credentials are there
""" update the internal time for the token-related action 'action'
Args: action (str): the token-related action to update the time for """ if action not in EMPTY_TIMES.keys(): # pragma: no cover raise KeyError # update the tmpfile change time
""" Read or write the tmpfile depending on what happened last. Depending on the last_token_action property, determine whether to read or write the tmpfile. """ "change_tmpfile": self.read_tokens_from_tmpfile, "update_tokens": self.write_tokens_to_tmpfile, "read_tokens": utils.nothing, "write_tokens": utils.nothing, "request_tokens": utils.nothing, "refresh_tokens": utils.nothing, }
# get the appropriate action action.__name__))
# do it!
# if reading from tmpfile didn't work out, write it
""" Read tokens from the tmpfile The time for the 'read_tokens'-action is updated on success.
Returns: bool: True if something was actually read from the file into the tokens property, False otherwise. """ self.tmpfile)) # read # check self.tmpfile)) else: self.tmpfile))
""" Write the current tokens to the tmpfile The time for the 'write_tokens'-action is updated on success.
Returns: bool: True if something was actually written to file, False otherwise. """ self.tmpfile)) # try to write self.tmpfile)) else: "could not write tokens to tmpfile '{}'".format( self.tmpfile))
""" POST a token request to the api OAuth2 server to get NEW tokens """ # check for completeness "Cannot make a token request with incomplete credentials"
### create the request ### # the request # the response
# check for errors if error: # pragma: no cover raise API_ERRORS.get(error, ApiResponseError(error))
# update the time
""" POST a refresh request to the api OAuth2 server to refresh tokens """ "Cannot refresh the tokens with incomplete tokens"
### create the payload ### # add refresh token "client_id", "client_secret"): # add credentials to payload "token refresh request payload: {}".format(payload))
### create the request ### # the request # the response
# check for errors raise API_ERRORS.get(error, ApiResponseError(error))
# update the time
# """ Make sure the tokens are up to date (if possible) # """ "No needed to update anything.") else: "Now let's check if we have credentials.") " to post requests.") self.logger.debug("We already have tokens, let's " "refresh them.") self.refresh_current_tokens() # refresh them else: # no tokens yet "request new ones.") else: "credentials are not defined. I can't do " "anything to make sure the tokens are up to date...")
def __repr__(self):  # pragma: no cover
    """
    Build the python representation of this object.

    The text shows the fully qualified class name followed by the
    credentials and tokens (both dumped as sorted, indented JSON) and the
    tmpfile. A str tmpfile is wrapped in double quotes; any other value is
    formatted as-is.

    Returns:
        str: the representation string
    """
    template = (
        "{classname}(\n"
        "credentials = {credentials},\n"
        "tokens = {tokens},\n"
        "tmpfile = {tmpfile}\n"
        ")"
    )
    # The properties are read inside this context manager so that the
    # tokens getter runs with its recursion guard active.
    with self.no_token_getter_recursion():
        cls = self.__class__
        qualified_name = "{module}.{name}".format(
            name=cls.__name__, module=cls.__module__)
        credentials_text = json.dumps(
            self.credentials, sort_keys=True, indent=8)
        tokens_text = json.dumps(self.tokens, sort_keys=True, indent=8)
        if isinstance(self.tmpfile, str):
            tmpfile_text = '"{}"'.format(self.tmpfile)
        else:
            tmpfile_text = self.tmpfile
        return template.format(
            classname=qualified_name,
            credentials=credentials_text,
            tokens=tokens_text,
            tmpfile=tmpfile_text,
        )