Skip to content

CIAM API Client

Bases: BaseAPIClient

Source code in reportconnectors/api_client/ciam/__init__.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class CiamAPIClient(BaseAPIClient):
    __version__ = "0.1.0"
    _accepted_algorithms = ("RS256",)

    class KeyNames(BaseAPIClient.KeyNames):
        OAUTH_TOKEN = "oauth_token"
        JWT_PAYLOAD = "jwt_payload"

    def __init__(self, url: str, **kwargs):
        """
        Initialize the CIAM client with the URL of the API.

        It is based on the BaseAPIClient class and extends it with the CIAM specific methods.
        Therefore, it supports all keyword arguments of the BaseAPIClient class.

        Args:
            url: URL of the CIAM OAuth2 API.

        Keyword Args:
            jwks (Dict): JSON Web Keys store. Default: `{}`
            openid_config (OpenIdConfiguration): OpenID configuration data model. If not provided, it will be obtained
                from the local store file.
            token_refresh_function (TokenRefreshFunction): External function to refresh the token. If not provided then
                the token refreshment will be done locally. If there are several clients using the same access_token
                the token refreshment can be done externally to avoid multiple refreshes and race conditions.
        """
        self._jwks_store: Optional[Dict] = kwargs.pop("jwks", None)
        self._openid_config: Optional[OpenIdConfiguration] = kwargs.pop("openid_config", None)
        self._token_refresh_function: Optional[TokenRefreshFunction] = kwargs.pop("token_refresh_function", None)
        super().__init__(url=url, **kwargs)

        self._token_refresh_dates: List[datetime.datetime] = []
        if self._openid_config is None:
            self._openid_config = get_ciam_openid_config_from_local_file(url=url)
        if self._jwks_store is None:
            self._jwks_store = get_ciam_jwks_store_from_local_file(url=url)

    def authenticate(self, client_id: str, client_secret: str, password: str, username: str, **kwargs) -> bool:
        """
        Authenticate to the CIAM APIs using grant type `password`.

        To authenticate we need to provide client_id, client_secret, username and password.

        Additionally, we can provide the token_url, which is the URL of the authentication provider.
        If not provided, the URL is obtained from the OpenID configuration.

        As a result it sets the access token, id token, refresh token and expiration date in the auth_data,
        and returns True if the authentication is successful and the client is logged in.

        Args:
            client_id: Client ID
            client_secret: Client secret
            password: User password
            username: Username

        Keyword Args:
            token_url (str): Authentication token URL. If not provided, it's obtained from the OpenID configuration.
            timeout (float): Timeout for the authentication process. Default: `60`

        Returns:
            True if client is successfully authenticated and access token in obtained and set. Otherwise, False.
        """
        token_url = kwargs.pop("token_url", None)
        timeout = kwargs.pop("timeout", self._default_timeout)

        if not token_url:
            token_url = str(self.get_openid_config().token_endpoint)

        try:
            token = self.get_token(
                token_url=token_url,
                client_id=client_id,
                client_secret=client_secret,
                username=username,
                password=password,
                proxies=self._proxies,
                timeout=timeout,
                **kwargs,
            )
            self.auth_data[self.KeyNames.OAUTH_TOKEN] = token
        except (OAuth2Error, ValidationError) as e:
            log.warning(f"Authentication failed: {e}")
            return False

        self.set_token(
            access_token=token.access_token,
            refresh_token=token.refresh_token,
            id_token=token.id_token,
            expiration_date=token.expires_at,
            client_id=client_id,
        )
        return self.is_logged

    @property
    def is_logged(self) -> bool:
        """
        Checks if the client is logged in to the API.
        """
        time_now = datetime.datetime.now(datetime.timezone.utc)
        _is_logged = (
            bool(self.auth_token)
            and isinstance(self.token_expiration_date, datetime.datetime)
            and (self.token_expiration_date > time_now)
        )
        return _is_logged

    def decode_jwt_payload(self, token: str, validate: bool = False, **kwargs) -> Optional[JwtPayload]:
        """
        Decodes the JWT token and returns the payload.

        If the validate flag is set to True, the token will be validated. Otherwise, it will be just decoded.
        If the audience is provided, the token will be validated with the audience check. Otherwise, it will be
        validated without the audience check.

        If the JWK (JSON Web Key) is provided, it will be used for the token validation.
        Otherwise, the JWK will be obtained from the JWKS endpoint.

        Args:
            token: Authentication token to decode
            validate: If set to True, the token will be validated. Default: False

        Keyword Args:
            audience (str): Expected audience value of the token. In CIAM it's equal to Client ID. If not provided,
                the token will be validated without the audience check.
            jwk (dict): JSON Web Key to be used for the token validation. If not provided, it will be obtained from the
                JWKS endpoint.

        Returns:
            JWT payload if the token is successfully decoded and validated. Otherwise, None.
        """

        audience: Optional[str] = kwargs.get("audience", self.client_id)
        jwk: Optional[Dict] = kwargs.get("jwk", None)
        user_key: str = kwargs.get("key", "")

        key: Union[str, RSAPublicKey, RSAPrivateKey]
        if not validate:
            key = ""
        elif user_key:
            key = user_key
        else:
            if not jwk:
                kid = str(jwt.get_unverified_header(token).get("kid"))
                jwk = self.get_jwk(kid=kid)
            key = RSAAlgorithm.from_jwk(jwk) if jwk and jwk.get("alg") in self._accepted_algorithms else ""
            key = key if isinstance(key, RSAPublicKey) else ""

        options = {
            "verify_aud": bool(audience),
            "verify_signature": validate,
            "verify_exp": False,
        }
        log.debug("Decoding token with the following options: %s", options)
        if key:
            log.debug("Using public key to validate token signature")
        try:
            payload = jwt.decode(
                token,
                key=key,
                algorithms=list(self._accepted_algorithms),
                audience=audience,
                options=options,
            )
            jwt_payload = JwtPayload.model_validate(payload)
            log.debug("Token decoded successfully. Payload: %s", jwt_payload)
            return jwt_payload
        except (ValueError, TypeError, jwt.exceptions.InvalidTokenError) as e:
            log.exception(e)
            return None

    def refresh_authentication(
        self,
        force: bool = False,
        refresh_token: Optional[str] = None,
        client_id: Optional[str] = None,
        token_url: Optional[str] = None,
        auto_refresh_delta: datetime.timedelta = datetime.timedelta(minutes=5),
    ) -> bool:
        """
        Refreshes the tokens using the refresh token. If no refresh token is provided, the one from the
        `refresh_token` property is used.

        If token's remaining time is more than auto_refresh_delta (5 min by default) then the token is not refreshed.
        However, it can be forced by setting force to True.

        Args:
            force: If true then the token refresh process is executed regardless of the above conditions. Default: False
            refresh_token: Custom refresh token to be used. If not provided, then the `self.refresh_token` property
                is used.
            client_id: Client ID to be used for the token refreshment. If not provided, it is taken from client
                auth data.
            token_url: Token URL that identifies the authentication provider. If not provided, it is taken from the
                OpenID configuration.
            auto_refresh_delta: Time delta before the token expiration date when the token should be refreshed.
                Default: 5 minutes.

        Returns:
            True, if the token refresh process is successful. False, otherwise.
        """

        # Check if it's time to refresh
        expiration_date = self.auth_data.get(
            self.KeyNames.EXPIRATION_DATE, datetime.datetime.now(datetime.timezone.utc)
        )
        if not (force or isinstance(expiration_date, datetime.datetime)):
            return False
        deadline = expiration_date - auto_refresh_delta
        is_time_to_refresh = force or (deadline < datetime.datetime.now(datetime.timezone.utc))
        if not is_time_to_refresh:
            return False
        # Check required inputs
        _refresh_token = refresh_token if refresh_token else self.refresh_token
        _client_id = client_id if client_id else self.client_id
        if not (_refresh_token and _client_id):
            log.debug(f"Not enough data to refresh the token. {_refresh_token=}, {_client_id=}")
            return False
        # Finally perform the token refreshment
        _token_url = token_url if token_url else str(self.get_openid_config().token_endpoint)
        try:
            if self._token_refresh_function is not None:
                token = self._token_refresh_function(refresh_token=_refresh_token, client_id=_client_id)
                log.debug(
                    f"Token refreshed successfully with external function. New expiration date: {token.expires_at}"
                )
            else:
                token = self.get_token(token_url=_token_url, refresh_token=_refresh_token, client_id=_client_id)
                log.debug(f"Token refreshed successfully with local function. New expiration date: {token.expires_at}")
        except (OAuth2Error, ValidationError) as e:
            log.warning(f"Token refresh failed: {e}")
            return False

        self.auth_data[self.KeyNames.OAUTH_TOKEN] = token
        status = self.set_token(
            access_token=token.access_token,
            refresh_token=token.refresh_token,
            id_token=token.id_token,
            expiration_date=token.expires_at,
        )
        if status:
            self._token_refresh_dates.append(datetime.datetime.now(datetime.timezone.utc))
            log.debug(f"Access token has been refreshed. New expiration date is: {self.token_expiration_date}")
        return status

    @classmethod
    def get_token(
        cls,
        token_url: str,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        refresh_token: Optional[str] = None,
        **kwargs,
    ) -> OAuth2TokenResponse:
        """
        Gets OpenID token from the auth provider using the password authentication flow.

        It uses the `client_id` and `client_secret` with `username` and `password` to authenticate communicate
        with authentication provided at `token_url`.

        By default, it is the CIAM portal, but in theory any OAuth2 provided will do.

        Args:
            token_url: Token URL that identifies the authentication provider.
            client_id: Client ID
            client_secret: Client Secret
            username: Username
            password: User password
            refresh_token: Refresh token to be used for the token refreshment.

        Keyword Args:
            timeout (float): Timeout for the authentication process. Default: `60`
            content_type (str): Content type of the request. Default: `application/x-www-form-urlencoded`
            proxies (Optional[Dict]): Proxy settings. Default: `None`
            scope (List[str]): List of required scopes. Default: `["openid"]`

        Returns:
             OAuth2TokenResponse object with the response from the authentication provider.

        Raises:
            OAuth2Error: If the authentication fails.
            ValidationError: If the response is not valid.

        Notes:
            When authorizing at CIAM the User-Agent header is expected. That is the
            reason why we add the custom headers to the fetch_token method call.
        """
        timeout: float = kwargs.get("timeout", 60.0)
        content_type: str = kwargs.get("content_type", "application/x-www-form-urlencoded")
        proxies: Optional[Dict] = kwargs.get("proxies", None)
        scope: List[str] = kwargs.get("scope", ["openid"])
        verify: Union[str, bool] = kwargs.get("verify", True)

        headers = {"User-Agent": f"{cls.__name__}", "Content-Type": content_type}

        log.debug(f"Requesting token from the authentication provider. {token_url=} {client_id=}")
        if refresh_token and client_id:
            oauth_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
            response = oauth_session.refresh_token(
                token_url=token_url,
                refresh_token=refresh_token,
                proxies=proxies,
                timeout=timeout,
                headers=headers,
                verify=verify,
                client_id=client_id,
            )
            log.debug(f"Token refreshed. {response=}")

        elif client_id and client_secret and username and password:
            oauth_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
            response = oauth_session.fetch_token(
                token_url=token_url,
                client_id=client_id,
                client_secret=client_secret,
                scope=scope,
                username=username,
                password=password,
                include_client_id=True,
                proxies=proxies,
                timeout=timeout,
                headers=headers,
                verify=verify,
            )
            log.debug(f"Token fetched using grant_type='password'. {response=}")
        else:
            raise ValueError("Not enough data to authenticate.")
        token = OAuth2TokenResponse.model_validate(response)
        return token

    def set_token(self, access_token: str, validate: bool = False, **kwargs) -> bool:
        """
        Sets the authorization tokens without the authentication process. It might be used when the
        tokens are obtained from external source.

        We expect the access token to be JWT Bearer token. We also support the refresh token and the ID token as input
        parameters. If the validate flag is set to True, we will validate the access token's signature
        using the available public keys. Otherwise, we will just decode the access token to check its payload.

        If token expiration date is not provided, it will be extracted from the access token payload.
        Only if the token is valid and not expired, it will be set.

        Args:
            access_token: Access token to be set.
            validate: If set to True, the token will be validated. Default: False

        Keyword Args:
            refresh_token (str): Refresh token to be set.
            id_token (str): Refresh token to be set.
            client_id (str): Client ID to be set. If not provided, it will be extracted from the token's payload.
            expiration_date (datetime.datetime): Expiration date of the access token. If not provided,
                it will be extracted from the token's payload.

        Returns:
            True if provided token is set and valid. Otherwise, False.
        """
        refresh_token: Optional[str] = kwargs.pop("refresh_token", None)
        id_token: Optional[str] = kwargs.pop("id_token", None)
        client_id: Optional[str] = kwargs.pop("client_id", None)
        expiration_date: Optional[datetime.datetime] = kwargs.pop("expiration_date", None)

        jwt_payload = self.decode_jwt_payload(token=access_token, validate=validate, **kwargs)
        if jwt_payload is None:
            log.error("Invalid token provided.")
            return False

        if expiration_date is None:
            expiration_date = jwt_payload.expiration_date
        if client_id is None:
            client_id = jwt_payload.client_id

        if expiration_date < datetime.datetime.now(datetime.timezone.utc):
            log.error(f"Token is expired. {expiration_date=}")
        # Set new tokens ...
        self.auth_data[self.KeyNames.ACCESS_TOKEN] = access_token
        self.auth_data[self.KeyNames.EXPIRATION_DATE] = expiration_date
        self.auth_data[self.KeyNames.CLIENT_ID] = client_id
        self.auth_data[self.KeyNames.JWT_PAYLOAD] = jwt_payload
        if refresh_token:
            self.auth_data[self.KeyNames.REFRESH_TOKEN] = refresh_token
        if id_token:
            self.auth_data[self.KeyNames.ID_TOKEN] = id_token
        return True

    def get_openid_config(
        self,
        endpoint: str = "/polaris/oidc/.well-known/openid-configuration",
        force: bool = False,
    ) -> OpenIdConfiguration:
        """
        Gets OpenID config from the ".well-known/openid-configuration" endpoint.

        Args:
            endpoint: URL to the Well-Known endpoint. Default: `/polaris/oidc/.well-known/openid-configuration`
            force: If True then the config read from the ".well-known" endpoint is forced. Default: `False`

        Returns:
            OpenID configuration data model.

        """
        if isinstance(self._openid_config, OpenIdConfiguration) and not force:
            return self._openid_config

        response = self._make_request(endpoint=endpoint, method="GET")
        self._openid_config = self._decode_response_to_model(response, OpenIdConfiguration)
        log.debug(f"OpenID configuration read from the {endpoint=}.")
        return self._openid_config

    def get_jwk(self, kid: str) -> Optional[Dict]:
        """
        Gets the JSON Web Key identified by the `kid` key.
        If the key is not found in the local store, it will be updated from the JWKS endpoint.

        Args:
            kid: JWK private key identifier

        Returns:
            Private key as a dictionary.
        """
        if self._jwks_store is None:
            return None
        if kid not in self._jwks_store:
            self._update_jwks()
        return self._jwks_store.get(kid, None)

    def get_user_info(self) -> UserInfo:
        """
        Gets the user information from the CIAM's OIDC userinfo endpoint.

        Returns:
            User info data model.
        """
        endpoint = "/polaris/oidc/userinfo"

        response = self._make_request(endpoint=endpoint, method="GET")
        user_info = self._decode_response_to_model(response, model_type=UserInfo)
        return user_info

    def _update_jwks(self, jwks_uri: Optional[str] = None) -> None:
        """
        Updates the private keys from the CIAM by using the JWKS uri.
        If the JWKS uri is not provided, it will be obtained from the OpenID configuration.

        Args:
            jwks_uri: JWKS uri to be used for the private keys update. If not provided, it will be obtained from the
                OpenID configuration.
        """
        if not jwks_uri:
            oauth_config = self.get_openid_config()
            jwks_uri = str(oauth_config.jwks_uri)
        response = self._make_request(url=jwks_uri, method="GET")
        jwks_data: Dict[str, Any] = self._decode_response(response, default={})
        keys = jwks_data.get("keys", [])
        if self._jwks_store is None:
            self._jwks_store = {}
        for jwk in keys:
            kid = jwk.get("kid")
            if kid:
                self._jwks_store[kid] = jwk
                log.debug(f"Key {kid} added to the private keys.")
        return None

    def logout(self, end_session_url: Optional[str] = None) -> bool:
        """
        Ends session with CIAM client and fully logs out from the API.

        Args:
            end_session_url: End session URL. If not provided then the one from OpenID configuration used

        Returns:
            True, if fully logged out. Otherwise, False.

        """
        if not end_session_url:
            oauth_config = self.get_openid_config()
            end_session_url = str(oauth_config.end_session_endpoint)

        response = self._make_request(url=end_session_url, method="GET")
        super().logout()
        if response is None:
            return False
        is_logged_out = (response.status_code == HTTPStatus.OK) and (self.auth_data == {})
        return is_logged_out

is_logged property

Checks if the client is logged in to the API.

__init__(url, **kwargs)

Initialize the CIAM client with the URL of the API.

It is based on the BaseAPIClient class and extends it with the CIAM specific methods. Therefore, it supports all keyword arguments of the BaseAPIClient class.

Parameters:

Name Type Description Default
url str

URL of the CIAM OAuth2 API.

required

Other Parameters:

Name Type Description
jwks Dict

JSON Web Keys store. Default: {}

openid_config OpenIdConfiguration

OpenID configuration data model. If not provided, it will be obtained from the local store file.

token_refresh_function TokenRefreshFunction

External function to refresh the token. If not provided then the token refreshment will be done locally. If there are several clients using the same access_token the token refreshment can be done externally to avoid multiple refreshes and race conditions.

Source code in reportconnectors/api_client/ciam/__init__.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(self, url: str, **kwargs):
    """
    Initialize the CIAM client with the URL of the API.

    It is based on the BaseAPIClient class and extends it with the CIAM specific methods.
    Therefore, it supports all keyword arguments of the BaseAPIClient class.

    Args:
        url: URL of the CIAM OAuth2 API.

    Keyword Args:
        jwks (Dict): JSON Web Keys store. Default: `{}`
        openid_config (OpenIdConfiguration): OpenID configuration data model. If not provided, it will be obtained
            from the local store file.
        token_refresh_function (TokenRefreshFunction): External function to refresh the token. If not provided then
            the token refreshment will be done locally. If there are several clients using the same access_token
            the token refreshment can be done externally to avoid multiple refreshes and race conditions.
    """
    self._jwks_store: Optional[Dict] = kwargs.pop("jwks", None)
    self._openid_config: Optional[OpenIdConfiguration] = kwargs.pop("openid_config", None)
    self._token_refresh_function: Optional[TokenRefreshFunction] = kwargs.pop("token_refresh_function", None)
    super().__init__(url=url, **kwargs)

    self._token_refresh_dates: List[datetime.datetime] = []
    if self._openid_config is None:
        self._openid_config = get_ciam_openid_config_from_local_file(url=url)
    if self._jwks_store is None:
        self._jwks_store = get_ciam_jwks_store_from_local_file(url=url)

authenticate(client_id, client_secret, password, username, **kwargs)

Authenticate to the CIAM APIs using grant type password.

To authenticate we need to provide client_id, client_secret, username and password.

Additionally, we can provide the token_url, which is the URL of the authentication provider. If not provided, the URL is obtained from the OpenID configuration.

As a result it sets the access token, id token, refresh token and expiration date in the auth_data, and returns True if the authentication is successful and the client is logged in.

Parameters:

Name Type Description Default
client_id str

Client ID

required
client_secret str

Client secret

required
password str

User password

required
username str

Username

required

Other Parameters:

Name Type Description
token_url str

Authentication token URL. If not provided, it's obtained from the OpenID configuration.

timeout float

Timeout for the authentication process. Default: 60

Returns:

Type Description
bool

True if client is successfully authenticated and access token in obtained and set. Otherwise, False.

Source code in reportconnectors/api_client/ciam/__init__.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def authenticate(self, client_id: str, client_secret: str, password: str, username: str, **kwargs) -> bool:
    """
    Authenticate to the CIAM APIs using grant type `password`.

    To authenticate we need to provide client_id, client_secret, username and password.

    Additionally, we can provide the token_url, which is the URL of the authentication provider.
    If not provided, the URL is obtained from the OpenID configuration.

    As a result it sets the access token, id token, refresh token and expiration date in the auth_data,
    and returns True if the authentication is successful and the client is logged in.

    Args:
        client_id: Client ID
        client_secret: Client secret
        password: User password
        username: Username

    Keyword Args:
        token_url (str): Authentication token URL. If not provided, it's obtained from the OpenID configuration.
        timeout (float): Timeout for the authentication process. Default: `60`

    Returns:
        True if client is successfully authenticated and access token in obtained and set. Otherwise, False.
    """
    token_url = kwargs.pop("token_url", None)
    timeout = kwargs.pop("timeout", self._default_timeout)

    if not token_url:
        token_url = str(self.get_openid_config().token_endpoint)

    try:
        token = self.get_token(
            token_url=token_url,
            client_id=client_id,
            client_secret=client_secret,
            username=username,
            password=password,
            proxies=self._proxies,
            timeout=timeout,
            **kwargs,
        )
        self.auth_data[self.KeyNames.OAUTH_TOKEN] = token
    except (OAuth2Error, ValidationError) as e:
        log.warning(f"Authentication failed: {e}")
        return False

    self.set_token(
        access_token=token.access_token,
        refresh_token=token.refresh_token,
        id_token=token.id_token,
        expiration_date=token.expires_at,
        client_id=client_id,
    )
    return self.is_logged

decode_jwt_payload(token, validate=False, **kwargs)

Decodes the JWT token and returns the payload.

If the validate flag is set to True, the token will be validated. Otherwise, it will be just decoded. If the audience is provided, the token will be validated with the audience check. Otherwise, it will be validated without the audience check.

If the JWK (JSON Web Key) is provided, it will be used for the token validation. Otherwise, the JWK will be obtained from the JWKS endpoint.

Parameters:

Name Type Description Default
token str

Authentication token to decode

required
validate bool

If set to True, the token will be validated. Default: False

False

Other Parameters:

Name Type Description
audience str

Expected audience value of the token. In CIAM it's equal to Client ID. If not provided, the token will be validated without the audience check.

jwk dict

JSON Web Key to be used for the token validation. If not provided, it will be obtained from the JWKS endpoint.

Returns:

Type Description
Optional[JwtPayload]

JWT payload if the token is successfully decoded and validated. Otherwise, None.

Source code in reportconnectors/api_client/ciam/__init__.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def decode_jwt_payload(self, token: str, validate: bool = False, **kwargs) -> Optional[JwtPayload]:
    """
    Decodes the JWT token and returns the payload.

    If the validate flag is set to True, the token will be validated. Otherwise, it will be just decoded.
    If the audience is provided, the token will be validated with the audience check. Otherwise, it will be
    validated without the audience check.

    If the JWK (JSON Web Key) is provided, it will be used for the token validation.
    Otherwise, the JWK will be obtained from the JWKS endpoint.

    Args:
        token: Authentication token to decode
        validate: If set to True, the token will be validated. Default: False

    Keyword Args:
        audience (str): Expected audience value of the token. In CIAM it's equal to Client ID. If not provided,
            the token will be validated without the audience check.
        jwk (dict): JSON Web Key to be used for the token validation. If not provided, it will be obtained from the
            JWKS endpoint.

    Returns:
        JWT payload if the token is successfully decoded and validated. Otherwise, None.
    """

    audience: Optional[str] = kwargs.get("audience", self.client_id)
    jwk: Optional[Dict] = kwargs.get("jwk", None)
    user_key: str = kwargs.get("key", "")

    key: Union[str, RSAPublicKey, RSAPrivateKey]
    if not validate:
        key = ""
    elif user_key:
        key = user_key
    else:
        if not jwk:
            kid = str(jwt.get_unverified_header(token).get("kid"))
            jwk = self.get_jwk(kid=kid)
        key = RSAAlgorithm.from_jwk(jwk) if jwk and jwk.get("alg") in self._accepted_algorithms else ""
        key = key if isinstance(key, RSAPublicKey) else ""

    options = {
        "verify_aud": bool(audience),
        "verify_signature": validate,
        "verify_exp": False,
    }
    log.debug("Decoding token with the following options: %s", options)
    if key:
        log.debug("Using public key to validate token signature")
    try:
        payload = jwt.decode(
            token,
            key=key,
            algorithms=list(self._accepted_algorithms),
            audience=audience,
            options=options,
        )
        jwt_payload = JwtPayload.model_validate(payload)
        log.debug("Token decoded successfully. Payload: %s", jwt_payload)
        return jwt_payload
    except (ValueError, TypeError, jwt.exceptions.InvalidTokenError) as e:
        log.exception(e)
        return None

get_jwk(kid)

Gets the JSON Web Key identified by the kid key. If the key is not found in the local store, it will be updated from the JWKS endpoint.

Parameters:

Name Type Description Default
kid str

JWK private key identifier

required

Returns:

Type Description
Optional[Dict]

Private key as a dictionary.

Source code in reportconnectors/api_client/ciam/__init__.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def get_jwk(self, kid: str) -> Optional[Dict]:
    """
    Gets the JSON Web Key identified by the `kid` key.
    If the key is not found in the local store, it will be updated from the JWKS endpoint.

    Args:
        kid: JWK private key identifier

    Returns:
        Private key as a dictionary.
    """
    if self._jwks_store is None:
        return None
    if kid not in self._jwks_store:
        self._update_jwks()
    return self._jwks_store.get(kid, None)

get_openid_config(endpoint='/polaris/oidc/.well-known/openid-configuration', force=False)

Gets OpenID config from the ".well-known/openid-configuration" endpoint.

Parameters:

Name Type Description Default
endpoint str

URL to the Well-Known endpoint. Default: /polaris/oidc/.well-known/openid-configuration

'/polaris/oidc/.well-known/openid-configuration'
force bool

If True then the config read from the ".well-known" endpoint is forced. Default: False

False

Returns:

Type Description
OpenIdConfiguration

OpenID configuration data model.

Source code in reportconnectors/api_client/ciam/__init__.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def get_openid_config(
    self,
    endpoint: str = "/polaris/oidc/.well-known/openid-configuration",
    force: bool = False,
) -> OpenIdConfiguration:
    """
    Gets OpenID config from the ".well-known/openid-configuration" endpoint.

    Args:
        endpoint: URL to the Well-Known endpoint. Default: `/polaris/oidc/.well-known/openid-configuration`
        force: If True then the config read from the ".well-known" endpoint is forced. Default: `False`

    Returns:
        OpenID configuration data model.

    """
    if isinstance(self._openid_config, OpenIdConfiguration) and not force:
        return self._openid_config

    response = self._make_request(endpoint=endpoint, method="GET")
    self._openid_config = self._decode_response_to_model(response, OpenIdConfiguration)
    log.debug(f"OpenID configuration read from the {endpoint=}.")
    return self._openid_config

get_token(token_url, client_id=None, client_secret=None, username=None, password=None, refresh_token=None, **kwargs) classmethod

Gets OpenID token from the auth provider using the password authentication flow.

It uses the client_id and client_secret with username and password to authenticate communicate with authentication provided at token_url.

By default, it is the CIAM portal, but in theory any OAuth2 provided will do.

Parameters:

Name Type Description Default
token_url str

Token URL that identifies the authentication provider.

required
client_id Optional[str]

Client ID

None
client_secret Optional[str]

Client Secret

None
username Optional[str]

Username

None
password Optional[str]

User password

None
refresh_token Optional[str]

Refresh token to be used for the token refreshment.

None

Other Parameters:

Name Type Description
timeout float

Timeout for the authentication process. Default: 60

content_type str

Content type of the request. Default: application/x-www-form-urlencoded

proxies Optional[Dict]

Proxy settings. Default: None

scope List[str]

List of required scopes. Default: ["openid"]

Returns:

Type Description
OAuth2TokenResponse

OAuth2TokenResponse object with the response from the authentication provider.

Raises:

Type Description
OAuth2Error

If the authentication fails.

ValidationError

If the response is not valid.

Notes

When authorizing at CIAM the User-Agent header is expected. That is the reason why we add the custom headers to the fetch_token method call.

Source code in reportconnectors/api_client/ciam/__init__.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
@classmethod
def get_token(
    cls,
    token_url: str,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    refresh_token: Optional[str] = None,
    **kwargs,
) -> OAuth2TokenResponse:
    """
    Gets OpenID token from the auth provider using the password authentication flow.

    It uses the `client_id` and `client_secret` with `username` and `password` to authenticate communicate
    with authentication provided at `token_url`.

    By default, it is the CIAM portal, but in theory any OAuth2 provided will do.

    Args:
        token_url: Token URL that identifies the authentication provider.
        client_id: Client ID
        client_secret: Client Secret
        username: Username
        password: User password
        refresh_token: Refresh token to be used for the token refreshment.

    Keyword Args:
        timeout (float): Timeout for the authentication process. Default: `60`
        content_type (str): Content type of the request. Default: `application/x-www-form-urlencoded`
        proxies (Optional[Dict]): Proxy settings. Default: `None`
        scope (List[str]): List of required scopes. Default: `["openid"]`

    Returns:
         OAuth2TokenResponse object with the response from the authentication provider.

    Raises:
        OAuth2Error: If the authentication fails.
        ValidationError: If the response is not valid.

    Notes:
        When authorizing at CIAM the User-Agent header is expected. That is the
        reason why we add the custom headers to the fetch_token method call.
    """
    timeout: float = kwargs.get("timeout", 60.0)
    content_type: str = kwargs.get("content_type", "application/x-www-form-urlencoded")
    proxies: Optional[Dict] = kwargs.get("proxies", None)
    scope: List[str] = kwargs.get("scope", ["openid"])
    verify: Union[str, bool] = kwargs.get("verify", True)

    headers = {"User-Agent": f"{cls.__name__}", "Content-Type": content_type}

    log.debug(f"Requesting token from the authentication provider. {token_url=} {client_id=}")
    if refresh_token and client_id:
        oauth_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
        response = oauth_session.refresh_token(
            token_url=token_url,
            refresh_token=refresh_token,
            proxies=proxies,
            timeout=timeout,
            headers=headers,
            verify=verify,
            client_id=client_id,
        )
        log.debug(f"Token refreshed. {response=}")

    elif client_id and client_secret and username and password:
        oauth_session = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
        response = oauth_session.fetch_token(
            token_url=token_url,
            client_id=client_id,
            client_secret=client_secret,
            scope=scope,
            username=username,
            password=password,
            include_client_id=True,
            proxies=proxies,
            timeout=timeout,
            headers=headers,
            verify=verify,
        )
        log.debug(f"Token fetched using grant_type='password'. {response=}")
    else:
        raise ValueError("Not enough data to authenticate.")
    token = OAuth2TokenResponse.model_validate(response)
    return token

get_user_info()

Gets the user information from the CIAM's OIDC userinfo endpoint.

Returns:

Type Description
UserInfo

User info data model.

Source code in reportconnectors/api_client/ciam/__init__.py
452
453
454
455
456
457
458
459
460
461
462
463
def get_user_info(self) -> UserInfo:
    """
    Gets the user information from the CIAM's OIDC userinfo endpoint.

    Returns:
        User info data model.
    """
    endpoint = "/polaris/oidc/userinfo"

    response = self._make_request(endpoint=endpoint, method="GET")
    user_info = self._decode_response_to_model(response, model_type=UserInfo)
    return user_info

logout(end_session_url=None)

Ends session with CIAM client and fully logs out from the API.

Parameters:

Name Type Description Default
end_session_url Optional[str]

End session URL. If not provided then the one from OpenID configuration used

None

Returns:

Type Description
bool

True, if fully logged out. Otherwise, False.

Source code in reportconnectors/api_client/ciam/__init__.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
def logout(self, end_session_url: Optional[str] = None) -> bool:
    """
    Ends session with CIAM client and fully logs out from the API.

    Args:
        end_session_url: End session URL. If not provided then the one from OpenID configuration used

    Returns:
        True, if fully logged out. Otherwise, False.

    """
    if not end_session_url:
        oauth_config = self.get_openid_config()
        end_session_url = str(oauth_config.end_session_endpoint)

    response = self._make_request(url=end_session_url, method="GET")
    super().logout()
    if response is None:
        return False
    is_logged_out = (response.status_code == HTTPStatus.OK) and (self.auth_data == {})
    return is_logged_out

refresh_authentication(force=False, refresh_token=None, client_id=None, token_url=None, auto_refresh_delta=datetime.timedelta(minutes=5))

Refreshes the tokens using the refresh token. If no refresh token is provided, the one from the refresh_token property is used.

If token's remaining time is more than auto_refresh_delta (5 min by default) then the token is not refreshed. However, it can be forced by setting force to True.

Parameters:

Name Type Description Default
force bool

If true then the token refresh process is executed regardless of the above conditions. Default: False

False
refresh_token Optional[str]

Custom refresh token to be used. If not provided, then the self.refresh_token property is used.

None
client_id Optional[str]

Client ID to be used for the token refreshment. If not provided, it is taken from client auth data.

None
token_url Optional[str]

Token URL that identifies the authentication provider. If not provided, it is taken from the OpenID configuration.

None
auto_refresh_delta timedelta

Time delta before the token expiration date when the token should be refreshed. Default: 5 minutes.

timedelta(minutes=5)

Returns:

Type Description
bool

True, if the token refresh process is successful. False, otherwise.

Source code in reportconnectors/api_client/ciam/__init__.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def refresh_authentication(
    self,
    force: bool = False,
    refresh_token: Optional[str] = None,
    client_id: Optional[str] = None,
    token_url: Optional[str] = None,
    auto_refresh_delta: datetime.timedelta = datetime.timedelta(minutes=5),
) -> bool:
    """
    Refreshes the tokens using the refresh token. If no refresh token is provided, the one from the
    `refresh_token` property is used.

    If token's remaining time is more than auto_refresh_delta (5 min by default) then the token is not refreshed.
    However, it can be forced by setting force to True.

    Args:
        force: If true then the token refresh process is executed regardless of the above conditions. Default: False
        refresh_token: Custom refresh token to be used. If not provided, then the `self.refresh_token` property
            is used.
        client_id: Client ID to be used for the token refreshment. If not provided, it is taken from client
            auth data.
        token_url: Token URL that identifies the authentication provider. If not provided, it is taken from the
            OpenID configuration.
        auto_refresh_delta: Time delta before the token expiration date when the token should be refreshed.
            Default: 5 minutes.

    Returns:
        True, if the token refresh process is successful. False, otherwise.
    """

    # Check if it's time to refresh
    expiration_date = self.auth_data.get(
        self.KeyNames.EXPIRATION_DATE, datetime.datetime.now(datetime.timezone.utc)
    )
    if not (force or isinstance(expiration_date, datetime.datetime)):
        return False
    deadline = expiration_date - auto_refresh_delta
    is_time_to_refresh = force or (deadline < datetime.datetime.now(datetime.timezone.utc))
    if not is_time_to_refresh:
        return False
    # Check required inputs
    _refresh_token = refresh_token if refresh_token else self.refresh_token
    _client_id = client_id if client_id else self.client_id
    if not (_refresh_token and _client_id):
        log.debug(f"Not enough data to refresh the token. {_refresh_token=}, {_client_id=}")
        return False
    # Finally perform the token refreshment
    _token_url = token_url if token_url else str(self.get_openid_config().token_endpoint)
    try:
        if self._token_refresh_function is not None:
            token = self._token_refresh_function(refresh_token=_refresh_token, client_id=_client_id)
            log.debug(
                f"Token refreshed successfully with external function. New expiration date: {token.expires_at}"
            )
        else:
            token = self.get_token(token_url=_token_url, refresh_token=_refresh_token, client_id=_client_id)
            log.debug(f"Token refreshed successfully with local function. New expiration date: {token.expires_at}")
    except (OAuth2Error, ValidationError) as e:
        log.warning(f"Token refresh failed: {e}")
        return False

    self.auth_data[self.KeyNames.OAUTH_TOKEN] = token
    status = self.set_token(
        access_token=token.access_token,
        refresh_token=token.refresh_token,
        id_token=token.id_token,
        expiration_date=token.expires_at,
    )
    if status:
        self._token_refresh_dates.append(datetime.datetime.now(datetime.timezone.utc))
        log.debug(f"Access token has been refreshed. New expiration date is: {self.token_expiration_date}")
    return status

set_token(access_token, validate=False, **kwargs)

Sets the authorization tokens without the authentication process. It might be used when the tokens are obtained from external source.

We expect the access token to be JWT Bearer token. We also support the refresh token and the ID token as input parameters. If the validate flag is set to True, we will validate the access token's signature using the available public keys. Otherwise, we will just decode the access token to check its payload.

If token expiration date is not provided, it will be extracted from the access token payload. Only if the token is valid and not expired, it will be set.

Parameters:

Name Type Description Default
access_token str

Access token to be set.

required
validate bool

If set to True, the token will be validated. Default: False

False

Other Parameters:

Name Type Description
refresh_token str

Refresh token to be set.

id_token str

Refresh token to be set.

client_id str

Client ID to be set. If not provided, it will be extracted from the token's payload.

expiration_date datetime

Expiration date of the access token. If not provided, it will be extracted from the token's payload.

Returns:

Type Description
bool

True if provided token is set and valid. Otherwise, False.

Source code in reportconnectors/api_client/ciam/__init__.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def set_token(self, access_token: str, validate: bool = False, **kwargs) -> bool:
    """
    Sets the authorization tokens without the authentication process. It might be used when the
    tokens are obtained from external source.

    We expect the access token to be JWT Bearer token. We also support the refresh token and the ID token as input
    parameters. If the validate flag is set to True, we will validate the access token's signature
    using the available public keys. Otherwise, we will just decode the access token to check its payload.

    If token expiration date is not provided, it will be extracted from the access token payload.
    Only if the token is valid and not expired, it will be set.

    Args:
        access_token: Access token to be set.
        validate: If set to True, the token will be validated. Default: False

    Keyword Args:
        refresh_token (str): Refresh token to be set.
        id_token (str): Refresh token to be set.
        client_id (str): Client ID to be set. If not provided, it will be extracted from the token's payload.
        expiration_date (datetime.datetime): Expiration date of the access token. If not provided,
            it will be extracted from the token's payload.

    Returns:
        True if provided token is set and valid. Otherwise, False.
    """
    refresh_token: Optional[str] = kwargs.pop("refresh_token", None)
    id_token: Optional[str] = kwargs.pop("id_token", None)
    client_id: Optional[str] = kwargs.pop("client_id", None)
    expiration_date: Optional[datetime.datetime] = kwargs.pop("expiration_date", None)

    jwt_payload = self.decode_jwt_payload(token=access_token, validate=validate, **kwargs)
    if jwt_payload is None:
        log.error("Invalid token provided.")
        return False

    if expiration_date is None:
        expiration_date = jwt_payload.expiration_date
    if client_id is None:
        client_id = jwt_payload.client_id

    if expiration_date < datetime.datetime.now(datetime.timezone.utc):
        log.error(f"Token is expired. {expiration_date=}")
    # Set new tokens ...
    self.auth_data[self.KeyNames.ACCESS_TOKEN] = access_token
    self.auth_data[self.KeyNames.EXPIRATION_DATE] = expiration_date
    self.auth_data[self.KeyNames.CLIENT_ID] = client_id
    self.auth_data[self.KeyNames.JWT_PAYLOAD] = jwt_payload
    if refresh_token:
        self.auth_data[self.KeyNames.REFRESH_TOKEN] = refresh_token
    if id_token:
        self.auth_data[self.KeyNames.ID_TOKEN] = id_token
    return True