"""
Module implementing a rate-limited multithreaded download client for downloading from Sentinel Hub service
"""

from __future__ import annotations

import logging
import time
import warnings
from threading import Lock
from typing import Any, Callable, ClassVar, TypeVar

import requests
from requests import Response

from ..config import SHConfig
from ..constants import SHConstants
from ..exceptions import OutOfRequestsException, SHRateLimitWarning, SHRuntimeWarning
from ..types import JsonDict
from .client import DownloadClient
from .handlers import fail_user_errors, retry_temporary_errors
from .models import DownloadRequest, DownloadResponse
from .rate_limit import SentinelHubRateLimit
from .session import SentinelHubSession

# Module-level logger used for debug messages about sent requests and rate-limit waits
LOGGER = logging.getLogger(__name__)

# Generic return type of callables executed through `SentinelHubDownloadClient._execute_thread_safe`
T = TypeVar("T")

class SentinelHubDownloadClient(DownloadClient):
    """Download client specifically configured for download from Sentinel Hub service

    Requests are throttled by a `SentinelHubRateLimit` object, which is shared between all threads of a single
    `download` call. Authentication sessions are stored in a class-level cache, so they can be reused by all
    instances of this class and its subclasses within the same Python runtime.
    """

    # Maps (OAuth client ID, base service URL) to a cached session. Shared by all instances and subclasses.
    _CACHED_SESSIONS: ClassVar[dict[tuple[str, str], SentinelHubSession]] = {}
    # Key under which a session usable for any OAuth user and deployment is cached (see `cache_session`).
    _UNIVERSAL_CACHE_KEY: ClassVar[tuple[str, str]] = ("universal-user", "default-url")

    def __init__(self, *, session: SentinelHubSession | None = None, default_retry_time: float = 30, **kwargs: Any):
        """
        :param session: If a session object is provided here then this client instance will always use only the
            provided session. Otherwise, it will either use a cached session or create a new session and cache it.
        :param default_retry_time: The default waiting time (in seconds) when retrying after getting a
            TOO_MANY_REQUESTS response without appropriate retry headers.
        :param kwargs: Optional parameters from DownloadClient
        :raises ValueError: If `session` is neither `None` nor an instance of `SentinelHubSession`.
        """
        super().__init__(**kwargs)

        if session is not None and not isinstance(session, SentinelHubSession):
            raise ValueError(
                f"A session parameter has to be an instance of {SentinelHubSession.__name__} or None, but "
                f"{session} was given"
            )
        self.session = session

        self.default_retry_time = default_retry_time * 1000  # rescale to milliseconds
        self.rate_limit = SentinelHubRateLimit(num_processes=self.config.number_of_download_processes)
        # The lock only exists while `download` is running, because Lock objects cannot be pickled
        self.lock: Lock | None = None

    def download(self, *args: Any, **kwargs: Any) -> Any:
        """The main download method

        :param args: Passed to `DownloadClient.download`
        :param kwargs: Passed to `DownloadClient.download`
        :return: Whatever `DownloadClient.download` returns.
        """
        # Because the Lock object cannot be pickled we create it only here and remove it afterward
        self.lock = Lock()
        try:
            return super().download(*args, **kwargs)
        finally:
            self.lock = None

    @retry_temporary_errors
    @fail_user_errors
    def _execute_download(self, request: DownloadRequest) -> DownloadResponse:
        """
        Executes the download with a single thread and uses a rate limit object, which is shared between all threads

        :param request: The request to execute.
        :return: A download response built from the successful HTTP response.
        :raises OutOfRequestsException: If the service keeps answering with TOO_MANY_REQUESTS and the number of
            sent requests reaches `config.max_retries`.
        """
        download_attempts = 0
        while True:
            sleep_time = self._execute_thread_safe(self.rate_limit.register_next)

            if sleep_time != 0:
                # The shared rate limit does not allow sending a request yet
                LOGGER.debug("Request needs to wait. Sleeping for %0.2f", sleep_time)
                time.sleep(sleep_time)
                continue

            download_attempts += 1
            LOGGER.debug(
                "Sending %s request to %s. Hash of sent request is %s",
                request.request_type.value,
                request.url,
                request.get_hashed_name(),
            )

            response = self._do_download(request)

            if response.status_code == requests.codes.TOO_MANY_REQUESTS:
                warnings.warn("Download rate limit hit", category=SHRateLimitWarning)

                if self.config.max_retries is not None and download_attempts >= self.config.max_retries:
                    raise OutOfRequestsException("Maximum number of download attempts reached")

                # Feed the response headers back into the shared rate limit so that all threads slow down
                self._execute_thread_safe(self.rate_limit.update, response.headers, default=self.default_retry_time)
                continue

            response.raise_for_status()

            LOGGER.debug("Successful %s request to %s", request.request_type.value, request.url)
            return DownloadResponse.from_response(response, request)

    def _execute_thread_safe(self, thread_unsafe_function: Callable[..., T], *args: Any, **kwargs: Any) -> T:
        """Executes a function inside a thread lock and handles potential errors

        When no lock exists (i.e. outside of `download`) the function is called directly.
        """
        if self.lock is None:
            return thread_unsafe_function(*args, **kwargs)

        with self.lock:
            return thread_unsafe_function(*args, **kwargs)

    def _do_download(self, request: DownloadRequest) -> Response:
        """Runs the download

        :raises ValueError: If the request has no URL.
        """
        if request.url is None:
            raise ValueError(f"Faulty request {request}, no URL specified.")

        return requests.request(
            request.request_type.value,
            url=request.url,
            json=request.post_values,
            headers=self._prepare_headers(request),
            timeout=self.config.download_timeout_seconds,
        )

    def _prepare_headers(self, request: DownloadRequest) -> JsonDict:
        """Prepares final headers by potentially joining them with session headers.

        Note that in the current implementation of this method request headers have priority to overwrite default
        and session headers with the same keys.
        """
        session_headers: JsonDict = {}
        if request.use_session:
            session_headers = self._execute_thread_safe(self._get_session_headers)

        return {**SHConstants.HEADERS, **session_headers, **request.headers}

    def _get_session_headers(self) -> JsonDict:
        """Provides up-to-date session headers

        Note that calling session_headers property triggers update if session has expired therefore this has to be
        called in a thread-safe way
        """
        return self.get_session().session_headers

    def get_session(self) -> SentinelHubSession:
        """Provides the session object used by the client

        Lookup order: the session given at initialization, a session cached for this config's client ID and base
        URL, a universally cached session, and finally a newly created session, which is then cached.

        :return: A Sentinel Hub session object
        """
        if self.session is not None:
            return self.session

        cache_key = self._get_cache_key(self.config)
        if cache_key in SentinelHubDownloadClient._CACHED_SESSIONS:
            session = SentinelHubDownloadClient._CACHED_SESSIONS[cache_key]
        elif SentinelHubDownloadClient._UNIVERSAL_CACHE_KEY in SentinelHubDownloadClient._CACHED_SESSIONS:
            session = SentinelHubDownloadClient._CACHED_SESSIONS[SentinelHubDownloadClient._UNIVERSAL_CACHE_KEY]
        else:
            session = SentinelHubSession(config=self.config)
            SentinelHubDownloadClient._CACHED_SESSIONS[cache_key] = session

        return session

    @staticmethod
    def cache_session(session: SentinelHubSession, universal: bool = False) -> None:
        """Cache a Sentinel Hub session to be reused by all instances of `SentinelHubDownloadClient` and its child
        classes within the same Python runtime environment.

        :param session: A session object to be cached.
        :param universal: By default a session is cached for a specific OAuth user ID and Sentinel Hub deployment.
            But if this flag is set to `True` it will cache session for any OAuth user ID and deployment. The intended
            purpose of this parameter is that when a session is sent to a remote processing instance, which doesn't
            have configured Sentinel Hub OAuth credentials, then the session can still be used even without
            credentials.
        :raises ValueError: If `session` is not an instance of `SentinelHubSession`.
        """
        if not isinstance(session, SentinelHubSession):
            raise ValueError(
                f"Given object should be an instance of {SentinelHubSession.__name__} but {session} was given"
            )

        cache_key = (
            SentinelHubDownloadClient._UNIVERSAL_CACHE_KEY
            if universal
            else SentinelHubDownloadClient._get_cache_key(session)
        )
        SentinelHubDownloadClient._CACHED_SESSIONS[cache_key] = session

    @staticmethod
    def _get_cache_key(config_or_session: SentinelHubSession | SHConfig) -> tuple[str, str]:
        """Calculates a cache key for the given session or config object. The key consists of an OAuth client ID
        and a base service URL.

        :raises ValueError: If the given object is neither a config nor a session.
        """
        if isinstance(config_or_session, SHConfig):
            return config_or_session.sh_client_id, config_or_session.sh_base_url

        if isinstance(config_or_session, SentinelHubSession):
            base_url = config_or_session.config.sh_base_url

            # If session was generated from token then config_or_session.config.sh_client_id could have wrong
            # client id, so the authorized party ("azp") claim of the decoded token is used instead.
            sh_client_id = config_or_session.info().get("azp", "")
            if not sh_client_id:
                warnings.warn(
                    "Failed to read client ID from OAuth token. Session caching might not work correctly.",
                    category=SHRuntimeWarning,
                )
            return sh_client_id, base_url

        raise ValueError(f"Expected a config or a session object but got {config_or_session}")

    @staticmethod
    def clear_cache() -> None:
        """Clears cached sessions."""
        SentinelHubDownloadClient._CACHED_SESSIONS = {}