Files
oaip-project-app/callbacks/v2ray.py
2025-05-21 07:55:46 +03:00

136 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiohttp
import sys
import os
# Добавляем родительскую директорию в sys.path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if parent_dir not in sys.path:
sys.path.append(parent_dir)
from utils.windows_toast import show_notification
from utils.config import read_config
class V2rayAController:
def __init__(self, host='http://localhost:2017', username='admin', password='admin'):
self.host = host
self.username = username
self.password = password
self.token = None
self.session = None
async def authenticate(self):
"""Аутентификация и получение токена"""
self.session = aiohttp.ClientSession()
auth_url = f"{self.host}/api/login"
credentials = {
'username': self.username,
'password': self.password
}
try:
async with self.session.post(auth_url, json=credentials) as response:
if response.status == 200:
data = await response.json()
self.token = data.get('data').get('token')
print("V2RayA auth OK")
return True
else:
print(f"Ошибка аутентификации: {response.status}")
return False
except Exception as e:
raise OSError("Failed to authenticate v2ray")
print(f"Ошибка при аутентификации: {e}")
return False
async def enable_proxy(self):
"""Включение прокси"""
if not self.token:
print("Не выполнен вход. Пожалуйста, сначала выполните аутентификацию")
return False
url = f"{self.host}/api/v2ray"
headers = {'Authorization': f"Bearer {self.token}"}
data = {}
try:
async with self.session.post(url, headers=headers, json=data) as response:
if response.status == 200:
print("Прокси успешно включен")
await show_notification({"title":"✅ V2Ray proxy","message":"Прокси успешно включен"})
return True
else:
print(f"Ошибка при включении прокси: {response.status}")
return False
except Exception as e:
raise OSError("Failed to enable proxy")
print(f"Ошибка при включении прокси: {e}")
return False
async def disable_proxy(self):
"""Выключение прокси"""
if not self.token:
print("Не выполнен вход. Пожалуйста, сначала выполните аутентификацию")
return False
url = f"{self.host}/api/v2ray"
headers = {'Authorization': f"Bearer {self.token}"}
data = {'operation': 'stop'}
try:
async with self.session.delete(url, headers=headers, json=data) as response:
if response.status == 200:
print("Прокси успешно выключен")
await show_notification({"title":"🔴 V2Ray proxy","message":"Прокси успешно выключен"})
return True
else:
print(f"Ошибка при выключении прокси: {response.status}")
return False
except Exception as e:
raise OSError("Failed to disable proxy")
print(f"Ошибка при выключении прокси: {e}")
return False
async def close(self):
"""Закрытие сессии"""
if self.session:
await self.session.close()
async def enable_vpn(args):
# Инициализация контроллера
config = await read_config("v2ray.yaml")
if config.get("username") and config.get("password"):
controller = V2rayAController(
host='http://localhost:2017',
username=config.get("username"),
password=config.get("password")
)
# Аутентификация
if await controller.authenticate():
await controller.enable_proxy()
# Закрытие сессии
await controller.close()
else:
raise OSError("Config unset")
async def disable_vpn(args):
# Инициализация контроллера
config = await read_config("v2ray.yaml")
if config.get("username") and config.get("password"):
controller = V2rayAController(
host='http://localhost:2017',
username=config.get("username"),
password=config.get("password")
)
# Аутентификация
if await controller.authenticate():
await controller.disable_proxy()
# Закрытие сессии
await controller.close()
else:
raise OSError("Config unset")