"""
Module V: Box detection and classification for RMK-1 manipulator.

Usage:
    python3 detect_box.py --target 1

Target classes (per competition spec):
    1 - hammer (молоток)
    2 - wrench (гаечный ключ)
    3 - pliers (пассатижи)

This script:
1. Subscribes to camera topic /RMC1/arm95/camera_gripper/image_color
2. Classifies visible boxes using trained YOLOv8 model
3. Finds the target box and computes its position relative to Base_link
4. Publishes coordinates for manipulator control
"""
import argparse
import os
import sys

import cv2
import numpy as np
from ultralytics import YOLO

# Mapping between the classifier's own class indices and the IDs used by
# the competition rules. YOLO assigns indices alphabetically, which gives
# hammer=0, pliers=1, wrench=2 — NOT the competition numbering.
YOLO_TO_COMPETITION = {
    0: 1,  # hammer  -> competition ID 1
    1: 3,  # pliers  -> competition ID 3
    2: 2,  # wrench  -> competition ID 2
}

# Inverse lookup: competition ID -> YOLO class index.
COMPETITION_TO_YOLO = {comp: yolo for yolo, comp in YOLO_TO_COMPETITION.items()}

# Human-readable labels keyed by competition ID (English + Russian).
CLASS_NAMES = {
    1: "hammer (молоток)",
    2: "wrench (гаечный ключ)",
    3: "pliers (пассатижи)",
}

# Raw model labels keyed by YOLO class index (alphabetical order).
YOLO_CLASS_NAMES = {0: "hammer", 1: "pliers", 2: "wrench"}
def load_model(model_path=None):
    """Load the YOLO classifier.

    When *model_path* is None, falls back to ``best.pt`` located in the
    same directory as this script.
    """
    if model_path is None:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        model_path = os.path.join(script_dir, "best.pt")
    return YOLO(model_path)
def classify_image(model, image):
    """
    Classify an image and return (class_id, confidence).

    class_id is the competition ID (1=hammer, 2=wrench, 3=pliers);
    (-1, 0.0) is returned when the model yields no classification probs.
    """
    results = model(image, imgsz=224, verbose=False)
    if not results or results[0].probs is None:
        return -1, 0.0

    probs = results[0].probs
    # Translate the model's top-1 index into the competition numbering;
    # unknown indices map to -1.
    comp_class = YOLO_TO_COMPETITION.get(probs.top1, -1)
    return comp_class, probs.top1conf.item()
def detect_boxes_in_frame(image, target_id=None):
    """
    Segment candidate box regions in a top-down camera frame.

    Note: no classification happens here — the caller classifies each
    returned ROI (see find_target_box).

    Args:
        image: BGR frame (numpy array) from the gripper camera.
        target_id: unused; kept with a default so existing callers that
            pass the competition target ID positionally keep working.

    Returns:
        List of dicts, one per candidate region, with keys:
            "roi":    cropped BGR sub-image of the candidate box,
            "bbox":   (x, y, w, h) bounding rectangle in pixels,
            "center": (cx, cy) pixel center of the bounding rectangle,
            "area":   contour area in pixels.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)

    # Adaptive threshold copes with uneven lighting across the shelf.
    thresh = cv2.adaptiveThreshold(
        blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV, 21, 5
    )

    # Morphology: close small gaps inside box blobs, then open to drop speckle.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=2)
    thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=1)

    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    h, w = image.shape[:2]
    min_area = (h * w) * 0.01  # reject blobs smaller than 1% of the frame
    max_area = (h * w) * 0.5   # reject blobs larger than 50% of the frame

    boxes = []
    for cnt in contours:
        area = cv2.contourArea(cnt)
        if area < min_area or area > max_area:
            continue

        x, y, bw, bh = cv2.boundingRect(cnt)
        # Extract the box region for downstream classification.
        roi = image[y:y+bh, x:x+bw]
        if roi.size == 0:  # degenerate rectangle — skip
            continue

        boxes.append({
            "roi": roi,
            "bbox": (x, y, bw, bh),
            "center": (x + bw // 2, y + bh // 2),
            "area": area,
        })

    return boxes
def find_target_box(model, image, target_id, min_conf=0.5):
    """
    Find the target box in the camera frame.

    Segments candidate regions, classifies each with the model, and keeps
    the highest-confidence region whose class matches ``target_id``.

    Args:
        model: loaded YOLO classification model.
        image: BGR camera frame.
        target_id: competition class ID (1=hammer, 2=wrench, 3=pliers).
        min_conf: minimum classification confidence required to accept
            the best match (default 0.5, the previous hard-coded value).

    Returns:
        (center_x, center_y, confidence, bbox) in pixel coordinates for
        the best match, or None when nothing exceeds ``min_conf``.
    """
    boxes = detect_boxes_in_frame(image, target_id)

    best_match = None
    best_conf = 0.0

    # Keep the candidate with the highest confidence for the target class.
    for box in boxes:
        comp_class, conf = classify_image(model, box["roi"])
        if comp_class == target_id and conf > best_conf:
            best_conf = conf
            best_match = box

    if best_match and best_conf > min_conf:
        cx, cy = best_match["center"]
        return cx, cy, best_conf, best_match["bbox"]

    return None
def pixel_to_base_link(cx, cy, img_w, img_h,
                       camera_x=0.0, camera_y=0.0, camera_z=0.7,
                       fov_x=0.6, fov_y=0.6):
    """
    Convert pixel coordinates to approximate Base_link coordinates.

    Assumes the camera looks straight down from (camera_x, camera_y,
    camera_z), and that fov_x/fov_y give the field of view in meters at
    camera_z height. These parameters should be calibrated for the
    actual setup.
    """
    # Express the pixel as a fraction of the frame, centered: [-0.5, 0.5].
    frac_x = cx / img_w - 0.5
    frac_y = cy / img_h - 0.5

    # Scale by the metric field of view and offset by the camera position.
    world_x = camera_x + frac_x * fov_x
    world_y = camera_y + frac_y * fov_y

    # Boxes rest on the shelf surface, so z is fixed at zero.
    return world_x, world_y, 0.0
def demo_on_image(image_path, target_id, model):
    """Classify one image from disk and print the prediction vs. the target."""
    frame = cv2.imread(image_path)
    if frame is None:
        print(f"Error: cannot read {image_path}")
        return

    predicted, confidence = classify_image(model, frame)
    verdict = "YES" if predicted == target_id else "NO"

    print(f"Image: {image_path}")
    print(f" Predicted: {CLASS_NAMES.get(predicted, 'unknown')} (ID={predicted})")
    print(f" Confidence: {confidence:.4f}")
    print(f" Target: {CLASS_NAMES.get(target_id, 'unknown')} (ID={target_id})")
    print(f" Match: {verdict}")
def main():
    """CLI entry point: parse arguments, load the model, dispatch a mode."""
    parser = argparse.ArgumentParser(description="Module V: Box Detection")
    parser.add_argument("--target", type=int, required=True, choices=[1, 2, 3],
                        help="Target box ID: 1=hammer, 2=wrench, 3=pliers")
    parser.add_argument("--model", type=str, default=None,
                        help="Path to model weights (default: best.pt)")
    parser.add_argument("--image", type=str, default=None,
                        help="Test on a single image instead of ROS2 camera")
    parser.add_argument("--ros", action="store_true",
                        help="Run in ROS2 mode (subscribe to camera topic)")
    args = parser.parse_args()

    print(f"Target: {CLASS_NAMES[args.target]}")
    model = load_model(args.model)
    print("Model loaded successfully.")

    if args.image:
        # Single-image mode.
        demo_on_image(args.image, args.target, model)
    elif args.ros:
        # Live ROS2 camera mode.
        run_ros2(model, args.target)
    else:
        # Default: demo on the bundled sample images that exist on disk.
        samples_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "images"
        )
        for filename in ("Hammer.jpg", "Wrench.jpg", "Pliers.jpg"):
            sample_path = os.path.join(samples_dir, filename)
            if os.path.exists(sample_path):
                demo_on_image(sample_path, args.target, model)
                print()
def run_ros2(model, target_id):
    """ROS2 node that subscribes to camera and finds target box.

    Spins until interrupted; logs the target's pixel position and
    approximate Base_link coordinates when found.
    """
    # Lazy import so the offline modes (--image / demo) still work on
    # machines without ROS2 installed.
    # NOTE(review): PoseStamped is imported but never published below —
    # coordinates are only logged. Confirm whether publishing is still planned.
    try:
        import rclpy
        from rclpy.node import Node
        from sensor_msgs.msg import Image
        from geometry_msgs.msg import PoseStamped
        from cv_bridge import CvBridge
    except ImportError:
        print("Error: ROS2 packages not found. Install rclpy, cv_bridge, etc.")
        sys.exit(1)

    class BoxDetectorNode(Node):
        # Watches the gripper camera and latches once the target is located.

        def __init__(self):
            super().__init__("box_detector")
            self.bridge = CvBridge()    # converts sensor_msgs/Image <-> cv2 arrays
            self.model = model          # YOLO classifier captured from the closure
            self.target_id = target_id  # competition class ID to search for
            self.detected = False       # latch: skip frames after the first hit

            self.subscription = self.create_subscription(
                Image,
                "/RMC1/arm95/camera_gripper/image_color",
                self.image_callback,
                10
            )
            self.get_logger().info(
                f"Box detector started. Target: {CLASS_NAMES[target_id]}"
            )

        def image_callback(self, msg):
            # Once the target has been found, ignore all further frames.
            if self.detected:
                return

            cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
            h, w = cv_image.shape[:2]

            result = find_target_box(self.model, cv_image, self.target_id)
            if result is not None:
                cx, cy, conf, bbox = result
                # Map the pixel center to approximate Base_link coordinates
                # (uses pixel_to_base_link's default calibration values).
                wx, wy, wz = pixel_to_base_link(cx, cy, w, h)

                self.get_logger().info(
                    f"TARGET FOUND: {CLASS_NAMES[self.target_id]} "
                    f"at pixel ({cx}, {cy}), conf={conf:.2f}"
                )
                self.get_logger().info(
                    f"Base_link coords: x={wx:.3f}, y={wy:.3f}, z={wz:.3f}"
                )

                self.detected = True
            else:
                # Try classifying the whole frame as a diagnostic, so the log
                # still shows what the camera is seeing when no box matched.
                comp_class, conf = classify_image(self.model, cv_image)
                self.get_logger().info(
                    f"Frame classified as: {CLASS_NAMES.get(comp_class, 'unknown')} "
                    f"(conf={conf:.2f})"
                )

    rclpy.init()
    node = BoxDetectorNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        # Always release node resources and shut down the ROS2 context.
        node.destroy_node()
        rclpy.shutdown()
# Script entry point: run the CLI when executed directly.
if __name__ == "__main__":
    main()