1. 程式人生 > 實用技巧 >騰訊雲函式實現今日校園自動填寫

騰訊雲函式實現今日校園自動填寫

騰訊雲函式實現今日校園自動填寫

說明

  1. 參考github上auto-submit專案
  2. 學習交流使用

工具

騰訊雲函式

注意

1 附件裡面的config.yml只有HFUT可以用,其他學校需要用generate.py來進行配置
2 少部分學校不支援雲函式

使用步驟

  1. 配置config.yml中對應的學號(username)和密碼(password)還有地址(address)等等資訊,詳情請看config.yml中的註釋說明,注意這裡的學號和密碼都是今日校園的學號和密碼
  2. 開啟百度搜索騰訊雲函式,註冊認證後
    1. 進入控制檯
    2. 進入雲函式
    3. 點選左邊的層
    4. 點新建,名稱隨意
    5. 然後點選上傳zip,選擇dependency.zip
      上傳
    6. 選擇執行環境python3.6
    7. 然後點選確定,耐心等待一下,上傳依賴包需要花費的時間比較長, 完成後介面如下
    8. 點左邊的函式服務,新建雲函式

      名稱隨意,執行環境選擇python3.6,建立方式選擇空白函式,然後點選下一步
  3. 提交方法選擇線上編輯,把附件裡面的index.py直接全文複製貼上到雲函式的index.py
  4. 然後點選檔案->新建,檔名命名為config.yml,然後把下面附件裡面config.yml的內容直接全文複製貼上到雲函式的config.yml檔案(別的學校需要自己用generate.py配置好),點選下面的高階設定,設定超時時間為60秒,新增層為剛剛新建的函式依賴層,然後點選完成
  5. 進入新建好的雲函式,左邊點選觸發管理,點選建立觸發器,名稱隨意,觸發週期選擇自定義,然後配置cron表示式,下面的表示式表示每天晚上20:20執行
    0 20 20 * * * *
    

附件

dependcy.zip

dependency.zip

config.yml

#登陸相關配置
login:
  #開放的模擬登陸api伺服器地址
  api: "http://www.zimo.wiki:8080/wisedu-unified-login-api-v1.0/api/login"
#使用者組配置
users:
  #單個使用者配置
  - user:
      #username 學號或者工號
      username: '1111111111'
      #password 密碼
      password: '111111111'
      #address 地址,定位資訊
      address: 中國四川省成都市金牛區一環路北1段-129號-附9號
      #email 接受通知訊息的郵箱
      email: [email protected]
      #school 學校全稱
      school: 合肥工業大學
  #多使用者配置,將下面的註釋去掉即可,如果有表單內容有圖片,不建議使用多使用者配置
#  - user:
#      #username 學號或者工號
#      username: 161105024
#      #password 密碼
#      password: 161105024
#      #address 地址
#      address: 中國四川省成都市金牛區一環路北1段-129號-附9號
#      #email 接受通知訊息的郵箱
#      email: [email protected]
#      school: 宜賓學院
#今日校園相關配置
cpdaily:
  #表單組預設選項配置
  defaults:
    #表單預設選項配置,按順序,注意,只有標必填項的才處理
    - default:
        title: 今日在校情況
        type: 2
        value: 在校,校內住宿
    - default:
        title: 今日所在地
        type: 1
        value: 安徽省/合肥市/蜀山區
    - default:
        title: 今日晨檢體溫
        type: 2
        value: 正常,不發熱
    - default:
        title: 今日午檢體溫
        type: 2
        value: 正常,不發熱
    - default:
        title: 今日晚檢體溫
        type: 2
        value: 正常,不發熱
    - default:
        title: 今日是否有咳嗽、呼吸困難、腹瀉症狀
        type: 2
        value: 否,無上述症狀
    - default:
        title: 今日是否被醫學隔離觀察
        type: 2
        value: 否,未隔離
    - default:
        title: 簡述今日主要行程
        type: 1
        value: 校內
    - default:
        title: 今日晚間返回宿舍(家)時間
        type: 1
        value: 2020-12-19/22:00

Info:
  ServerChan: # 填寫Server醬的SCKEY
  Qsmg: # 填寫Qsmg醬的SCKEY
  Email:
    enable: true
    server: smtp.exmail.qq.com # 填寫郵件的smtp伺服器
    port: 465 # 填寫郵件伺服器的埠號
    account: '' # 郵件伺服器登入使用者名稱
    password: '' # 郵件伺服器登入密碼

generate.py

  1. 功能:生成config.yml,今日校園問題的配置
  2. 如學校為hfut,不用該步驟
  3. 如其他學校,需要輸入自己學校今日校園使用者名稱和密碼資訊
# -*- coding: utf-8 -*-
import index as app
import yaml

# 生成預設配置
def generate():
    config = app.config
    user = config['users'][0]
    apis = app.getCpdailyApis(user)
    session = app.getSession(user, apis['login-url'])
    form = dict(app.queryForm(session, apis))['form']
    # app.log(form)
    defaults = []
    sort = 1
    for formItem in form:
        if formItem['isRequired'] == 1:
            default = {}
            one = {}
            default['title'] = formItem['title']
            default['type'] = formItem['fieldType']
            print('問題%d:' % sort + default['title'])
            if default['type'] == 1 or default['type'] == 5:
                default['value'] = input("請輸入文字:")
            if default['type'] == 2:
                fieldItems = formItem['fieldItems']
                num = 1
                for fieldItem in fieldItems:
                    print('\t%d ' % num + fieldItem['content'])
                    num += 1
                choose = int(input("請輸入序號:"))
                if choose < 1 or choose > num:
                    print('輸入錯誤,請重新執行此指令碼')
                    exit(-1)
                default['value'] = fieldItems[choose - 1]['content']
            if default['type'] == 3:
                fieldItems = formItem['fieldItems']
                num = 1
                for fieldItem in fieldItems:
                    print('\t%d ' % num + fieldItem['content'])
                    num += 1
                chooses = list(map(int, input('請輸入序號(可輸入多個,請用空格隔開):').split()))
                default['value'] = ''
                for i in range(0, len(chooses)):
                    choose = chooses[i]
                    if choose < 1 or choose > num:
                        print('輸入錯誤,請重新執行此指令碼')
                        exit(-1)
                    if i != len(chooses) - 1:
                        default['value'] += fieldItems[choose - 1]['content'] + ','
                    else:
                        default['value'] += fieldItems[choose - 1]['content']
            if default['type'] == 4:
                default['value'] = input("請輸入圖片名稱:")
            one['default'] = default
            defaults.append(one)
            sort += 1
    print('======================分隔線======================')
    print(yaml.dump(defaults, allow_unicode=True))

if __name__ == "__main__":
    generate()

index.py

  1. 功能:自動打卡,需要配置好config.yml
# -*- coding: utf-8 -*-
import sys
import requests
import json
import yaml
import oss2
from urllib.parse import urlparse
from datetime import datetime, timedelta, timezone
from urllib3.exceptions import InsecureRequestWarning
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr

# debug模式
debug = False
if debug:
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


# 讀取yml配置
def getYmlConfig(yaml_file='config.yml'):
    file = open(yaml_file, 'r', encoding="utf-8")
    file_data = file.read()
    file.close()
    config = yaml.load(file_data, Loader=yaml.FullLoader)
    return dict(config)


# 全域性配置
config = getYmlConfig(yaml_file='config.yml')


# 獲取今日校園api
def getCpdailyApis(user):
    apis = {}
    user = user['user']
    schools = requests.get(
        url='https://mobile.campushoy.com/v6/config/guest/tenant/list', verify=not debug).json()['data']
    flag = True
    for one in schools:
        if one['name'] == user['school']:
            if one['joinType'] == 'NONE':
                log(user['school'] + ' 未加入今日校園')
                exit(-1)
            flag = False
            params = {
                'ids': one['id']
            }
            res = requests.get(url='https://mobile.campushoy.com/v6/config/guest/tenant/info', params=params,
                               verify=not debug)
            data = res.json()['data'][0]
            joinType = data['joinType']
            idsUrl = data['idsUrl']
            ampUrl = data['ampUrl']
            if 'campusphere' in ampUrl or 'cpdaily' in ampUrl:
                parse = urlparse(ampUrl)
                host = parse.netloc
                res = requests.get(parse.scheme + '://' + host)
                parse = urlparse(res.url)
                apis[
                    'login-url'] = idsUrl + '/login?service=' + parse.scheme + r"%3A%2F%2F" + host + r'%2Fportal%2Flogin'
                apis['host'] = host

            ampUrl2 = data['ampUrl2']
            if 'campusphere' in ampUrl2 or 'cpdaily' in ampUrl2:
                parse = urlparse(ampUrl2)
                host = parse.netloc
                res = requests.get(parse.scheme + '://' + host)
                parse = urlparse(res.url)
                apis[
                    'login-url'] = idsUrl + '/login?service=' + parse.scheme + r"%3A%2F%2F" + host + r'%2Fportal%2Flogin'
                apis['host'] = host
            break
    if flag:
        log(user['school'] + ' 未找到該院校資訊,請檢查是否是學校全稱錯誤')
        exit(-1)
    log(apis)
    return apis


# 獲取當前utc時間,並格式化為北京時間
def getTimeStr():
    utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc)
    bj_dt = utc_dt.astimezone(timezone(timedelta(hours=8)))
    return bj_dt.strftime("%Y-%m-%d %H:%M:%S")


# 輸出除錯資訊,並及時重新整理緩衝區
def log(content):
    print(getTimeStr() + ' ' + str(content))
    sys.stdout.flush()


# 登陸並返回session
def getSession(user, loginUrl):
    user = user['user']
    params = {
        'login_url': loginUrl,
        # 保證學工號和密碼正確下面兩項就不需要配置
        'needcaptcha_url': '',
        'captcha_url': '',
        'username': user['username'],
        'password': user['password']
    }

    cookies = {}
    # 藉助上一個專案開放出來的登陸API,模擬登陸
    res = requests.post(config['login']['api'], params, verify=not debug)
    cookieStr = str(res.json()['cookies'])
    log(cookieStr)
    if cookieStr == 'None':
        log(res.json())
        return None

    # 解析cookie
    for line in cookieStr.split(';'):
        name, value = line.strip().split('=', 1)
        cookies[name] = value
    session = requests.session()
    session.cookies = requests.utils.cookiejar_from_dict(cookies)
    return session


# 查詢表單
def queryForm(session, apis):
    host = apis['host']
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': 'Mozilla/5.0 (Linux; Android 4.4.4; OPPO R11 Plus Build/KTU84P) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Safari/537.36 yiban/8.1.11 cpdaily/8.1.11 wisedu/8.1.11',
        'content-type': 'application/json',
        'Accept-Encoding': 'gzip,deflate',
        'Accept-Language': 'zh-CN,en-US;q=0.8',
        'Content-Type': 'application/json;charset=UTF-8'
    }
    queryCollectWidUrl = 'https://{host}/wec-counselor-collector-apps/stu/collector/queryCollectorProcessingList'.format(
        host=host)
    params = {
        'pageSize': 6,
        'pageNumber': 1
    }
    res = session.post(queryCollectWidUrl, headers=headers,
                       data=json.dumps(params), verify=not debug)
    if len(res.json()['datas']['rows']) < 1:
        return None

    collectWid = res.json()['datas']['rows'][0]['wid']
    formWid = res.json()['datas']['rows'][0]['formWid']

    detailCollector = 'https://{host}/wec-counselor-collector-apps/stu/collector/detailCollector'.format(
        host=host)
    res = session.post(url=detailCollector, headers=headers,
                       data=json.dumps({"collectorWid": collectWid}), verify=not debug)
    schoolTaskWid = res.json()['datas']['collector']['schoolTaskWid']

    getFormFields = 'https://{host}/wec-counselor-collector-apps/stu/collector/getFormFields'.format(
        host=host)
    res = session.post(url=getFormFields, headers=headers, data=json.dumps(
        {"pageSize": 100, "pageNumber": 1, "formWid": formWid, "collectorWid": collectWid}), verify=not debug)

    form = res.json()['datas']['rows']
    return {'collectWid': collectWid, 'formWid': formWid, 'schoolTaskWid': schoolTaskWid, 'form': form}


# 填寫form
def fillForm(session, form, host):
    sort = 1
    for formItem in form[:]:
        # 只處理必填項
        if formItem['isRequired'] == 1:
            default = config['cpdaily']['defaults'][sort - 1]['default']
            if formItem['title'] != default['title']:
                log('第%d個預設配置不正確,請檢查' % sort)
                exit(-1)
            # 文字直接賦值
            if formItem['fieldType'] == 1 or formItem['fieldType'] == 5:
                formItem['value'] = default['value']
            # 單選框需要刪掉多餘的選項
            if formItem['fieldType'] == 2:
                # 填充預設值
                formItem['value'] = default['value']
                fieldItems = formItem['fieldItems']
                for i in range(0, len(fieldItems))[::-1]:
                    if fieldItems[i]['content'] != default['value']:
                        del fieldItems[i]
            # 多選需要分割預設選項值,並且刪掉無用的其他選項
            if formItem['fieldType'] == 3:
                fieldItems = formItem['fieldItems']
                defaultValues = default['value'].split(',')
                for i in range(0, len(fieldItems))[::-1]:
                    flag = True
                    for j in range(0, len(defaultValues))[::-1]:
                        if fieldItems[i]['content'] == defaultValues[j]:
                            # 填充預設值
                            formItem['value'] += defaultValues[j] + ' '
                            flag = False
                    if flag:
                        del fieldItems[i]
            # 圖片需要上傳到阿里雲oss
            if formItem['fieldType'] == 4:
                fileName = uploadPicture(session, default['value'], host)
                formItem['value'] = getPictureUrl(session, fileName, host)
            log('必填問題%d:' % sort + formItem['title'])
            log('答案%d:' % sort + formItem['value'])
            sort += 1
        else:
            form.remove(formItem)
    # print(form)
    return form


# 上傳圖片到阿里雲oss
def uploadPicture(session, image, host):
    url = 'https://{host}/wec-counselor-collector-apps/stu/collector/getStsAccess'.format(
        host=host)
    res = session.post(url=url, headers={
                       'content-type': 'application/json'}, data=json.dumps({}), verify=not debug)
    datas = res.json().get('datas')
    fileName = datas.get('fileName')
    accessKeyId = datas.get('accessKeyId')
    accessSecret = datas.get('accessKeySecret')
    securityToken = datas.get('securityToken')
    endPoint = datas.get('endPoint')
    bucket = datas.get('bucket')
    bucket = oss2.Bucket(oss2.Auth(access_key_id=accessKeyId,
                                   access_key_secret=accessSecret), endPoint, bucket)
    with open(image, "rb") as f:
        data = f.read()
    bucket.put_object(key=fileName, headers={
                      'x-oss-security-token': securityToken}, data=data)
    res = bucket.sign_url('PUT', fileName, 60)
    # log(res)
    return fileName


# 獲取圖片上傳位置
def getPictureUrl(session, fileName, host):
    url = 'https://{host}/wec-counselor-collector-apps/stu/collector/previewAttachment'.format(
        host=host)
    data = {
        'ossKey': fileName
    }
    res = session.post(url=url, headers={
                       'content-type': 'application/json'}, data=json.dumps(data), verify=not debug)
    photoUrl = res.json().get('datas')
    return photoUrl


# 提交表單
def submitForm(formWid, address, collectWid, schoolTaskWid, form, session, host):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Linux; Android 4.4.4; OPPO R11 Plus Build/KTU84P) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Safari/537.36 okhttp/3.12.4',
        'CpdailyStandAlone': '0',
        'extension': '1',
        'Cpdaily-Extension': '1wAXD2TvR72sQ8u+0Dw8Dr1Qo1jhbem8Nr+LOE6xdiqxKKuj5sXbDTrOWcaf v1X35UtZdUfxokyuIKD4mPPw5LwwsQXbVZ0Q+sXnuKEpPOtk2KDzQoQ89KVs gslxPICKmyfvEpl58eloAZSZpaLc3ifgciGw+PIdB6vOsm2H6KSbwD8FpjY3 3Tprn2s5jeHOp/3GcSdmiFLYwYXjBt7pwgd/ERR3HiBfCgGGTclquQz+tgjJ PdnDjA==',
        'Content-Type': 'application/json; charset=utf-8',
        # 請注意這個應該和配置檔案中的host保持一致
        'Host': host,
        'Connection': 'Keep-Alive',
        'Accept-Encoding': 'gzip'
    }

    # 預設正常的提交引數json
    params = {"formWid": formWid, "address": address, "collectWid": collectWid, "schoolTaskWid": schoolTaskWid,
              "form": form}
    # print(params)
    submitForm = 'https://{host}/wec-counselor-collector-apps/stu/collector/submitForm'.format(
        host=host)
    r = session.post(url=submitForm, headers=headers,
                     data=json.dumps(params), verify=not debug)
    msg = r.json()['message']
    return msg

title_text = '今日校園疫結果通知'

# 傳送郵件通知
def sendMessage(send, msg):
    if send != '':
        log('正在傳送郵件通知。。。')
        res = requests.post(url='http://www.zimo.wiki:8080/mail-sender/sendMail',
                            data={'title': title_text, 'content': getTimeStr() + str(msg), 'to': send})

        code = res.json()['code']
        if code == 0:
            log('傳送郵件通知成功。。。')
        else:
            log('傳送郵件通知失敗。。。')
            log(res.json())

def sendEmail(send,msg):
    my_sender= config['Info']['Email']['account']   # 發件人郵箱賬號
    my_pass = config['Info']['Email']['password']         # 發件人郵箱密碼
    my_user = send      # 收件人郵箱賬號,我這邊傳送給自己
    try:
        msg=MIMEText(getTimeStr() + str(msg),'plain','utf-8')
        msg['From']=formataddr(["FromRunoob",my_sender])  # 括號裡的對應發件人郵箱暱稱、發件人郵箱賬號
        msg['To']=formataddr(["FK",my_user])              # 括號裡的對應收件人郵箱暱稱、收件人郵箱賬號
        msg['Subject']=title_text               # 郵件的主題,也可以說是標題

        server=smtplib.SMTP_SSL(config['Info']['Email']['server'], config['Info']['Email']['port'])  # 發件人郵箱中的SMTP伺服器,埠是25
        server.login(my_sender, my_pass)  # 括號中對應的是發件人郵箱賬號、郵箱密碼
        server.sendmail(my_sender,[my_user,],msg.as_string())  # 括號中對應的是發件人郵箱賬號、收件人郵箱賬號、傳送郵件
        server.quit()  # 關閉連線
    except Exception:  # 如果 try 中的語句沒有執行,則會執行下面的 ret=False
        log("郵件傳送失敗")
    else: print("郵件傳送成功")

# server醬通知
def sendServerChan(msg):
    log('正在傳送Server醬。。。')
    res = requests.post(url='https://sc.ftqq.com/{0}.send'.format(config['Info']['ServerChan']),
                            data={'text': title_text, 'desp': getTimeStr() + "\n" + str(msg)})
    code = res.json()['errmsg']
    if code == 'success':
        log('傳送Server醬通知成功。。。')
    else:
        log('傳送Server醬通知失敗。。。')
        log('Server醬返回結果'+code)

# Qmsg醬通知
def sendQmsgChan(msg):
    log('正在傳送Qmsg醬。。。')
    res = requests.post(url='https://qmsg.zendee.cn:443/send/{0}'.format(config['Info']['Qsmg']),
                            data={'msg': title_text + '\n時間:' + getTimeStr() + "\n 返回結果:" + str(msg)})
    code = res.json()['success']
    if code:
        log('傳送Qmsg醬通知成功。。。')
    else:
        log('傳送Qmsg醬通知失敗。。。')
        log('Qmsg醬返回結果'+code)

# 綜合提交
def InfoSubmit(msg, send=None):
    if(None != send):
        if(config['Info']['Email']['enable']): sendEmail(send,msg)
        else: sendMessage(send, msg)
    if(config['Info']['ServerChan']): sendServerChan(msg)
    if(config['Info']['Qsmg']): sendQmsgChan(msg)


def main_handler(event, context):
    try:
        for user in config['users']:
            log('當前使用者:' + str(user['user']['username']))
            apis = getCpdailyApis(user)
            log('指令碼開始執行。。。')
            log('開始模擬登陸。。。')
            session = getSession(user, apis['login-url'])
            if session != None:
                log('模擬登陸成功。。。')
                log('正在查詢最新待填寫問卷。。。')
                params = queryForm(session, apis)
                if str(params) == 'None':
                    log('獲取最新待填寫問卷失敗,可能是輔導員還沒有釋出。。。')
                    InfoSubmit('沒有新問卷')
                    exit(-1)
                log('查詢最新待填寫問卷成功。。。')
                log('正在自動填寫問卷。。。')
                form = fillForm(session, params['form'], apis['host'])
                log('填寫問卷成功。。。')
                log('正在自動提交。。。')
                msg = submitForm(params['formWid'], user['user']['address'], params['collectWid'],
                                 params['schoolTaskWid'], form, session, apis['host'])
                if msg == 'SUCCESS':
                    log('自動提交成功!')
                    InfoSubmit('自動提交成功!', user['user']['email'])
                elif msg == '該收集已填寫無需再次填寫':
                    log('今日已提交!')
                    # InfoSubmit('今日已提交!', user['user']['email'])
                    InfoSubmit('今日已提交!')
                else:
                    log('自動提交失敗。。。')
                    log('錯誤是' + msg)
                    InfoSubmit('自動提交失敗!錯誤是' + msg, user['user']['email'])
                    exit(-1)
            else:
                log('模擬登陸失敗。。。')
                log('原因可能是學號或密碼錯誤,請檢查配置後,重啟指令碼。。。')
                exit(-1)
    except Exception as e:
        InfoSubmit("出現問題了!"+str(e))
        raise e
    else:
        return 'success'


# 配合Windows計劃任務等使用
if __name__ == '__main__':
    print(main_handler({}, {}))
    # for user in config['users']:
    #     log(getCpdailyApis(user))