
Streaming Ingestion

Use streaming ingestion to monitor real-time inference models.

To use streaming ingestion, you must:

  1. Register a schema with your data collection
  2. Start ingesting points to TruEra for real-time monitoring
  3. Optionally define a callback to run after your point is ingested

Register a schema for streaming

Streaming schemas are similar to ColumnSpecs, except they include types.

from truera.client.ingestion.schema import *
from truera.client.ingestion.streaming import StreamingStatus
from truera.client.truera_workspace import TrueraWorkspace


tru = TrueraWorkspace(...)
tru.add_project("my_project")
my_schema = Schema(
    id_column_name="id",
    timestamp_column_name="ts",
    tags_column_name="tags",
    input_columns=TabularInputColumns(
        pre_transformed_features=[
            FloatColumn(name="f1"),
            IntColumn(name="f2"),
            StringColumn(name="f3"),
            BoolColumn(name="f4")
        ]
    ),
    score_columns=BinaryClassificationColumns(
        label=IntColumn(name="actual"),
        probit_score=FloatColumn(name="proba"),
        class_score=IntColumn(name="prediction")
    ),
    extra_columns=[StringColumn(name="extra1"), StringColumn(name="extra2")]
)

# register your schema with a data collection
tru.add_data_collection(data_collection_name="my_dc", schema=my_schema)

Ingesting Records

Ingest your model's inputs, outputs, and optional labels using ingest_events(). ingest_events() accepts a list of events, where each event is a dictionary whose keys map to the column names in your schema. You can also pass a project name and model name to ingest_events() to override your workspace's context.

We support ingesting at most 1000 events in a single ingest_events() request.

import datetime

tru.add_model("my_model")
# Create a fake model inference event. Labels may not be known at inference time.
inference_event = {
    "id": "a_guid", 
    # timestamps must be in RFC 3339 format
    "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "tags": ["tag1", "tag2", "tag3"],
    "f1": 0.1,
    "f2": 2,
    "f3": "A",
    "f4": False,
    "proba": 0.8,
    "prediction": 1,
    "extra1": "CA",
    "extra2": "Male"
}
inference_future = tru.ingest_events(events=[inference_event])
print(inference_future.result())


# Assume that at some point in the future, some downstream system determines the ground truth. Ingest it.
label_event = {
    # use the same guid to associate the label with the previously ingested inference
    "id": "a_guid", 
    "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "actual": 0
}
label_future = tru.ingest_events(events=[label_event])
print(label_future.result())

Define a callback for your ingestion future to handle ingestion failures

TruEra ingests events asynchronously using a thread pool in the TruEra SDK, so ingestion does not block your program's main thread and you can ingest events with high throughput. You can register a callback that runs after the future returned by ingest_events() completes, which lets you handle failures. It may be useful to log failed events or write them to a dead letter queue for custom retries. TruEra rejects events that do not match the registered schema and surfaces these rejections either as exceptions or as fields in the function's response.

def callback(f):
    result = f.result()
    if result.status == StreamingStatus.FAILURE:
        # do something with result.events and result.errors
        print(result.errors)

tru.ingest_events(events=[inference_event], raise_errors=False).add_done_callback(callback)
By default, the raise_errors argument to ingest_events() is True, which means the future raises an exception if any events fail to ingest. If raise_errors is overridden to False, the function does not throw an exception; instead, the response includes a structured set of errors alongside the original events for custom handling.
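With the default raise_errors=True, the future follows the standard concurrent.futures contract: calling result() re-raises any ingestion error. A minimal stand-alone sketch of that contract, using a plain Future with a simulated rejection (no TruEra SDK required; the error message is illustrative, not a real SDK error):

```python
from concurrent.futures import Future

# Simulate a failed ingestion future, standing in for what
# ingest_events() returns when an event is rejected.
future = Future()
future.set_exception(ValueError("event rejected: 'f1' must be a float"))

try:
    future.result()  # re-raises the stored exception
except ValueError as e:
    # e.g. log the failure and re-queue the event for retry
    handled = f"Ingestion failed: {e}"

print(handled)  # Ingestion failed: event rejected: 'f1' must be a float
```

Wrapping result() in try/except like this is the pattern to use when you call result() directly rather than registering a callback.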

Building on ingest_events()

ingest_events() can be called directly from the application that invokes your hosted real-time model endpoint. For latency-sensitive applications, it can also be used to ingest inferences buffered in a message queue, or serve as a building block for a simple data-export sidecar that reads structured inference endpoint logs.

We recommend buffering 1000 events into a single ingest_events() request for best throughput results.
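A minimal batching sketch for that recommendation. The chunk() helper below is hypothetical (not part of the TruEra SDK); it splits a buffered list into batches of at most 1000 events, each suitable for one ingest_events() call:

```python
def chunk(events, size=1000):
    """Yield successive batches of at most `size` events."""
    for start in range(0, len(events), size):
        yield events[start:start + size]

# 2500 synthetic events split into batches of 1000, 1000, and 500.
buffered_events = [{"id": str(i)} for i in range(2500)]
batches = list(chunk(buffered_events))
print([len(b) for b in batches])  # [1000, 1000, 500]

# Hypothetical usage with the SDK:
# futures = [tru.ingest_events(events=batch) for batch in batches]
```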
