Source code for sentinelhub.api.batch.utils

"""
Module implementing utilities for working with batch jobs.
"""

from __future__ import annotations

import logging
import time
from typing import Union

from tqdm.auto import tqdm

from ...config import SHConfig
from ...types import JsonDict
from .base import BatchRequestStatus
from .process import BatchProcessClient, BatchProcessRequest
from .statistical import BatchStatisticalRequest, SentinelHubBatchStatistical

LOGGER = logging.getLogger(__name__)

BatchStatisticalRequestSpec = Union[str, dict, BatchStatisticalRequest]


_MIN_SLEEP_TIME = 60
_DEFAULT_SLEEP_TIME = 120
_MIN_STAT_SLEEP_TIME = 15
_DEFAULT_STAT_SLEEP_TIME = 30
_MIN_ANALYSIS_SLEEP_TIME = 5
_DEFAULT_ANALYSIS_SLEEP_TIME = 10


[docs]def monitor_batch_process_job( request: BatchProcessRequest, client: BatchProcessClient, sleep_time: int = _DEFAULT_SLEEP_TIME, analysis_sleep_time: int = _DEFAULT_ANALYSIS_SLEEP_TIME, ) -> BatchProcessRequest: """A utility function that keeps checking the progress of the batch processing job. Returns an updated version of the request Notes: - Before calling this function make sure to start a batch job by calling `BatchProcessingClient.start_job` method. In case a batch job is still being analysed this function will wait until the analysis ends. - This function will be continuously collecting information from Sentinel Hub service. To avoid making too many requests please make sure to adjust `sleep_time` parameter. :param request: The request to monitor. :param client: A batch processing client with appropriate configuration that is used to monitor the batch job. :param sleep_time: Number of seconds to sleep between consecutive progress bar updates. :param analysis_sleep_time: Number of seconds between consecutive status updates during analysis phase. """ if sleep_time < _MIN_SLEEP_TIME: raise ValueError(f"To avoid making too many service requests please set sleep_time>={_MIN_SLEEP_TIME}") batch_request: BatchProcessRequest = monitor_batch_process_analysis(request, client, sleep_time=analysis_sleep_time) if batch_request.status is BatchRequestStatus.PROCESSING: LOGGER.info("Batch job is running") completion = batch_request.completion_percentage progress_bar = tqdm(total=100, initial=completion, desc="Completion percentage") monitoring_status = [BatchRequestStatus.ANALYSIS_DONE, BatchRequestStatus.PROCESSING] with progress_bar: while completion < 100 and batch_request.status in monitoring_status: time.sleep(sleep_time) batch_request = client.get_request(batch_request) progress_bar.update(batch_request.completion_percentage - completion) completion = batch_request.completion_percentage while batch_request.status in monitoring_status: LOGGER.info("Waiting on batch job status update, currently %s", batch_request.status) time.sleep(sleep_time) batch_request = client.get_request(batch_request) LOGGER.info("Batch job finished with status %s", batch_request.status.value) return batch_request
[docs]def monitor_batch_statistical_job( batch_request: BatchStatisticalRequestSpec, config: SHConfig | None = None, sleep_time: int = _DEFAULT_STAT_SLEEP_TIME, analysis_sleep_time: int = _DEFAULT_ANALYSIS_SLEEP_TIME, ) -> JsonDict: """A utility function that keeps checking the completion percentage of a Batch Statistical request until complete. Notes: - Before calling this function make sure to start a batch job via `SentinelHubBatchStatistical.start_job` method. In case a batch job is still being analysed this function will wait until the analysis ends. - Some information about the progress of this function is reported to logging level INFO. :param batch_request: An object with information about a batch request. Alternatively, it could only be a batch request id or a payload. :param config: A configuration object with required parameters `sh_client_id`, `sh_client_secret`, and `sh_auth_base_url` which is used for authentication and `sh_base_url` which defines the service deployment where Batch API will be called. :param sleep_time: Number of seconds to sleep between consecutive progress bar updates. :param analysis_sleep_time: Number of seconds between consecutive status updates during analysis phase. :return: Final status of the batch request. """ if sleep_time < _MIN_STAT_SLEEP_TIME: raise ValueError(f"To avoid making too many service requests please set sleep_time>={_MIN_STAT_SLEEP_TIME}") batch_request = monitor_batch_statistical_analysis(batch_request, config, sleep_time=analysis_sleep_time) if batch_request.status is BatchRequestStatus.PROCESSING: LOGGER.info("Batch job is running") batch_client = SentinelHubBatchStatistical(config=config) request_status = batch_client.get_status(batch_request) progress = request_status["completionPercentage"] with tqdm(total=100, initial=progress, desc="Completion percentage") as progress_bar: while progress < 100: time.sleep(sleep_time) request_status = batch_client.get_status(batch_request) new_progress = request_status["completionPercentage"] progress_bar.update(new_progress - progress) progress = new_progress return request_status
[docs]def monitor_batch_process_analysis( request: BatchProcessRequest, client: BatchProcessClient, sleep_time: int = _DEFAULT_ANALYSIS_SLEEP_TIME, ) -> BatchProcessRequest: """A utility function that is waiting until analysis phase of a batch job finishes and regularly checks its status. In case analysis phase failed it raises an error at the end. :param request: The request to monitor. :param client: A batch processing client with appropriate configuration that is used to monitor the batch job. :param sleep_time: Number of seconds between consecutive status updates during analysis phase. """ if sleep_time < _MIN_ANALYSIS_SLEEP_TIME: raise ValueError( f"To avoid making too many service requests please set analysis sleep time >={_MIN_ANALYSIS_SLEEP_TIME}" ) batch_request = client.get_request(request) while batch_request.status in [BatchRequestStatus.CREATED, BatchRequestStatus.ANALYSING]: LOGGER.info("Batch job has a status %s, sleeping for %d seconds", batch_request.status.value, sleep_time) time.sleep(sleep_time) batch_request = client.get_request(batch_request) batch_request.raise_for_status(status=[BatchRequestStatus.FAILED, BatchRequestStatus.STOPPED]) return batch_request
[docs]def monitor_batch_statistical_analysis( batch_request: BatchStatisticalRequestSpec, config: SHConfig | None = None, sleep_time: int = _DEFAULT_ANALYSIS_SLEEP_TIME, ) -> BatchStatisticalRequest: """A utility function that is waiting until analysis phase of a batch job finishes and regularly checks its status. In case analysis phase failed it raises an error at the end. :param batch_request: An object with information about a batch request. Alternatively, it could only be a batch request id or a payload. :param config: A configuration object with required parameters `sh_client_id`, `sh_client_secret`, and `sh_auth_base_url` which is used for authentication and `sh_base_url` which defines the service deployment where Batch API will be called. :param sleep_time: Number of seconds between consecutive status updates during analysis phase. :return: Batch request info """ if sleep_time < _MIN_ANALYSIS_SLEEP_TIME: raise ValueError( f"To avoid making too many service requests please set analysis sleep time >={_MIN_ANALYSIS_SLEEP_TIME}" ) batch_client = SentinelHubBatchStatistical(config=config) request_status = BatchRequestStatus(batch_client.get_status(batch_request)["status"]) while request_status in [BatchRequestStatus.CREATED, BatchRequestStatus.ANALYSING]: LOGGER.info("Batch job has a status %s, sleeping for %d seconds", request_status.value, sleep_time) time.sleep(sleep_time) request_status = BatchRequestStatus(batch_client.get_status(batch_request)["status"]) batch_request = batch_client.get_request(batch_request) batch_request.raise_for_status(status=[BatchRequestStatus.FAILED, BatchRequestStatus.STOPPED]) return batch_request