#!/usr/bin/env python3 """ Файрволл на Python для Windows с использованием Windows Firewall и блокировкой по доменам """ import subprocess import os import sys import threading import time from datetime import datetime import dns.resolver import re class WindowsFirewall: def __init__(self): self.blocked_domains = set() self.blocked_ips = set() self.blocked_ports = set() self.domain_to_ip_cache = {} self.dns_cache_ttl = 300 # 5 минут TTL для кэша self.is_monitoring = False self.monitor_thread = None self.log_file = "windows_firewall.log" self.rule_prefix = "PyFirewall_" # Настройка DNS резолвера self.dns_resolver = dns.resolver.Resolver() self.dns_resolver.timeout = 2 self.dns_resolver.lifetime = 2 def run_powershell(self, cmd): """Выполнение PowerShell команды""" try: result = subprocess.run( ["powershell", "-Command", cmd], check=True, capture_output=True, text=True, shell=True ) return True, result.stdout except subprocess.CalledProcessError as e: error_msg = f"PowerShell error: {e.stderr}" print(error_msg) self.log_event(error_msg) return False, e.stderr def run_netsh(self, cmd): """Выполнение netsh команды""" try: result = subprocess.run( f"netsh {cmd}", check=True, capture_output=True, text=True, shell=True ) return True, result.stdout except subprocess.CalledProcessError as e: error_msg = f"netsh error: {e.stderr}" print(error_msg) self.log_event(error_msg) return False, e.stderr def log_event(self, message): """Логирование событий""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_message = f"[{timestamp}] {message}" print(log_message) with open(self.log_file, 'a', encoding='utf-8') as f: f.write(log_message + "\n") def initialize(self): """Инициализация файрволла""" self.log_event("Windows Firewall manager initialized") return True def create_firewall_rule(self, name, direction="out", action="block", protocol="any", port=None, remote_ip=None): """Создание правила в Windows Firewall""" # Формируем базовую команду cmd_parts = [ "advfirewall firewall add rule", f'name="{self.rule_prefix}{name}"', f"dir={direction}", f"action={action}", f"protocol={protocol}", "enable=yes" ] if port: cmd_parts.append(f"remoteport={port}") if remote_ip: cmd_parts.append(f"remoteip={remote_ip}") cmd = " ".join(cmd_parts) success, output = self.run_netsh(cmd) if success: self.log_event(f"Firewall rule created: {name}") return success def delete_firewall_rule(self, name): """Удаление правила из Windows Firewall""" cmd = f'advfirewall firewall delete rule name="{self.rule_prefix}{name}"' success, output = self.run_netsh(cmd) if success: self.log_event(f"Firewall rule deleted: {name}") return success def block_ip(self, ip_address): """Блокировка IP-адреса""" if ip_address in self.blocked_ips: return True # Блокируем исходящий трафик к IP rule_name = f"block_ip_{ip_address}" success = self.create_firewall_rule( name=rule_name, direction="out", action="block", remote_ip=ip_address ) if success: self.blocked_ips.add(ip_address) self.log_event(f"IP blocked: {ip_address}") return True return False def unblock_ip(self, ip_address): """Разблокировка IP-адреса""" if ip_address not in self.blocked_ips: return True rule_name = f"block_ip_{ip_address}" success = self.delete_firewall_rule(rule_name) if success: self.blocked_ips.discard(ip_address) self.log_event(f"IP unblocked: {ip_address}") return True return False def allow_port(self, port, protocol="tcp"): """Разрешение порта""" rule_name = f"allow_{protocol}_{port}" # Удаляем блокирующее правило если существует block_rule_name = f"block_{protocol}_{port}" self.delete_firewall_rule(block_rule_name) success = self.create_firewall_rule( name=rule_name, direction="out", action="allow", protocol=protocol, port=str(port) ) if success: self.blocked_ports.discard((protocol, port)) self.log_event(f"Port allowed: {protocol}/{port}") return True return False def block_port(self, port, protocol="tcp"): """Блокировка порта""" rule_name = f"block_{protocol}_{port}" # Удаляем разрешающее правило если существует allow_rule_name = f"allow_{protocol}_{port}" self.delete_firewall_rule(allow_rule_name) success = self.create_firewall_rule( name=rule_name, direction="out", action="block", protocol=protocol, port=str(port) ) if success: self.blocked_ports.add((protocol, port)) self.log_event(f"Port blocked: {protocol}/{port}") return True return False def resolve_domain(self, domain): """Разрешение домена в IP-адреса""" domain = domain.lower().strip() # Проверка кэша if domain in self.domain_to_ip_cache: cache_time, ips = self.domain_to_ip_cache[domain] if time.time() - cache_time < self.dns_cache_ttl: return ips try: answers = self.dns_resolver.resolve(domain, 'A') ips = {str(answer) for answer in answers} # Обновление кэша self.domain_to_ip_cache[domain] = (time.time(), ips) self.log_event(f"DNS resolved: {domain} -> {', '.join(ips)}") return ips except Exception as e: self.log_event(f"DNS resolution failed for {domain}: {str(e)}") return set() def block_domain(self, domain): """Блокировка домена""" domain = domain.lower().strip() if domain in self.blocked_domains: self.log_event(f"Domain already blocked: {domain}") return True # Разрешаем домен в IP-адреса ips = self.resolve_domain(domain) if not ips: self.log_event(f"Warning: No IP addresses found for domain {domain}") return False # Блокируем все IP-адреса домена success_count = 0 for ip in ips: if self.block_ip(ip): success_count += 1 if success_count > 0: self.blocked_domains.add(domain) self.log_event(f"Domain blocked: {domain} ({success_count} IPs)") return True return False def unblock_domain(self, domain): """Разблокировка домена""" domain = domain.lower().strip() if domain not in self.blocked_domains: self.log_event(f"Domain not blocked: {domain}") return True # Получаем IP-адреса из кэша или разрешаем заново ips = set() if domain in self.domain_to_ip_cache: cache_time, cached_ips = self.domain_to_ip_cache[domain] ips = cached_ips else: ips = self.resolve_domain(domain) # Разблокируем все IP-адреса домена success_count = 0 for ip in ips: if self.unblock_ip(ip): success_count += 1 # Удаляем домен из списка блокировок self.blocked_domains.discard(domain) self.log_event(f"Domain unblocked: {domain} ({success_count} IPs)") return True def start_domain_monitoring(self, interval=60): """Запуск мониторинга доменов для обновления IP-адресов""" def monitor_loop(): self.log_event("Domain monitoring started") while self.is_monitoring: try: self.update_domain_ips() time.sleep(interval) except Exception as e: self.log_event(f"Error in domain monitoring: {e}") time.sleep(interval) self.is_monitoring = True self.monitor_thread = threading.Thread(target=monitor_loop) self.monitor_thread.daemon = True self.monitor_thread.start() def stop_domain_monitoring(self): """Остановка мониторинга доменов""" self.is_monitoring = False if self.monitor_thread: self.monitor_thread.join(timeout=5) self.log_event("Domain monitoring stopped") def update_domain_ips(self): """Обновление IP-адресов для заблокированных доменов""" if not self.blocked_domains: return self.log_event("Updating domain IP addresses...") for domain in list(self.blocked_domains): old_ips = set() if domain in self.domain_to_ip_cache: cache_time, cached_ips = self.domain_to_ip_cache[domain] old_ips = cached_ips # Разрешаем домен заново new_ips = self.resolve_domain(domain) if not new_ips: continue # Находим новые IP-адреса для блокировки ips_to_block = new_ips - old_ips # Находим старые IP-адреса для разблокировки ips_to_unblock = old_ips - new_ips # Блокируем новые IP-адреса for ip in ips_to_block: self.block_ip(ip) # Разблокируем старые IP-адреса for ip in ips_to_unblock: self.unblock_ip(ip) if ips_to_block or ips_to_unblock: self.log_event(f"Domain {domain} updated: +{len(ips_to_block)} -{len(ips_to_unblock)} IPs") def get_existing_rules(self): """Получение списка существующих правил""" cmd = 'advfirewall firewall show rule name=all' success, output = self.run_netsh(cmd) rules = [] if success: # Парсим вывод netsh lines = output.split('\n') current_rule = {} for line in lines: line = line.strip() if line.startswith('Rule Name:'): if current_rule: rules.append(current_rule) current_rule = {'name': line.split(':', 1)[1].strip()} elif ':' in line: key, value = line.split(':', 1) current_rule[key.strip()] = value.strip() if current_rule: rules.append(current_rule) return [rule for rule in rules if rule.get('name', '').startswith(self.rule_prefix)] def show_status(self): """Показать статус файрволла""" print("\n=== Windows Firewall Status ===") print(f"Blocked domains: {len(self.blocked_domains)}") print(f"Blocked IPs: {len(self.blocked_ips)}") print(f"Blocked ports: {len(self.blocked_ports)}") print(f"Cached domains: {len(self.domain_to_ip_cache)}") print(f"Domain monitoring: {'Running' if self.is_monitoring else 'Stopped'}") # Показать правила брандмауэра rules = self.get_existing_rules() print(f"\nFirewall rules: {len(rules)}") for rule in rules: print(f" {rule.get('name', 'Unknown')}: {rule.get('Direction', '')} {rule.get('Action', '')}") if self.blocked_domains: print("\nBlocked domains:") for domain in sorted(self.blocked_domains): if domain in self.domain_to_ip_cache: cache_time, ips = self.domain_to_ip_cache[domain] ips_str = ', '.join(ips) print(f" {domain} -> {ips_str}") else: print(f" {domain} -> [not resolved]") def cleanup(self): """Очистка правил""" self.log_event("Cleaning up firewall rules...") # Удаляем все правила, созданные нашим файрволлом rules = self.get_existing_rules() for rule in rules: rule_name = rule.get('name', '') if rule_name.startswith(self.rule_prefix): # Извлекаем оригинальное имя правила original_name = rule_name[len(self.rule_prefix):] self.delete_firewall_rule(original_name) self.stop_domain_monitoring() self.blocked_ips.clear() self.blocked_domains.clear() self.blocked_ports.clear() self.log_event("Firewall cleaned up") def enable_windows_firewall(self): """Включение Windows Firewall если выключен""" cmds = [ "advfirewall set allprofiles state on", "advfirewall set allprofiles firewallpolicy blockinbound,allowoutbound" ] for cmd in cmds: success, output = self.run_netsh(cmd) if not success: self.log_event("Warning: Could not enable Windows Firewall") return False return True def load_domains_from_file(filename): """Загрузка доменов из файла""" domains = [] try: with open(filename, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#'): domains.append(line) except FileNotFoundError: print(f"File {filename} not found") return domains def interactive_menu(): """Интерактивное меню управления файрволлом""" fw = WindowsFirewall() try: if not fw.initialize(): print("Failed to initialize firewall") return # Включаем брандмауэр Windows fw.enable_windows_firewall() while True: print("\n=== Windows Firewall with Domain Blocking ===") print("1. Block domain") print("2. Unblock domain") print("3. Block IP") print("4. Block port") print("5. Allow port") print("6. Show status") print("7. Start domain monitoring") print("8. Stop domain monitoring") print("9. Load domains from file") print("10. Enable Windows Firewall") print("11. Exit") choice = input("Select option: ").strip() if choice == '1': domain = input("Enter domain to block: ").strip() if domain: fw.block_domain(domain) elif choice == '2': domain = input("Enter domain to unblock: ").strip() if domain: fw.unblock_domain(domain) elif choice == '3': ip = input("Enter IP to block: ").strip() if ip: fw.block_ip(ip) elif choice == '4': port = input("Enter port to block: ").strip() protocol = input("Protocol (tcp/udp, default tcp): ").strip() or "tcp" if port.isdigit(): fw.block_port(int(port), protocol) elif choice == '5': port = input("Enter port to allow: ").strip() protocol = input("Protocol (tcp/udp, default tcp): ").strip() or "tcp" if port.isdigit(): fw.allow_port(int(port), protocol) elif choice == '6': fw.show_status() elif choice == '7': fw.start_domain_monitoring() print("Domain monitoring started") elif choice == '8': fw.stop_domain_monitoring() print("Domain monitoring stopped") elif choice == '9': filename = input("Enter filename: ").strip() domains = load_domains_from_file(filename) for domain in domains: fw.block_domain(domain) print(f"Loaded {len(domains)} domains from {filename}") elif choice == '10': fw.enable_windows_firewall() print("Windows Firewall enabled") elif choice == '11': break else: print("Invalid option") finally: fw.cleanup() def check_admin_privileges(): """Проверка прав администратора на Windows""" try: # Попытка выполнить команду, требующую админские права result = subprocess.run( ["netsh", "advfirewall", "show", "allprofiles"], capture_output=True, text=True, shell=True ) return result.returncode == 0 except: return False # Пример использования if __name__ == "__main__": if not check_admin_privileges(): print("Requires administrator privileges") print("Please run as Administrator") sys.exit(1) # Проверка наличия dnspython try: import dns.resolver except ImportError: print("Error: dnspython library required. Install with: pip install dnspython") sys.exit(1) if len(sys.argv) > 1 and sys.argv[1] == "--interactive": interactive_menu() else: # Автоматическая настройка с примером блокировки доменов fw = WindowsFirewall() try: if not fw.initialize(): print("Failed to initialize firewall") sys.exit(1) # Включаем брандмауэр Windows fw.enable_windows_firewall() # Базовые правила fw.block_port(23) # Блокировка Telnet fw.allow_port(22) # Разрешение SSH fw.allow_port(80) # Разрешение HTTP fw.allow_port(443) # Разрешение HTTPS # Блокировка доменов (пример) fw.block_domain("example.com") fw.block_domain("test.org") # Запуск мониторинга доменов fw.start_domain_monitoring() print("Windows Firewall rules applied with domain blocking") print("Domain monitoring is running...") print("Press Ctrl+C to stop") # Бесконечный цикл для поддержания работы try: while True: time.sleep(1) except KeyboardInterrupt: print("\nShutting down...") finally: fw.cleanup() print("Firewall rules cleaned up")