首页 > Web开发 > 详细

php多进程编程实例

时间:2017-02-15 13:39:52      阅读:297      评论:0      收藏:0      [点我收藏+]
<?php

namespace Console;

include_once(dirname(__FILE__) . "/../DAOs/DAOFactory.php");

use Helpers\Helper;
use Config\Config;
use Services\TaskService;
use Libs\Log\Log;
use Libs\Log\CLogFileHandler;

class UctDataWorker
{

    protected $diagnosticSessionDAO = null;
    protected $DAOResultEnum = null;
    protected $childs = array();
    protected $signal = null;
    protected $master_pid = 0;

    public function __construct()
    {
        include(dirname(__FILE__) . "/../Enums/DAOEnums.php");
        $this->DAOResultEnum = $DAOResultEnum;

        $this->diagnosticSessionDAO = \DAOFactory::getFactory()->getDiagnosticSessionDAO();
        $this->signal = $this->getSignal();
        $this->master_pid = posix_getpid();
    }

    protected function getSignal()
    {
        return new Signal($this);
    }

    //通过文件独占锁防止多个子进程并发读取同一条数据
    //如果是多台web服务器,独占锁文件需要创建到公共存储上,以防止并发
    protected function lockProcess($task_num)
    {
        $lock_handle = Helper::lockFile();
        if($lock_handle !== false) {
            try{
                $task_list = TaskService::getUnstartTask($task_num);
                if($task_list) {
                    foreach ($task_list as $task) {
                        $tas_ser_id = $task[‘tas_ser_id‘];
                        $tas_id = $task[‘tas_id‘];
                        TaskService::updateTask($tas_ser_id, $tas_id, ‘running‘);
                    }
                    sleep(1);
                }

                Helper::freeLock($lock_handle);
                return $task_list;

            } catch(\Exception $e) {
                Log::Init(new CLogFileHandler());
                Log::INFO(date(‘Y-m-d H:i:s‘, time())."---exception---".$e->getMessage()."\r\n");
                Helper::freeLock($lock_handle);
            }

        } else { //获取锁失败,结束当前子进程
            Log::Init(new CLogFileHandler());
            Log::INFO(date(‘Y-m-d H:i:s‘, time())."---get_file_lock_fail---"."\r\n");
            exit;
        }
    }

    public function run($argv)
    {

        $per_worker_task_num = intval($argv[3]);

        $task_list = $this->lockProcess($per_worker_task_num);

        if($task_list) {
            foreach ($task_list as $task) {
                if ($task) {
                    try {

                        $tas_ser_id = $task[‘tas_ser_id‘];
                        $tas_id = $task[‘tas_id‘];

                        Log::Init(new CLogFileHandler());
                        Log::INFO(date(‘Y-m-d H:i:s‘, time())."---tas_id---" . $tas_id . "\r\n");

                        $tas_data = unserialize($task[‘tas_data‘]);
                        $diagnosticSessionId = $tas_data[‘diagnosticSessionId‘];
                        $dataPath = $tas_data[‘dataPath‘];
                        $cevDataArray = $tas_data[‘cevDataArray‘];
                        $submitForAnalysis = $tas_data[‘submitForAnalysis‘];
                        $user = $tas_data[‘user‘];
                        $start_time = date(‘Y-m-d H:i:s‘, time());
                        $tasid_flag = ‘tas_id:‘ . $tas_id . ‘---tas_ser_id:‘ . $tas_ser_id . "---";

                        $daoResult = $this->diagnosticSessionDAO->AppendToDiagnosticSession($diagnosticSessionId,
                            $dataPath,
                            $cevDataArray,
                            $submitForAnalysis,
                            $patientId, //for return
                            $invalidFiles, //for return
                            $dateTime, //for return
                            $user,
                            false
                        );

                        //insert db success
                        if ($daoResult === $this->DAOResultEnum["DAO_SUCCESS"] && $patientId && $patientId->isValid() && $dateTime) {
                            $returnObj = new \stdClass;
                            $returnObj->diagnosticSessionId = $diagnosticSessionId->toString();
                            $returnObj->patientId = $patientId->toString();
                            $returnObj->confirmationCode = $diagnosticSessionId->toString() . ‘; ‘ . $dateTime->format(‘Y-m-d\TH:i:s‘);
                            $returnObj->invalidFiles = $invalidFiles;

                            echo "\r\n\r\n";

                            echo $tasid_flag;
                            var_dump($returnObj);
                            echo date(‘Y-m-d H:i:s‘, time()) . ‘---success‘ . "\r\n";

                            $arr = array(
                                ‘tas_start_date_time‘ => $start_time,
                                ‘tas_end_date_time‘ => date(‘Y-m-d H:i:s‘, time())
                            );

                            $result = TaskService::updateTask($tas_ser_id, $tas_id, ‘success‘, $arr);
                            if ($result === true) {
                                $rm_result = Helper::delDir($task[‘tas_path‘]);
                                if ($rm_result === true) {
                                    echo $tasid_flag . ‘rm dir---‘ . $task[‘tas_path‘] . ‘---success‘ . "\r\n";
                                } else {
                                    echo $tasid_flag . ‘rm dir---‘ . $task[‘tas_path‘] . ‘---fail‘ . "\r\n";
                                }
                            }

                            echo "\r\n\r\n";

                        } else { //insert db fail

                            $tip = ‘‘;
                            $arr = array(
                                ‘tas_start_date_time‘ => $start_time,
                                ‘tas_end_date_time‘ => date(‘Y-m-d H:i:s‘, time())
                            );

                            if ($daoResult === $this->DAOResultEnum["DAO_SUCCESS"] && $patientId && $patientId->isValid() && $dateTime === null) {
                                $tip = ‘---no new data added!---‘;
                                $rm_result = Helper::delDir($task[‘tas_path‘]); //把已经存到数据库里的文件删除

                                if ($rm_result === true) {
                                    echo $tasid_flag . ‘rm dir---‘ . $task[‘tas_path‘] . ‘---success‘ . "\r\n";
                                } else {
                                    echo $tasid_flag . ‘rm dir---‘ . $task[‘tas_path‘] . ‘---fail‘ . "\r\n";
                                }

                                TaskService::updateTask($tas_ser_id, $tas_id, ‘nonewadd‘, $arr);
                            } else {
                                TaskService::updateTask($tas_ser_id, $tas_id, ‘fail‘, $arr);
                            }

                            echo "\r\n";
                            echo $tasid_flag;
                            echo date(‘Y-m-d H:i:s‘, time()) . ‘:fail‘;
                            echo $tip;
                            echo "\r\n";

                            $result = array(
                                ‘log_time‘ => date(‘Y-m-d H:i:s‘, time()),
                                ‘tas_ser_id‘ => $tas_ser_id,
                                ‘tas_id‘ => $tas_id,
                                ‘diagnosticSessionId‘ => $diagnosticSessionId,
                                ‘dataPath‘ => $dataPath,
                                ‘daoResult‘ => $daoResult,
                                ‘patientId‘ => $patientId,
                                ‘dateTime‘ => $dateTime,
                            );

                            Log::Init(new CLogFileHandler());
                            Log::INFO("\r\n" . var_export($result, true) . "\r\n");
                        }

                    } catch (\Exception $e) {

                        Log::Init(new CLogFileHandler());
                        Log::ERROR($e->getMessage());

                        echo "\r\n";
                        echo $tasid_flag;
                        echo date(‘Y-m-d H:i:s‘, time()) . ‘:exception‘;
                        echo "\r\n";

                        $arr = array(
                            ‘tas_start_date_time‘ => $start_time,
                            ‘tas_end_date_time‘ => date(‘Y-m-d H:i:s‘, time())
                        );
                        TaskService::updateTask($tas_ser_id, $tas_id, ‘fail‘, $arr);

                    }
                }

            }
        }

    }

    //手动启动master命令:
    //nohup /usr/bin/php /var/www/html/ECGVuService_Dev/Console/index.php UctDataWorker masterRun >> /data/log/uctWorker.log 2>&1 &
    //命令行常驻内存运行方式,每10秒扫描一次(配置文件里可以修改该配置)
    //每次修改代码后需要重启主进程才能生效
    public function masterRun()
    {
        $worker_num = Config::$uct_insert_worker_num;
        $master_sleep_time = Config::$master_sleep_time;
        $master_pid = $this->master_pid;
        $master_pid_log = Config::$master_pid_log;

        file_put_contents($master_pid_log, $master_pid);

        while(true){

            $this->signal->dispatch();

            for($i=1; $i<=$worker_num; $i++) {

                if(array_key_exists($i, $this->childs)) {
                    $pid = $this->childs[$i];
                    $child_pid_status = Helper::waitpid($pid); //通知内核释放僵尸进程、非阻塞方式

                    if($child_pid_status==-1 || $child_pid_status>0) { //子进程出错(-1)或退出(>0)再启动子进程
                        sleep(2);
                        $this->forkWorker($i);
                    }

                } else {
                    sleep(2);
                    $this->forkWorker($i);
                }
            }

            sleep($master_sleep_time); //防止CPU 100%
        }
    }

    /**
     * 开启子进程
     * @params Int $index 子进程参数
     */
    protected function forkWorker($index)
    {

        $path = dirname(__FILE__) . ‘/index.php‘;
        $php_path = Config::$php_path;

        // 开启分支
        $pid = pcntl_fork();
        if($pid == -1) {
            $log = sprintf("Fork worker failed! Index: %d\n", $index);
            echo $log;
        }

        if($pid) { //父进程处理逻辑
            $log = sprintf("Fork worker success! pid: %s!\n", $pid);
            echo $log;
            $this->childs[$index] = $pid;

        } else {  //子进程处理逻辑

            $param = array("$path", "UctDataWorker", "run", "1");
            pcntl_exec("$php_path", $param);

            exit(); //此处中断子进程
        }

        $log = sprintf("Finish fork! Index: %d\n", $index);
        echo $log;
    }

    /*
    public function stopMaster($argv=array())
    {
        $pid = intval($argv[3]);

        if($pid<=0) {
            $master_pid_log = Config::$master_pid_log;
            $pid = trim(file_get_contents($master_pid_log));
        }

        echo "Wait stop: [$pid] \n";
        posix_kill($pid, SIGTERM);

        $n = 0;
        while(++$n < 100) {

            pcntl_waitpid($pid, $status);

            $priority = @ pcntl_getpriority($pid);
            if(false === $priority) {
                break;
            }

            echo ‘.‘;
            usleep(1000*200);
        }

        echo "Stopped.\n";
    }
    */

    /*
    * /usr/bin/php /var/www/html/ECGVuService_Dev/Console/index.php UctDataWorker forceStopMaster
    */
    public function forceStopMaster($argv=array())
    {
        $pid = intval($argv[3]);

        if($pid<=0) {
            $master_pid_log = Config::$master_pid_log;
            $pid = trim(file_get_contents($master_pid_log));
        }

        echo "Wait stop: [$pid] \n";

        $cmd = "kill -s 9 ".$pid;
        shell_exec("$cmd");

        echo "Stopped.\n";
    }

    public function reload()
    {
        echo ‘receive signal reload and call function reload---pid is ‘.posix_getpid()."\r\n";
    }

    public function childExit()
    {

    }

    public function stop()
    {
        echo ‘receive signal stop and call function stop---pid is ‘.posix_getpid()."\r\n";
    }

    //计划任务定时检测master进程是否存在,不存在则启动,以root用户运行
    public function checkMaster()
    {
        $cmd = ‘ps axu|grep "UctDataWorker masterRun"|grep -v "grep"|wc -l‘;
        $ret = shell_exec("$cmd");

        $ret = rtrim($ret, "\r\n");

        if($ret === "0") {
            $path = dirname(__FILE__) . ‘/index.php‘;
            $php_path = Config::$php_path;
            $worker_run_log = Config::$worker_run_log;
            $start_master_cmd = "nohup ".$php_path." ".$path." UctDataWorker masterRun >> ".$worker_run_log." 2>&1 &";
            exec("$start_master_cmd", $result);
        }
    }

}

 

Helper.php

<?php

namespace Helpers;

set_time_limit(0);

define(‘DS‘, DIRECTORY_SEPARATOR); // I always use this short form in my code.

class Helper
{

    public static function checkOs()
    {
        $os_name=PHP_OS;
        if(strpos($os_name,"Linux")!==false){
            $os_flag = ‘Linux‘;
        }else if(strpos($os_name,"WIN")!==false){
            $os_flag = ‘Win‘;
        }

        return $os_flag;
    }

    /**
     * @param $filename
     * @param int $n
     * @return bool|string
     */
    public static function getLastLines($filename, $n=1)
    {
        if(!$fp=fopen($filename,‘r‘)){
            echo "打开文件失败,请检查文件路径是否正确,路径和文件名不要包含中文";
            return false;
        }

        $pos=-2;
        $eof="";
        $str="";

        while($n>0){
            while($eof!="\n"){
                if(!fseek($fp,$pos,SEEK_END)){
                    $eof=fgetc($fp);
                    $pos--;
                }else{
                    break;
                }
            }

            $str.=fgets($fp);
            $eof="";
            $n--;
        }

        return $str;
    }

    public static function downloadFile($file_path)
    {
        $fileinfo = pathinfo($file_path);
        header(‘Content-type: application/x-‘.$fileinfo[‘extension‘]);
        header(‘Content-Disposition: attachment; filename=‘.$fileinfo[‘basename‘]);
        header(‘Content-Length: ‘.filesize($file_path));
        readfile($file_path);
    }

    public static function arrIconv($input_charset=‘GBK‘, $output_charset=‘UTF-8‘, $arr)
    {
        $return = array();

        if(is_array($arr)) {
            foreach($arr as $k=>$item) {
                if(is_array($item)) {
                    $return[$k] = self::arrIconv($input_charset, $output_charset, $item);
                } else {
                    $return[$k] = iconv($input_charset, $output_charset, $item);
                }
            }
        } else {
            $return = iconv($input_charset, $output_charset, $arr);
        }

        return $return;
    }

    public static function copyR($path, $dest) { // 原目录,复制到的目录

        if( is_dir($path) ) {
            @mkdir( $dest );
            $objects = scandir($path);
            if( sizeof($objects) > 0 ) {
                foreach( $objects as $file ) {
                    if( $file == "." || $file == ".." ) {
                        continue;
                    }

                    // go on
                    if( is_dir( $path.DS.$file ) ) {
                        self::copyR( $path.DS.$file, $dest.DS.$file );
                    } else {
                        copy( $path.DS.$file, $dest.DS.$file );
                    }
                }
            }

            return true;

        } elseif( is_file($path) ) {
            return copy($path, $dest);
        } else {
            return false;
        }
    }

    public static function getDirList($dir_path)
    {
        if(!is_dir($dir_path)) {
            return false;
        }

        $dir_list = array();

        $d = dir($dir_path);
        if ($d) {
            while (false !== ($entry = $d->read())) {
                if ($entry != ‘.‘ && $entry != ‘..‘) {
                    $k = str_replace(array(‘-‘, ‘_‘), array(‘‘), $entry);
                    preg_match(‘/(\d+)/‘, $k, $matches);
                    $dir_list[$matches[0]] = $entry;
                }
            }

            $d->close();
        }

        ksort($dir_list);

        return $dir_list;
    }

    public static function random($length, $numeric = 0)
    {
        $seed = base_convert(md5(microtime().$_SERVER[‘DOCUMENT_ROOT‘]), 16, $numeric ? 10 : 35);
        $seed = $numeric ? (str_replace(‘0‘, ‘‘, $seed).‘012340567890‘) : ($seed.‘zZ‘.strtoupper($seed));

        if($numeric) {
            $hash = ‘‘;
        } else {
            $hash = chr(rand(1, 26) + rand(0, 1) * 32 + 64);
            $length--;
        }

        $max = strlen($seed) - 1;
        for($i = 0; $i < $length; $i++) {
            $hash .= $seed{mt_rand(0, $max)};
        }

        return $hash;
    }

    public static function delDir($dir)
    {
        if(strtoupper(substr(PHP_OS, 0, 3)) == ‘WIN‘) {
            $str = "rmdir /s/q " . $dir;
        } else {
            $str = "rm -Rf " . $dir;
        }

        exec($str, $result, $return_var);
        return true;
    }

    /**
     * @see pcntl_waitpid()
     */
    public static function waitpid($pid, &$status=0, $flag = WNOHANG) {
        return pcntl_waitpid($pid, $status, $flag);
    }

    /**
     * @see posix_kill()
     */
    public static function kill($pid) {
        return posix_kill($pid, SIGTERM);
    }

    public static function lockFile($file_path=‘/tmp/file_lock.lock‘)
    {
        $fp = fopen($file_path, "w+");
        if(flock($fp, LOCK_EX)) {
            fwrite($fp, date(‘Y-m-d H:i:s‘, time())."---pid[".posix_getpid()."]---lock suc!\n");
            return $fp;
        } else {
            return false;
        }
    }

    public static function freeLock($fp)
    {
        fflush($fp);
        flock($fp, LOCK_UN);
        fclose($fp);
    }

}

 

php多进程编程实例

原文:http://www.cnblogs.com/dongruiha/p/6400979.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!