2018-2019-1 實驗三 並發程序
實驗三 並發程序
任務一
學習使用Linux命令wc(1)
基於Linux Socket程序設計實現wc(1)服務器(端口號是你學號的後6位)和客戶端
客戶端傳一個文本文件給服務器
服務器返加文本文件中的單詞數
server:
include <netinet/in.h>
include <sys/types.h>
include <sys/socket.h>
include <stdio.h>
include <stdlib.h>
include <string.h>
include <unistd.h>
define HELLO_WORLD_SERVER_PORT 5213
define LENGTH_OF_LISTEN_QUEUE 20
define BUFFER_SIZE 1024
define FILE_NAME_MAX_SIZE 512
define FILE_WORDS_NUMBER 32
int wc_func(char *file_name);
int main(int argc, char **argv)
{
struct sockaddr_in server_addr;
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);
int server_socket = socket(PF_INET,SOCK_STREAM,0); if( server_socket < 0) { printf("Create Socket Failed!"); exit(1); } if( bind(server_socket,(struct sockaddr*)&server_addr,sizeof(server_addr))) { printf("Server Bind Port : %d Failed!", HELLO_WORLD_SERVER_PORT); exit(1); } if ( listen(server_socket, LENGTH_OF_LISTEN_QUEUE) ) { printf("Server Listen Failed!"); exit(1); } while (1) { struct sockaddr_in client_addr; socklen_t length = sizeof(client_addr); int new_server_socket = accept(server_socket,(struct sockaddr*)&client_addr,&length); if ( new_server_socket < 0) { printf("Server Accept Failed!\n"); break; } char file_name[FILE_NAME_MAX_SIZE+1]; bzero(file_name, FILE_NAME_MAX_SIZE+1); char buffer[BUFFER_SIZE]; bzero(buffer,BUFFER_SIZE); recv(new_server_socket,file_name,BUFFER_SIZE,0); FILE * fp = fopen(file_name,"w"); if(NULL == fp ) { printf("File:\t%s Can Not Open To Write\n", file_name); exit(1); } bzero(buffer,BUFFER_SIZE); int len = 0; while( len = recv(new_server_socket,buffer,BUFFER_SIZE,0)) { if(len < 0) { printf("Recieve Data From Client %s Failed!\n", argv[1]); break; } int write_length = fwrite(buffer,sizeof(char),len,fp); if (write_length<len) { printf("File:\t%s Write Failed\n", file_name); break; } bzero(buffer,BUFFER_SIZE); } printf("File:\t%s Transfer Finished!\n",file_name); fclose(fp); close(new_server_socket); } close(server_socket); return 0;
}
client:
include <netinet/in.h>
include <sys/types.h>
include <sys/socket.h>
include <stdio.h>
include <stdlib.h>
include <string.h>
include <unistd.h>
define HELLO_WORLD_SERVER_PORT 5213
define BUFFER_SIZE 1024
define FILE_NAME_MAX_SIZE 512
define FILE_WORDS_NUMBER 32
int main(int argc, char **argv)
{
if (argc != 2)
{
printf("Usage: ./%s ServerIPAddress\n",argv[0]);
exit(1);
}
struct sockaddr_in client_addr;
bzero(&client_addr,sizeof(client_addr));
client_addr.sin_family = AF_INET;
client_addr.sin_addr.s_addr = htons(INADDR_ANY);//INADDR_ANY表示自動獲取本機地址
client_addr.sin_port = htons(0);
int client_socket = socket(AF_INET,SOCK_STREAM,0);
if( client_socket < 0)
{
printf("Create Socket Failed!\n");
exit(1);
}
if( bind(client_socket,(struct sockaddr*)&client_addr,sizeof(client_addr)))
{
printf("Client Bind Port Failed!\n");
exit(1);
}
struct sockaddr_in server_addr;
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family = AF_INET;
if(inet_aton(argv[1],&server_addr.sin_addr) == 0)
{
printf("Server IP Address Error!\n");
exit(1);
}
server_addr.sin_port = htons(HELLO_WORLD_SERVER_PORT);
socklen_t server_addr_length = sizeof(server_addr);
if(connect(client_socket,(struct sockaddr*)&server_addr, server_addr_length) < 0)
{
printf("Can Not Connect To %s!\n",argv[1]);
exit(1);
}
char file_name[FILE_NAME_MAX_SIZE+1];
bzero(file_name, FILE_NAME_MAX_SIZE+1);
printf("Please Input File Name On Server:\t");
scanf("%s", file_name);
char buffer[BUFFER_SIZE];
bzero(buffer,BUFFER_SIZE);
strncpy(buffer, file_name, strlen(file_name)>BUFFER_SIZE?BUFFER_SIZE:strlen(file_name));
send(client_socket,buffer,BUFFER_SIZE,0);
FILE * fp = fopen(file_name,"r");
if(NULL == fp )
{
printf("File:\t%s Not Found\n", file_name);
exit(1);
}
else
{
bzero(buffer, BUFFER_SIZE);
int file_block_length = 0;
while( (file_block_length = fread(buffer,sizeof(char),BUFFER_SIZE, fp))>0)
{
if(send(client_socket,buffer,file_block_length,0)<0)
{
printf("Send File:\t%s Failed\n", file_name);
break;
}
bzero(buffer, BUFFER_SIZE);
}
}
printf("Send File:\t %s To Server[%s] Finished\n",file_name, argv[1]);
printf("The File has %d words.\n", wc_func(file_name));
fclose(fp);
close(client_socket);
return 0;
}
int wc_func(char file_name)
{
int t;
int w = 0;
int state = 0;
FILE in;
if((in = fopen(file_name,"r"))==NULL)
{
printf("wc %s:no this file or dir\n",file_name);
return;
}
while((t=fgetc(in))!=EOF)
{
if(t==‘\n‘||t==‘ ‘||t==‘\r‘) {
state = 0;
continue;
} else {
if(state == 0) {
state = 1;
w++;
}
continue;
}
任務二
使用多線程實現wc服務器並使用同步互斥機制保證計數正確
上方提交代碼
下方提交測試
對比單線程版本的性能,並分析原因
原因:所有數據結構的生存期,以及對這些數據結構的access,都用這一根邏輯線程。
不需要考慮數據結構的race。
把任何耗時的操作都給其他線程(IO線程、定時器線程,DB線程等)做,做完之後向事件隊列(多線程安全的隊列,其他線程是生產者,邏輯線程是消費者)丟事件。
多線程邏輯設計的思路:
所有數據結構的生存期,以及對這些數據結構的access,不一定在一根線程。
需要考慮數據結構的race。
網絡事件、定時器事件喚醒工作線程(一般通過iocp或者epoll來喚醒)執行所有工作,一般不需要交換到其他線程。
很顯然,單線程邏輯多了一層事件隊列交換,會增加延遲,以及所有的邏輯都在一根線程上跑,邏輯被阻塞也會帶來延遲。
其實吞吐量對於rpc來說,是個宏觀的概念,盡可能快地消費網絡消息就會提升吞吐量。
對於高並發的程序,是無法忍受單線程邏輯
server:
include <stdio.h>
include <fcntl.h>
include <pthread.h>
include <sys/stat.h>
include <sys/types.h>
include <sys/socket.h>
include <arpa/inet.h>
define PORT 8887
define BUFF_SIZE 1024
define LISTEN_SIZE 20
typedef struct{
char type;
char data[BUFF_SIZE];
}m_package;
void* process_client();
int main(){
int ss = create_tcp_server(PORT);
if(-1 == ss)
exit(-1);
while(1){
//接受客戶端連接
socklen_t addrlen = sizeof(struct sockaddr);
struct sockaddr_in client_addr; //客戶端地址結構
int client_sock = accept(ss, (struct sockaddr*)&client_addr, &addrlen);
if(client_sock < 0){
printf("accept error\n");
}
printf("accept success\n");
pthread_t pid;
if(pthread_create(&pid, NULL, process_client, &client_sock) < 0){
printf("pthread_create error\n");
}
}
}
//處理客戶端程序
void process_client(void arg){
int size = 0, fd, count = 0, sockid = (int)arg;
m_package pac;
long total = 0, cur = 0;
//循環接收文件
while(1) {
memset(&pac, 0, sizeof(pac));
size = read(sockid, &pac, sizeof(pac));
if(size > 0){
if (pac.type == 1){
fd = open(pac.data, O_CREAT|O_WRONLY, 0777);
if(-1 == fd){
printf("open file error!\n");
continue;
}
count = total = cur = 0;
}
else if (pac.type == 2){
cur += write(fd, pac.data, strlen(pac.data));
if(count++ % 5000 == 0){
printf("recv from client < %d > : %.01lf%\n", sockid, cur * 100.0 / total);
count = 0;
}
}
else if (pac.type == 3){
printf("recv from client < %d > : 100.0%\n", sockid);
printf("recv success\n");
close(fd);
}
else if(pac.type == 4){//文件長度
total = strtol(pac.data, NULL, 10);
printf("%ld\n", total);
}
}else{
printf("client disconnected\n");
close(sockid);
break;
}
}
return 0;
}
int start_server(int port, int type){
//建立服務器套接字
int ss = socket(AF_INET, type, 0);
if(ss < 0){
printf("create socket error\n");
return -1;
}
//設置服務器地址
struct sockaddr_in server_addr; //服務器地址結構
bzero(&server_addr, sizeof(struct sockaddr_in)); //清零
server_addr.sin_family = AF_INET; //協議族
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); //ip地址
server_addr.sin_port = htons(port); //端口
//綁定地址結構到套接字描述符
if(bind(ss, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0){
printf("bind error\n");
return -1;
}
//TCP
if(SOCK_STREAM == type){
//設置偵聽
if(listen(ss, LISTEN_SIZE) < 0){
printf("listen error\n");
return -1;
}
printf("tcp server start\n");
}
else
printf("udp server start\n");
return ss;
}
int create_tcp_server(int port){
start_server(port, SOCK_STREAM);
}
int create_udp_server(int port){
start_server(port, SOCK_DGRAM);
}
client:#include <stdio.h>
include <fcntl.h>
include <pthread.h>
include <sys/stat.h>
include <sys/types.h>
include <sys/socket.h>
include <arpa/inet.h>
define PORT 8887
define BUFF_SIZE 1024
define LISTEN_SIZE 20
typedef struct{
char type;
char data[BUFF_SIZE];
}m_package;
int main(){
//創建連接
int sock_fd = connect_tcp("127.0.0.1", PORT);
if(-1 == sock_fd)
return -1;
m_package pac;
int fd, cur = 0, count = 0;
long filesize = 0;
while(1){
//打開文件
memset(&pac, 0, sizeof(pac));
pac.type = 1;
// strcpy(pac.data, "/home/SKZH/a.txt");
scanf("%s", pac.data);
//獲取文件信息
struct stat sfile;
stat(pac.data, &sfile );
filesize = sfile.st_size;
time_t t;
long begin = time(&t);
cur = count = 0;
fd = open(pac.data, O_RDONLY);
if(-1 == fd){
printf("file open error\n");
continue;
}
//讀取文件並發送
//發送文件名
strcpy(pac.data, strrchr(pac.data, ‘/‘) + 1);
write(sock_fd, &pac, sizeof(pac));
memset(&pac, 0, sizeof(pac));
//發送文件長度
pac.type = 4;
sprintf(pac.data,"%ld",filesize);
write(sock_fd, &pac, sizeof(pac));
memset(&pac, 0, sizeof(pac));
int read_len = 0;
while((read_len = read(fd, pac.data, BUFF_SIZE)) > 0){
pac.type = 2;
write(sock_fd, &pac, sizeof(pac));
memset(&pac, 0, sizeof(pac));
cur += read_len;
if(count++ % 5000 == 0){
count = 0;
printf("send to server : %.1lf\%\n", cur * 100.0 / filesize);
}
}
//發送結束標記
memset(&pac, 3, sizeof(pac));
write(sock_fd, &pac, BUFF_SIZE + 1);
close(fd);
printf("send to server : 100.0\%\n");
printf("file size : %d B\n", filesize);
printf("time : %ld ms\n", time(&t) - begin);
printf("send file success\n");
printf("------------------------\n");
}
close(sock_fd);
}
int connectsock(char* server_ip, int server_port, int type){
int sock_fd = socket(AF_INET, type, 0);
if(-1 == sock_fd){
printf("create socket error\n");
return -1;
}
struct sockaddr_in server_addr;
//設置服務器地址
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(server_port);
inet_pton(AF_INET, server_ip, &server_addr.sin_addr);
//連接服務器
if(-1 == connect(sock_fd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr_in))){
printf("connect server error\n");
return -1;
}
printf("connect server success\n");
return sock_fd;
}
int connect_tcp(char* server_ip, int server_port){
return connectsock(server_ip, server_port, SOCK_STREAM);
}
int connect_udp(char* server_ip, int server_port){
return connectsock(server_ip, server_port, SOCK_DGRAM);
}
2018-2019-1 實驗三 並發程序