import asyncio
import ctypes
import os
import sys
from ctypes import POINTER, cast
from typing import Any, Dict

import comtypes
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
from winrt.windows.media.control import (
    GlobalSystemMediaTransportControlsSessionManager as MediaManager,
)

# Add the parent directory to sys.path (appended once, only if missing).
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)


def _apply_master_volume(level: float) -> None:
    """Set the master volume scalar on the endpoint returned by GetSpeakers().

    Kept as a separate function so that the COM interface references it
    creates are local to this call and go out of scope when it returns,
    i.e. before the caller invokes ``CoUninitialize()``.
    NOTE(review): relies on CPython's prompt reference-count release - confirm
    if another interpreter is ever used.
    """
    devices = AudioUtilities.GetSpeakers()
    interface = devices.Activate(
        IAudioEndpointVolume._iid_, comtypes.CLSCTX_ALL, None
    )
    volume = cast(interface, POINTER(IAudioEndpointVolume))
    volume.SetMasterVolumeLevelScalar(level, None)


async def set_volume(args: Dict[str, Any]) -> Dict[str, Any]:
    """Set the system master volume through the Core Audio API.

    Args:
        args: Dictionary that may contain 'level', a number from 0.0 to 1.0.
            When 'level' is absent, 0.5 (50%) is used.

    Returns:
        Dictionary with a 'message' describing the applied volume and the
        original 'args'.

    Raises:
        OSError: For any failure, including an invalid 'level'; the original
            exception is attached as ``__cause__``.
    """
    try:
        level = args.get('level', 0.5)  # Default to 50% if not specified

        # Validate before touching COM so bad input has no side effects.
        if not isinstance(level, (int, float)) or not 0.0 <= level <= 1.0:
            raise ValueError("Volume level must be a float between 0.0 and 1.0")

        # CoUninitialize() must only balance a CoInitialize() that succeeded,
        # so the initialize call sits outside the inner try/finally.
        comtypes.CoInitialize()
        try:
            _apply_master_volume(float(level))
        finally:
            comtypes.CoUninitialize()
    except Exception as e:
        # Callers rely on OSError for every failure; keep the cause chained.
        raise OSError(f"Error while setting volume: {e}") from e

    return {
        "message": f"Volume set to {level*100:.0f}% successfully",
        "args": args
    }


async def _get_current_session():
    """Request the media session manager and return its current session.

    The result is falsy when no session is current; callers test it with a
    plain truthiness check.
    """
    manager = await MediaManager.request_async()
    return manager.get_current_session()


async def play(args):
    """Ask the current media session to start playback.

    Args:
        args: Unused by this function.

    Prints a notice when no media session is active. The result of the
    session call is not inspected.
    """
    session = await _get_current_session()
    if session:
        await session.try_play_async()
    else:
        print("No media session is active.")


async def pause(args):
    """Ask the current media session to pause playback.

    Args:
        args: Unused by this function.

    Prints a notice when no media session is active. The result of the
    session call is not inspected.
    """
    session = await _get_current_session()
    if session:
        await session.try_pause_async()
    else:
        print("No media session is active.")


# NOTE: this name shadows the builtin ``next`` inside this module; it is kept
# because it is the function's public name.
async def next(args):
    """Ask the current media session to skip to the next item.

    Args:
        args: Unused by this function.

    Prints a notice when no media session is active. The result of the
    session call is not inspected.
    """
    session = await _get_current_session()
    if session:
        await session.try_skip_next_async()
    else:
        print("No media session is active.")


async def prev(args):
    """Ask the current media session to skip to the previous item.

    Args:
        args: Unused by this function.

    Prints a notice when no media session is active. The result of the
    session call is not inspected.
    """
    session = await _get_current_session()
    if session:
        await session.try_skip_previous_async()
    else:
        print("No media session is active.")