1. 程式人生 > 其它 >PHP 高階程式設計之多執行緒

PHP 高階程式設計之多執行緒

PHP 高階程式設計之多執行緒

http://netkiller.github.io/journal/php.thread.html


目錄

  • 1. 多執行緒環境安裝
    • 1.1. PHP 5.5.9
    • 1.2. 安裝 pthreads 擴充套件
  • 2. Thread
  • 3. Worker 與 Stackable
  • 4. 互斥鎖
    • 4.1. 多執行緒與共享記憶體
  • 5. 執行緒同步
  • 6. 執行緒池
    • 6.1. 執行緒池
    • 6.2. 動態佇列執行緒池
    • 6.3. pthreads Pool類
  • 7. 多執行緒檔案安全讀寫(檔案鎖)
  • 8. 多執行緒與資料連線
    • 8.1. Worker 與 PDO
    • 8.2. Pool 與 PDO
    • 8.3. 多執行緒中操作資料庫總結
  • 9. Thread And ZeroMQ
    • 9.1. 資料庫端
    • 9.2. 資料處理端

1. 多執行緒環境安裝

1.1. PHP 5.5.9

安裝PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

./configure --prefix=/srv/php-5.5.9 
--with-config-file-path=/srv/php-5.5.9/etc 
--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d 
--enable-fpm 
--with-fpm-user=www 
--with-fpm-group=www 
--with-pear 
--with-curl 
--with-gd 
--with-jpeg-dir 
--with-png-dir 
--with-freetype-dir 
--with-zlib-dir 
--with-iconv 
--with-mcrypt 
--with-mhash 
--with-pdo-mysql 
--with-mysql-sock=/var/lib/mysql/mysql.sock 
--with-openssl 
--with-xsl 
--with-recode 
--enable-sockets 
--enable-soap 
--enable-mbstring 
--enable-gd-native-ttf 
--enable-zip 
--enable-xml 
--enable-bcmath 
--enable-calendar 
--enable-shmop 
--enable-dba 
--enable-wddx 
--enable-sysvsem 
--enable-sysvshm 
--enable-sysvmsg 
--enable-opcache 
--enable-pcntl 
--enable-maintainer-zts 
--disable-debug			

編譯必須啟用zts支援否則無法安裝 pthreads(--enable-maintainer-zts)

1.2. 安裝 pthreads 擴充套件

安裝https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash			

檢視pthreads是否已經安裝

# php -m | grep pthreads			

2. Thread

		<?php
class HelloWorld extends Thread {
    public function __construct($world) {
       $this->world = $world;
    }

    public function run() {
        print_r(sprintf("Hello %sn", $this->world));
    }
}

$thread = new HelloWorld("World");

if ($thread->start()) {
    printf("Thread #%lu says: %sn", $thread->getThreadId(), $thread->join());
}
?>		

3. Worker 與 Stackable

		<?php
class SQLQuery extends Stackable {

        public function __construct($sql) {
                $this->sql = $sql;
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $row = $dbh->query($this->sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new SQLQuery('select * from members order by id desc limit 5');
$worker->stack($work);

$table1 = new SQLQuery('select * from demousers limit 2');
$worker->stack($table1);

$worker->start();
$worker->shutdown();
?>		

4. 互斥鎖

什麼情況下會用到互斥鎖?在你需要控制多個執行緒同一時刻只能有一個執行緒工作的情況下可以使用。

下面我們舉一個例子,一個簡單的計數器程式,說明有無互斥鎖情況下的不同。

		<?php
$counter = 0;
//$handle=fopen("php://memory", "rw");
//$handle=fopen("php://temp", "rw");
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
        $this->handle = fopen("/tmp/counter.txt", "w+");
    }
	public function __destruct(){
		fclose($this->handle);
	}
    public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %sn", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
    }
}

//沒有互斥鎖
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥鎖
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>		

我們使用檔案/tmp/counter.txt儲存計數器值,每次開啟該檔案將數值加一,然後寫回檔案。當多個執行緒同時操作一個檔案的時候,就會執行緒執行先後取到的數值不同,寫回的數值也不同,最終計數器的數值會混亂。

沒有加入鎖的結果是計數始終被覆蓋,最終結果是2

而加入互斥鎖後,只有其中的一個程序完成加一工作並釋放鎖,其他執行緒才能得到解鎖訊號,最終順利完成計數器累加操作

上面例子也可以通過對檔案加鎖實現,這裡主要講的是多執行緒鎖,後面會涉及檔案鎖。

4.1. 多執行緒與共享記憶體

在共享記憶體的例子中,沒有使用任何鎖,仍然可能正常工作,可能工作記憶體操作本身具備鎖的功能。

			<?php
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
        $this->shmid = $shmid;
    }
    public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %sn", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>			

5. 執行緒同步

有些場景我們不希望 thread->start() 就開始執行程式,而是希望執行緒等待我們的命令。

$thread->wait();測作用是 thread->start()後執行緒並不會立即執行,只有收到 $thread->notify(); 發出的訊號後才執行

		<?php
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
        $this->shmid = $shmid;
    }
    public function run() {

        $this->synchronized(function($thread){
            $thread->wait();
        }, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %sn", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
		$thread->notify();
	}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

6. 執行緒池

6.1. 執行緒池

自行實現一個Pool類

			<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %srn",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%sn",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //壓入任務到池中
				break;
			}else{ //如果池已經滿,就開始啟動執行緒
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系統錯誤', $e->getMessage(), "n";
}
?>			

6.2. 動態佇列執行緒池

上面的例子是當執行緒池滿後執行start統一啟動,下面的例子是隻要執行緒池中有空閒便立即建立新執行緒。

			<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
	//print_r($this->row);
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %srn",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%sn",$this->sql);
    }

}



try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		$id 	= $member['id'];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}
		}

	}

	$dbh = null;

} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系統錯誤】', $e->getMessage(), "n";
}
?>			

6.3. pthreads Pool類

pthreads 提供的 Pool class 例子

			<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
				  __CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
				"{$message}n", $args);
		}
	}
}


$pool = new Pool(8, WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
	return $work->isComplete();
});

var_dump($pool);			

7. 多執行緒檔案安全讀寫(檔案鎖)

檔案所種類。

LOCK_SH 取得共享鎖定(讀取的程式)。
LOCK_EX 取得獨佔鎖定(寫入的程式。
LOCK_UN 釋放鎖定(無論共享或獨佔)。
LOCK_NB 如果不希望 flock() 在鎖定時堵塞		

共享鎖例子

		<?php

$fp = fopen("/tmp/lock.txt", "r+");

if (flock($fp, LOCK_EX)) {  // 進行排它型鎖定
    ftruncate($fp, 0);      // truncate file
    fwrite($fp, "Write something heren");
    fflush($fp);            // flush output before releasing the lock
    flock($fp, LOCK_UN);    // 釋放鎖定
} else {
    echo "Couldn't get the lock!";
}

fclose($fp);

?>		

共享鎖例子2

		<?php
$fp = fopen('/tmp/lock.txt', 'r+');

/* Activate the LOCK_NB option on an LOCK_EX operation */
if(!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    exit(-1);
}

/* ... */

fclose($fp);
?>		

8. 多執行緒與資料連線

pthreads 與 pdo 同時使用是,需要注意一點,需要靜態宣告public static $dbh;並且通過單例模式訪問資料庫連線。

8.1. Worker 與 PDO

			<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>