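Time Format Conversion in Message Processing | Does Pandas Really Speed Things Up?

Preface

Converting the time strings in messages to timestamps is relatively expensive. Pandas is a sharp tool for this job, but it has to be used with care, or all the effort is easily wasted.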

Message Processing and Its Cost

  • Each message is a JSON object of about 572 bytes
  • The message time looks like '2021/09/28 00:03:45.227895784'
  • Each batch contains 1000 messages
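To reproduce measurements of this kind, a synthetic batch with a similar shape can be generated. This is only a minimal sketch: the helper name make_messages, the field values, and the random distribution are assumptions for illustration. Only the field names (VolId, Id, flag, Wait, create_time) and the two string formats come from this article, and the generated messages are smaller than the real 572-byte ones.

```python
import random
from datetime import datetime, timedelta


def make_messages(count=1000, seed=0):
    """Build a list of fake messages shaped like the ones described above."""
    rng = random.Random(seed)  # fixed seed so repeated runs give the same batch
    base = datetime(2021, 9, 28, 0, 3, 45)
    messages = []
    for i in range(count):
        moment = base + timedelta(seconds=i)
        # Nine fractional digits (nanoseconds), as in the sample timestamp.
        fraction = "%09d" % rng.randrange(10 ** 9)
        messages.append({
            "VolId": "vol-%d" % rng.randrange(100),       # hypothetical value
            "Id": i,
            "flag": rng.choice(["Slow", "Pending", "Other"]),
            "Wait": "%dus" % rng.randrange(1, 10 ** 7),   # e.g. '6333991us'
            "create_time": moment.strftime("%Y/%m/%d %H:%M:%S") + "." + fraction,
        })
    return messages
```

Calling make_messages(1000) yields one batch of the size used in this article.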

The original processing code, in abstracted form, is as follows:

import time
from datetime import datetime, timedelta


def to_timestamp(dt):
    """Convert a datetime to an epoch timestamp (local time, whole seconds)."""
    # timetuple() carries no sub-second field, so the fraction is dropped here.
    return time.mktime(dt.timetuple())


def make_key(message):
    """Build the unique key."""
    return '%s_%s' % (message.get('VolId'), message.get('Id'))


def formatter(message):
    """
    Simple time handling
    :param message:
        create_time: '2021/08/12 01:01:19.220461019'
        Wait: '6333991us'
    :return: the same message dict, updated in place
    """
    wait = message.get('Wait')
    u_wait = wait[:-2]  # strip the trailing 'us'
    wait_delta = timedelta(microseconds=int(u_wait))
    log_time = message.get("create_time")
    u_time = log_time.split('.')[0]  # discard the nanosecond fraction
    u_time_obj = datetime.strptime(u_time, '%Y/%m/%d %H:%M:%S')
    alert_begin = u_time_obj - wait_delta
    alert_ts = to_timestamp(alert_begin)
    message['begin_ts'] = alert_ts
    message['alert_ts'] = alert_ts
    message['alert_count'] = 0
    message['Wait'] = float(u_wait) / 1e6  # microseconds -> seconds
    return message


def cls_message_pure(raws):
    """Sort messages by flag."""
    slow_dict = {}
    pending_dict = {}
    for message in raws:
        key = make_key(message)
        if message.get('flag') == 'Pending':
            pending_dict[key] = formatter(message)
        elif message.get('flag') == 'Slow':
            slow_dict[key] = formatter(message)
        else:
            pass
    return slow_dict, pending_dict

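This does the preliminary processing for each message: it generates the primary key, normalizes the message creation time, and initializes a few alert-related fields. The messages are then sorted by flag. A simple timing measurement follows: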

def run(data):
    t = time.time()
    cls_message_pure(data)
    print("formatter dt items expand %s" %(time.time() - t))


formatter dt items expand 0.0396201610565

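That is just sorting 1,000 messages, with no business logic applied yet, and it already takes about 39 ms (roughly 40 microseconds per message). That is actually quite slow.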
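A single time.time() reading on a run this short can be noisy. As a sketch, the measurement could be repeated and the best run reported. The helper name best_of is hypothetical, and time.perf_counter assumes Python 3. Note that formatter updates each message in place (Wait becomes a float), so every repetition needs a fresh copy of the input, which is why the helper takes a factory rather than the data itself.

```python
import time


def best_of(func, make_input, repeat=5):
    """Return the shortest wall-clock time in seconds over `repeat` calls."""
    best = float("inf")
    for _ in range(repeat):
        data = make_input()  # fresh input each time; func may mutate it
        start = time.perf_counter()
        func(data)
        best = min(best, time.perf_counter() - start)
    return best
```

With the functions from this article it might be called as best_of(cls_message_pure, lambda: copy.deepcopy(batch)), where batch is the list of 1000 raw messages.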

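Pandas Takes the Stage | Batch Speedup?

According to the Pandas manual, every step can be handled with batch operations. The code is as follows: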

import time

import pandas as pd


def pandas_formatter(raws):
    t = time.time()
    df = pd.DataFrame(raws)
    print("init %s" % (time.time() - t))

    slow_dict = {}
    pending_dict = {}
    # Convert the whole column to timestamps in one line. Sweet!
    # utc is a boolean flag (not a time zone name), so the strings are read as UTC.
    df['create_time'] = pd.to_datetime(df['create_time'], utc=True).astype('int64')/1e9
    df['Wait'] = df['Wait'].str[:-2].astype('int64')/1e6
    df['alert_ts'] = df['begin_ts'] = df['create_time'] - df['Wait']
    df['alert_count'] = 0
    # Build the primary key by concatenating columns; also looks sweet
    # (actual performance is not great).
    # str.cat is given the Series itself rather than its .str accessor.
    df['key'] = df['VolId'].str.cat(df['Id'].astype('str'), sep='_')
    print("traslate %s" % (time.time() - t))


    # Sort directly with groupby; also looks sweet! (in practice, not so much)
    groups = df.groupby(df.flag)
    slow_df = groups.get_group('Slow')
    pending_df = groups.get_group('Pending')
    print("cls %s" % (time.time() - t))

    # There is even to_dict, how thoughtful. (in practice, far worse still)
    for _, k in slow_df.iterrows():
        item = k.to_dict()
        slow_dict[item['key']] = item
    for _, k in pending_df.iterrows():
        item = k.to_dict()
        pending_dict[item['key']] = item

    return slow_dict, pending_dict

One set of batch operations handles everything, which feels good. Add some timing logs to verify:

init 0.0113050937653
traslate 0.0349180698395
cls 0.0542259216309
formatter run_pd items expand 0.358073949814

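When I saw this result, I was petrified on the spot! The whole run takes about 358 ms, roughly nine times the 39.6 ms of the pure-Python version. About 300 ms of that comes after the cls checkpoint, in the two iterrows() / to_dict() loops.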
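Row-by-row iteration with iterrows() builds a Series object for every row, which tends to be expensive. One possible alternative, which is not what this article ends up using, is DataFrame.to_dict with orient='records', which builds all row dicts in a single call. The sketch below uses a hypothetical helper frame_to_keyed_dict and made-up sample rows; whether it helps enough for a given workload would still need to be measured.

```python
import pandas as pd


def frame_to_keyed_dict(df, key_column="key"):
    """Map each row's key to a plain dict of that row, without iterrows()."""
    records = df.to_dict(orient="records")  # list of {column: value} dicts
    return {record[key_column]: record for record in records}


# Made-up rows standing in for the 'Slow' group of the DataFrame.
sample = pd.DataFrame({
    "key": ["v1_1", "v2_2"],
    "flag": ["Slow", "Slow"],
    "Wait": [6.333991, 0.5],
})
slow_dict = frame_to_keyed_dict(sample)
```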

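Playing to Each Tool's Strengths: Putting It Together

In my view, Pandas is good at column-wise processing and is heavily optimized for time handling. Converting back and forth between a list of dicts and a DataFrame is expensive, whereas exporting a single column directly is fast. The final code is as follows: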

import time

import pandas as pd


def pandas_formatter2(raws):
    t = time.time()
    # Use Pandas only for what it is good at: batch-converting the time column.
    ts = list(i['create_time'] for i in raws)
    # utc is a boolean flag (not a time zone name), so the strings are read as UTC.
    series = pd.to_datetime(ts, utc=True).astype('int64')/1e9
    # Plain list of epoch seconds, in the same order as raws.
    df = series.to_list()

    slow_dict = {}
    pending_dict = {}
    for index, message in enumerate(raws):
        wait = message.get('Wait')
        u_wait = float(wait[:-2]) / 1e6  # '6333991us' -> seconds
        message['alert_ts'] = message['begin_ts'] = df[index] - u_wait
        message['alert_count'] = 0
        message['Wait'] = u_wait

        key = make_key(message)  # make_key comes from the first listing
        if message.get('flag') == 'Pending':
            pending_dict[key] = message
        elif message.get('flag') == 'Slow':
            slow_dict[key] = message
        else:
            pass
    return slow_dict, pending_dict

The test result is as follows:

formatter run_pd2 items expand 0.00854301452637

Finally, a real speedup: about 8.5 ms against 39.6 ms for the pure-Python version, roughly 4.6 times faster.
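One caveat when comparing the results: in pd.to_datetime the utc argument is a boolean flag rather than a time zone name, so the Pandas versions above read the strings as UTC, while time.mktime in the pure-Python version uses the machine's local time zone. If the strings are actually Shanghai wall-clock time, one way to express that is to parse them as naive timestamps and attach the zone with tz_localize. The following is a sketch under that assumption; the helper name to_epoch_seconds is hypothetical.

```python
import pandas as pd

EPOCH = pd.Timestamp("1970-01-01", tz="UTC")


def to_epoch_seconds(strings, tz=None):
    """Convert timestamp strings to float epoch seconds.

    tz=None  -> treat the strings as UTC (what utc=True does).
    tz='...' -> treat the strings as wall-clock time in that zone.
    """
    if tz is None:
        index = pd.to_datetime(strings, utc=True)
    else:
        # Parse as naive timestamps, then attach the zone they were written in.
        index = pd.to_datetime(strings).tz_localize(tz)
    # Normalize to UTC, then subtract the epoch and divide by one second;
    # this avoids assuming a particular internal resolution (ns, us, ...).
    delta = index.tz_convert("UTC") - EPOCH
    return (delta / pd.Timedelta(seconds=1)).tolist()


stamps = ["2021/09/28 00:03:45.227895784"]
as_utc = to_epoch_seconds(stamps)
as_shanghai = to_epoch_seconds(stamps, tz="Asia/Shanghai")
```

The two interpretations differ by eight hours, so it is worth confirming which one matches the message source before comparing timestamps across the implementations.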