Metrics and video recording

Observers are attached to pipeline steps to be notified when a method is called.

observers.py

For this use case we need to record gaze analysis metrics on ArUcoCamera.on_look call and to record sector screen image on ArUcoCamera.on_copy_background_into_scenes_frames signal.

import logging

from argaze.utils import UtilsFeatures

import cv2

class ScanPathAnalysisRecorder(UtilsFeatures.FileWriter):

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

        self.header = "Timestamp (ms)", "Path duration (ms)", "Steps number", "Fixation durations average (ms)", "Explore/Exploit ratio", "K coefficient"

    def on_look(self, timestamp, frame, exception):
        """Log scan path metrics."""

        if frame.is_analysis_available():

            analysis = frame.analysis()

            data = (
                int(timestamp), 
                analysis['argaze.GazeAnalysis.Basic.ScanPathAnalyzer'].path_duration, 
                analysis['argaze.GazeAnalysis.Basic.ScanPathAnalyzer'].steps_number,
                analysis['argaze.GazeAnalysis.Basic.ScanPathAnalyzer'].step_fixation_durations_average,
                analysis['argaze.GazeAnalysis.ExploreExploitRatio.ScanPathAnalyzer'].explore_exploit_ratio,
                analysis['argaze.GazeAnalysis.KCoefficient.ScanPathAnalyzer'].K
            )

            self.write(data)

class AOIScanPathAnalysisRecorder(UtilsFeatures.FileWriter):

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

        self.header = "Timestamp (ms)", "Path duration (ms)", "Steps number", "Fixation durations average (ms)", "Transition matrix probabilities", "Transition matrix density", "N-Grams count", "Stationary entropy", "Transition entropy"

    def on_look(self, timestamp, layer, exception):
        """Log aoi scan path metrics"""

        if layer.is_analysis_available():

            analysis = layer.analysis()

            data = (
                int(timestamp),
                analysis['argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer'].path_duration, 
                analysis['argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer'].steps_number,
                analysis['argaze.GazeAnalysis.Basic.AOIScanPathAnalyzer'].step_fixation_durations_average,
                analysis['argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer'].transition_matrix_probabilities,
                analysis['argaze.GazeAnalysis.TransitionMatrix.AOIScanPathAnalyzer'].transition_matrix_density, 
                analysis['argaze.GazeAnalysis.NGram.AOIScanPathAnalyzer'].ngrams_count,
                analysis['argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer'].stationary_entropy,
                analysis['argaze.GazeAnalysis.Entropy.AOIScanPathAnalyzer'].transition_entropy
            )

            self.write(data)

class VideoRecorder(UtilsFeatures.VideoWriter):

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

    def on_copy_background_into_scenes_frames(self, timestamp, frame, exception):
        """Write frame image."""

        logging.debug('VideoRecorder.on_map')

        image = frame.image()

        # Write video timing
        cv2.rectangle(image, (0, 0), (550, 50), (63, 63, 63), -1)
        cv2.putText(image, f'Time: {int(timestamp)} ms', (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 1, cv2.LINE_AA)

        self.write(image)