"""
Module implementing an interface with
`Sentinel Hub Batch Processing API <https://docs.sentinel-hub.com/api/latest/api/batch/>`__.
"""
# ruff: noqa: FA100
# do not use `from __future__ import annotations`, it clashes with `dataclass_json`
import datetime as dt
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from dataclasses_json import CatchAll, LetterCase, Undefined, dataclass_json
from dataclasses_json import config as dataclass_config
from ...constants import RequestType
from ...data_collections import DataCollection
from ...geometry import CRS, BBox, Geometry
from ...types import Json, JsonDict
from ..base import BaseCollection, SentinelHubFeatureIterator
from ..process import SentinelHubRequest
from ..utils import datetime_config, enum_config, remove_undefined
from .base import BaseBatchClient, BaseBatchRequest, BatchRequestStatus, BatchUserAction
LOGGER = logging.getLogger(__name__)
BatchRequestType = Union[str, dict, "BatchRequest"]
BatchCollectionType = Union[str, dict, "BatchCollection"]
[docs]class BatchTileStatus(Enum):
"""An enum class with all possible batch tile statuses"""
PENDING = "PENDING"
SCHEDULED = "SCHEDULED"
QUEUED = "QUEUED"
PROCESSING = "PROCESSING"
PROCESSED = "PROCESSED"
FAILED = "FAILED"
[docs]class SentinelHubBatch(BaseBatchClient):
"""An interface class for Sentinel Hub Batch API
For more info check `Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#tag/batch_process>`__.
"""
# pylint: disable=too-many-public-methods
@staticmethod
def _get_service_url(base_url: str) -> str:
"""Provides URL to Catalog API"""
return f"{base_url}/api/v1/batch"
[docs] def create(
self,
sentinelhub_request: Union[SentinelHubRequest, JsonDict],
tiling_grid: Dict[str, Any],
output: Optional[Dict[str, Any]] = None,
bucket_name: Optional[str] = None,
description: Optional[str] = None,
**kwargs: Any,
) -> "BatchRequest":
"""Create a new batch request
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/createNewBatchProcessingRequest>`__
:param sentinelhub_request: An instance of SentinelHubRequest class containing all request parameters.
Alternatively, it can also be just a payload dictionary for Process API request
:param tiling_grid: A dictionary with tiling grid parameters. It can be built with `tiling_grid` method
:param output: A dictionary with output parameters. It can be built with `output` method. Alternatively, one
can set `bucket_name` parameter instead.
:param bucket_name: A name of an S3 bucket where to save data. Alternatively, one can set `output` parameter
to specify more output parameters.
:param description: A description of a batch request
:param kwargs: Any other arguments to be added to a dictionary of parameters.
:return: An instance of `SentinelHubBatch` object that represents a newly created batch request.
"""
if isinstance(sentinelhub_request, SentinelHubRequest):
request_dict = sentinelhub_request.download_list[0].post_values
else:
request_dict = sentinelhub_request
if not isinstance(request_dict, dict):
raise ValueError(
"Parameter sentinelhub_request should be an instance of SentinelHubRequest or a "
"dictionary with a request payload"
)
payload = {
"processRequest": request_dict,
"tilingGrid": tiling_grid,
"output": output,
"bucketName": bucket_name,
"description": description,
**kwargs,
}
payload = remove_undefined(payload)
url = self._get_processing_url()
request_info = self.client.get_json_dict(url, post_values=payload, use_session=True)
return BatchRequest.from_dict(request_info)
[docs] @staticmethod
def tiling_grid(
grid_id: int, resolution: float, buffer: Optional[Tuple[int, int]] = None, **kwargs: Any
) -> JsonDict:
"""A helper method to build a dictionary with tiling grid parameters
:param grid_id: An ID of a tiling grid
:param resolution: A grid resolution
:param buffer: Optionally, a buffer around each tile can be defined. It can be defined with a tuple of integers
`(buffer_x, buffer_y)`, which specifies a number of buffer pixels in horizontal and vertical directions.
:param kwargs: Any other arguments to be added to a dictionary of parameters
:return: A dictionary with parameters
"""
payload = {"id": grid_id, "resolution": resolution, **kwargs}
if buffer:
payload = {**payload, "bufferX": buffer[0], "bufferY": buffer[1]}
return payload
[docs] @staticmethod
def output(
*,
default_tile_path: Optional[str] = None,
overwrite: Optional[bool] = None,
skip_existing: Optional[bool] = None,
cog_output: Optional[bool] = None,
cog_parameters: Optional[Dict[str, Any]] = None,
create_collection: Optional[bool] = None,
collection_id: Optional[str] = None,
responses: Optional[List[str]] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""A helper method to build a dictionary with tiling grid parameters
:param default_tile_path: A path or a template on an s3 bucket where to store results. More info at Batch API
documentation
:param overwrite: A flag specifying if a request should overwrite existing outputs without failing
:param skip_existing: A flag specifying if existing outputs should be overwritten
:param cog_output: A flag specifying if outputs should be written in COGs (cloud-optimized GeoTIFFs )or
normal GeoTIFFs
:param cog_parameters: A dictionary specifying COG creation parameters
:param create_collection: If True the results will be written in COGs and a batch collection will be created
:param collection_id: If True results will be added to an existing collection
:param responses: Specification of path template for individual outputs/responses
:param kwargs: Any other arguments to be added to a dictionary of parameters
:return: A dictionary of output parameters
"""
return remove_undefined(
{
"defaultTilePath": default_tile_path,
"overwrite": overwrite,
"skipExisting": skip_existing,
"cogOutput": cog_output,
"cogParameters": cog_parameters,
"createCollection": create_collection,
"collectionId": collection_id,
"responses": responses,
**kwargs,
}
)
[docs] def iter_tiling_grids(self, **kwargs: Any) -> SentinelHubFeatureIterator:
"""An iterator over tiling grids
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/getBatchTilingGridsProperties>`__
:param kwargs: Any other request query parameters
:return: An iterator over tiling grid definitions
"""
return SentinelHubFeatureIterator(
client=self.client,
url=self._get_tiling_grids_url(),
params=remove_undefined(kwargs),
exception_message="Failed to obtain information about available tiling grids",
)
[docs] def get_tiling_grid(self, grid_id: Union[int, str]) -> JsonDict:
"""Provides a single tiling grid
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/getBatchTilingGridProperties>`__
:param grid_id: An ID of a requested tiling grid
:return: A tiling grid definition
"""
url = self._get_tiling_grids_url(grid_id)
return self.client.get_json_dict(url=url, use_session=True)
[docs] def iter_requests(
self, user_id: Optional[str] = None, search: Optional[str] = None, sort: Optional[str] = None, **kwargs: Any
) -> Iterator["BatchRequest"]:
"""Iterate existing batch requests
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/getAllBatchProcessRequests>`__
:param user_id: Filter requests by a user id who defined a request
:param search: A search query to filter requests
:param sort: A sort query
:param kwargs: Any additional parameters to include in a request query
:return: An iterator over existing batch requests
"""
params = remove_undefined({"userid": user_id, "search": search, "sort": sort, **kwargs})
feature_iterator = SentinelHubFeatureIterator(
client=self.client, url=self._get_processing_url(), params=params, exception_message="No requests found"
)
for request_info in feature_iterator:
yield BatchRequest.from_dict(request_info)
[docs] def get_latest_request(self) -> "BatchRequest":
"""Provides a batch request that has been created the latest
:return: Batch request info
"""
latest_request_iter = self.iter_requests(sort="created:desc", count=1)
try:
return next(latest_request_iter)
except StopIteration as exception:
raise ValueError("No batch request is available") from exception
[docs] def get_request(self, batch_request: BatchRequestType) -> "BatchRequest":
"""Collects information about a single batch request
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/getSingleBatchProcessRequestById>`__
:return: Batch request info
"""
request_id = self._parse_request_id(batch_request)
request_info = self.client.get_json_dict(url=self._get_processing_url(request_id), use_session=True)
return BatchRequest.from_dict(request_info)
[docs] def update_request(
self,
batch_request: BatchRequestType,
output: Optional[Dict[str, Any]] = None,
description: Optional[str] = None,
**kwargs: Any,
) -> Json:
"""Update batch job request parameters
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/updateBatchProcessRequest>`__
Similarly to `update_info` method, this method also updates local information in the current instance of
`SentinelHubBatch`.
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
:param output: A dictionary with output parameters to be updated.
:param description: A description of a batch request to be updated.
:param kwargs: Any other arguments to be added to a dictionary of parameters.
"""
request_id = self._parse_request_id(batch_request)
payload = remove_undefined({"output": output, "description": description, **kwargs})
return self.client.get_json(
url=self._get_processing_url(request_id),
post_values=payload,
request_type=RequestType.PUT,
use_session=True,
)
[docs] def delete_request(self, batch_request: BatchRequestType) -> Json:
"""Delete a batch job request
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/deleteBatchProcessRequest>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
"""
request_id = self._parse_request_id(batch_request)
return self.client.get_json(
url=self._get_processing_url(request_id), request_type=RequestType.DELETE, use_session=True
)
[docs] def start_analysis(self, batch_request: BatchRequestType) -> Json:
"""Starts analysis of a batch job request
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/batchAnalyse>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
"""
return self._call_job(batch_request, "analyse")
[docs] def start_job(self, batch_request: BatchRequestType) -> Json:
"""Starts running a batch job
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/batchStartProcessRequest>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
"""
return self._call_job(batch_request, "start")
[docs] def cancel_job(self, batch_request: BatchRequestType) -> Json:
"""Cancels a batch job
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/batchCancelProcessRequest>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
"""
return self._call_job(batch_request, "cancel")
[docs] def restart_job(self, batch_request: BatchRequestType) -> Json:
"""Restarts only those parts of a job that failed
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/batchRestartPartialProcessRequest>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
"""
return self._call_job(batch_request, "restartpartial")
[docs] def iter_tiles(
self, batch_request: BatchRequestType, status: Union[None, "BatchTileStatus", str] = None, **kwargs: Any
) -> SentinelHubFeatureIterator:
"""Iterate over info about batch request tiles
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/getAllBatchProcessTiles>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
:param status: A filter to obtain only tiles with a certain status
:param kwargs: Any additional parameters to include in a request query
:return: An iterator over information about each tile
"""
request_id = self._parse_request_id(batch_request)
if isinstance(status, BatchTileStatus):
status = status.value
return SentinelHubFeatureIterator(
client=self.client,
url=self._get_tiles_url(request_id),
params={"status": status, **kwargs},
exception_message="No tiles found, please run analysis on batch request before calling this method",
)
[docs] def get_tile(self, batch_request: BatchRequestType, tile_id: Optional[int]) -> JsonDict:
"""Provides information about a single batch request tile
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/getBatchTileById>`__
:param batch_request: It could be a batch request object, a raw batch request payload or only a batch
request ID.
:param tile_id: An ID of a tile
:return: Information about a tile
"""
request_id = self._parse_request_id(batch_request)
url = self._get_tiles_url(request_id, tile_id=tile_id)
return self.client.get_json_dict(url, use_session=True)
[docs] def iter_collections(self, search: Optional[str] = None, **kwargs: Any) -> SentinelHubFeatureIterator:
"""Iterate over batch collections
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/getAllBatchCollections>`__
:param search: A search query to filter collections
:param kwargs: Any additional parameters to include in a request query
:return: An iterator over existing batch collections
"""
return SentinelHubFeatureIterator(
client=self.client,
url=self._get_collections_url(),
params={"search": search, **kwargs},
exception_message="Failed to obtain information about available Batch collections",
)
[docs] def get_collection(self, collection_id: str) -> JsonDict:
"""Get batch collection by its id
`Batch API reference
<https://docs.sentinel-hub.com/api/latest/reference/#operation/getSingleBatchCollectionById>`__
:param collection_id: A batch collection id
:return: A dictionary of the collection parameters
"""
url = self._get_collections_url(collection_id)
return self.client.get_json_dict(url=url, use_session=True, extract_key="data")
[docs] def create_collection(self, collection: Union["BatchCollection", dict]) -> JsonDict:
"""Create a new batch collection
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/createNewBatchCollection>`__
:param collection: Batch collection definition
:return: A dictionary of a newly created collection
"""
collection_payload = self._parse_collection_to_dict(collection)
url = self._get_collections_url()
return self.client.get_json_dict(url=url, post_values=collection_payload, use_session=True, extract_key="data")
[docs] def update_collection(self, collection: Union["BatchCollection", dict]) -> Json:
"""Update an existing batch collection
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/updateBatchCollection>`__
:param collection: Batch collection definition
"""
collection_id = self._parse_collection_id(collection)
return self.client.get_json(
url=self._get_collections_url(collection_id),
post_values=self._parse_collection_to_dict(collection),
request_type=RequestType.PUT,
use_session=True,
)
[docs] def delete_collection(self, collection: BatchCollectionType) -> Json:
"""Delete an existing batch collection
`Batch API reference <https://docs.sentinel-hub.com/api/latest/reference/#operation/deleteBatchCollection>`__
:param collection: Batch collection id or object
"""
collection_id = self._parse_collection_id(collection)
return self.client.get_json(
url=self._get_collections_url(collection_id), request_type=RequestType.DELETE, use_session=True
)
def _get_processing_url(self, request_id: Optional[str] = None) -> str:
"""Creates a URL for process endpoint"""
url = f"{self.service_url}/process"
if request_id is None:
return url
return f"{url}/{request_id}"
def _get_tiles_url(self, request_id: str, tile_id: Union[None, str, int] = None) -> str:
"""Creates a URL for tiles endpoint"""
url = f"{self._get_processing_url(request_id)}/tiles"
if tile_id is None:
return url
return f"{url}/{tile_id}"
def _get_tiling_grids_url(self, grid_id: Union[None, str, int] = None) -> str:
"""Creates a URL for tiling grids endpoint"""
url = f"{self.service_url}/tilinggrids"
if grid_id is None:
return url
return f"{url}/{grid_id}"
def _get_collections_url(self, collection_id: Optional[str] = None) -> str:
"""Creates a URL for batch collections endpoint"""
url = f"{self.service_url}/collections"
if collection_id is None:
return url
return f"{url}/{collection_id}"
@staticmethod
def _parse_collection_id(data: BatchCollectionType) -> Optional[str]:
"""Parses batch collection id from multiple possible inputs"""
if isinstance(data, (BatchCollection, DataCollection)):
return data.collection_id
if isinstance(data, dict):
return data["id"]
if isinstance(data, str):
return data
raise ValueError(f"Expected a BatchCollection dataclass, dictionary or a string, got {data}.")
@staticmethod
def _parse_collection_to_dict(data: Union["BatchCollection", dict]) -> dict:
"""Constructs a dictionary from given object"""
if isinstance(data, BatchCollection):
return data.to_dict() # type: ignore[attr-defined]
if isinstance(data, dict):
return data
raise ValueError(f"Expected either a BatchCollection or a dict, got {data}.")
[docs]@dataclass_json(letter_case=LetterCase.CAMEL, undefined=Undefined.INCLUDE)
@dataclass(repr=False)
class BatchRequest(BaseBatchRequest): # pylint: disable=abstract-method
"""A dataclass object that holds information about a batch request"""
# dataclass_json doesn't handle parameter inheritance correctly
# pylint: disable=duplicate-code
request_id: str = field(metadata=dataclass_config(field_name="id"))
process_request: dict
tile_count: int
status: BatchRequestStatus = field(metadata=enum_config(BatchRequestStatus))
user_id: Optional[str] = None
created: Optional[dt.datetime] = field(metadata=datetime_config, default=None)
tiling_grid: dict = field(default_factory=dict)
output: dict = field(default_factory=dict)
bucket_name: Optional[str] = None
description: Optional[str] = None
value_estimate: Optional[float] = None
tile_width_px: Optional[int] = None
tile_height_px: Optional[int] = None
user_action: Optional[BatchUserAction] = field(metadata=enum_config(BatchUserAction), default=None)
user_action_updated: Optional[str] = field(metadata=datetime_config, default=None)
error: Optional[str] = None
other_data: CatchAll = field(default_factory=dict)
_REPR_PARAM_NAMES = (
"request_id",
"description",
"bucket_name",
"created",
"status",
"user_action",
"value_estimate",
"tile_count",
)
@property
def evalscript(self) -> str:
"""Provides an evalscript used by a batch request
:return: An evalscript
"""
return self.process_request["evalscript"]
@property
def bbox(self) -> Optional[BBox]:
"""Provides a bounding box used by a batch request
:return: An area bounding box together with CRS
:raises: ValueError
"""
bbox, _, crs = self._parse_bounds_payload()
return None if bbox is None else BBox(bbox, crs) # type: ignore[arg-type]
@property
def geometry(self) -> Optional[Geometry]:
"""Provides a geometry used by a batch request
:return: An area geometry together with CRS
:raises: ValueError
"""
_, geometry, crs = self._parse_bounds_payload()
return None if geometry is None else Geometry(geometry, crs)
def _parse_bounds_payload(self) -> Tuple[Optional[List[float]], Optional[list], CRS]:
"""Parses bbox, geometry and crs from batch request payload. If bbox or geometry don't exist it returns None
instead.
"""
bounds_definition = self.process_request["input"]["bounds"]
crs = CRS(bounds_definition["properties"]["crs"].rsplit("/", 1)[-1])
return bounds_definition.get("bbox"), bounds_definition.get("geometry"), crs
[docs]@dataclass_json(letter_case=LetterCase.CAMEL, undefined=Undefined.INCLUDE)
@dataclass
class BatchCollectionBatchData:
"""Dataclass to hold batch collection batchData part of the payload"""
tiling_grid_id: Optional[int] = None
other_data: CatchAll = field(default_factory=dict)
[docs]@dataclass_json(letter_case=LetterCase.CAMEL, undefined=Undefined.INCLUDE)
@dataclass
class BatchCollectionAdditionalData:
"""Dataclass to hold batch collection additionalData part of the payload"""
bands: Optional[Dict[str, Any]] = None
other_data: CatchAll = field(default_factory=dict)
[docs]class BatchCollection(BaseCollection):
"""Dataclass for batch collections"""
batch_data: Optional[BatchCollectionBatchData] = None
additional_data: Optional[BatchCollectionAdditionalData] = None