Skip to content

Powertrain Event Service API Client

Bases: BasePowertrainAPIClient

Powertrain Event Service API Client.

Swagger documentation

Source code in reportconnectors/api_client/powertrain/event_service/__init__.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class PowertrainEventServiceAPIClient(BasePowertrainAPIClient):
    """
    Powertrain Event Service API Client.

    [Swagger documentation](https://motion-pt-dev-we-eventservice-api.azurewebsites.net/swagger/index.html)
    """

    # Endpoint prefix used by the base client (e.g. for status/health checks).
    _status_prefix = "/eventservice"

    def get_event_comments(self, event_id: int) -> EventCommentsResponse:
        """
        Gets the event related comments.

        Args:
            event_id: Identifier of the event.

        Returns:
            The comments attached to the given event.
        """
        endpoint = f"/api/eventservice/Event/{event_id}/Comments"
        response = self._make_request(method="GET", endpoint=endpoint)
        response_model = self._decode_response_to_model(response, EventCommentsResponse)
        return response_model

    def get_events(
        self,
        asset_ids: Union[int, Sequence[int]],
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        use_cursor: bool = False,
        page_size: int = 100,
        **kwargs,
    ) -> EventSearchResponse:
        """
        Returns the events for the given asset ids and date range.

        It's based on the `Event/Search` endpoint.

        It also allows to filter events by severity, status, cause, existence of comments, binary logger,
        data loggers, parameter backup, and message text.

        Results are paginated with a default page size of 100.
        However, if you set `use_cursor=True`, it will fetch all the events,
        by using the cursor returned in the response.

        Args:
            asset_ids: Asset identifier or list of asset identifiers.
            start_date: Start date of the search.
            end_date: End date of the search.
            use_cursor: If True, it will fetch all the events using the cursor returned in the response. Otherwise,
                it will fetch only the first page with the results. Default: `False`.
            page_size: Number of events per page. Default: `100`.

        Keyword Args:
            severity_codes (List[str]): List of severity codes to filter the events.
            status_codes (List[str]): List of status codes to filter the events.
            cause_codes (List[str]): List of cause codes to filter the events.
            has_comments (bool): If True, it will filter the events that have comments.
            has_binary_logger (bool): If True, it will filter the events that have binary logger.
            has_data_loggers (bool): If True, it will filter the events that have data loggers.
            has_parameter_backup (bool): If True, it will filter the events that have parameter backup.
            message_text_contains (str): Text to search in the message text.
            cursor (str): Cursor that points to a specific results page.

        Returns:
            EventSearchResponse object, that has the list of events and the next cursor.
        """
        endpoint = "/api/eventservice/Event/Search"
        # Normalize a single id to a one-element list so the payload shape is uniform.
        asset_ids = [asset_ids] if isinstance(asset_ids, int) else list(asset_ids)
        data = {
            "assetIds": asset_ids,
            "timestampFrom": start_date.strftime(self._datetime_format),
            "timestampTo": end_date.strftime(self._datetime_format),
            "pageSize": page_size,
        }
        # The API expects the extra filtering parameters in camelCase while the kwargs
        # are provided in snake_case, so convert the keys before merging into the payload.
        converted_kwargs = {to_camel(k): v for k, v in kwargs.items()}
        data.update(converted_kwargs)

        response = self._make_request(method="POST", endpoint=endpoint, json_data=data)
        response_model = self._decode_response_to_model(response, EventSearchResponse)

        # If `use_cursor` is True and the response has a `next_cursor`,
        # keep requesting the next page until the cursor is None,
        # merging every page's events into the first response model.
        while use_cursor and response_model.next_cursor is not None:
            data["cursor"] = response_model.next_cursor
            next_response = self._make_request(method="POST", endpoint=endpoint, json_data=data)
            next_response_model = self._decode_response_to_model(next_response, EventSearchResponse)
            response_model.events += next_response_model.events
            response_model.next_cursor = next_response_model.next_cursor
        return response_model

get_event_comments(event_id)

Gets the event related comments.

Parameters:

Name Type Description Default
event_id int

Identifier of the event

required

Returns:

Type Description
EventCommentsResponse

The comments attached to the given event.

Source code in reportconnectors/api_client/powertrain/event_service/__init__.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def get_event_comments(self, event_id: int) -> EventCommentsResponse:
    """
    Gets the event related comments.

    Args:
        event_id: Identifier of the event.

    Returns:
        The comments attached to the given event.
    """
    endpoint = f"/api/eventservice/Event/{event_id}/Comments"
    response = self._make_request(method="GET", endpoint=endpoint)
    response_model = self._decode_response_to_model(response, EventCommentsResponse)
    return response_model

get_events(asset_ids, start_date, end_date, use_cursor=False, page_size=100, **kwargs)

Returns the events for the given asset ids and date range.

It's based on the Event/Search endpoint.

It also allows to filter events by severity, status, cause, existence of comments, binary logger, data loggers, parameter backup, and message text.

Results are paginated with a default page size of 100. However, if you set use_cursor=True, it will fetch all the events, by using the cursor returned in the response.

Parameters:

Name Type Description Default
asset_ids Union[int, Sequence[int]]

Asset identifier or list of asset identifiers.

required
start_date datetime

Start date of the search.

required
end_date datetime

End date of the search.

required
use_cursor bool

If True, it will fetch all the events using the cursor returned in the response. Otherwise, it will fetch only the first page with the results. Default: False.

False
page_size int

Number of events per page. Default: 100.

100

Other Parameters:

Name Type Description
severity_codes List[str]

List of severity codes to filter the events.

status_codes List[str]

List of status codes to filter the events.

cause_codes List[str]

List of cause codes to filter the events.

has_comments bool

If True, it will filter the events that have comments.

has_binary_logger bool

If True, it will filter the events that have binary logger.

has_data_loggers bool

If True, it will filter the events that have data loggers.

has_parameter_backup bool

If True, it will filter the events that have parameter backup.

message_text_contains str

Text to search in the message text.

cursor str

Cursor that points to a specific results page.

Returns:

Type Description
EventSearchResponse

EventSearchResponse object, that has the list of events and the next cursor.

Source code in reportconnectors/api_client/powertrain/event_service/__init__.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def get_events(
    self,
    asset_ids: Union[int, Sequence[int]],
    start_date: datetime.datetime,
    end_date: datetime.datetime,
    use_cursor: bool = False,
    page_size: int = 100,
    **kwargs,
) -> EventSearchResponse:
    """
    Search events for one or more assets within a date range.

    Wraps the `Event/Search` endpoint. Extra keyword arguments act as
    additional filters (severity, status, cause, presence of comments,
    binary logger, data loggers, parameter backup, message text).

    Results are paginated (default page size 100). With `use_cursor=True`
    every page is fetched by following the cursor returned in each
    response, and the pages are merged into a single result.

    Args:
        asset_ids: Asset identifier or list of asset identifiers.
        start_date: Start date of the search.
        end_date: End date of the search.
        use_cursor: If True, it will fetch all the events using the cursor returned in the response. Otherwise,
            it will fetch only the first page with the results. Default: `False`.
        page_size: Number of events per page. Default: `100`.

    Keyword Args:
        severity_codes (List[str]): List of severity codes to filter the events.
        status_codes (List[str]): List of status codes to filter the events.
        cause_codes (List[str]): List of cause codes to filter the events.
        has_comments (bool): If True, it will filter the events that have comments.
        has_binary_logger (bool): If True, it will filter the events that have binary logger.
        has_data_loggers (bool): If True, it will filter the events that have data loggers.
        has_parameter_backup (bool): If True, it will filter the events that have parameter backup.
        message_text_contains (str): Text to search in the message text.
        cursor (str): Cursor that points to a specific results page.

    Returns:
        EventSearchResponse object, that has the list of events and the next cursor.
    """
    endpoint = "/api/eventservice/Event/Search"
    if isinstance(asset_ids, int):
        asset_ids = [asset_ids]
    else:
        asset_ids = list(asset_ids)
    payload = {
        "assetIds": asset_ids,
        "timestampFrom": start_date.strftime(self._datetime_format),
        "timestampTo": end_date.strftime(self._datetime_format),
        "pageSize": page_size,
    }
    # The API wants camelCase filter names while kwargs arrive in snake_case:
    # translate the keys before merging them into the payload.
    payload.update({to_camel(key): value for key, value in kwargs.items()})

    first_response = self._make_request(method="POST", endpoint=endpoint, json_data=payload)
    merged = self._decode_response_to_model(first_response, EventSearchResponse)

    # When cursor-following is enabled, walk the pages until the service
    # stops returning a cursor, accumulating every page's events.
    while use_cursor and merged.next_cursor is not None:
        payload["cursor"] = merged.next_cursor
        page_response = self._make_request(method="POST", endpoint=endpoint, json_data=payload)
        page = self._decode_response_to_model(page_response, EventSearchResponse)
        merged.events += page.events
        merged.next_cursor = page.next_cursor
    return merged