695 lines
30 KiB
Python
695 lines
30 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import math
|
||
import threading
|
||
import time
|
||
from collections import deque
|
||
|
||
import cv2
|
||
import numpy as np
|
||
import rclpy
|
||
from cv_bridge import CvBridge
|
||
from geometry_msgs.msg import Twist
|
||
from nav_msgs.msg import Odometry
|
||
from rclpy.node import Node
|
||
from sensor_msgs.msg import Image, LaserScan
|
||
from std_msgs.msg import String
|
||
|
||
# ── Настраиваемые коэффициенты ──────────────────────────────────────────────
|
||
K_VISUAL = 0.8 # П-коэффициент визуального cross-track (агрессивная коррекция)
|
||
K_VISUAL_DECAY = 0.92 # Коэффициент затухания визуальной ошибки (память)
|
||
K_HEADING = 0.6 # П-регулятор удержания курса
|
||
K_CROSS_P = 2.0 # Пропорциональный коэффициент odom cross-track
|
||
MAX_ANGULAR = 0.35 # Максимальная скорость поворота (рад/с)
|
||
K_TURN = 1.5 # П-регулятор поворота на месте
|
||
DRIVE_SPEED = 0.3 # Скорость движения вперёд (м/с)
|
||
CELL_SIZE_M = 0.65 # Предполагаемая длина одной клетки поля (м)
|
||
|
||
# ── Параметры проверки препятствия лидаром ──────────────────────────────────
|
||
LIDAR_FORWARD_HALF_ANGLE_DEG = 15.0 # Проверяем узкий сектор строго перед роботом
|
||
LIDAR_MIN_VALID_RANGE_M = 0.05 # Игнорируем слишком близкие шумные значения
|
||
LIDAR_BLOCK_DISTANCE_M = 0.85 # Если впереди объект ближе этого порога — следующая клетка занята
|
||
LIDAR_CONFIRM_CYCLES = 1 # Сколько циклов подряд нужно подтвердить препятствие
|
||
# ────────────────────────────────────────────────────────────────────────────
|
||
|
||
|
||
class RMC2Navigator(Node):
|
||
FIELD_ROWS = 6
|
||
FIELD_COLS = 6
|
||
|
||
def __init__(self):
|
||
super().__init__('rmc2_navigator')
|
||
|
||
self.cmd_pub = self.create_publisher(Twist, '/RMC2/cmd_vel', 10)
|
||
self.create_subscription(String, '/RMC2/aruco_id', self.aruco_id_cb, 10)
|
||
self.create_subscription(Image, '/RMC2/camera_bottom/image_color', self.camera_cb, 10)
|
||
self.create_subscription(Odometry, '/RMC2/odometry', self.odom_cb, 10)
|
||
self.create_subscription(LaserScan, '/RMC2/scan_front', self.scan_cb, 10)
|
||
|
||
self.bridge = CvBridge()
|
||
|
||
self.current_marker_id = None
|
||
self.target_marker_id = None
|
||
self.current_yaw = 0.0
|
||
self.current_x = 0.0
|
||
self.current_y = 0.0
|
||
self.target_yaw = 0.0
|
||
|
||
self.state = 'IDLE'
|
||
|
||
self.total_cells = 0
|
||
self.drive_start_x = 0.0
|
||
self.drive_start_y = 0.0
|
||
self.start_marker_id = None
|
||
|
||
# Статические блокировки — ручные. Динамические — найденные лидаром.
|
||
self.blocked_cells = set()
|
||
self.dynamic_blocked_cells = set()
|
||
self.waypoints = []
|
||
self.waypoint_idx = 0
|
||
|
||
# ArUco
|
||
try:
|
||
self.aruco_params = cv2.aruco.DetectorParameters_create() # old API (< 4.7)
|
||
except AttributeError:
|
||
self.aruco_params = cv2.aruco.DetectorParameters() # new API (>= 4.7)
|
||
|
||
try:
|
||
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_50)
|
||
except AttributeError:
|
||
self.aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_6X6_50)
|
||
|
||
self._centering_ok_count = 0
|
||
self.latest_image = None
|
||
self.latest_scan = None
|
||
self._debug_frame_idx = 0
|
||
self._yaw_reference = None # Базовый yaw при старте движения
|
||
self._cross_track_ref_marker = None # Маркер, от которого ведётся odom cross-track
|
||
self._visual_err_x_memory = 0.0 # Память визуальной коррекции
|
||
|
||
# Служебное состояние детектора динамических препятствий.
|
||
self._lidar_block_confirm_count = 0
|
||
self._lidar_checked_marker = None
|
||
self._last_front_range = None
|
||
self._occupancy_check_after_turn_pending = False
|
||
|
||
# Таймер главного цикла (20 Гц)
|
||
self.timer = self.create_timer(0.05, self.control_loop)
|
||
|
||
# Поток ввода пользователя
|
||
self.input_thread = threading.Thread(target=self.user_input_thread, daemon=True)
|
||
self.input_thread.start()
|
||
|
||
self.get_logger().info(
|
||
'Узел навигации RMC2 запущен. '
|
||
'Статические препятствия задаются вручную, динамические — ищутся лидаром.'
|
||
)
|
||
|
||
# ── Колбэки ─────────────────────────────────────────────────────────────
|
||
|
||
def aruco_id_cb(self, msg):
|
||
try:
|
||
clean_id = msg.data.replace('aruco_', '')
|
||
self.current_marker_id = int(clean_id)
|
||
except ValueError:
|
||
pass
|
||
|
||
def odom_cb(self, msg):
|
||
self.current_x = msg.pose.pose.position.x
|
||
self.current_y = msg.pose.pose.position.y
|
||
|
||
q = msg.pose.pose.orientation
|
||
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
|
||
cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
|
||
odom_yaw = math.atan2(siny_cosp, cosy_cosp)
|
||
|
||
# Компас жестко привязан к глобальным осям (отключен _yaw_reference)
|
||
self.current_yaw = self.normalize_angle(odom_yaw)
|
||
|
||
def camera_cb(self, msg):
|
||
try:
|
||
img_array = np.frombuffer(msg.data, dtype=np.uint8)
|
||
if msg.encoding == 'bgra8':
|
||
img = img_array.reshape((msg.height, msg.width, 4))
|
||
self.latest_image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
|
||
elif msg.encoding == 'bgr8':
|
||
self.latest_image = img_array.reshape((msg.height, msg.width, 3)).copy()
|
||
elif msg.encoding == 'rgb8':
|
||
img = img_array.reshape((msg.height, msg.width, 3))
|
||
self.latest_image = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
|
||
else:
|
||
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
|
||
self.latest_image = cv_image.copy()
|
||
except Exception as e:
|
||
self.get_logger().error(f'Ошибка чтения кадра: {e}')
|
||
|
||
def scan_cb(self, msg):
|
||
self.latest_scan = msg
|
||
|
||
# ── Ввод пользователя ───────────────────────────────────────────────────
|
||
|
||
def user_input_thread(self):
|
||
while rclpy.ok():
|
||
if self.state == 'IDLE':
|
||
if self.current_marker_id is None:
|
||
time.sleep(1)
|
||
continue
|
||
try:
|
||
all_blocked = sorted(self.get_all_blocked_cells())
|
||
print(f'\n--- Робот над маркером: {self.current_marker_id} ---')
|
||
print('Статические блокировки:', sorted(self.blocked_cells))
|
||
print('Динамические блокировки:', sorted(self.dynamic_blocked_cells))
|
||
print('Все занятые клетки:', all_blocked)
|
||
target = input("Введите ID целевой метки (или 'b <id>' для блокировки): ").strip()
|
||
if target.startswith('b '):
|
||
blocked_id = int(target[2:])
|
||
if blocked_id in self.blocked_cells:
|
||
self.blocked_cells.remove(blocked_id)
|
||
print(f'Статическая блокировка {blocked_id} снята')
|
||
else:
|
||
self.blocked_cells.add(blocked_id)
|
||
print(f'Клетка {blocked_id} добавлена в статические препятствия')
|
||
continue
|
||
self.target_marker_id = int(target)
|
||
self.start_navigation()
|
||
except ValueError:
|
||
print('Ошибка: введите целое число.')
|
||
else:
|
||
time.sleep(0.5)
|
||
|
||
# ── Вспомогательные функции ─────────────────────────────────────────────
|
||
|
||
def get_all_blocked_cells(self):
|
||
return self.blocked_cells | self.dynamic_blocked_cells
|
||
|
||
def get_marker_coords(self, marker_id):
|
||
row = marker_id % 6
|
||
col = marker_id // 6
|
||
x = col * 1.0
|
||
y = 5 - row * 1.0
|
||
return x, y
|
||
|
||
def marker_to_grid(self, marker_id):
|
||
return (marker_id % 6, marker_id // 6)
|
||
|
||
def grid_to_marker(self, row, col):
|
||
return col * 6 + row
|
||
|
||
def get_neighbors(self, row, col):
|
||
neighbors = []
|
||
blocked = self.get_all_blocked_cells()
|
||
for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
|
||
nr, nc = row + dr, col + dc
|
||
if 0 <= nr < self.FIELD_ROWS and 0 <= nc < self.FIELD_COLS:
|
||
mid = self.grid_to_marker(nr, nc)
|
||
if mid not in blocked:
|
||
neighbors.append((nr, nc))
|
||
return neighbors
|
||
|
||
def find_path_bfs(self, start_id, goal_id):
|
||
blocked = self.get_all_blocked_cells()
|
||
if goal_id in blocked:
|
||
return None
|
||
|
||
start = self.marker_to_grid(start_id)
|
||
goal = self.marker_to_grid(goal_id)
|
||
queue = deque([(start, [start])])
|
||
visited = {start}
|
||
|
||
while queue:
|
||
(row, col), path = queue.popleft()
|
||
if (row, col) == goal:
|
||
return path
|
||
for nr, nc in self.get_neighbors(row, col):
|
||
if (nr, nc) not in visited:
|
||
visited.add((nr, nc))
|
||
queue.append(((nr, nc), path + [(nr, nc)]))
|
||
return None
|
||
|
||
def normalize_angle(self, angle):
|
||
return math.atan2(math.sin(angle), math.cos(angle))
|
||
|
||
def _reset_lidar_block_detector(self):
|
||
self._lidar_block_confirm_count = 0
|
||
self._lidar_checked_marker = None
|
||
self._last_front_range = None
|
||
|
||
def _get_forward_min_range(self):
|
||
if self.latest_scan is None:
|
||
return None
|
||
|
||
ranges = np.asarray(self.latest_scan.ranges, dtype=np.float32)
|
||
if ranges.size == 0:
|
||
return None
|
||
|
||
angles = self.latest_scan.angle_min + np.arange(ranges.size) * self.latest_scan.angle_increment
|
||
half_angle = math.radians(LIDAR_FORWARD_HALF_ANGLE_DEG)
|
||
|
||
valid_mask = np.isfinite(ranges)
|
||
valid_mask &= ranges >= max(LIDAR_MIN_VALID_RANGE_M, self.latest_scan.range_min)
|
||
if math.isfinite(self.latest_scan.range_max) and self.latest_scan.range_max > 0.0:
|
||
valid_mask &= ranges <= self.latest_scan.range_max
|
||
valid_mask &= np.abs(angles) <= half_angle
|
||
|
||
if not np.any(valid_mask):
|
||
return None
|
||
|
||
return float(np.min(ranges[valid_mask]))
|
||
|
||
def _check_next_waypoint_occupied(self):
|
||
if self.waypoint_idx >= len(self.waypoints):
|
||
self._reset_lidar_block_detector()
|
||
return None
|
||
|
||
next_marker = self.waypoints[self.waypoint_idx]
|
||
|
||
if next_marker in self.get_all_blocked_cells():
|
||
self._reset_lidar_block_detector()
|
||
return next_marker
|
||
|
||
front_range = self._get_forward_min_range()
|
||
self._last_front_range = front_range
|
||
|
||
if self._lidar_checked_marker != next_marker:
|
||
self._lidar_checked_marker = next_marker
|
||
self._lidar_block_confirm_count = 0
|
||
|
||
if front_range is None or front_range > LIDAR_BLOCK_DISTANCE_M:
|
||
self._lidar_block_confirm_count = 0
|
||
return None
|
||
|
||
self._lidar_block_confirm_count += 1
|
||
self.get_logger().info(
|
||
f'[LIDAR] obstacle candidate for marker {next_marker}: '
|
||
f'range={front_range:.2f} m, confirm={self._lidar_block_confirm_count}/{LIDAR_CONFIRM_CYCLES}',
|
||
throttle_duration_sec=0.5,
|
||
)
|
||
|
||
if self._lidar_block_confirm_count >= LIDAR_CONFIRM_CYCLES:
|
||
self._lidar_block_confirm_count = 0
|
||
return next_marker
|
||
|
||
return None
|
||
|
||
def _mark_dynamic_blocked(self, marker_id, distance_m=None):
|
||
if marker_id is None or marker_id == self.current_marker_id:
|
||
return
|
||
|
||
if marker_id not in self.dynamic_blocked_cells:
|
||
self.dynamic_blocked_cells.add(marker_id)
|
||
msg = f'Клетка {marker_id} помечена как занятая по лидару'
|
||
if distance_m is not None:
|
||
msg += f' (препятствие ~ {distance_m:.2f} м впереди)'
|
||
self.get_logger().warn(msg)
|
||
else:
|
||
self.get_logger().warn(
|
||
f'Лидар снова подтверждает занятость клетки {marker_id}',
|
||
throttle_duration_sec=1.0,
|
||
)
|
||
|
||
def _build_path_to_target(self, announce_prefix='Маршрут'):
|
||
if self.current_marker_id is None or self.target_marker_id is None:
|
||
self.get_logger().error('Невозможно построить маршрут: неизвестен текущий или целевой маркер')
|
||
self.state = 'IDLE'
|
||
return False
|
||
|
||
path = self.find_path_bfs(self.current_marker_id, self.target_marker_id)
|
||
if path is None:
|
||
blocked = sorted(self.get_all_blocked_cells())
|
||
self.get_logger().error(
|
||
f'Маршрут до {self.target_marker_id} не найден. Занятые клетки: {blocked}'
|
||
)
|
||
self.waypoints = []
|
||
self.waypoint_idx = 0
|
||
self.state = 'IDLE'
|
||
return False
|
||
|
||
self.waypoints = [self.grid_to_marker(r, c) for r, c in path]
|
||
self.waypoint_idx = 1
|
||
self._reset_lidar_block_detector()
|
||
|
||
print(f"{announce_prefix}: {' → '.join(map(str, self.waypoints))}")
|
||
return True
|
||
|
||
def start_navigation(self):
|
||
if self._build_path_to_target('Маршрут'):
|
||
self._navigate_to_next_waypoint()
|
||
|
||
def _replan_from_current_marker(self, reason='Перестроение маршрута'):
|
||
self.cmd_pub.publish(Twist())
|
||
self.state = 'REPLAN'
|
||
if self._build_path_to_target(reason):
|
||
self._navigate_to_next_waypoint()
|
||
|
||
def _navigate_to_next_waypoint(self):
|
||
if self.waypoint_idx >= len(self.waypoints):
|
||
self.get_logger().info('Все точки маршрута пройдены!')
|
||
self.state = 'IDLE'
|
||
return
|
||
|
||
next_marker = self.waypoints[self.waypoint_idx]
|
||
cx, cy = self.get_marker_coords(self.current_marker_id)
|
||
tx, ty = self.get_marker_coords(next_marker)
|
||
dx, dy = tx - cx, ty - cy
|
||
|
||
dist = math.sqrt(dx * dx + dy * dy)
|
||
if dist < 0.01:
|
||
self.waypoint_idx += 1
|
||
self._navigate_to_next_waypoint()
|
||
return
|
||
|
||
self.get_logger().info(
|
||
f'DEBUG: {self.current_marker_id}({cx:.1f},{cy:.1f}) -> '
|
||
f'{next_marker}({tx:.1f},{ty:.1f}), d=({dx:.1f},{dy:.1f})'
|
||
)
|
||
self.get_logger().info(f'DEBUG: current_yaw={math.degrees(self.current_yaw):.1f}°')
|
||
|
||
raw_yaw_normalized = math.atan2(dy, dx)
|
||
candidates = [
|
||
raw_yaw_normalized - 2 * math.pi,
|
||
raw_yaw_normalized,
|
||
raw_yaw_normalized + 2 * math.pi,
|
||
]
|
||
best_yaw = min(candidates, key=lambda y: abs(y - self.current_yaw))
|
||
self.target_yaw = best_yaw
|
||
|
||
self.get_logger().info(f'DEBUG: target_yaw={math.degrees(self.target_yaw):.1f}°')
|
||
self.total_cells = 1
|
||
self.start_marker_id = self.current_marker_id
|
||
self._centering_ok_count = 0
|
||
|
||
self._line_start_x, self._line_start_y = self.get_marker_coords(self.current_marker_id)
|
||
self._line_perp_yaw = self.target_yaw + math.pi / 2.0
|
||
self._cross_track_ref_marker = self.current_marker_id
|
||
self._visual_err_x_memory = 0.0
|
||
self._reset_lidar_block_detector()
|
||
self._occupancy_check_after_turn_pending = True
|
||
|
||
self.get_logger().info(
|
||
f'Еду к точке {next_marker} (waypoint {self.waypoint_idx}/{len(self.waypoints) - 1}), '
|
||
f'курс {math.degrees(self.target_yaw):.1f}°'
|
||
)
|
||
self.state = 'TURN_TO_TARGET'
|
||
|
||
def _cross_track_error(self, x, y):
|
||
"""
|
||
Рассчитывает signed cross-track error — расстояние от текущей позиции
|
||
до идеальной линии движения (с указанием знака: + справа, − слева).
|
||
|
||
Линия: проходит через (_line_start_x, _line_start_y) под углом target_yaw.
|
||
"""
|
||
dx = x - self._line_start_x
|
||
dy = y - self._line_start_y
|
||
cross_track = -dx * math.sin(self.target_yaw) + dy * math.cos(self.target_yaw)
|
||
return cross_track
|
||
|
||
# ── ArUco-детектор (общий, переиспользуемый) ────────────────────────────
|
||
|
||
def _detect_markers(self):
|
||
"""Детектирует маркеры на последнем кадре камеры.
|
||
|
||
Возвращает (ids_list, corners_list, img_w, img_h)
|
||
или (None, None, 0, 0) при ошибке / нет кадра.
|
||
"""
|
||
if self.latest_image is None:
|
||
return None, None, 0, 0
|
||
try:
|
||
frame = self.latest_image.copy()
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||
gray = np.ascontiguousarray(gray, dtype=np.uint8)
|
||
h, w = gray.shape
|
||
|
||
scale = min(1.0, 640.0 / w)
|
||
small = cv2.resize(
|
||
gray,
|
||
(0, 0),
|
||
fx=scale,
|
||
fy=scale,
|
||
interpolation=cv2.INTER_AREA,
|
||
) if scale < 1.0 else gray
|
||
|
||
corners, ids, _ = cv2.aruco.detectMarkers(
|
||
small,
|
||
self.aruco_dict,
|
||
parameters=self.aruco_params,
|
||
)
|
||
|
||
if ids is not None and scale < 1.0:
|
||
corners = [c / scale for c in corners]
|
||
|
||
if ids is None:
|
||
return [], [], w, h
|
||
|
||
return ids.flatten().tolist(), corners, w, h
|
||
except Exception as e:
|
||
self.get_logger().error(f'Ошибка детекции: {e}')
|
||
return None, None, 0, 0
|
||
|
||
def _marker_offset(self, marker_id, ids_list, corners, img_w, img_h):
|
||
"""Возвращает нормализованные (err_x, err_y) ∈ [-1, 1] для заданного маркера
|
||
или (None, None) если маркер не найден.
|
||
|
||
err_x > 0: маркер правее центра камеры (робот левее центра маркера).
|
||
err_y > 0: маркер ниже центра камеры (маркер «за» роботом).
|
||
"""
|
||
if not ids_list or marker_id not in ids_list:
|
||
return None, None
|
||
idx = ids_list.index(marker_id)
|
||
pts = corners[idx][0]
|
||
mcx = np.mean(pts[:, 0])
|
||
mcy = np.mean(pts[:, 1])
|
||
err_x = (mcx - img_w / 2.0) / (img_w / 2.0)
|
||
err_y = (mcy - img_h / 2.0) / (img_h / 2.0)
|
||
return err_x, err_y
|
||
|
||
# ── Главный цикл управления (20 Гц) ─────────────────────────────────────
|
||
|
||
def control_loop(self):
|
||
if self.state == 'IDLE':
|
||
return
|
||
|
||
twist = Twist()
|
||
|
||
# ── Поворот на месте (к цели или финальное выравнивание) ────────────
|
||
if self.state in ('TURN_TO_TARGET', 'ALIGN_ORIENTATION'):
|
||
yaw_err = self.normalize_angle(self.target_yaw - self.current_yaw)
|
||
|
||
if abs(yaw_err) > 0.035: # ~2°
|
||
gain = K_TURN if abs(yaw_err) > 0.087 else K_TURN * 0.6
|
||
twist.angular.z = gain * yaw_err
|
||
twist.angular.z = max(-1.0, min(1.0, twist.angular.z))
|
||
if not hasattr(self, '_log_counter'):
|
||
self._log_counter = 0
|
||
self._log_counter += 1
|
||
if self._log_counter % 10 == 1:
|
||
self.get_logger().info(
|
||
f'TURN: target={math.degrees(self.target_yaw):.1f}°, '
|
||
f'current={math.degrees(self.current_yaw):.1f}°, '
|
||
f'err={math.degrees(yaw_err):.1f}°, cmd={twist.angular.z:.2f}'
|
||
)
|
||
else:
|
||
if self.state == 'TURN_TO_TARGET':
|
||
if self._occupancy_check_after_turn_pending:
|
||
blocked_marker = self._check_next_waypoint_occupied()
|
||
if blocked_marker is not None:
|
||
self._mark_dynamic_blocked(blocked_marker, self._last_front_range)
|
||
self._replan_from_current_marker(
|
||
f'Перестроение маршрута после обнаружения препятствия на {blocked_marker}'
|
||
)
|
||
self.cmd_pub.publish(Twist())
|
||
return
|
||
self._occupancy_check_after_turn_pending = False
|
||
|
||
self.get_logger().info(
|
||
f'Курс взят ({math.degrees(self.target_yaw):.1f}°). '
|
||
f'Еду {self.total_cells} кл. прямо.'
|
||
)
|
||
self.state = 'DRIVE_STRAIGHT'
|
||
self.drive_start_x = self.current_x
|
||
self.drive_start_y = self.current_y
|
||
else:
|
||
self.get_logger().info('Ориентация выровнена!')
|
||
self.current_marker_id = self.target_marker_id
|
||
self.state = 'IDLE'
|
||
|
||
elif self.state == 'DRIVE_STRAIGHT':
|
||
if self.current_marker_id == self.target_marker_id:
|
||
self.get_logger().info('Финальный маркер подтверждён!')
|
||
self.waypoint_idx += 1
|
||
if self.waypoint_idx < len(self.waypoints):
|
||
self._navigate_to_next_waypoint()
|
||
else:
|
||
self.state = 'IDLE'
|
||
self.cmd_pub.publish(twist)
|
||
return
|
||
|
||
ids_list, corners, img_w, img_h = self._detect_markers()
|
||
|
||
if self.waypoint_idx < len(self.waypoints):
|
||
next_wp = self.waypoints[self.waypoint_idx]
|
||
if ids_list is not None and next_wp in ids_list:
|
||
self.get_logger().info(f'Промежуточный waypoint {next_wp} достигнут')
|
||
self.current_marker_id = next_wp
|
||
self._line_start_x, self._line_start_y = self.get_marker_coords(next_wp)
|
||
self.waypoint_idx += 1
|
||
if self.waypoint_idx < len(self.waypoints):
|
||
self._navigate_to_next_waypoint()
|
||
else:
|
||
self.state = 'IDLE'
|
||
self.cmd_pub.publish(twist)
|
||
return
|
||
|
||
if ids_list is not None and self.target_marker_id in ids_list:
|
||
ex, ey = self._marker_offset(self.target_marker_id, ids_list, corners, img_w, img_h)
|
||
if ey is not None and ey > -0.2:
|
||
self.get_logger().info(
|
||
f'Финальный маркер {self.target_marker_id} в кадре (err_y={ey:.2f}).'
|
||
)
|
||
self.waypoint_idx += 1
|
||
if self.waypoint_idx < len(self.waypoints):
|
||
self._navigate_to_next_waypoint()
|
||
else:
|
||
self.state = 'IDLE'
|
||
self.cmd_pub.publish(twist)
|
||
return
|
||
|
||
visual_err_x = None
|
||
if ids_list:
|
||
best_dist = float('inf')
|
||
for mid in ids_list:
|
||
ex, ey = self._marker_offset(mid, ids_list, corners, img_w, img_h)
|
||
if ex is None:
|
||
continue
|
||
dist = abs(ex) + abs(ey)
|
||
if dist < best_dist:
|
||
best_dist = dist
|
||
visual_err_x = ex
|
||
|
||
if visual_err_x is not None:
|
||
self._visual_err_x_memory = visual_err_x
|
||
else:
|
||
self._visual_err_x_memory *= K_VISUAL_DECAY
|
||
|
||
base_yaw_err = self.normalize_angle(self.target_yaw - self.current_yaw)
|
||
|
||
use_visual = abs(self._visual_err_x_memory) > 0.01
|
||
twist.linear.x = DRIVE_SPEED
|
||
if use_visual:
|
||
twist.angular.z = K_HEADING * base_yaw_err - K_VISUAL * self._visual_err_x_memory
|
||
else:
|
||
twist.angular.z = K_HEADING * base_yaw_err
|
||
twist.angular.z = max(-MAX_ANGULAR, min(MAX_ANGULAR, twist.angular.z))
|
||
|
||
if not hasattr(self, '_cross_log_counter'):
|
||
self._cross_log_counter = 0
|
||
self._cross_log_counter += 1
|
||
if self._cross_log_counter % 10 == 0:
|
||
lidar_msg = 'n/a' if self._last_front_range is None else f'{self._last_front_range:.2f} m'
|
||
self.get_logger().info(
|
||
f'[VIS-MEM] err_x={self._visual_err_x_memory:+.3f}, '
|
||
f'yaw={math.degrees(base_yaw_err):+.1f}°, '
|
||
f'tw.z={twist.angular.z:+.3f}, lidar={lidar_msg}'
|
||
)
|
||
|
||
# ── Визуальное центрирование над финальным маркером ─────────────────
|
||
elif self.state == 'CENTER_MARKER':
|
||
if self.latest_image is None:
|
||
self.get_logger().warn('CENTER_MARKER: нет кадра', throttle_duration_sec=1.0)
|
||
return
|
||
|
||
try:
|
||
frame = self.latest_image.copy()
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||
gray = np.ascontiguousarray(gray, dtype=np.uint8)
|
||
|
||
self._debug_frame_idx += 1
|
||
if self._debug_frame_idx % 40 == 1:
|
||
cv2.imwrite('/tmp/aruco_debug_gray.png', gray)
|
||
self.get_logger().info(
|
||
f'DEBUG кадр #{self._debug_frame_idx}: shape={gray.shape}, '
|
||
f'mean={gray.mean():.1f} → /tmp/aruco_debug_gray.png'
|
||
)
|
||
|
||
h_orig, w_orig = gray.shape
|
||
scale = min(1.0, 640.0 / w_orig)
|
||
if scale < 1.0:
|
||
small = cv2.resize(gray, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
|
||
else:
|
||
small = gray
|
||
|
||
corners, ids, _ = cv2.aruco.detectMarkers(
|
||
small,
|
||
self.aruco_dict,
|
||
parameters=self.aruco_params,
|
||
)
|
||
|
||
if ids is not None and scale < 1.0:
|
||
corners = [c / scale for c in corners]
|
||
|
||
target_corners = None
|
||
if ids is not None:
|
||
found_ids = ids.flatten().tolist()
|
||
for i in range(len(ids)):
|
||
if ids[i][0] == self.target_marker_id:
|
||
target_corners = corners[i][0]
|
||
break
|
||
if target_corners is None:
|
||
self.get_logger().warn(
|
||
f'CENTER_MARKER: вижу {found_ids}, цель={self.target_marker_id} не найдена',
|
||
throttle_duration_sec=1.0,
|
||
)
|
||
else:
|
||
self.get_logger().warn(
|
||
'CENTER_MARKER: маркеры не найдены',
|
||
throttle_duration_sec=1.0,
|
||
)
|
||
self._centering_ok_count = 0
|
||
|
||
if target_corners is not None:
|
||
mcx = np.mean(target_corners[:, 0])
|
||
mcy = np.mean(target_corners[:, 1])
|
||
h, w = gray.shape
|
||
|
||
err_x = (mcx - w / 2.0) / (w / 2.0)
|
||
err_y = (mcy - h / 2.0) / (h / 2.0)
|
||
|
||
twist.linear.x = -0.12 * err_y
|
||
twist.angular.z = -0.25 * err_x
|
||
|
||
twist.linear.x = max(-0.08, min(0.08, twist.linear.x))
|
||
twist.angular.z = max(-0.20, min(0.20, twist.angular.z))
|
||
|
||
if abs(err_x) < 0.05 and abs(err_y) < 0.05:
|
||
self._centering_ok_count += 1
|
||
if self._centering_ok_count >= 5:
|
||
self.get_logger().info(
|
||
f'Центрирование завершено (err={err_x:.2f},{err_y:.2f}). '
|
||
'Выравниваю ориентацию.'
|
||
)
|
||
self._centering_ok_count = 0
|
||
self.state = 'ALIGN_ORIENTATION'
|
||
else:
|
||
self._centering_ok_count = 0
|
||
|
||
except Exception as e:
|
||
self.get_logger().error(f'OpenCV ошибка: {e}')
|
||
|
||
self.cmd_pub.publish(twist)
|
||
|
||
|
||
def main(args=None):
|
||
rclpy.init(args=args)
|
||
navigator = RMC2Navigator()
|
||
try:
|
||
rclpy.spin(navigator)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
finally:
|
||
navigator.cmd_pub.publish(Twist())
|
||
navigator.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|