pyqt5練手專案-抖音小姐姐短視訊下載
阿新 • • 發佈:2018-12-16
pyqt5=Python+ qt,這塊的資料現在慢慢多起來了,這裡給大家送一個小的demo用來練手。
裡面技術點:
1)控制元件 Pushbutton的使用;
2)pyqt5執行緒的用法;
3)介面和邏輯分離;
4)pyqt5 控制檯日誌重定向到editText控制元件中;這塊比較有意思,大家可以學學;
介面如下:
另外打個廣告:這段時間閒著沒事,自己弄了個淘客網站。領取天貓淘寶優惠券的,和一般的找券網不同,採集下來的天貓淘寶產品都是經過我的大資料模型篩選的,希望大家幫忙測試下,網頁右上角有安卓app下載。
接下來是正題:
一共四個檔案,大家可以分別拷貝到自己工程裡面除錯一下,可以直接執行:
work.py
# -*- coding: utf-8 -*- from PyQt5.QtGui import * from PyQt5.QtCore import * from PyQt5.QtWidgets import * import sys from ui_main import Ui_MainWindow import threading import time import os import inspect import ctypes import re from tools import * from douyin import * gMaxThreadNum = 2 # 最大執行緒數目 gWorkingThreadNum = 0 gThreadsList = [] #存放執行緒ID gThreadLock = threading.Lock() #獲取發帖內容時,需要加鎖 gTotalLink = [] gThreadCountLock = threading.Lock() #@函式介紹:啟動多個執行緒進行發帖 #@返回值: True/False #@data: 20180803 def mainTask(): global gMaxThreadNum #最大執行緒個數 global gWorkingThreadNum #正在執行的工作執行緒的個數 global gThreadCountLock #全域性鎖 global gTotalLink gWorkingThreadNum = 0 ipublishIndex = 1 while True: if gWorkingThreadNum < gMaxThreadNum: if ipublishIndex > len(gTotalLink): printStr('所有任務完成,等待各個子執行緒完成後,退出........') break; ipublishIndex = ipublishIndex +1 #啟動一個執行緒 t = threading.Thread(target=workThread) t.start() gThreadsList.append(t) gThreadCountLock.acquire() gWorkingThreadNum = gWorkingThreadNum +1 gThreadCountLock.release() time.sleep(0.1) return True #@函式介紹: 退出執行緒 #@返回值: true/false, 成功/失敗 #@data: 20180806 def stop_thread(thread): tid = thread.ident exctype = SystemExit ret = True """raises the exception, performs cleanup if needed""" tid = ctypes.c_long(tid) if not inspect.isclass(exctype): exctype = type(exctype) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) if res == 0: ret = False #raise ValueError("invalid thread id") strs = '%s子執行緒已經退出'%tid printStr(strs) elif res != 1: # """if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect""" ret = False ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("PyThreadState_SetAsyncExc failed") else: ret = True strs = '%s 執行緒被Kill'%tid printStr(strs) return ret def workThread(): global gTotalLink global gThreadLock global gWorkingThreadNum gThreadCountLock.acquire() video_url = gTotalLink.pop() gThreadCountLock.release() printStr('video_url:%s'%video_url) #video_url = 'http://v.douyin.com/dU2Dsn/' handle = douyin() downloadUrl , name = handle.downloadUrlGet(video_url) if True == os.path.exists(name+'.mp4'): name = name + str(randomInt(2))+'.mp4' handle.video_downloader(downloadUrl, name+'.mp4') gThreadCountLock.acquire() gWorkingThreadNum = gWorkingThreadNum +1 gThreadCountLock.release() class EmittingStream(QObject): textWritten = pyqtSignal(str) #定義一個傳送str的訊號 def write(self, text): self.textWritten.emit(str(text)) #@類介紹:主類 #@返回值: #@data: 20180801 class mainForm(QMainWindow, Ui_MainWindow): def __init__(self): super(mainForm, self).__init__() self.setupUi(self) #self.setWindowFlags(Qt.WindowMinimizeButtonHint) #禁止調整視窗大小 self.setFixedWidth(848) self.setFixedHeight(588) #訊號槽,點選開始會去連線startTask self.pushButton_startRun.clicked.connect(self.startTask) #訊號槽,暫停 self.pushButton_stopRun.clicked.connect(self.stopTask) self.spinBox_maxThreadNum.valueChanged['int'].connect(self.threadNumSet) #編輯內容 #self.pushButton_editFileContent.clicked.connect(self.editContent) #下面將輸出重定向到textEdit中 sys.stdout = EmittingStream(textWritten=self.outputWritten) sys.stderr = EmittingStream(textWritten=self.outputWritten) self.initPara() #編輯內容 def editContent(self): dig = QFileDialog() dig.getOpenFileName(self, 'Open file', '', "") def initPara(self): global gTotalLink file = open('需要下載的的連結.txt', 'a+') file.close() #讀取內容 file = open('需要下載的的連結.txt', 'r') strContent = file.read() file.close() resultList = re.findall('(?<=###).+?(?=###)', strContent, re.S) for item in resultList: if '\n' == item: pass else: p = item.find('http') item = item[p: item.find(' 復', p)] if item == '': continue gTotalLink.append(item) printStr('gTotalLink:%s'%gTotalLink) #@data: 20180801 def startTask(self): global gThreadsList if len(gThreadsList) != 0: for t in gThreadsList: stop_thread(t) #執行主任務 printStr('開啟工作任務...') t = threading.Thread(target=mainTask) t.start() gThreadsList.append(t) #@函式介紹: 主函式入口 #@返回值: 無 #@data: 20180801 def stopTask(self): global gThreadsList for t in gThreadsList: stop_thread(t) #清空gThreadsList gThreadsList = [] def threadNumSet(self): global gMaxThreadNum gMaxThreadNum = self.spinBox_maxThreadNum.value() printStr('設定引數->執行緒個數:%s'%gMaxThreadNum) #@函式介紹:接收訊號str的訊號槽 #@返回值: 無 #@data: 20180803 def outputWritten(self, text): self.textEditLog.setReadOnly(True) cursor = self.textEditLog.textCursor() cursor.movePosition(QTextCursor.End) cursor.insertText(text) self.textEditLog.setTextCursor(cursor) self.textEditLog.ensureCursorVisible() if __name__ =="__main__": app = QApplication(sys.argv) mainHandle = mainForm() mainHandle.show() sys.exit(app.exec())
ui_main.py:
# -*- coding: utf-8 -*- # Form implementation generated from reading ui file 'main.ui' # # Created by: PyQt5 UI code generator 5.9 # # WARNING! All changes made in this file will be lost! from PyQt5 import QtCore, QtGui, QtWidgets class Ui_MainWindow(object): def setupUi(self, MainWindow): MainWindow.setObjectName("MainWindow") MainWindow.resize(815, 588) self.centralwidget = QtWidgets.QWidget(MainWindow) self.centralwidget.setObjectName("centralwidget") self.tabWidget_fullInterface = QtWidgets.QTabWidget(self.centralwidget) self.tabWidget_fullInterface.setEnabled(True) self.tabWidget_fullInterface.setGeometry(QtCore.QRect(20, 0, 841, 561)) self.tabWidget_fullInterface.setStyleSheet("font: 12pt \"Arial\";") self.tabWidget_fullInterface.setObjectName("tabWidget_fullInterface") self.tab_startInterface = QtWidgets.QWidget() self.tab_startInterface.setObjectName("tab_startInterface") self.pushButton_startRun = QtWidgets.QPushButton(self.tab_startInterface) self.pushButton_startRun.setGeometry(QtCore.QRect(90, 430, 131, 51)) self.pushButton_startRun.setStyleSheet("font: 87 14pt \"Arial\";color: white;background-color: rgb(218,181,150)") self.pushButton_startRun.setObjectName("pushButton_startRun") self.label_37 = QtWidgets.QLabel(self.tab_startInterface) self.label_37.setGeometry(QtCore.QRect(40, 395, 81, 21)) self.label_37.setStyleSheet("color:rgb(255, 0, 0)") self.label_37.setObjectName("label_37") self.line_5 = QtWidgets.QFrame(self.tab_startInterface) self.line_5.setGeometry(QtCore.QRect(10, 485, 821, 41)) self.line_5.setFrameShape(QtWidgets.QFrame.HLine) self.line_5.setFrameShadow(QtWidgets.QFrame.Sunken) self.line_5.setObjectName("line_5") self.line_7 = QtWidgets.QFrame(self.tab_startInterface) self.line_7.setGeometry(QtCore.QRect(120, 390, 711, 31)) self.line_7.setFrameShape(QtWidgets.QFrame.HLine) self.line_7.setFrameShadow(QtWidgets.QFrame.Sunken) self.line_7.setObjectName("line_7") self.pushButton_stopRun = QtWidgets.QPushButton(self.tab_startInterface) self.pushButton_stopRun.setGeometry(QtCore.QRect(290, 430, 131, 51)) self.pushButton_stopRun.setStyleSheet("font: 87 14pt \"Arial\";color: white;background-color: rgb(218,181,150)") self.pushButton_stopRun.setObjectName("pushButton_stopRun") self.line_6 = QtWidgets.QFrame(self.tab_startInterface) self.line_6.setGeometry(QtCore.QRect(-20, 405, 61, 101)) self.line_6.setFrameShape(QtWidgets.QFrame.VLine) self.line_6.setFrameShadow(QtWidgets.QFrame.Sunken) self.line_6.setObjectName("line_6") self.line_8 = QtWidgets.QFrame(self.tab_startInterface) self.line_8.setGeometry(QtCore.QRect(10, 395, 31, 20)) self.line_8.setFrameShape(QtWidgets.QFrame.HLine) self.line_8.setFrameShadow(QtWidgets.QFrame.Sunken) self.line_8.setObjectName("line_8") self.line_11 = QtWidgets.QFrame(self.tab_startInterface) self.line_11.setGeometry(QtCore.QRect(820, 410, 20, 101)) self.line_11.setFrameShape(QtWidgets.QFrame.VLine) self.line_11.setFrameShadow(QtWidgets.QFrame.Sunken) self.line_11.setObjectName("line_11") self.spinBox_maxThreadNum = QtWidgets.QSpinBox(self.tab_startInterface) self.spinBox_maxThreadNum.setGeometry(QtCore.QRect(670, 440, 61, 22)) self.spinBox_maxThreadNum.setMinimum(1) self.spinBox_maxThreadNum.setMaximum(10) self.spinBox_maxThreadNum.setProperty("value", 3) self.spinBox_maxThreadNum.setObjectName("spinBox_maxThreadNum") self.label_41 = QtWidgets.QLabel(self.tab_startInterface) self.label_41.setGeometry(QtCore.QRect(580, 440, 81, 21)) self.label_41.setObjectName("label_41") self.textEditLog = QtWidgets.QTextEdit(self.tab_startInterface) self.textEditLog.setGeometry(QtCore.QRect(0, 10, 781, 371)) self.textEditLog.setObjectName("textEditLog") self.tabWidget_fullInterface.addTab(self.tab_startInterface, "") MainWindow.setCentralWidget(self.centralwidget) self.menubar = QtWidgets.QMenuBar(MainWindow) self.menubar.setGeometry(QtCore.QRect(0, 0, 815, 26)) self.menubar.setObjectName("menubar") MainWindow.setMenuBar(self.menubar) self.statusbar = QtWidgets.QStatusBar(MainWindow) self.statusbar.setObjectName("statusbar") MainWindow.setStatusBar(self.statusbar) self.retranslateUi(MainWindow) self.tabWidget_fullInterface.setCurrentIndex(0) QtCore.QMetaObject.connectSlotsByName(MainWindow) def retranslateUi(self, MainWindow): _translate = QtCore.QCoreApplication.translate MainWindow.setWindowTitle(_translate("MainWindow", "抖音短視訊下載")) self.pushButton_startRun.setText(_translate("MainWindow", "開始執行")) self.label_37.setText(_translate("MainWindow", "執行控制")) self.pushButton_stopRun.setText(_translate("MainWindow", "停止執行")) self.label_41.setText(_translate("MainWindow", "執行緒個數")) self.tabWidget_fullInterface.setTabText(self.tabWidget_fullInterface.indexOf(self.tab_startInterface), _translate("MainWindow", "啟動介面"))
douyin.py:
# -*- coding:utf-8 -*-
from splinter.driver.webdriver.chrome import Options, Chrome
from splinter.browser import Browser
from contextlib import closing
import requests, json, time, re, os, sys
class douyin():
def __init__(self):
pass
"""
視訊下載
Parameters:
video_url: 帶水印的視訊地址
video_name: 視訊名
Returns:
無
"""
def video_downloader(self, video_url, video_name=r'douyinsss.mp4'):
size = 0
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/4.3.2.1000 Chrome/30.0.1599.101 Safari/537.36"}
try:
with closing(requests.get(video_url, headers = headers, stream=True, verify = False)) as response:
chunk_size = 1024
#print(response.text)
content_size = int(response.headers['content-length'])
if response.status_code == 200:
sys.stdout.write(' [檔案大小]:%0.2f MB\n' % (content_size / chunk_size / 1024))
"""
with open(video_name, 'ab') as file:
file.write(response.content)
file.flush()
print('receive data,file size : %d total size:%d' % (os.path.getsize(video_name), content_size))
"""
with open(video_name, "wb") as file:
for data in response.iter_content(chunk_size = chunk_size):
file.write(data)
size += len(data)
file.flush()
#sys.stdout.write(' [下載進度]:%.2f%%' % float(size / content_size * 100) + '\r')
#sys.stdout.flush()
print('視訊下載完了...')
except Exception as e:
print(e)
print('下載出錯啦.....')
"""
視訊下載地址獲取
Parameters:
video_url: 帶水印的視訊地址
Returns:
視訊下載連結,視訊名字
"""
def downloadUrlGet(self, video_url):
name = ''
downloadUrl = ''
headers = {
'Proxy-Connection':'keep-alive',
'Host': 'v.douyin.com',
'Upgrade-Insecure-Requests':'1',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36",
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
}
req = requests.get(url = video_url, headers = headers, verify = False)
newUrl = req.url
#print(req.text)
print('newUrl:%s'%newUrl)
print(req.history)
#302重定向後的請求
headers = {
'Proxy-Connection':'keep-alive',
'Host': 'www.iesdouyin.com',
'Upgrade-Insecure-Requests':'1',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36",
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'
}
req = requests.get(url = newUrl, headers = headers, verify = False)
reply = req.text
#print(reply)
p = reply.find('playAddr: "') + len('playAddr: "')
downloadUrl = reply[p: reply.find('"', p)]
print('downloadUrl:%s'%downloadUrl)
p = reply.find('"name nowrap">') + len('"name nowrap">')
name = reply[p: reply.find('<', p)]
print(name)
return downloadUrl, name
url = 'http://v.douyin.com/dU2Dsn/'
handel = douyin()
downloadUrl, name = handel.downloadUrlGet(url)
handel.video_downloader(url, name)
"""
def do_load_media(url, path):
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/4.3.2.1000 Chrome/30.0.1599.101 Safari/537.36"}
pre_content_length = 0
# 迴圈接收視訊資料
while True:
# 若檔案已經存在,則斷點續傳,設定接收來需接收資料的位置
if os.path.exists(path):
headers['Range'] = 'bytes=%d-' % os.path.getsize(path)
res = requests.get(url, stream=True, headers=headers)
content_length = int(res.headers['content-length'])
print(content_length)
# 若當前報文長度小於前次報文長度,或者已接收檔案等於當前報文長度,則可以認為視訊接收完成
if content_length < pre_content_length or (
os.path.exists(path) and os.path.getsize(path) == content_length):
break
pre_content_length = content_length
# 寫入收到的視訊資料
with open(path, 'ab') as file:
file.write(res.content)
file.flush()
print('receive data,file size : %d total size:%d' % (os.path.getsize(path), content_length))
except Exception as e:
print(e)
downloadUrl = 'http://www.gifshow.com/s/mDg7eWq9'
path = r'test.mp4'
do_load_media(download_url, path)
"""
這是我自己寫的一個工具庫,tools.py:
# -*- coding:utf-8 -*-
import requests
import json
import random
import datetime
import os
import time
import hashlib
import string
import base64
import execjs
import re
gIpLink = ''
gDialName = ''
gDialAccount = ''
gDialPasswd =''
openDebug = False
gIpList = []
gStatusCode = ''
gUsername = ''
header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/64.0.3282.186 Safari/537.36'}
def dict2proxy(dic):
url = dic['type'] + '://' + dic['ip'] + ':' + str(dic['port'])
return {'http':url, 'https':url}
#@函式介紹: 檢驗Ip是否是好的
#@返回值: ip是好的/壞的: True/false
#@data: 20180801
def check_ip(ipDict):
try:
proxie = dict2proxy(ipDict)
#printStr(proxie)
if ipDict['type'] == 'https':
url = 'https://www.ipip.net/'
else:
#url = "http://www.discuz.net/forum.php"
url = 'http://www.lvye.cn/'
r = requests.get(url, headers=header, proxies=proxie, timeout=15)
r.raise_for_status()
isOk = True
except Exception as e:
printStr('經過測試IP無效...')
isOk = False
else:
printStrDebug('經過測試,IP有效...')
#printStr(ipDict)
return isOk
#@函式介紹: 獲取一個可用的代理Ip
#@返回值: 可用的代理Ip
#@data: 20180801
# freeProxieIpGet()
def getProxieIp():
cycleFlag = True
count = 0
#開啟代理ip檔案
with open('資料資料夾\proxies.json', 'r') as file:
jsonStr = file.read()
dataList = json.loads(jsonStr) #data 是個list
listLen = len(dataList)
maxCount = listLen + 5
if listLen == 0:
printStr('proxies.json檔案中沒有可用ip....')
return 'null'
printStr('可用ip個數:%d'%listLen)
#讀取dict格式的ip
while cycleFlag == True:
index = random.randint(0, listLen-1) #隨機生成一個Ip
printStr('隨機生成第%d個ip'%index)
ipDict = dataList[index]
flag = check_ip(ipDict)
if flag == True:
cycleFlag = False
if count > maxCount: #如果迴圈次數超過maxCount次,則暫停迴圈
cycleFlag = False
ipDict = 'null'
count = count + 1
return ipDict
#@函式介紹: 格式化列印輸出
#@返回值: 無
#@data: 20180807
def printStr(str):
nowTime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')#現在
print('[%s]>>>>%s'%(nowTime, str))
def printStrDebug(str):
if openDebug == True:
nowTime=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')#現在
print('[%s]>>>>%s'%(nowTime, str))
else:
pass
#@函式介紹: 檢測代理IP地址 全域性變數是否已經設定
#@返回值: True/False
#@data: 20180810 stefan1240
def payPorxieIpSettingCheck():
global gIpLink
if gIpLink == '':
return False
else:
return True
#功能未完成
def payPorxieIpLinkMake(httpType):
if gIpLink == '':
printStr('IP代理連結為空...')
return 'null'
#@函式介紹: 付費代理Ip池清零
#@返回值:無
#@data: 20180813,stefan1240
def payProxieIpsClear():
global gIpList
gIpList = []
return
#@函式介紹: 付費代理Ip池設定
#@返回值:無
#@data: 20180809,stefan1240
def payProxieIpsUpdate(httpType):
global gIpList
global gIpLink
#初始值判斷
if gIpLink == '':
return
#判斷是哪個平臺的代理IP
if 'zhimacangku' in gIpLink: #
#判斷是需要哪種代理ip
if httpType == 'http':
if 'port=2' in gIpLink: #port 代表IP協議(1:HTTP 2:SOCK5 11:HTTPS )
gIpLink = gIpLink.replace('port=2', 'port=1')
if 'port=11' in gIpLink:
gIpLink.replace('port=3', 'port=1')
else: #https
if 'port=1' in gIpLink: #port 代表IP協議(1:HTTP 2:SOCK5 11:HTTPS )
gIpLink.replace('port=1', 'port=2')
if 'port=11' in gIpLink:
gIpLink.replace('port=11', 'port=2')
#修改返回IP的資料格式,這裡使用txt
if 'type=2' in gIpLink: #type資料格式(1TXT 2JSON 3html)
gIpLink = gIpLink.replace('type=2', 'type=1')
if 'type=3' in gIpLink:
gIpLink.replace('type=3', 'type=1')
if 'num=1' in gIpLink: #一次性獲取的IP個數設定為20
gIpLink = gIpLink.replace('num=1', 'num=20')
#獲取一批量ip,並存於全域性變數
r = requests.get(gIpLink)
ipStr = r.text
#IP資料處理
if 'false' in ipStr: #獲取太頻繁
printStr('代理平臺返回結果:%s'%ipStr)
return
elif 'balde' in ipStr: #欠費了,直接返回False
printStr('代理IP欠費了,請到相應平臺續費')
return
else:
tmpIpList = ipStr.splitlines() #按照行('\r', '\r\n', \n')分隔,返回一個包含各行作為元素的列表,如果引數 keepends 為 False,不包含換行符,如果為 True,則保留換行符。
for strs in tmpIpList:
tmpIpDict = {}
if strs.count(':') >1:
printStr('IP代理格式設定不正確,請核對.')
return
if ":" not in strs:
printStr('可能是IP代理格式設定不正確,返回結果請設定為txt格式.')
return
ip, port = strs.split(':')
tmpIpDict['ip'] = ip
tmpIpDict['port'] = port
tmpIpDict['type'] = httpType
#printStrDebug(tmpIpDict)
gIpList.append(tmpIpDict)
elif 'xdaili' in gIpLink: #訊代理 可以直接按照以下格式組裝所需的API連結:http://api.xdaili.cn/xdaili-api/greatRecharge/getGreatIp?spiderId=xx&orderno=xx&returnType=2&count=xx
if 'spiderId' in gIpLink:
p = gIpLink.find('spiderId') + len('spiderId=')
spiderId = gIpLink[p: gIpLink.find('&', p)]
if 'orderno' in gIpLink:
p = gIpLink.find('orderno') + len('orderno=')
orderno = gIpLink[p: gIpLink.find('&', p)]
fullLink = 'http://api.xdaili.cn/xdaili-api/greatRecharge/getGreatIp?spiderId='+spiderId+'&orderno='+ orderno+'&returnType=1&count=15'
#獲取一批量ip,並存於全域性變數
r = requests.get(fullLink)
ipStr = r.text
if 'ERRORCODE' in ipStr:
p = gIpLink.find('ERRORCODE') + len('ERRORCODE":"')
replyCode = gIpLink[p: gIpLink.find('"', p)]
if (replyCode == '10036' )| (replyCode == '10038') |(replyCode =='10055'):
printStr('提取代理IP太頻繁,請按規定頻率提取!')
if replyCode == '10032':
printStr('今日代理IP提取已達上限,請隔日提取或額外購買')
else:
tmpIpList = ipStr.splitlines() #按照行('\r', '\r\n', \n')分隔,返回一個包含各行作為元素的列表,如果引數 keepends 為 False,不包含換行符,如果為 True,則保留換行符。
for strs in tmpIpList:
tmpIpDict = {}
if strs.count(':') >1:
printStr('IP代理格式設定不正確,請核對.')
return
if ":" not in strs:
printStr('可能是IP代理格式設定不正確,返回結果請設定為txt格式.')
return
ip, port = strs.split(':')
tmpIpDict['ip'] = ip
tmpIpDict['port'] = port
tmpIpDict['type'] = httpType
#printStrDebug(tmpIpDict)
gIpList.append(tmpIpDict)
else: #別的型別平臺
#獲取一批量ip,並存於全域性變數
r = requests.get(gIpLink)
ipStr = r.text
printStr(ipStr)
#IP資料處理
if 'false' in ipStr: #獲取太頻繁
printStr('獲取IP太過頻繁,平臺給予拒絕...')
time.sleep(2)
r = requests.get(gIpLink)
ipStr = r.text
elif 'balde' in ipStr: #欠費了,直接返回False
printStr('代理IP欠費了,請到相應平臺續費')
return
else:
tmpIpDict = {}
tmpIpList = ipStr.splitlines() #按照行('\r', '\r\n', \n')分隔,返回一個包含各行作為元素的列表,如果引數 keepends 為 False,不包含換行符,如果為 True,則保留換行符。
for strs in tmpIpList:
if ":" not in strs:
printStr('IP代理格式設定不正確,返回結果請設定為txt格式.')
return
ip, port = strs.split(':')
tmpIpDict['ip'] = ip
tmpIpDict['port'] = port
tmpIpDict['type'] = httpType
gIpList.append(tmpIpDict)
#printStrDebug('gIpList:%s'%gIpList)
return
#@函式介紹: 付費代理Ip獲取
#@返回值: isGetIpSucess, tmpIpDict
#@data: 20180809,stefan1240
def payProxieIpGet(httpType):
isGetIpSucess = False
tmpIpDict = {}
ret = False
while ret == False:
if len(gIpList) == 0:
payProxieIpsUpdate(httpType)
if len(gIpList) == 0:
isGetIpSucess = False
break
tmpIpDict = gIpList.pop() #彈出一個IP,
ret = check_ip(tmpIpDict)
if ret == True:
isGetIpSucess = True
return isGetIpSucess, tmpIpDict
#@函式介紹: 付費代理Ip獲取
#@返回值: 返回一個可用的dictIp
#@data: 20180809,stefan1240
def payProxieIpGet_old(httpType):
global gIpLink
isSucess = False
isGetIpSucess = True
iCirclMax = 50 #最多迴圈5次獲取
circleIndex = 0
gIpLink = 'http://webapi.http.zhimacangku.com/getip?num=1&type=1&pro=&city=0&yys=0&port=1&pack=25657&ts=0&ys=0&cs=0&lb=1&sb=0&pb=4&mr=1®ions='
""" Ip的格式是這樣的:
http://webapi.http.zhimacangku.com/getip?num=1&type=1&pro=&city=0&yys=0&port=1&time=1&ts=0&ys=0&cs=0&lb=1&sb=0&pb=4&mr=1®ions=
"""
ipDict = {}
while isSucess == False:
if circleIndex > iCirclMax:
isGetIpSucess = False
break;
circleIndex = circleIndex + 1
r = requests.get(gIpLink)
ipStr = r.text
printStr('get:%s'%ipStr)
if 'false' in ipStr: #獲取太頻繁
isSucess = False
time.sleep(1)
elif 'balde' in ipStr: #欠費了,直接返回False
isGetIpSucess = False
break
else:
ipStr = ipStr.strip('\r\n')
ip, port = ipStr.split(':')
ipDict['ip'] = ip
ipDict['port'] = port
ipDict['type'] = httpType
#ret = True
ret = check_ip(ipDict)
if ret == True:
isSucess = True
else:
isSucess = False
return isGetIpSucess, ipDict
#@函式介紹: 付費IP代理連結設定
#@返回值: 成功/失敗: True/False
#@data: 20180809, stefan1240
def payPorxieIplinkSet(ipAddrsLink):
global gIpLink
if ipAddrsLink == '':
printStr('未輸入代理IP連結,請輸入有效的IP代理連結...')
return False
elif not ipAddrsLink.startswith(('http://', 'https://')):
printStr('你輸入的IP代理連結格式不對,請檢查一下格式...')
return False
else:
gIpLink = ipAddrsLink
return True
#@函式介紹: 寬頻引數設定
#@返回值: 成功/失敗: True/False
#@data: 20180809, stefan1240
def dialParaSet(dialName, dialAccount, dialPasswd):
global gDialName
global gDialAccount
global gDialPasswd
gDialName = dialName
gDialAccount = dialAccount
gDialPasswd = dialPasswd
return True
#@函式介紹: 寬頻連線
#@返回值: 成功/失敗: True/False
#@data: 20180809, stefan1240
def dialConnect():
global gDialName
global gDialAccount
global gDialPasswd
if gDialName == '':
return False
cmdStr = 'rasdial %s %s %s'%(gDialName, gDialAccount, gDialPasswd)
ret = os.system(cmdStr)
if ret == 0:
isSucess = True
else:
isSucess = False
printStr('撥號失敗,錯誤程式碼:%s'%ret)
return isSucess
#@函式介紹: 寬頻斷開
#@返回值: 成功/失敗: True/False
#@data: 20180809, stefan1240
def dialDisconnect():
global gDialName
cmdStr = 'rasdial %s /disconnect'%gDialName
ret = os.system(cmdStr)
if ret == 0:
isSucess = True
else:
isSucess = False
return isSucess
#@函式介紹: 寬頻重連
#@返回值: 無
#@data: 20180809, stefan1240
def dialReconnect():
ret = dialDisconnect()
if ret == True:
time.sleep(3)
dialConnect()
time.sleep(1)
#@函式介紹: 隨機生成n箇中文字元
#@返回值: 中文字串
#@data: 20180814, stefan1240
def randomChineseGet(num):
strs = ''
for i in range(num):
val = random.randint(0x4e00, 0x9fbf)
strs = strs + chr(val)
return strs
def randomChar(num):
return ''.join(random.choice(string.ascii_letters) for x in range(num))
def randomInt(num):
strs = ''
for i in range(num):
number = random.randint(1, 9)
strs = strs+ str(number)
return strs
#@函式介紹: md5加密
#@輸入: 字串型
#@返回值: md5加密後的字串
def genearteMD5(strs):
# 建立md5物件
hl = hashlib.md5()
# Tips
# 此處必須宣告encode
# 否則報錯為:hl.update(str) Unicode-objects must be encoded before hashing
hl.update(strs.encode(encoding='utf-8'))
return hl.hexdigest()
#@函式介紹: sh256加密
#@輸入: 字串型
#@返回值: sh256加密後的字串
def genearteSH256(strs):
sha256 = hashlib.sha256()
sha256.update(strs.encode('utf-8'))
return sha256.hexdigest()
#@函式介紹: base64加密
#@輸入: 字串型
#@返回值: base64加密後的字元
def genearteBase64(strs):
strEncode = base64.b64encode(strs.encode('utf-8'))
return strEncode.decode()
#@函式介紹: 新浪web加密某個欄位需要呼叫JS程式碼
#@輸入: servertime, nonce, passwd, pubkey
#@返回值: base64加密後的字元
def xinlanJsEcode(servertime, nonce, passwd, pubkey):
xinlanJsStr = xinlanJsStrGet()
handle = execjs.compile(xinlanJsStr)
ret = handle.call('Getsp', servertime, nonce, passwd, pubkey)
return ret
#@data: 20180814, stefan1240
#dic = {'ip':'182.244.109.222', 'port':'4230', 'type':'https'}
#check_ip(dic)
#------------------------易遊對接相關API---------------------------------------------------------------
#@函式介紹:從配置檔案獲取登入使用者資訊相關
#@輸入值:
#@返回值: gStatusCode, gUsername
#@data: 201808022
def loingAccountInfoGet():
global gStatusCode
global gUsername
file = open('系統檔案(勿動)\para.ini', 'r')
for line in file.readlines():
line = line.strip('\n')
if '<registerInfo>' in line:
para = re.findall("<registerInfo>(.+?)<registerInfo>", line)
gUsername, passwd = para[0].split(':')
if '<retCodeInfo>' in line:
para = re.findall("<retCodeInfo>(.+?)<retCodeInfo>", line)
gStatusCode = para[0]
file.close()
#@函式介紹:獲取使用者狀態
#@輸入值: statusCode:登入成功後返回的狀態碼, userName:使用者名稱
#@返回值: 成功的返回1或者錯誤程式碼,程式碼含義:http://dev.eydata.net/webapi/errorlist
#@data: 201808021
def checkUserStatus():
global gStatusCode
global gUsername
url = 'https://w.eydata.net/aec651134b90f1ab'
data = {
"StatusCode": gStatusCode,
"UserName": gUsername
}
r = requests.post(url, data)
return r.text
#@函式介紹:退出介面
#@輸入值:
#@返回值: 成功的返回1或者錯誤程式碼,程式碼含義:http://dev.eydata.net/webapi/errorlist
#@data: 201808021
def logOut():
statusCode = ''
userName = ''
file = open('系統檔案(勿動)\para.ini', 'r')
for line in file.readlines():
line = line.strip('\n')
if '<registerInfo>' in line:
para = re.findall("<registerInfo>(.+?)<registerInfo>", line)
userName, passwd = para[0].split(':')
if '<retCodeInfo>' in line:
para = re.findall("<retCodeInfo>(.+?)<retCodeInfo>", line)
statusCode = para[0]
file.close()
url = 'https://w.eydata.net/a8a66a6329265b37'
data = {
"StatusCode": statusCode,
"UserName": userName
}
r = requests.post(url, data)
return r.text
#-----------------------------------------------------------------------------------
#@函式介紹:把代理IP連結寫入到配置檔案
#@輸入值: statusCode:登入成功後返回的狀態碼, userName:使用者名稱
#@返回值: 成功的返回1或者錯誤程式碼,程式碼含義:http://dev.eydata.net/webapi/errorlist
#@data: 201808021
def writeIpLinkIntoFile():
global gIpLink
needWrite = True
if gIpLink == '':
printStr('代理IP連結為空,停止寫入到配置檔案...')
ipAddresLink = gIpLink
#寫入配置檔案
strs = '<IplinkInfo>%s<IplinkInfo>\n'%ipAddresLink
#使用w模式會將原本的檔案清空/覆蓋。可以先用讀(r)的方式開啟,寫到記憶體中,然後再用寫(w)的方式開啟。
with open('系統檔案(勿動)\para.ini', 'r' ,encoding="utf-8") as file:
lines = file.readlines()
with open('系統檔案(勿動)\para.ini', 'w',encoding="utf-8") as file:
for line in lines: #替換
if '<IplinkInfo>' in line:
s = line.replace(line, strs)
file.write(s)
needWrite = False
else:
file.write(line)
printStrDebug('needWrite=%s'%needWrite)
if needWrite == True: #首次寫入
with open('系統檔案(勿動)\para.ini', 'a+',encoding="utf-8") as file:
file.write(strs)
"""
articleBody ='hello'
articleTitle = 'hello'
deviceid = 'ab56b4d92b40713acc5af89985d4b786'
uid = '6611942773'
str1 ="act=1&allow_comment=1&allow_repost=1&appid=2&appver=3.0.2&article_body=" + articleBody + "&article_title="+articleTitle
str2 = "&chno=515_112&class_id=0&deviceid=" + deviceid + "&is_secret=0&is_to_weibo=0&login_uid=" +uid
str3 = "&signkey=e3eb41c951f264a6daa16b6e4367e829&sort_id=117"
body = str1 + str2 + str3
sign = genearteSH256(body)
print(sign)
"""