1. 程式人生 > 其它 >判斷代理ip是否可用

判斷代理ip是否可用

from concurrent.futures import ThreadPoolExecutor, as_completed
import socket

# 判斷ip_port是否能夠連線上
def try_connection(ip_port):
s = socket.socket()
# 等待最大時間設定為5秒(可改小)
s.settimeout(5)
# 嘗試連線目標ip_port
try:
s.connect(ip_port)
# 捕獲連線超時錯誤並生成超時info
except socket.timeout as e:
info = ip_port[0] + ':' + str(ip_port[1]) + ' 超時'
# 捕獲連線錯誤並生成錯誤info
except socket.error as e:
info = ip_port[0] + ':' + str(ip_port[1]) + ' ' + str(e)
# 確認ip_port可用,生成可用info
else:
info = ip_port[0] + ':' + str(ip_port[1]) + ' 可用'
# 最後關閉socket物件並返回info
finally:
s.close()
return info

# 生成多執行緒池物件,最大執行緒數為10(可調大)
executor = ThreadPoolExecutor(max_workers=10)
# 讀取檔案中的ip_port(每行一個),並開啟資訊檔案備用
with open('server.txt','r') as fin, open('connection.txt','w') as fout:
servers = []
for addr in fin:
ip, port = addr.split(':')
port = int(port)
# 向servers列表新增讀取出的ip_port
servers.append((ip, port))
# 進行多執行緒訪問並驗證
for result in executor.map(try_connection, servers):
# 結果並寫入檔案
print(result)
fout.write(result + '\n')