# Calculate metric values

> Calculate metric values - To calculate custom metric values, you can use MetricEvaluator to
> calculate metric values over time, BatchMetricEvaluator to calculate metric values per batch, or
> IndividualMetricEvaluator to evaluate metrics without data aggregation.

This Markdown file sits beside the HTML page at the same path (with a `.md` suffix). It summarizes the topic and lists links for tools and LLM context.

Companion generated at `2026-04-24T16:03:56.255606+00:00` (UTC).

## Primary page

- [Calculate metric values](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/dmm-metric-evaluator.html): Full documentation for this topic (HTML).

## Sections on this page

- [Evaluate metrics](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/dmm-metric-evaluator.html#evaluate-metrics): In-page section heading.
- [Configure data filtering](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/dmm-metric-evaluator.html#configure-data-filtering): In-page section heading.
- [Perform segmented analysis](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/dmm-metric-evaluator.html#perform-segmented-analysis): In-page section heading.
- [Evaluate metrics with aggregation-per-batch](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/dmm-metric-evaluator.html#evaluate-metrics-with-aggregation-per-batch): In-page section heading.
- [Evaluate metrics without data aggregation](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/dmm-metric-evaluator.html#evaluate-metrics-without-data-aggregation): In-page section heading.

## Related documentation

- [Developer documentation](https://docs.datarobot.com/en/docs/api/index.html): Linked from this page.
- [Code-first tools](https://docs.datarobot.com/en/docs/api/code-first-tools/index.html): Linked from this page.
- [Model Metrics](https://docs.datarobot.com/en/docs/api/code-first-tools/dr-model-metrics/index.html): Linked from this page.

## Documentation content

# Calculate metric values

To calculate custom metric values, you can use `MetricEvaluator` to calculate metric values over time, `BatchMetricEvaluator` to calculate metric values per batch, or `IndividualMetricEvaluator` to evaluate metrics without data aggregation.

## Evaluate metrics

The `MetricEvaluator` class calculates metric values over time using the selected source. This class is used to "stream" data through the metric object, generating metric values. Initialize the `MetricEvaluator` with the following mandatory parameters:

```
from datetime import datetime, timedelta

from dmm import MetricEvaluator, TimeBucket
from dmm import DataRobotSource
from dmm.metric import MedianAbsoluteError

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

metric = MedianAbsoluteError()

metric_evaluator = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.MINUTE)
```

To use `MetricEvaluator`, create a metric class implementing the `MetricBase` interface and a source implementing `DataSourceBase`. Then, specify the level of aggregation granularity. Initialize `MetricEvaluator` with all parameters:

```
from dmm import ColumnName, MetricEvaluator, TimeBucket

metric_evaluator = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.HOUR,
    prediction_col=ColumnName.PREDICTIONS,
    actuals_col=ColumnName.ACTUALS,
    timestamp_col=ColumnName.TIMESTAMP,
    filter_actuals=False,
    filter_predictions=False,
    filter_scoring_data=False,
    segment_attribute=None,
    segment_value=None,
)
```

| Parameter | Description |
| --- | --- |
| metric: Union[str, MetricBase, List[str], List[MetricBase]] | If a string or list of strings is passed, MetricEvaluator looks for matching scikit-learn metrics. If a metric object or a list of metric objects is passed, each must implement the MetricBase interface. |
| source: DataSourceBase | The source to pull data from: DataRobotSource, DataFrameSource, or any other source that implements the DataSourceBase interface. |
| time_bucket: TimeBucket | The time-bucket size to use for evaluating metrics, determining the granularity of aggregation. |
| prediction_col: Optional[str] | The name of the column that contains predictions. |
| actuals_col: Optional[str] | The name of the column that contains actuals. |
| timestamp_col: Optional[str] | The name of the column that contains timestamps. |
| filter_actuals: Optional[bool] | Whether the metric evaluator removes rows with missing actuals before scoring. True removes them; the default value is False. |
| filter_predictions: Optional[bool] | Whether the metric evaluator removes rows with missing predictions before scoring. True removes them; the default value is False. |
| filter_scoring_data: Optional[bool] | Whether the metric evaluator removes rows with missing scoring data before scoring. True removes them; the default value is False. |
| segment_attribute: Optional[str] | The name of the column with segment values. |
| segment_value: Optional[Union[str, List[str]]] | A single value or a list of values of the segment attribute to segment on. |

The `score` method returns metric values aggregated at the granularity defined by `TimeBucket`. The output, returned as a pandas DataFrame, contains one row per time bucket and covers all data from the source.

```
source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(hours=3),
    end=datetime.utcnow(),
)
metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())

                          timestamp  samples  log_loss
0  2023-09-14 13:29:48.065000+00:00      499  0.539315
1  2023-09-14 14:01:51.484000+00:00      499  0.539397

# we can see the evaluator's statistics
stats = me.stats()
print(stats)
total rows: 998, score calls: 2, reduce calls: 2
```
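The per-bucket aggregation can be pictured with a small standard-library sketch. This is not how `dmm` is implemented: the helper names `truncate_to_hour` and `median_absolute_error_per_hour` are hypothetical, and labeling each bucket by the start of its hour is an assumption made for illustration (the output above labels buckets with timestamps taken from the data).

```python
from collections import defaultdict
from datetime import datetime
from statistics import median


def truncate_to_hour(ts: datetime) -> datetime:
    """Map a timestamp to the start of its hour (assumed bucket key)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def median_absolute_error_per_hour(rows):
    """rows: iterable of (timestamp, prediction, actual) tuples."""
    buckets = defaultdict(list)
    for ts, prediction, actual in rows:
        buckets[truncate_to_hour(ts)].append(abs(prediction - actual))
    # One result row per bucket, ordered by time
    return [
        {"timestamp": key, "samples": len(errors), "median_absolute_error": median(errors)}
        for key, errors in sorted(buckets.items())
    ]


rows = [
    (datetime(2005, 6, 1, 13, 5), 1.0, 0.5),
    (datetime(2005, 6, 1, 13, 40), 1.0, 0.75),
    (datetime(2005, 6, 1, 14, 10), 1.0, 1.0),
]
results = median_absolute_error_per_hour(rows)
for row in results:
    print(row)
```

Two rows fall in the 13:00 bucket and one in the 14:00 bucket, so the sketch returns two result rows, mirroring the one-row-per-bucket shape of the `score` output.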

To evaluate more than one metric at a time, pass a list of metrics:

```
metrics = [LogLossFromSklearn(), AsymmetricError(), RocAuc()]
me = MetricEvaluator(metric=metrics, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
stats = me.stats()
print(aggregated_metric_per_time_bucket.to_string())
print(stats)

                          timestamp  samples  log_loss  Asymmetric Error  roc_auc_score
0  2023-09-14 13:29:48.065000+00:00      499  0.539315          0.365571       0.787030
1  2023-09-14 14:01:51.484000+00:00      499  0.539397          0.365636       0.786837
total rows: 998, score calls: 6, reduce calls: 6
```

If your data uses custom column names, provide the names of the columns to evaluate:

```
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=5,
    rows_per_time_bucket=1,
    prediction_value=1,
    time_bucket=TimeBucket.DAY,
    prediction_col="my_pred_col",
    actuals_col="my_actuals_col",
    timestamp_col="my_timestamp_col"
)
print(test_df)
             my_timestamp_col  my_pred_col  my_actuals_col
0  01/06/2005 13:00:00.000000            1           0.999
1  02/06/2005 13:00:00.000000            1           0.999
2  03/06/2005 13:00:00.000000            1           0.999
3  04/06/2005 13:00:00.000000            1           0.999
4  05/06/2005 13:00:00.000000            1           0.999

source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="my_timestamp_col",
)

metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, 
                     source=source, 
                     time_bucket=TimeBucket.DAY,
                     prediction_col="my_pred_col", 
                     actuals_col="my_actuals_col", 
                     timestamp_col="my_timestamp_col"
                     )
aggregated_metric_per_time_bucket = me.score()
```

### Configure data filtering

If data is missing, use the filtering flags. In the following example, some actuals are missing; without a filtering flag, scoring raises an exception:

```
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=5,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df.loc[2, "actuals"] = None
test_df.loc[5, "actuals"] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1    0.999
1  01/06/2005 13:00:00.000000            1    0.999
2  01/06/2005 13:00:00.000000            1      NaN
3  01/06/2005 13:00:00.000000            1    0.999
4  01/06/2005 13:00:00.000000            1    0.999
5  01/06/2005 14:00:00.000000            1      NaN
6  01/06/2005 14:00:00.000000            1    0.999
7  01/06/2005 14:00:00.000000            1    0.999
8  01/06/2005 14:00:00.000000            1    0.999
9  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
# ValueError: Could not apply metric median_absolute_error, make sure you are passing the right data (see the sklearn docs).
# The error message was: Input contains NaN.
```

Compare the previous result with the result when you enable the `filter_actuals` flag:

```
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
0  01/06/2005 13:00:00.000000        4                  0.001
1  01/06/2005 14:00:00.000000        4                  0.001
```

Use the `filter_actuals`, `filter_predictions`, and `filter_scoring_data` flags to remove missing values from the data before the metric is calculated. By default, these flags are set to `False`. If all data needed to calculate the metric is missing from a data chunk, the chunk is skipped and a message is logged:

```
test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=4,
    rows_per_time_bucket=2,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df.loc[0, "actuals"] = None
test_df.loc[1, "actuals"] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1      NaN
1  01/06/2005 13:00:00.000000            1      NaN
2  01/06/2005 14:00:00.000000            1    0.999
3  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 2 rows out of 2 in the data chunk before scoring, due to missing values in ['actuals'] data"
"data chunk is empty, skipping scoring..."

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
1  01/06/2005 14:00:00.000000        2                  0.001
```
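The filtering behavior described above can be sketched without `dmm`. The following is a minimal illustration, not the library's code: `is_missing` and `score_chunk` are hypothetical helpers, and returning `None` for a skipped chunk is an assumption made for the example.

```python
import math
from statistics import median


def is_missing(value) -> bool:
    """Treat None and NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def score_chunk(predictions, actuals, filter_actuals=False):
    """Median absolute error for one chunk, or None if the chunk is skipped."""
    pairs = list(zip(predictions, actuals))
    if filter_actuals:
        # Drop rows whose actual value is missing before scoring
        pairs = [(p, a) for p, a in pairs if not is_missing(a)]
    if not pairs:
        return None  # empty chunk: nothing to score
    if any(is_missing(a) for _, a in pairs):
        raise ValueError("Input contains NaN.")
    errors = [abs(p - a) for p, a in pairs]
    return {"samples": len(errors), "median_absolute_error": median(errors)}


chunk_with_gap = score_chunk([1, 1, 1], [0.5, None, 0.5], filter_actuals=True)
chunk_all_missing = score_chunk([1, 1], [None, None], filter_actuals=True)
print(chunk_with_gap)
print(chunk_all_missing)
```

With the flag enabled, the chunk with one gap is scored on its two remaining rows, and the chunk whose actuals are all missing is skipped. Calling `score_chunk([1], [None])` without the flag raises `ValueError`, which corresponds to the unfiltered case shown earlier.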

### Perform segmented analysis

Perform segmented analysis by defining the `segment_attribute` and each `segment_value`:

```
metric = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value="Down",
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]
0  2023-09-14 13:29:49.737000+00:00       49         0.594483
1  2023-09-14 14:01:52.437000+00:00       49         0.594483

# passing more than one segment value
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811

# passing more than one segment value and more than one metric
me = MetricEvaluator(metric=[LogLossFromSklearn(), RocAuc()],
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]  roc_auc_score [Down]  roc_auc_score [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811              0.783333                0.826632
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811              0.783333                0.826632
```
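Conceptually, segmented analysis restricts scoring to rows whose segment attribute matches the requested values and reports one result per segment. The sketch below illustrates that idea with plain Python. It is an assumption-laden example rather than the `dmm` implementation: `accuracy_by_segment` is a hypothetical helper, and simple accuracy stands in for log loss to keep the arithmetic easy to follow.

```python
from collections import defaultdict


def accuracy_by_segment(rows, segment_attribute, segment_value):
    """rows: list of dicts with 'predictions', 'actuals', and the segment column."""
    # Accept a single value or a list of values, like segment_value above
    wanted = [segment_value] if isinstance(segment_value, str) else list(segment_value)
    matches = defaultdict(list)
    for row in rows:
        segment = row[segment_attribute]
        if segment in wanted:
            matches[segment].append(row["predictions"] == row["actuals"])
    # One column per requested segment, named "<metric> [<segment>]"
    return {
        f"accuracy [{segment}]": sum(matches[segment]) / len(matches[segment])
        for segment in wanted
        if matches[segment]
    }


rows = [
    {"insulin": "Down", "predictions": 1, "actuals": 1},
    {"insulin": "Down", "predictions": 0, "actuals": 1},
    {"insulin": "Steady", "predictions": 1, "actuals": 1},
    {"insulin": "Up", "predictions": 0, "actuals": 0},
]
single = accuracy_by_segment(rows, "insulin", "Down")
multiple = accuracy_by_segment(rows, "insulin", ["Down", "Steady"])
print(single)
print(multiple)
```

Rows from segments that were not requested (here, `"Up"`) do not contribute to any result.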

## Evaluate metrics with aggregation-per-batch

The `BatchMetricEvaluator` class uses aggregation-per-batch instead of aggregation-over-time. For batches, don't define `TimeBucket`:

```
from dmm.batch_metric_evaluator import BatchMetricEvaluator
from dmm.data_source.datarobot_source import BatchDataRobotSource
from dmm.metric import MissingValuesFraction

source = BatchDataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    batch_ids=BATCH_IDS,
    model_id=MODEL_ID,
)

feature_name = 'RAD'
metric = MissingValuesFraction(feature_name=feature_name)

missing_values_fraction_evaluator = BatchMetricEvaluator(metric=metric, source=source)

aggregated_metric_per_batch = missing_values_fraction_evaluator.score()
print(aggregated_metric_per_batch.to_string())
     batch_id   samples  Missing Values Fraction
0  <batch_id>       506                      0.0
1  <batch_id>       506                      0.0
2  <batch_id>       506                      0.0
```
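To make the difference from time-based aggregation concrete, the sketch below groups rows by batch ID instead of by timestamp and computes a missing-values fraction per batch. It is a standard-library illustration only: `missing_values_fraction_per_batch` and the row layout are hypothetical and do not reflect how `BatchMetricEvaluator` works internally.

```python
from collections import defaultdict


def missing_values_fraction_per_batch(rows, feature_name):
    """rows: list of dicts with a 'batch_id' key and feature values."""
    counts = defaultdict(lambda: [0, 0])  # batch_id -> [missing, total]
    for row in rows:
        entry = counts[row["batch_id"]]
        if row.get(feature_name) is None:
            entry[0] += 1
        entry[1] += 1
    # One result row per batch, in order of first appearance
    return [
        {"batch_id": batch_id, "samples": total, "missing_values_fraction": missing / total}
        for batch_id, (missing, total) in counts.items()
    ]


rows = [
    {"batch_id": "batch-1", "RAD": 4},
    {"batch_id": "batch-1", "RAD": None},
    {"batch_id": "batch-2", "RAD": 5},
]
per_batch = missing_values_fraction_per_batch(rows, "RAD")
for row in per_batch:
    print(row)
```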

## Evaluate metrics without data aggregation

The `IndividualMetricEvaluator` class evaluates metrics without data aggregation. It performs metric calculations on all exported data and returns a list of individual results. This evaluator allows submitting individual data points with a corresponding association ID, which is useful when you want to visualize metric results alongside predictions and actuals. To use this evaluator with custom metrics, implement a `score()` method that accepts, among others, the `timestamps` and `association_ids` parameters.

```
from itertools import zip_longest
from typing import List
from datetime import datetime
from datetime import timedelta

from dmm import CustomMetric
from dmm import DataRobotSource
from dmm import SingleMetricResult
from dmm.individual_metric_evaluator import IndividualMetricEvaluator
from dmm.metric import LLMMetricBase
from nltk import sent_tokenize
import numpy as np
import pandas as pd

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

custom_metric = CustomMetric.from_id()

class SentenceCount(LLMMetricBase):
    """
    Calculates the total number of sentences created while working with the LLM model.
    Returns the sum of the number of sentences from prompts and completions.
    """

    def __init__(self):
        super().__init__(
            name=custom_metric.name,
            description="Calculates the total number of sentences created while working with the LLM model.",
            need_training_data=False,
        )
        self.prompt_column = "promptColumn"

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.ndarray,
        timestamps: np.ndarray,
        association_ids: np.ndarray,
        **kwargs,
    ) -> List[SingleMetricResult]:
        if self.prompt_column not in scoring_data.columns:
            raise ValueError(
                f"Prompt column {self.prompt_column} not found in the exported data, "
                f"modify 'PROMPT_COLUMN' runtime parameter"
            )
        prompts = scoring_data[self.prompt_column].to_numpy()

        sentence_count = []
        for prompt, completion, ts, a_id in zip_longest(
            prompts, predictions, timestamps, association_ids
        ):
            if not isinstance(prompt, str) or not isinstance(completion, str):
                continue
            value = len(sent_tokenize(prompt)) + len(sent_tokenize(completion))
            sentence_count.append(
                SingleMetricResult(value=value, timestamp=ts, association_id=a_id)
            )
        return sentence_count


sentence_count_evaluator = IndividualMetricEvaluator(
    metric=SentenceCount(),
    source=source,
)
metric_results = sentence_count_evaluator.score()
```
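The per-row result shape can be tried out without a deployment. The sketch below is a self-contained approximation under stated assumptions: `Result` is a hypothetical stand-in for `SingleMetricResult`, and `count_sentences` uses a naive regular expression instead of `nltk.sent_tokenize`, so its counts may differ from NLTK's on abbreviations and similar edge cases.

```python
import re
from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from typing import Any, List


@dataclass
class Result:
    """Hypothetical stand-in for one unaggregated metric result."""
    value: float
    timestamp: Any
    association_id: Any


def count_sentences(text: str) -> int:
    """Naive sentence count: split after '.', '!' or '?' followed by whitespace."""
    parts = re.split(r"(?<=[.!?])\s+", text.strip())
    return len([part for part in parts if part])


def sentence_counts(prompts, completions, timestamps, association_ids) -> List[Result]:
    results = []
    for prompt, completion, ts, a_id in zip_longest(
        prompts, completions, timestamps, association_ids
    ):
        # Skip rows where the prompt or the completion is not text
        if not isinstance(prompt, str) or not isinstance(completion, str):
            continue
        value = count_sentences(prompt) + count_sentences(completion)
        results.append(Result(value=value, timestamp=ts, association_id=a_id))
    return results


individual_results = sentence_counts(
    prompts=["Hello there. How are you?", None],
    completions=["I am fine.", "Orphan."],
    timestamps=[datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 5)],
    association_ids=["a1", "a2"],
)
print(individual_results)
```

Each returned item keeps its own timestamp and association ID, which is what allows a result to be matched back to an individual prediction rather than to a time bucket or batch.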
