import os import importlib import inspect import asyncio from typing import Dict, Any # Словарь для хранения загруженных функций _action_functions = {} # Загружаем все файлы из папки ./callbacks def load_callbacks(): callbacks_dir = "./callbacks" if not os.path.exists(callbacks_dir): return print("\nLoaded callbacks:") for filename in os.listdir(callbacks_dir): if filename.endswith(".py") and filename != "__init__.py": module_name = filename[:-3] # Удаляем .py try: module = importlib.import_module(f"callbacks.{module_name}") # Получаем все функции из модуля for func_name, func in inspect.getmembers(module, inspect.isfunction): # Сохраняем с префиксом имени файла _action_functions[f"{module_name}.{func_name}"] = func print(f"{module_name}.{func_name}") # Получаем все асинхронные функции for func_name, func in inspect.getmembers(module, inspect.iscoroutinefunction): _action_functions[f"{module_name}.{func_name}"] = func except Exception as e: print(f"Error loading module {module_name}: {str(e)}") # Загружаем функции при старте load_callbacks() async def call_action(name: str, args: Dict[str, Any]): """ Вызывает функцию с заданным именем в формате filename.function_name, передавая ей аргументы. Если функция асинхронная, использует await. """ if name not in _action_functions: raise ValueError(f"Action '{name}' not found") func = _action_functions[name] # Проверяем, является ли функция асинхронной if inspect.iscoroutinefunction(func): return await func(args) else: return func(args)