自学内容网 自学内容网

信息收集ip测活-Python脚本编写

编写一个python脚本,下面为实现的功能点:
    文件读取与IP提取:读取指定文件夹下的所有txt文件,提取其中的IP地址。
    端口号添加:为每个IP地址添加多个指定的端口号。
    测活功能:使用socket库进行连接测试,判断IP:端口号是否存活。
    多个验证URL,包含国内和国外常见网站,用于更严格地验证代理IP是否可用,
    结果保存:将存活的IP:端口号组合保存到新的txt文件中。
    测活过程展示:在测活过程中,实时展示每一条测活的结果。
    强制终止:设置一个信号处理机制,允许用户强制终止脚本。
    多线程/多进程:使用多线程或多进程技术提高并发处理能力。
    结果保存:将存活的IP:端口号组合保存到新的txt文件中。
 

文件读取与IP提取
首先,我们需要读取指定文件夹下的所有txt文件,并从中提取IP地址。可以使用Python的os模块来遍历文件夹,re模块来提取IP地址。

import os 
import re 
 
def extract_ips_from_files(folder_path): 
    ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')  
    ips = [] 
    for filename in os.listdir(folder_path):  
        if filename.endswith('.txt'):  
            with open(os.path.join(folder_path,  filename), 'r') as file: 
                content = file.read()  
                ips.extend(ip_pattern.findall(content))  
    return ips 

端口号添加
为每个IP地址添加多个指定的端口号。

def add_ports_to_ips(ips, ports): 
    ip_port_pairs = [(ip, port) for ip in ips for port in ports] 
    return ip_port_pairs 

测活功能
使用socket库进行连接测试,判断IP:端口号是否存活。

import socket 
 
def is_ip_port_alive(ip, port): 
    try: 
        with socket.socket(socket.AF_INET,  socket.SOCK_STREAM) as s: 
            s.settimeout(2)  
            s.connect((ip,  port)) 
        return True 
    except (socket.timeout,  ConnectionRefusedError): 
        return False 

多个验证URL
包含国内和国外常见网站,用于更严格地验证代理IP是否可用。

import requests 
 
def validate_proxy_with_url(ip, port, url): 
    proxies = { 
        'http': f'http://{ip}:{port}', 
        'https': f'http://{ip}:{port}' 
    } 
    try: 
        response = requests.get(url,  proxies=proxies, timeout=5) 
        return response.status_code  == 200 
    except requests.RequestException: 
        return False 

结果保存
将存活的IP:端口号组合保存到新的txt文件中。

python
复制
def save_results(results, output_file): 
    with open(output_file, 'w') as file: 
        for ip, port in results: 
            file.write(f'{ip}:{port}\n')  

测活过程展示
在测活过程中,实时展示每一条测活的结果。

def display_live_results(ip, port, is_alive): 
    status = 'Alive' if is_alive else 'Dead' 
    print(f'Testing {ip}:{port} - {status}') 

强制终止
设置一个信号处理机制,允许用户强制终止脚本。

import signal 
import sys 
 
def signal_handler(sig, frame): 
    print('Script terminated by user.') 
    sys.exit(0)  
 
signal.signal(signal.SIGINT,  signal_handler) 

多线程/多进程
使用多线程或多进程技术提高并发处理能力。
from concurrent.futures  import ThreadPoolExecutor 
 
def test_ip_port_pairs(ip_port_pairs, urls): 
    alive_pairs = [] 
    with ThreadPoolExecutor(max_workers=10) as executor: 
        futures = [] 
        for ip, port in ip_port_pairs: 
            futures.append(executor.submit(is_ip_port_alive,  ip, port)) 
         
        for (ip, port), future in zip(ip_port_pairs, futures): 
            is_alive = future.result()  
            display_live_results(ip, port, is_alive) 
            if is_alive: 
                for url in urls: 
                    if validate_proxy_with_url(ip, port, url): 
                        alive_pairs.append((ip,  port)) 
                        break 
    return alive_pairs 

主函数
将以上功能整合到主函数中。

def main(): 
    folder_path = 'path/to/your/folder' 
    output_file = 'alive_proxies.txt'  
    ports = [80, 443, 8080] 
    urls = ['http://www.google.com',  'http://www.baidu.com']  
 
    ips = extract_ips_from_files(folder_path) 
    ip_port_pairs = add_ports_to_ips(ips, ports) 
    alive_pairs = test_ip_port_pairs(ip_port_pairs, urls) 
    save_results(alive_pairs, output_file) 
 
if __name__ == '__main__': 
    main() 


原文地址:https://blog.csdn.net/m0_62828084/article/details/143955796

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!