ในบทความนี้ เราจะมาสร้างแอปพลิเคชันที่สามารถตรวจจับ "คนที่อาจจะได้รับบาดเจ็บ" โดยเฉพาะคนที่หยุดเคลื่อนไหวหรือนอนนิ่งอยู่ใกล้ๆ กับเครื่องออกกำลังกายครับ เราจะใช้ Raspberry Pi AI Camera ซึ่งเป็นโมดูลกล้อง Edge AI สำหรับ Raspberry Pi ที่ขับเคลื่อนด้วยขุมพลังเซ็นเซอร์อัจฉริยะ IMX500 จาก Sony นั่นเอง!
View more (ดูวิดีโอสาธิตการทำงานของ AI)
อุปกรณ์ที่ต้องใช้ (Hardware Components)
ของที่ต้องเตรียมสำหรับโปรเจกต์นี้มีดังนี้ครับ:
บอร์ด Raspberry Pi (ในบทความนี้ใช้ Raspberry Pi 5)
microSD card ที่ลงระบบปฏิบัติการ Raspberry Pi OS (64-bit) เรียบร้อยแล้ว ข้อควรระวัง: แอปพลิเคชันนี้ทำงานได้เฉพาะบนเวอร์ชัน Bookworm เท่านั้นนะครับ!
💡 ทริคเพิ่มเติม: หากคุณกำลังมองหาสถานที่สั่งซื้อบอร์ด Raspberry Pi 5 หรือ AI Camera รวมไปถึงอุปกรณ์เสริมต่างๆ ในไทย สามารถเข้าไปดูสินค้าและสั่งซื้อได้ง่ายๆ ที่ https://openlink.co/globalbyte เลยครับ!
ขั้นตอนการทำโปรเจกต์ (Procedure)
1. เตรียมฮาร์ดแวร์ให้พร้อม ประกอบบอร์ด Raspberry Pi เสียบสายต่างๆ และเตรียมระบบปฏิบัติการ Bookworm ให้เรียบร้อย
2. ตั้งค่ากล้อง Raspberry Pi AI Camera ทำการตั้งค่าการสื่อสารระหว่างบอร์ดกับกล้อง AI Camera โดยสามารถทำตามขั้นตอนใน the official documentation (คู่มืออย่างเป็นทางการ) ได้เลยครับ
# Activate the virtual environment where the Application Module Library (modlib) is installed cd aitrios-rpi-application-module-library python3 examples/aicam/posenet.py
View more (ดู Code Snippet การทำงานทั้ง 4 ฟีเจอร์)
1. การใช้ PoseNet เป็น AI Model เราใช้โมเดล PoseNet (โมเดลประเมินโครงร่างมนุษย์) ในการจับการเคลื่อนไหวของแขนขา โค้ดการตั้งค่ากล้องและโมเดลมีดังนี้:
# Camera and model initialization function def initialize_camera(): device = AiCamera() model = Posenet() device.deploy(model) return device, Annotator() device, annotator = initialize_camera()
monitoring_area = {'x': 100, 'y': 100, 'width': 300, 'height': 300} # Function to calculate the center point of a person def get_center_point(keypoints): valid_points = [kp for kp in keypoints if kp['x'] > 0.0 and kp['y'] > 0.0] if not valid_points: return None sum_x = sum(kp['x'] for kp in valid_points) sum_y = sum(kp['y'] for kp in valid_points) return {'x': sum_x / len(valid_points), 'y': sum_y / len(valid_points)} # Function to check if a point is in the monitoring area def is_point_in_area(point, area): if not point: return False return (point['x'] >= area['x'] and point['x'] <= area['x'] + area['width'] and point['y'] >= area['y'] and point['y'] <= area['y'] + area['height'])
motion_settings = {'time_window': 6, 'movement_threshold': 10000, 'check_interval': 1} # Periodically check motion current_time = time.time() if current_time - person_tracker.last_check_time >= motion_setting['check_interval']: check_movement() person_tracker.last_check_time = current_time # Function to check motion def check_movement(): global person_tracker, alert_active # Do not check if no person is in the area if not person_tracker.in_area or len(person_tracker.positions) < 2: return current_time = time.time() # Get data within a certain time window cutoff_time = current_time - motion_settings['time_window'] valid_positions = [p for p in person_tracker.positions if p['timestamp'] >= cutoff_time] # Check if there is enough data if len(valid_positions) >= 2: # Calculate the amount of motion movement_amount = calculate_movement_specific_keypoints(valid_positions) person_tracker.last_movement = movement_amount # Trigger alert if below the threshold if movement_amount < motion_settings['movement_threshold']: alert_active = True print("No Motion Detected:", movement_amount, "<", motion_settings['movement_threshold']) else: # Clear alert if there is enough motion alert_active = False print("Motion Detected :", movement_amount, ">", motion_settings['movement_threshold']) MOTION_CHECK_KEYPOINTS = [9, 10, 13, 14] # leftWrist, rightWrist, leftKnee, rightKnee # Function to calculate motion for specific keypoints (both wrists and both knees) def calculate_movement_specific_keypoints(positions): # Get the oldest and newest positions oldest = positions[0] newest = positions[-1] # Get frame size w, h = oldest['frame_size'] total_distance = 0 valid_keypoint_count = 0 # Check only specific keypoints (both wrists and both knees) for keypoint_idx in MOTION_CHECK_KEYPOINTS: # Find corresponding keypoints in the old and new frames old_kp = next((kp for kp in oldest['keypoints'] if kp['id'] == keypoint_idx), None) new_kp = next((kp for kp in newest['keypoints'] if kp['id'] == keypoint_idx), None) # If keypoints are detected in both frames if old_kp and new_kp: # Convert to pixel coordinates x0 = int(old_kp['x'] * w) y0 = int(old_kp['y'] * h) x1 = int(new_kp['x'] * w) y1 = int(new_kp['y'] * h) # Calculate squared distance (Euclidean distance squared) distance = (x1 - x0)**2 + (y1 - y0)**2 total_distance += np.sqrt(distance) # Take square root for actual distance valid_keypoint_count += 1 # Return average distance if there are valid keypoints, otherwise 0 if valid_keypoint_count > 0: return total_distance / valid_keypoint_count else: return 0
4. การแสดงผลแบบ Web Application ผลการประมวลผลทั้งหมดจะถูกเก็บไว้ในตัวแปร frame_buffer แล้วส่งออกไปเป็นวิดีโอสตรีมแบบ MJPEG ให้เราดูผ่านเว็บบราวเซอร์ได้แบบเรียลไทม์เลยครับ
def generate_frames(): global frame_buffer while True: # Wait until the frame buffer is available if frame_buffer is not None: with frame_lock: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame_buffer + b'\r\n') # Short wait time time.sleep(0.03) # Video stream endpoint @app.route('/video_feed') def video_feed(): return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')