php自己实现memcached的队列类
php的memcache队列类
version 0.2
1.修复了changehead方法中的问题:当get(1)只取一条数据时,$head_key的值没有改变
2.修复了clear方法中的问题:当队列较小时,仍按最大队列长度逐个删除
测试结果
队列类添加20000条数据,用时10.390461921692秒
队列类读取20000条数据,用时0.087434053421021秒
队列类取出20000条数据,用时0.91940212249756秒
队列类清空20000条数据,用时0.00077390670776367秒
------------------
php扩展添加20000条数据,用时0.9195499420166秒
php扩展读取20000条数据,用时0.050693988800049秒
php扩展取出20000条数据,用时0.88281202316284秒
php扩展删除20000条数据,用时0.81996202468872秒
------------------
队列写入的性能是个问题,待改善
[email protected]
memcachequeue.class.php
/**
 * FIFO queue stored in memcached, built from two alternating "sides" (a / b).
 *
 * Each side is a run of value keys indexed from a head counter (next item to
 * take) up to a tail counter (next free slot), and holds at most self::maxnum
 * items. Writers append to the current side; when it fills up the other side
 * becomes current. Readers drain the previous side first and then the current
 * side, which preserves insertion order.
 *
 * Usage:
 *   $obj = new memcachequeue('myqueue');
 *   $obj->add('1asdf');
 *   $obj->getqueuelength();
 *   $obj->read(11);
 *   $obj->get(8);
 */
class memcachequeue
{
    public static $client;   // memcache client connection (shared by all instances)
    public $access;          // true while this instance holds the queue lock
    private $currentside;    // side currently being written to: a/b
    private $lastside;       // previously written side: a/b
    private $sideahead;      // head counter of side a
    private $sideatail;      // tail counter of side a
    private $sidebhead;      // head counter of side b
    private $sidebtail;      // tail counter of side b
    private $currenthead;    // head counter of the current side
    private $currenttail;    // tail counter of the current side
    private $lasthead;       // head counter of the previous side
    private $lasttail;       // tail counter of the previous side
    private $expire;         // key lifetime in seconds, 1~2592000 (at most 30 days)
    private $sleeptime;      // pause between lock attempts, in microseconds
    private $queuename;      // queue name, must be unique
    private $retrynum;       // maximum lock attempts, = 10 * expected concurrency

    const maxnum = 10000;                 // maximum items per side, 10k recommended as upper bound
    const head_key = '_lkkqueuehead_';    // suffix of the head counter key
    const tail_key = '_lkkqueuetail_';    // suffix of the tail counter key
    const valu_key = '_lkkqueuevalu_';    // suffix of the value keys
    const lock_key = '_lkkqueuelock_';    // suffix of the lock key
    const side_key = '_lkkqueueside_';    // suffix of the current-side key

    /**
     * Connects to memcached and loads the queue counters.
     *
     * @param string       $queuename queue name
     * @param string|int   $expire    key lifetime in seconds; empty means 3600
     * @param array|string $config    array('host'=>..., 'port'=>...) or "host:port";
     *                                empty means 127.0.0.1:11211
     */
    public function __construct($queuename = '', $expire = '', $config = '')
    {
        if (empty($config)) {
            self::$client = memcache_pconnect('127.0.0.1', 11211);
        } elseif (is_array($config)) { // array('host'=>'127.0.0.1','port'=>'11211')
            self::$client = memcache_pconnect($config['host'], $config['port']);
        } elseif (is_string($config)) { // 127.0.0.1:11211
            $tmp = explode(':', $config);
            $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
            $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
            self::$client = memcache_pconnect($conf['host'], $conf['port']);
        }
        if (!self::$client) {
            // A constructor's return value is discarded; the object is simply
            // left uninitialised when the connection could not be opened.
            return;
        }

        ignore_user_abort(true); // keep running if the HTTP client disconnects
        set_time_limit(0);       // no execution time limit

        $this->access = false;
        $this->sleeptime = 1000;
        $expire = (empty($expire)) ? 3600 : (int)$expire + 1;
        $this->expire = $expire;
        $this->queuename = $queuename;
        $this->retrynum = 20000;

        // Create the side marker only if it does not exist yet.
        memcache_add(self::$client, $queuename . self::side_key, 'a', false, $expire);

        // Missing counters come back as false and are cast to 0.
        $this->getheadntail($queuename);
    }

    /**
     * Reloads the head and tail counters of both sides from memcached.
     *
     * @param string $queuename queue name
     * @return null
     */
    private function getheadntail($queuename)
    {
        $this->sideahead = (int)memcache_get(self::$client, $queuename . 'a' . self::head_key);
        $this->sideatail = (int)memcache_get(self::$client, $queuename . 'a' . self::tail_key);
        $this->sidebhead = (int)memcache_get(self::$client, $queuename . 'b' . self::head_key);
        $this->sidebtail = (int)memcache_get(self::$client, $queuename . 'b' . self::tail_key);
    }

    /**
     * Reads which side is current and maps the cached counters onto the
     * current/last fields accordingly.
     *
     * @return string name of the current side
     */
    public function getcurrentside()
    {
        $currentside = memcache_get(self::$client, $this->queuename . self::side_key);
        if ($currentside === 'a') {
            $this->currentside = 'a';
            $this->lastside = 'b';
            $this->currenthead = $this->sideahead;
            $this->currenttail = $this->sideatail;
            $this->lasthead = $this->sidebhead;
            $this->lasttail = $this->sidebtail;
        } else {
            $this->currentside = 'b';
            $this->lastside = 'a';
            $this->currenthead = $this->sidebhead;
            $this->currenttail = $this->sidebtail;
            $this->lasthead = $this->sideahead;
            $this->lasttail = $this->sideatail;
        }
        return $this->currentside;
    }

    /**
     * Acquires the queue lock, retrying up to $retrynum times.
     *
     * @return boolean true when the lock was acquired; false when it could not
     *                 be acquired or this instance already holds it
     */
    private function getlock()
    {
        if ($this->access !== false) {
            return false;
        }
        $tries = 0;
        // memcache_add only succeeds when the key is absent, which makes it
        // usable as a test-and-set.
        while (!memcache_add(self::$client, $this->queuename . self::lock_key, 1, false, $this->expire)) {
            usleep($this->sleeptime);
            $tries++;
            if ($tries > $this->retrynum) {
                return false;
            }
        }
        return $this->access = true;
    }

    /**
     * Releases the queue lock.
     *
     * @return null
     */
    private function unlock()
    {
        memcache_delete(self::$client, $this->queuename . self::lock_key);
        $this->access = false;
    }

    /**
     * Appends a value to the queue.
     *
     * @param mixed $data value to store; values that are empty() are rejected
     * @return boolean true when the value was stored
     */
    public function add($data = '')
    {
        if (empty($data)) {
            return false;
        }
        if (!$this->getlock()) {
            return false;
        }
        $this->getheadntail($this->queuename);
        $this->getcurrentside();
        if ($this->isfull()) {
            $this->unlock();
            return false;
        }
        if ($this->currenttail >= self::maxnum) {
            // Current side is full: switch sides and retry on the other one.
            $this->unlock();
            $this->changecurrentside();
            return $this->add($data);
        }

        $result = false;
        $value_key = $this->queuename . $this->currentside . self::valu_key . $this->currenttail;
        if (memcache_set(self::$client, $value_key, $data, false, $this->expire)) {
            $this->changetail();
            $result = true;
        }
        $this->unlock();
        return $result;
    }

    /**
     * Removes and returns items from the front of the queue.
     *
     * @param int $length number of items to take; 0 takes everything
     * @return array|false map of value key => value, or false when $length is
     *                     not numeric, the lock is unavailable or the queue is empty
     */
    public function get($length = 0)
    {
        if (!is_numeric($length)) {
            return false;
        }
        if (empty($length)) {
            $length = self::maxnum * 2; // default: take everything
        }
        if (!$this->getlock()) {
            return false;
        }
        // Refresh the counters so the emptiness test does not use stale values.
        $this->getheadntail($this->queuename);
        if ($this->isempty()) {
            $this->unlock();
            return false;
        }

        $keyarray = $this->getkeyarray($length);
        $keys = $keyarray['keys'];
        // Only move the head of a side that was actually inspected; a null
        // index means the request was satisfied before reaching that side.
        if ($keyarray['lastkey'] !== null) {
            $this->changehead($this->lastside, $keyarray['lastkey']);
        }
        if ($keyarray['currentkey'] !== null) {
            $this->changehead($this->currentside, $keyarray['currentkey']);
        }

        $data = empty($keys) ? array() : @memcache_get(self::$client, $keys);
        if (empty($data)) {
            $data = array();
        }
        foreach ($keys as $v) { // delete what was taken; best effort, a key may have expired
            @memcache_delete(self::$client, $v, 0);
        }
        $this->unlock();
        return $data;
    }

    /**
     * Returns items from the front of the queue without removing them.
     *
     * @param int $length number of items to read; 0 reads everything
     * @return array|false map of value key => value, or false when $length is not numeric
     */
    public function read($length = 0)
    {
        if (!is_numeric($length)) {
            return false;
        }
        if (empty($length)) {
            $length = self::maxnum * 2; // default: read everything
        }
        $keyarray = $this->getkeyarray($length);
        $data = empty($keyarray['keys']) ? array() : @memcache_get(self::$client, $keyarray['keys']);
        if (empty($data)) {
            $data = array();
        }
        return $data;
    }

    /**
     * Builds the list of value keys for the first $length items of the queue.
     *
     * 'lastkey' / 'currentkey' hold the last index inspected on the previous /
     * current side (equal to that side's tail when the side was exhausted), or
     * null when that side was not inspected at all.
     *
     * @param int $length number of items wanted
     * @return array array('keys'=>array, 'lastkey'=>int|null, 'currentkey'=>int|null)
     */
    private function getkeyarray($length)
    {
        $result = array('keys' => array(), 'lastkey' => null, 'currentkey' => null);
        $this->getheadntail($this->queuename);
        $this->getcurrentside();
        if (empty($length)) {
            return $result;
        }

        // Keys of the previous side come first (they are older).
        $taken = 0;
        for (; $taken < $length; $taken++) {
            $result['lastkey'] = $this->lasthead + $taken;
            if ($result['lastkey'] >= $this->lasttail) {
                break;
            }
            $result['keys'][] = $this->queuename . $this->lastside . self::valu_key . $result['lastkey'];
        }

        // Then the keys of the current side, for whatever is still wanted.
        $remaining = $length - $taken;
        for ($k = 0; $k < $remaining; $k++) {
            $result['currentkey'] = $this->currenthead + $k;
            if ($result['currentkey'] >= $this->currenttail) {
                break;
            }
            $result['keys'][] = $this->queuename . $this->currentside . self::valu_key . $result['currentkey'];
        }
        return $result;
    }

    /**
     * Increments the tail counter of the current side.
     *
     * @return null
     */
    private function changetail()
    {
        $tail_key = $this->queuename . $this->currentside . self::tail_key;
        memcache_add(self::$client, $tail_key, 0, false, $this->expire); // creates the counter if missing, otherwise a no-op
        $next = (int)memcache_get(self::$client, $tail_key) + 1;
        memcache_set(self::$client, $tail_key, $next, false, $this->expire);
    }

    /**
     * Advances the head counter of a side past the last item taken, or resets
     * the side when it has been fully drained.
     *
     * @param string $side      side to update
     * @param int    $headvalue index of the last item inspected on that side
     * @return null
     */
    private function changehead($side, $headvalue)
    {
        $head_key = $this->queuename . $side . self::head_key;
        $tail_key = $this->queuename . $side . self::tail_key;
        $sidetail = (int)memcache_get(self::$client, $tail_key);
        if ($headvalue < $sidetail) {
            // $headvalue was consumed, so the next head is the following slot.
            memcache_set(self::$client, $head_key, $headvalue + 1, false, $this->expire);
        } else {
            $this->resetside($side);
        }
    }

    /**
     * Resets a side, i.e. sets its head and tail counters to 0.
     *
     * @param string $side side to reset
     * @return null
     */
    private function resetside($side)
    {
        $head_key = $this->queuename . $side . self::head_key;
        $tail_key = $this->queuename . $side . self::tail_key;
        memcache_set(self::$client, $head_key, 0, false, $this->expire);
        memcache_set(self::$client, $tail_key, 0, false, $this->expire);
    }

    /**
     * Switches the current side from a to b or from b to a.
     *
     * @return string name of the new current side
     */
    private function changecurrentside()
    {
        $currentside = memcache_get(self::$client, $this->queuename . self::side_key);
        $this->currentside = ($currentside === 'a') ? 'b' : 'a';
        memcache_set(self::$client, $this->queuename . self::side_key, $this->currentside, false, $this->expire);
        return $this->currentside;
    }

    /**
     * Tells whether both sides have reached their maximum size, based on the
     * counters last loaded by getheadntail().
     *
     * @return boolean
     */
    public function isfull()
    {
        // >= rather than == so an overshooting counter still counts as full.
        return $this->sideatail >= self::maxnum && $this->sidebtail >= self::maxnum;
    }

    /**
     * Tells whether both sides are empty, based on the counters last loaded by
     * getheadntail().
     *
     * @return boolean
     */
    public function isempty()
    {
        return !($this->sideatail > 0 || $this->sidebtail > 0);
    }

    /**
     * Returns the current queue length.
     * This is the theoretical length: items that expired are lost, so the real
     * length is less than or equal to it.
     *
     * @return int
     */
    public function getqueuelength()
    {
        $this->getheadntail($this->queuename);
        $sidealength = $this->sideatail - $this->sideahead;
        $sideblength = $this->sidebtail - $this->sidebhead;
        return $sidealength + $sideblength;
    }

    /**
     * Deletes all queued values; only the head, tail and side keys remain.
     *
     * @return boolean false when the lock could not be acquired
     */
    public function clear()
    {
        if (!$this->getlock()) {
            return false;
        }
        $this->getheadntail($this->queuename);
        // One extra slot on each end is deleted as a safety margin.
        $this->deleterange('a', $this->sideahead - 1, $this->sideatail + 1);
        $this->deleterange('b', $this->sidebhead - 1, $this->sidebtail + 1);
        // Reset the counters while the lock is still held.
        $this->resetside('a');
        $this->resetside('b');
        $this->unlock();
        return true;
    }

    /**
     * Deletes the value keys of one side for indexes $from (inclusive) to $to (exclusive).
     *
     * @param string $side side whose values are deleted
     * @param int    $from first index
     * @param int    $to   index after the last one
     * @return null
     */
    private function deleterange($side, $from, $to)
    {
        for ($i = $from; $i < $to; $i++) {
            @memcache_delete(self::$client, $this->queuename . $side . self::valu_key . $i, 0);
        }
    }

    /**
     * Flushes ALL data held by the memcache server, not only this queue.
     *
     * @return null
     */
    public function memflush()
    {
        memcache_flush(self::$client);
    }
}
test.php
getqueuelength();$time_use = $time_add_end - $time_add_start;echo 队列类添加{$count}条数据,用时{$time_use}秒
; //读数据$time_read_start = microtimefloat();$tmparr = $queobj->read();$time_read_end = microtimefloat();$count = $queobj->getqueuelength();$time_use = $time_read_end - $time_read_start;echo 队列类读取{$count}条数据,用时{$time_use}秒
; //取出数据$time_get_start = microtimefloat();$tmparr = $queobj->get();$time_get_end = microtimefloat();$count = count($tmparr);$time_use = $time_get_end - $time_get_start;echo 队列类取出{$count}条数据,用时{$time_use}秒
; //清空队列$time_clear_start = microtimefloat();$queobj->clear();$time_clear_end = microtimefloat();$time_use = $time_clear_end - $time_clear_start;echo 队列类清空{$count}条数据,用时{$time_use}秒
; echo
; /*------------------------------------------ * php memcache扩展测试 *------------------------------------------ */ //写数据$time_add_start = microtimefloat();for($i=1;$i; //读数据$time_read_start = microtimefloat();$keyarr = array();for($i=1;$i; //取出数据//读数据$time_get_start = microtimefloat();$keyarr = array();for($i=1;$i;$time_use2 = $time_get_end2 - $time_get_end1;echo php扩展删除{$coutn}条数据,用时{$time_use2}秒
;
本文来自:http://www.oschina.net/code/snippet_111731_20890