Linux下多執行緒檔案傳輸
阿新 • • 發佈:2019-01-28
要求:服務端客戶端分辨各佔一個程序,客戶端中可設定TCP連線數n,之後將檔案等分成n塊同時傳輸。
思路: 在網上查到了許多關於Linux下socket檔案傳輸的文章,受益許多,其中有個部落格寫的很好
連結:http://blog.csdn.net/zhqianpeng/article/details/46489959
於是可以在客戶端中根據n進行檔案分包,之後建立n個執行緒傳輸,服務端建立相應n個執行緒完成接收。
客戶端程式碼
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/stat.h>
#define SERV_PORT 8888
#define BUFFER_SIZE 4096
typedef struct
{
long long cur;
int size;
}thread_data;
int filefd;
int file_block_size = 0;
struct sockaddr_in server_address;
void *sender(void *s)
{
thread_data tdata = *((thread_data*)s);
long long cur = tdata.cur;
int size = tdata.size;
char buf[BUFFER_SIZE] = {0 };
int read_count;
char head_buf[29] = {0};
snprintf(head_buf,29,"%016lld:%011d",cur,size);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
perror("socket error");
exit(EXIT_SUCCESS);
}
if (connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0)
{
exit(EXIT_FAILURE);
}
send(sockfd, &head_buf, strlen(head_buf), 0);
long long read_size = 0;
while (size)
{
read_count = pread(filefd, buf,(sizeof(buf) < size)?sizeof(buf):size, cur + read_size);
if (read_count < 0 && errno == EINTR)
{ //pread 是原子操作
puts("break by signal");
continue;
}
else if (read_count == 0)
{
break;
}
else if(read_count < 0)
{
perror("pread"); exit(1);
}
send(sockfd, buf, read_count, 0);
size -= read_count;
read_size += read_count;
}
close(sockfd);
printf("cur = %lld, end\n",tdata.cur);
free(s);
pthread_exit(&sockfd);
}
int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
if (argc != 3)
{
fprintf(stderr, "please input: %s N filename\n", argv[0]);
exit(1);
}
printf("sizeof(off_t) = %lu\n",sizeof(off_t)); //開啟巨集開關後 32系統該值為8位元組
const int n = atoi(argv[1]);
const char* filename = argv[2];
struct stat statbuf;
if (lstat(filename, &statbuf) < 0)
{
perror("lstat error");
exit(EXIT_FAILURE);
}
long long file_len = (long long)statbuf.st_size;
printf("file len: %lld\n",file_len);
file_block_size=file_len/n;
printf("file_block_size = %d\n", file_block_size);
int sockfd;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
perror("socket error");
exit(EXIT_SUCCESS);
}
bzero(&server_address, sizeof(server_address));
server_address.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &server_address.sin_addr.s_addr);
server_address.sin_port = htons(SERV_PORT);
if (connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0)
{
printf("connect error");
exit(EXIT_FAILURE);
}
int thread_number=htonl(n);
int ret = send(sockfd, &thread_number, sizeof(thread_number), 0);
if (ret < 0)
{
perror("send error");
exit(EXIT_FAILURE);
}
close(sockfd);
struct timeval start, end;
gettimeofday(&start, NULL);
pthread_t* tid = (pthread_t*)malloc(n * sizeof(pthread_t));
//記得釋放
if ( (filefd = open(filename, O_RDONLY)) < 0)
{
perror("open error");
exit(EXIT_FAILURE);
} //分塊
long long i;
for ( i = 0; i < n; ++i)
{
thread_data* data = (thread_data*)malloc(sizeof(thread_data));
if (i == n - 1 )
{
data->cur = (long long)i * file_block_size;
printf("data->cur = %lld\n",data->cur);
data->size = file_len-(long long)i * file_block_size;
}
else
{
data->cur = i * file_block_size;
data->size = file_block_size;
}
pthread_create(&tid[i], NULL, sender, (void*)data);
}
for(i=0;i<n;i++)
{
void* ret;
pthread_join(tid[i], &ret);
printf("the thread %d connecttion socket finished sending\n", *(int*)ret);
}
close(filefd);
free(tid);
gettimeofday(&end, NULL);
double timeuse = 1000000*(end.tv_sec - start.tv_sec) + end.tv_usec - start.tv_usec;
timeuse /= 1000000;
printf("run time = %f\n", timeuse);
exit(EXIT_SUCCESS);
}
服務端程式碼
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/stat.h>
#define SERV_PORT 8888
#define BUFFER_SIZE 4096
#define MAX_LISTEN 2048
typedef struct
{
int socket;
int i;
}thread_data;
int filefd;
long long ato_ll(const char* p_str)
{
long long result = 0;
long long mult = 1;
unsigned int len = strlen(p_str);
unsigned int i;
for (i=0; i<len; ++i)
{
char the_char = p_str[len-(i+1)];
long long val;
if (the_char < '0' || the_char > '9')
{
return 0;
}
val = the_char - '0';
val *= mult;
result += val;
mult *= 10;
}
return result;
}
void* receive(void* s)
{
thread_data tdata = *((thread_data*)s);
int sock=tdata.socket;
int thread_order=tdata.i;
printf("thread_order = %d\n",thread_order);
char buf[BUFFER_SIZE];
char head_buf[29] = {0};
int ret = recv(sock, head_buf, sizeof(head_buf) - 1, MSG_WAITALL);
if (ret < 0)
{
fprintf(stderr, "thread %d recv error number %d: %s\n",
thread_order, errno, strerror(errno));
exit(EXIT_FAILURE);
}
char* cur_ptr = head_buf;
char* bk = strchr(head_buf,':');
if(bk!=NULL)
{
*bk = '\0';
}
char* size_ptr = bk + 1;
long long cur = ato_ll(cur_ptr);
int size = atoi(size_ptr);
printf("thread %d cur = %lld size = %d\n",thread_order,cur,size);
while (size)
{
ret = read(sock, buf, BUFFER_SIZE);
if (ret < 0 && errno ==EINTR)
{
puts("break by signal");
continue;
}
else if (ret == 0)
{
break;
}
else if(ret < 0)
{
perror("read");
exit(1);
}
if(pwrite(filefd, buf, ret, cur) < 0)
{
perror("pwrite");
exit(1);
}
cur += ret;
size -= ret;
}
close(sock);
fprintf(stderr, "thread %d finished receiving\n", thread_order);
free(s);
pthread_exit(&thread_order);
}
int main(void)
{
struct sockaddr_in server_address;
memset(&server_address, 0, sizeof(server_address));
server_address.sin_family = AF_INET;
server_address.sin_port = htons(SERV_PORT);
server_address.sin_addr.s_addr = INADDR_ANY;
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
{
perror("socket error");
exit(EXIT_FAILURE);
}
int reuse = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
{
perror("setsockopt SO_REUSEADDR error");
exit(EXIT_FAILURE);
}
if (bind(listenfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0)
{
perror("bind error");
exit(EXIT_FAILURE);
}
if (listen(listenfd, MAX_LISTEN) < 0)
{
perror("listen error");
exit(EXIT_FAILURE);
}
else
{
printf("listen success\n");
}
int connfd = accept(listenfd, NULL, NULL);
if (connfd < 0)
{
perror("accept error");
exit(EXIT_FAILURE);
}
int thread_number = 0;
int ret = recv(connfd, &thread_number, sizeof(thread_number), MSG_WAITALL);
close(connfd);
if (ret < 0)
{
perror("recv MSG_WAITALL error");
exit(EXIT_FAILURE);
}
thread_number = ntohl(thread_number);
printf("thread_number = %d\n",thread_number);
if ( (filefd = open("receive_file", O_WRONLY | O_CREAT |O_TRUNC, 0777)) < 0)
{
perror("open error");
exit(EXIT_FAILURE);
}
else
{
printf("open success\n");
}
struct timeval start, end;
gettimeofday(&start, NULL);
pthread_t* tid = (pthread_t*)malloc(thread_number * sizeof(pthread_t));
if(tid==NULL)
{
perror("malloc::");
exit(1);
}
printf("thread_number = %d\n",thread_number);
int i=0;
for ( i = 0; i < thread_number; ++i)
{
connfd = accept(listenfd, NULL, NULL);
if (connfd < 0)
{
perror("accept error");
exit(EXIT_FAILURE);
}
thread_data* data = (thread_data*)malloc(sizeof(thread_data));
data->socket=connfd;
data->i=i;
pthread_create(&tid[i], NULL, receive, (void*)data);
}
for(i=0;i<thread_number;i++)
{
char *ret;
pthread_join(tid[i], (void**)&ret);
printf("thread %d finished receiving\n", i);
}
close(connfd);
close(listenfd);
free(tid);
gettimeofday(&end, NULL);
double timeuse = 1000000*(end.tv_sec - start.tv_sec) + end.tv_usec - start.tv_usec;
timeuse /= 1000000;
printf("run time = %f\n", timeuse);
exit(EXIT_SUCCESS);
}
檔案829.3MB
伺服器完成時間
N=1 14.945732
N=10 14.414628
N=100 14.995387
N=1000 32.775722 //伺服器建立執行緒太多,容易崩潰,需要改進