Daniel García



How to do real-time people tracking and recognition using DL

In an era of rapidly evolving technology, the utilization of deep learning for people recognition and tracking has revolutionized surveillance and security systems.

Deep Learning fundamentals

To understand people recognition and tracking using deep learning, it’s essential to grasp the basics of deep learning. Deep learning is a subset of machine learning that employs artificial neural networks, which can automatically learn patterns and representations from large amounts of data. Convolutional Neural Networks (CNNs) and Recurrent Neural Networks (RNNs) are the backbone of many deep learning models for this purpose.

Object Detection

Object detection is a crucial step in recognizing people within an image or video stream. Deep learning models, such as Single Shot MultiBox Detector (SSD) and You Only Look Once (YOLO), are at the forefront of real-time object detection. These models can locate and classify individuals in an image or video frame with remarkable accuracy.

Face Recognition

Face recognition is a subdomain of people recognition that has witnessed significant advancements due to deep learning. Deep neural networks, including FaceNet and VGGFace, have made it possible to accurately identify individuals based on facial features. These systems are widely used for access control, law enforcement, and authentication purposes.

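import typing

import cv2
import numpy as np
import onnxruntime as ort
import stow


class FaceNet:
    def __init__(
        self, 
        detector: object,
        onnx_model_path: str = "assets/models/facenet512_weights.onnx", 
        # `data` is defined elsewhere in the author's module (not shown here). Judging by the
        # isinstance check at the end of __init__, it may be a folder path or a dict of anchors.
        anchors: typing.Union[str, dict] = data,
        force_cpu: bool = False,
        threshold: float = 0.5,
        color: tuple = (255, 255, 255),
        thickness: int = 2,
        ) -> None:
        if not stow.exists(onnx_model_path):
            raise FileNotFoundError(f"Model doesn't exist in {onnx_model_path}")

        self.detector = detector
        self.threshold = threshold
        self.color = color
        self.thickness = thickness

        # Prefer the GPU provider when available, unless the caller forces the CPU
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']

        providers = providers if ort.get_device() == "GPU" and not force_cpu else providers[::-1]

        self.ort_sess = ort.InferenceSession(onnx_model_path, providers=providers)

        # Spatial size expected by the model, read through the public get_inputs() API
        self.input_shape = self.ort_sess.get_inputs()[0].shape[1:3]
        
        # A string is treated as a folder of face images; a dict is used as-is
        self.anchors = self.load_anchors(anchors) if isinstance(anchors, str) else anchors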

    def normalize(self, img: np.ndarray) -> np.ndarray:
        mean, std = img.mean(), img.std()
        return (img - mean) / std

    def l2_normalize(self, x: np.ndarray, axis: int = -1, epsilon: float = 1e-10) -> np.ndarray:
        output = x / np.sqrt(np.maximum(np.sum(np.square(x), axis=axis, keepdims=True), epsilon))
        return output

    def detect_save_faces(self, image: np.ndarray, output_dir: str = "faces"):
        face_crops = [image[t:b, l:r] for t, l, b, r in self.detector(image, return_tlbr=True)]
        if not face_crops:
            return False

        stow.mkdir(output_dir)

        for index, crop in enumerate(face_crops):
            output_path = stow.join(output_dir, f"face_{str(index)}.png")
            cv2.imwrite(output_path, crop)
            print("Crop saved to:", output_path)

        self.anchors = self.load_anchors(output_dir)
        
        return True

    def load_anchors(self, faces_path: str):
        anchors = {}
        if not stow.exists(faces_path):
            return {}

        for face_path in stow.ls(faces_path):
            anchors[stow.basename(face_path)] = self.encode(cv2.imread(face_path.path))

        return anchors

    def encode(self, face_image: np.ndarray) -> np.ndarray:
        # Standardize the crop, resize it to the model's input size, and run the ONNX model
        face = self.normalize(face_image)
        face = cv2.resize(face, tuple(self.input_shape)).astype(np.float32)

        input_name = self.ort_sess.get_inputs()[0].name
        encode = self.ort_sess.run(None, {input_name: np.expand_dims(face, axis=0)})[0][0]
        normalized_encode = self.l2_normalize(encode)

        return normalized_encode
    
    def l1_distance(self, a: np.ndarray, b: typing.Union[np.ndarray, list]) -> np.ndarray:
        if isinstance(a, list):
            a = np.array(a)

        if isinstance(b, list):
            b = np.array(b)

        return np.sum(np.abs(a - b))
    
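    def cosine_distance(self, a: np.ndarray, b: typing.Union[np.ndarray, list]) -> np.ndarray:
        # Despite the name, this returns cosine similarity (higher means more alike),
        # which is why __call__ compares np.max(...) against the threshold.
        if isinstance(a, list):
            a = np.array(a)

        if isinstance(b, list):
            b = np.array(b)

        # Normalize each anchor separately (axis=-1). A single norm over the whole
        # anchor matrix would shrink every score as more anchors are added.
        return np.dot(a, b.T) / (np.linalg.norm(a) * np.linalg.norm(b, axis=-1))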

    def draw(self, image: np.ndarray, face_crops: dict):
        for value in face_crops.values():
            t, l, b, r = value["tlbr"]
            cv2.rectangle(image, (l, t), (r, b), self.color, self.thickness)
            name = stow.name(value['name'])
            name = name.rsplit('_')[0]
            cv2.putText(image, name, (l, t - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, self.color, self.thickness)

        return image

    def __call__(self, frame: np.ndarray, face_frame: bool = False) -> typing.Optional[str]:
        # Returns the matched name, or None when no anchor clears the threshold
        names = None
        if not self.anchors:
            # No reference faces loaded: np.max would fail on an empty array
            return names
        anchor_names = list(self.anchors.keys())
        anchor_encodings = list(self.anchors.values())
        if not face_frame:
            # Full frame: detect faces first, then match each crop.
            # Only the last recognized name is returned.
            face_crops = {index: {"name": "", "tlbr": tlbr} for index, tlbr in enumerate(self.detector(frame, return_tlbr=True))}
            for key, value in face_crops.items():
                t, l, b, r = value["tlbr"]
                face_encoding = self.encode(frame[t:b, l:r])
                distances = self.cosine_distance(face_encoding, anchor_encodings)
                if np.max(distances) > self.threshold:
                    face_crops[key]["name"] = anchor_names[np.argmax(distances)]
                    names = face_crops[key]["name"].rsplit('_')[0]
                    print(names, np.max(distances))
        else:
            # The frame is already a crop: encode it directly
            face_encoding = self.encode(frame)
            distances = self.cosine_distance(face_encoding, anchor_encodings)
            if np.max(distances) > self.threshold:
                names = anchor_names[np.argmax(distances)].rsplit('_')[0]
                print(names, np.max(distances))

        return names
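
The matching step inside `__call__` can be tried in isolation with plain NumPy. The following is a minimal sketch, not the author's code: `best_match` and the toy three-dimensional vectors are made up for illustration, and real FaceNet embeddings are much longer.

```python
import numpy as np


def l2_normalize(x: np.ndarray, epsilon: float = 1e-10) -> np.ndarray:
    """Scale vectors along the last axis to unit length."""
    norm = np.sqrt(np.maximum(np.sum(np.square(x), axis=-1, keepdims=True), epsilon))
    return x / norm


def best_match(query: np.ndarray, anchors: dict, threshold: float = 0.5):
    """Return (name, similarity) for the closest anchor; name is None below the threshold."""
    if not anchors:
        return None, 0.0
    names = list(anchors.keys())
    matrix = l2_normalize(np.stack(list(anchors.values())))
    # For unit vectors, the dot product is the cosine similarity
    similarities = matrix @ l2_normalize(query)
    best = int(np.argmax(similarities))
    score = float(similarities[best])
    return (names[best] if score > threshold else None), score


# Hypothetical anchors keyed by file name, mirroring load_anchors
anchors = {
    "alice_0.png": np.array([1.0, 0.0, 0.0]),
    "bob_0.png": np.array([0.0, 1.0, 0.0]),
}
name, score = best_match(np.array([0.9, 0.1, 0.0]), anchors)
unknown, low_score = best_match(np.array([0.0, 0.0, 1.0]), anchors)
```

Here the first query lands close to the `alice_0.png` anchor, while the second is orthogonal to both anchors and falls below the threshold, so no name is returned.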

Tracking algorithms

People tracking involves monitoring and following individuals as they move through a scene or across frames in a video. SORT (Simple Online and Realtime Tracking) associates detections across frames using motion prediction, and DeepSORT extends it with a deep-learning appearance descriptor, which improves robustness when people cross paths or are briefly occluded.

Methodology

The system works as follows:

  1. The application grabs a new frame from the camera.
  2. The object detection system processes frames and extracts people from the scene. For each person, a sub-region of the frame is cropped for detailed processing.
  3. Each person’s region is processed by a face detector algorithm to extract a person’s face from the body.
  4. Each face is scanned by a face recognition system that compares the current face with faces stored in the database. If a face is recognized, the name is returned; otherwise, “undefined” is returned.
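
The four steps above can be sketched as a single function. This is only an illustration under stated assumptions: `label_people` is a hypothetical name, and the detector, face detector, and recognizer are passed in as plain callables (stand-ins here) rather than the real models.

```python
from typing import Callable, List, Optional, Tuple

import numpy as np

Box = Tuple[int, int, int, int]  # (x1, y1, x2, y2) in pixels


def label_people(
    frame: np.ndarray,
    detect_people: Callable[[np.ndarray], List[Box]],
    detect_face: Callable[[np.ndarray], Optional[Box]],
    recognize: Callable[[np.ndarray], Optional[str]],
) -> List[Tuple[Box, str]]:
    """Run steps 2 to 4 on one frame and return (person box, name) pairs."""
    labelled = []
    for (x1, y1, x2, y2) in detect_people(frame):      # step 2: find people
        person = frame[y1:y2, x1:x2]                   # crop the person's region
        face_box = detect_face(person)                 # step 3: find the face
        name = None
        if face_box is not None:
            fx1, fy1, fx2, fy2 = face_box
            name = recognize(person[fy1:fy2, fx1:fx2]) # step 4: compare with known faces
        labelled.append(((x1, y1, x2, y2), name or "undefined"))
    return labelled


# Stand-in callables so the sketch runs without any model
frame = np.zeros((100, 100, 3), dtype=np.uint8)        # step 1 would read this from the camera
result = label_people(
    frame,
    detect_people=lambda f: [(10, 10, 50, 90), (60, 10, 95, 90)],
    detect_face=lambda person: (5, 5, 25, 25) if person.shape[1] == 40 else None,
    recognize=lambda face: "alice",
)
```

With these stand-ins, the first person has a detectable face and is labelled, while the second has none and falls back to "undefined".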

Associating recognition with a unique track ID is a common and effective approach in people recognition and tracking systems, especially in scenarios where individuals may move in and out of view or temporarily obstruct their faces. This method ensures that even if a person’s face is temporarily obscured or no longer visible in a given frame, the system can still recognize them based on their assigned track ID. Here’s how it works:

  1. Assignment of a Track ID: The tracker assigns a unique track ID to each detected person. Once the person's face has been recognized, that ID is mapped to their name.
  2. Continued Tracking: As the video stream or frames progress, the tracking algorithm continuously monitors the movements and appearances of individuals. Even if a person’s face becomes temporarily obscured or is no longer visible, the system still tracks their movement based on their unique track ID.
  3. Re-Recognition: While the person’s face is not visible, the system can still identify them by looking up their current track ID. This allows for the seamless tracking of individuals across different frames, even in challenging scenarios.

# name is the output of the face recognition call; id is the person's track ID
if bool(name):
  to_remove = []
  for key, value in id_face_dictionary.items():
      if value == name:
          # The same person was previously stored under another track ID
          if id != key:
              to_remove.append(key)
          loggers["recognition"].info(f"{name} already in dict. ID: {id}")
  for k in to_remove:
      id_face_dictionary.pop(k)

  # once the stale IDs are deleted, map the current track ID to the name
  id_face_dictionary[id] = name
  loggers["recognition"].info(f"Added {name} to key {id}")

By using track IDs, the system maintains a consistent identity for each individual throughout the video or sequence of frames, ensuring that recognition can be maintained even when the face is not visible at all times. This approach is valuable in various applications, including video surveillance, where continuous tracking and recognition are essential for security and analysis purposes.
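
To see how this mapping behaves when the tracker loses a person and later assigns them a new ID, the update logic can be wrapped in a small function. This is a sketch with hypothetical names (`remember_identity`, `name_for`), not the author's code, and it leaves out the logging.

```python
def remember_identity(id_to_name: dict, track_id: int, name: str) -> None:
    """Map track_id to name, dropping any older track ID that held the same name."""
    stale = [key for key, value in id_to_name.items() if value == name and key != track_id]
    for key in stale:
        del id_to_name[key]
    id_to_name[track_id] = name


def name_for(id_to_name: dict, track_id: int) -> str:
    """Look up the name for a track ID; unknown IDs are reported as 'undefined'."""
    return id_to_name.get(track_id, "undefined")


id_to_name = {}
remember_identity(id_to_name, 3, "alice")  # face recognized while tracked as ID 3
remember_identity(id_to_name, 8, "alice")  # tracker lost her and assigned a new ID
```

After the second call only ID 8 maps to "alice", so a stale ID can never label a different person with her name.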

Optimizing Real-Time Object Detection and Tracking

Real-time object detection and tracking are critical components in applications such as surveillance, autonomous driving, and interactive systems. However, keeping up with a 30 frames per second (fps) stream leaves only about 33 milliseconds (ms) per frame (1/30 s), and the system targets a slightly stricter 30 ms. Running detection, tracking, and recognition within that window presents significant computational challenges. To overcome this, we propose a multi-threaded architecture that divides processing into three independent threads: Core, Detector, and Recognizer. Each is designed to operate concurrently, reducing processing latency and resource contention.
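
The per-frame time budget follows directly from the frame rate. The short sketch below uses two hypothetical helpers to show the arithmetic, along with a rough estimate of how many camera frames arrive while a slow frame is still being processed.

```python
import math


def frame_budget_ms(fps: float) -> float:
    """Time available per frame, in milliseconds, at a given frame rate."""
    return 1000.0 / fps


def frames_skipped(processing_ms: float, fps: float) -> int:
    """Rough count of camera frames that arrive while one frame is still being processed."""
    return max(0, math.ceil(processing_ms / frame_budget_ms(fps)) - 1)


budget = frame_budget_ms(30)         # about 33.3 ms at 30 fps
skipped_fast = frames_skipped(20, 30)
skipped_slow = frames_skipped(80, 30)
```

A detector that needs 20 ms keeps up with the camera, while one that needs 80 ms lets roughly two frames go by for each frame it processes.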

Core Thread: The Application Manager

The Core thread acts as the central coordinator. Its primary functions are to:

  1. Acquire video frames directly from the camera input.
  2. Dispatch these frames to the Detector thread without delay.
  3. Collect processed data from the Detector and Recognizer threads.
  4. Display the resulting frames with detected objects and recognized entities.

The Core thread hands a frame to the Detector only when the Detector is idle. If the Detector takes longer than 30 ms on a frame, the frames that arrive in the meantime are dropped rather than queued, which avoids a backlog and keeps the display close to real time.

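# vid, inputQueue, outputQueue, p, pRec, loggers, id_face_dictionary and drawPerson
# are created elsewhere in the application.
out = None        # latest detections received from the Detector
prev_id = None    # last track ID seen, reused for rows that carry no ID
queuepulls = 0    # number of frames drawn with detections

while vid.isOpened():
    ret, frame = vid.read()
    if not ret:
        # No frame could be read (camera closed or end of video)
        break
    if queuepulls == 1:
        timer2 = time.time()  # timestamp taken once the first detections have been drawn
    # if the input queue *is* empty, give the current frame to the Detector
    if inputQueue.empty():
        inputQueue.put(frame)
    else:
        loggers["general"].debug("Skipping frame from face detection")

    # if the output queue *is not* empty, grab the detections
    if not outputQueue.empty():
        out = outputQueue.get()
    if out is not None:
        queuepulls += 1
        for output in out:
            # Assuming Ultralytics' box layout, columns 0-3 are the corners
            # (x1, y1, x2, y2), despite the _w/_h variable names
            bbox_left = int(output[0])
            bbox_top = int(output[1])
            bbox_w = int(output[2])
            bbox_h = int(output[3])
            # Rows with 7 columns carry a track ID in column 4
            if output.shape[0] == 7:
                id = int(output[4])
                prev_id = id
            else:
                id = prev_id
            if id in id_face_dictionary:
                name = id_face_dictionary[id]
            else:
                name = "undefined"
            color = (255,0,0) # Use your custom color
            drawPerson(frame,bbox_left,bbox_top,bbox_w,bbox_h,name,color)
    cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release the camera, close the window and stop the worker processes
vid.release()
cv2.destroyAllWindows()
p.kill()
pRec.kill()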
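
The frame-skipping behaviour of the loop above can be isolated into a few lines. This is a minimal sketch that assumes a queue of size 1 and uses the standard-library `queue.Queue` for simplicity; `offer_latest` is a hypothetical helper, and integers stand in for frames.

```python
import queue


def offer_latest(q: queue.Queue, frame) -> bool:
    """Hand a frame to the worker only if its slot is free; otherwise drop the frame."""
    try:
        q.put_nowait(frame)
        return True
    except queue.Full:
        return False


input_queue = queue.Queue(maxsize=1)

# The worker has not consumed anything yet, so only the first frame is accepted
accepted = [offer_latest(input_queue, n) for n in range(3)]

# The worker picks up frame 0, which frees the slot for the next camera frame
first = input_queue.get_nowait()
accepted_after = offer_latest(input_queue, 3)
```

Because the queue never holds more than one frame, a slow worker delays nothing upstream: the producer keeps reading the camera and simply discards frames the worker cannot take.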

Detector Thread: The Object Detection Engine

Running in an infinite loop, the Detector thread is tasked with:

  1. Executing object detection algorithms on the current frame.
  2. Sending the detection results back to the Core thread.
  3. Forwarding the detections, together with the frame they came from, to the Recognizer thread.

The Detector uses YOLOv8 with its built-in tracker and runs on the GPU when one is available, which helps it stay within the 30 ms time frame.

def object_detection_(model_path,confidence,inputQueue,outputQueue,recognitionQueue):
    global id_face_dictionary
    yolov8_detector = YOLO(model_path)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    yolov8_detector.to(device)
    loggers['tracking'].info("Detection initialized")
    while True:
        if not inputQueue.empty():
            frame = inputQueue.get()
            result = yolov8_detector.track(frame,verbose=False,conf=confidence,persist=True)[0] #Verbose False to avoid yolov8 messages
            data = result.cpu().numpy().boxes.data
            outputQueue.put(data)
            if recognitionQueue.empty():
                recognitionQueue.put((data,frame))
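
The loop above checks `inputQueue.empty()` continuously, which keeps a CPU core busy even when no frame is waiting. One possible alternative, offered as a sketch and not as the author's implementation, is a blocking `get` with a timeout. The `next_item` helper is a hypothetical name; the example uses `queue.Queue` for brevity, and `multiprocessing.Queue.get` raises the same `queue.Empty` exception on timeout.

```python
import queue


def next_item(q, timeout=0.1):
    """Wait up to `timeout` seconds for an item; return None if none arrives."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None
```

Inside the detector loop this would replace the `empty()` check: call `next_item(inputQueue)` and continue to the next iteration when it returns `None`.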

Recognizer Thread: The Identification Specialist

Parallel to the Detector, the Recognizer thread is responsible for:

  1. Performing face recognition tasks on detected facial data.
  2. Relaying recognition results back to the Core thread.

It also operates in an infinite loop, checking for new data from the Detector and processing it immediately to identify individuals or features in the video frame.

def recognize_algorithm(model_path,recognitionQueue,id_face_dictionary,confidence):
    detector = face_detector.FaceDetection()
    recog = face_recognition.FaceNet(
        detector=detector,
        threshold=confidence,
        onnx_model_path = model_path)
    loggers['recognition'].info("Recognition initialized")
    while True:
        if not recognitionQueue.empty():
            out = recognitionQueue.get()
            frame = out[1]
            boxes = out[0]
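            for output in boxes:
                # Rows without a track ID have 6 values; column 4 would then be
                # the confidence score, so skip them.
                if output.shape[0] != 7:
                    continue
                # boxes.data holds corner coordinates, so bbox_w and bbox_h
                # are the right and bottom edges, not a width and height.
                bbox_left = int(output[0])
                bbox_top = int(output[1])
                bbox_w = int(output[2])
                bbox_h = int(output[3])
                id = int(output[4])
                if bbox_w > 0 and bbox_h > 0:
                    person_frame = frame[bbox_top:bbox_h,bbox_left:bbox_w,:]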
                    start_time = time.time()

                    name = recog(frame=person_frame,face_frame=True)
                    loggers['recognition'].debug(f"RECOGNITION - Inference time: {round(time.time()-start_time,2)}")

                    if bool(name):
                        to_remove = []
                        for key, value in id_face_dictionary.items():
                            if value == name:
                                if id != key:
                                    to_remove.append(key)
                                loggers["recognition"].info(f"{name} already in dict. ID: {id}")
                        for k in to_remove:
                            id_face_dictionary.pop(k)

                        # Once stale IDs are removed, bind the name to the current ID
                        id_face_dictionary[id] = name
                        loggers["recognition"].info(f"Added {name} to key {id}")
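
The bookkeeping at the end of the loop above keeps each name bound to a single track ID: any other ID that still carries the recognized name is dropped before the new binding is written. A minimal sketch of that step as a standalone function, where `assign_name` is a hypothetical helper name and a plain `dict` stands in for `id_face_dictionary`:

```python
def assign_name(id_to_name: dict, track_id: int, name: str) -> list:
    """Bind `name` to `track_id`, dropping other IDs that held the same name.

    Returns the list of IDs that were removed.
    """
    # Collect stale keys first so the dict is not modified while iterating.
    stale = [k for k, v in id_to_name.items() if v == name and k != track_id]
    for k in stale:
        id_to_name.pop(k)
    id_to_name[track_id] = name
    return stale
```

For example, if ID 1 was previously recognized as a given person and the tracker later reports that person under ID 5, calling `assign_name(d, 5, name)` removes key 1 and adds key 5.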

Inter-Thread Communication

Inter-thread communication is a cornerstone of this architecture. Each worker checks its queue for new data independently, so frames are processed asynchronously. Because the input and output queues hold at most one item, and the Core skips a frame while the Detector is still busy, the system always works on a recent frame instead of building up a backlog, which keeps performance real-time without lag. The workers are started as separate `Process` objects and exchange data through Python `Queue` objects, which take care of the locking needed to pass items between them without race conditions or data corruption.

  inputQueue = Queue(maxsize=1)
  outputQueue = Queue(maxsize=1)
  recognitionQueue = Queue()
  p = Process(target=object_detection_, args=(model_path,detection_confidence,inputQueue, outputQueue,recognitionQueue,))
  p.daemon = True
  p.start()

  pRec = Process(target=recognize_algorithm, args=(recognition_model_path,recognitionQueue,id_face_dictionary,recognition_confidence,))
  pRec.daemon = True
  pRec.start()
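
Because `inputQueue` holds at most one item, the producer has to decide what happens when the consumer is still busy; the Core loop's policy is to skip the new frame. A minimal sketch of that policy using `put_nowait`, which folds the "is there room?" check and the insert into one call. The `offer` helper is a hypothetical name, and `queue.Queue` is used here for brevity; `multiprocessing.Queue.put_nowait` raises the same `queue.Full` exception.

```python
import queue


def offer(q, item) -> bool:
    """Try to enqueue without blocking; return False if the queue is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False
```

A caller can log the skipped frame when `offer` returns `False`, much as the Core loop does in its `else` branch.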

System specifications

The application runs on Python and is cross-platform: it can be used on Windows, macOS, and various Linux distributions. For real-time performance, however, a GPU (Graphics Processing Unit) is highly recommended, because it significantly accelerates the resource-intensive deep learning models the application relies on.

