import torch
import tensorflow as tf
import time
import os
import logging
from pathlib import Path
from typing import List, NamedTuple

import av
import cv2
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer

from utils.download import download_file
from utils.turn import get_ice_servers

from mtcnn import MTCNN  # Import MTCNN for face detection
from PIL import Image, ImageDraw  # Import PIL for image processing
from transformers import pipeline  # Import Hugging Face transformers pipeline

import requests
from io import BytesIO  # Import for handling byte streams


# CHANGE THE CODE BELOW HERE TO REPLACE IT WITH YOUR OWN ANALYSIS.
# Update the ANALYSIS_TITLE string below to set the display title of the analysis

# Appropriate imports needed for analysis

# Initialize MTCNN for face detection
mtcnn = MTCNN()
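
# For reference, mtcnn.detect_faces() returns a list of dicts shaped roughly like
# (illustrative values only; actual confidence and keypoints depend on the image):
# [{"box": [x, y, width, height], "confidence": 0.99,
#   "keypoints": {"left_eye": (x, y), "right_eye": (x, y), ...}}]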

# Initialize the Hugging Face pipeline for facial emotion detection
emotion_pipeline = pipeline("image-classification",
                            model="trpakov/vit-face-expression")


# Default title - "Facial Sentiment Analysis"

ANALYSIS_TITLE = "Facial Sentiment Analysis"

# CHANGE THE CONTENTS OF THIS FUNCTION TO REPLACE IT WITH YOUR OWN ANALYSIS.
#
#
# Function to analyze an input frame and generate an analyzed frame
# This function takes an input video frame, detects faces in it using MTCNN,
# then for each detected face, it analyzes the sentiment (emotion) using the analyze_sentiment function,
# draws a rectangle around the face, and overlays the detected emotion on the frame.
# It also records the time taken to process the frame and stores it in a global container.
# Constants for text and line size in the output image
TEXT_SIZE = 1
LINE_SIZE = 2


# Set analysis results in img_container for display
# img_container["input"] - holds the input frame contents - of type np.ndarray
# img_container["analyzed"] - holds the analyzed frame with any added annotations - of type np.ndarray
# img_container["analysis_time"] - holds how long the analysis took, in milliseconds
# img_container["detections"] - holds the analysis metadata results
def analyze_frame(frame: np.ndarray):
    start_time = time.time()  # Start timing the analysis
    img_container["input"] = frame  # Store the input frame
    frame = frame.copy()  # Create a copy of the frame to modify

    results = mtcnn.detect_faces(frame)  # Detect faces in the frame
    for result in results:
        x, y, w, h = result["box"]  # Get the bounding box of the detected face
        x, y = max(0, x), max(0, y)  # MTCNN may return negative coords near edges; clamp for a valid crop
        face = frame[y: y + h, x: x + w]  # Extract the face from the frame
        # Analyze the sentiment of the face
        sentiment = analyze_sentiment(face)
        result["label"] = sentiment
        # Draw a rectangle around the face
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), LINE_SIZE)
        text_size = cv2.getTextSize(sentiment, cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, 2)[
            0
        ]
        text_x = x
        text_y = y - 10
        background_tl = (text_x, text_y - text_size[1])
        background_br = (text_x + text_size[0], text_y + 5)
        # Draw a black background for the text
        cv2.rectangle(frame, background_tl, background_br,
                      (0, 0, 0), cv2.FILLED)
        # Put the sentiment text on the image
        cv2.putText(
            frame,
            sentiment,
            (text_x, text_y),
            cv2.FONT_HERSHEY_SIMPLEX,
            TEXT_SIZE,
            (255, 255, 255),
            2,
        )

    end_time = time.time()  # End timing the analysis
    execution_time_ms = round(
        (end_time - start_time) * 1000, 2
    )  # Calculate execution time in milliseconds
    # Store the execution time
    img_container["analysis_time"] = execution_time_ms

    # store the detections
    img_container["detections"] = results

    img_container["analyzed"] = frame  # Store the analyzed frame

    return  # End of the function


# Function to analyze the sentiment (emotion) of a detected face
# The face crop already arrives in RGB (the webcam callback, the upload handlers
# and process_video all hand analyze_frame RGB arrays), so it is converted
# directly to a PIL image, passed to the pre-trained emotion detection model,
# and the most dominant emotion detected is returned.
def analyze_sentiment(face):
    pil_image = Image.fromarray(face)  # Convert the RGB face crop to a PIL image
    results = emotion_pipeline(pil_image)  # Run emotion detection on the image
    dominant_emotion = max(results, key=lambda x: x["score"])[
        "label"
    ]  # Get the dominant emotion
    return dominant_emotion  # Return the detected emotion

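# Minimal local test sketch for analyze_sentiment, kept commented out so it does
# not execute inside the Streamlit app; "face.jpg" is a hypothetical sample image:
# test_face = cv2.cvtColor(cv2.imread("face.jpg"), cv2.COLOR_BGR2RGB)
# print(analyze_sentiment(test_face))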

#
#
# DO NOT TOUCH THE CODE BELOW (NO CHANGES NEEDED)
#
#

# Suppress FFmpeg logs
os.environ["FFMPEG_LOG_LEVEL"] = "quiet"

# Suppress TensorFlow logs and progress bars

tf.get_logger().setLevel("ERROR")
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# Suppress PyTorch logs

logging.getLogger().setLevel(logging.WARNING)
torch.set_num_threads(1)
logging.getLogger("torch").setLevel(logging.ERROR)

# Suppress Streamlit logs using the logging module
logging.getLogger("streamlit").setLevel(logging.ERROR)

# Container to hold image data and analysis results
img_container = {"input": None, "analyzed": None,
                 "analysis_time": None, "detections": None}

# Logger for debugging and information
logger = logging.getLogger(__name__)


# Callback function to process video frames
# This function is called for each video frame in the WebRTC stream.
# It converts the frame to a numpy array in RGB format, analyzes the frame,
# and returns the original frame.
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    # Convert frame to numpy array in RGB format
    img = frame.to_ndarray(format="rgb24")
    analyze_frame(img)  # Analyze the frame
    return frame  # Return the original frame


# Get ICE servers for WebRTC
ice_servers = get_ice_servers()

# Streamlit UI configuration
st.set_page_config(layout="wide")

# Custom CSS for the Streamlit page
st.markdown(
    """
    <style>
        .main {
            padding: 2rem;
        }
        h1, h2, h3 {
            font-family: 'Arial', sans-serif;
        }
        h1 {
            font-weight: 700;
            font-size: 2.5rem;
        }
        h2 {
            font-weight: 600;
            font-size: 2rem;
        }
        h3 {
            font-weight: 500;
            font-size: 1.5rem;
        }
    </style>
    """,
    unsafe_allow_html=True,
)

# Streamlit page title and subtitle
st.title("Computer Vision Playground")

# Add a link to the README file
st.markdown(
    """
    <div style="text-align: left;">
        <p>See the <a href="https://huggingface.co/spaces/eusholli/sentiment-analyzer/blob/main/README.md" 
        target="_blank">README</a> to learn how to use this code to help you start your computer vision exploration.</p>
    </div>
    """,
    unsafe_allow_html=True,
)

st.subheader(ANALYSIS_TITLE)

# Columns for input and output streams
col1, col2 = st.columns(2)

with col1:
    st.header("Input Stream")
    st.subheader("input")
    # WebRTC streamer to get video input from the webcam
    webrtc_ctx = webrtc_streamer(
        key="input-webcam",
        mode=WebRtcMode.SENDRECV,
        rtc_configuration=ice_servers,
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

    # File uploader for images
    st.subheader("Upload an Image")
    uploaded_file = st.file_uploader(
        "Choose an image...", type=["jpg", "jpeg", "png"])

    # Text input for image URL
    st.subheader("Or Enter Image URL")
    image_url = st.text_input("Image URL")

    # File uploader for videos
    st.subheader("Upload a Video")
    uploaded_video = st.file_uploader(
        "Choose a video...", type=["mp4", "avi", "mov", "mkv"]
    )

    # Text input for video URL
    st.subheader("Or Enter Video Download URL")
    video_url = st.text_input("Video URL")

# Streamlit footer
st.markdown(
    """
    <div style="text-align: center; margin-top: 2rem;">
        <p>If you want to set up your own computer vision playground see <a href="https://huggingface.co/spaces/eusholli/computer-vision-playground/blob/main/README.md" target="_blank">here</a>.</p>
    </div>
    """,
    unsafe_allow_html=True
)

# Function to initialize the analysis UI
# This function sets up the placeholders and UI elements in the analysis section.
# It creates placeholders for input and output frames, analysis time, and detected labels.


def analysis_init():
    global analysis_time, show_labels, labels_placeholder, input_placeholder, output_placeholder

    with col2:
        st.header("Analysis")
        st.subheader("Input Frame")
        input_placeholder = st.empty()  # Placeholder for input frame

        st.subheader("Output Frame")
        output_placeholder = st.empty()  # Placeholder for output frame
        analysis_time = st.empty()  # Placeholder for analysis time
        show_labels = st.checkbox(
            "Show the detected labels", value=True
        )  # Checkbox to show/hide labels
        labels_placeholder = st.empty()  # Placeholder for labels


# Function to publish frames and results to the Streamlit UI
# This function retrieves the latest frames and results from the global img_container
# and updates the placeholders in the Streamlit UI with the current input frame, analyzed frame, analysis time, and detected labels.
def publish_frame():

    img = img_container["input"]
    if img is None:
        return
    input_placeholder.image(img, channels="RGB")  # Display the input frame

    analyzed = img_container["analyzed"]
    if analyzed is None:
        return
    # Display the analyzed frame
    output_placeholder.image(analyzed, channels="RGB")

    analysis_time_ms = img_container["analysis_time"]
    if analysis_time_ms is None:
        return
    # Display the analysis time (local renamed so it does not shadow the time module)
    analysis_time.text(f"Analysis Time: {analysis_time_ms} ms")

    detections = img_container["detections"]
    if detections is None:
        return

    if show_labels:
        labels_placeholder.table(
            detections
        )  # Display labels if the checkbox is checked


# If the WebRTC streamer is playing, initialize and publish frames
if webrtc_ctx.state.playing:
    analysis_init()  # Initialize the analysis UI
    while True:
        publish_frame()  # Publish the frames and results
        time.sleep(0.1)  # Delay to control frame rate


# If an image is uploaded or a URL is provided, process the image
if uploaded_file is not None or image_url:
    analysis_init()  # Initialize the analysis UI

    if uploaded_file is not None:
        image = Image.open(uploaded_file)  # Open the uploaded image
        img = np.array(image.convert("RGB"))  # Convert the image to RGB format
    else:
        response = requests.get(image_url)  # Download the image from the URL
        # Open the downloaded image
        image = Image.open(BytesIO(response.content))
        img = np.array(image.convert("RGB"))  # Convert the image to RGB format

    analyze_frame(img)  # Analyze the image
    publish_frame()  # Publish the results


# Function to process video files
# This function reads frames from a video file, analyzes each frame for face detection and sentiment analysis,
# and updates the Streamlit UI with the current input frame, analyzed frame, and detected labels.
def process_video(video_path):
    cap = cv2.VideoCapture(video_path)  # Open the video file
    while cap.isOpened():
        ret, frame = cap.read()  # Read a frame from the video
        if not ret:
            break  # Exit the loop if no more frames are available

        # Convert the frame from BGR to RGB format
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Analyze the frame for face detection and sentiment analysis
        analyze_frame(rgb_frame)

        publish_frame()  # Publish the results

    cap.release()  # Release the video capture object


# If a video is uploaded or a URL is provided, process the video
if uploaded_video is not None or video_url:
    analysis_init()  # Initialize the analysis UI

    if uploaded_video is not None:
        video_path = uploaded_video.name  # Get the name of the uploaded video
        with open(video_path, "wb") as f:
            # Save the uploaded video to a file
            f.write(uploaded_video.getbuffer())
    else:
        # Download the video from the URL
        video_path = download_file(video_url)

    process_video(video_path)  # Process the video
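

# To run this playground locally (assuming the file is saved as app.py and the
# utils/ helpers plus required packages are installed), the usual command is:
#   streamlit run app.py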