

Python Module API Reference

The DataRobot Pipelines Python module provides a built-in API for working with pipeline data. This page describes the classes and methods available from this API.

When authoring code for a Python module, this library can be imported using:

import pipelines

However, this is usually not required, as the necessary class, Module, is automatically imported as module. The code examples throughout this page demonstrate the different ways to work with data in a Python module.

Module

Module is a helper class for operating on the current pipeline module. This class is already provided as module in the module runtime environment.

This helper is primarily useful for reading data from and writing data to the ports on the module. For example, given a module with an input port named raw_data and an output port named parsed_data, the data could be read and written as follows:

import pandas as pd

# this reads the data and returns a pandas dataframe
df: pd.DataFrame = module.input("raw_data").read()

# an example of filtering our dataset: keep only the rows where Age is over 35
new_df: pd.DataFrame = df[df["Age"] > 35]

# finally, let's write the new data
module.output("parsed_data").write(new_df)

Methods

Function Signature                Description
input(name: str) -> InputPort     Returns the input port with the given name.
inputs() -> List[InputPort]       Returns a list of all input ports on the module.
output(name: str) -> OutputPort   Returns the output port with the given name.
outputs() -> List[OutputPort]     Returns a list of all output ports on the module.
config() -> Dict[str, Any]        Returns the configuration for this module run.

Module.input(name: str) -> InputPort

Returns the input port with the given name.

If no input port with the given name exists, an UnknownPortException is raised.

Parameters

name: str - The name of the input port to access.

Returns

InputPort - An InputPort object that allows reading from the named port.

Raises

UnknownPortException - Raised if no input port with the given name exists.
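When an input port is optional, the lookup can be wrapped so that a missing port yields None instead of an exception. The following is a minimal sketch that runs outside the pipeline runtime. FakeModule, FakeInputPort, and the locally defined UnknownPortException are hypothetical stand-ins (this reference does not state where the real exception is imported from), and optional_input is an illustrative helper, not part of the API.

```python
from typing import Dict, Iterable, Optional


class UnknownPortException(Exception):
    """Hypothetical stand-in for the exception raised by the real runtime."""


class FakeInputPort:
    """Hypothetical stand-in for InputPort; it only remembers its name."""

    def __init__(self, name: str) -> None:
        self.name = name


class FakeModule:
    """Hypothetical stand-in for the `module` object, for local experiments."""

    def __init__(self, port_names: Iterable[str]) -> None:
        self._inputs: Dict[str, FakeInputPort] = {
            name: FakeInputPort(name) for name in port_names
        }

    def input(self, name: str) -> FakeInputPort:
        # Mirror the documented behaviour: unknown names raise.
        if name not in self._inputs:
            raise UnknownPortException(f"no input port named {name!r}")
        return self._inputs[name]


def optional_input(mod, name: str) -> Optional[FakeInputPort]:
    """Return the named input port, or None if the module does not define it."""
    try:
        return mod.input(name)
    except UnknownPortException:
        return None


module = FakeModule(["raw_data"])
raw_port = optional_input(module, "raw_data")          # port exists
lookup_port = optional_input(module, "lookup_table")   # port does not exist
```

In a real module, the same helper would be called with the provided module object and the actual exception class.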

Module.inputs() -> List[InputPort]

Returns a list of all input ports on the module.

Returns

List[InputPort] - All input ports defined on the module.

Module.output(name: str) -> OutputPort

Returns the output port with the given name.

If no output port with the given name exists, an UnknownPortException is raised.

Parameters

name: str - The name of the output port to access.

Returns

OutputPort - An OutputPort object that allows writing to the named port.

Raises

UnknownPortException - Raised if no output port with the given name exists.

Module.outputs() -> List[OutputPort]

Returns a list of all output ports on the module.

Returns

List[OutputPort] - All output ports defined on the module.

Module.config() -> Dict[str, Any]

Returns a dict containing general configuration details for the module.

Some of the returned values differ between module runs.

Returns

Dict[str, Any] - The configuration for the module. Most of the information here is internal to the module execution. Notably, this dictionary contains the following keys:

  • ai_catalog_workspace_id - The ID of the DataRobot Pipelines workspace in the AI Catalog.
  • public_api_location - A URL to the DataRobot public API.
  • user_id - The ID of the user who executed the pipeline.
  • user_token - A temporary API token for connecting to the DataRobot public API.
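As an illustration of how these keys might be combined, the sketch below builds (but does not send) an authenticated request from a configuration dictionary. It rests on several assumptions: that the token is accepted as a Bearer token in the Authorization header, that public_api_location is a base URL to which a resource path can be appended, and that projects/ is a suitable example path. build_api_request and all values in example_config are hypothetical placeholders.

```python
from typing import Any, Dict
from urllib.request import Request


def build_api_request(config: Dict[str, Any], path: str) -> Request:
    """Build (but do not send) an authenticated request to the public API."""
    base = str(config["public_api_location"]).rstrip("/")
    url = f"{base}/{path.lstrip('/')}"
    # Assumption: the temporary token is sent as a Bearer token.
    headers = {"Authorization": f"Bearer {config['user_token']}"}
    return Request(url, headers=headers)


# Placeholder values; inside a module these would come from module.config().
example_config = {
    "public_api_location": "https://datarobot.example.com/api/v2/",
    "user_token": "TEMPORARY-TOKEN",
    "user_id": "0123456789abcdef",
}
request = build_api_request(example_config, "projects/")
```

Because the token is temporary, it is probably best read from module.config() on each run rather than stored.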

InputPort

An object representing an input port on a module.

Data from an input port can be read all at once or in chunks. The latter is recommended for large datasets. Data from the port can only be read once.

Objects of this class should not be created manually; obtain them from the Module.input or Module.inputs methods.

Methods

Function Signature                                                      Description
read(port_type: PortType) -> Union[pd.DataFrame, pa.Table]              Reads all the data in the port.
chunks(port_type: PortType) -> Iterable[Union[pd.DataFrame, pa.Table]]  Returns a generator of data chunks that can be iterated over.
read_chunk(port_type: PortType) -> Union[pd.DataFrame, pa.Table]        Reads a single chunk of data from the port.

InputPort.read(port_type: PortType = PortType.pandas) -> Union[pd.DataFrame, pa.Table]

Reads all data from the port.

Parameters

port_type: PortType - (Optional) The type that the data should be returned as. Defaults to PortType.pandas.

Returns

Union[pd.DataFrame, pa.Table] - All the data from the port. The type of collection depends on the PortType passed in the port_type parameter. Most commonly, this will be a pandas DataFrame. See the PortType section for an example of each type.

Raises

BadPortTypeException - Raised if the port has already been read with one port_type and read is then called with a different port_type. For example:

from typing import Iterable

import pandas as pd
import pyarrow as pa
from pipelines import InputPort, PortType

input_port: InputPort = module.input('raw_data')

# this call sets up the port to expect PortType.pandas
chunks: Iterable[pd.DataFrame] = input_port.chunks(port_type=PortType.pandas)

# so this read fails with BadPortTypeException because the port type differs
table: pa.Table = input_port.read(port_type=PortType.pyarrow)

InputPort.chunks(port_type: PortType = PortType.pandas) -> Iterable[Union[pd.DataFrame, pa.Table]]

Returns a generator that provides all the data from the port in memory-safe chunks. This is recommended over read for operating on large datasets. Each chunk will be returned in the format dictated by the port_type parameter.

import pandas as pd
from pipelines import InputPort, OutputPort

input_port: InputPort = module.input('raw_data')
output_port: OutputPort = module.output('parsed_data')

chunk: pd.DataFrame
for chunk in input_port.chunks():
    new_df: pd.DataFrame = chunk[chunk["Age"] > 35]
    output_port.write(new_df)
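Chunked reading also suits aggregations that only need a running total, because no more than one chunk is held in memory at a time. The sketch below shows the pattern without any dependencies: fake_chunks is a hypothetical stand-in for input_port.chunks(), and each chunk is a plain list of dictionaries rather than a DataFrame. With real pandas chunks, the per-chunk count could be written as int((chunk["Age"] > 35).sum()).

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, int]


def count_matching(
    chunks: Iterable[List[Row]], predicate: Callable[[Row], bool]
) -> int:
    """Count rows satisfying `predicate`, one chunk at a time."""
    total = 0
    for chunk in chunks:
        # Only the current chunk is inspected; earlier chunks can be freed.
        total += sum(1 for row in chunk if predicate(row))
    return total


def fake_chunks() -> Iterable[List[Row]]:
    """Hypothetical stand-in for input_port.chunks(); yields two small chunks."""
    yield [{"Age": 29}, {"Age": 41}]
    yield [{"Age": 36}, {"Age": 35}, {"Age": 52}]


older_than_35 = count_matching(fake_chunks(), lambda row: row["Age"] > 35)
print(older_than_35)  # 3 (ages 41, 36 and 52)
```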

Parameters

port_type: PortType - (Optional) The type that the data should be returned as. Defaults to PortType.pandas.

Returns

Iterable[Union[pd.DataFrame, pa.Table]] - A generator that returns all the data from the port in chunks. The collection type depends on the PortType passed in the port_type parameter. Most commonly, this will be a pandas DataFrame. See the PortType section for an example of each type.

Raises

BadPortTypeException - Raised if the port has already been read with one port_type and chunks is then called with a different port_type. For example:

from typing import Iterable

import pandas as pd
import pyarrow as pa
from pipelines import InputPort, PortType

input_port: InputPort = module.input('raw_data')

# this call sets up the port to expect PortType.pandas
chunks: Iterable[pd.DataFrame] = input_port.chunks(port_type=PortType.pandas)

# so this call fails with BadPortTypeException because the port type differs
new_chunks: Iterable[pa.Table] = input_port.chunks(port_type=PortType.pyarrow)

InputPort.read_chunk(port_type: PortType = PortType.pandas) -> Union[pd.DataFrame, pa.Table]

Reads a single chunk from the input port. The returned chunk continues from wherever the previous read_chunk call, or the previous chunk yielded by the chunks generator, left off. For example:

import pandas as pd
from pipelines import InputPort

input_port: InputPort = module.input('raw_data')

# this will read the very beginning of the data from the port
first_chunk: pd.DataFrame = input_port.read_chunk()

# this chunk will continue from where first_chunk ended
next_chunk: pd.DataFrame = input_port.read_chunk()

or

from typing import Iterable

import pandas as pd
from pipelines import InputPort

input_port: InputPort = module.input('raw_data')

# we've created the chunks generator here but no data has been read from it
chunks: Iterable[pd.DataFrame] = input_port.chunks()

# so this will still give us the very beginning of the data from the port
first_chunk: pd.DataFrame = input_port.read_chunk()

# this chunk will continue from where first_chunk ended
next_chunk: pd.DataFrame = next(chunks)

# this chunk will continue from where next_chunk ended
third_chunk: pd.DataFrame = input_port.read_chunk()

Parameters

port_type: PortType - (Optional) The type that the data should be returned as. Defaults to PortType.pandas.

Returns

Union[pd.DataFrame, pa.Table] - A single chunk of data from the port. The collection type depends on the PortType passed in the port_type parameter. Most commonly, this will be a pandas DataFrame. See the PortType section for an example of each type.

Raises

BadPortTypeException - Raised if the port has already been read with one port_type and read_chunk is then called with a different port_type. For example:

from typing import Iterable

import pandas as pd
import pyarrow as pa
from pipelines import InputPort, PortType

input_port: InputPort = module.input('raw_data')

# this call sets up the port to expect PortType.pandas
chunks: Iterable[pd.DataFrame] = input_port.chunks(port_type=PortType.pandas)

# so this read fails with BadPortTypeException because the port type differs
new_chunk: pa.Table = input_port.read_chunk(port_type=PortType.pyarrow)
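The way read_chunk and the chunks generator advance a single read position can be modelled with one shared iterator. The sketch below is a hypothetical stand-in, not the real InputPort: FakeInputPort only imitates the documented ordering, chunks are plain lists, and its behaviour once the data is exhausted (raising StopIteration) is an assumption, since this reference does not describe that case.

```python
from typing import Iterator, List


class FakeInputPort:
    """Hypothetical stand-in modelling only the shared read position."""

    def __init__(self, chunks: List[list]) -> None:
        # One iterator backs both read_chunk() and chunks().
        self._source: Iterator[list] = iter(chunks)

    def read_chunk(self) -> list:
        # Take the next unread chunk from the shared iterator.
        return next(self._source)

    def chunks(self) -> Iterator[list]:
        # Lazily yield whatever is still unread, from the same position.
        yield from self._source


port = FakeInputPort([[1, 2], [3, 4], [5, 6]])
chunk_iter = port.chunks()   # creating the generator reads nothing
first = port.read_chunk()    # [1, 2]
second = next(chunk_iter)    # [3, 4]
third = port.read_chunk()    # [5, 6]
```

Mixing the two access styles works in this model, but using only one of them per port is likely easier to follow.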

OutputPort

An object representing an output port on a module.

This object allows you to write data to the given port. Each call to write after the first appends to the data already written. There is no way to delete or overwrite data that has already been written.

Objects of this class should not be created manually; obtain them from the Module.output or Module.outputs methods.

Methods

Function Signature                           Description
write(data: Union[pd.DataFrame, pa.Table])   Writes data to the port.

OutputPort.write(data: Union[pd.DataFrame, pa.Table])

Writes the given data to the port.

Note that you can call write multiple times. Any additional calls after the first will result in the new data being appended to the existing data that has already been written. Any new data should have the same columns as the existing data.

Parameters

data: Union[pd.DataFrame, pa.Table] - The data to be written.
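Because written data cannot be removed afterwards, it may help to check column consistency before each write. The sketch below wraps a port in a small guard. ColumnCheckedWriter, FakeFrame, and FakeOutputPort are all hypothetical names introduced for illustration; the guard reads a columns attribute, which suits pandas DataFrames (a PyArrow table would need column_names instead), and it treats a change in column order as a mismatch, which is a stricter rule than this reference states.

```python
from typing import List, Optional


class ColumnCheckedWriter:
    """Wrap an output port and reject writes whose columns change mid-stream."""

    def __init__(self, port) -> None:
        self._port = port
        self._columns: Optional[List[str]] = None

    def write(self, data) -> None:
        columns = list(data.columns)  # DataFrame-like objects only
        if self._columns is None:
            self._columns = columns   # the first write fixes the schema
        elif columns != self._columns:
            raise ValueError(f"expected columns {self._columns}, got {columns}")
        self._port.write(data)


class FakeFrame:
    """Hypothetical stand-in for a DataFrame; it only carries column names."""

    def __init__(self, columns: List[str]) -> None:
        self.columns = columns


class FakeOutputPort:
    """Hypothetical stand-in for OutputPort; it records what was written."""

    def __init__(self) -> None:
        self.written: list = []

    def write(self, data) -> None:
        self.written.append(data)


port = FakeOutputPort()
writer = ColumnCheckedWriter(port)
writer.write(FakeFrame(["Name", "Age"]))
writer.write(FakeFrame(["Name", "Age"]))
try:
    writer.write(FakeFrame(["Name", "Height"]))
    rejected = False
except ValueError:
    rejected = True  # the mismatched frame never reaches the port
```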

PortType

A simple enum defining the formats in which data can be read from a port. The possible values are:

  • PortType.pandas
  • PortType.pyarrow

The PortType passed to an InputPort method determines the type of collection in which the data is returned.

import pandas as pd
import pyarrow as pa
from pipelines import PortType, InputPort

port1: InputPort = module.input("first_port")
port2: InputPort = module.input("second_port")

# using PortType.pandas will return a pandas DataFrame
df: pd.DataFrame = port1.read(port_type=PortType.pandas)

# using PortType.pyarrow will return a PyArrow table
table: pa.Table = port2.read(port_type=PortType.pyarrow)

Updated March 15, 2022