搜片神器 之DHT網路爬蟲的程式碼實現方法
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow
也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
繼續接著第一篇寫:使用C#實現DHT磁力搜尋的BT種子後端管理程式+資料庫設計(開源)[搜片神器]
開源地址:
程式下載:H31DHT下載
看大家對昨天此類文章的興趣沒有第一篇高,今天就簡單的對支援的朋友進行交流.園子裡的朋友希望授大家以漁,所以這部分程式碼就先不放出來.希望大家更多的加入進來.
也希望誰有能力將C++的程式碼轉換成C#的,新增到我們的搜片神器工具裡面.
昨天通過向大家介紹DHT的工作原理,相信大家大概明白怎麼回事,不明白的朋友可以繼續分享接下來的文章.
本人借鑑的程式碼是C++版本的:transmission裡面的DHT程式碼,大家可以訪問網站下載:
不過裡面的程式碼環境是LINUX下的,需要自己轉換到相應的WIN平臺上來.
有興趣使用C#來完成DHT功能的朋友可以借鑑mono-monotorrent,裡面的框架程式碼比較多,不如C++的transmission裡面就三個檔案來得明白.
transmission裡面只有三個檔案就可以實現dht的功能: dht.c dht.h dht-example.c,並且介面很簡單,複用性很好。
下面介紹進入DHT網路主要功能步驟
dht.c dht.h程式碼分成三部分:
1、路由表的插入操作。
1)如果節點已經在路由表中,則更新節點,返回。
2)如果桶沒有滿,則插入,返回。
3)如果發現失效節點,替換,返回。
4)發現可疑節點,則儲存新節點到快取中並且如果該可疑節點沒有ping,發出ping_node操作,返回。
5)現在,桶已經充滿了好的節點,如果自己的ID沒有落在這個桶中,返回。
6)將桶空間分成兩半。跳到步驟1)。
2、KAD遠端處理呼叫。
這部分又分成3種,
1)ping/pong操作。
所有的包的tid都使用pg\0\0
2)find_node操作。
所有的包的tid都使用fn\0\0
3)get_peers/annouce_peer操作。
對同一個HASH的一次遞迴查詢中,tid保持不變。
其中只有3)種實現bittorrent的DHT規範裡面提到的遞迴查詢操作,1)和2)僅僅用來維護路由表,並且不儲存狀態。
3、定時器處理:
為了檢測路由表中節點的有效性(根據規範,路由表中應該只儲存有效節點),在程式碼中,在執行krpc操作時如果發現時對路由表中的節點操作,那麼則儲存操作的開始時間 pinged_time,通過操作的開始時間來判斷操作是否超時。
expire_stuff_time 超時時,會執行下面的操作:
1、檢查路由表中失效的節點(根據pinged_time來判定),並將該節點刪除。
2、檢查用來儲存annoounce_peer的節點是否超過30分鐘(這個不打算深入討論,故不做解析)。
3、檢查遞迴查詢操作超時。
rotate_secrets_time 定時器。
用來每隔大約15分左右就更換token(見DHT規範).
confirm_nodes_time 定時器。
查詢長期沒有活動的桶,然後通過執行一個find_node的krpc操作來重新整理它。
search_time定時器。
有可能出現發出的所有的get_peers操作,都沒有應答,那麼search_time定時器遇到這種情形時負責重發所有請求。(注意: get_peers操作最大未決的krpc請求數是3)
用於維持路由表的ping/pong操作:
在試圖插入節點時,發現桶已經滿,而存在可疑節點時會觸發ping_node操作。未響應的節點會有可疑最終變為失效節點,而被替換。
下面介紹我們是如何進入DHT網路
- DHT必須把自己電腦當伺服器,別人才能夠知道自己是誰,所以需要通過UDP繫結埠,參考程式碼裡面支援IPV6,個人覺得可以過濾掉.WIN平臺程式碼如下:
1 //初始化socket 2 m_soListen =(int)socket(PF_INET, SOCK_DGRAM, IPPROTO_IP); 3 if (m_soListen == INVALID_SOCKET) { 4 m_iErrorNo=WSAGetLastError(); 5 _dout(_T("CH31CarMonitorDlg Start Error(%d).\n"),m_iErrorNo); 6 return -1; 7 } 8 //初始化伺服器地址 9 SOCKADDR_IN addr;10 memset(&addr, 0, sizeof(addr));11 addr.sin_family = AF_INET;12 addr.sin_port = htons(port);13 addr.sin_addr.s_addr = htonl(INADDR_ANY);14 //繫結埠監聽15 if (bind(m_soListen, (SOCKADDR*)&addr, sizeof(addr)) == SOCKET_ERROR) {16 m_iErrorNo=WSAGetLastError();17 _dout(_T("CH31CarMonitorDlg Start Error(%d).\n"),m_iErrorNo);18 return -2;19 }
UDP埠繫結 - DHT需要生成一個自己的20位ID號,當然可以通過隨機一個數值,然後通過SHA1來生成20位的ID號,WIN平臺程式碼如下:
1 unsigned char p[20];2 CSHA1 sha1;3 sha1.Reset();4 sha1.Update((const unsigned char *)m_myID.GetBuffer(), m_myID.GetLength());5 sha1.Final();6 sha1.GetHash(p);
SHA1生成ID號 - 初始化他人伺服器的IP資訊,這樣我們就可以從他們那裡查詢我們要的資訊,借鑑程式碼如下:
1 rc = getaddrinfo("router.utorrent.com","6881", &hints1, &info); 2 //rc = getaddrinfo("router.bittorrent.com","6881", &hints1, &info); 3 //rc = getaddrinfo("dht.transmissionbt.com","6881", &hints1, &info); 4 if(rc != 0) { 5 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rc)); 6 exit(1); 7 } 8 infop = info; 9 while(infop&&m_bDataThread) 10 {11 memcpy(&bootstrap_nodes[num_bootstrap_nodes],infop->ai_addr, infop->ai_addrlen);12 infop = infop->ai_next;13 num_bootstrap_nodes++;14 }15 freeaddrinfo(info);
伺服器資訊 - 現在就可以初始化我們的DHT類了.由於此類使用C寫的,大家可以自行封裝成C++類使用.
1 rc = m_dht.dht_init(s, s6, m_myid,NULL);2 if(rc < 0) {3 perror("dht_init");4 exit(1);5 }
初始化DHT類 - 對伺服器進行PING操作,伺服器就會迴應PONG操作,這樣就表明伺服器活動正常.
1 for(int i = 0; i < num_bootstrap_nodes&&m_bDataThread; i++) 2 {3 m_dht.dht_ping_node((struct sockaddr*)&bootstrap_nodes[i],sizeof(bootstrap_nodes[i]));4 Sleep(m_dht.random() % 1000);5 }
PING伺服器 - 下面就可以使用搜索類進行操作,查詢我們要的HASH值的BT種子檔案程式碼.借鑑程式碼如下:
1 if(searching) {2 if(s >= 0)3 dht_search(hash, 0, AF_INET, callback, NULL);4 if(s6 >= 0)5 dht_search(hash, 0, AF_INET6, callback, NULL);6 searching = 0;7 }
dht_search - 大家可以借鑑dht-example.c裡面接下來的Search函式的操作,不過我們不是這樣來的,我們需要直接向伺服器傳送Findnode和Get_Peer操作.
1 unsigned char tid[16];2 m_dht.make_tid(tid, "fn", 0);3 m_dht.send_find_node(&ipRecvPingList[ipListPOS].fromaddr,sizeof(sockaddr),tid,4,ipRecvPingList[ipListPOS].ID,0,0);4 Sleep(100);5 memset(tid,0,sizeof(tid));6 m_dht.make_tid(tid, "gp", 0);7 m_dht.send_get_peers(&ipRecvPingList[ipListPOS].fromaddr,sizeof(sockaddr),tid,4,hashList[0],0,0);
傳送FINDNODE和GET_PEER操作 - 接下來的事情就是等待別人返回的資訊進行分析就可以了,當然DHT類程式碼已經全部為我們做好的.
1 FD_ZERO(&readfds); 2 if(m_soListen >= 0) 3 FD_SET(m_soListen, &readfds); 4 if(s6 >= 0) 5 FD_SET(s6, &readfds); 6 rc = select(m_soListen > s6 ? m_soListen + 1 : s6 + 1, &readfds, NULL, NULL, &tv); 7 if(rc <0&&m_bDataThread) 8 { 9 if(errno != EINTR) {10 perror("select");11 Sleep(1000);12 }13 }14 15 if(!m_bDataThread)16 break;17 18 if(rc > 0&&m_bDataThread) 19 {20 fromlen = sizeof(from1);21 memset(buf,0,sizeof(buf));22 if(m_soListen >= 0 && FD_ISSET(m_soListen, &readfds))23 rc = recvfrom(m_soListen, buf, sizeof(buf) - 1, 0,&from1, &fromlen);24 else if(s6 >= 0 && FD_ISSET(s6, &readfds))25 rc = recvfrom(s6, buf, sizeof(buf) - 1, 0,&from1, &fromlen);26 else27 abort();28 }29 30 if(rc > 0&&m_bDataThread) 31 {32 buf[rc] = '\0';33 rc = m_dht.dht_periodic(buf, rc, &from1, fromlen,&tosleep, DHT_callback, this);34 35 } 36 else 37 {38 rc = m_dht.dht_periodic(NULL, 0, NULL, 0, &tosleep, DHT_callback, this);39 }
等待返回DHT網路資訊 - 如何解析資訊DHT程式碼已經有了,如何別人的請求,程式碼也已經有了,大家可以分析DHT.c就知道是怎麼回事.
1 int CDHT::dht_periodic(const void *buf, size_t buflen,const struct sockaddr *fromAddr, int fromlen,time_t *tosleep,dht_callback *callback, void *closure) 2 { 3 gettimeofday(&nowTime, NULL); 4 5 if(buflen > 0) 6 { 7 int message; 8 unsigned char tid[16], id[20], info_hash[20], target[20]; 9 unsigned char nodes[256], nodes6[1024], token[128]; 10 int tid_len = 16, token_len = 128; 11 int nodes_len = 256, nodes6_len = 1024; 12 unsigned short port; 13 unsigned char values[2048], values6[2048]; 14 int values_len = 2048, values6_len = 2048; 15 int want; 16 unsigned short ttid; 17 18 struct sockaddr_in* tempip=(struct sockaddr_in *)fromAddr; 19 20 if(is_martian(fromAddr)) 21 goto dontread; 22 23 if(node_blacklisted(fromAddr, fromlen)) { 24 _dout("Received packet from blacklisted node.\n"); 25 goto dontread; 26 } 27 28 if(((char*)buf)[buflen] != '\0') { 29 _dout("Unterminated message.\n"); 30 errno = EINVAL; 31 return -1; 32 } 33 34 message = parse_message((unsigned char *)buf, buflen, tid, &tid_len, id, info_hash,target, &port, token, &token_len,nodes, &nodes_len, nodes6, &nodes6_len,values, &values_len, values6, &values6_len,&want); 35 36 if(token_len>0) 37 { 38 int a=0; 39 } 40 if(message < 0 || message == ERROR || id_cmp(id, zeroes) == 0) 41 { 42 _dout("Unparseable message: "); 43 debug_printable((const unsigned char *)buf, buflen); 44 _dout("\n"); 45 goto dontread; 46 } 47 48 if(id_cmp(id, myid) == 0) { 49 _dout("Received message from self.\n"); 50 goto dontread; 51 } 52 53 if(message > REPLY) { 54 /* Rate limit requests. */ 55 if(!token_bucket()) { 56 _dout("Dropping request due to rate limiting.\n"); 57 goto dontread; 58 } 59 } 60 61 switch(message) 62 { 63 case REPLY: 64 if(tid_len != 4) 65 { 66 _dout("Broken node truncates transaction ids: "); 67 debug_printable((const unsigned char *)buf, buflen); 68 _dout("\n"); 69 /* This is really annoying, as it means that we will 70 time-out all our searches that go through this node. 71 Kill it. */ 72 blacklist_node(id, fromAddr, fromlen); 73 goto dontread; 74 } 75 if(tid_match(tid, "pn", NULL)) 76 { 77 _dout("Pong!From IP:%s:[%d] id:[%s]\n",inet_ntoa(tempip->sin_addr),tempip->sin_port,id); 78 new_node(id, fromAddr, fromlen, 2); 79 (*callback)(closure, DHT_EVENT_PONG_VALUES,id,(void*)fromAddr, fromlen); 80 //send_find_node(from,fromlen,tid,4,id,0,0); 81 } 82 else if(tid_match(tid, "fn", NULL) ||tid_match(tid, "gp", NULL)) 83 { 84 int gp = 0; 85 struct search *sr = NULL; 86 if(tid_match(tid, "gp", &ttid)) 87 { 88 gp = 1; 89 sr = find_search(ttid, fromAddr->sa_family); 90 } 91 _dout("Nodes found (%d+%d)%s!From IP:%s:[%d]\n", nodes_len/26, nodes6_len/38,gp ? " for get_peers" : "",inet_ntoa(tempip->sin_addr),tempip->sin_port); 92 if(nodes_len % 26 != 0 || nodes6_len % 38 != 0) 93 { 94 _dout("Unexpected length for node info!\n"); 95 blacklist_node(id, fromAddr, fromlen); 96 } 97 //else if(gp && sr == NULL) 98 //{ 99 // _dout("Unknown search!\n");100 // new_node(id, fromAddr, fromlen, 1);101 // } 102 else 103 {104 int i;105 new_node(id, fromAddr, fromlen, 2);106 for(i = 0; i < nodes_len / 26; i++) 107 {108 unsigned char *ni = nodes + i * 26;109 struct sockaddr_in sin;110 if(id_cmp(ni, myid) == 0)111 continue;112 memset(&sin, 0, sizeof(sin));113 sin.sin_family = AF_INET;114 memcpy(&sin.sin_addr, ni + 20, 4);115 memcpy(&sin.sin_port, ni + 24, 2);116 new_node(ni, (struct sockaddr*)&sin, sizeof(sin), 0);117 (*callback)(closure, DHT_EVENT_FINDNODE_VALUES, ni,(void*)&sin, sizeof(sin));118 if(sr && sr->af == AF_INET) 119 {120 insert_search_node(ni,(struct sockaddr*)&sin,sizeof(sin),sr, 0, NULL, 0);121 }122 //send_get_peers((struct sockaddr*)&sin,sizeof(sockaddr),tid,4,ni,0,0);123 }124 for(i = 0; i < nodes6_len / 38; i++) 125 {126 unsigned char *ni = nodes6 + i * 38;127 struct sockaddr_in6 sinip6;128 if(id_cmp(ni, myid) == 0)129 continue;130 memset(&sinip6, 0, sizeof(sinip6));131 sinip6.sin6_family = AF_INET6;132 memcpy(&sinip6.sin6_addr, ni + 20, 16);133 memcpy(&sinip6.sin6_port, ni + 36, 2);134 new_node(ni, (struct sockaddr*)&sinip6, sizeof(sinip6), 0);135 if(sr && sr->af == AF_INET6) 136 {137 insert_search_node(ni,(struct sockaddr*)&sinip6,sizeof(sinip6),sr, 0, NULL, 0);138 }139 }140 if(sr)141 /* Since we received a reply, the number of requests in flight has decreased. Let's push another request. */142 search_send_get_peers(sr, NULL);143 }144 //if(sr) 145 {146 // insert_search_node(id, fromAddr, fromlen, sr,1, token, token_len);147 if(values_len > 0 || values6_len > 0) 148 {149 _dout("Got values (%d+%d)!\n", values_len / 6, values6_len / 18);150 if(callback) {151 if(values_len > 0)152 (*callback)(closure, DHT_EVENT_VALUES, sr->id,(void*)values, values_len);153 154 if(values6_len > 0)155 (*callback)(closure, DHT_EVENT_VALUES6, sr->id,(void*)values6, values6_len);156 }157 }158 }159 } 160 else if(tid_match(tid, "ap", &ttid)) 161 {162 struct search *sr;163 _dout("Got reply to announce_peer.\n");164 sr = find_search(ttid, fromAddr->sa_family);165 if(!sr) {166 _dout("Unknown search!\n");167 new_node(id, fromAddr, fromlen, 1);168 } 169 else 170 {171 int i;172 new_node(id, fromAddr, fromlen, 2);173 for(i = 0; i < sr->numnodes; i++)174 {175 if(id_cmp(sr->nodes[i].id, id) == 0) 176 {177 sr->nodes[i].request_time = 0;178 sr->nodes[i].reply_time = nowTime.tv_sec;179 sr->nodes[i].acked = 1;180 sr->nodes[i].pinged = 0;181 break;182 }183 }184 /* See comment for gp above. */185 search_send_get_peers(sr, NULL);186 }187 } 188 else 189 {190 _dout("Unexpected reply: ");191 debug_printable((const unsigned char *)buf, buflen);192 _dout("\n");193 }194 break;195 case PING:196 _dout("Ping (%d)!From IP:%s:%d\n", tid_len,inet_ntoa(tempip->sin_addr),tempip->sin_port);197 new_node(id, fromAddr, fromlen, 1);198 _dout("Sending pong.\n");199 send_pong(fromAddr, fromlen, tid, tid_len);200 break;201 case FIND_NODE:202 _dout("Find node!From IP:%s:%d\n",inet_ntoa(tempip->sin_addr),tempip->sin_port);203 new_node(id, fromAddr, fromlen, 1);204 _dout("Sending closest nodes (%d).\n", want);205 send_closest_nodes(fromAddr, fromlen,tid, tid_len, target, want,0, NULL, NULL, 0);206 break;207 case GET_PEERS:208 _dout("Get_peers!From IP:%s:%d\n",inet_ntoa(tempip->sin_addr),tempip->sin_port);209 new_node(id, fromAddr, fromlen, 1);210 if(id_cmp(info_hash, zeroes) == 0) 211 {212 _dout("Eek! Got get_peers with no info_hash.\n");213 send_error(fromAddr, fromlen, tid, tid_len,203, "Get_peers with no info_hash");214 break;215 } 216 else 217 {218 struct storage *st = find_storage(info_hash);219 unsigned char token[TOKEN_SIZE];220 make_token(fromAddr, 0, token);221 if(st && st->numpeers > 0) 222 {223 _dout("Sending found%s peers.\n",fromAddr->sa_family == AF_INET6 ? " IPv6" : "");224 send_closest_nodes(fromAddr, fromlen,tid, tid_len,info_hash, want,fromAddr->sa_family, st,token, TOKEN_SIZE);225 } 226 else 227 {228 _dout("Sending nodes for get_peers.\n");229 send_closest_nodes(fromAddr, fromlen,tid, tid_len, info_hash, want,0, NULL, token, TOKEN_SIZE);230 }231 if(callback) 232 {233 (*callback)(closure, DHT_EVENT_GET_PEER_VALUES, info_hash,(void *)fromAddr, fromlen);234 }235 }236 237 break;238 case ANNOUNCE_PEER:239 _dout("Announce peer!From IP:%s:%d\n",inet_ntoa(tempip->sin_addr),tempip->sin_port);240 new_node(id, fromAddr, fromlen, 1);241 242 if(id_cmp(info_hash, zeroes) == 0) 243 {244 &nb