爬蟲(三) redis&分散式爬蟲
阿新 • • 發佈:2018-11-09
redis
redis, 稱為記憶體資料庫, 以key-value的形式存放資料, 是一個非關係型資料庫
redis 提供類豐富的資料型別, 其有 string list map set sortSet 五種資料型別
redis 的資料型別指的是value的資料型別, key都是String型別的
1. 持久化
- RDB (預設開啟): 是一種基於快照機制來實現的持久化的方案, 可以把快照看做是照相機的照片, 可以將redis的某一個時刻記錄下來, 然後儲存到本地磁碟上, 一般情況下 快照檔案都比較小 只有幾kb
- 優點: 快照檔案一般很小, 適合於做災難恢復
- 缺點: 資料丟失的風險
- AOF: 是一種基於日誌機制的持久化方案, 可以將使用者操作的命令, 整體的都儲存下來, 儲存到一個本地磁碟上, 而一般這個檔案比較龐大, 最大有時候好幾T的檔案
- 優點: 資料的丟失的風險比較小
- 缺點: 不適合做災難恢復
- RDB: redis預設是打開了RDB的
save 900 1 : 在 900秒之內, 如果有1個數據傳送了改變, 就會執行一次儲存 save 300 10 : 在 300秒之內, 如果有10個數據傳送了改變, 就會執行一次儲存 save 60 10000 : 在 60秒之內, 如果有10000 個數據傳送了改變, 就會執行一次儲存
會發生資料丟失了, 極端: 在不到5分鐘的時間內, 丟失 9999個數據
- AOF: redis 預設是關閉了AOF機制的
appendonly yes : 是否開啟AOF機制 yes 已經開啟 no 表示未開啟
appendfsync everysec : AOF的儲存機制 [ always everysec no ]
- always: 總是,
- 會將使用者每一次的命令都會執行一次儲存的操作
- 缺點: 大大的影響redis的效能
- 會將使用者每一次的命令都會執行一次儲存的操作
- everysec: 每秒執行
- 會每秒鐘執行一次儲存操作
- no : redis不去主動的進行儲存
- 依賴於作業系統進行儲存操作(linux 大約 30分鐘)
- 缺點: 會丟失30分鐘的資料
- 依賴於作業系統進行儲存操作(linux 大約 30分鐘)
一般來redis的是不會宕機的, 除非redis伺服器的記憶體爆滿
jedis
jedis, 是一款用於連線redis的java API
1. 連線redis
//1. 建立jedis物件
Jedis jedis = new Jedis("192.168.4.200", 6379);
//2. 檢測是否是互通的
String pong = jedis.ping();
2. API
2.1 redis
驗證密碼 jedis.auth(“password”) 連線 jedis.connect() 斷開連線 jedis.disconnect() 獲取redis中所有key jedis.keys("*") 獲取redis中特定的key jedis.key(“pattern”) 移除redis中的key jedis.del(“key1”…) 移除key的生存時間 jedis.persist(“key1”) 檢查key是否存在 jedis.exists(“key1”) 給key改名 jedis.rename(“oldKeyName”,“newKeyName”) 返回key對應的value型別 jedis.type(“key1”) 清空redis中所有的key jedis.flushAll() 返回redis中key的個數 jedis.dbSize()
2.2 String
- 特點: 儲存所有的字元和字串
- 應用場景: 做快取使用
新增資料 jedis.set(“name”,“張三”) 獲取value jedis.get(“name”) value自增+1 jedis.incr(“age”) value自減-1 jedis.decr(“age”) value增指定值 jedis.incrBy(“100”, 10L) value拼接 jedis.append(“name”,“xixi”) 存多條資料 jedis.mset(“aaa”,“123”,“bbb”,“456”…) 取多條資料 jedis.mget(“aaa”,“bbb”…) 設定key有效期 jedis.setex(“sex”,5,“man”) 給存在的key設定有效期 jedis.expire(“sex”,20) 檢視key的有效期 jedis.ttl(“sex”);
2.3 list
- 特點: 相當於java中linkList, 是一個連結串列的結構
- 應用場景: 做任務佇列,
- 在java中客戶端提供了執行緒安全獲取集合資料的方式
左側插入 jedis.lpush(“list1”,“張三”,“李四”,“王五”…) 右側彈出 jedis.rpop(“list1”) 插入指定元素之後 jedis.linsert(“list1”,BinaryClient.LIST_POSITION.AFTER,“李四”,“趙六”) 插入指定元素之前 jedis.linsert(“list1”, BinaryClient.LIST_POSITION.BEFORE,“李四”,“陳七”); 擷取n個元素 jedis.ltrim(“list1”,0,2) 獲得list中多個value jedis.lrange(“list1”,0,-1) -1表示所有 返回列表的長度 jedis.llen(“list1”) 刪除count個值為value的元素 jedis.lrem(“list1”,0,“張三”) 設定連結串列中角標為index的值 jedis.lset(“list1”,0,“張三啊”) 尾部元素彈出並新增到頭部 jedis.rpoplpush(“list1”,“list1”)
2.4 map
- 特點: 相當於java中hashMap集合
- 應用場景: 可以儲存javaBean物件, 此種使用場景不多,可被String替代
新增資料 jedis.hset(“map1”,“name”,“張三”) 獲取資料 jedis.hget(“map1”,“name”) 獲取所有 jedis.hgetAll(“map1”) 獲取所有key jedis.hkeys(“map1”) 獲取所有value jedis.hvals(“map1”) value自增 jedis.hincrBy(“map1”,“age”,10) 刪除key jedis.hdel(“map1”,“sex”)
2.5 set
- 特點: 唯一, 無序
- 應用場景: 集合運算
- 例如去重複的操作
新增資料 jedis.sadd(“set1”,“張三”,“李四”…) 獲取資料 jedis.smembers(“set1”) set是否存在該value jedis.sismember(“set1”,“張三”) 獲取set中value個數 jedis.scard() 兩個set的交集 jedis.sinter(“set1”,“set2”) 兩個set的並集 jedis.sunion(“set1”,“set2”) 兩個set的差集 jedis.sdiff(“set1”,“set2”) 兩個set的差集合併到另一個set中 jedis.sdiffstore(“set3”,“set1”,“set2”)
2.6 zSet
- 特點:唯一, 有序
- 應用場景: 一般用來做排行榜
新增資料 jedis.zadd(“zset”,100,“張三”) 升序獲取資料 jedis.zrange(“zset”,0,-1) 降序獲取資料 jedis.zrevrange(“zset”,0,-1) 獲取set中value的排名 jedis.zrank(“zset”,“張三”) 獲取set中value的分數(score) jedis.zscore(“zset”,“張三”) set中value的score自增 jedis.zincrby(“zset”,10,“張三”)
3.jedis連線池
//jedis的連線池
@Test
public void jedisForJedisPool(){
//1. 建立連線池的配置物件
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100);
config.setMaxIdle(20);//最大的閒時的數量
config.setMinIdle(5); //最小閒時
//2. 建立jeids的連線池物件
JedisPool jedisPool = new JedisPool(config,"192.168.72.142",6379);
//3. 獲取連線
Jedis jedis = jedisPool.getResource();
System.out.println(jedis.ping());
//4. 歸還連線
jedis.close();
}
分散式爬蟲
1. 概述
分散式: 指的是將一個程式, 或者說一個業務, 拆分成不同的子專案或者子程式, 進行分開部署
叢集: 指的是將一個程式或者說一個業務, 重複部署多次
一般情況下 叢集 和 分散式 會同時出現
通俗描述:
小飯店原來只有一個廚師,切菜洗菜備料炒菜全乾。
後來客人多了,廚房一個廚師忙不過來,又請了個廚師,兩個廚師都能炒一樣的菜,這兩個廚師的關係是叢集。
為了讓廚師專心炒菜,把菜做到極致,又請了個配菜師負責切菜,備菜,備料,廚師和配菜師的關係是分散式。
一個配菜師也忙不過來了,又請了個配菜師,兩個配菜師關係是叢集。
從剛才的案例中, 請分析出, 叢集和分散式能解決什麼樣的問題?
- 主要解決單節點壓力過大
- 提高程式碼的複用性
- 降低模組間或者各個子系統的耦合性
2.代理IP
HttpHost proxy = new HttpHost("someproxy", port);
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
CloseableHttpClient httpclient = HttpClients.custom()
.setRoutePlanner(routePlanner)
.build();
httpClientUtils
private static String execute(HttpRequestBase request) {
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)// 設定建立連線的最長時間
.setConnectionRequestTimeout(5000)// 設定獲取連線的最長時間
.setSocketTimeout(10 * 1000)// 設定資料傳輸的最長時間
.build();
request.setConfig(requestConfig);
String html = null;
// 從redis中獲取代理IP
Jedis conn = JedisUtil.getConn();
// 從右邊彈出一個元素之後,從新放回左邊
List<String> ipkv = conn.brpop(0, "spider:ip");
// CloseableHttpClient httpClient = getHttpClient();
CloseableHttpClient httpClient = getProxyHttpClient(ipkv.get(1));
try {
CloseableHttpResponse res = httpClient.execute(request);
if (200 == res.getStatusLine().getStatusCode()) {
html = EntityUtils.toString(res.getEntity(), Charset.forName("utf-8"));
//請求成功之後,將代理IP放回去,下次繼續使用
conn.lpush("spider:ip", ipkv.get(1));
conn.close();
}
} catch (Exception e) {
System.out.println("請求失敗");
// TODO 需要開發自動重試功能
throw new RuntimeException(e);
}
return html;
}
private static PoolingHttpClientConnectionManager cm;
private static CloseableHttpClient getProxyHttpClient(String ipkv) {
String[] vals = ipkv.split(":");
System.out.println(vals);
HttpHost proxy = new HttpHost(vals[0], Integer.parseInt(vals[1]));
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
return HttpClients.custom().setConnectionManager(connectionManager).setRoutePlanner(routePlanner).build();
}
其他
1. 下載圖片
public static void main(String[] args) throws Exception {
// 1.指定圖片的地址
String image = "http://www.itcast.cn/images/logo.png";
// 2.使用httpclient獲取資料
HttpGet httpGet = new HttpGet(image);
// 3.發起請求
CloseableHttpClient httpClient = HttpClients.createDefault();
// 4.執行請求
CloseableHttpResponse res = httpClient.execute(httpGet);
// 5.獲取二進位制資料
HttpEntity entity = res.getEntity();
InputStream inputStream = entity.getContent();
// 6.儲存圖片
File logo = new File("d:/logo.png");
BufferedOutputStream outputStreamWriter = new BufferedOutputStream(new FileOutputStream(logo));
// 7.讀資料寫入到檔案
byte[] buf = new byte[1024];
int size;
while (-1 != (size = inputStream.read(buf))) {
outputStreamWriter.write(buf, 0, size);
}
// 8.打掃戰場
outputStreamWriter.close();
inputStream.close();
}
2. 下載視訊
public static void main(String[] args) throws Exception {
// 1.指定視訊的地址
String image = "http://124.205.69.164/mp4files/92110000062AEFFF/d.itheima.com:81/dc/czxy/1114lkfzf.mp4";
// 2.使用httpclient獲取資料
HttpGet httpGet = new HttpGet(image);
// 3.發起請求
CloseableHttpClient httpClient = HttpClients.createDefault();
// 4.執行請求
CloseableHttpResponse res = httpClient.execute(httpGet);
// 5.獲取二進位制資料
HttpEntity entity = res.getEntity();
InputStream inputStream = entity.getContent();
// 6.儲存圖片
File logo = new File("c:/likaifu.mp4");
BufferedOutputStream outputStreamWriter = new BufferedOutputStream(new FileOutputStream(logo));
// 7.讀資料寫入到檔案
byte[] buf = new byte[1024];
int size;
while ((size = inputStream.read(buf)) != -1 ) {
outputStreamWriter.write(buf, 0, size);
}
// 8.打掃戰場
outputStreamWriter.close();
inputStream.close();
}
3. 簡單驗證碼識別
Tesseract-OCR支援中文識別,並且開源和提供全套的訓練工具,是快速低成本開發的首選。
- Tesseract的OCR引擎最先由HP實驗室於1985年開始研發,至1995年時已經成為OCR業內最準確的三款識別引擎之一。然而,HP不久便決定放棄OCR業務,Tesseract也從此塵封。數年以後,HP意識到,與其將Tesseract束之高閣,不如貢獻給開源軟體業,讓其重煥新生--2005年,Tesseract由美國內華達州資訊科技研究所獲得,並求諸於Google對Tesseract進行改進、消除Bug、優化工作。
- Tesseract目前已作為開源專案釋出在Google Project,其專案主頁在這裡檢視。
Tess4J則是Tesseract在Java PC上的應用
<!-- https://mvnrepository.com/artifact/net.sourceforge.tess4j/tess4j -->
<dependency>
<groupId>net.sourceforge.tess4j</groupId>
<artifactId>tess4j</artifactId>
<version>3.4.0</version>
</dependency>
public static void main(String[] args) {
File imageFile = new File("E:\\env\\tess4j\\input\\2017-12-09_153956.jpg");
Tesseract tessreact = new Tesseract();
// 需要指定訓練集 訓練集到 https://github.com/tesseract-ocr/tessdata 下載。
tessreact.setDatapath("E:\\env\\tess4j\\tessdata");
// 注意 預設是英文識別,如果做中文識別,需要單獨設定。
tessreact.setLanguage("chi_sim");
try {
String result = tessreact.doOCR(imageFile);
System.out.println(result);
} catch (TesseractException e) {
System.err.println(e.getMessage());
}
}
爬蟲程式碼
主伺服器解析列表頁
package com.hrh.master;
import com.hrh.utils.HttpClientUtils;
import com.hrh.utils.JedisUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
/**
* @author QuietHR
* @create 2018/9/22
**/
public class JDMaster {
public static void main(String[] args) throws IOException, InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
Jedis jedis= JedisUtils.getJedis();
try {
Thread.sleep(1000);
//獲得隊列當前的大小
List<String> phonePids = jedis.lrange("phonePids", 0, -1);
System.out.println("當前redis中有"+phonePids.size()+"個pid");
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
jedis.close();
}
}
}}).start();
page();
}
public static void page() throws IOException, InterruptedException {
//分頁查詢手機資料 共100頁
for (int i = 1; i <=100 ; i++) {
//京東分頁page為 1 3 5 7 .....
// 對應第一頁 第二頁....
String url="https://search.jd.com/Search?keyword=%E6%89%8B%E6%9C%BA&enc=utf-8&page="+(2*i-1);
String html = HttpClientUtils.doGet(url);
parseIndex(html);
}
}
//解析手機列表頁
private static void parseIndex(String html) throws IOException, InterruptedException {
Document document = Jsoup.parse(html);
//手機列表
Elements elements = document.select("#J_goodsList>ul>li");
if(elements!=null||elements.size()!=0){
for (Element element : elements) {
//獲得每個li的pid
String pid = element.attr("data-pid");
//將pid放入佇列中
Jedis jedis= JedisUtils.getJedis();
jedis.lpush("phonePids",pid);
jedis.close();
}
}
}
}
從伺服器解析pid
package com.hrh.slave;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.hrh.dao.ProductDao;
import com.hrh.pojo.Product;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import redis.clients.jedis.Jedis;
import utils.HttpClientUtils;
import utils.JedisUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author QuietHR
* @create 2018/9/22
**/
public class JDSlave {
//建立dao物件
static ProductDao productDao = new ProductDao();
//建立執行緒池
static ExecutorService threadPool = Executors.newFixedThreadPool(35);
public static void main(String[] args) throws Exception {
//開啟30個執行緒去解析手機列表頁獲得的pids
for (int i = 1; i <=30; i++) {
threadPool.execute(new Runnable() {
@Override
public void run()