Files
FairWall/FAIRWALL/test2_win.py

589 lines
20 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Файрволл на Python для Windows с использованием Windows Firewall и блокировкой по доменам
"""
import subprocess
import os
import sys
import threading
import time
from datetime import datetime
import dns.resolver
import re
class WindowsFirewall:
def __init__(self):
self.blocked_domains = set()
self.blocked_ips = set()
self.blocked_ports = set()
self.domain_to_ip_cache = {}
self.dns_cache_ttl = 300 # 5 минут TTL для кэша
self.is_monitoring = False
self.monitor_thread = None
self.log_file = "windows_firewall.log"
self.rule_prefix = "PyFirewall_"
# Настройка DNS резолвера
self.dns_resolver = dns.resolver.Resolver()
self.dns_resolver.timeout = 2
self.dns_resolver.lifetime = 2
def run_powershell(self, cmd):
"""Выполнение PowerShell команды"""
try:
result = subprocess.run(
["powershell", "-Command", cmd],
check=True,
capture_output=True,
text=True,
shell=True
)
return True, result.stdout
except subprocess.CalledProcessError as e:
error_msg = f"PowerShell error: {e.stderr}"
print(error_msg)
self.log_event(error_msg)
return False, e.stderr
def run_netsh(self, cmd):
"""Выполнение netsh команды"""
try:
result = subprocess.run(
f"netsh {cmd}",
check=True,
capture_output=True,
text=True,
shell=True
)
return True, result.stdout
except subprocess.CalledProcessError as e:
error_msg = f"netsh error: {e.stderr}"
print(error_msg)
self.log_event(error_msg)
return False, e.stderr
def log_event(self, message):
"""Логирование событий"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"[{timestamp}] {message}"
print(log_message)
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_message + "\n")
def initialize(self):
"""Инициализация файрволла"""
self.log_event("Windows Firewall manager initialized")
return True
def create_firewall_rule(self, name, direction="out", action="block",
protocol="any", port=None, remote_ip=None):
"""Создание правила в Windows Firewall"""
# Формируем базовую команду
cmd_parts = [
"advfirewall firewall add rule",
f'name="{self.rule_prefix}{name}"',
f"dir={direction}",
f"action={action}",
f"protocol={protocol}",
"enable=yes"
]
if port:
cmd_parts.append(f"remoteport={port}")
if remote_ip:
cmd_parts.append(f"remoteip={remote_ip}")
cmd = " ".join(cmd_parts)
success, output = self.run_netsh(cmd)
if success:
self.log_event(f"Firewall rule created: {name}")
return success
def delete_firewall_rule(self, name):
"""Удаление правила из Windows Firewall"""
cmd = f'advfirewall firewall delete rule name="{self.rule_prefix}{name}"'
success, output = self.run_netsh(cmd)
if success:
self.log_event(f"Firewall rule deleted: {name}")
return success
def block_ip(self, ip_address):
"""Блокировка IP-адреса"""
if ip_address in self.blocked_ips:
return True
# Блокируем исходящий трафик к IP
rule_name = f"block_ip_{ip_address}"
success = self.create_firewall_rule(
name=rule_name,
direction="out",
action="block",
remote_ip=ip_address
)
if success:
self.blocked_ips.add(ip_address)
self.log_event(f"IP blocked: {ip_address}")
return True
return False
def unblock_ip(self, ip_address):
"""Разблокировка IP-адреса"""
if ip_address not in self.blocked_ips:
return True
rule_name = f"block_ip_{ip_address}"
success = self.delete_firewall_rule(rule_name)
if success:
self.blocked_ips.discard(ip_address)
self.log_event(f"IP unblocked: {ip_address}")
return True
return False
def allow_port(self, port, protocol="tcp"):
"""Разрешение порта"""
rule_name = f"allow_{protocol}_{port}"
# Удаляем блокирующее правило если существует
block_rule_name = f"block_{protocol}_{port}"
self.delete_firewall_rule(block_rule_name)
success = self.create_firewall_rule(
name=rule_name,
direction="out",
action="allow",
protocol=protocol,
port=str(port)
)
if success:
self.blocked_ports.discard((protocol, port))
self.log_event(f"Port allowed: {protocol}/{port}")
return True
return False
def block_port(self, port, protocol="tcp"):
"""Блокировка порта"""
rule_name = f"block_{protocol}_{port}"
# Удаляем разрешающее правило если существует
allow_rule_name = f"allow_{protocol}_{port}"
self.delete_firewall_rule(allow_rule_name)
success = self.create_firewall_rule(
name=rule_name,
direction="out",
action="block",
protocol=protocol,
port=str(port)
)
if success:
self.blocked_ports.add((protocol, port))
self.log_event(f"Port blocked: {protocol}/{port}")
return True
return False
def resolve_domain(self, domain):
"""Разрешение домена в IP-адреса"""
domain = domain.lower().strip()
# Проверка кэша
if domain in self.domain_to_ip_cache:
cache_time, ips = self.domain_to_ip_cache[domain]
if time.time() - cache_time < self.dns_cache_ttl:
return ips
try:
answers = self.dns_resolver.resolve(domain, 'A')
ips = {str(answer) for answer in answers}
# Обновление кэша
self.domain_to_ip_cache[domain] = (time.time(), ips)
self.log_event(f"DNS resolved: {domain} -> {', '.join(ips)}")
return ips
except Exception as e:
self.log_event(f"DNS resolution failed for {domain}: {str(e)}")
return set()
def block_domain(self, domain):
"""Блокировка домена"""
domain = domain.lower().strip()
if domain in self.blocked_domains:
self.log_event(f"Domain already blocked: {domain}")
return True
# Разрешаем домен в IP-адреса
ips = self.resolve_domain(domain)
if not ips:
self.log_event(f"Warning: No IP addresses found for domain {domain}")
return False
# Блокируем все IP-адреса домена
success_count = 0
for ip in ips:
if self.block_ip(ip):
success_count += 1
if success_count > 0:
self.blocked_domains.add(domain)
self.log_event(f"Domain blocked: {domain} ({success_count} IPs)")
return True
return False
def unblock_domain(self, domain):
"""Разблокировка домена"""
domain = domain.lower().strip()
if domain not in self.blocked_domains:
self.log_event(f"Domain not blocked: {domain}")
return True
# Получаем IP-адреса из кэша или разрешаем заново
ips = set()
if domain in self.domain_to_ip_cache:
cache_time, cached_ips = self.domain_to_ip_cache[domain]
ips = cached_ips
else:
ips = self.resolve_domain(domain)
# Разблокируем все IP-адреса домена
success_count = 0
for ip in ips:
if self.unblock_ip(ip):
success_count += 1
# Удаляем домен из списка блокировок
self.blocked_domains.discard(domain)
self.log_event(f"Domain unblocked: {domain} ({success_count} IPs)")
return True
def start_domain_monitoring(self, interval=60):
"""Запуск мониторинга доменов для обновления IP-адресов"""
def monitor_loop():
self.log_event("Domain monitoring started")
while self.is_monitoring:
try:
self.update_domain_ips()
time.sleep(interval)
except Exception as e:
self.log_event(f"Error in domain monitoring: {e}")
time.sleep(interval)
self.is_monitoring = True
self.monitor_thread = threading.Thread(target=monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def stop_domain_monitoring(self):
"""Остановка мониторинга доменов"""
self.is_monitoring = False
if self.monitor_thread:
self.monitor_thread.join(timeout=5)
self.log_event("Domain monitoring stopped")
def update_domain_ips(self):
"""Обновление IP-адресов для заблокированных доменов"""
if not self.blocked_domains:
return
self.log_event("Updating domain IP addresses...")
for domain in list(self.blocked_domains):
old_ips = set()
if domain in self.domain_to_ip_cache:
cache_time, cached_ips = self.domain_to_ip_cache[domain]
old_ips = cached_ips
# Разрешаем домен заново
new_ips = self.resolve_domain(domain)
if not new_ips:
continue
# Находим новые IP-адреса для блокировки
ips_to_block = new_ips - old_ips
# Находим старые IP-адреса для разблокировки
ips_to_unblock = old_ips - new_ips
# Блокируем новые IP-адреса
for ip in ips_to_block:
self.block_ip(ip)
# Разблокируем старые IP-адреса
for ip in ips_to_unblock:
self.unblock_ip(ip)
if ips_to_block or ips_to_unblock:
self.log_event(f"Domain {domain} updated: +{len(ips_to_block)} -{len(ips_to_unblock)} IPs")
def get_existing_rules(self):
"""Получение списка существующих правил"""
cmd = 'advfirewall firewall show rule name=all'
success, output = self.run_netsh(cmd)
rules = []
if success:
# Парсим вывод netsh
lines = output.split('\n')
current_rule = {}
for line in lines:
line = line.strip()
if line.startswith('Rule Name:'):
if current_rule:
rules.append(current_rule)
current_rule = {'name': line.split(':', 1)[1].strip()}
elif ':' in line:
key, value = line.split(':', 1)
current_rule[key.strip()] = value.strip()
if current_rule:
rules.append(current_rule)
return [rule for rule in rules if rule.get('name', '').startswith(self.rule_prefix)]
def show_status(self):
"""Показать статус файрволла"""
print("\n=== Windows Firewall Status ===")
print(f"Blocked domains: {len(self.blocked_domains)}")
print(f"Blocked IPs: {len(self.blocked_ips)}")
print(f"Blocked ports: {len(self.blocked_ports)}")
print(f"Cached domains: {len(self.domain_to_ip_cache)}")
print(f"Domain monitoring: {'Running' if self.is_monitoring else 'Stopped'}")
# Показать правила брандмауэра
rules = self.get_existing_rules()
print(f"\nFirewall rules: {len(rules)}")
for rule in rules:
print(f" {rule.get('name', 'Unknown')}: {rule.get('Direction', '')} {rule.get('Action', '')}")
if self.blocked_domains:
print("\nBlocked domains:")
for domain in sorted(self.blocked_domains):
if domain in self.domain_to_ip_cache:
cache_time, ips = self.domain_to_ip_cache[domain]
ips_str = ', '.join(ips)
print(f" {domain} -> {ips_str}")
else:
print(f" {domain} -> [not resolved]")
def cleanup(self):
"""Очистка правил"""
self.log_event("Cleaning up firewall rules...")
# Удаляем все правила, созданные нашим файрволлом
rules = self.get_existing_rules()
for rule in rules:
rule_name = rule.get('name', '')
if rule_name.startswith(self.rule_prefix):
# Извлекаем оригинальное имя правила
original_name = rule_name[len(self.rule_prefix):]
self.delete_firewall_rule(original_name)
self.stop_domain_monitoring()
self.blocked_ips.clear()
self.blocked_domains.clear()
self.blocked_ports.clear()
self.log_event("Firewall cleaned up")
def enable_windows_firewall(self):
"""Включение Windows Firewall если выключен"""
cmds = [
"advfirewall set allprofiles state on",
"advfirewall set allprofiles firewallpolicy blockinbound,allowoutbound"
]
for cmd in cmds:
success, output = self.run_netsh(cmd)
if not success:
self.log_event("Warning: Could not enable Windows Firewall")
return False
return True
def load_domains_from_file(filename):
"""Загрузка доменов из файла"""
domains = []
try:
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
domains.append(line)
except FileNotFoundError:
print(f"File {filename} not found")
return domains
def interactive_menu():
"""Интерактивное меню управления файрволлом"""
fw = WindowsFirewall()
try:
if not fw.initialize():
print("Failed to initialize firewall")
return
# Включаем брандмауэр Windows
fw.enable_windows_firewall()
while True:
print("\n=== Windows Firewall with Domain Blocking ===")
print("1. Block domain")
print("2. Unblock domain")
print("3. Block IP")
print("4. Block port")
print("5. Allow port")
print("6. Show status")
print("7. Start domain monitoring")
print("8. Stop domain monitoring")
print("9. Load domains from file")
print("10. Enable Windows Firewall")
print("11. Exit")
choice = input("Select option: ").strip()
if choice == '1':
domain = input("Enter domain to block: ").strip()
if domain:
fw.block_domain(domain)
elif choice == '2':
domain = input("Enter domain to unblock: ").strip()
if domain:
fw.unblock_domain(domain)
elif choice == '3':
ip = input("Enter IP to block: ").strip()
if ip:
fw.block_ip(ip)
elif choice == '4':
port = input("Enter port to block: ").strip()
protocol = input("Protocol (tcp/udp, default tcp): ").strip() or "tcp"
if port.isdigit():
fw.block_port(int(port), protocol)
elif choice == '5':
port = input("Enter port to allow: ").strip()
protocol = input("Protocol (tcp/udp, default tcp): ").strip() or "tcp"
if port.isdigit():
fw.allow_port(int(port), protocol)
elif choice == '6':
fw.show_status()
elif choice == '7':
fw.start_domain_monitoring()
print("Domain monitoring started")
elif choice == '8':
fw.stop_domain_monitoring()
print("Domain monitoring stopped")
elif choice == '9':
filename = input("Enter filename: ").strip()
domains = load_domains_from_file(filename)
for domain in domains:
fw.block_domain(domain)
print(f"Loaded {len(domains)} domains from {filename}")
elif choice == '10':
fw.enable_windows_firewall()
print("Windows Firewall enabled")
elif choice == '11':
break
else:
print("Invalid option")
finally:
fw.cleanup()
def check_admin_privileges():
"""Проверка прав администратора на Windows"""
try:
# Попытка выполнить команду, требующую админские права
result = subprocess.run(
["netsh", "advfirewall", "show", "allprofiles"],
capture_output=True,
text=True,
shell=True
)
return result.returncode == 0
except:
return False
# Пример использования
if __name__ == "__main__":
if not check_admin_privileges():
print("Requires administrator privileges")
print("Please run as Administrator")
sys.exit(1)
# Проверка наличия dnspython
try:
import dns.resolver
except ImportError:
print("Error: dnspython library required. Install with: pip install dnspython")
sys.exit(1)
if len(sys.argv) > 1 and sys.argv[1] == "--interactive":
interactive_menu()
else:
# Автоматическая настройка с примером блокировки доменов
fw = WindowsFirewall()
try:
if not fw.initialize():
print("Failed to initialize firewall")
sys.exit(1)
# Включаем брандмауэр Windows
fw.enable_windows_firewall()
# Базовые правила
fw.block_port(23) # Блокировка Telnet
fw.allow_port(22) # Разрешение SSH
fw.allow_port(80) # Разрешение HTTP
fw.allow_port(443) # Разрешение HTTPS
# Блокировка доменов (пример)
fw.block_domain("example.com")
fw.block_domain("test.org")
# Запуск мониторинга доменов
fw.start_domain_monitoring()
print("Windows Firewall rules applied with domain blocking")
print("Domain monitoring is running...")
print("Press Ctrl+C to stop")
# Бесконечный цикл для поддержания работы
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
fw.cleanup()
print("Firewall rules cleaned up")