#!/usr/bin/env python3 import math import threading import time from collections import deque import cv2 import numpy as np import rclpy from cv_bridge import CvBridge from geometry_msgs.msg import Twist from nav_msgs.msg import Odometry from rclpy.node import Node from sensor_msgs.msg import Image, LaserScan from std_msgs.msg import String # ── Настраиваемые коэффициенты ────────────────────────────────────────────── K_VISUAL = 0.8 # П-коэффициент визуального cross-track (агрессивная коррекция) K_VISUAL_DECAY = 0.92 # Коэффициент затухания визуальной ошибки (память) K_HEADING = 0.6 # П-регулятор удержания курса K_CROSS_P = 2.0 # Пропорциональный коэффициент odom cross-track MAX_ANGULAR = 0.35 # Максимальная скорость поворота (рад/с) K_TURN = 1.5 # П-регулятор поворота на месте DRIVE_SPEED = 0.3 # Скорость движения вперёд (м/с) CELL_SIZE_M = 0.65 # Предполагаемая длина одной клетки поля (м) # ── Параметры проверки препятствия лидаром ────────────────────────────────── LIDAR_FORWARD_HALF_ANGLE_DEG = 15.0 # Проверяем узкий сектор строго перед роботом LIDAR_MIN_VALID_RANGE_M = 0.05 # Игнорируем слишком близкие шумные значения LIDAR_BLOCK_DISTANCE_M = 0.85 # Если впереди объект ближе этого порога — следующая клетка занята LIDAR_CONFIRM_CYCLES = 1 # Сколько циклов подряд нужно подтвердить препятствие # ──────────────────────────────────────────────────────────────────────────── class RMC2Navigator(Node): FIELD_ROWS = 6 FIELD_COLS = 6 def __init__(self): super().__init__('rmc2_navigator') self.cmd_pub = self.create_publisher(Twist, '/RMC2/cmd_vel', 10) self.create_subscription(String, '/RMC2/aruco_id', self.aruco_id_cb, 10) self.create_subscription(Image, '/RMC2/camera_bottom/image_color', self.camera_cb, 10) self.create_subscription(Odometry, '/RMC2/odometry', self.odom_cb, 10) self.create_subscription(LaserScan, '/RMC2/scan_front', self.scan_cb, 10) self.bridge = CvBridge() self.current_marker_id = None self.target_marker_id = None self.current_yaw = 0.0 self.current_x = 0.0 self.current_y = 0.0 self.target_yaw = 0.0 self.state = 'IDLE' self.total_cells = 0 self.drive_start_x = 0.0 self.drive_start_y = 0.0 self.start_marker_id = None # Статические блокировки — ручные. Динамические — найденные лидаром. self.blocked_cells = set() self.dynamic_blocked_cells = set() self.waypoints = [] self.waypoint_idx = 0 # ArUco try: self.aruco_params = cv2.aruco.DetectorParameters_create() # old API (< 4.7) except AttributeError: self.aruco_params = cv2.aruco.DetectorParameters() # new API (>= 4.7) try: self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_50) except AttributeError: self.aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_50) self._centering_ok_count = 0 self.latest_image = None self.latest_scan = None self._debug_frame_idx = 0 self._yaw_reference = None # Базовый yaw при старте движения self._cross_track_ref_marker = None # Маркер, от которого ведётся odom cross-track self._visual_err_x_memory = 0.0 # Память визуальной коррекции # Служебное состояние детектора динамических препятствий. self._lidar_block_confirm_count = 0 self._lidar_checked_marker = None self._last_front_range = None self._occupancy_check_after_turn_pending = False # Таймер главного цикла (20 Гц) self.timer = self.create_timer(0.05, self.control_loop) # Поток ввода пользователя self.input_thread = threading.Thread(target=self.user_input_thread, daemon=True) self.input_thread.start() self.get_logger().info( 'Узел навигации RMC2 запущен. ' 'Статические препятствия задаются вручную, динамические — ищутся лидаром.' ) # ── Колбэки ───────────────────────────────────────────────────────────── def aruco_id_cb(self, msg): try: clean_id = msg.data.replace('aruco_', '') self.current_marker_id = int(clean_id) except ValueError: pass def odom_cb(self, msg): self.current_x = msg.pose.pose.position.x self.current_y = msg.pose.pose.position.y q = msg.pose.pose.orientation siny_cosp = 2 * (q.w * q.z + q.x * q.y) cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z) odom_yaw = math.atan2(siny_cosp, cosy_cosp) # Компас жестко привязан к глобальным осям (отключен _yaw_reference) self.current_yaw = self.normalize_angle(odom_yaw) def camera_cb(self, msg): try: img_array = np.frombuffer(msg.data, dtype=np.uint8) if msg.encoding == 'bgra8': img = img_array.reshape((msg.height, msg.width, 4)) self.latest_image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) elif msg.encoding == 'bgr8': self.latest_image = img_array.reshape((msg.height, msg.width, 3)).copy() elif msg.encoding == 'rgb8': img = img_array.reshape((msg.height, msg.width, 3)) self.latest_image = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) else: cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') self.latest_image = cv_image.copy() except Exception as e: self.get_logger().error(f'Ошибка чтения кадра: {e}') def scan_cb(self, msg): self.latest_scan = msg # ── Ввод пользователя ─────────────────────────────────────────────────── def user_input_thread(self): while rclpy.ok(): if self.state == 'IDLE': if self.current_marker_id is None: time.sleep(1) continue try: all_blocked = sorted(self.get_all_blocked_cells()) print(f'\n--- Робот над маркером: {self.current_marker_id} ---') print('Статические блокировки:', sorted(self.blocked_cells)) print('Динамические блокировки:', sorted(self.dynamic_blocked_cells)) print('Все занятые клетки:', all_blocked) target = input("Введите ID целевой метки (или 'b ' для блокировки): ").strip() if target.startswith('b '): blocked_id = int(target[2:]) if blocked_id in self.blocked_cells: self.blocked_cells.remove(blocked_id) print(f'Статическая блокировка {blocked_id} снята') else: self.blocked_cells.add(blocked_id) print(f'Клетка {blocked_id} добавлена в статические препятствия') continue self.target_marker_id = int(target) self.start_navigation() except ValueError: print('Ошибка: введите целое число.') else: time.sleep(0.5) # ── Вспомогательные функции ───────────────────────────────────────────── def get_all_blocked_cells(self): return self.blocked_cells | self.dynamic_blocked_cells def get_marker_coords(self, marker_id): row = marker_id % 6 col = marker_id // 6 x = col * 1.0 y = 5 - row * 1.0 return x, y def marker_to_grid(self, marker_id): return (marker_id % 6, marker_id // 6) def grid_to_marker(self, row, col): return col * 6 + row def get_neighbors(self, row, col): neighbors = [] blocked = self.get_all_blocked_cells() for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]: nr, nc = row + dr, col + dc if 0 <= nr < self.FIELD_ROWS and 0 <= nc < self.FIELD_COLS: mid = self.grid_to_marker(nr, nc) if mid not in blocked: neighbors.append((nr, nc)) return neighbors def find_path_bfs(self, start_id, goal_id): blocked = self.get_all_blocked_cells() if goal_id in blocked: return None start = self.marker_to_grid(start_id) goal = self.marker_to_grid(goal_id) queue = deque([(start, [start])]) visited = {start} while queue: (row, col), path = queue.popleft() if (row, col) == goal: return path for nr, nc in self.get_neighbors(row, col): if (nr, nc) not in visited: visited.add((nr, nc)) queue.append(((nr, nc), path + [(nr, nc)])) return None def normalize_angle(self, angle): return math.atan2(math.sin(angle), math.cos(angle)) def _reset_lidar_block_detector(self): self._lidar_block_confirm_count = 0 self._lidar_checked_marker = None self._last_front_range = None def _get_forward_min_range(self): if self.latest_scan is None: return None ranges = np.asarray(self.latest_scan.ranges, dtype=np.float32) if ranges.size == 0: return None angles = self.latest_scan.angle_min + np.arange(ranges.size) * self.latest_scan.angle_increment half_angle = math.radians(LIDAR_FORWARD_HALF_ANGLE_DEG) valid_mask = np.isfinite(ranges) valid_mask &= ranges >= max(LIDAR_MIN_VALID_RANGE_M, self.latest_scan.range_min) if math.isfinite(self.latest_scan.range_max) and self.latest_scan.range_max > 0.0: valid_mask &= ranges <= self.latest_scan.range_max valid_mask &= np.abs(angles) <= half_angle if not np.any(valid_mask): return None return float(np.min(ranges[valid_mask])) def _check_next_waypoint_occupied(self): if self.waypoint_idx >= len(self.waypoints): self._reset_lidar_block_detector() return None next_marker = self.waypoints[self.waypoint_idx] if next_marker in self.get_all_blocked_cells(): self._reset_lidar_block_detector() return next_marker front_range = self._get_forward_min_range() self._last_front_range = front_range if self._lidar_checked_marker != next_marker: self._lidar_checked_marker = next_marker self._lidar_block_confirm_count = 0 if front_range is None or front_range > LIDAR_BLOCK_DISTANCE_M: self._lidar_block_confirm_count = 0 return None self._lidar_block_confirm_count += 1 self.get_logger().info( f'[LIDAR] obstacle candidate for marker {next_marker}: ' f'range={front_range:.2f} m, confirm={self._lidar_block_confirm_count}/{LIDAR_CONFIRM_CYCLES}', throttle_duration_sec=0.5, ) if self._lidar_block_confirm_count >= LIDAR_CONFIRM_CYCLES: self._lidar_block_confirm_count = 0 return next_marker return None def _mark_dynamic_blocked(self, marker_id, distance_m=None): if marker_id is None or marker_id == self.current_marker_id: return if marker_id not in self.dynamic_blocked_cells: self.dynamic_blocked_cells.add(marker_id) msg = f'Клетка {marker_id} помечена как занятая по лидару' if distance_m is not None: msg += f' (препятствие ~ {distance_m:.2f} м впереди)' self.get_logger().warn(msg) else: self.get_logger().warn( f'Лидар снова подтверждает занятость клетки {marker_id}', throttle_duration_sec=1.0, ) def _build_path_to_target(self, announce_prefix='Маршрут'): if self.current_marker_id is None or self.target_marker_id is None: self.get_logger().error('Невозможно построить маршрут: неизвестен текущий или целевой маркер') self.state = 'IDLE' return False path = self.find_path_bfs(self.current_marker_id, self.target_marker_id) if path is None: blocked = sorted(self.get_all_blocked_cells()) self.get_logger().error( f'Маршрут до {self.target_marker_id} не найден. Занятые клетки: {blocked}' ) self.waypoints = [] self.waypoint_idx = 0 self.state = 'IDLE' return False self.waypoints = [self.grid_to_marker(r, c) for r, c in path] self.waypoint_idx = 1 self._reset_lidar_block_detector() print(f"{announce_prefix}: {' → '.join(map(str, self.waypoints))}") return True def start_navigation(self): if self._build_path_to_target('Маршрут'): self._navigate_to_next_waypoint() def _replan_from_current_marker(self, reason='Перестроение маршрута'): self.cmd_pub.publish(Twist()) self.state = 'REPLAN' if self._build_path_to_target(reason): self._navigate_to_next_waypoint() def _navigate_to_next_waypoint(self): if self.waypoint_idx >= len(self.waypoints): self.get_logger().info('Все точки маршрута пройдены!') self.state = 'IDLE' return next_marker = self.waypoints[self.waypoint_idx] cx, cy = self.get_marker_coords(self.current_marker_id) tx, ty = self.get_marker_coords(next_marker) dx, dy = tx - cx, ty - cy dist = math.sqrt(dx * dx + dy * dy) if dist < 0.01: self.waypoint_idx += 1 self._navigate_to_next_waypoint() return self.get_logger().info( f'DEBUG: {self.current_marker_id}({cx:.1f},{cy:.1f}) -> ' f'{next_marker}({tx:.1f},{ty:.1f}), d=({dx:.1f},{dy:.1f})' ) self.get_logger().info(f'DEBUG: current_yaw={math.degrees(self.current_yaw):.1f}°') raw_yaw_normalized = math.atan2(dy, dx) candidates = [ raw_yaw_normalized - 2 * math.pi, raw_yaw_normalized, raw_yaw_normalized + 2 * math.pi, ] best_yaw = min(candidates, key=lambda y: abs(y - self.current_yaw)) self.target_yaw = best_yaw self.get_logger().info(f'DEBUG: target_yaw={math.degrees(self.target_yaw):.1f}°') self.total_cells = 1 self.start_marker_id = self.current_marker_id self._centering_ok_count = 0 self._line_start_x, self._line_start_y = self.get_marker_coords(self.current_marker_id) self._line_perp_yaw = self.target_yaw + math.pi / 2.0 self._cross_track_ref_marker = self.current_marker_id self._visual_err_x_memory = 0.0 self._reset_lidar_block_detector() self._occupancy_check_after_turn_pending = True self.get_logger().info( f'Еду к точке {next_marker} (waypoint {self.waypoint_idx}/{len(self.waypoints) - 1}), ' f'курс {math.degrees(self.target_yaw):.1f}°' ) self.state = 'TURN_TO_TARGET' def _cross_track_error(self, x, y): """ Рассчитывает signed cross-track error — расстояние от текущей позиции до идеальной линии движения (с указанием знака: + справа, − слева). Линия: проходит через (_line_start_x, _line_start_y) под углом target_yaw. """ dx = x - self._line_start_x dy = y - self._line_start_y cross_track = -dx * math.sin(self.target_yaw) + dy * math.cos(self.target_yaw) return cross_track # ── ArUco-детектор (общий, переиспользуемый) ──────────────────────────── def _detect_markers(self): """Детектирует маркеры на последнем кадре камеры. Возвращает (ids_list, corners_list, img_w, img_h) или (None, None, 0, 0) при ошибке / нет кадра. """ if self.latest_image is None: return None, None, 0, 0 try: frame = self.latest_image.copy() gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = np.ascontiguousarray(gray, dtype=np.uint8) h, w = gray.shape scale = min(1.0, 640.0 / w) small = cv2.resize( gray, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA, ) if scale < 1.0 else gray corners, ids, _ = cv2.aruco.detectMarkers( small, self.aruco_dict, parameters=self.aruco_params, ) if ids is not None and scale < 1.0: corners = [c / scale for c in corners] if ids is None: return [], [], w, h return ids.flatten().tolist(), corners, w, h except Exception as e: self.get_logger().error(f'Ошибка детекции: {e}') return None, None, 0, 0 def _marker_offset(self, marker_id, ids_list, corners, img_w, img_h): """Возвращает нормализованные (err_x, err_y) ∈ [-1, 1] для заданного маркера или (None, None) если маркер не найден. err_x > 0: маркер правее центра камеры (робот левее центра маркера). err_y > 0: маркер ниже центра камеры (маркер «за» роботом). """ if not ids_list or marker_id not in ids_list: return None, None idx = ids_list.index(marker_id) pts = corners[idx][0] mcx = np.mean(pts[:, 0]) mcy = np.mean(pts[:, 1]) err_x = (mcx - img_w / 2.0) / (img_w / 2.0) err_y = (mcy - img_h / 2.0) / (img_h / 2.0) return err_x, err_y # ── Главный цикл управления (20 Гц) ───────────────────────────────────── def control_loop(self): if self.state == 'IDLE': return twist = Twist() # ── Поворот на месте (к цели или финальное выравнивание) ──────────── if self.state in ('TURN_TO_TARGET', 'ALIGN_ORIENTATION'): yaw_err = self.normalize_angle(self.target_yaw - self.current_yaw) if abs(yaw_err) > 0.035: # ~2° gain = K_TURN if abs(yaw_err) > 0.087 else K_TURN * 0.6 twist.angular.z = gain * yaw_err twist.angular.z = max(-1.0, min(1.0, twist.angular.z)) if not hasattr(self, '_log_counter'): self._log_counter = 0 self._log_counter += 1 if self._log_counter % 10 == 1: self.get_logger().info( f'TURN: target={math.degrees(self.target_yaw):.1f}°, ' f'current={math.degrees(self.current_yaw):.1f}°, ' f'err={math.degrees(yaw_err):.1f}°, cmd={twist.angular.z:.2f}' ) else: if self.state == 'TURN_TO_TARGET': if self._occupancy_check_after_turn_pending: blocked_marker = self._check_next_waypoint_occupied() if blocked_marker is not None: self._mark_dynamic_blocked(blocked_marker, self._last_front_range) self._replan_from_current_marker( f'Перестроение маршрута после обнаружения препятствия на {blocked_marker}' ) self.cmd_pub.publish(Twist()) return self._occupancy_check_after_turn_pending = False self.get_logger().info( f'Курс взят ({math.degrees(self.target_yaw):.1f}°). ' f'Еду {self.total_cells} кл. прямо.' ) self.state = 'DRIVE_STRAIGHT' self.drive_start_x = self.current_x self.drive_start_y = self.current_y else: self.get_logger().info('Ориентация выровнена!') self.current_marker_id = self.target_marker_id self.state = 'IDLE' elif self.state == 'DRIVE_STRAIGHT': if self.current_marker_id == self.target_marker_id: self.get_logger().info('Финальный маркер подтверждён!') self.waypoint_idx += 1 if self.waypoint_idx < len(self.waypoints): self._navigate_to_next_waypoint() else: self.state = 'IDLE' self.cmd_pub.publish(twist) return ids_list, corners, img_w, img_h = self._detect_markers() if self.waypoint_idx < len(self.waypoints): next_wp = self.waypoints[self.waypoint_idx] if ids_list is not None and next_wp in ids_list: self.get_logger().info(f'Промежуточный waypoint {next_wp} достигнут') self.current_marker_id = next_wp self._line_start_x, self._line_start_y = self.get_marker_coords(next_wp) self.waypoint_idx += 1 if self.waypoint_idx < len(self.waypoints): self._navigate_to_next_waypoint() else: self.state = 'IDLE' self.cmd_pub.publish(twist) return if ids_list is not None and self.target_marker_id in ids_list: ex, ey = self._marker_offset(self.target_marker_id, ids_list, corners, img_w, img_h) if ey is not None and ey > -0.2: self.get_logger().info( f'Финальный маркер {self.target_marker_id} в кадре (err_y={ey:.2f}).' ) self.waypoint_idx += 1 if self.waypoint_idx < len(self.waypoints): self._navigate_to_next_waypoint() else: self.state = 'IDLE' self.cmd_pub.publish(twist) return visual_err_x = None if ids_list: best_dist = float('inf') for mid in ids_list: ex, ey = self._marker_offset(mid, ids_list, corners, img_w, img_h) if ex is None: continue dist = abs(ex) + abs(ey) if dist < best_dist: best_dist = dist visual_err_x = ex if visual_err_x is not None: self._visual_err_x_memory = visual_err_x else: self._visual_err_x_memory *= K_VISUAL_DECAY base_yaw_err = self.normalize_angle(self.target_yaw - self.current_yaw) use_visual = abs(self._visual_err_x_memory) > 0.01 twist.linear.x = DRIVE_SPEED if use_visual: twist.angular.z = K_HEADING * base_yaw_err - K_VISUAL * self._visual_err_x_memory else: twist.angular.z = K_HEADING * base_yaw_err twist.angular.z = max(-MAX_ANGULAR, min(MAX_ANGULAR, twist.angular.z)) if not hasattr(self, '_cross_log_counter'): self._cross_log_counter = 0 self._cross_log_counter += 1 if self._cross_log_counter % 10 == 0: lidar_msg = 'n/a' if self._last_front_range is None else f'{self._last_front_range:.2f} m' self.get_logger().info( f'[VIS-MEM] err_x={self._visual_err_x_memory:+.3f}, ' f'yaw={math.degrees(base_yaw_err):+.1f}°, ' f'tw.z={twist.angular.z:+.3f}, lidar={lidar_msg}' ) # ── Визуальное центрирование над финальным маркером ───────────────── elif self.state == 'CENTER_MARKER': if self.latest_image is None: self.get_logger().warn('CENTER_MARKER: нет кадра', throttle_duration_sec=1.0) return try: frame = self.latest_image.copy() gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = np.ascontiguousarray(gray, dtype=np.uint8) self._debug_frame_idx += 1 if self._debug_frame_idx % 40 == 1: cv2.imwrite('/tmp/aruco_debug_gray.png', gray) self.get_logger().info( f'DEBUG кадр #{self._debug_frame_idx}: shape={gray.shape}, ' f'mean={gray.mean():.1f} → /tmp/aruco_debug_gray.png' ) h_orig, w_orig = gray.shape scale = min(1.0, 640.0 / w_orig) if scale < 1.0: small = cv2.resize(gray, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA) else: small = gray corners, ids, _ = cv2.aruco.detectMarkers( small, self.aruco_dict, parameters=self.aruco_params, ) if ids is not None and scale < 1.0: corners = [c / scale for c in corners] target_corners = None if ids is not None: found_ids = ids.flatten().tolist() for i in range(len(ids)): if ids[i][0] == self.target_marker_id: target_corners = corners[i][0] break if target_corners is None: self.get_logger().warn( f'CENTER_MARKER: вижу {found_ids}, цель={self.target_marker_id} не найдена', throttle_duration_sec=1.0, ) else: self.get_logger().warn( 'CENTER_MARKER: маркеры не найдены', throttle_duration_sec=1.0, ) self._centering_ok_count = 0 if target_corners is not None: mcx = np.mean(target_corners[:, 0]) mcy = np.mean(target_corners[:, 1]) h, w = gray.shape err_x = (mcx - w / 2.0) / (w / 2.0) err_y = (mcy - h / 2.0) / (h / 2.0) twist.linear.x = -0.12 * err_y twist.angular.z = -0.25 * err_x twist.linear.x = max(-0.08, min(0.08, twist.linear.x)) twist.angular.z = max(-0.20, min(0.20, twist.angular.z)) if abs(err_x) < 0.05 and abs(err_y) < 0.05: self._centering_ok_count += 1 if self._centering_ok_count >= 5: self.get_logger().info( f'Центрирование завершено (err={err_x:.2f},{err_y:.2f}). ' 'Выравниваю ориентацию.' ) self._centering_ok_count = 0 self.state = 'ALIGN_ORIENTATION' else: self._centering_ok_count = 0 except Exception as e: self.get_logger().error(f'OpenCV ошибка: {e}') self.cmd_pub.publish(twist) def main(args=None): rclpy.init(args=args) navigator = RMC2Navigator() try: rclpy.spin(navigator) except KeyboardInterrupt: pass finally: navigator.cmd_pub.publish(Twist()) navigator.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()