Pipeline
A class to create and execute a data processing pipeline for pandas DataFrames. This pipeline applies a sequence of functions to a DataFrame, where the output of one function serves as the input for the next. It ensures type consistency and provides runtime validation for the function chain.
Example Usage:
The following example defines a data processing pipeline with the Pipeline class and runs it on a pandas DataFrame.
import logging

import pandas as pd
from llm_etl_pipeline.transformation import Pipeline

# The step functions below log through this module-level logger
logger = logging.getLogger(__name__)

# Define some sample transformation functions
def add_one_column(df: pd.DataFrame) -> pd.DataFrame:
    """Adds a 'value_plus_one' column."""
    logger.info(f"Step: add_one_column received {len(df)} rows.")
    df['value_plus_one'] = df['value'] + 1
    print(f"After add_one_column:\n{df}\n")
    return df

def multiply_by_two_column(df: pd.DataFrame) -> pd.DataFrame:
    """Multiplies the 'value_plus_one' column by 2."""
    logger.info(f"Step: multiply_by_two_column received {len(df)} rows.")
    df['value_times_two'] = df['value_plus_one'] * 2
    print(f"After multiply_by_two_column:\n{df}\n")
    return df

def filter_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Filters rows where 'value' is greater than 1."""
    logger.info(f"Step: filter_rows received {len(df)} rows.")
    # .copy() lets later steps add columns without a SettingWithCopyWarning
    filtered_df = df[df['value'] > 1].copy()
    print(f"After filter_rows:\n{filtered_df}\n")
    return filtered_df

# Create an initial DataFrame
initial_df = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'value': [10, 20, 5, 15]
})

# Steps run in list order. Note: filter_rows runs before multiply_by_two_column,
# so the last step may operate on a smaller DataFrame.
pipeline = Pipeline(
    functions=[
        add_one_column,
        filter_rows,
        multiply_by_two_column
    ]
)

# Pass a copy so the in-place column assignments do not modify initial_df
final_df = pipeline.run(initial_df.copy())
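As a cross-check, the three steps above can be chained by hand in plain pandas. This sketch does not use Pipeline at all; it only assumes, as the description says, that each step receives the previous step's output. With the sample data every value is greater than 1, so the filter keeps all four rows. The name expected_df is illustrative.

```python
import pandas as pd

initial_df = pd.DataFrame({"id": [1, 2, 3, 4], "value": [10, 20, 5, 15]})

# Same three steps as in the example, applied one after another by hand.
step1 = initial_df.assign(value_plus_one=lambda d: d["value"] + 1)
step2 = step1[step1["value"] > 1].copy()
expected_df = step2.assign(value_times_two=lambda d: d["value_plus_one"] * 2)

print(expected_df)
```

If the pipeline behaves as documented, final_df from the example should hold the same columns and values as expected_df.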
API Reference
- class llm_etl_pipeline.Pipeline(**data)
Bases: BaseModel
A class to create and execute a data processing pipeline for pandas DataFrames.
This pipeline applies a sequence of functions to a DataFrame, where the output of one function serves as the input for the next. It ensures type consistency and provides runtime validation for the function chain.
- Parameters:
functions (List[Callable[[NonEmptyDataFrame, Any], NonEmptyDataFrame]]), where each DataFrame annotation is rendered as Annotated[DataFrame, InstanceOf(), BeforeValidator(func=_ensure_dataframe_type), AfterValidator(func=_validate_non_empty_dataframe)], with both validators defined in llm_etl_pipeline.typings.internal.validators
- functions
A list of callable functions that represent the steps in the pipeline. Each function is expected to accept a pandas DataFrame (specifically NonEmptyDataFrame or pd.DataFrame) as its first argument and return a pandas DataFrame (NonEmptyDataFrame or pd.DataFrame). Functions can optionally accept additional Any arguments, but at least one DataFrame-typed argument is required. Defaults to an empty list.
- Type:
List[Callable[[NonEmptyDataFrame, Any], NonEmptyDataFrame]]
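The description above does not say how Pipeline supplies the additional arguments to a step. One option that sidesteps the question is to bind parameters in a closure, so every step keeps the documented shape of one DataFrame in and one DataFrame out. The sketch below is an assumption-laden illustration, not part of the library: make_min_value_filter and filter_min_value are hypothetical names.

```python
import pandas as pd


def make_min_value_filter(min_value: int):
    """Hypothetical factory: returns a one-argument step with the threshold bound."""

    def filter_min_value(df: pd.DataFrame) -> pd.DataFrame:
        # Keep rows whose 'value' is at least min_value; copy so later
        # steps can add columns to the result safely.
        return df[df["value"] >= min_value].copy()

    return filter_min_value


keep_at_least_10 = make_min_value_filter(10)

sample = pd.DataFrame({"id": [1, 2, 3, 4], "value": [10, 20, 5, 15]})
filtered = keep_at_least_10(sample)
print(filtered)
```

The returned function carries the same DataFrame annotations as the hand-written steps in the usage example, so it can be listed in functions like any other step, assuming the pipeline's validation only inspects the step's signature.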
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- functions: typing.List[typing.Callable[[typing.Annotated[pandas.core.frame.DataFrame], typing.Any], typing.Annotated[pandas.core.frame.DataFrame]]]
- run(input_df)
Executes the defined pipeline of functions on the provided DataFrame.
The input_df is passed as the initial input to the first function. The output DataFrame from each function is then passed as the input to the subsequent function in the functions list. A runtime check ensures that each function indeed returns a pandas.DataFrame.
- Parameters:
input_df (NonEmptyDataFrame) – The initial pandas DataFrame to start the pipeline processing.
- Returns:
The final DataFrame after all functions in the pipeline have been successfully executed.
- Return type:
pd.DataFrame
- Raises:
TypeError – If any function in the pipeline returns a value that is not an instance of pandas.DataFrame.
Exception – Re-raises any other exceptions that occur during the execution of individual functions within the pipeline.
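The behaviour described for run can be sketched as a plain loop. This is a minimal stand-in written from the description above, not the library's source: run_steps, double_value, and broken_step are hypothetical names, and the error message wording is an assumption.

```python
from typing import Callable, List

import pandas as pd


def run_steps(functions: List[Callable[[pd.DataFrame], pd.DataFrame]],
              input_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for the documented behaviour of Pipeline.run."""
    current = input_df
    for func in functions:
        # Exceptions raised inside a step propagate to the caller unchanged.
        result = func(current)
        # Runtime check: every step must hand back a pandas DataFrame.
        if not isinstance(result, pd.DataFrame):
            name = getattr(func, "__name__", repr(func))
            raise TypeError(
                f"Step {name!r} returned {type(result).__name__}, "
                "expected pandas.DataFrame"
            )
        current = result
    return current


def double_value(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(value=df["value"] * 2)


def broken_step(df: pd.DataFrame):
    return len(df)  # an int, so the type check rejects it


df = pd.DataFrame({"value": [1, 2, 3]})
result = run_steps([double_value], df)

try:
    run_steps([double_value, broken_step], df)
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

Checking the return type after every step, rather than once at the end, means the error names the step that produced the wrong value.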