1. 程式人生 > 其它 >Akshare 獲取日線策略併發送郵件

Akshare 獲取日線策略併發送郵件

import akshare as ak
import time
# import datetime
import numpy as np
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.message import EmailMessage
import logging
import os

nowtime = datetime.now()


day_nums = 1  # 使用前一天的收盤價資料做訊號判斷
stock_num = 1  # 買入評分最高的前stock_num只股票 可以修改
momentum_day = 20  # 最新動量參考最近momentum_day的
ref_stock = 'sh000300'  # 用ref_stock做擇時計算的基礎資料
N = 18  # 計算最新斜率slope,擬合度r2參考最近N天
M = 600  # 計算最新標準分zscore,rsrs_score參考最近M天
score_threshold = 0.7  # rsrs標準分指標閾值



def get_index_list(index_symbol='sh000068'):
    stocks2 = []
    stocks = ak.index_stock_hist(index_symbol).stock_code
    for stock in stocks[:]:
        if int(stock) < 100000:
            stock = 'sz' + stock
        else:
            stock = 'sh' + stock
        stocks2.append(stock)
    return stocks2


stock_pool = get_index_list()


# 找到有交易訊號的股票,為之後交易進行準備


# 動量因子:由收益率動量改為相對MA90均線的乖離動量
def get_rank(stock_pool):
    rank, biasN = [], 90
    for stock in stock_pool:
        # print(stock)
        from_date = '2010-01-01'
        from_date = datetime.strptime(from_date, "%Y-%m-%d")
        day_nums = 1
        current_dt = time.strftime("%Y-%m-%d", time.localtime())
        current_dt = datetime.strptime(current_dt, '%Y-%m-%d')
        previous_date = current_dt - timedelta(days=day_nums)
        #         data = jq.get_price(stock, end_date=previous_date, count=biasN +
        #                             momentum_day, frequency='daily', fields=['close'])
        try:
            data = ak.stock_zh_a_daily(symbol=stock, start_date=from_date, end_date=previous_date)
        except:
            pass
        # print('2222',data.head())
        bias = np.array((data.close / data.close.rolling(biasN).mean())[-momentum_day:])  # 乖離因子
        #         print(bias)
        #         print(bias[0])
        score = np.polyfit(np.arange(momentum_day), bias / bias[0], 1)[0].real  # 乖離動量擬合
        rank.append([stock, score])
    rank.sort(key=lambda x: x[-1], reverse=True)
    return rank[0]


# 線性迴歸:復現statsmodels的get_OLS函式
def get_ols(x, y):
    slope, intercept = np.polyfit(x, y, 1)
    r2 = 1 - (sum((y - (slope * x + intercept)) ** 2) / ((len(y) - 1) * np.var(y, ddof=1)))
    return (intercept, slope, r2)


def get_zscore(slope_series):
    mean = np.mean(slope_series)
    std = np.std(slope_series)
    return (slope_series[-1] - mean) / std


# 擇時過程 ----->--------------------------------------------
def initial_slope_series():
    current_dt = time.strftime("%Y-%m-%d", time.localtime())
    current_dt = datetime.strptime(current_dt, '%Y-%m-%d')
    from_date = '2010-01-01'
    from_date = datetime.strptime(from_date, "%Y-%m-%d")
    previous_date = current_dt - timedelta(days=day_nums)
    data = ak.stock_zh_index_daily(symbol=ref_stock)
    data['date'] = data['date'].apply(lambda x: str(x))
    data['date'] = data['date'].apply(lambda x: datetime.strptime(str(x), '%Y-%m-%d'))
    data = data[(data['date'] >= from_date) & (data['date'] <= previous_date)]
    return [get_ols(data.low[i:i + N], data.high[i:i + N])[1] for i in range(M)]


# 只看RSRS因子值作為買入、持有和清倉依據,前版本還加入了移動均線的上行作為條件
def get_timing_signal(stock):
    current_dt = time.strftime("%Y-%m-%d", time.localtime())
    current_dt = datetime.strptime(current_dt, '%Y-%m-%d')
    previous_date = current_dt - timedelta(days=day_nums)
    from_date = '2010-01-01'
    from_date = datetime.strptime(from_date, "%Y-%m-%d")
    data = ak.stock_zh_index_daily(symbol=ref_stock)
    data['date'] = data['date'].apply(lambda x: str(x))
    data['date'] = data['date'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d'))
    data['date'] = data['date'].apply(lambda x: x.to_pydatetime())
    # data = data[data['date']>=from_date & data['date']<= previous_date]
    data = data[(data['date'] >= from_date) & (data['date'] <= previous_date)]
    intercept, slope, r2 = get_ols(data.low, data.high)
    slope_series.append(slope)
    rsrs_score = get_zscore(slope_series[-M:]) * r2
    print('rsrs_score {:.3f}'.format(rsrs_score))
    if (rsrs_score > score_threshold):
        return "BUY"
    elif (rsrs_score < -score_threshold):
        return "SELL"
    else:
        return "KEEP"


# slope_series = initial_slope_series()[:-1]  # 除去回測第一天的 slope ,避免執行時重複加入
slope_series = initial_slope_series()[:-1]


def my_trade():
    # print(stock_pool)
    # print(get_rank(stock_pool))
    check_out_list = get_rank(stock_pool)
    timing_signal = get_timing_signal(ref_stock)
    message = ""
    if len(check_out_list) > 0:
        each_check_out = check_out_list[0]
        #         security_info = jq.get_security_info(each_check_out)
        #         stock_name = security_info.display_name
        #         stock_code = each_check_out
        print('今日自選股:{}({})'.format(each_check_out, each_check_out))
        if timing_signal == 'SELL':
            #             for stock in list(positions.keys()):
            #                 close_position(stock)
            #                 message = '清倉!賣賣賣!'
            #                 message += "\r\n\r\n".join(positions.keys())
            #                 positions.clear()
            #                 print('今日擇時訊號:{}'.format(timing_signal))
            pass
        else:
            message = "今日自選股:{}({})".format(each_check_out, each_check_out)
            # adjust_position([each_check_out])
        print(message)
        sendMail(message)


def mail(message):
    ret = True

    try:

        # 定義SMTP郵件伺服器地址
        smtp_server = 'smtp.qq.com'
        # 郵件傳送人郵箱
        from_addr = 'x x x x x x [email protected]'  # 自己的郵想
        # 郵件傳送人郵箱密碼
        password = 'xxxxxxxx  # 郵箱密碼
        # 郵件接收人
        to_addr = '[email protected]'  # 測試接收郵件地址郵箱

        # 建立SMTP連線
        conn = smtplib.SMTP_SSL(smtp_server, 465)
        # 設計除錯級別
        conn.set_debuglevel(1)
        # 登入郵箱
        conn.login(from_addr, password)
        # 建立郵件內容物件
        msg = EmailMessage()
        # 設定郵件內容
        msg.set_content('{}'.format(message), 'plain', 'utf-8')
        msg['Subject'] = '現在時間為:{}'.format(nowtime)
        msg['From'] = '星涅'
        msg['To'] = '我摯愛的朋友'
        # 傳送郵件
        conn.sendmail(from_addr, [to_addr], msg.as_string())
        # 退出連線
        conn.quit()



    except Exception as e:  # 如果 try 中的語句沒有執行,則會執行下面的 ret = False
        ret = False
        print(e)

    return ret


def sendMail(message):
    ret = 0
    for _ in range(10):
        if ret:
            # 郵件傳送成功推出
            break
        else:
            # 沒有傳送成功或失敗繼續
            ret = mail(message)
            time.sleep(1)


if __name__ == '__main__':
    # positions["159928.XSHE"] = 100
    my_trade()