diff --git a/FAIRWALL/main.py b/FAIRWALL/main.py new file mode 100644 index 0000000..a1b1d63 --- /dev/null +++ b/FAIRWALL/main.py @@ -0,0 +1,37 @@ +import socket +import os +from hashlib import sha256 + +def hasshing_data(data): + hashed = sha256(data).hexdigest() + return hashed + + +# 8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918 +# 1a7163c60c611f66527ba4ff5d07d45e2015897be68c00417302928711af81bb + +def auth(login, passwd): + login_valid = '8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918' + passwd_valid = '1a7163c60c611f66527ba4ff5d07d45e2015897be68c00417302928711af81bb' + + if len(login.split()) != 0 and len(passwd.split()) != 0: + if hasshing_data(login.encode()) == login_valid and hasshing_data(passwd.encode()) == passwd_valid: + print('Allowed success\n') + main_cycle() + else: + print('Not allowed start') + +def main_cycle(): + while True: + rule = input('RULE: ') + + if len(rule.split()) != 0: + if rule == 'exit': + break + else: + print(rule) + +if __name__ == '__main__': + login = input('login: ') + passwd = input('password: ') + auth(login, passwd) \ No newline at end of file diff --git a/FAIRWALL/test.py b/FAIRWALL/test.py new file mode 100644 index 0000000..483f7bc --- /dev/null +++ b/FAIRWALL/test.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +""" +Файрволл на Python с использованием iptables +""" + +import subprocess +import os +import sys + + +class IptablesFirewall: + def __init__(self): + self.chain_name = "PYTHON_FIREWALL" + + def run_command(self, cmd): + """Выполнение команды iptables""" + try: + subprocess.run(cmd, shell=True, check=True) + return True + except subprocess.CalledProcessError as e: + print(f"Error executing command: {cmd}") + return False + + def initialize(self): + """Инициализация цепочки файрволла""" + # Создание пользовательской цепочки + self.run_command(f"iptables -N {self.chain_name}") + # Переход к нашей цепочке из INPUT + self.run_command(f"iptables -I INPUT -j {self.chain_name}") + + def add_rule(self, protocol=None, source_ip=None, dest_port=None, action="DROP"): + """Добавление правила в iptables""" + cmd = f"iptables -A {self.chain_name}" + + if protocol: + cmd += f" -p {protocol}" + if source_ip: + cmd += f" -s {source_ip}" + if dest_port: + cmd += f" --dport {dest_port}" + + cmd += f" -j {action}" + + return self.run_command(cmd) + + def block_ip(self, ip_address): + """Блокировка IP-адреса""" + return self.add_rule(source_ip=ip_address, action="DROP") + + def allow_port(self, port, protocol="tcp"): + """Разрешение порта""" + return self.add_rule(protocol=protocol, dest_port=port, action="ACCEPT") + + def block_port(self, port, protocol="tcp"): + """Блокировка порта""" + return self.add_rule(protocol=protocol, dest_port=port, action="DROP") + + def cleanup(self): + """Очистка правил""" + self.run_command(f"iptables -D INPUT -j {self.chain_name}") + self.run_command(f"iptables -F {self.chain_name}") + self.run_command(f"iptables -X {self.chain_name}") + + +# Пример использования +if __name__ == "__main__": + if os.geteuid() != 0: + print("Requires root privileges") + sys.exit(1) + + fw = IptablesFirewall() + + try: + fw.initialize() + fw.block_port(23) # Блокировка Telnet + fw.allow_port(22) # Разрешение SSH + fw.allow_port(80) + fw.allow_port(443) # Разрешение HTTPS + fw.block_ip("10.0.12.206") + + print("Firewall rules applied. Press Enter to cleanup...") + input() + + finally: + fw.cleanup() + print("Firewall rules cleaned up") \ No newline at end of file diff --git a/FAIRWALL/test2.py b/FAIRWALL/test2.py new file mode 100644 index 0000000..e005fb3 --- /dev/null +++ b/FAIRWALL/test2.py @@ -0,0 +1,451 @@ +#!/usr/bin/env python3 +""" +Файрволл на Python с использованием iptables и блокировкой по доменам +""" + +import subprocess +import os +import sys +import socket +import threading +import time +from datetime import datetime +import dns.resolver + + +class IptablesFirewall: + def __init__(self): + self.chain_name = "PYTHON_FIREWALL" + self.blocked_domains = set() + self.domain_to_ip_cache = {} + self.dns_cache_ttl = 300 # 5 минут TTL для кэша + self.is_monitoring = False + self.monitor_thread = None + self.log_file = "iptables_firewall.log" + + # Настройка DNS резолвера + self.dns_resolver = dns.resolver.Resolver() + self.dns_resolver.timeout = 2 + self.dns_resolver.lifetime = 2 + + def run_command(self, cmd): + """Выполнение команды iptables""" + try: + result = subprocess.run(cmd, shell=True, check=True, + capture_output=True, text=True) + return True + except subprocess.CalledProcessError as e: + print(f"Error executing command: {cmd}") + print(f"Error output: {e.stderr}") + return False + + def log_event(self, message): + """Логирование событий""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_message = f"[{timestamp}] {message}" + + print(log_message) + + with open(self.log_file, 'a') as f: + f.write(log_message + "\n") + + def initialize(self): + """Инициализация цепочки файрволла""" + # Очистка старых правил если они есть + self.cleanup() + + # Создание пользовательской цепочки + self.run_command(f"iptables -N {self.chain_name}") + # Переход к нашей цепочке из INPUT и OUTPUT + self.run_command(f"iptables -I INPUT -j {self.chain_name}") + self.run_command(f"iptables -I OUTPUT -j {self.chain_name}") + self.run_command(f"iptables -I FORWARD -j {self.chain_name}") + + # Разрешить localhost трафик + self.run_command(f"iptables -A {self.chain_name} -s 127.0.0.1 -d 127.0.0.1 -j ACCEPT") + + self.log_event("Firewall initialized") + + def add_rule(self, protocol=None, source_ip=None, dest_ip=None, dest_port=None, action="DROP"): + """Добавление правила в iptables""" + cmd = f"iptables -A {self.chain_name}" + + if protocol: + cmd += f" -p {protocol}" + if source_ip: + cmd += f" -s {source_ip}" + if dest_ip: + cmd += f" -d {dest_ip}" + if dest_port: + cmd += f" --dport {dest_port}" + + cmd += f" -j {action}" + + success = self.run_command(cmd) + if success: + self.log_event(f"Rule added: {cmd}") + return success + + def remove_rule(self, protocol=None, source_ip=None, dest_ip=None, dest_port=None, action="DROP"): + """Удаление правила из iptables""" + cmd = f"iptables -D {self.chain_name}" + + if protocol: + cmd += f" -p {protocol}" + if source_ip: + cmd += f" -s {source_ip}" + if dest_ip: + cmd += f" -d {dest_ip}" + if dest_port: + cmd += f" --dport {dest_port}" + + cmd += f" -j {action}" + + success = self.run_command(cmd) + if success: + self.log_event(f"Rule removed: {cmd}") + return success + + def block_ip(self, ip_address): + """Блокировка IP-адреса""" + # Блокировка входящего и исходящего трафика + success1 = self.add_rule(source_ip=ip_address, action="DROP") + success2 = self.add_rule(dest_ip=ip_address, action="DROP") + + if success1 and success2: + self.log_event(f"IP blocked: {ip_address}") + return True + return False + + def unblock_ip(self, ip_address): + """Разблокировка IP-адреса""" + # Удаление правил блокировки + success1 = self.remove_rule(source_ip=ip_address, action="DROP") + success2 = self.remove_rule(dest_ip=ip_address, action="DROP") + + if success1 and success2: + self.log_event(f"IP unblocked: {ip_address}") + return True + return False + + def allow_port(self, port, protocol="tcp"): + """Разрешение порта""" + return self.add_rule(protocol=protocol, dest_port=port, action="ACCEPT") + + def block_port(self, port, protocol="tcp"): + """Блокировка порта""" + return self.add_rule(protocol=protocol, dest_port=port, action="DROP") + + def resolve_domain(self, domain): + """Разрешение домена в IP-адреса""" + domain = domain.lower().strip() + + # Проверка кэша + if domain in self.domain_to_ip_cache: + cache_time, ips = self.domain_to_ip_cache[domain] + if time.time() - cache_time < self.dns_cache_ttl: + return ips + + try: + answers = self.dns_resolver.resolve(domain, 'A') + ips = {str(answer) for answer in answers} + + # Обновление кэша + self.domain_to_ip_cache[domain] = (time.time(), ips) + + self.log_event(f"DNS resolved: {domain} -> {', '.join(ips)}") + return ips + + except Exception as e: + self.log_event(f"DNS resolution failed for {domain}: {str(e)}") + return set() + + def block_domain(self, domain): + """Блокировка домена""" + domain = domain.lower().strip() + + if domain in self.blocked_domains: + self.log_event(f"Domain already blocked: {domain}") + return True + + # Разрешаем домен в IP-адреса + ips = self.resolve_domain(domain) + + if not ips: + self.log_event(f"Warning: No IP addresses found for domain {domain}") + return False + + # Блокируем все IP-адреса домена + success_count = 0 + for ip in ips: + if self.block_ip(ip): + success_count += 1 + + if success_count > 0: + self.blocked_domains.add(domain) + self.log_event(f"Domain blocked: {domain} ({success_count} IPs)") + return True + + return False + + def unblock_domain(self, domain): + """Разблокировка домена""" + domain = domain.lower().strip() + + if domain not in self.blocked_domains: + self.log_event(f"Domain not blocked: {domain}") + return True + + # Получаем IP-адреса из кэша или разрешаем заново + ips = set() + if domain in self.domain_to_ip_cache: + cache_time, cached_ips = self.domain_to_ip_cache[domain] + ips = cached_ips + else: + ips = self.resolve_domain(domain) + + # Разблокируем все IP-адреса домена + success_count = 0 + for ip in ips: + if self.unblock_ip(ip): + success_count += 1 + + # Удаляем домен из списка блокировок + self.blocked_domains.discard(domain) + + self.log_event(f"Domain unblocked: {domain} ({success_count} IPs)") + return True + + def start_domain_monitoring(self, interval=60): + """Запуск мониторинга доменов для обновления IP-адресов""" + + def monitor_loop(): + self.log_event("Domain monitoring started") + while self.is_monitoring: + try: + self.update_domain_ips() + time.sleep(interval) + except Exception as e: + self.log_event(f"Error in domain monitoring: {e}") + time.sleep(interval) + + self.is_monitoring = True + self.monitor_thread = threading.Thread(target=monitor_loop) + self.monitor_thread.daemon = True + self.monitor_thread.start() + + def stop_domain_monitoring(self): + """Остановка мониторинга доменов""" + self.is_monitoring = False + if self.monitor_thread: + self.monitor_thread.join(timeout=5) + self.log_event("Domain monitoring stopped") + + def update_domain_ips(self): + """Обновление IP-адресов для заблокированных доменов""" + if not self.blocked_domains: + return + + self.log_event("Updating domain IP addresses...") + + for domain in list(self.blocked_domains): + old_ips = set() + if domain in self.domain_to_ip_cache: + cache_time, cached_ips = self.domain_to_ip_cache[domain] + old_ips = cached_ips + + # Разрешаем домен заново + new_ips = self.resolve_domain(domain) + + if not new_ips: + continue + + # Находим новые IP-адреса для блокировки + ips_to_block = new_ips - old_ips + # Находим старые IP-адреса для разблокировки + ips_to_unblock = old_ips - new_ips + + # Блокируем новые IP-адреса + for ip in ips_to_block: + self.block_ip(ip) + + # Разблокируем старые IP-адреса + for ip in ips_to_unblock: + self.unblock_ip(ip) + + if ips_to_block or ips_to_unblock: + self.log_event(f"Domain {domain} updated: +{len(ips_to_block)} -{len(ips_to_unblock)} IPs") + + def show_status(self): + """Показать статус файрволла""" + print("\n=== Firewall Status ===") + print(f"Blocked domains: {len(self.blocked_domains)}") + print(f"Cached domains: {len(self.domain_to_ip_cache)}") + + # Показать правила iptables + print("\nCurrent rules:") + os.system(f"iptables -L {self.chain_name} -n") + + if self.blocked_domains: + print("\nBlocked domains:") + for domain in sorted(self.blocked_domains): + if domain in self.domain_to_ip_cache: + cache_time, ips = self.domain_to_ip_cache[domain] + ips_str = ', '.join(ips) + print(f" {domain} -> {ips_str}") + else: + print(f" {domain} -> [not resolved]") + + def cleanup(self): + """Очистка правил""" + # Удаляем ссылки на нашу цепочку + self.run_command(f"iptables -D INPUT -j {self.chain_name} 2>/dev/null || true") + self.run_command(f"iptables -D OUTPUT -j {self.chain_name} 2>/dev/null || true") + self.run_command(f"iptables -D FORWARD -j {self.chain_name} 2>/dev/null || true") + + # Очищаем цепочку + self.run_command(f"iptables -F {self.chain_name} 2>/dev/null || true") + self.run_command(f"iptables -X {self.chain_name} 2>/dev/null || true") + + self.stop_domain_monitoring() + self.log_event("Firewall cleaned up") + + +def load_domains_from_file(filename): + """Загрузка доменов из файла""" + domains = [] + try: + with open(filename, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + domains.append(line) + except FileNotFoundError: + print(f"File {filename} not found") + return domains + + +def interactive_menu(): + """Интерактивное меню управления файрволлом""" + fw = IptablesFirewall() + + try: + fw.initialize() + + while True: + print("\n=== iptables Firewall with Domain Blocking ===") + print("1. Block domain") + print("2. Unblock domain") + print("3. Block IP") + print("4. Block port") + print("5. Allow port") + print("6. Show status") + print("7. Start domain monitoring") + print("8. Stop domain monitoring") + print("9. Load domains from file") + print("10. Exit") + + choice = input("Select option: ").strip() + + if choice == '1': + domain = input("Enter domain to block: ").strip() + if domain: + fw.block_domain(domain) + + elif choice == '2': + domain = input("Enter domain to unblock: ").strip() + if domain: + fw.unblock_domain(domain) + + elif choice == '3': + ip = input("Enter IP to block: ").strip() + if ip: + fw.block_ip(ip) + + elif choice == '4': + port = input("Enter port to block: ").strip() + if port.isdigit(): + fw.block_port(int(port)) + + elif choice == '5': + port = input("Enter port to allow: ").strip() + if port.isdigit(): + fw.allow_port(int(port)) + + elif choice == '6': + fw.show_status() + + elif choice == '7': + fw.start_domain_monitoring() + print("Domain monitoring started") + + elif choice == '8': + fw.stop_domain_monitoring() + print("Domain monitoring stopped") + + elif choice == '9': + filename = input("Enter filename: ").strip() + domains = load_domains_from_file(filename) + for domain in domains: + fw.block_domain(domain) + print(f"Loaded {len(domains)} domains from {filename}") + + elif choice == '10': + break + + else: + print("Invalid option") + + finally: + fw.cleanup() + + +# Пример использования +if __name__ == "__main__": + if os.geteuid() != 0: + print("Requires root privileges") + sys.exit(1) + + # Проверка наличия dnspython + try: + import dns.resolver + except ImportError: + print("Error: dnspython library required. Install with: pip install dnspython") + sys.exit(1) + + if len(sys.argv) > 1 and sys.argv[1] == "--interactive": + interactive_menu() + else: + # Автоматическая настройка с примером блокировки доменов + fw = IptablesFirewall() + + try: + fw.initialize() + + # Базовые правила + fw.block_port(23) # Блокировка Telnet + fw.allow_port(22) # Разрешение SSH + fw.block_port(80) # Разрешение HTTP + fw.block_port(443) # Разрешение HTTPS + + # Блокировка доменов (пример) + fw.block_domain("example.com") + fw.block_domain("test.org") + + # Запуск мониторинга доменов + fw.start_domain_monitoring() + + print("Firewall rules applied with domain blocking") + print("Domain monitoring is running...") + print("Press Ctrl+C to stop") + + # Бесконечный цикл для поддержания работы + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nShutting down...") + + finally: + fw.cleanup() + print("Firewall rules cleaned up") \ No newline at end of file diff --git a/FAIRWALL/test2_win.py b/FAIRWALL/test2_win.py new file mode 100644 index 0000000..2a61dd0 --- /dev/null +++ b/FAIRWALL/test2_win.py @@ -0,0 +1,589 @@ +#!/usr/bin/env python3 +""" +Файрволл на Python для Windows с использованием Windows Firewall и блокировкой по доменам +""" + +import subprocess +import os +import sys +import threading +import time +from datetime import datetime +import dns.resolver +import re + + +class WindowsFirewall: + def __init__(self): + self.blocked_domains = set() + self.blocked_ips = set() + self.blocked_ports = set() + self.domain_to_ip_cache = {} + self.dns_cache_ttl = 300 # 5 минут TTL для кэша + self.is_monitoring = False + self.monitor_thread = None + self.log_file = "windows_firewall.log" + self.rule_prefix = "PyFirewall_" + + # Настройка DNS резолвера + self.dns_resolver = dns.resolver.Resolver() + self.dns_resolver.timeout = 2 + self.dns_resolver.lifetime = 2 + + def run_powershell(self, cmd): + """Выполнение PowerShell команды""" + try: + result = subprocess.run( + ["powershell", "-Command", cmd], + check=True, + capture_output=True, + text=True, + shell=True + ) + return True, result.stdout + except subprocess.CalledProcessError as e: + error_msg = f"PowerShell error: {e.stderr}" + print(error_msg) + self.log_event(error_msg) + return False, e.stderr + + def run_netsh(self, cmd): + """Выполнение netsh команды""" + try: + result = subprocess.run( + f"netsh {cmd}", + check=True, + capture_output=True, + text=True, + shell=True + ) + return True, result.stdout + except subprocess.CalledProcessError as e: + error_msg = f"netsh error: {e.stderr}" + print(error_msg) + self.log_event(error_msg) + return False, e.stderr + + def log_event(self, message): + """Логирование событий""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_message = f"[{timestamp}] {message}" + + print(log_message) + + with open(self.log_file, 'a', encoding='utf-8') as f: + f.write(log_message + "\n") + + def initialize(self): + """Инициализация файрволла""" + self.log_event("Windows Firewall manager initialized") + return True + + def create_firewall_rule(self, name, direction="out", action="block", + protocol="any", port=None, remote_ip=None): + """Создание правила в Windows Firewall""" + + # Формируем базовую команду + cmd_parts = [ + "advfirewall firewall add rule", + f'name="{self.rule_prefix}{name}"', + f"dir={direction}", + f"action={action}", + f"protocol={protocol}", + "enable=yes" + ] + + if port: + cmd_parts.append(f"remoteport={port}") + + if remote_ip: + cmd_parts.append(f"remoteip={remote_ip}") + + cmd = " ".join(cmd_parts) + success, output = self.run_netsh(cmd) + + if success: + self.log_event(f"Firewall rule created: {name}") + return success + + def delete_firewall_rule(self, name): + """Удаление правила из Windows Firewall""" + cmd = f'advfirewall firewall delete rule name="{self.rule_prefix}{name}"' + success, output = self.run_netsh(cmd) + + if success: + self.log_event(f"Firewall rule deleted: {name}") + return success + + def block_ip(self, ip_address): + """Блокировка IP-адреса""" + if ip_address in self.blocked_ips: + return True + + # Блокируем исходящий трафик к IP + rule_name = f"block_ip_{ip_address}" + success = self.create_firewall_rule( + name=rule_name, + direction="out", + action="block", + remote_ip=ip_address + ) + + if success: + self.blocked_ips.add(ip_address) + self.log_event(f"IP blocked: {ip_address}") + return True + return False + + def unblock_ip(self, ip_address): + """Разблокировка IP-адреса""" + if ip_address not in self.blocked_ips: + return True + + rule_name = f"block_ip_{ip_address}" + success = self.delete_firewall_rule(rule_name) + + if success: + self.blocked_ips.discard(ip_address) + self.log_event(f"IP unblocked: {ip_address}") + return True + return False + + def allow_port(self, port, protocol="tcp"): + """Разрешение порта""" + rule_name = f"allow_{protocol}_{port}" + + # Удаляем блокирующее правило если существует + block_rule_name = f"block_{protocol}_{port}" + self.delete_firewall_rule(block_rule_name) + + success = self.create_firewall_rule( + name=rule_name, + direction="out", + action="allow", + protocol=protocol, + port=str(port) + ) + + if success: + self.blocked_ports.discard((protocol, port)) + self.log_event(f"Port allowed: {protocol}/{port}") + return True + return False + + def block_port(self, port, protocol="tcp"): + """Блокировка порта""" + rule_name = f"block_{protocol}_{port}" + + # Удаляем разрешающее правило если существует + allow_rule_name = f"allow_{protocol}_{port}" + self.delete_firewall_rule(allow_rule_name) + + success = self.create_firewall_rule( + name=rule_name, + direction="out", + action="block", + protocol=protocol, + port=str(port) + ) + + if success: + self.blocked_ports.add((protocol, port)) + self.log_event(f"Port blocked: {protocol}/{port}") + return True + return False + + def resolve_domain(self, domain): + """Разрешение домена в IP-адреса""" + domain = domain.lower().strip() + + # Проверка кэша + if domain in self.domain_to_ip_cache: + cache_time, ips = self.domain_to_ip_cache[domain] + if time.time() - cache_time < self.dns_cache_ttl: + return ips + + try: + answers = self.dns_resolver.resolve(domain, 'A') + ips = {str(answer) for answer in answers} + + # Обновление кэша + self.domain_to_ip_cache[domain] = (time.time(), ips) + + self.log_event(f"DNS resolved: {domain} -> {', '.join(ips)}") + return ips + + except Exception as e: + self.log_event(f"DNS resolution failed for {domain}: {str(e)}") + return set() + + def block_domain(self, domain): + """Блокировка домена""" + domain = domain.lower().strip() + + if domain in self.blocked_domains: + self.log_event(f"Domain already blocked: {domain}") + return True + + # Разрешаем домен в IP-адреса + ips = self.resolve_domain(domain) + + if not ips: + self.log_event(f"Warning: No IP addresses found for domain {domain}") + return False + + # Блокируем все IP-адреса домена + success_count = 0 + for ip in ips: + if self.block_ip(ip): + success_count += 1 + + if success_count > 0: + self.blocked_domains.add(domain) + self.log_event(f"Domain blocked: {domain} ({success_count} IPs)") + return True + + return False + + def unblock_domain(self, domain): + """Разблокировка домена""" + domain = domain.lower().strip() + + if domain not in self.blocked_domains: + self.log_event(f"Domain not blocked: {domain}") + return True + + # Получаем IP-адреса из кэша или разрешаем заново + ips = set() + if domain in self.domain_to_ip_cache: + cache_time, cached_ips = self.domain_to_ip_cache[domain] + ips = cached_ips + else: + ips = self.resolve_domain(domain) + + # Разблокируем все IP-адреса домена + success_count = 0 + for ip in ips: + if self.unblock_ip(ip): + success_count += 1 + + # Удаляем домен из списка блокировок + self.blocked_domains.discard(domain) + + self.log_event(f"Domain unblocked: {domain} ({success_count} IPs)") + return True + + def start_domain_monitoring(self, interval=60): + """Запуск мониторинга доменов для обновления IP-адресов""" + + def monitor_loop(): + self.log_event("Domain monitoring started") + while self.is_monitoring: + try: + self.update_domain_ips() + time.sleep(interval) + except Exception as e: + self.log_event(f"Error in domain monitoring: {e}") + time.sleep(interval) + + self.is_monitoring = True + self.monitor_thread = threading.Thread(target=monitor_loop) + self.monitor_thread.daemon = True + self.monitor_thread.start() + + def stop_domain_monitoring(self): + """Остановка мониторинга доменов""" + self.is_monitoring = False + if self.monitor_thread: + self.monitor_thread.join(timeout=5) + self.log_event("Domain monitoring stopped") + + def update_domain_ips(self): + """Обновление IP-адресов для заблокированных доменов""" + if not self.blocked_domains: + return + + self.log_event("Updating domain IP addresses...") + + for domain in list(self.blocked_domains): + old_ips = set() + if domain in self.domain_to_ip_cache: + cache_time, cached_ips = self.domain_to_ip_cache[domain] + old_ips = cached_ips + + # Разрешаем домен заново + new_ips = self.resolve_domain(domain) + + if not new_ips: + continue + + # Находим новые IP-адреса для блокировки + ips_to_block = new_ips - old_ips + # Находим старые IP-адреса для разблокировки + ips_to_unblock = old_ips - new_ips + + # Блокируем новые IP-адреса + for ip in ips_to_block: + self.block_ip(ip) + + # Разблокируем старые IP-адреса + for ip in ips_to_unblock: + self.unblock_ip(ip) + + if ips_to_block or ips_to_unblock: + self.log_event(f"Domain {domain} updated: +{len(ips_to_block)} -{len(ips_to_unblock)} IPs") + + def get_existing_rules(self): + """Получение списка существующих правил""" + cmd = 'advfirewall firewall show rule name=all' + success, output = self.run_netsh(cmd) + + rules = [] + if success: + # Парсим вывод netsh + lines = output.split('\n') + current_rule = {} + for line in lines: + line = line.strip() + if line.startswith('Rule Name:'): + if current_rule: + rules.append(current_rule) + current_rule = {'name': line.split(':', 1)[1].strip()} + elif ':' in line: + key, value = line.split(':', 1) + current_rule[key.strip()] = value.strip() + if current_rule: + rules.append(current_rule) + + return [rule for rule in rules if rule.get('name', '').startswith(self.rule_prefix)] + + def show_status(self): + """Показать статус файрволла""" + print("\n=== Windows Firewall Status ===") + print(f"Blocked domains: {len(self.blocked_domains)}") + print(f"Blocked IPs: {len(self.blocked_ips)}") + print(f"Blocked ports: {len(self.blocked_ports)}") + print(f"Cached domains: {len(self.domain_to_ip_cache)}") + print(f"Domain monitoring: {'Running' if self.is_monitoring else 'Stopped'}") + + # Показать правила брандмауэра + rules = self.get_existing_rules() + print(f"\nFirewall rules: {len(rules)}") + for rule in rules: + print(f" {rule.get('name', 'Unknown')}: {rule.get('Direction', '')} {rule.get('Action', '')}") + + if self.blocked_domains: + print("\nBlocked domains:") + for domain in sorted(self.blocked_domains): + if domain in self.domain_to_ip_cache: + cache_time, ips = self.domain_to_ip_cache[domain] + ips_str = ', '.join(ips) + print(f" {domain} -> {ips_str}") + else: + print(f" {domain} -> [not resolved]") + + def cleanup(self): + """Очистка правил""" + self.log_event("Cleaning up firewall rules...") + + # Удаляем все правила, созданные нашим файрволлом + rules = self.get_existing_rules() + for rule in rules: + rule_name = rule.get('name', '') + if rule_name.startswith(self.rule_prefix): + # Извлекаем оригинальное имя правила + original_name = rule_name[len(self.rule_prefix):] + self.delete_firewall_rule(original_name) + + self.stop_domain_monitoring() + self.blocked_ips.clear() + self.blocked_domains.clear() + self.blocked_ports.clear() + self.log_event("Firewall cleaned up") + + def enable_windows_firewall(self): + """Включение Windows Firewall если выключен""" + cmds = [ + "advfirewall set allprofiles state on", + "advfirewall set allprofiles firewallpolicy blockinbound,allowoutbound" + ] + + for cmd in cmds: + success, output = self.run_netsh(cmd) + if not success: + self.log_event("Warning: Could not enable Windows Firewall") + return False + return True + + +def load_domains_from_file(filename): + """Загрузка доменов из файла""" + domains = [] + try: + with open(filename, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + domains.append(line) + except FileNotFoundError: + print(f"File {filename} not found") + return domains + + +def interactive_menu(): + """Интерактивное меню управления файрволлом""" + fw = WindowsFirewall() + + try: + if not fw.initialize(): + print("Failed to initialize firewall") + return + + # Включаем брандмауэр Windows + fw.enable_windows_firewall() + + while True: + print("\n=== Windows Firewall with Domain Blocking ===") + print("1. Block domain") + print("2. Unblock domain") + print("3. Block IP") + print("4. Block port") + print("5. Allow port") + print("6. Show status") + print("7. Start domain monitoring") + print("8. Stop domain monitoring") + print("9. Load domains from file") + print("10. Enable Windows Firewall") + print("11. Exit") + + choice = input("Select option: ").strip() + + if choice == '1': + domain = input("Enter domain to block: ").strip() + if domain: + fw.block_domain(domain) + + elif choice == '2': + domain = input("Enter domain to unblock: ").strip() + if domain: + fw.unblock_domain(domain) + + elif choice == '3': + ip = input("Enter IP to block: ").strip() + if ip: + fw.block_ip(ip) + + elif choice == '4': + port = input("Enter port to block: ").strip() + protocol = input("Protocol (tcp/udp, default tcp): ").strip() or "tcp" + if port.isdigit(): + fw.block_port(int(port), protocol) + + elif choice == '5': + port = input("Enter port to allow: ").strip() + protocol = input("Protocol (tcp/udp, default tcp): ").strip() or "tcp" + if port.isdigit(): + fw.allow_port(int(port), protocol) + + elif choice == '6': + fw.show_status() + + elif choice == '7': + fw.start_domain_monitoring() + print("Domain monitoring started") + + elif choice == '8': + fw.stop_domain_monitoring() + print("Domain monitoring stopped") + + elif choice == '9': + filename = input("Enter filename: ").strip() + domains = load_domains_from_file(filename) + for domain in domains: + fw.block_domain(domain) + print(f"Loaded {len(domains)} domains from {filename}") + + elif choice == '10': + fw.enable_windows_firewall() + print("Windows Firewall enabled") + + elif choice == '11': + break + + else: + print("Invalid option") + + finally: + fw.cleanup() + + +def check_admin_privileges(): + """Проверка прав администратора на Windows""" + try: + # Попытка выполнить команду, требующую админские права + result = subprocess.run( + ["netsh", "advfirewall", "show", "allprofiles"], + capture_output=True, + text=True, + shell=True + ) + return result.returncode == 0 + except: + return False + + +# Пример использования +if __name__ == "__main__": + if not check_admin_privileges(): + print("Requires administrator privileges") + print("Please run as Administrator") + sys.exit(1) + + # Проверка наличия dnspython + try: + import dns.resolver + except ImportError: + print("Error: dnspython library required. Install with: pip install dnspython") + sys.exit(1) + + if len(sys.argv) > 1 and sys.argv[1] == "--interactive": + interactive_menu() + else: + # Автоматическая настройка с примером блокировки доменов + fw = WindowsFirewall() + + try: + if not fw.initialize(): + print("Failed to initialize firewall") + sys.exit(1) + + # Включаем брандмауэр Windows + fw.enable_windows_firewall() + + # Базовые правила + fw.block_port(23) # Блокировка Telnet + fw.allow_port(22) # Разрешение SSH + fw.allow_port(80) # Разрешение HTTP + fw.allow_port(443) # Разрешение HTTPS + + # Блокировка доменов (пример) + fw.block_domain("example.com") + fw.block_domain("test.org") + + # Запуск мониторинга доменов + fw.start_domain_monitoring() + + print("Windows Firewall rules applied with domain blocking") + print("Domain monitoring is running...") + print("Press Ctrl+C to stop") + + # Бесконечный цикл для поддержания работы + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nShutting down...") + + finally: + fw.cleanup() + print("Firewall rules cleaned up") \ No newline at end of file