Files
webots-module-b/nav_test.py
2026-04-01 06:42:19 +03:00

528 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Image
from std_msgs.msg import String
from nav_msgs.msg import Odometry
import cv2
from cv_bridge import CvBridge
import numpy as np
import math
import threading
import time
from collections import deque
# ── Настраиваемые коэффициенты ──────────────────────────────────────────────
K_VISUAL = 0.8 # П-коэффициент визуального cross-track (агрессивная коррекция)
K_VISUAL_DECAY = 0.92 # Коэффициент затухания визуальной ошибки (память)
K_HEADING = 0.6 # П-регулятор удержания курса
K_CROSS_P = 2.0 # Пропорциональный коэффициент odom cross-track
MAX_ANGULAR = 0.35 # Максимальная скорость поворота (рад/с)
K_TURN = 1.5 # П-регулятор поворота на месте
DRIVE_SPEED = 0.3 # Скорость движения вперёд (м/с)
CELL_SIZE_M = 0.65 # Предполагаемая длина одной клетки поля (м)
# ────────────────────────────────────────────────────────────────────────────
class RMC2Navigator(Node):
FIELD_ROWS = 6
FIELD_COLS = 6
def __init__(self):
super().__init__('rmc2_navigator')
self.cmd_pub = self.create_publisher(Twist, '/RMC2/cmd_vel', 10)
self.create_subscription(String, '/RMC2/aruco_id', self.aruco_id_cb, 10)
self.create_subscription(Image, '/RMC2/camera_bottom/image_color', self.camera_cb, 10)
self.create_subscription(Odometry, '/RMC2/odometry', self.odom_cb, 10)
self.bridge = CvBridge()
self.current_marker_id = None
self.target_marker_id = None
self.current_yaw = 0.0
self.current_x = 0.0
self.current_y = 0.0
self.target_yaw = 0.0
self.state = 'IDLE'
self.total_cells = 0
self.drive_start_x = 0.0
self.drive_start_y = 0.0
self.start_marker_id = None
self.blocked_cells = set()
self.waypoints = []
self.waypoint_idx = 0
# ArUco
try:
self.aruco_params = cv2.aruco.DetectorParameters_create() # old API (< 4.7)
except AttributeError:
self.aruco_params = cv2.aruco.DetectorParameters() # new API (>= 4.7)
try:
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_50)
except AttributeError:
self.aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_50)
self._centering_ok_count = 0
self.latest_image = None
self._debug_frame_idx = 0
self._yaw_reference = None # Базовый yaw при старте движения
self._cross_track_ref_marker = None # Маркер, от которого ведётся odom cross-track
self._visual_err_x_memory = 0.0 # Память визуальной коррекции
# Таймер главного цикла (20 Гц)
self.timer = self.create_timer(0.05, self.control_loop)
# Поток ввода пользователя
self.input_thread = threading.Thread(target=self.user_input_thread, daemon=True)
self.input_thread.start()
self.get_logger().info("Узел навигации RMC2 запущен. Ожидание стартового маркера...")
# ── Колбэки ─────────────────────────────────────────────────────────────
def aruco_id_cb(self, msg):
try:
clean_id = msg.data.replace('aruco_', '')
self.current_marker_id = int(clean_id)
except ValueError:
pass
def odom_cb(self, msg):
self.current_x = msg.pose.pose.position.x
self.current_y = msg.pose.pose.position.y
q = msg.pose.pose.orientation
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
odom_yaw = math.atan2(siny_cosp, cosy_cosp)
# Компас жестко привязан к глобальным осям (отключен _yaw_reference)
self.current_yaw = self.normalize_angle(odom_yaw)
def camera_cb(self, msg):
try:
img_array = np.frombuffer(msg.data, dtype=np.uint8)
if msg.encoding == 'bgra8':
img = img_array.reshape((msg.height, msg.width, 4))
self.latest_image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
elif msg.encoding == 'bgr8':
self.latest_image = img_array.reshape((msg.height, msg.width, 3)).copy()
elif msg.encoding == 'rgb8':
img = img_array.reshape((msg.height, msg.width, 3))
self.latest_image = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
else:
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
self.latest_image = cv_image.copy()
except Exception as e:
self.get_logger().error(f"Ошибка чтения кадра: {e}")
# ── Ввод пользователя ───────────────────────────────────────────────────
def user_input_thread(self):
while rclpy.ok():
if self.state == 'IDLE':
if self.current_marker_id is None:
time.sleep(1)
continue
try:
print(f"\n--- Робот над маркером: {self.current_marker_id} ---")
print("Заблокированные клетки:", sorted(self.blocked_cells))
target = input("Введите ID целевой метки (или 'b <id>' для блокировки): ").strip()
if target.startswith('b '):
blocked_id = int(target[2:])
if blocked_id in self.blocked_cells:
self.blocked_cells.remove(blocked_id)
print(f"Клетка {blocked_id} разблокирована")
else:
self.blocked_cells.add(blocked_id)
print(f"Клетка {blocked_id} заблокирована")
continue
self.target_marker_id = int(target)
self.start_navigation()
except ValueError:
print("Ошибка: введите целое число.")
else:
time.sleep(0.5)
# ── Вспомогательные функции ─────────────────────────────────────────────
def get_marker_coords(self, marker_id):
row = marker_id % 6
col = marker_id // 6
x = col * 1.0
y = 5 - row * 1.0
return x, y
def marker_to_grid(self, marker_id):
return (marker_id % 6, marker_id // 6)
def grid_to_marker(self, row, col):
return col * 6 + row
def get_neighbors(self, row, col):
neighbors = []
for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nr, nc = row + dr, col + dc
if 0 <= nr < self.FIELD_ROWS and 0 <= nc < self.FIELD_COLS:
mid = self.grid_to_marker(nr, nc)
if mid not in self.blocked_cells:
neighbors.append((nr, nc))
return neighbors
def find_path_bfs(self, start_id, goal_id):
if goal_id in self.blocked_cells:
return None
start = self.marker_to_grid(start_id)
goal = self.marker_to_grid(goal_id)
queue = deque([(start, [start])])
visited = {start}
while queue:
(row, col), path = queue.popleft()
if (row, col) == goal:
return path
for nr, nc in self.get_neighbors(row, col):
if (nr, nc) not in visited:
visited.add((nr, nc))
queue.append(((nr, nc), path + [(nr, nc)]))
return None
def normalize_angle(self, angle):
return math.atan2(math.sin(angle), math.cos(angle))
def start_navigation(self):
path = self.find_path_bfs(self.current_marker_id, self.target_marker_id)
if path is None:
print(f"Маршрут до {self.target_marker_id} не найден (заблокирован или нет пути)")
return
self.waypoints = [self.grid_to_marker(r, c) for r, c in path]
self.waypoint_idx = 1
print(f"Маршрут: {''.join(map(str, self.waypoints))}")
self._navigate_to_next_waypoint()
def _navigate_to_next_waypoint(self):
if self.waypoint_idx >= len(self.waypoints):
self.get_logger().info("Все точки маршрута пройдены!")
self.state = 'IDLE'
return
next_marker = self.waypoints[self.waypoint_idx]
cx, cy = self.get_marker_coords(self.current_marker_id)
tx, ty = self.get_marker_coords(next_marker)
dx, dy = tx - cx, ty - cy
dist = math.sqrt(dx*dx + dy*dy)
if dist < 0.01:
self.waypoint_idx += 1
self._navigate_to_next_waypoint()
return
self.get_logger().info(f"DEBUG: {self.current_marker_id}({cx:.1f},{cy:.1f}) -> {next_marker}({tx:.1f},{ty:.1f}), d=({dx:.1f},{dy:.1f})")
self.get_logger().info(f"DEBUG: current_yaw={math.degrees(self.current_yaw):.1f}°")
raw_yaw = math.atan2(dy, dx)
# Угол строго 180° может быть +π или -π, выбираем ближе к текущему yaw
raw_yaw_normalized = math.atan2(dy, dx) # в диапазоне [-π, π]
# Проверяем оба варианта: raw_yaw и raw_yaw ± 2π
candidates = [raw_yaw_normalized - 2*math.pi, raw_yaw_normalized, raw_yaw_normalized + 2*math.pi]
# Выбираем ближайший к current_yaw
best_yaw = min(candidates, key=lambda y: abs(y - self.current_yaw))
self.target_yaw = best_yaw
self.get_logger().info(f"DEBUG: target_yaw={math.degrees(self.target_yaw):.1f}°")
self.total_cells = 1
self.start_marker_id = self.current_marker_id
self._centering_ok_count = 0
self._line_start_x, self._line_start_y = self.get_marker_coords(self.current_marker_id)
self._line_perp_yaw = self.target_yaw + math.pi / 2.0
self._cross_track_ref_marker = self.current_marker_id
self._visual_err_x_memory = 0.0
self.get_logger().info(
f"Еду к точке {next_marker} (waypoint {self.waypoint_idx}/{len(self.waypoints)-1}), "
f"курс {math.degrees(self.target_yaw):.1f}°")
self.state = 'TURN_TO_TARGET'
def _cross_track_error(self, x, y):
"""
Рассчитывает signed cross-track error — расстояние от текущей позиции
до идеальной линии движения (с указанием знака: + справа, слева).
Линия: проходит через (_line_start_x, _line_start_y) под углом target_yaw.
"""
dx = x - self._line_start_x
dy = y - self._line_start_y
# Проекция на нормаль к линии (перпендикуляр target_yaw)
cross_track = -dx * math.sin(self.target_yaw) + dy * math.cos(self.target_yaw)
return cross_track
# ── ArUco-детектор (общий, переиспользуемый) ────────────────────────────
def _detect_markers(self):
"""Детектирует маркеры на последнем кадре камеры.
Возвращает (ids_list, corners_list, img_w, img_h)
или (None, None, 0, 0) при ошибке / нет кадра.
"""
if self.latest_image is None:
return None, None, 0, 0
try:
frame = self.latest_image.copy()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = np.ascontiguousarray(gray, dtype=np.uint8)
h, w = gray.shape
scale = min(1.0, 640.0 / w)
small = cv2.resize(gray, (0, 0), fx=scale, fy=scale,
interpolation=cv2.INTER_AREA) if scale < 1.0 else gray
corners, ids, _ = cv2.aruco.detectMarkers(
small, self.aruco_dict, parameters=self.aruco_params)
if ids is not None and scale < 1.0:
corners = [c / scale for c in corners]
if ids is None:
return [], [], w, h
return ids.flatten().tolist(), corners, w, h
except Exception as e:
self.get_logger().error(f"Ошибка детекции: {e}")
return None, None, 0, 0
def _marker_offset(self, marker_id, ids_list, corners, img_w, img_h):
"""Возвращает нормализованные (err_x, err_y) ∈ [-1, 1] для заданного маркера
или (None, None) если маркер не найден.
err_x > 0: маркер правее центра камеры (робот левее центра маркера).
err_y > 0: маркер ниже центра камеры (маркер «за» роботом).
"""
if not ids_list or marker_id not in ids_list:
return None, None
idx = ids_list.index(marker_id)
pts = corners[idx][0]
mcx = np.mean(pts[:, 0])
mcy = np.mean(pts[:, 1])
err_x = (mcx - img_w / 2.0) / (img_w / 2.0)
err_y = (mcy - img_h / 2.0) / (img_h / 2.0)
return err_x, err_y
# ── Главный цикл управления (20 Гц) ─────────────────────────────────────
def control_loop(self):
if self.state == 'IDLE':
return
twist = Twist()
# ── Поворот на месте (к цели или финальное выравнивание) ────────────
if self.state in ('TURN_TO_TARGET', 'ALIGN_ORIENTATION'):
yaw_err = self.normalize_angle(self.target_yaw - self.current_yaw)
if abs(yaw_err) > 0.035: # ~2°
# Замедляемся при подходе к цели: ниже 5° уменьшаем усиление
gain = K_TURN if abs(yaw_err) > 0.087 else K_TURN * 0.6
twist.angular.z = gain * yaw_err
twist.angular.z = max(-1.0, min(1.0, twist.angular.z))
if not hasattr(self, '_log_counter'):
self._log_counter = 0
self._log_counter += 1
if self._log_counter % 10 == 1:
self.get_logger().info(
f"TURN: target={math.degrees(self.target_yaw):.1f}°, "
f"current={math.degrees(self.current_yaw):.1f}°, "
f"err={math.degrees(yaw_err):.1f}°, cmd={twist.angular.z:.2f}")
else:
if self.state == 'TURN_TO_TARGET':
self.get_logger().info(
f"Курс взят ({math.degrees(self.target_yaw):.1f}°). "
f"Еду {self.total_cells} кл. прямо.")
self.state = 'DRIVE_STRAIGHT'
self.drive_start_x = self.current_x
self.drive_start_y = self.current_y
else:
self.get_logger().info("Ориентация выровнена!")
self.current_marker_id = self.target_marker_id
self.state = 'IDLE'
elif self.state == 'DRIVE_STRAIGHT':
if self.current_marker_id == self.target_marker_id:
self.get_logger().info("Финальный маркер подтверждён!")
self.waypoint_idx += 1
if self.waypoint_idx < len(self.waypoints):
self._navigate_to_next_waypoint()
else:
self.state = 'IDLE'
self.cmd_pub.publish(twist)
return
ids_list, corners, img_w, img_h = self._detect_markers()
if self.waypoint_idx < len(self.waypoints):
next_wp = self.waypoints[self.waypoint_idx]
if ids_list is not None and next_wp in ids_list:
self.get_logger().info(f"Промежуточный waypoint {next_wp} достигнут")
self.current_marker_id = next_wp
self._line_start_x, self._line_start_y = self.get_marker_coords(next_wp)
self.waypoint_idx += 1
if self.waypoint_idx < len(self.waypoints):
self._navigate_to_next_waypoint()
else:
self.state = 'IDLE'
self.cmd_pub.publish(twist)
return
if ids_list is not None and self.target_marker_id in ids_list:
ex, ey = self._marker_offset(self.target_marker_id, ids_list, corners, img_w, img_h)
if ey is not None and ey > -0.2:
self.get_logger().info(
f"Финальный маркер {self.target_marker_id} в кадре (err_y={ey:.2f}).")
self.waypoint_idx += 1
if self.waypoint_idx < len(self.waypoints):
self._navigate_to_next_waypoint()
else:
self.state = 'IDLE'
self.cmd_pub.publish(twist)
return
visual_err_x = None
if ids_list:
best_dist = float('inf')
for mid in ids_list:
ex, ey = self._marker_offset(mid, ids_list, corners, img_w, img_h)
if ex is None:
continue
dist = abs(ex) + abs(ey)
if dist < best_dist:
best_dist = dist
visual_err_x = ex
if visual_err_x is not None:
self._visual_err_x_memory = visual_err_x
else:
self._visual_err_x_memory *= K_VISUAL_DECAY
base_yaw_err = self.normalize_angle(self.target_yaw - self.current_yaw)
use_visual = abs(self._visual_err_x_memory) > 0.01
if use_visual:
twist.linear.x = DRIVE_SPEED
twist.angular.z = K_HEADING * base_yaw_err - K_VISUAL * self._visual_err_x_memory
twist.angular.z = max(-MAX_ANGULAR, min(MAX_ANGULAR, twist.angular.z))
else:
twist.linear.x = DRIVE_SPEED
twist.angular.z = K_HEADING * base_yaw_err
twist.angular.z = max(-MAX_ANGULAR, min(MAX_ANGULAR, twist.angular.z))
if not hasattr(self, '_cross_log_counter'):
self._cross_log_counter = 0
self._cross_log_counter += 1
if self._cross_log_counter % 10 == 0:
self.get_logger().info(
f"[VIS-MEM] err_x={self._visual_err_x_memory:+.3f}, "
f"yaw={math.degrees(base_yaw_err):+.1f}°, tw.z={twist.angular.z:+.3f}")
# ── Визуальное центрирование над финальным маркером ─────────────────
elif self.state == 'CENTER_MARKER':
if self.latest_image is None:
self.get_logger().warn("CENTER_MARKER: нет кадра", throttle_duration_sec=1.0)
return
try:
frame = self.latest_image.copy()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = np.ascontiguousarray(gray, dtype=np.uint8)
self._debug_frame_idx += 1
if self._debug_frame_idx % 40 == 1:
cv2.imwrite('/tmp/aruco_debug_gray.png', gray)
self.get_logger().info(
f"DEBUG кадр #{self._debug_frame_idx}: shape={gray.shape}, "
f"mean={gray.mean():.1f} → /tmp/aruco_debug_gray.png")
h_orig, w_orig = gray.shape
scale = min(1.0, 640.0 / w_orig)
if scale < 1.0:
small = cv2.resize(gray, (0, 0), fx=scale, fy=scale,
interpolation=cv2.INTER_AREA)
else:
small = gray
corners, ids, _ = cv2.aruco.detectMarkers(
small, self.aruco_dict, parameters=self.aruco_params)
if ids is not None and scale < 1.0:
corners = [c / scale for c in corners]
target_corners = None
if ids is not None:
found_ids = ids.flatten().tolist()
for i in range(len(ids)):
if ids[i][0] == self.target_marker_id:
target_corners = corners[i][0]
break
if target_corners is None:
self.get_logger().warn(
f"CENTER_MARKER: вижу {found_ids}, цель={self.target_marker_id} не найдена",
throttle_duration_sec=1.0)
else:
self.get_logger().warn(
"CENTER_MARKER: маркеры не найдены", throttle_duration_sec=1.0)
self._centering_ok_count = 0
if target_corners is not None:
mcx = np.mean(target_corners[:, 0])
mcy = np.mean(target_corners[:, 1])
h, w = gray.shape
err_x = (mcx - w / 2.0) / (w / 2.0) # нормализованная ошибка X
err_y = (mcy - h / 2.0) / (h / 2.0) # нормализованная ошибка Y
twist.linear.x = -0.12 * err_y
twist.angular.z = -0.25 * err_x
twist.linear.x = max(-0.08, min(0.08, twist.linear.x))
twist.angular.z = max(-0.20, min(0.20, twist.angular.z))
if abs(err_x) < 0.05 and abs(err_y) < 0.05:
self._centering_ok_count += 1
if self._centering_ok_count >= 5:
self.get_logger().info(
f"Центрирование завершено (err={err_x:.2f},{err_y:.2f}). "
f"Выравниваю ориентацию.")
self._centering_ok_count = 0
self.state = 'ALIGN_ORIENTATION'
else:
self._centering_ok_count = 0
except Exception as e:
self.get_logger().error(f"OpenCV ошибка: {e}")
self.cmd_pub.publish(twist)
def main(args=None):
rclpy.init(args=args)
navigator = RMC2Navigator()
try:
rclpy.spin(navigator)
except KeyboardInterrupt:
pass
finally:
navigator.cmd_pub.publish(Twist())
navigator.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()