Tobii XR Devzone > Develop > Tobii Ocumen > Ocumen Filters > Python >

Ocumen Filters - Getting Started in Python

This page provides a tutorial for getting started with Ocumen Filters in Python. The tutorial explains how to implement the Fixation Dispersion Angles filter, and a similar methodology can be used to implement other filters.

The entire Python script for this tutorial is available in Step 7.

Table of Contents

Step 1: Open the Ocumen Filters Python library

  • Download your Ocumen bundle.
  • Locate and open the tutorial folder inside \Python\Ocumen Filters\

Step 2: Create a Python script and add a reference to a .ocumen recording

In a blank python script, add a Windows path reference to a .ocumen recording file. You can create your own .ocumen recording by following the getting started steps in Ocumen for Unity Getting Started or you can use the test recording which is located in the data folder of the Ocumen Python API as well as in the Ocumen Studio download. The windows path to the recording can be absolute (eg. C:\Users\<YourUsername>\Documents\sample-recording.ocumen) or relative (eg. data/sample-recording.ocumen).

import ocumen_io
import ocumen_filters
import ctypes

sample_recording_path = b"data/sample-recording.ocumen"

Step 3: Get gaze data with timestamps from the recording

The following function can be copied into your python script. It takes a .ocumen recording path as input and outputs binocular gaze values with timestamps, split into two native arrays.


# Get binocular data and associated timestamps
def get_binocular_data(recording_path):
    # Open file
    ocumen_connector = ocumen_io.WorldConnector.from_file(recording_path)
    ocumen_connector.check_compatible()

    # Read available data
    ocumen_recording = ocumen_io.WorldReader.from_connector(ocumen_connector)
    ocumen_recording.read_incremental()

    # Knowing this is a single-user recording, get the user ID
    user = ocumen_recording.list_users()[0]

    # Get all points in time when an event happened for this user. Here,
    # all points when binocular eye tracking data arrived.
    timestamps = ocumen_recording.track_times(user, ocumen_io.TrackedData.GazeBinocular).copied()

    gaze_array = []

    # Query for data for the given times.
    for timestamp in timestamps:
        meta = ocumen_io.UserInTimeRange(user, 0, timestamp)
        gaze_array.append(ocumen_recording.latest_gaze_binocular_for(meta).value.value)

    # Construct a native array of gaze times to pass to filter.
    timestamps_native_array = ocumen_filters.Slicei64(data=timestamps.data, len=timestamps.len)

    # Construct a native array to be able to pass gaze data to filter.
    temp_gaze_array = (ocumen_io.BinocularGaze * len(gaze_array))(*gaze_array)
    gaze_native_array = ocumen_filters.SliceBinocularGaze(data=ctypes.cast(temp_gaze_array, ctypes.POINTER(ocumen_filters.BinocularGaze)), len=len(gaze_array))

    return timestamps_native_array, gaze_native_array

After copying in the get_binocular_data function, it can be called with the recording path as input.

# Get binocular gaze data with timestamps from the Ocumen recording.
(timestamps_native_array, gaze_native_array) = get_binocular_data(sample_recording_path)

Step 4: Get velocity values by running a velocity filter

One of the inputs required for the Fixation Dispersion Angles filter is gaze velocity at each timestamp. To obtain these velocities, you can use the Instantaneous Velocities filter.

Before running a filter, define and initialize an output array with the same length as the input array(s). Each filter uses a specific output type, such as a VelocityOutput array in this case. The output array is the final parameter in a filter function, and the function writes the results into this array.

All functions in the filter library return a FilterError result. Be sure to check this result for potential errors.

# Run a velocity filter and save the result, which will be used as input to the fixation dispersion angles filter.
# A velocity output native array needs to initialized before running a velocity filter.
velocity_array = (ocumen_filters.VelocityOutput * len(gaze_native_array))()
velocty_result = ocumen_filters.velocity_instantaneous(timestamps_native_array, gaze_native_array, velocity_array)
if velocty_result != ocumen_filters.FilterError.Ok:
    raise Exception(f"Failed to calculate velocities using VelocityInstantaneous with error {velocty_result}.")

Step 5: Run the Fixation Dispersion Angles filter with configuration values

The Dispersion Angles filter is a configurable filter. This filter operates similarly to Step 4, but with an additional config variable provided for the filter function. A config contains all adjustable parameters for a filter. You can find each filter’s configuration information and default values in the API Reference.

# Run the fixation dispersion angles filter with configuration values and save the result.
fixation_array = (ocumen_filters.FixationOutput * len(gaze_native_array))()
config = ocumen_filters.DispersionAnglesFilterConfig(max_angle_for_fixations_deg=3, min_duration_for_fixation_us=100_000, max_outliers=1)
fixation_result = ocumen_filters.fixation_dispersion_angles(config, timestamps_native_array, velocity_array, fixation_array)
if fixation_result != ocumen_filters.FilterError.Ok:
    raise Exception(f"Failed to calculate fixations using FixationDispersionAngles with error {fixation_result}.")

Step 6: Print the results of the Fixation Dispersion Angles filter

To see the final results, the Fixation Dispersion Angles filter output array is printed. Each result in the filter’s output array is looped through and logged, while checking for null values to write “No Data” instead of an empty string.

# Print the output of the fixation dispersion angles filter.
for x, fixation in enumerate(fixation_array):
    left_string = ''
    right_string = ''

    if fixation.is_fixation_left.is_some():
        left_string = str(fixation.is_fixation_left.value)
    else:
        left_string = 'No Data'

    if fixation.is_fixation_right.is_some():
        right_string = str(fixation.is_fixation_right.value)
    else:
        right_string = 'No Data'


    print('Timestamp: ' + str(timestamps_native_array[x]) + ', Left: ' + left_string + ', Right: ' + right_string)

Step 7: Run the code

That’s it! You are now ready to run the Python code.

Here is the full code example:

import ocumen_io
import ocumen_filters
import ctypes

sample_recording_path = b"data/sample-recording.ocumen"

# Get binocular data and associated timestamps
def get_binocular_data():
    # Open file
    ocumen_connector = ocumen_io.WorldConnector.from_file(sample_recording_path)
    ocumen_connector.check_compatible()

    # Read available data
    ocumen_recording = ocumen_io.WorldReader.from_connector(ocumen_connector)
    ocumen_recording.read_incremental()

    # Knowing this is a single-user recording, get the user ID
    user = ocumen_recording.list_users()[0]

    # Get all points in time when an event happened for this user. Here,
    # all points when binocular eye tracking data arrived.
    timestamps = ocumen_recording.track_times(user, ocumen_io.TrackedData.GazeBinocular).copied()

    gaze_array = []

    # Query for data for the given times.
    for timestamp in timestamps:
        meta = ocumen_io.UserInTimeRange(user, 0, timestamp)
        gaze_array.append(ocumen_recording.latest_gaze_binocular_for(meta).value.value)

    # Construct a native array of gaze times to pass to filter.
    timestamps_native_array = ocumen_filters.Slicei64(data=timestamps.data, len=timestamps.len)

    # Construct a native array to be able to pass gaze data to filter.
    temp_gaze_array = (ocumen_io.BinocularGaze * len(gaze_array))(*gaze_array)
    gaze_native_array = ocumen_filters.SliceBinocularGaze(data=ctypes.cast(temp_gaze_array, ctypes.POINTER(ocumen_filters.BinocularGaze)), len=len(gaze_array))

    return timestamps_native_array, gaze_native_array

# Get binocular gaze data with timestamps from the Ocumen recording.
(timestamps_native_array, gaze_native_array) = get_binocular_data()

# Run a velocity filter and save the result, which will be used as input to the fixation dispersion angles filter.
# A velocity output native array needs to initialized before running a velocity filter.
velocity_array = (ocumen_filters.VelocityOutput * len(gaze_native_array))()
velocty_result = ocumen_filters.velocity_instantaneous(timestamps_native_array, gaze_native_array, velocity_array)
if velocty_result != ocumen_filters.FilterError.Ok:
    raise Exception(f"Failed to calculate velocities using VelocityInstantaneous with error {velocty_result}.")

# Run the fixation dispersion angles filter with configuration values and save the result.
fixation_array = (ocumen_filters.FixationOutput * len(gaze_native_array))()
config = ocumen_filters.DispersionAnglesFilterConfig(max_angle_for_fixations_deg=3, min_duration_for_fixation_us=100_000, max_outliers=1)
fixation_result = ocumen_filters.fixation_dispersion_angles(config, timestamps_native_array, velocity_array, fixation_array)
if fixation_result != ocumen_filters.FilterError.Ok:
    raise Exception(f"Failed to calculate fixations using FixationDispersionAngles with error {fixation_result}.")

# Print the output of the fixation dispersion angles filter.
for x, fixation in enumerate(fixation_array):
    left_string = ''
    right_string = ''

    if fixation.is_fixation_left.is_some():
        left_string = str(fixation.is_fixation_left.value)
    else:
        left_string = 'No Data'

    if fixation.is_fixation_right.is_some():
        right_string = str(fixation.is_fixation_right.value)
    else:
        right_string = 'No Data'


    print('Timestamp: ' + str(timestamps_native_array[x]) + ', Left: ' + left_string + ', Right: ' + right_string)