Skip to content

Drive DB Client based on Cosmos DB

Bases: DriveDBClient

Source code in reportconnectors/drive_db/cosmos_client.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
class CosmosDBClient(DriveDBClient):

    class Config(DriveDBClient.Config):
        # Alias of the container used in the generated SQL queries (e.g. "FROM c").
        DEFAULT_TABLE = "c"
        # Database that authenticate() connects to when no other name is given.
        DEFAULT_DB = "analytics"
        # Container used by the KPI/result queries and by authenticate() by default.
        RESULTS_CONTAINER = "results"
        # Container used by the nameplate queries.
        INFO_CONTAINER = "info"

    def __init__(self, url: str, proxies: Optional[Dict] = None):
        self.url = url
        self.cosmos_client: Optional[CosmosClient] = None
        self.db_client: Optional[DatabaseProxy] = None
        self.container_client: Optional[ContainerProxy] = None
        self.proxies = proxies if proxies else {}

    def authenticate(
        self,
        token: str,
        db: Optional[str] = Config.DEFAULT_DB,
        container: Optional[str] = Config.RESULTS_CONTAINER,
    ) -> bool:
        """
        Creates the CosmosDB client and optionally connects it to a database and a container.

        Args:
            token: Master key of the account. "==" is appended to it before it is handed to the CosmosDB client.
            db: Name of the database to connect to. If falsy, neither a database nor a container is connected.
            container: Name of the container to connect to. It is only used when `db` is provided.

        Returns:
            Value of `is_logged` after the connection attempt, or False if an HttpResponseError was raised.

        Raises:
            ValueError: If the URL of the client is None.
        """
        if self.url is None:
            raise ValueError("URL to DB is not present")
        try:
            credential = {"masterKey": f"{token}=="}
            policy = self._build_cosmosdb_connection_policy(self.proxies)
            self.cosmos_client = CosmosClient(url=self.url, credential=credential, connection_policy=policy)
            if not db:
                return self.is_logged
            self.connect_to_database(name=db)
            log.info(f"Connected to {self.db_client}")
            if container:
                self.connect_to_container(name=container)
                log.info(f"Connected to {self.container_client}")
            return self.is_logged
        except HttpResponseError as e:
            # The error is logged and reported to the caller as a failed login.
            log.exception(e)
            return False

    @property
    def is_logged(self) -> bool:
        if not self.cosmos_client or not self.db_client or not self.container_client:
            return False
        return bool(self.db_client.id) and bool(self.container_client.id)

    def logout(self) -> None:
        """
        Logs out the client
        """
        self.cosmos_client = None
        self.db_client = None
        self.container_client = None

    def list_databases(self) -> List:
        """
        Query for databases in CosmosDB.

        Returns:
            List of dictionaries with database properties
        """
        if self.cosmos_client:
            return list(self.cosmos_client.list_databases())
        else:
            return []

    def list_containers(self, database: Optional[str] = None) -> List:
        """
        Query for containers present in the connected database.

        Args:
            database: Name of the Database that is searched for containers. If not provided, current database is used.

        Returns:
            List of dictionaries with container properties.

        """
        if self.cosmos_client is None:
            return []

        db_client = self.db_client if database is None else self.cosmos_client.get_database_client(database)
        if db_client:
            return list(db_client.list_containers())
        else:
            return []

    def connect_to_database(self, name: str) -> None:
        """
        Connect CosmosDB client to database.

        Args:
            name: Identifier of the database that the client should connect to.

        Raises:
            ValueError: If the CosmosDB client is not initialized.
        """
        if self.cosmos_client is None:
            raise ValueError("CosmosDB client is not initialized")

        if self.db_client is None or (self.db_client and self.db_client.id != name):
            self.db_client = self.cosmos_client.get_database_client(name)
        return None

    def connect_to_container(self, name: str) -> None:
        """
        Connect database client to container.

        Args:
            name: Identifier of the container that the database client should connect to.

        Raises:
            ValueError: If the database client is not initialized.
        """
        if self.db_client is None:
            raise ValueError("Database client is not initialized")

        if self.container_client is None or (self.container_client and self.container_client.id != name):
            self.container_client = self.db_client.get_container_client(name)
        return None

    @look_in_container(name=Config.INFO_CONTAINER)
    @authorization_required
    def get_nameplate_details(self, serial_number: str, **kwargs) -> Union[List, Dict]:
        """
        Gets the nameplate details of the drive identified by the given serial number.

        Args:
            serial_number: Drive serial number. It is matched against the document id.

        Keyword Args:
            columns (Optional[List[str]]): List of columns to return. If not provided, all columns are returned.
            partition (str): Partition name. Defaults to `Config.DRIVES_PARTITION`.

            Remaining keyword arguments are forwarded to `_query_documents`.

        Returns:
            Result of `_query_documents` for the nameplate query.
        """
        alias = self.Config.DEFAULT_TABLE
        requested_columns: Optional[List[str]] = kwargs.pop("columns", None)
        partition: str = kwargs.pop("partition", self.Config.DRIVES_PARTITION)

        selected = self._parse_columns(requested_columns, alias)
        query = (
            f"SELECT {selected} FROM {alias} "
            f"WHERE {alias}.partition = @partition AND {alias}.id = @serial_number"
        )
        return self._query_documents(
            query=query,
            parameters={"partition": partition, "serial_number": serial_number},
            **kwargs,
        )

    @look_in_container(name=Config.INFO_CONTAINER)
    @authorization_required
    def list_nameplate_details(self, serial_numbers: Optional[Sequence[str]] = None, **kwargs) -> List[Dict]:
        """
        Gets a list of nameplate details for the drives identified by the provided serial numbers.

        Args:
            serial_numbers: Sequence of serial numbers. If empty or None, the result is not filtered by
                serial number and every document of the partition is returned.

        Keyword Args:
            columns (Optional[List[str]]): List of columns to return. If not provided, all columns are returned.
            partition (str): Partition name. Defaults to `Config.DRIVES_PARTITION`.

            Remaining keyword arguments are forwarded to `_query_documents`.

        Returns:
            List of the nameplate details dictionaries.
        """
        t = self.Config.DEFAULT_TABLE

        columns: Optional[List[str]] = kwargs.pop("columns", None)
        partition: str = kwargs.pop("partition", self.Config.DRIVES_PARTITION)

        parsed_columns = self._parse_columns(columns, t)
        query = f"SELECT {parsed_columns} FROM {t} WHERE {t}.partition = @partition"
        parameters: Dict[str, Any] = {"partition": partition}

        if serial_numbers:
            # A query parameter is bound as a single value, so "IN (@serial_numbers)" with a joined string
            # would compare the id against that whole string and never match. The serial numbers are
            # therefore passed as one array parameter and tested with ARRAY_CONTAINS.
            query += f" AND ARRAY_CONTAINS(@serial_numbers, {t}.id)"
            parameters["serial_numbers"] = list(serial_numbers)

        return self._query_documents(query=query, parameters=parameters, **kwargs)

    @look_in_container(name=Config.RESULTS_CONTAINER)
    @authorization_required
    def list_available_serial_numbers(self, **kwargs) -> List:
        """
        Lists the distinct serial numbers found in the container.

        Keyword Args:
            column (str): Name of the column with serial numbers. Defaults to `Config.SERIAL_NUMBER_COLUMN`.

            Remaining keyword arguments are forwarded to `_query_documents`.

        Returns:
            List of available serial numbers.
        """
        alias = self.Config.DEFAULT_TABLE
        sn_column: str = kwargs.pop("column", self.Config.SERIAL_NUMBER_COLUMN)

        return self._query_documents(
            query=f'SELECT DISTINCT VALUE {alias}["{sn_column}"] FROM {alias}',
            **kwargs,
        )

    @look_in_container(name=Config.RESULTS_CONTAINER)
    @authorization_required
    def get_available_date_ranges(
        self, serial_number: str, data_mode: Literal["all", "null", "valid"] = "all", **kwargs
    ) -> List[Tuple[datetime.datetime, datetime.datetime]]:
        """
        Gets the available data as a list of date ranges built from the dates of the matching documents.

        Args:
            serial_number: Drive Serial Number
            data_mode: Defines which documents count as available data.
                "valid" - only documents whose searched details value is not null are included
                "null" - only documents whose searched details value is null are included
                "all" - (default) all found documents are included.

        Keyword Args:
            add_one_day (bool): Forwarded to `group_dates_to_ranges`. Default is True.
            search_column (str): KPI name to search for. Defaults to `Config.DEFAULT_SEARCH_KPI`.

            Remaining keyword arguments are forwarded to `_query_documents`.

        Return:
            List of Tuples with start and end datetimes of available data ranges.
        """
        add_one_day = kwargs.pop("add_one_day", True)
        search_column = kwargs.pop("search_column", self.Config.DEFAULT_SEARCH_KPI)

        cfg = self.Config
        t = cfg.DEFAULT_TABLE
        value_field = cfg.DEFAULT_SEARCH_COLUMN
        sn_field = cfg.SERIAL_NUMBER_COLUMN
        kpi_field = cfg.KPI_NAME_COLUMN
        details_field = cfg.DETAILS_COLUMN
        date_field = cfg.DATE_COLUMN

        query = f"""
            SELECT VALUE ({t}["{date_field}"])
            FROM {t} WHERE {t}["{sn_field}"] = @serial_number
            AND {t}["{kpi_field}"] = @search_column
        """
        # Only the "null" and "valid" modes add a filter on the details value.
        if data_mode == "null":
            comparison = "="
        elif data_mode == "valid":
            comparison = "!="
        else:
            comparison = None
        if comparison is not None:
            query += f' AND {t}["{details_field}"]["{value_field}"] {comparison} null'

        rows = self._query_documents(
            query=query,
            parameters={"serial_number": serial_number, "search_column": search_column},
            **kwargs,
        )
        dates = []
        for raw_date in rows:
            dates.append(datetime.datetime.strptime(raw_date, self.Config.KPI_DATE_FORMAT))
        return list(group_dates_to_ranges(dates=dates, add_one_day=add_one_day))

    @look_in_container(name=Config.RESULTS_CONTAINER)
    @authorization_required
    def get_kpi_values(
        self,
        serial_number: str,
        kpi_name: str,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        **kwargs,
    ) -> List[Dict]:
        """
        Gets KPI values for a given serial number and KPI name. Optionally, the search can be limited to a specific
        date range.

        Args:
            serial_number: Drive Serial Number
            kpi_name: KPI Name
            start_date: Start date of the search range (inclusive). If not provided, there is no limit on the
                start date.
            end_date: End date of the search range (inclusive). If not provided, there is no limit on the end date.

        Keyword Args:
            result_type (str): Result type of the KPI value. If not provided, all result types are included.
            columns (List[str]): List of columns to return. Defaults to `Config.DEFAULT_KPI_COLUMNS`.
            order_by (str): Column name to order the results by. Defaults to `Config.DATE_COLUMN`.
                A falsy value disables ordering.

            Remaining keyword arguments are forwarded to `_query_documents`.

        Returns:
            List of dictionaries with KPI values.

        """

        columns: List[str] = kwargs.pop("columns", self.Config.DEFAULT_KPI_COLUMNS)
        result_type: Optional[str] = kwargs.pop("result_type", None)
        order_by: str = kwargs.pop("order_by", self.Config.DATE_COLUMN)

        t = self.Config.DEFAULT_TABLE
        sn_column = self.Config.SERIAL_NUMBER_COLUMN
        kpi_column = self.Config.KPI_NAME_COLUMN
        rt_column = self.Config.RESULT_TYPE_COLUMN
        date_column = self.Config.DATE_COLUMN

        parsed_columns = self._parse_columns(columns, t)
        conditions = [
            f'{t}["{sn_column}"] = @serial_number',
            f'{t}["{kpi_column}"] = @kpi_name',
        ]
        parameters = {"serial_number": serial_number, "kpi_name": kpi_name}

        if result_type:
            # The placeholder must not be quoted: inside quotes it is a string literal and the bound
            # parameter value would never be used in the comparison.
            conditions.append(f'{t}["{rt_column}"] = @result_type')
            parameters["result_type"] = result_type
        if start_date:
            conditions.append(f'{t}["{date_column}"] >= @start_date')
            parameters["start_date"] = start_date.strftime(self.Config.KPI_DATE_FORMAT)
        if end_date:
            conditions.append(f'{t}["{date_column}"] <= @end_date')
            parameters["end_date"] = end_date.strftime(self.Config.KPI_DATE_FORMAT)

        query = f"SELECT {parsed_columns} FROM {t} WHERE " + " AND ".join(conditions)
        if order_by:
            # NOTE: order_by is a property name and is interpolated into the query text,
            # so it must not come from untrusted input.
            query += f' ORDER BY {t}["{order_by}"]'

        return self._query_documents(query=query, parameters=parameters, **kwargs)

    @look_in_container(name=Config.RESULTS_CONTAINER)
    @authorization_required
    def get_scatter_data(
        self,
        serial_number: str,
        table_name: str,
        start_date: datetime.datetime,
        end_date: datetime.datetime,
        **kwargs,
    ) -> List[List]:
        """
        Gets scatter data for a given serial number from a given table name, limited to a date range.

        The query unpacks every row of the documents' details data array into x, y and z values, groups
        them by (x, y) and aggregates z. The query result is aggregated once more on the client side.

        Args:
            serial_number: Drive Serial Number
            table_name: Name of the table to search for. It is matched against the KPI name column.
            start_date: Start date of the search range (inclusive).
            end_date: End date of the search range (inclusive).

        Keyword Args:
            agg_function (str): Aggregation function to use. Default is "sum".
                Other options are "avg", "min", "max" and "count". Unrecognized names fall back to "sum"
                in the query.

            x_index (int): Index of the x-axis value in the data array. Default is 0.
            y_index (int): Index of the y-axis value in the data array. Default is 1.
            z_index (int): Index of the z-axis value in the data array. Default is 2.

            Remaining keyword arguments are forwarded to `_query_documents`.

        Returns:
            Result of `aggregate_by_xy` applied to the queried [x, y, aggregated z] rows.
        """
        agg_function: str = str(kwargs.pop("agg_function", "sum"))
        xi: int = kwargs.pop("x_index", 0)
        yi: int = kwargs.pop("y_index", 1)
        zi: int = kwargs.pop("z_index", 2)

        sql_agg = self._parse_agg_function(agg_function)
        cfg = self.Config
        alias = cfg.DEFAULT_TABLE
        details_col = cfg.DETAILS_COLUMN
        sn_col = cfg.SERIAL_NUMBER_COLUMN
        kpi_col = cfg.KPI_NAME_COLUMN
        date_col = cfg.DATE_COLUMN

        query = f"""
            SELECT VALUE [g.x, g.y, g.c]
            FROM (
                SELECT e.x, e.y, {sql_agg}(e.z) as c
                FROM(
                    SELECT i[{xi}] as x, i[{yi}] as y, i[{zi}] as z
                    FROM {alias} JOIN i IN {alias}["{details_col}"]["data"]
                    WHERE {alias}["{sn_col}"] = @serial_number
                    AND {alias}["{kpi_col}"] = @table_name
                    AND {alias}["{date_col}"] >= @start_date
                    AND {alias}["{date_col}"] <= @end_date) as e
                GROUP BY e.x, e.y) as g
            """
        date_format = cfg.KPI_DATE_FORMAT
        parameters = {
            "serial_number": serial_number,
            "table_name": table_name,
            "start_date": start_date.strftime(date_format),
            "end_date": end_date.strftime(date_format),
        }

        rows = self._query_documents(query=query, parameters=parameters, **kwargs)
        # The rows returned by the query are not guaranteed to be unique and fully aggregated,
        # so they are aggregated again here with the originally requested function name.
        return aggregate_by_xy(rows, agg_function=agg_function)

    @look_in_container(name=Config.RESULTS_CONTAINER)
    @authorization_required
    def list_column_names(
        self,
        serial_number: str,
        table_name: str,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        **kwargs,
    ) -> List[str]:
        """
        Lists column names for a given serial number and table name. Optionally, the search can be limited
        to a specific date range.

        Only documents with a non-empty `details.index` are considered, and only the first distinct
        `details.columns` value found is returned.

        Args:
            serial_number: Drive Serial Number
            table_name: Name of the table to search for. It is matched against the KPI name column.
            start_date: Start date of the search range (inclusive). If not provided, there is no limit on the
                start date.
            end_date: End date of the search range (inclusive). If not provided, there is no limit on the end date.

        Returns:
            List of column names, or an empty list if nothing was found.
        """
        cfg = self.Config
        alias = cfg.DEFAULT_TABLE
        date_col = cfg.DATE_COLUMN

        query = f"""
            SELECT DISTINCT VALUE {alias}.details.columns
            FROM {alias}
            WHERE {alias}["{cfg.SERIAL_NUMBER_COLUMN}"] = @serial_number
            AND {alias}.details.index != []
            AND {alias}["{cfg.KPI_NAME_COLUMN}"] = @table_name
            """
        parameters = {
            "serial_number": serial_number,
            "table_name": table_name,
        }
        if start_date:
            query += f' AND {alias}["{date_col}"] >= @start_date '
            parameters["start_date"] = start_date.strftime(cfg.KPI_DATE_FORMAT)
        if end_date:
            query += f' AND {alias}["{date_col}"] <= @end_date '
            parameters["end_date"] = end_date.strftime(cfg.KPI_DATE_FORMAT)

        rows = self._query_documents(query=query, parameters=parameters, **kwargs)
        return rows[0] if rows else []

    def _query_documents(
        self,
        query: str,
        parameters: Optional[Dict[str, str]] = None,
        partition_key: Optional[str] = None,
        container: Optional[ContainerProxy] = None,
        **kwargs,
    ) -> List:
        """
        Runs a query against a CosmosDB container and returns the found documents as a list.

        Args:
            query: Query string
            parameters: Dictionary with parameters. Key is the parameter name (without "@"), value is the
                parameter value.
            partition_key: Partition key. If not provided, the query is executed as a cross-partition query.
            container: Container Proxy to search in. If it is not a ContainerProxy instance, the currently
                connected container is used.

        Keyword Args:
            flatten (bool): If True, every document is passed through `flatten_dictionary`. Default is False.

        Raises:
            ValueError: If the container client is not initialized.

        Returns:
            List of documents. If an HttpResponseError or a KeyError is raised while querying, the error is
            logged and an empty list is returned.
        """
        flatten: bool = kwargs.get("flatten", False)
        cross_partition = partition_key is None
        if isinstance(container, ContainerProxy):
            target = container
        else:
            target = self.container_client
        if target is None:
            raise ValueError("Container client is not initialized")

        # Convert {name: value} into the name/value entries expected by query_items.
        query_parameters = self._build_parameters_dict(**parameters) if parameters else None
        log.debug(f"Query: {query}")
        if query_parameters:
            log.debug(f"Parameters: {parameters}")

        try:
            items = target.query_items(
                query=query,
                parameters=query_parameters,
                partition_key=partition_key,
                enable_cross_partition_query=cross_partition,
            )
            if flatten:
                return [flatten_dictionary(item) for item in items]
            return list(items)
        except (HttpResponseError, KeyError) as e:
            log.exception(e)
            return []

    @classmethod
    def _build_cosmosdb_connection_policy(cls, proxies: Dict[str, str]) -> ConnectionPolicy:
        """
        Creates a CosmosDB connection policy, configured with a proxy when one is available.

        The "https" proxy entry is preferred over the "http" one. The proxy is only configured when the
        selected URL contains both a hostname and a port.

        Args:
            proxies: Dictionary with proxy configuration.

        Returns:
            ConnectionPolicy object, with the proxy configuration set when a usable proxy URL was found.
        """
        target = urlparse(proxies.get("https") or proxies.get("http"))
        policy = ConnectionPolicy()
        if not (target.hostname and target.port):
            return policy

        scheme = str(target.scheme) or "https"
        host = str(target.hostname)
        policy.ProxyConfiguration = ProxyConfiguration()
        policy.ProxyConfiguration.Host = f"{scheme}://{host}"
        policy.ProxyConfiguration.Port = target.port
        return policy

    @classmethod
    def _parse_columns(cls, columns: Union[None, str, List[str]], table: str = Config.DEFAULT_TABLE) -> str:
        """
        Parses columns to a string that can be used in a query. If columns is a string, it is returned as is.
        If columns is None, all columns are returned. If columns is a list, the columns are joined with the table name.

        Args:
            columns: Columns to parse
            table: Table name to prefix the columns with.

        Returns:
            Parsed columns string.
        """
        if isinstance(columns, str):
            return columns
        if columns is None:
            return "*"
        parsed_columns = ", ".join([f'{table}["{col}"]' for col in columns])
        return parsed_columns

    @classmethod
    def _parse_agg_function(cls, agg_function: str, default: str = "sum") -> str:
        """
        Parses aggregation function to a valid CosmosDB SQL aggregation function. If the function is not recognized,
        "sum" is returned.

        Args:
            agg_function: Aggregation function to parse.
            default: Default aggregation function to return if the function is not recognized. Default is "sum".

        Returns:
            Parsed aggregation function string.
        """
        if agg_function.casefold() in ("sum", "avg", "min", "max", "count"):
            return agg_function.casefold()
        return default

    @classmethod
    def _build_parameters_dict(cls, **parameters) -> List[Dict[str, Any]]:
        """
        Converts parameters dictionary to a list of dictionaries with "name" and "value" keys which are expected by
        the CosmosDB query_items method.

        Examples:
            >>> CosmosDBClient._build_parameters_dict(a=1, b="test")
            [{"name": "@a", "value": 1}, {"name": "@b", "value": "test"}]

        Returns:
            List of dictionaries with "name" and "value" keys.
        """
        return [{"name": f"@{key}", "value": value} for key, value in parameters.items()]

connect_to_container(name)

Connect database client to container.

Parameters:

Name Type Description Default
name str

Identifier of the container that the database client should connect to.

required

Raises:

Type Description
ValueError

If the database client is not initialized.

Source code in reportconnectors/drive_db/cosmos_client.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def connect_to_container(self, name: str) -> None:
    """
    Points the client at the container with the given name.

    A new container client is requested only when no container is connected yet or the connected one
    has a different id.

    Args:
        name: Identifier of the container to connect to.

    Raises:
        ValueError: If the database client is not initialized.
    """
    if self.db_client is None:
        raise ValueError("Database client is not initialized")

    current = self.container_client
    already_connected = current is not None and not (current and current.id != name)
    if not already_connected:
        self.container_client = self.db_client.get_container_client(name)
    return None

connect_to_database(name)

Connect CosmosDB client to database.

Parameters:

Name Type Description Default
name str

Identifier of the database that the client should connect to.

required

Raises:

Type Description
ValueError

If the CosmosDB client is not initialized.

Source code in reportconnectors/drive_db/cosmos_client.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def connect_to_database(self, name: str) -> None:
    """
    Points the client at the database with the given name.

    A new database client is requested only when no database is connected yet or the connected one
    has a different id.

    Args:
        name: Identifier of the database to connect to.

    Raises:
        ValueError: If the CosmosDB client is not initialized.
    """
    if self.cosmos_client is None:
        raise ValueError("CosmosDB client is not initialized")

    current = self.db_client
    already_connected = current is not None and not (current and current.id != name)
    if not already_connected:
        self.db_client = self.cosmos_client.get_database_client(name)
    return None

get_available_date_ranges(serial_number, data_mode='all', **kwargs)

Gets available date ranges as a list of tuples with continuous time ranges of available data.

Parameters:

Name Type Description Default
serial_number str

Drive Serial Number

required
data_mode Literal['all', 'null', 'valid']

Defines what kind of documents are considered as available data: "valid" - only documents whose searched value is not null are included; "null" - only documents whose searched value is null are included; "all" (default) - all found documents are included.

'all'

Other Parameters:

Name Type Description
add_one_day bool

If True, the end date of the range is increased by one day.

search_column str

Name of the column to search for. Default is "InherentAvailability".

Return

List of Tuples with start and end datetimes of available data ranges.

Source code in reportconnectors/drive_db/cosmos_client.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
@look_in_container(name=Config.RESULTS_CONTAINER)
@authorization_required
def get_available_date_ranges(
    self, serial_number: str, data_mode: Literal["all", "null", "valid"] = "all", **kwargs
) -> List[Tuple[datetime.datetime, datetime.datetime]]:
    """
    Gets the available data as a list of date ranges built from the dates of the matching documents.

    Args:
        serial_number: Drive Serial Number
        data_mode: Defines which documents count as available data.
            "valid" - only documents whose searched details value is not null are included
            "null" - only documents whose searched details value is null are included
            "all" - (default) all found documents are included.

    Keyword Args:
        add_one_day (bool): Forwarded to `group_dates_to_ranges`. Default is True.
        search_column (str): KPI name to search for. Defaults to `Config.DEFAULT_SEARCH_KPI`.

        Remaining keyword arguments are forwarded to `_query_documents`.

    Return:
        List of Tuples with start and end datetimes of available data ranges.
    """
    add_one_day = kwargs.pop("add_one_day", True)
    search_column = kwargs.pop("search_column", self.Config.DEFAULT_SEARCH_KPI)

    cfg = self.Config
    t = cfg.DEFAULT_TABLE
    value_field = cfg.DEFAULT_SEARCH_COLUMN
    sn_field = cfg.SERIAL_NUMBER_COLUMN
    kpi_field = cfg.KPI_NAME_COLUMN
    details_field = cfg.DETAILS_COLUMN
    date_field = cfg.DATE_COLUMN

    query = f"""
        SELECT VALUE ({t}["{date_field}"])
        FROM {t} WHERE {t}["{sn_field}"] = @serial_number
        AND {t}["{kpi_field}"] = @search_column
    """
    # Only the "null" and "valid" modes add a filter on the details value.
    if data_mode == "null":
        comparison = "="
    elif data_mode == "valid":
        comparison = "!="
    else:
        comparison = None
    if comparison is not None:
        query += f' AND {t}["{details_field}"]["{value_field}"] {comparison} null'

    rows = self._query_documents(
        query=query,
        parameters={"serial_number": serial_number, "search_column": search_column},
        **kwargs,
    )
    dates = []
    for raw_date in rows:
        dates.append(datetime.datetime.strptime(raw_date, self.Config.KPI_DATE_FORMAT))
    return list(group_dates_to_ranges(dates=dates, add_one_day=add_one_day))

get_kpi_values(serial_number, kpi_name, start_date=None, end_date=None, **kwargs)

Gets KPI values for a given serial number and KPI name. Optionally, the search can be limited to a specific date range.

Parameters:

Name Type Description Default
serial_number str

Drive Serial Number

required
kpi_name str

KPI Name

required
start_date Optional[datetime]

Start date of the search range. If not provided, there is no limit on the start date.

None
end_date Optional[datetime]

End date of the search range. If not provided, there is no limit on the end date.

None

Other Parameters:

Name Type Description
result_type str

Result type of the KPI value. If not provided, all result types are included.

columns List[str]

List of columns to return. If not provided, the default KPI columns (Config.DEFAULT_KPI_COLUMNS) are returned.

order_by str

Column name to order the results by. Default is "date".

Returns:

Type Description
List[Dict]

List of dictionaries with KPI values.

Source code in reportconnectors/drive_db/cosmos_client.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
@look_in_container(name=Config.RESULTS_CONTAINER)
@authorization_required
def get_kpi_values(
    self,
    serial_number: str,
    kpi_name: str,
    start_date: Optional[datetime.datetime] = None,
    end_date: Optional[datetime.datetime] = None,
    **kwargs,
) -> List[Dict]:
    """
    Gets KPI values for a given serial number and KPI name. Optionally, the search can be limited to a specific
    date range.

    Args:
        serial_number: Drive Serial Number
        kpi_name: KPI Name
        start_date: Inclusive start of the search range. If not provided, there is no limit on the start date.
        end_date: Inclusive end of the search range. If not provided, there is no limit on the end date.

    Keyword Args:
        result_type (str): Result type of the KPI value. If not provided (or empty), all result types
            are included.
        columns (List[str]): List of columns to return. Defaults to ``Config.DEFAULT_KPI_COLUMNS``.
        order_by (str): Column name to order the results by. Defaults to ``Config.DATE_COLUMN``.
            A falsy value disables ordering. The name is interpolated into the query text (it is not a
            bound parameter), so it must not come from untrusted input.

        Any other keyword argument is forwarded unchanged to ``self._query_documents``.

    Returns:
        List of dictionaries with KPI values, as returned by ``self._query_documents``.

    """

    columns: List[str] = kwargs.pop("columns", self.Config.DEFAULT_KPI_COLUMNS)
    result_type: Optional[str] = kwargs.pop("result_type", None)
    order_by: str = kwargs.pop("order_by", self.Config.DATE_COLUMN)

    t = self.Config.DEFAULT_TABLE
    sn_column = self.Config.SERIAL_NUMBER_COLUMN
    kpi_column = self.Config.KPI_NAME_COLUMN
    rt_column = self.Config.RESULT_TYPE_COLUMN
    date_column = self.Config.DATE_COLUMN

    parsed_columns = self._parse_columns(columns, t)
    query = f"""
        SELECT {parsed_columns} FROM {t}
        WHERE {t}["{sn_column}"] = @serial_number AND {t}["{kpi_column}"] = @kpi_name
    """
    parameters = {"serial_number": serial_number, "kpi_name": kpi_name}

    if result_type:
        # The placeholder must NOT be wrapped in quotes: a quoted "@result_type" is a string literal,
        # so the filter would compare against the text "@result_type" instead of the bound value.
        query += f'AND {t}["{rt_column}"] = @result_type '
        parameters["result_type"] = result_type
    if start_date:
        query += f'AND {t}["{date_column}"] >= @start_date '
        parameters["start_date"] = start_date.strftime(self.Config.KPI_DATE_FORMAT)
    if end_date:
        query += f'AND {t}["{date_column}"] <= @end_date '
        parameters["end_date"] = end_date.strftime(self.Config.KPI_DATE_FORMAT)
    if order_by:
        query += f'ORDER BY {t}["{order_by}"]'

    result = self._query_documents(query=query, parameters=parameters, **kwargs)
    return result

get_nameplate_details(serial_number, **kwargs)

Gets the nameplate details for the Drive identified by the given serial number.

Parameters:

Name Type Description Default
serial_number str

Drive serial number

required

Other Parameters:

Name Type Description
columns Optional[List[str]]

List of columns to return. If not provided, all columns are returned.

partition str

Partition name. Default is "drives".

Returns:

Type Description
Union[List, Dict]

Nameplate details dictionary.

Source code in reportconnectors/drive_db/cosmos_client.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@look_in_container(name=Config.INFO_CONTAINER)
@authorization_required
def get_nameplate_details(self, serial_number: str, **kwargs) -> Union[List, Dict]:
    """
    Gets the nameplate details for the Drive identified by the given serial number.

    The lookup matches documents whose ``partition`` equals the requested partition and whose ``id``
    equals the serial number.

    Args:
        serial_number: Drive serial number

    Keyword Args:
        columns (Optional[List[str]]): List of columns to return, passed to ``self._parse_columns``.
            Defaults to None (presumably meaning "all columns" -- confirm against ``_parse_columns``).
        partition (str): Partition name. Defaults to ``Config.DRIVES_PARTITION``.

        Any other keyword argument is forwarded unchanged to ``self._query_documents``.

    Returns:
        Nameplate details, as returned by ``self._query_documents``.
    """
    partition: str = kwargs.pop("partition", self.Config.DRIVES_PARTITION)
    columns: Optional[List[str]] = kwargs.pop("columns", None)
    table = self.Config.DEFAULT_TABLE

    selection = self._parse_columns(columns, table)
    where_clause = f"{table}.partition = @partition AND {table}.id = @serial_number"
    query = f"SELECT {selection} FROM {table} WHERE {where_clause}"

    return self._query_documents(
        query=query,
        parameters=dict(partition=partition, serial_number=serial_number),
        **kwargs,
    )

get_scatter_data(serial_number, table_name, start_date, end_date, **kwargs)

Gets scatter data for a given serial number from a given table name. The search is limited to a specific date range.

Parameters:

Name Type Description Default
serial_number str

Drive Serial Number

required
table_name str

Name of the table to search for.

required
start_date datetime

Start date of the search range.

required
end_date datetime

End date of the search range.

required

Other Parameters:

Name Type Description
agg_function str

Aggregation function to use. Default is "sum". Other options are "avg", "min", "max" and "count".

x_index int

Index of the x-axis value in the data array. Default is 0.

y_index int

Index of the y-axis value in the data array. Default is 1.

z_index int

Index of the z-axis value in the data array. Default is 2.

Returns:

Source code in reportconnectors/drive_db/cosmos_client.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
@look_in_container(name=Config.RESULTS_CONTAINER)
@authorization_required
def get_scatter_data(
    self,
    serial_number: str,
    table_name: str,
    start_date: datetime.datetime,
    end_date: datetime.datetime,
    **kwargs,
) -> List[List]:
    """
    Gets scatter data for a given serial number from a given table name. The search is limited to a specific
    date range (both bounds inclusive).

    Each element of the stored data array is read as a point: the values at ``x_index`` and ``y_index``
    form the grouping key and the value at ``z_index`` is aggregated per (x, y) pair.

    Args:
        serial_number: Drive Serial Number
        table_name: Name of the table to search for.
        start_date: Start date of the search range.
        end_date: End date of the search range.

    Keyword Args:
        agg_function (str): Aggregation function to use. Default is "sum".
            Other options are "avg", "min", "max" and "count".

        x_index (int): Index of the x-axis value in the data array. Default is 0.
        y_index (int): Index of the y-axis value in the data array. Default is 1.
        z_index (int): Index of the z-axis value in the data array. Default is 2.

        The indices are interpolated into the query text (they are not bound parameters).
        Any other keyword argument is forwarded unchanged to ``self._query_documents``.

    Returns:
        The output of ``aggregate_by_xy`` applied to the queried ``[x, y, aggregated z]`` rows.

    """
    cfg = self.Config

    z_index: int = kwargs.pop("z_index", 2)
    y_index: int = kwargs.pop("y_index", 1)
    x_index: int = kwargs.pop("x_index", 0)
    agg_function: str = str(kwargs.pop("agg_function", "sum"))

    sql_agg = self._parse_agg_function(agg_function)
    table = cfg.DEFAULT_TABLE
    data_path = f'["{cfg.DETAILS_COLUMN}"]["data"]'
    sn_col = cfg.SERIAL_NUMBER_COLUMN
    kpi_col = cfg.KPI_NAME_COLUMN
    date_col = cfg.DATE_COLUMN

    query = f"""
        SELECT VALUE [g.x, g.y, g.c]
        FROM (
            SELECT e.x, e.y, {sql_agg}(e.z) as c
            FROM(
                SELECT i[{x_index}] as x, i[{y_index}] as y, i[{z_index}] as z
                FROM {table} JOIN i IN {table}{data_path}
                WHERE {table}["{sn_col}"] = @serial_number
                AND {table}["{kpi_col}"] = @table_name
                AND {table}["{date_col}"] >= @start_date
                AND {table}["{date_col}"] <= @end_date) as e
            GROUP BY e.x, e.y) as g
        """

    date_format = cfg.KPI_DATE_FORMAT
    parameters = {
        "serial_number": serial_number,
        "table_name": table_name,
        "start_date": start_date.strftime(date_format),
        "end_date": end_date.strftime(date_format),
    }
    rows = self._query_documents(query=query, parameters=parameters, **kwargs)

    # The rows coming back from the database have been observed to contain repeated (x, y) pairs
    # that are only partially aggregated (root cause not investigated), so the aggregation is
    # applied once more on the client side.
    return aggregate_by_xy(rows, agg_function=agg_function)

list_available_serial_numbers(**kwargs)

Lists available serial numbers in the current container.

Other Parameters:

Name Type Description
column str

Name of the column with serial numbers. Default is "serialNumber".

Returns:

Type Description
List

List of available serial numbers.

Source code in reportconnectors/drive_db/cosmos_client.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@look_in_container(name=Config.RESULTS_CONTAINER)
@authorization_required
def list_available_serial_numbers(self, **kwargs) -> List:
    """
    Lists the distinct serial numbers found in the current container.

    Keyword Args:
        column (str): Name of the column with serial numbers. Defaults to ``Config.SERIAL_NUMBER_COLUMN``.
            The name is interpolated into the query text.

        Any other keyword argument is forwarded unchanged to ``self._query_documents``.

    Returns:
        List of available serial numbers, as returned by ``self._query_documents``.
    """
    table = self.Config.DEFAULT_TABLE
    sn_column: str = kwargs.pop("column", self.Config.SERIAL_NUMBER_COLUMN)

    projection = f'{table}["{sn_column}"]'
    return self._query_documents(query=f"SELECT DISTINCT VALUE {projection} FROM {table}", **kwargs)

list_column_names(serial_number, table_name, start_date=None, end_date=None, **kwargs)

Lists column names for a given serial number and table name. Optionally, the search can be limited to a specific date range.

Parameters:

Name Type Description Default
serial_number str

Drive Serial Number

required
table_name str

Name of the table to search for.

required
start_date Optional[datetime]

Start date of the search range. If not provided, there is no limit on the start date.

None
end_date Optional[datetime]

End date of the search range. If not provided, there is no limit on the end date.

None

Returns:

Type Description
List[str]

List of column names.

Source code in reportconnectors/drive_db/cosmos_client.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
@look_in_container(name=Config.RESULTS_CONTAINER)
@authorization_required
def list_column_names(
    self,
    serial_number: str,
    table_name: str,
    start_date: Optional[datetime.datetime] = None,
    end_date: Optional[datetime.datetime] = None,
    **kwargs,
) -> List[str]:
    """
    Lists column names for a given serial number and table name. Optionally, the search can be limited to a
    specific date range (both bounds inclusive).

    Only documents whose ``details.index`` is not an empty array are considered. The query selects the
    distinct ``details.columns`` values and only the first one is returned.

    Args:
        serial_number: Drive Serial Number
        table_name: Name of the table to search for.
        start_date: Start date of the search range. If not provided, there is no limit on the start date.
        end_date: End date of the search range. If not provided, there is no limit on the end date.

    Keyword Args:
        Any keyword argument is forwarded unchanged to ``self._query_documents``.

    Returns:
        List of column names, or an empty list when the query yields no rows.
    """
    table = self.Config.DEFAULT_TABLE
    date_col = self.Config.DATE_COLUMN
    index_path = "details.index"
    columns_path = "details.columns"

    query = f"""
        SELECT DISTINCT VALUE {table}.{columns_path}
        FROM {table}
        WHERE {table}["{self.Config.SERIAL_NUMBER_COLUMN}"] = @serial_number
        AND {table}.{index_path} != []
        AND {table}["{self.Config.KPI_NAME_COLUMN}"] = @table_name
        """
    parameters = {"serial_number": serial_number, "table_name": table_name}

    # Append one date filter per provided bound; the parameter name doubles as the placeholder name.
    bounds = (("start_date", ">=", start_date), ("end_date", "<=", end_date))
    for key, operator, bound in bounds:
        if bound:
            query += f' AND {table}["{date_col}"] {operator} @{key} '
            parameters[key] = bound.strftime(self.Config.KPI_DATE_FORMAT)

    rows = self._query_documents(query=query, parameters=parameters, **kwargs)
    if len(rows) == 0:
        return []
    return rows[0]

list_containers(database=None)

Query for containers present in the connected database.

Parameters:

Name Type Description Default
database Optional[str]

Name of the Database that is searched for containers. If not provided, current database is used.

None

Returns:

Type Description
List

List of dictionaries with container properties.

Source code in reportconnectors/drive_db/cosmos_client.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def list_containers(self, database: Optional[str] = None) -> List:
    """
    Lists the containers of a database.

    Args:
        database: Name of the Database that is searched for containers. If not provided, the current
            database client (``self.db_client``) is used.

    Returns:
        List of dictionaries with container properties. An empty list is returned when there is no
        Cosmos client or no usable database client.

    """
    if self.cosmos_client is None:
        return []

    # Pick the database client: the current one, or one looked up by name.
    if database is None:
        target_db = self.db_client
    else:
        target_db = self.cosmos_client.get_database_client(database)

    if not target_db:
        return []
    return list(target_db.list_containers())

list_databases()

Query for databases in CosmosDB.

Returns:

Type Description
List

List of dictionaries with database properties

Source code in reportconnectors/drive_db/cosmos_client.py
75
76
77
78
79
80
81
82
83
84
85
def list_databases(self) -> List:
    """
    Lists the databases visible to the Cosmos client.

    Returns:
        List of dictionaries with database properties, or an empty list when no Cosmos client is set.
    """
    client = self.cosmos_client
    return list(client.list_databases()) if client else []

list_nameplate_details(serial_numbers=None, **kwargs)

Gets a list of nameplate details data for drives identified by the provided list of serial numbers.

Parameters:

Name Type Description Default
serial_numbers Optional[Sequence[str]]

Sequence of serial numbers

None

Other Parameters:

Name Type Description
columns Optional[List[str]]

List of columns to return. If not provided, all columns are returned.

partition str

Partition name. Default is "drives".

Returns:

Type Description
List[Dict]

List of the nameplate details dictionaries.

Source code in reportconnectors/drive_db/cosmos_client.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@look_in_container(name=Config.INFO_CONTAINER)
@authorization_required
def list_nameplate_details(self, serial_numbers: Optional[Sequence[str]] = None, **kwargs) -> List[Dict]:
    """
    Gets a list of nameplate details data for drives identified by the provided list of serial numbers.

    Args:
        serial_numbers: Sequence of serial numbers. If empty or not provided, no serial-number filter is
            applied and every document of the partition is returned.

    Keyword Args:
        columns (Optional[List[str]]): List of columns to return, passed to ``self._parse_columns``.
            Defaults to None (presumably meaning "all columns" -- confirm against ``_parse_columns``).
        partition (str): Partition name. Defaults to ``Config.DRIVES_PARTITION``.

        Any other keyword argument is forwarded unchanged to ``self._query_documents``.

    Returns:
        List of the nameplate details dictionaries, as returned by ``self._query_documents``.
    """

    serial_numbers = serial_numbers if serial_numbers else []
    t = self.Config.DEFAULT_TABLE

    columns: Optional[List[str]] = kwargs.pop("columns", None)
    partition: str = kwargs.pop("partition", self.Config.DRIVES_PARTITION)

    parsed_columns = self._parse_columns(columns, t)
    query = f"SELECT {parsed_columns} FROM {t} WHERE {t}.partition = @partition"
    parameters = {"partition": partition}

    if serial_numbers:
        # A single bound parameter holds exactly one value, so `IN (@serial_numbers)` with a
        # comma-joined string would compare the id against that whole string and never match.
        # Bind one parameter per serial number instead. This relies on the same "@<key>" naming
        # that the "partition" parameter above already uses.
        placeholders = []
        for index, serial_number in enumerate(serial_numbers):
            key = f"serial_number_{index}"
            placeholders.append(f"@{key}")
            parameters[key] = serial_number
        in_list = ", ".join(placeholders)
        query += f" AND {t}.id IN ({in_list})"

    result = self._query_documents(query=query, parameters=parameters, **kwargs)
    return result

logout()

Logs out the client

Source code in reportconnectors/drive_db/cosmos_client.py
67
68
69
70
71
72
73
def logout(self) -> None:
    """
    Logs out the client by dropping its container, database and Cosmos client references.
    """
    self.container_client = None
    self.db_client = None
    self.cosmos_client = None