1. 程式人生 > >Linux下多執行緒檔案傳輸

Linux下多執行緒檔案傳輸

要求:服務端與客戶端分別各佔一個程序,客戶端中可設定TCP連線數n,之後將檔案等分成n塊同時傳輸。
思路: 在網上查到了許多關於Linux下socket檔案傳輸的文章,受益許多,其中有個部落格寫的很好
連結:http://blog.csdn.net/zhqianpeng/article/details/46489959
於是可以在客戶端中根據n進行檔案分包,之後建立n個執行緒傳輸,服務端建立相應n個執行緒完成接收。

客戶端程式碼
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <pthread.h> #include <fcntl.h> #include <errno.h> #include <sys/stat.h> #define SERV_PORT 8888 #define BUFFER_SIZE 4096
typedef struct { long long cur; int size; }thread_data; int filefd; int file_block_size = 0; struct sockaddr_in server_address; void *sender(void *s) { thread_data tdata = *((thread_data*)s); long long cur = tdata.cur; int size = tdata.size; char buf[BUFFER_SIZE] = {0
}; int read_count; char head_buf[29] = {0}; snprintf(head_buf,29,"%016lld:%011d",cur,size); int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket error"); exit(EXIT_SUCCESS); } if (connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { exit(EXIT_FAILURE); } send(sockfd, &head_buf, strlen(head_buf), 0); long long read_size = 0; while (size) { read_count = pread(filefd, buf,(sizeof(buf) < size)?sizeof(buf):size, cur + read_size); if (read_count < 0 && errno == EINTR) { //pread 是原子操作 puts("break by signal"); continue; } else if (read_count == 0) { break; } else if(read_count < 0) { perror("pread"); exit(1); } send(sockfd, buf, read_count, 0); size -= read_count; read_size += read_count; } close(sockfd); printf("cur = %lld, end\n",tdata.cur); free(s); pthread_exit(&sockfd); } int main(int argc, char *argv[]) { struct sockaddr_in servaddr; if (argc != 3) { fprintf(stderr, "please input: %s N filename\n", argv[0]); exit(1); } printf("sizeof(off_t) = %lu\n",sizeof(off_t)); //開啟巨集開關後 32系統該值為8位元組 const int n = atoi(argv[1]); const char* filename = argv[2]; struct stat statbuf; if (lstat(filename, &statbuf) < 0) { perror("lstat error"); exit(EXIT_FAILURE); } long long file_len = (long long)statbuf.st_size; printf("file len: %lld\n",file_len); file_block_size=file_len/n; printf("file_block_size = %d\n", file_block_size); int sockfd; sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket error"); exit(EXIT_SUCCESS); } bzero(&server_address, sizeof(server_address)); server_address.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &server_address.sin_addr.s_addr); server_address.sin_port = htons(SERV_PORT); if (connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { printf("connect error"); exit(EXIT_FAILURE); } int thread_number=htonl(n); int ret = send(sockfd, &thread_number, sizeof(thread_number), 0); if (ret < 0) { perror("send error"); exit(EXIT_FAILURE); } close(sockfd); 
struct timeval start, end; gettimeofday(&start, NULL); pthread_t* tid = (pthread_t*)malloc(n * sizeof(pthread_t)); //記得釋放 if ( (filefd = open(filename, O_RDONLY)) < 0) { perror("open error"); exit(EXIT_FAILURE); } //分塊 long long i; for ( i = 0; i < n; ++i) { thread_data* data = (thread_data*)malloc(sizeof(thread_data)); if (i == n - 1 ) { data->cur = (long long)i * file_block_size; printf("data->cur = %lld\n",data->cur); data->size = file_len-(long long)i * file_block_size; } else { data->cur = i * file_block_size; data->size = file_block_size; } pthread_create(&tid[i], NULL, sender, (void*)data); } for(i=0;i<n;i++) { void* ret; pthread_join(tid[i], &ret); printf("the thread %d connecttion socket finished sending\n", *(int*)ret); } close(filefd); free(tid); gettimeofday(&end, NULL); double timeuse = 1000000*(end.tv_sec - start.tv_sec) + end.tv_usec - start.tv_usec; timeuse /= 1000000; printf("run time = %f\n", timeuse); exit(EXIT_SUCCESS); } 服務端程式碼 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <pthread.h> #include <fcntl.h> #include <errno.h> #include <sys/stat.h> #define SERV_PORT 8888 #define BUFFER_SIZE 4096 #define MAX_LISTEN 2048 typedef struct { int socket; int i; }thread_data; int filefd; long long ato_ll(const char* p_str) { long long result = 0; long long mult = 1; unsigned int len = strlen(p_str); unsigned int i; for (i=0; i<len; ++i) { char the_char = p_str[len-(i+1)]; long long val; if (the_char < '0' || the_char > '9') { return 0; } val = the_char - '0'; val *= mult; result += val; mult *= 10; } return result; } void* receive(void* s) { thread_data tdata = *((thread_data*)s); int sock=tdata.socket; int thread_order=tdata.i; printf("thread_order = %d\n",thread_order); char buf[BUFFER_SIZE]; char head_buf[29] = {0}; int ret = recv(sock, head_buf, sizeof(head_buf) - 1, MSG_WAITALL); if (ret < 0) { 
fprintf(stderr, "thread %d recv error number %d: %s\n", thread_order, errno, strerror(errno)); exit(EXIT_FAILURE); } char* cur_ptr = head_buf; char* bk = strchr(head_buf,':'); if(bk!=NULL) { *bk = '\0'; } char* size_ptr = bk + 1; long long cur = ato_ll(cur_ptr); int size = atoi(size_ptr); printf("thread %d cur = %lld size = %d\n",thread_order,cur,size); while (size) { ret = read(sock, buf, BUFFER_SIZE); if (ret < 0 && errno ==EINTR) { puts("break by signal"); continue; } else if (ret == 0) { break; } else if(ret < 0) { perror("read"); exit(1); } if(pwrite(filefd, buf, ret, cur) < 0) { perror("pwrite"); exit(1); } cur += ret; size -= ret; } close(sock); fprintf(stderr, "thread %d finished receiving\n", thread_order); free(s); pthread_exit(&thread_order); } int main(void) { struct sockaddr_in server_address; memset(&server_address, 0, sizeof(server_address)); server_address.sin_family = AF_INET; server_address.sin_port = htons(SERV_PORT); server_address.sin_addr.s_addr = INADDR_ANY; int listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { perror("socket error"); exit(EXIT_FAILURE); } int reuse = 1; if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { perror("setsockopt SO_REUSEADDR error"); exit(EXIT_FAILURE); } if (bind(listenfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) { perror("bind error"); exit(EXIT_FAILURE); } if (listen(listenfd, MAX_LISTEN) < 0) { perror("listen error"); exit(EXIT_FAILURE); } else { printf("listen success\n"); } int connfd = accept(listenfd, NULL, NULL); if (connfd < 0) { perror("accept error"); exit(EXIT_FAILURE); } int thread_number = 0; int ret = recv(connfd, &thread_number, sizeof(thread_number), MSG_WAITALL); close(connfd); if (ret < 0) { perror("recv MSG_WAITALL error"); exit(EXIT_FAILURE); } thread_number = ntohl(thread_number); printf("thread_number = %d\n",thread_number); if ( (filefd = open("receive_file", O_WRONLY | O_CREAT |O_TRUNC, 0777)) < 0) { perror("open error"); 
exit(EXIT_FAILURE); } else { printf("open success\n"); } struct timeval start, end; gettimeofday(&start, NULL); pthread_t* tid = (pthread_t*)malloc(thread_number * sizeof(pthread_t)); if(tid==NULL) { perror("malloc::"); exit(1); } printf("thread_number = %d\n",thread_number); int i=0; for ( i = 0; i < thread_number; ++i) { connfd = accept(listenfd, NULL, NULL); if (connfd < 0) { perror("accept error"); exit(EXIT_FAILURE); } thread_data* data = (thread_data*)malloc(sizeof(thread_data)); data->socket=connfd; data->i=i; pthread_create(&tid[i], NULL, receive, (void*)data); } for(i=0;i<thread_number;i++) { char *ret; pthread_join(tid[i], (void**)&ret); printf("thread %d finished receiving\n", i); } close(connfd); close(listenfd); free(tid); gettimeofday(&end, NULL); double timeuse = 1000000*(end.tv_sec - start.tv_sec) + end.tv_usec - start.tv_usec; timeuse /= 1000000; printf("run time = %f\n", timeuse); exit(EXIT_SUCCESS); }

檔案829.3MB
      伺服器完成時間
N=1     14.945732
N=10    14.414628
N=100    14.995387
N=1000   32.775722 //伺服器建立執行緒太多,容易崩潰,需要改進