Skip to content

アプリケーション内で をクリックすると、お使いのDataRobotバージョンに関する全プラットフォームドキュメントにアクセスできます。

Calculate metric values

To calculate custom metric values, you can use MetricEvaluator to calculate metric values over time, BatchMetricEvaluator to calculate metric values per batch, or IndividualMetricEvaluator to evaluate metrics without data aggregation.

Evaluate metrics

The MetricEvaluator class calculates metric values over time using the selected source. This class is used to "stream" data through the metric object, generating metric values. Initialize the MetricEvaluator with the following mandatory parameters:

from dmm import MetricEvaluator, TimeBucket
from dmm.data_source.datarobot_source import DataRobotSource
from dmm.metric import MedianAbsoluteError

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

metric = MedianAbsoluteError()

metric_evaluator = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.MINUTE) 

To use MetricEvaluator, create a metric class implementing the MetricBase interface and a source implementing DataSourceBase. Then, specify the level of aggregation granularity. Initialize MetricEvaluator with all parameters:

from dmm import ColumnName, MetricEvaluator, TimeBucket

metric_evaluator = MetricEvaluator(
    metric=metric,
    source=source,
    time_bucket=TimeBucket.HOUR,
    prediction_col=ColumnName.PREDICTIONS,
    actuals_col=ColumnName.ACTUALS,
    timestamp_col=ColumnName.TIMESTAMP,
    filter_actuals=False,
    filter_predictions=False,
    filter_scoring_data=False,
    segment_attribute=None,
    segment_value=None,
) 
パラメーター 説明
metric: Union[str, MetricBase, List[str], List[MetricBase]] If a string or list of strings is passed, MetricEvaluator will look for matched Sklearn metrics. If a metric or a list of objects is passed, they must implement the MetricBase interface.
source: DataSourceBase The source to pull the data from, DataRobotSource or DataFrameSource or other sources that implement the DataSourceBase interface.
time_bucket: TimeBucket The time-bucket size to use for evaluating metrics, determining the granularity of aggregation.
prediction_col: Optional[str] The name of the column that contains predictions.
actuals_col: Optional[str] The name of the column that contains actuals.
timestamp_col: Optional[str] The name of the column that contains timestamps.
filter_actuals: Optional[bool] Whether the metric evaluator removes missing actuals values before scoring. True removes missing actuals; the default value is False.
filter_predictions: Optional[bool] Whether the metric evaluator removes missing predictions values before scoring. True removes missing predictions; the default value is False.
filter_scoring_data: Optional[bool] Whether the metric evaluator removes missing scoring values before scoring. True removes missing scoring values; the default value is False.
segment_attribute: Optional[str] The name of the column with segment values.
segment_value: Optional[Union[str or List[str]]] A single value or a list of values of the segment attribute to segment on.

The score method returns a metric aggregated as defined by TimeBucket. The output returned as a pandas DataFrame contains the results per time bucket for all data from the source.

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(hours=3),
    end=datetime.utcnow(),
)
metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())

                          timestamp  samples  log_loss
0  2023-09-14 13:29:48.065000+00:00      499  0.539315
1  2023-09-14 14:01:51.484000+00:00      499  0.539397

# we can see the evaluator's statistics
stats = me.stats()
print(stats)
total rows: 998, score calls: 2, reduce calls: 2 

To pass more than one metric at a time, do the following:

metrics = [LogLossFromSklearn(), AsymmetricError(), RocAuc()]
me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
stats = me.stats()
print(aggregated_metric_per_time_bucket.to_string())
print(stats)

                          timestamp  samples  log_loss  Asymmetric Error  roc_auc_score
0  2023-09-14 13:29:48.065000+00:00      499  0.539315          0.365571       0.787030
1  2023-09-14 14:01:51.484000+00:00      499  0.539397          0.365636       0.786837
total rows: 998, score calls: 6, reduce calls: 6 

For your data, provide the names of the columns to evaluate:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=5,
    rows_per_time_bucket=1,
    prediction_value=1,
    time_bucket=TimeBucket.DAY,
    prediction_col="my_pred_col",
    actuals_col="my_actuals_col",
    timestamp_col="my_timestamp_col"
)
print(test_df)
             my_timestamp_col  my_pred_col  my_actuals_col
0  01/06/2005 13:00:00.000000            1           0.999
1  02/06/2005 13:00:00.000000            1           0.999
2  03/06/2005 13:00:00.000000            1           0.999
3  04/06/2005 13:00:00.000000            1           0.999
4  05/06/2005 13:00:00.000000            1           0.999

source = DataFrameSource(
    df=test_df,
    max_rows=10000,
    timestamp_col="timestamp",
)

metric = LogLossFromSklearn()

me = MetricEvaluator(metric=metric, 
                     source=source, 
                     time_bucket=TimeBucket.DAY,
                     prediction_col="my_pred_col", 
                     actuals_col="my_actuals_col", 
                     timestamp_col="my_timestamp_col"
                     )
aggregated_metric_per_time_bucket = me.score() 

Configure data filtering

If data is missing, use filtering flags. In the following example, the data is missing actuals. In this scenario without a flag, an exception is raised:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=10,
    rows_per_time_bucket=5,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df["actuals"].loc[2] = None
test_df["actuals"].loc[5] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1    0.999
1  01/06/2005 13:00:00.000000            1    0.999
2  01/06/2005 13:00:00.000000            1      NaN
3  01/06/2005 13:00:00.000000            1    0.999
4  01/06/2005 13:00:00.000000            1    0.999
5  01/06/2005 14:00:00.000000            1      NaN
6  01/06/2005 14:00:00.000000            1    0.999
7  01/06/2005 14:00:00.000000            1    0.999
8  01/06/2005 14:00:00.000000            1    0.999
9  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR)

aggregated_metric_per_time_bucket = me.score()
"ValueError: Could not apply metric median_absolute_error, make sure you are passing the right data (see the sklearn docs).
The error message was: Input contains NaN." 

Compare the previous result with the result when you enable the filter_actuals flag:

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"
"removed 1 rows out of 5 in the data chunk before scoring, due to missing values in ['actuals'] data"

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
0  01/06/2005 13:00:00.000000        4                  0.001
1  01/06/2005 14:00:00.000000        4                  0.001 

Using the filter_actuals, filter_predictions, and filter_scoring_data flags, you can filter out missing values from the data before calculating the metric. By default, these flags are set to False. If all data needed to calculate the metric is missing from the data chunk, the data chunk is skipped with the appropriate log:

test_df = gen_dataframe_for_accuracy_metric(
    nr_rows=4,
    rows_per_time_bucket=2,
    prediction_value=1,
    time_bucket=TimeBucket.HOUR,
)
test_df["actuals"].loc[0] = None
test_df["actuals"].loc[1] = None
print(test_df)
                    timestamp  predictions  actuals
0  01/06/2005 13:00:00.000000            1      NaN
1  01/06/2005 13:00:00.000000            1      NaN
2  01/06/2005 14:00:00.000000            1    0.999
3  01/06/2005 14:00:00.000000            1    0.999

source = DataFrameSource(df=test_df)

metric = MedianAbsoluteError()

me = MetricEvaluator(metric=metric, source=source, time_bucket=TimeBucket.HOUR, filter_actuals=True)

aggregated_metric_per_time_bucket = me.score()
"removed 2 rows out of 2 in the data chunk before scoring, due to missing values in ['actuals'] data"
"data chunk is empty, skipping scoring..."

print(aggregated_metric_per_time_bucket.to_string())
                    timestamp  samples  median_absolute_error
1  01/06/2005 14:00:00.000000        2                  0.001 

Perform segmented analysis

Perform segmented analysis by defining the segment_attribute and each segment_value:

metrics = LogLossFromSklearn()
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value="Down",
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]
0  2023-09-14 13:29:49.737000+00:00       49         0.594483
1  2023-09-14 14:01:52.437000+00:00       49         0.594483

# passing more than one segment value
me = MetricEvaluator(metric=metric,
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811

# passing more than one segment value and more than one metric
me = MetricEvaluator(metric=[LogLossFromSklearn(), RocAuc()],
                     source=source,
                     time_bucket=TimeBucket.HOUR,
                     segment_attribute="insulin",
                     segment_value=["Down", "Steady"],
                     )

aggregated_metric_per_time_bucket = me.score()
print(aggregated_metric_per_time_bucket.to_string())
                          timestamp  samples  log_loss [Down]  log_loss [Steady]  roc_auc_score [Down]  roc_auc_score [Steady]
0  2023-09-14 13:29:48.502000+00:00      199         0.594483           0.515811              0.783333                0.826632
1  2023-09-14 14:01:51.758000+00:00      199         0.594483           0.515811              0.783333                0.826632 

Evaluate metrics with aggregation-per-batch

The BatchMetricEvaluator class uses aggregation-per-batch instead of aggregation-over-time. For batches, don't define TimeBucket:

from dmm.batch_metric_evaluator import BatchMetricEvaluator
from dmm.data_source.datarobot_source import BatchDataRobotSource
from dmm.metric import MissingValuesFraction

source = BatchDataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    batch_ids=BATCH_IDS,
    model_id=MODEL_ID,
)

feature_name = 'RAD'
metric = MissingValuesFraction(feature_name=feature_name)

missing_values_fraction_evaluator = BatchMetricEvaluator(metric=metric, source=source)

aggregated_metric_per_batch = missing_values_fraction_evaluator.score()
print(aggregated_metric_per_batch.to_string())
     batch_id   samples  Missing Values Fraction
0  <batch_id>       506                      0.0
1  <batch_id>       506                      0.0
2  <batch_id>       506                      0.0 
!!! note "備考"
    For batches, actuals and multiple segments are not supported.

Evaluate metrics without data aggregation

The IndividualMetricEvaluator class is used to evaluate metrics without data aggregation. It performs metric calculations on all exported data and returns a list of individual results. This evaluator allows submitting individual data points with a corresponding association ID, which is useful for cases when you want to visualize your metric results alongside predictions and actuals. To use this evaluator with custom metrics, provide a score() method that contains, among others, the following parameters: timestamps and association_ids.

from itertools import zip_longest
from typing import List
from datetime import datetime
from datetime import timedelta

from dmm.individual_metric_evaluator import IndividualMetricEvaluator
from dmm.custom_metric import CustomMetric
from dmm.custom_metric import SingleMetricResult
from dmm.data_source import DataRobotSource
from dmm.metric.metric_base import LLMMetricBase
from nltk import sent_tokenize
import numpy as np
import pandas as pd

source = DataRobotSource(
    deployment_id=DEPLOYMENT_ID,
    start=datetime.utcnow() - timedelta(weeks=1),
    end=datetime.utcnow(),
)

custom_metric = CustomMetric.from_id()

class SentenceCount(LLMMetricBase):
    """
    Calculates the total number of sentences created while working with the LLM model.
    Returns the sum of the number of sentences from prompts and completions.
    """

    def __init__(self):
        super().__init__(
            name=custom_metric.name,
            description="Calculates the total number of sentences created while working with the LLM model.",
            need_training_data=False,
        )
        self.prompt_column = "promptColumn"

    def score(
        self,
        scoring_data: pd.DataFrame,
        predictions: np.array,
        timestamps: np.array,
        association_ids: np.array,
        **kwargs,
    ) -> List[SingleMetricResult]:
        if self.prompt_column not in scoring_data.columns:
            raise ValueError(
                f"Prompt column {self.prompt_column} not found in the exported data, "
                f"modify 'PROMPT_COLUMN' runtime parameter"
            )
        prompts = scoring_data[self.prompt_column].to_numpy()

        sentence_count = []
        for prompt, completion, ts, a_id in zip_longest(
            prompts, predictions, timestamps, association_ids
        ):
            if not isinstance(prompt, str) or not isinstance(completion, str):
                continue
            value = len(sent_tokenize(prompt)) + len(sent_tokenize(completion))
            sentence_count.append(
                SingleMetricResult(value=value, timestamp=ts, association_id=a_id)
            )
        return sentence_count


sentence_count_evaluator = IndividualMetricEvaluator(
    metric=SentenceCount(),
    source=source,
)
metric_results = sentence_count_evaluator.score() 

更新しました July 27, 2024