Python Module API Reference
The DataRobot Pipelines Python module provides a built-in API for working with pipeline data. This page describes the classes and methods available from this API.
When authoring code for a Python module, this library can be imported using:
import pipelines
However, this is usually not required because the necessary class, Module, is automatically imported as module. Code examples throughout this page demonstrate the different ways to work with data in a Python module.
Module
Module is a helper class for operating on the current pipeline module. This class is already provided as module in the module runtime environment.
This helper is primarily useful for reading data from and writing data to the ports on the module. For example, given a module with an input port named raw_data and an output port named parsed_data, the data could be read and written as follows:
import pandas as pd
# this reads the data and returns a pandas dataframe
df: pd.DataFrame = module.input("raw_data").read()
# an example of filtering: keep only the rows where Age is greater than 35
new_df: pd.DataFrame = df[df["Age"] > 35]
# finally, let's write the new data
module.output("parsed_data").write(new_df)
Methods
Function Signature | Description |
---|---|
input(name: str) -> InputPort | Returns the input port with the given name. |
inputs() -> List[InputPort] | Returns a list of all input ports on the module. |
output(name: str) -> OutputPort | Returns the output port with the given name. |
outputs() -> List[OutputPort] | Returns a list of all output ports on the module. |
config() -> Dict[str, Any] | Returns the configuration for this module run. |
Module.input(name: str) -> InputPort
Returns the input port with the given name.
If no input port with the given name exists, an UnknownPortException is raised.
Parameters
name: str - The name of the input port to access.
Returns
InputPort - An InputPort object that allows reading from the named port.
Raises
UnknownPortException - If no input port has the name defined in the parameters, an UnknownPortException is raised.
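Because module only exists inside the pipeline runtime, port lookups can be awkward to exercise locally. The sketch below is a hypothetical test double, not part of the pipelines library: FakeModule, first_available_input, and the locally defined UnknownPortException are assumptions that mimic only the documented lookup behavior (return the named port, or raise when the name is unknown).

```python
from typing import Any, Dict, List


class UnknownPortException(Exception):
    """Local stand-in for the exception raised by the runtime (assumed, not imported)."""


class FakeModule:
    """Hypothetical test double mimicking Module.input / Module.inputs lookups."""

    def __init__(self, input_ports: Dict[str, Any]) -> None:
        self._input_ports = dict(input_ports)

    def input(self, name: str) -> Any:
        # Mirror the documented behavior: unknown names raise an exception.
        try:
            return self._input_ports[name]
        except KeyError:
            raise UnknownPortException(f"no input port named {name!r}") from None

    def inputs(self) -> List[Any]:
        return list(self._input_ports.values())


def first_available_input(module: Any, candidates: List[str]) -> Any:
    """Return the first port in `candidates` that exists on the module."""
    for name in candidates:
        try:
            return module.input(name)
        except UnknownPortException:
            continue
    raise UnknownPortException(f"none of {candidates} exist on this module")


# The port value is a placeholder; a real InputPort comes from the runtime.
fake = FakeModule({"raw_data": "port-object-placeholder"})
port = first_available_input(fake, ["missing_port", "raw_data"])
```

Inside a real module, the same try/except pattern would be applied to module.input with the exception class supplied by the runtime.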
Module.inputs() -> List[InputPort]
Returns a list of all input ports on the module.
Returns
List[InputPort] - All input ports defined on the module.
Module.output(name: str) -> OutputPort
Returns the output port with the given name.
If no output port with the given name exists, an UnknownPortException is raised.
Parameters
name: str - The name of the output port to access.
Returns
OutputPort - An OutputPort object that allows writing to the named port.
Raises
UnknownPortException - If no output port matches the name defined in the parameters, an UnknownPortException is raised.
Module.outputs() -> List[OutputPort]
Returns a list of all output ports on the module.
Returns
List[OutputPort] - All output ports defined on the module.
Module.config() -> Dict[str, Any]
Returns a dict containing general configuration details for the module.
Some of the values in the returned configuration differ between module runs.
Returns
Dict[str, Any] - The configuration for the module. Most of the information here is internal to the module execution. Notably, this dictionary contains the following keys:
- ai_catalog_workspace_id - The ID of the DataRobot Pipelines workspace in the AI Catalog.
- public_api_location - A URL to the DataRobot public API.
- user_id - The ID of the user who executed the pipeline.
- user_token - A temporary API token to connect to the DataRobot public API.
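As a rough illustration of using these keys, the sketch below pulls the API location and token out of a configuration dictionary and fails early if either is absent. The helper connection_settings and every value in sample_config are hypothetical placeholders; inside a module, the dictionary would come from module.config().

```python
from typing import Any, Dict, Tuple

REQUIRED_KEYS = ("public_api_location", "user_token")


def connection_settings(config: Dict[str, Any]) -> Tuple[str, str]:
    """Return (endpoint, token) from a module config, failing early if a key is absent."""
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"module config is missing keys: {missing}")
    return config["public_api_location"], config["user_token"]


# Placeholder values for illustration only; inside a module, pass module.config().
sample_config = {
    "ai_catalog_workspace_id": "workspace-id-placeholder",
    "public_api_location": "https://example.invalid/api/v2",
    "user_id": "user-id-placeholder",
    "user_token": "token-placeholder",
}
endpoint, token = connection_settings(sample_config)
```

Since user_token is described as temporary, it is probably safer to read it from the configuration on each run than to store it anywhere.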
InputPort
An object representing an input port on a module.
Data from an input port can be read all at once or in chunks. The latter is recommended for large datasets. Data from the port can only be read once.
Objects of this class should not be created manually; obtain them from the module.input or module.inputs methods instead.
Methods
Function Signature | Description |
---|---|
read(port_type: PortType) -> Union[pd.DataFrame, pa.Table] | Reads all the data in the port. |
chunks(port_type: PortType) -> Iterable[Union[pd.DataFrame, pa.Table]] | Returns a generator of data chunks that can be iterated over. |
read_chunk(port_type: PortType) -> Union[pd.DataFrame, pa.Table] | Reads a single chunk of data from the port. |
InputPort.read(port_type: PortType = PortType.pandas) -> Union[pd.DataFrame, pa.Table]
Reads all data from the port.
Parameters
port_type: PortType - (Optional) The type that the data should be returned as. Defaults to PortType.pandas.
Returns
Union[pd.DataFrame, pa.Table] - All the data from the port. The type of collection varies based on the PortType used in the port_type parameter. Most commonly, this will be a Pandas DataFrame. See the section on the PortType for examples of getting all types.
Raises
BadPortTypeException - Raised if the port has already been read from with one port_type and read is then called with a different port_type.
from typing import Iterable
import pandas as pd
import pyarrow as pa
from pipelines import InputPort, PortType
input_port: InputPort = module.input('raw_data')
# this call sets up the port to expect PortType.pandas
chunks: Iterable[pd.DataFrame] = input_port.chunks(port_type=PortType.pandas)
# so this read will fail with BadPortTypeException because the port type differs
table: pa.Table = input_port.read(port_type=PortType.pyarrow)
InputPort.chunks(port_type: PortType = PortType.pandas) -> Iterable[Union[pd.DataFrame, pa.Table]]
Returns a generator that provides all the data from the port in memory-safe chunks. This is recommended over read for operating on large datasets. Each chunk will be returned in the format dictated by the port_type parameter.
import pandas as pd
from pipelines import InputPort, OutputPort
input_port: InputPort = module.input('raw_data')
output_port: OutputPort = module.output('parsed_data')
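chunk: pd.DataFrame
for chunk in input_port.chunks():
    # keep only the rows in this chunk where Age is greater than 35
    new_df: pd.DataFrame = chunk[chunk["Age"] > 35]
    # each write appends to the data already written to the port
    output_port.write(new_df)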
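To illustrate why chunked reads suit large datasets, the following sketch computes an aggregate while holding only one chunk in memory at a time. fake_chunks is a hypothetical stand-in for input_port.chunks() so the example runs outside the pipeline runtime, and the Age column and its values are made up to match the example above.

```python
from typing import Iterable, Iterator

import pandas as pd


def fake_chunks() -> Iterator[pd.DataFrame]:
    """Stand-in for input_port.chunks(); yields two small DataFrames."""
    yield pd.DataFrame({"Age": [22, 41, 36]})
    yield pd.DataFrame({"Age": [58, 30]})


def count_older_than(chunks: Iterable[pd.DataFrame], threshold: int) -> int:
    """Count rows with Age above `threshold`, one chunk at a time."""
    total = 0
    for chunk in chunks:
        # The boolean mask sums to the number of matching rows in this chunk.
        total += int((chunk["Age"] > threshold).sum())
    return total


older_count = count_older_than(fake_chunks(), 35)
```

In a module, count_older_than(module.input("raw_data").chunks(), 35) would follow the same pattern, assuming the port's data has an Age column.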
Parameters
port_type: PortType - (Optional) The type that the data should be returned as. Defaults to PortType.pandas.
Returns
Iterable[Union[pd.DataFrame, pa.Table]] - The generator that will return all the data from the port in chunks. The collection type varies based on the PortType used in the port_type parameter. Most commonly, this will be a Pandas DataFrame. See the section on the PortType for examples of getting all types.
Raises
BadPortTypeException - Raised if the port has already been read from with one port_type and chunks is then called with a different port_type.
from typing import Iterable
import pandas as pd
import pyarrow as pa
from pipelines import InputPort, PortType
input_port: InputPort = module.input('raw_data')
# this call sets up the port to expect PortType.pandas
chunks: Iterable[pd.DataFrame] = input_port.chunks(port_type=PortType.pandas)
# so this call will fail with BadPortTypeException because the port type differs
new_chunks: Iterable[pa.Table] = input_port.chunks(port_type=PortType.pyarrow)
InputPort.read_chunk(port_type: PortType = PortType.pandas) -> Union[pd.DataFrame, pa.Table]
Reads a single chunk from the input port. The chunk returned is the one that follows the chunk most recently returned by read_chunk or yielded by the chunks generator. For example:
import pandas as pd
from pipelines import InputPort
input_port: InputPort = module.input('raw_data')
# this will read the very beginning of the data from the port
first_chunk: pd.DataFrame = input_port.read_chunk()
# this chunk will continue from where first_chunk ended
next_chunk: pd.DataFrame = input_port.read_chunk()
or, when mixing read_chunk with the chunks generator:
from typing import Iterable
import pandas as pd
from pipelines import InputPort
input_port: InputPort = module.input('raw_data')
# we've created the chunks generator here but no data has been read from it
chunks: Iterable[pd.DataFrame] = input_port.chunks()
# so this will still give us the very beginning of the data from the port
first_chunk: pd.DataFrame = input_port.read_chunk()
# this chunk will continue from where first_chunk ended
next_chunk: pd.DataFrame = next(chunks)
# this chunk will continue from where next_chunk ended
third_chunk: pd.DataFrame = input_port.read_chunk()
Parameters
port_type: PortType - (Optional) The type that the data should be returned as. Defaults to PortType.pandas.
Returns
Union[pd.DataFrame, pa.Table] - A single chunk of data from the port. The collection type varies based on the PortType used in the port_type parameter. Most commonly, this will be a Pandas DataFrame. See the section on the PortType for examples of getting all types.
Raises
BadPortTypeException - Raised if the port has already been read from with one port_type and read_chunk is then called with a different port_type.
from typing import Iterable
import pandas as pd
import pyarrow as pa
from pipelines import InputPort, PortType
input_port: InputPort = module.input('raw_data')
# this call sets up the port to expect PortType.pandas
chunks: Iterable[pd.DataFrame] = input_port.chunks(port_type=PortType.pandas)
# so this read will fail with BadPortTypeException because the port type differs
new_chunk: pa.Table = input_port.read_chunk(port_type=PortType.pyarrow)
OutputPort
An object representing an output port on a module.
This object allows you to write data to the given port. Each call after the first appends to the data already written. There is no way to delete or overwrite data that has already been written.
Objects of this class should not be created manually; obtain them from the Module.output or Module.outputs methods instead.
Methods
Function Signature | Description |
---|---|
write(data: Union[pd.DataFrame, pa.Table]) | Writes data to the port. |
OutputPort.write(data: Union[pd.DataFrame, pa.Table])
Writes the given data to the port.
Note that you can call write multiple times. Any additional calls after the first will result in the new data being appended to the existing data that has already been written. Any new data should have the same columns as the existing data.
Parameters
data: Union[pd.DataFrame, pa.Table] - The data to be written.
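The append behavior described above can be pictured with a small stand-in. RecordingOutputPort is a hypothetical class, not part of the pipelines library; it treats successive writes as a concatenation in write order. How the real port reacts to mismatched columns is not documented here, so the ValueError below is purely an assumption of this sketch.

```python
from typing import List

import pandas as pd


class RecordingOutputPort:
    """Hypothetical stand-in that mimics the documented append-on-write behavior."""

    def __init__(self) -> None:
        self._frames: List[pd.DataFrame] = []

    def write(self, data: pd.DataFrame) -> None:
        # Assumption: reject data whose columns differ from what was already written.
        if self._frames and list(data.columns) != list(self._frames[0].columns):
            raise ValueError("new data must have the same columns as existing data")
        self._frames.append(data)

    def written(self) -> pd.DataFrame:
        # Everything written so far, in write order.
        return pd.concat(self._frames, ignore_index=True)


port = RecordingOutputPort()
port.write(pd.DataFrame({"Age": [41, 36]}))
port.write(pd.DataFrame({"Age": [58]}))
combined = port.written()
```

A stand-in like this may also be handy for unit-testing transformation code without a running pipeline.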
PortType
A simple enum defining the formats in which data can be read. The possible values are:
- PortType.pandas
- PortType.pyarrow
The PortType passed to an InputPort method determines the type of collection in which the data is returned.
import pandas as pd
import pyarrow as pa
from pipelines import PortType, InputPort
port1: InputPort = module.input("first_port")
port2: InputPort = module.input("second_port")
# using PortType.pandas will return a pandas DataFrame
df: pd.DataFrame = port1.read(port_type=PortType.pandas)
# using PortType.pyarrow will return a PyArrow table
table: pa.Table = port2.read(port_type=PortType.pyarrow)