作者:容易   2013-03-29 20:20:31


    该测试版本,增加了随机SQL的选择,SQL类型可以根据实际情况扩展和定义,可以实现各种不同类型的数据库操作,实现与生产一致的并发请求测试,该测试模型理论上也适用其他twisted支持的数据库,只需要将dbpool的相关配置变更即可,时间紧迫代码细节没有太过细究,还可以更美观简洁。

 

 


 

代码如下:


[root@hadoop-slave script]# cat random_msql.py 

#-*-coding:utf-8-*-
from twisted.internet import epollreactor
epollreactor.install()
from twisted.internet import reactor
from twisted.enterprise import adbapi
import time,random,cx_Oracle
reactor.suggestThreadPoolSize(120)
pool = cx_Oracle.SessionPool(user='test',password='oracle',
dsn='192.168.0.205:1521/test',min=1,max=4,increment=1)
conn = pool.acquire()
cursor = conn.cursor()
sql='select name from test where id > 4000 and id < 10000'
cursor.execute(sql)
name_values=cursor.fetchall()
def m_sql(x,y):
        sql_type=random_sql_type()
        sql=random_sql(sql_type)
        dbpool = y
        dbpool.runOperation(sql[0],sql[1])
def random_sql(sql_type):
        sql_type=sql_type
        if sql_type[1] == 'id':
                values={}
                values['id']=random.randint(100,100000)
                return (sql_type[0],values)
        elif sql_type[1] == 'name':
                values={}
                global name_values
                values['name']=str(random.choice(name_values)[0])
                return (sql_type[0],values)
        else:
                values={}
                values['id']=random.randint(20000,30000)
                values['name']=str(random.randint(70000,80000))
                return (sql_type[0],values)
def random_sql_type():
    sql_insert=("insert into test values (:id,:name)","insert")
    select_id=("select * from test where id = :id","id")
    select_name=("select * from test where name = :name","name")
    return random.choice([sql_insert, select_id, select_name])
def loop_factory(x):
        dbpool = adbapi.ConnectionPool('cx_Oracle', user='test',password ='oracle',dsn='192.168.0.205:1521/test',cp_min=80,cp_max=100)
        for i in xrange(1,x):
                m_sql(i,dbpool)
        stopreactor(10)
def stopreactor(i):
        time.sleep(i)
        reactor.stop()
def main():
    reactor.callWhenRunning(loop_factory,100000)
    reactor.run()
if __name__ == '__main__':
    main()

  



One Response


    还没有评论!
1  

Leave your comment

请留下您的姓名(*)

请输入正确的邮箱地址(*)

请输入你的评论(*)


感谢开源 © 2016. All rights reserved.&3Q Open Source&^_^赣ICP备15012863号-1^_^
乐于分享共同进步 KreativeThemes