1. 程式人生 > >使用python腳本提取數據

使用python腳本提取數據

limit usr a10 gets ram console open tab spa

版權聲明:本文為博主原創文章,轉載請註明出處:https://www.cnblogs.com/sgqhappy/p/9956956.html

我們經常需要編寫用於數據提取的 Hive SQL,每次提取數據都得重新編寫一遍。為了將這種重複性強的運行命令簡單化、自動化、人性化,我特地編寫了一個 Python 腳本,可以實現數據清洗、數據處理、計數下發、讀寫文件、保存日誌等功能。

1. 導包

 1 #!/usr/bin/python
 2 #coding:utf-8
 3 
 4 ‘‘‘
 5 Made by sgqhappy
 6 Date: 20181113
 7 function: data extract
 8 ‘‘‘
9 10 from subprocess import Popen,PIPE 11 import os 12 import sys 13 import io 14 import re 15 import commands 16 import logging 17 from logging import handlers 18 from re import match

2. 定義一個類,用來打印腳本運行的log日誌

日誌既可以打印在控制臺上,也可以輸出到log文件。

技術分享圖片
 1 class Logger(object):
 2     def __init__(self,log_file_name,log_level,logger_name):
3 self.__logger = logging.getLogger(logger_name); 4 self.__logger.setLevel(log_level); 5 file_handler = logging.FileHandler(log_file_name); 6 console_handler = logging.StreamHandler(); 7 8 #set log format and show log at console and log_file. 9
LOG_FORMAT = "%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s : %(message)s"; 10 formatter = logging.Formatter(LOG_FORMAT); 11 12 file_handler.setFormatter(formatter); 13 console_handler.setFormatter(formatter); 14 15 self.__logger.addHandler(file_handler); 16 self.__logger.addHandler(console_handler); 17 18 def get_log(self): 19 return self.__logger;
View Code

3. 定義文件名及文件路徑

技術分享圖片
 1 #This is file name.
 2     file_name = "%s_%s_%s" % (sys.argv[2],sys.argv[4],sys.argv[11]);
 3     info_log_path = /python_test/%s.info.log % (file_name);
 4     
 5     #this is record name and path.
 6     record_name = "data_extract_record.txt";
 7     record_path = "/python_test/";
 8     
 9     logger = Logger(log_file_name="%s" % (info_log_path),log_level=logging.DEBUG,logger_name="myLogger").get_log();
10     
11     #this is log path.
12     path = /python_test/%s.desc.log % (file_name);
13     logger.info("\n");
14     logger.info("log path: %s" % (path));
15     logger.info("\n");
View Code

4. 提取字段信息保存

技術分享圖片
 1 #function:write all fields to log file.
 2     hive_cmd_desc = beeline -u ip -n username -e "desc %s.%s" >> %s % (sys.argv[1],sys.argv[2],path);
 3     logger.info(hive_cmd_desc);
 4     logger.info("\n");
 5     status,output = commands.getstatusoutput(hive_cmd_desc);
 6     logger.info(output);
 7     logger.info("\n");
 8     
 9     #logger.info success or failed information.
10     if status ==0:
11         logger.info("desc %s to %s successful!" % (sys.argv[2],path));
12     else:
13         #set color: ‘\033[;31;40m‘+...+‘\033[0m‘
14         logger.error(\033[;31;40m+"desc %s to %s failed!" % (sys.argv[2],path)+\033[0m);
15         #exit program.
16         exit();
17     logger.info("\n");
View Code

5. 字符串處理

技術分享圖片
 1 #this is fields list
 2     fields_list = [];
 3     with io.open(path,r,encoding="utf-8") as f:
 4         fields = list(f);
 5         for line in fields:
 6             #remove start letter "|".
 7             line_rm_start_letter = line.strip("|");
 8             logger.info(line_rm_start_letter);
 9             #remove start and end space.
10             pos = line_rm_start_letter.find("|");
11             fields_list.append(line_rm_start_letter[0:pos].strip());
12     logger.info("\n");
13     
14     #remove desc.log.
15     remove_desc_log = rm %s % (path);
16     logger.info(remove_desc_log);
17     status,output = commands.getstatusoutput(remove_desc_log);
18     
19     #logger.info success or failed information.
20     if status == 0:
21         logger.info("remove %s successful!" % (path));
22     else:
23         logger.error(\033[;31;40m+"remove %s failed!" % (path)+\033[0m);
24         exit();
25     logger.info("\n");
26     
27     #remove the first three lines.
28     del fields_list[0:3];
29     create = "";
30     start_or_etl = "";
31     if etl_load_date in fields_list:
32         start_or_etl = "etl_load_date";
33         end_letter_pos = fields_list.index("etl_load_date");
34         len = len(fields_list);
35         del fields_list[end_letter_pos:len+1];
36     if start_dt in fields_list:
37         start_or_etl = "start_dt";
38         end_letter_pos = fields_list.index("start_dt");
39         len = len(fields_list);
40         del fields_list[end_letter_pos:len+1];    
View Code

6. 添加附加條件

技術分享圖片
 1 #add condition_field.
 2     condition_field = "%s" % (sys.argv[3]);
 3     if condition_field == "0":
 4         pass;
 5     else:
 6         start_or_etl = condition_field;
 7         
 8     for i in fields_list:
 9         #logger.info(len(i));
10         logger.info(i);
11     logger.info("\n");
View Code

7. 拼接字段

技術分享圖片
# Build the concat() argument list: each field becomes
# nvl(a.`field`,'') followed by a '|' separator argument.
# The backslash before each backtick is kept in the text because the statement
# is later embedded in a double-quoted shell argument.
fields_splice = "".join(
    "nvl(a.\\`" + i + "\\`,''),'|'," for i in fields_list
)
logger.info(fields_splice)
logger.info("\n")
View Code

8. 建表

技術分享圖片
 1 #create table command.
 2     add_conditions = "%s" % (sys.argv[9]);
 3     if add_conditions == "and 1=1":
 4         create = "create table if not exists database.%s stored as textfile as select concat (%s from %s.%s a join %s b on trim(a.\`%s\`)=trim(b.\`%s\`) where b.code=‘%s‘ and a.\`%s\`>=‘%s‘ and a.\`%s\`<=‘%s‘ %s;" % (file_name,fields_splice,sys_argv[1],sys.argv[2],sys.argv[6],sys.argv[7],sys.argv[8],sys.argv[4],start_or_etl,sys.argv[10],start_or_etl,sys.argv[11],sys.argv[9]);
 5     else:
 6         create = "create table if not exists database.%s stored as textfile as select concat(%s from %s.%s a %s;" % (file_name,fields_splice,sys.argv[1],sys.argv[2],sys.argv[9]);
 7     logger.info(create);
 8     logger.info("\n");
 9     
10     #execute the command.
11     hive_cmd_create = beeline -u ip -n username -e "%s" % (create);
12     logger.info(hive_cmd_create);
13     logger.info("\n");
14     status,output = commands.getstatusoutput(hive_cmd_create);
15     logger.info(output);
16     logger.info("\n");
17     
18     #logger.info success or failed information.
19     if status ==0:
20         logger.info("create database.%s successful!" % (file_name));
21     else:
22         #set color: ‘\033[;31;40m‘+...+‘\033[0m‘
23         logger.error(\033[;31;40m+"create database.%s failed!" % (file_name)+\033[0m);
24         #exit program.
25         exit();
26     logger.info("\n");
View Code

9. 計數

技術分享圖片
 1 #count table_new command.
 2     count = "select count(*) from database.%s;" % (file_name);
 3     logger.info(count);
 4     logger.info("\n");
 5     
 6     #execute the command.
 7     hive_cmd_count = beeline -u ip -n username -e "%s" % (count);
 8     logger.info(hive_cmd_count);
 9     logger.info("\n");
10     status,output = commands.getstatusoutput(hive_cmd_count);
11     
12     #logger.info success or failed information.
13     if status ==0:
14         logger.info("count database.%s successful!" % (file_name));
15     else:
16         #set color: ‘\033[;31;40m‘+...+‘\033[0m‘
17         logger.error(\033[;31;40m+"count database.%s failed!" % (file_name)+\033[0m);
18         #exit program.
19         exit();
20     logger.info("\n");
21     logger.info(output);
22     logger.info("\n");
View Code

10. 提取數量

技術分享圖片
 1 #extract number.
 2     output_split = output.split("\n");
 3     number = output_split[7].strip("|").strip();
 4     result = re.match(r"^\d+$",number);
 5     if result:
 6         #logger.info count.
 7         logger.info("The number matched success!");
 8         logger.info(\033[1;33;40m+"The count is : %s" % (number)+\033[0m);
 9         logger.info("\n");
10     else:
11         logger.warning("The number matched failed!");
View Code

11. 抽樣查看數據的準確性

技術分享圖片
 1 #show the first five data.
 2     first_five_data = "select * from database.%s limit 5;" % (file_name);
 3     logger.info(first_five_data);
 4     logger.info("\n");
 5     
 6     #execute the command.
 7     hive_first_five_data = beeline -u ip -n username -e "%s" % (first_five_data);
 8     logger.info(hive_first_five_data);
 9     logger.info("\n");
10     status,output = commands.getstatusoutput(hive_first_five_data);
11     
12     #logger.info success or failed information.
13     if status == 0:
14         logger.info("show the first five data of database.%s successful!" % (file_name));
15     else:
16         #set color: ‘\033[;31;40m‘+...+‘\033[0m‘
17         logger.error(\033[;31;40m+"show the first five data of database.%s failed!" % (file_name)+\033[0m);
18         #exit program.
19         exit();
20     logger.info("\n");
21     
22     #logger.info the first five data.
23     logger.info(\033[1;33;40m+"the first five data are : \n\n%s" % (output)+\033[0m);
24     logger.info("\n");
View Code

12. 記錄相關信息到文件

技術分享圖片
 1 #append to record.txt.
 2     output = open("%s%s" % (record_path,record_name),a);
 3     if add_conditions == "and 1=1":
 4         output.write("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (database_name,table_name,code,extract_date,count,rel_tb_name,rel_field_name_pre,rel_field_name_after,date_pre,date_after));
 5         output.write("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" % (sys.argv[1],sys.argv[2],sys.argv[4],sys.argv[5],number,sys.argv[6],sys.argv[7],sys.argv[8],sys.argv[10],sys.argv[11]));
 6     else:
 7         output.write("%s\t%s\t%s\t%s\t%s\t%s\n" % (database_name,table_name,code,extract_date,count,add_conditions));
 8         output.write("%s\t%s\t%s\t%s\t%s\t%s\n" % (sys.argv[1],sys.argv[2],sys.argv[4],sys.argv[5],number,sys.argv[9]));
 9     output.close();
10     
11     #logger.info the data extraction success information.
12     logger.info(\033[1;35;40m+"*****Data extract success!*****"+\033[0m);
13     logger.info(\033[1;35;40m+"*****Made by sgqhappy in %s!*****" % (sys.argv[5])+\033[0m);
14     logger.info("\n");
View Code

作者:sgqhappy
出處:https://www.cnblogs.com/sgqhappy/p/9956956.html
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連結。

使用python腳本提取數據