Source code for ipernity.api

"""
Ipernity API Class
=======================

All funcionality in PyIpernity is available through the :class:`IpernityAPI`
class. 
"""

from __future__ import annotations

import json
import os
from logging import getLogger
from time import sleep
from typing import Any, Iterable, Mapping, Union, TYPE_CHECKING

import requests

from .auth import AuthHandler, auth_methods
from .method import IpernityMethod
from .exceptions import APIRequestError, UnknownMethod, UploadError

if TYPE_CHECKING:
    api_arg = Union[str, float, int]

log = getLogger(__name__)

methodsfile = os.path.join(
    os.path.dirname(__file__),
    'methods.json'
)
with open(methodsfile, 'r') as mf:
    _methods: Mapping[str, Mapping[str, Any]] = json.load(mf)


[docs] class IpernityAPI: """ Encapsulates Ipernity functionality. See :ref:`calling-api-methods` for access to the individual API methods. Args: api_key: The API key obtained from Ipernity. api_secret: The secret belonging to the API key. token: API token. Can be given as a string or as a mapping. When given as a mapping, the actual token is extracted as ``token['token']``, ``token['user']`` is stored as user information (see :attr:`~IpernityAPI.user_info`), and ``token['permissions']`` is stored as :attr:`permissions`. The format of the mapping should be like the return data of :iper:`auth.getToken`. auth: Authentication methop, can be ``desktop``, ``web`` or a subclass of :class:`~ipernity.auth.AuthHandler` (not an instance thereof!). The authentication handler is set accordingly. url: API URL, should normally be left alone. auth_url_base: Base for Authentication URLs, should normally be left alone. .. seealso:: * `Ipernity API methods <http://www.ipernity.com/help/api>`_ .. versionchanged:: 0.3.1 * New argument ``auth_url_base`` * URLs default to HTTPS .. versionchanged:: 0.3.0 ``auth`` can be a subclass of :class:`~ipernity.auth.AuthHandler`. """ # Methods data retrieved from http://api.ipernity.com/api/api.methods.getList/json __methods__ = _methods def __init__( self, api_key: str, api_secret: str, token: str | Mapping | None = None, auth: str | AuthHandler = 'desktop', url: str = 'https://api.ipernity.com/api/', auth_url_base: str = 'https://www.ipernity.com/apps/authorize' ): log.debug('Creating API object with key %s', api_key) self._api_key = api_key self._api_secret = api_secret self.token = token self._url = url self._auth_url_base = auth_url_base if isinstance(auth, type) and issubclass(auth, AuthHandler): self._auth = auth(self) elif auth in auth_methods: self._auth = auth_methods[auth](self) else: raise ValueError(f'Authentication method {auth} is not supported') def __getattr__(self, name: str) -> IpernityMethod: """Returns an IpernityMethod object for the given method""" if name.startswith('_'): raise AttributeError(f'Attribute {name} not found') return IpernityMethod(self, name) @property def auth(self) -> AuthHandler: """The authentication handler""" return self._auth @property def api_key(self) -> str: """ The API key generated by Ipernity Assigning a new value will set :attr:`token` to None. .. versionadded:: 0.3.0 """ return self._api_key @api_key.setter def api_key(self, value: str): self._api_key = value self.token = None @property def api_secret(self) -> str: """ The API secret generated by Ipernity Assigning a new value will set :attr:`token` to None. .. versionadded:: 0.3.0 """ return self._api_secret @api_secret.setter def api_secret(self, value: str): self._api_secret = value self.token = None @property def token(self) -> str: """ The authentication token When setting, the new value can be given as a string or as ``dict``. If given as a dict, the actual token is extracted as ``token['token']``. """ return self._token @token.setter def token(self, value: str | Mapping | None): if isinstance(value, dict): self._token = value['token'] if 'user' in value: self._user = value['user'] else: self._user = None if 'permissions' in value: self._perm = value['permissions'] else: self._perm = None else: self._token = value self._user = None self._perm = None @property def user_info(self) -> dict | None: """ Information about the current user """ if self._user is None: if self.token is not None: self._check_token() return self._user @property def permissions(self) -> dict | None: """ Information about the current permissions .. versionadded:: 0.1.5 """ if self._perm is None: if self.token is not None: self._check_token() return self._perm _permission_values = ['none', 'read', 'write', 'delete']
[docs] def has_permissions(self, permissions: Mapping[str, str] | None) -> bool: """ Checks if the API has at least the given permissions. Returns False if the token is ``None``. .. versionadded:: 0.1.5 """ if self.token is None: return False if permissions is None: return True for tgt, perm in permissions.items(): if tgt.startswith('perm_'): tgt = tgt[5:] if ( self._permission_values.index(perm) > self._permission_values.index(self.permissions[tgt]) ): return False return True
def _check_token(self): auth = self.auth.checkToken(self.token)['auth'] self._user = auth['user'] self._perm = auth['permissions']
[docs] def call(self, method_name: str, **kwargs: api_arg) -> dict: """ Makes an API call. Args: method_name: API method to call kwargs: API arguments Raises: UnknownMethod: Tried to call a method not contained in :iper:`api.methods.getList`. APIRequestError: The API call returned an error, or the HTTP request failed. .. versionchanged:: 0.2.0 An HTTP error raises ``APIRequestError`` instead of ``HTTPError``. """ if method_name not in self.__methods__: raise UnknownMethod(method_name) url = self._url + method_name + '/json' response = self.auth.do_request(url, method_name, kwargs) # Check for HTTP errors if not response.ok: raise APIRequestError( 'httperror', response.status_code, response.reason, method_name, kwargs ) result = response.json() # Check return data for errors if result['api']['status'] != 'ok': raise APIRequestError( result['api']['status'], result['api']['code'], result['api']['message'], method_name, kwargs ) log.debug(f'Returning {result}') return result
[docs] def upload_file(self, filename: str, **kwargs: api_arg) -> str: """ Simplified interface to uploading a file Args: filename: The file to be uploaded. Can be relative or absolute. kwargs: Additional attributes for :iper:`upload.file`. Returns: The ``doc_id`` of the uploaded file. Raises: UploadError: The ticket gets invalid. """ # noqa: E501 ticket = self.upload.file(file=filename, **kwargs)['ticket'] done = False while not done: status = self.upload.checkTickets(tickets = ticket)['tickets']['ticket'][0] if status['id'] != ticket: raise UploadError( filename, ticket, f'{filename}: API returned incorrect ticket {status["id"]}, expected {ticket}' ) if int(status.get('invalid', '0')): raise UploadError( filename, ticket ) done = int(status.get('done', '0')) if done: id_ = status['doc_id'] else: sleep(int(status['eta'])) log.debug('Got id=%s for filename=%s', id_, filename) return id_
[docs] def walk_data( self, method_name: str, elem_name: str | None = None, **kwargs: api_arg ) -> Iterable[dict]: """ Iterates over an arbitrary API search/list. ``walk_data`` guesses the structure of the returned JSON object from the API method used. If this does not work, the ``elem_name`` argument can be given to specify the object keys: * If ``elem_name`` contains dots, the last part is taken to be the innermost key that points to the list of elements to iterate over, while the preceding parts specify the outer keys. * If ``elem_name`` does not contain dots, the returned JSON is assumed to contain a key "``elem_name``+s" pointing to an object that contains a key "``elem_name``" that contains the list of results. Args: method_name: Search method to call. The method must accept the ``page`` argument. elem_name: Name of list elements. kwargs: Argument for the search method. Use ``per_page`` to set the number of returned elements per method call. Yields: ``dict`` containing the element data. """ if elem_name is None: # Guess element name if not given. mparts = method_name.split('.') if len(mparts) == 2: elem_name = mparts[0] list_name = [elem_name + 's'] else: list_name = mparts[0:-1] elem_name = mparts[-2][:-1] else: if '.' in elem_name: mparts = elem_name.split('.') list_name = mparts[:-1] elem_name = mparts[-1] else: list_name = [elem_name + 's'] if 'page' in kwargs: page = kwargs['page'] del kwargs['page'] else: page = 1 pages = page # total pages while page <= pages: log.debug(f'Fetching page {page} of {method_name} {kwargs}') res = self.call(method_name, page = page, **kwargs) for key in list_name: res = res[key] if 'pages' in res: pages = int(res['pages']) else: total = int(res['total']) per_page = int(res['per_page']) pages = total // per_page if total % per_page: pages += 1 if elem_name in res: yield from res[elem_name] else: log.debug('No key %s in result', elem_name) page += 1
[docs] def walk_albums(self, **kwargs: api_arg) -> Iterable[dict]: """ Iterates over a user's albums. See the `album.getList documentation <http://www.ipernity.com/help/api/method/album.getList>`_ for possible arguments. """ return self.walk_data('album.getList', **kwargs)
[docs] def walk_album_docs(self, album_id: int, **kwargs: api_arg) -> Iterable[dict]: """ Iterates over the documents of an album. See the `album.docs.getList documentation <http://www.ipernity.com/help/api/method/album.docs.getList>`_ for optional arguments. Args: album_id: The album's ID. kwargs: Additional arguments for :iper:`album.docs.getList` """ return self.walk_data( 'album.docs.getList', album_id = album_id, **kwargs )
[docs] def walk_docs(self, **kwargs: api_arg) -> Iterable[dict]: """ Iterates over a user's documents. See the `doc.getList documentation <http://www.ipernity.com/help/api/method/doc.getList>`_ for possible arguments. """ return self.walk_data('doc.getList', **kwargs)
[docs] def walk_folders(self, **kwargs: api_arg) -> Iterable[dict]: """ Iterates over a user's folders. See the `folder.getList documentation <http://www.ipernity.com/help/api/method/folder.getList>`_ for possible arguments. """ return self.walk_data('folder.getList', **kwargs)
[docs] def walk_folder_albums(self, folder_id: int, **kwargs: api_arg) -> Iterable[dict]: """ Iterates over the albums of a folder. See the `folder.albums.getList documentation <http://www.ipernity.com/help/api/method/folder.albums.getList>`_ for optional arguments. Args: folder_id: The folder's ID. kwargs: Additional arguments for :iper:`folder.albums.getList` """ return self.walk_data( 'folder.albums.getList', folder_id = folder_id, **kwargs )