Calculate metric values¶
To calculate custom metric values, you can use MetricEvaluator to calculate metric values over time, BatchMetricEvaluator to calculate metric values per batch, or IndividualMetricEvaluator to evaluate metrics without data aggregation.
Evaluate metrics¶
The MetricEvaluator class calculates metric values over time using the selected source. This class is used to "stream" data through the metric object, generating metric values. Initialize the MetricEvaluator with the following mandatory parameters:
from datetime import datetime, timedelta
from dmm import MetricEvaluator, TimeBucket
from dmm.data_source.datarobot_source import DataRobotSource
from dmm.metric import MedianAbsoluteError
source = DataRobotSource(
deployment_id=DEPLOYMENT_ID,
start=datetime.utcnow() - timedelta(weeks=1),
end=datetime.utcnow(),
)
metric = MedianAbsoluteError()
metric_evaluator = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.MINUTE)
To use MetricEvaluator, create a metric class implementing the MetricBase interface and a source implementing the DataSourceBase interface. Then, specify the level of aggregation granularity. Initialize MetricEvaluator with all parameters:
from dmm import ColumnName, MetricEvaluator, TimeBucket
metric_evaluator = MetricEvaluator(
metric=metric,
source=source,
time_bucket=TimeBucket.HOUR,
prediction_col=ColumnName.PREDICTIONS,
actuals_col=ColumnName.ACTUALS,
timestamp_col=ColumnName.TIMESTAMP,
filter_actuals=False,
filter_predictions=False,
filter_scoring_data=False,
segment_attribute=None,
segment_value=None,
)
| Parameter | Description |
|---|---|
| metric: Union[str, MetricBase, List[str], List[MetricBase]] | If a string or a list of strings is passed, MetricEvaluator looks for matching sklearn metrics. If a metric object or a list of metric objects is passed, they must implement the MetricBase interface. |
| source: DataSourceBase | The source to pull the data from: DataRobotSource, DataFrameSource, or any other source that implements the DataSourceBase interface. |
| time_bucket: TimeBucket | The time-bucket size to use for evaluating metrics, which determines the granularity of aggregation. |
| prediction_col: Optional[str] | The name of the column that contains predictions. |
| actuals_col: Optional[str] | The name of the column that contains actuals. |
| timestamp_col: Optional[str] | The name of the column that contains timestamps. |
| filter_actuals: Optional[bool] | Whether the metric evaluator removes missing actuals values before scoring. True removes missing actuals; the default value is False. |
| filter_predictions: Optional[bool] | Whether the metric evaluator removes missing predictions values before scoring. True removes missing predictions; the default value is False. |
| filter_scoring_data: Optional[bool] | Whether the metric evaluator removes missing scoring values before scoring. True removes missing scoring values; the default value is False. |
| segment_attribute: Optional[str] | The name of the column with segment values. |
| segment_value: Optional[Union[str, List[str]]] | A single value or a list of values of the segment attribute to segment on. |
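As the first table row notes, metrics can also be passed as strings that are matched to sklearn metrics. The following is a minimal sketch of that usage, reusing the source defined above; the specific metric names "median_absolute_error" and "mean_squared_error" are illustrative sklearn scorer names and are assumptions, not output from a verified example:
# Sketch: pass sklearn metric names as strings instead of MetricBase objects.
# The names below are assumed to resolve to matching sklearn metrics.
metric_evaluator = MetricEvaluator(
    metric=["median_absolute_error", "mean_squared_error"],
    source=source,
    time_bucket=TimeBucket.HOUR,
)
aggregated_metric_per_time_bucket = metric_evaluator.score()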
The score method returns the metric values aggregated as defined by TimeBucket. The output, returned as a pandas DataFrame, contains the results per time bucket for all data from the source.
source = DataRobotSource(
deployment_id=DEPLOYMENT_ID,
start=datetime.utcnow() - timedelta(hours=3),
end=datetime.utcnow(),
)
metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss
0 2023-09-14 13:29:48.065000+00:00 499 0.539315
1 2023-09-14 14:01:51.484000+00:00 499 0.539397
# we can see the evaluator's statistics
stats = me.stats()
print(stats)
total rows: 998, score calls: 2, reduce calls: 2
To pass more than one metric at a time, do the following:
metrics = [LogLossFromSklearn(), AsymmetricError(), RocAuc()]
me = MetricEvaluator(metric=metrics, source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()
stats = me.stats()
print(aggregated_metric_per_time_bucket.to_string())
print(stats)
timestamp samples log_loss Asymmetric Error roc_auc_score
0 2023-09-14 13:29:48.065000+00:00 499 0.539315 0.365571 0.787030
1 2023-09-14 14:01:51.484000+00:00 499 0.539397 0.365636 0.786837
total rows: 998, score calls: 6, reduce calls: 6
To evaluate metrics on your own data, provide the names of the columns to evaluate:
test_df = gen_dataframe_for_accuracy_metric(
nr_rows=5,
rows_per_time_bucket=1,
prediction_value=1,
time_bucket=TimeBucket.DAY,
prediction_col="my_pred_col",
actuals_col="my_actuals_col",
timestamp_col="my_timestamp_col"
)
print(test_df)
my_timestamp_col my_pred_col my_actuals_col
0 01/06/2005 13:00:00.000000 1 0.999
1 02/06/2005 13:00:00.000000 1 0.999
2 03/06/2005 13:00:00.000000 1 0.999
3 04/06/2005 13:00:00.000000 1 0.999
4 05/06/2005 13:00:00.000000 1 0.999
source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="my_timestamp_col",
)
metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
source=source,
time_bucket=TimeBucket.DAY,
prediction_col="my_pred_col",
actuals_col="my_actuals_col",
timestamp_col="my_timestamp_col"
)
aggregated_metric_per_time_bucket = me.score()
Configure data filtering¶
If data is missing, use the filtering flags. In the following example, the data is missing actuals; in this scenario, without a filtering flag, an exception is raised:
test_df = gen_dataframe_for_accuracy_metric(
nr_rows=10,
rows_per_time_bucket=5,
prediction_value=1,
time_bucket=TimeBucket.HOUR,
)
test_df["actuals"].loc[2] = None
test_df["actuals"].loc[5] = None
print(test_df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 0.999
1 01/06/2005 13:00:00.000000 1 0.999
2 01/06/2005 13:00:00.000000 1 NaN
3 01/06/2005 13:00:00.000000 1 0.999
4 01/06/2005 13:00:00.000000 1 0.999
5 01/06/2005 14:00:00.000000 1 NaN
6 01/06/2005 14:00:00.000000 1 0.999
7 01/06/2005 14:00:00.000000 1 0.999
8 01/06/2005 14:00:00.000000 1 0.999
9 01/06/2005 14:00:00.000000 1 0.999
source = DataFrameSource(df=test_df)
metric = MedianAbsoluteError()
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)
aggregated_metric_per_time_bucket = me.score()
"ValueError: Could not apply metric median_absolute_error, make sure you are passing the right data (see the sklearn docs).
The error message was: Input contains NaN."
Compare the previous result with the result when you enable the filter_actuals flag:
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)
aggregated_metric_per_time_bucket = me.score()
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples median_absolute_error
0 01/06/2005 13:00:00.000000 4 0.001
1 01/06/2005 14:00:00.000000 4 0.001
Using the filter_actuals, filter_predictions, and filter_scoring_data flags, you can filter out missing values from the data before calculating the metric. By default, these flags are set to False. If all of the data needed to calculate the metric is missing from a data chunk, that chunk is skipped and logged accordingly:
test_df = gen_dataframe_for_accuracy_metric(
nr_rows=4,
rows_per_time_bucket=2,
prediction_value=1,
time_bucket=TimeBucket.HOUR,
)
test_df["actuals"].loc[0] = None
test_df["actuals"].loc[1] = None
print(test_df)
timestamp predictions actuals
0 01/06/2005 13:00:00.000000 1 NaN
1 01/06/2005 13:00:00.000000 1 NaN
2 01/06/2005 14:00:00.000000 1 0.999
3 01/06/2005 14:00:00.000000 1 0.999
source = DataFrameSource(df=test_df)
metric = MedianAbsoluteError()
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)
aggregated_metric_per_time_bucket = me.score()
"removed 2 rows out of 2 in the data chunk before scoring, due to missing values in ['actuals'] data"
"data chunk is empty, skipping scoring..."
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples median_absolute_error
1 01/06/2005 14:00:00.000000 2 0.001
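The filtering flags can also be combined. The following is a minimal sketch, based only on the parameters listed in the table above, that enables all three flags so that rows missing actuals, predictions, or scoring data are removed before scoring:
# Sketch: combine all filtering flags (reuses the metric and source defined above).
me = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.HOUR,
    filter_actuals=True,       # drop rows with missing actuals
    filter_predictions=True,   # drop rows with missing predictions
    filter_scoring_data=True,  # drop rows with missing scoring data
)
aggregated_metric_per_time_bucket = me.score()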
Perform segmented analysis¶
Perform segmented analysis by defining the segment_attribute and the segment_value (a single value or a list of values):
metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
source=source,
time_bucket=TimeBucket.HOUR,
segment_attribute="insulin",
segment_value="Down",
)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss [Down]
0 2023-09-14 13:29:49.737000+00:00 49 0.594483
1 2023-09-14 14:01:52.437000+00:00 49 0.594483
# passing more than one segment value
me = MetricEvaluator(metric=metric,
source=source,
time_bucket=TimeBucket.HOUR,
segment_attribute="insulin",
segment_value=["Down", "Steady"],
)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss [Down] log_loss [Steady]
0 2023-09-14 13:29:48.502000+00:00 199 0.594483 0.515811
1 2023-09-14 14:01:51.758000+00:00 199 0.594483 0.515811
# passing more than one segment value and more than one metric
me = MetricEvaluator(metric=[LogLossFromSklearn(), RocAuc()],
source=source,
time_bucket=TimeBucket.HOUR,
segment_attribute="insulin",
segment_value=["Down", "Steady"],
)
aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
timestamp samples log_loss [Down] log_loss [Steady] roc_auc_score [Down] roc_auc_score [Steady]
0 2023-09-14 13:29:48.502000+00:00 199 0.594483 0.515811 0.783333 0.826632
1 2023-09-14 14:01:51.758000+00:00 199 0.594483 0.515811 0.783333 0.826632
Evaluate metrics with aggregation-per-batch¶
The BatchMetricEvaluator class uses aggregation per batch instead of aggregation over time. For batches, don't define a TimeBucket:
from dmm.batch_metric_evaluator import BatchMetricEvaluator
from dmm.data_source.datarobot_source import BatchDataRobotSource
from dmm.metric import MissingValuesFraction
source = BatchDataRobotSource(
deployment_id=DEPLOYMENT_ID,
batch_ids=BATCH_IDS,
model_id=MODEL_ID,
)
feature_name = 'RAD'
metric = MissingValuesFraction(feature_name=feature_name)
missing_values_fraction_evaluator = BatchMetricEvaluator(metric=metric, source=source)
aggregated_metric_per_batch = missing_values_fraction_evaluator.score()
print(aggregated_metric_per_batch.to_string())
batch_id samples Missing Values Fraction
0 <batch_id> 506 0.0
1 <batch_id> 506 0.0
2 <batch_id> 506 0.0
Evaluate metrics without data aggregation¶
The IndividualMetricEvaluator class evaluates metrics without data aggregation. It performs metric calculations on all exported data and returns a list of individual results. This evaluator allows submitting individual data points with a corresponding association ID, which is useful when you want to visualize your metric results alongside predictions and actuals. To use this evaluator with custom metrics, provide a score() method that accepts, among others, the timestamps and association_ids parameters.
from itertools import zip_longest
from typing import List
from datetime import datetime
from datetime import timedelta
from dmm.individual_metric_evaluator import IndividualMetricEvaluator
from dmm.custom_metric import CustomMetric
from dmm.custom_metric import SingleMetricResult
from dmm.data_source import DataRobotSource
from dmm.metric.metric_base import LLMMetricBase
from nltk import sent_tokenize
import numpy as np
import pandas as pd
source = DataRobotSource(
deployment_id=DEPLOYMENT_ID,
start=datetime.utcnow() - timedelta(weeks=1),
end=datetime.utcnow(),
)
custom_metric = CustomMetric.from_id()
class SentenceCount(LLMMetricBase):
"""
Calculates the total number of sentences created while working with the LLM model.
Returns the sum of the number of sentences from prompts and completions.
"""
def __init__(self):
super().__init__(
name=custom_metric.name,
description="Calculates the total number of sentences created while working with the LLM model.",
need_training_data=False,
)
self.prompt_column = "promptColumn"
def score(
self,
scoring_data: pd.DataFrame,
predictions: np.array,
timestamps: np.array,
association_ids: np.array,
**kwargs,
) -> List[SingleMetricResult]:
if self.prompt_column not in scoring_data.columns:
raise ValueError(
f"Prompt column {self.prompt_column} not found in the exported data, "
f"modify 'PROMPT_COLUMN' runtime parameter"
)
prompts = scoring_data[self.prompt_column].to_numpy()
sentence_count = []
for prompt, completion, ts, a_id in zip_longest(
prompts, predictions, timestamps, association_ids
):
if not isinstance(prompt, str) or not isinstance(completion, str):
continue
value = len(sent_tokenize(prompt)) + len(sent_tokenize(completion))
sentence_count.append(
SingleMetricResult(value=value, timestamp=ts, association_id=a_id)
)
return sentence_count
sentence_count_evaluator = IndividualMetricEvaluator(
metric=SentenceCount(),
source=source,
)
metric_results = sentence_count_evaluator.score()
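Each individual result corresponds to one SingleMetricResult. As a minimal sketch, assuming the value, timestamp, and association_id fields passed to the constructor above are exposed as attributes, you can inspect the non-aggregated results like this:
# Sketch: iterate over individual results; assumes SingleMetricResult exposes
# the value, timestamp, and association_id fields as attributes.
for result in metric_results:
    print(result.association_id, result.timestamp, result.value)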