1. 程式人生 > >嵌入式開發第31、32天(專案2:用執行緒池實現大批量複製檔案)

嵌入式開發第31、32天(專案2:用執行緒池實現大批量複製檔案)

專案目標

1)在控制檯呼叫程式輸入 原始檔 和目標檔案,實現檔案的完美複製。 2)即把檔案的屬性也複製了。 3)把批量的複製工作交給執行緒池去執行,體現多執行緒的效率。 4)在學會使用執行緒池的同時,瞭解執行緒池的運作機制,並且能夠為我所用。

專案框架

1.      建立若干執行緒,置入執行緒池

2.      任務達到時,從執行緒池取空閒執行緒

3.      取得了空閒執行緒,立即進行任務處理

4.      否則新建一個執行緒,並置入執行緒池,執行3

5.      如果建立失敗或者執行緒池已滿,根據設計策略選擇返回錯誤或將任務置入處理佇列,等待處理

6.      銷燬執行緒池

專案感想

1:做專案真的能夠很好的鞏固自己所學的東西 2:剛開始學習的執行緒池的時候真的學習的非常辛苦 3:裡面的各種互斥,多執行緒並行工作的邏輯方式,真是抽象的不得了。 4:在做一兩條執行緒的時候,你能很好的理清他們的執行方向,當你做到執行緒池的時候,成倍的難度增。可當你完成的時候你又能無比的瞭解他的執行方式。 5:雖然專案是實現出來了,複製的速度也確實比原來快個幾秒,但這個專案並不能很好的體現出執行緒池的威力,這是我的個人感覺 6:其實選擇用執行緒池實現複製功能,是因為 (電腦裡面的檔案) 能夠給我帶來大量的任務~而不是我自己去一個個的新增這些任務。 7:做這個專案,執行緒的另外一個很強大的功能沒有體現,就是他的框架,執行緒池是可以新增各種各樣的任務,而我為了嫌麻煩並沒有搞,所以挖個坑,看以後有沒有機會加上去。

專案程式碼

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>

#define MAX_WAITING_TASKS	1000  //等待任務最大數目
#define MAX_ACTIVE_THREADS	200	  //最大執行緒數

struct task						//任務連結串列
{
	void *(*task)(void *arg);	//void *  能夠放置任何格式,只要在呼叫時強制轉換
	void *arg;				

	struct task *next;
};

typedef struct thread_pool		//執行緒池 其實就是一個執行緒的結構體
{
	pthread_mutex_t lock;		//互斥鎖
	pthread_cond_t  cond;		//條件變數  跟互斥鎖是搭配使用
	struct task *task_list;		//一個任務節點

	pthread_t *tids;			//執行緒屬性變數

	unsigned waiting_tasks;		//等待任務
	unsigned active_threads;	//執行執行緒
	
	bool shutdown;				//一個執行緒池銷燬開關
}thread_pool;

 struct file					//一個結構體 存放檔案路徑 和  複製後路徑的
{
	char srcfile[4096];
	char dstfile[4096];
};

bool add_task(thread_pool *pool,void *(*task)(void *arg), void *arg);

void *copyregfile(void * arg)	//複製檔案函式 引數是檔案結構體
{
	

	struct file *dofile = (struct file *)arg; //強制轉換 賦值給檔案結構體

	//printf("srcfile=%s\n",dofile->srcfile);	//檢視 資訊是否正確
	//printf("dstfile =%s\n",dofile->dstfile );

	struct stat file_stat;				//這個結構體來自#include <sys/stat.h>
										//裡面存放著一個檔案的屬性
	/*
	
	struct stat 
	{
	    dev_t         st_dev;       //檔案的裝置編號
	    ino_t         st_ino;       //節點
	    mode_t        st_mode;      //檔案的型別和存取的許可權
	    nlink_t       st_nlink;     //連到該檔案的硬連線數目,剛建立的檔案值為1
	    uid_t         st_uid;       //使用者ID
	    gid_t         st_gid;       //組ID
	    dev_t         st_rdev;      //(裝置型別)若此檔案為裝置檔案,則為其裝置編號
	    off_t         st_size;      //檔案位元組數(檔案大小)
	    unsigned long st_blksize;   //塊大小(檔案系統的I/O 緩衝區大小)
	    unsigned long st_blocks;    //塊數
	    time_t        st_atime;     //最後一次訪問時間
	    time_t        st_mtime;     //最後一次修改時間
	    time_t        st_ctime;     //最後一次改變時間(指屬性)
	};
	
	*/
										
	stat(dofile->srcfile, &file_stat);//通過檔名 獲取檔案的屬性把他存放到結構體裡面

	int srcfd,dstfd;					
	srcfd = open(dofile->srcfile,O_RDONLY);//用只讀的方式開啟原始檔
	if(srcfd == -1 )
	{
		printf("open file %s\n failed.\n",dofile->srcfile);
		return NULL;
	}
	
	dstfd = open(dofile->dstfile,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);//以原始檔的型別和許可權建立檔案
																			//st_mode存放的東西看下面*1*
	if( dstfd == -1)
	{
		printf("open file %s failed.\n",dofile->dstfile);
		return NULL;
	}

	int nread;
	char buf[100];
	while((nread = read(srcfd,buf,100)) > 0)  //讀取原始檔的內容
	{
		if( write(dstfd,buf,nread) == -1)	  //把讀到的全部寫進目標檔案
		{
			break;
		}
	}

	close(srcfd);
	close(dstfd);
	return NULL;
}



//拷貝目錄,成功返回0.失敗返回-1
int copydir( struct file *dofile,thread_pool *pool)
{
	struct stat file_stat;				
	stat(dofile->srcfile,&file_stat); //獲取檔案的屬性
	mkdir(dofile->dstfile,file_stat.st_mode); //以源目錄的型別和目錄來建立一個目錄

	DIR *srcdir = opendir(dofile->srcfile); //開啟源目錄
	struct dirent *dp;
	
	
	
	while( (dp = readdir(srcdir))!=NULL )    //獲取資料夾內檔案的資訊
	{
		
	
		if(dp->d_name[0] == '.') //如果檔案為. 或者 .. 則跳過
		{						 
			continue;
		}
		//對本目錄下的所有檔案進行拷貝操作
		struct file *tmpfile = malloc(sizeof(struct file)); //為檔案結構體開闢記憶體空間
		
		memset(tmpfile,0,sizeof(struct file));				//對記憶體清零 
		
		sprintf(tmpfile->srcfile,"%s/%s",dofile->srcfile,dp->d_name);//拼湊原始檔路徑
		sprintf(tmpfile->dstfile,"%s/%s",dofile->dstfile,dp->d_name);//拼湊目標檔案路徑

		struct stat tmpstat;
		stat(tmpfile->srcfile,&tmpstat);	     			

		if(S_ISREG(tmpstat.st_mode))						//如果為普通檔案,則拷貝
		{
			printf("srcfile = %s\n",tmpfile->srcfile);
			printf("tmpfile->dstfile = %s\n", tmpfile->dstfile);
			
			add_task( pool, copyregfile, tmpfile);			//把複製的任務丟到任務連結串列
			
			
			
		}
		else if(S_ISDIR(tmpstat.st_mode))//如果為目錄,則遞迴
		{
			copydir(tmpfile,pool);
		}

	}
	return 0;
}


void handler(void *arg)								//防止死鎖 上一個部落格有細講
{ 
	pthread_mutex_unlock((pthread_mutex_t *)arg);
}

void *routine(void *arg)							//執行緒的呼叫的任務
{
	thread_pool *pool = (thread_pool *)arg;			//強制格式轉換 複製給 執行緒池 
													//其實執行緒池說白了  就是名人的影分身~ 
	struct task *p;									//任務節點~

	while(1)
	{

		pthread_cleanup_push(handler, (void *)&pool->lock);//預防死鎖的機制 上個部落格有講
		pthread_mutex_lock(&pool->lock);					//互斥鎖~ 上鎖 


		while(pool->waiting_tasks == 0 && !pool->shutdown)	//判斷有沒任務,沒有就睡眠~
		{
			pthread_cond_wait(&pool->cond, &pool->lock); //等待一個喚醒的訊號
		}


		if(pool->waiting_tasks == 0 && pool->shutdown == true)//判斷是不是要關閉執行緒
		{
			pthread_mutex_unlock(&pool->lock);				//解鎖
			pthread_exit(NULL);								//退出執行緒
		}


		p = pool->task_list->next;			//從執行緒池中的任務連結串列裡拿一個任務賦值給P
		
		pool->task_list->next = p->next;	//然後讓原來的任務節點脫離連結串列 
		
		pool->waiting_tasks--;				//簡單的來說就是 從連結串列裡提取了一個任務。拿了以後,連結串列也就沒了這個任務											


		pthread_mutex_unlock(&pool->lock);				
		pthread_cleanup_pop(0);


		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //強制性的阻塞 任何的取消
		
	
		(p->task)(p->arg);			//這就是一個函式
					
									//他執行的是add_task( pool, copyregfile, tmpfile);	
									//傳過來的引數 
									//p->task 等於copyregfile
									//p->arg 等於  tmpfile
									//想一下為啥p突然有值了~ 這個邏輯很關鍵 
		
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//關閉強制阻塞
		
		free(p->arg);							//釋放在檢索目錄時 在記憶體開闢的空間
		free(p);								//釋放掉完成任務的節點
	}

	pthread_exit(NULL);
}

bool init_pool(thread_pool *pool, unsigned int threads_number)//初始化執行緒池
{
	pthread_mutex_init(&pool->lock, NULL);					
	pthread_cond_init(&pool->cond, NULL);

	pool->shutdown = false;
	pool->task_list = malloc(sizeof(struct task));
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

	if(pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;

	pool->waiting_tasks = 0;
	pool->active_threads = threads_number;

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		if(pthread_create(&((pool->tids)[i]), NULL,
					routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}
	}

	return true;
}

bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)//新增任務
{
	struct task *new_task = malloc(sizeof(struct task));
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}
	
	
	new_task->task = task;
	new_task->arg = arg;
	new_task->next = NULL;
	
	
	pthread_mutex_lock(&pool->lock);
	
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);

		fprintf(stderr, "too many tasks.\n");
		free(new_task);

		return false;
	}
	
	struct task *tmp = pool->task_list;
	while(tmp->next != NULL)
		tmp = tmp->next;

	tmp->next = new_task;
	pool->waiting_tasks++;


	pthread_mutex_unlock(&pool->lock);
	pthread_cond_signal(&pool->cond);	//喚醒一個休眠的執行緒

	return true;
}


int add_thread(thread_pool *pool, unsigned additional_threads)		//新增執行緒
{
	if(additional_threads == 0)
		return 0;

	unsigned total_threads =
		     pool->active_threads + additional_threads;

	int i, actual_increment = 0;
	for(i = pool->active_threads;
	    i < total_threads && i < MAX_ACTIVE_THREADS;
	    i++)
	{
		if(pthread_create(&((pool->tids)[i]),
				NULL, routine, (void *)pool) != 0)
		{
			perror("add threads error");

			if(actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++; 
	}

	pool->active_threads += actual_increment;
	return actual_increment;
}

int remove_thread(thread_pool *pool, unsigned int removing_threads) //刪除執行緒
{
	if(removing_threads == 0)
		return pool->active_threads;

	int remain_threads = pool->active_threads - removing_threads;
	remain_threads = remain_threads>0 ? remain_threads:1;

	int i;
	for(i=pool->active_threads-1; i>remain_threads-1; i--)
	{
		errno = pthread_cancel(pool->tids[i]);
		if(errno != 0)
			break;
	}

	if(i == pool->active_threads-1)
		return -1;
	else
	{
		pool->active_threads = i+1;
		return i+1;
	}
}

bool destroy_pool(thread_pool *pool)							//摧毀執行緒池
{

	pool->shutdown = true;
	pthread_cond_broadcast(&pool->cond);

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
		
	}

	free(pool->task_list);
	free(pool->tids);
	free(pool);

	return true;
}


void *count_time(void *arg)							//寫的一個計算時間的執行緒。。
{													//後來發現 在執行時加入 time 這個命令,系統會自動算時間
	int i = 0;
	while(1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}


int main(int argc,char *argv[])
{
	
	if(argc != 3)
	{
		printf("Please run : ./%s xxx yyy\n",argv[0]);
		return -1;
	}
	
	//計算執行時間
//	pthread_t a;
	//pthread_create(&a, NULL, count_time, NULL);
	
	//初始化池
	
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool,100);
	
	struct file dofile;
	strcpy(dofile.srcfile,argv[1]);
	strcpy(dofile.dstfile,argv[2]);

	struct stat srcstat;
	stat(dofile.srcfile,&srcstat);
	

	if(S_ISREG(srcstat.st_mode))//如果為普通檔案,則拷貝
	{
		copyregfile(&dofile);
		
	}
	else if(S_ISDIR(srcstat.st_mode))//如果為目錄,則遞迴
	{
		copydir(&dofile,pool);
	}
	


	destroy_pool(pool);
	return 0;
	
}


/*     --------*1*---------
先前所描述的st_mode 則定義了下列數種情況:
    S_IFMT   0170000    檔案型別的位遮罩
    S_IFSOCK 0140000    scoket
    S_IFLNK 0120000     符號連線
    S_IFREG 0100000     一般檔案
    S_IFBLK 0060000     區塊裝置
    S_IFDIR 0040000     目錄
    S_IFCHR 0020000     字元裝置
    S_IFIFO 0010000     先進先出

    S_ISUID 04000     檔案的(set user-id on execution)位
    S_ISGID 02000     檔案的(set group-id on execution)位
    S_ISVTX 01000     檔案的sticky位

    S_IRUSR(S_IREAD) 00400     檔案所有者具可讀取許可權
    S_IWUSR(S_IWRITE)00200     檔案所有者具可寫入許可權
    S_IXUSR(S_IEXEC) 00100     檔案所有者具可執行許可權

    S_IRGRP 00040             使用者組具可讀取許可權
    S_IWGRP 00020             使用者組具可寫入許可權
    S_IXGRP 00010             使用者組具可執行許可權

    S_IROTH 00004             其他使用者具可讀取許可權
    S_IWOTH 00002             其他使用者具可寫入許可權
    S_IXOTH 00001             其他使用者具可執行許可權
*/


相關推薦

嵌入式開發3132專案2:執行實現批量複製檔案

專案目標 1)在控制檯呼叫程式輸入 原始檔 和目標檔案,實現檔案的完美複製。 2)即把檔案的屬性也複製了。 3)把批量的複製工作交給執行緒池去執行,體現多執行緒的效率。 4)在學會使用執行緒池的同時,瞭解執行緒池的運作機制,並且能夠為我所用。 專案框架

執行實現原理Executor框架,java提供常用的幾種執行死鎖產生條件和避免

 為什麼使用執行緒池 伺服器應用程式中經常出現的情況是:單個任務處理的時間很短而請求的數目卻是巨大的。如果每個請求對應一個執行緒(thread-per-request)方法的不足之一是:為每個請求建立一個新執行緒的開銷很大;為每個請求建立新執行緒的伺服器在建立和銷燬執行緒上

21Java併發類庫提供的執行有哪幾種? 分別有什麼特點?高併發程式設計----7

目錄 今天我要問你的問題是,Java 併發類庫提供的執行緒池有哪幾種? 分別有什麼特點? 典型回答 考點分析 知識擴充套件 下面我就從原始碼角度,分析執行緒池的設計與實現,我將主要圍繞最基礎的 ThreadPoolExecutor 原始碼。 進一步分析,執行緒池既然

跟著阿里p7一起學java高併發 - 18:玩轉java執行,這一篇就夠了

java中的執行緒池,這一篇就夠了 java高併發系列第18篇文章。 本文主要內容 什麼是執行緒池 執行緒池實現原理 執行緒池中常見的各種佇列 自定義執行緒建立的工廠 常見的飽和策略 自定義飽和策略 執行緒池中兩種關閉方法有何不同 擴充套件執行緒池 合理地配置執行緒池 執行緒池中執行緒數量的配置 什麼是執

【程式設計筆記】執行實現原始碼從POCO中剝離出來

原始碼下載:https://download.csdn.net/download/fzuim/10625204 CThreadPool類 /***************************************************************

2.3四種執行連線的配置和使用和自定義執行

四種執行緒連線池的配置和使用 最終呼叫類和方法 {引數有 核心執行緒數目,最大執行緒數目,存活時間(當前執行緒執行完這個任務之後,等待下一個任務到來的最長等待時間。如果在這個時間內沒有新的任務來到,那當前執行緒就會退出),時間單位,等待佇列(用於存放待執行的任務)} public

Java併發二十一執行實現原理 Java併發十八:阻塞佇列BlockingQueue Java併發十八:阻塞佇列BlockingQueue Java併發程式設計:執行的使用

一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) Executor:位於最頂層,只有一個 execute(Runnab

Java併發二十一執行實現原理

Java併發(二十一):執行緒池實現原理 一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) E

Java定時任務的幾種方法Thread 和 Timer,執行

/**   * 普通thread   * 這是最常見的,建立一個thread,然後讓它在while迴圈裡一直執行著,   * 通過sleep方法來達到定時任務的效果。這樣可以快速簡單的實現,程式碼如

Java多執行程式設計-7-使用執行實現執行的複和一些坑的避免

原文出自 : https://blog.csdn.net/xlgen157387/article/details/78253096 執行緒複用:執行緒池 首先舉個例子: 假設這裡有一個系統,大概每秒需要處理5萬條資料,這5萬條資料為一個批次,而這沒秒傳送的5萬條資料

javaSE高階開發執行——1 程序與執行 and 2執行實現

一、程序與執行緒 1.程序的概念 執行緒隸屬於某個程序,程序是一個程式的執行週期,但是我們的執行緒是執行程序中的某個任務 所以如果程序不存在的話,那麼執行緒自然也就不會存在了。 我們應該時刻將執行緒和任務對等起來,執行一個程式啟動一個程序。這樣就可以提升沃恩程式的執行

互斥鎖解決多個執行幾乎同時修改某個共享資料

def test1(): global g_num mutex.acquire() g_num += 100 mutex.release() print(g_num) def test2(): global g_num # 上鎖 如果之前沒有被上鎖 ,那麼此時上所成功 # 如果之間已經被

併發程式設計十一—— Java 執行 實現原理與原始碼深度解析

史上最清晰的執行緒池原始碼分析 鼎鼎大名的執行緒池。不需要多說!!!!! 這篇部落格深入分析 Java 中執行緒池的實現。 總覽 下圖是 java 執行緒池幾個相關類的繼承結構:    先簡單說說這個繼承結構,Executor 位於最頂層,也是最簡單的,就一個 execute(

併發程式設計十二—— Java 執行 實現原理與原始碼深度解析 之submit方法

在上一篇《併發程式設計(十一)—— Java 執行緒池 實現原理與原始碼深度解析(一)》中提到了執行緒池ThreadPoolExecutor的原理以及它的execute方法。這篇文章是接著上一篇文章寫的,如果你沒有閱讀上一篇文章,建議你去讀讀。本文解析ThreadPoolExecutor#submit。  

c++多執行模式下的socket程式設計執行實現

     socket 程式設計可以說是一個基本的技術掌握,而多個客戶端向服務端傳送請求又是一個非常常見的場景,因此多執行緒模式下的socket程式設計則顯得尤為常見與重要。     本文主要利用執行緒池的技術,來實現多執行緒的模式,執行緒池的優點就不多述了,相信大家都能理

java中synchronized修飾程式碼塊兩種建立執行的方式講解賣票程式

格式: synchronized(類物件名 aa) { //同步程式碼塊 } 功能: synchronized(類物件名 aa)的含義是:判斷aa是否已經被其他執行緒所霸佔,如果發現已經被其他執行緒霸

嵌入式開發42ARM的體系結構

一、ARM的工作狀態 CPU執行的是彙編編譯後的機器碼。ARM處理器支援兩套匯編指令,一套是ARM彙編指令,另外一套THUMB彙編指令。 ARM彙編 ----> 32bits(預設) THUMB

嵌入式開發47看門狗定時器的原理

看門狗定時器有兩個作用: 1、看門狗復位 可以設定看門狗定時器的計數值,計數值在看門狗的工作頻率下不斷的減1,當計數值減到0,看門狗就會產生一個復位訊號,造成了整個系統的復位。 例如: 看門狗的工作頻

嵌入式開發9結構體,列舉,共同體,typedef

複合資料型別自定義型別,由基本的型別構成 結構體 結構體型別定義struct 名字{基本的型別; }; 結構體的大小是怎麼算出來的? struct student_info{char name[19

python就業班32----flask程式碼複資料庫

程式碼複用 flask中程式碼複用: 1.巨集(macro) 定義巨集: {% macro 巨集名(形參=預設值1, ...) %} html程式碼 {% endmacro %} 使用當前檔案巨集:{{ 巨集名(形參=值1,...) }} 2.繼承(