Python SDK Tutorial: PySpark Local Compute Flow

In this notebook tutorial, we'll demonstrate the local model execution flow in TruEra's Python SDK using PySpark DataFrames and models.

What does "local model execution" mean? What's different about it?

In local model execution mode, you can use the SDK to perform computations with your model locally. This is especially useful when it is difficult to execute models remotely.

Before you begin ⬇️

⚠️ Make sure you have truera and truera_qii SDK packages installed before going through this tutorial. See the Installation Instructions for additional help.

What we'll cover ☑️

  • Create a project with some PySpark data and models.
  • Compare performance and explanations across models.

Step 1a: Create a Spark session

The Spark session is local in this case, but you can modify this to connect to your cluster.

from pyspark.sql import SparkSession

appName = "PySpark Example - Distributed QII"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()
spark_context = spark.sparkContext
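
If you're running against a cluster instead of local mode, the master URL is the only part that needs to change. Here's a minimal sketch, assuming a standalone cluster; the host and port are placeholders for your cluster's master URL:

# hypothetical cluster connection -- replace <host>:7077 with your cluster's master URL
cluster_master = "spark://<host>:7077"
spark = SparkSession.builder.appName(appName).master(cluster_master).getOrCreate()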

Step 1b: Connect to TruEra endpoint

This is a tutorial for a basic flow using the SDK's local environment. All artifacts and computations exist only on your local machine, so for most functionality you don't need to connect to a TruEra deployment.

In this tutorial, however, we will also demonstrate how to sync local and remote projects (much like you would with a local vs. remote Git branch). To enable this, we will connect to a TruEra deployment.

What do I need to connect to my TruEra deployment?

  • TruEra deployment URI (the connection string). For most users, the TruEra URI will take the form https://<your-truera-access-uri>.
  • Some form of authentication (basic auth or token auth).

For examples of how to authenticate, see the Authentication section of the Diagnostics Quickstart. Here, we'll use basic authentication with a username and password.

import pandas as pd
import numpy as np
import os
from truera.client.truera_authentication import BasicAuthentication
from truera.client.truera_workspace import TrueraWorkspace

# FILL ME! 
CONNECTION_STRING = "<TRUERA_URL>"
USERNAME = "<USERNAME>"
PASSWORD = "<PASSWORD>"
QUICKSTART_DOWNLOAD_DIRECTORY = "<QUICKSTART_DOWNLOAD_DIRECTORY>"

auth = BasicAuthentication(USERNAME, PASSWORD)
tru = TrueraWorkspace(CONNECTION_STRING, auth)
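
If your deployment uses token-based authentication instead, you can swap in TokenAuthentication for BasicAuthentication. A minimal sketch, assuming you have already generated a token (the token value below is a placeholder):

from truera.client.truera_authentication import TokenAuthentication

# token-based alternative to basic auth; replace with your generated token
auth = TokenAuthentication("<TOKEN>")
tru = TrueraWorkspace(CONNECTION_STRING, auth)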

Step 2: Build a model

Here we're using the Census Income dataset from our Quickstart data, which you can get via your deployment's Downloads page. We will just train a basic PySpark RandomForestClassifier on the full dataset.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# source paths for data
X_PATH = os.path.join(QUICKSTART_DOWNLOAD_DIRECTORY, "census_income", "data_num.csv")
Y_PATH = os.path.join(QUICKSTART_DOWNLOAD_DIRECTORY, "census_income", "label.csv")

# grab input & label data & preprocess data with small modifications
background_data_as_pandas = pd.read_csv(X_PATH)
sdf_input = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(X_PATH)
sdf_label = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(Y_PATH)

sdf_label = sdf_label.withColumnRenamed("_c0", "id")
sdf_label = sdf_label.withColumnRenamed("_c1", "label")
sdf_label = sdf_label.withColumn("label", sdf_label["label"].cast("integer"))

sdf = sdf_input.join(sdf_label, on="id").drop("id")

feature_assemble = VectorAssembler(inputCols=sdf.columns[:-1], outputCol='features')
sdf = feature_assemble.transform(sdf)
# build a model
rf_model = RandomForestClassifier(labelCol="label", featuresCol="features")
trained_rf_model = rf_model.fit(sdf)
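
Before handing the model to TruEra, it can help to sanity-check that it produces sensible outputs. A quick check with plain PySpark (not part of the original flow):

# score the training data and inspect a few predictions
predictions = trained_rf_model.transform(sdf)
predictions.select("label", "prediction", "probability").show(5)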

Step 3: Add your model and split data!

# switch the workspace to its local environment so artifacts stay on this machine
tru.set_environment("local")

tru.add_project("test-local-pyspark", score_type="logits")
tru.add_data_collection("data")

We can directly add our PySpark-based tree model to our local workspace, as well as our split data.

from truera.client.ingestion import ColumnSpec

# register the trained PySpark model and the split data with the local workspace
tru.add_python_model("pyspark-rf-classifier", trained_rf_model)
tru.add_data(
    background_data_as_pandas,
    data_split_name="split1",
    column_spec=ColumnSpec(
        id_col_name="id",
        pre_data_col_names=[c for c in background_data_as_pandas.columns if c != "id"]
    )
)
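
To confirm everything was registered, you can list the artifacts now in the workspace; a short sketch, assuming your SDK version exposes the usual listing helpers:

# list what the local workspace now knows about
print(tru.get_models())
print(tru.get_data_splits())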

Now, we can compute information about our model on our own machine, such as model predictions or feature influences.

# compute model predictions locally
tru.get_ys_pred()

# compute feature influences locally and inspect the first few rows
influences = tru.get_feature_influences()
print(influences.head(10).to_markdown())