class OAuthDB(DBBase):  # pylint: disable=abstract-method
    ''' Collection wrapper for the `oauth` collection.

    Documents are keyed by email address (`_id`) and may carry the fields
    `data`, `token` and `owner`, each written by a method of this class.

    '''

    def __init__(self) -> None:
        ''' Initialise the base class with the collection name `oauth`. '''
        super().__init__('oauth')

    def index(self) -> None:
        ''' Create the indexes used by this collection.

        Indexes:
            - `owner` (direction `1`)

        '''
        owner_key = ('owner', 1)
        self.create_index([owner_key])

    def add_data(self, mail: str, data: dict[str, Any]) -> dict[str, Any]:
        ''' Store the user data under the `data` field.

        The document is created when no document with this `_id` exists
        (`upsert=True`).

        Args:
            mail (str): Email address, used as the document `_id`.
            data (dict): The data from Google OAuth return.

        Returns:
            Whatever `find_one_and_update` returns for this request.

        '''
        # NOTE(review): if `find_one_and_update` keeps pymongo's default
        # `return_document`, the value returned here is the document as it
        # was *before* the update (`None` on first insert) - confirm.
        selector = {'_id': mail}
        change = {'$set': {'data': data}}
        return self.find_one_and_update(selector, change, upsert=True)

    def add_token(self, mail: str, credentials: Credentials) -> None:
        ''' Store the OAuth token fields under the `token` field.

        Any `token` sub-document already stored is loaded first and updated
        in place, so keys not written here are kept. In particular, a
        stored `refresh_token` survives when the new credentials carry none.

        Args:
            mail (str): Email address, used as the document `_id`.
            credentials: [google.oauth2.credentials.Credentials][]

        '''
        stored = self.find_one({'_id': mail}, {'token': 1})
        token_doc = stored['token'] if stored and 'token' in stored else {}

        token_doc['token'] = credentials.token
        # Only overwrite the refresh token when a new one is provided.
        if credentials.refresh_token:
            token_doc['refresh_token'] = credentials.refresh_token
        token_doc['token_uri'] = credentials.token_uri
        token_doc['id_token'] = credentials.id_token
        token_doc['scopes'] = credentials.scopes

        selector = {'_id': mail}
        change = {'$set': {'token': token_doc}}
        self.find_one_and_update(selector, change, upsert=True)

    def setup_owner(self, mail: str, uid: str) -> None:
        ''' Write the user id into the `owner` field.

        No `upsert` is requested, unlike `add_data` and `add_token`.

        Args:
            mail (str): Email address, used as the document `_id`.
            uid (str): User id.

        '''
        selector = {'_id': mail}
        change = {'$set': {'owner': uid}}
        self.find_one_and_update(selector, change)
def add_data(self, mail: str, data: dict[str, Any]) -> dict[str, Any]:
    ''' Store the user data under the `data` field.

    The document is created when no document with this `_id` exists
    (`upsert=True`).

    Args:
        mail (str): Email address, used as the document `_id`.
        data (dict): The data from Google OAuth return.

    Returns:
        Whatever `find_one_and_update` returns for this request.

    '''
    # NOTE(review): if `find_one_and_update` keeps pymongo's default
    # `return_document`, the value returned here is the document as it
    # was *before* the update (`None` on first insert) - confirm.
    selector = {'_id': mail}
    change = {'$set': {'data': data}}
    return self.find_one_and_update(selector, change, upsert=True)
def setup_owner(self, mail: str, uid: str) -> None:
    ''' Write the user id into the `owner` field.

    No `upsert` is requested in this call.

    Args:
        mail (str): Email address, used as the document `_id`.
        uid (str): User id.

    '''
    selector = {'_id': mail}
    change = {'$set': {'owner': uid}}
    self.find_one_and_update(selector, change)