10月 29

splinter自动化测试工具学习

splinter自动化测试工具学习。模拟浏览器操作。

官方标文档:http://splinter.cobrateam.info/docs/

测试环境如下:
windows 8 64位操作系统安装python2.7.8

window按顺序安装下面的包,也可以直接用pip安装。
https://bootstrap.pypa.io/ez_setup.py
https://pypi.python.org/packages/source/p/pip/pip-1.5.6.tar.gz
https://pypi.python.org/packages/source/s/selenium/selenium-2.44.0.tar.gz
https://pypi.python.org/packages/source/s/splinter/splinter-0.6.0.tar.gz
下面库是后台使用也可以在后面安装
https://pypi.python.org/packages/source/z/zope.testbrowser/zope.testbrowser-4.0.4.zip

学习部分,简化实例
实例化
from splinter import Browser
browser = Browser()

基础部分:
访问部分 browser.visit(‘www.xxx.com’)
填表部分 browser.fill(‘hello’)
单击按钮 browser.find_by_name(‘button’).click()
获取返回值(用if判断) browser.is_text_present(‘OK’)
退出部分 browser.quit()
选择不同浏览器 browser = Browser(‘firefox’)
重新导入 browser.reload()
后退 browser.back()
前进 browser.forward()
获取title browser.title
获取页面 browser.html
获取当前url browser.url
模拟浏览器头 browser = Browser(user_agent=”Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en)”)

常用部分:
查找相关元素,可以使用火狐的F12(大家应该懂)。
browser.find_by_css(‘h1’)
browser.find_by_xpath(‘//h1’)
browser.find_by_tag(‘h1’)
browser.find_by_name(‘name’)
browser.find_by_id(‘firstheader’)
browser.find_by_value(‘query’)

获取该元素的开始 first_found = browser.find_by_name(‘name’).first
获取该元素的结尾 last_found = browser.find_by_name(‘name’).last
获取该元素的索引 second_found = browser.find_by_name(‘name’)[1]

查找相关连接
links_found = browser.find_link_by_text(‘Link for Example.com’)
links_found = browser.find_link_by_partial_text(‘for Example’)
links_found = browser.find_link_by_href(‘http://example.com’)
links_found = browser.find_link_by_partial_href(‘example’)
获取某个元素的值 browser.find_by_css(‘h1’).first.value

查找div
divs = browser.find_by_tag(“div”)
within_elements = divs.first.find_by_name(“name”)

鼠标移动到某资源 browser.find_by_tag(‘h1’).mouse_over()
鼠标移开某资源 browser.find_by_tag(‘h1’).mouse_out()
双击某元素 browser.find_by_tag(‘h1’).double_click()
鼠标右键某资源 browser.find_by_tag(‘h1’).right_click()
点击相关连接
browser.click_link_by_href(‘http://www.the_site.com/my_link’)
browser.click_link_by_partial_href(‘my_link’)
browser.click_link_by_text(‘my link’)
browser.click_link_by_partial_text(‘part of link text’)

选择 browser.choose(‘some-radio’, ‘radio-value’)
选择项目 browser.check(‘some-check’)
取消选项 browser.uncheck(‘some-check’)
选择下拉菜单 browser.select(‘uf’, ‘rj’)

针对ajax部分
对某个焦点 browser.is_text_present(‘splinter’, wait_time=10)
browser.is_text_not_present(‘text not present’, wait_time=10)
browser.is_text_not_present(‘text not present’, wait_time=10)
直接执行脚本

browser.execute_script('document.getElementById("body").innerHTML="

Hello world!

"')

cookie部分
添加cookie browser.cookies.add({‘whatever’: ‘and ever’})
获取所有cookie browser.cookies.all()
删除cookie值 browser.cookies.delete(‘mwahahahaha’)
删除所有cookie browser.cookies.delete() #deletes all cookies

浏览器状态 browser.status_code.code
获取异常部分 HttpResponseError, e:

后台执行部分
http://splinter.cobrateam.info/docs/drivers/zope.testbrowser.html

实例代码,模拟51job刷新部分(比较复杂,基本模拟了主流的点击操作)。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    grab-centreon.py
# Revision:    
# Date:        2014-10-29
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
from splinter import Browser
from time import sleep

url = 'www.51job.com'
name = 'kkkkkkk'
passwd = '9999999'
mybr = Browser()
 
# 按照id发送账号密码
mybr.visit("http://%s" % url)
sleep(2)
mybr.find_by_id('username').fill(name)    
mybr.find_by_id('userpwd').fill(passwd)
# 按照tag顺序点击
mybr.find_by_tag('input').click()
sleep(15)
mybr.find_by_id('window_fans_close').click()
sleep(3)
# 按照css查找按钮,点击开刷新页面
aa = mybr.find_by_css('a.refresh')
aa.click()
sleep(3)
# 直接执行刷新脚本,此部分需要根据自己编号进行修改
mybr.execute_script("Save_RefreshResume('000000','0','http://my.51job.com');")
# 直接找到刷新关闭页面
mybr.execute_script("javascript:Refresh_success.close();Refresh_success_LastUpdate(mystr[1]);")
# 按照连接找到位置进行点击
mybr.click_link_by_href('http://my.51job.com/my/My_SignOut.php')
sleep(2)
# 关闭浏览器
mybr.quit()
10月 13

trondb 返回值简单处理

trondb在处理数据库update部分后,因为没有返回值,所以报错如下。
File “/usr/local/lib/python2.7/site-packages/torndb.py”, line 137, in query
column_names = [d[0] for d in cursor.description]
TypeError: ‘NoneType’ object is not iterable

我的原则是不修改框架内容,将来框架升级了此问题可能忘了修改,如果不升级,框架有了漏洞不升级也不现实。所以还是修改自己代码比较现实。
直接在提交数据后用try屏蔽,然后在用select进行一次查询。

def init_pass(user, passwd):
    try:
        idb.query('UPDATE user SET passwd=\'%s\' where user=\'%s\'' % (passwd, user))
    except:
        pass
    _v = odb.query('SELECT count(*) as count FROM user WHERE passwd=\'%s\' and user=\'%s\'' % (passwd, user))
    if int(_v[0]['count']) == 1:
        _get = 'OK'
    else:
        _get = 'ERROR'
    return _get
9月 25

CentOS6.5_64 安装python3.4.0rc3 测试

CentOS6.5_x64操作系统自带的是python2.6.6r266。
升级操作系统
yum makecache
yum -y update
yum -y upgrade
yum -y install gcc* gcc-c++ sysstat ncurses ncurses-devel make automake openssl* zlib*
yum -y install vim ntpdate lsof setuptool net-snmp tcpdump zip unzip svn wget curl git
yum -y install libevent libevent-devel uuid-devel libboost libboost-devel gd gd-*
yum -y install crontabs net-tools ntpdate
yum -y install mysql-client mysql-devel
yum -y groupinstall “Chinese Support”
ntpdate pool.ntp.org

开始安装python
wget https://www.python.org/ftp/python/3.4.0/Python-3.4.0rc3.tgz

/bin/tar zxvf Python-3.4.0rc3.tgz
cd Python-3.4.0rc3
./configure
make && make install

运行速度测试代码参考:http://www.simonzhang.net/?p=1844

python 2.7.6rc1 测试结果如下:

20.5176548958

real 0m20.568s
user 0m20.546s
sys 0m0.025s

Python-3.4.0rc3 测试结果如下:

18.61501383781433

real 0m18.674s
user 0m18.656s
sys 0m0.010s

python-3.4 自带pip工具
pip3 install PyMySQL
pip3 install tornado
pip3 install torndb

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    testpython3.py
# Revision:    1.0
# Date:        2014-09-22
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import pymysql

def sql_comm(sql_run,comm):
    try:
        conn = pymysql.connect(host = '192.168.0.1', \
                               port = 3306, \
                               user = 'test', passwd = '123456', \
                               db = 'tt', charset = 'utf8', \
                               connect_timeout = 30)
    except pymysql.MySQLError:
        pass
    cur = conn.cursor()
    try:
        cur.execute(sql_run)
        if comm == 'y':
            cur.execute("commit")
        result_set = cur.fetchall()
        return(result_set)
        cur.close()
        db.close()
    except:
        pass


if __name__== '__main__':
    sql_run = "select count(*) from user"
    comm = "n"
    _get = sql_comm(sql_run, comm)
    print(_get)

总结python3的语法更标准化,但是速度提升不明显。通过查询资料显示第三方库的稳定性也是不特别好。所以当前我还是老老实实使用python2。

9月 13

eventlet 学习测试

  eventlet是一款使用Python编写的为高并发的网络编程而设计的库。它通过greenlet提供的协程功能,让开发者可以不用将以往的多线程等并发程序的开发方式转变成异步状态机模型,就能直接使用select/epoll/kqueue等操作系统提供的支持高并发IO接口,并且能尽可能地发挥它们在并发上的优势。
  我用eventlet写一个服务端的socket接口,客户端的测试使用thread进行连接测试。代码如下。

服务端

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    teve.py
# Revision:    1.0
# Date:        2014-09-13
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import eventlet
from string import strip

# 根据客户端的发送进行返回
def welcome(str):
    _str = str
    hui = ""
    if "%s" % _str == "hi":
        hui = "你好"
    elif "%s" % _str == "hello":
        hui = "hi"
    return hui

def handle(client, add):
    error_count = 0
    while True:
        c = ""
        c = client.recvfrom(2048)[0].strip()
		# 如果循环10次没有取到数据则断开
        if len(c) == 0:
            pass
            error_count += 1
            if error_count > 10:
                client.shutdown(2)
                break
        else:
            _re = welcome(c)
            error_count = 0
            client.sendto(_re, add)

# 启动监听1300端口
server = eventlet.listen(('0.0.0.0', 1300))
# 创建5000线程,如果不填默认1000
pool = eventlet.GreenPool(5000)
# 循环监听
while True:
    new_sock, address = server.accept()
    pool.spawn_n(handle, new_sock, address)

客户端

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    tcptest.py
# Revision: 
# Date:        2014-09-13
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO

####加载多线程模块
import threading
####需要个随机数和延迟,为测试用
import random
from time import sleep
from socket import *

HOST='192.168.1.112'
#HOST='192.168.1.109'
PORT=1300
BUFSIZE=1024
ADDR=(HOST, PORT)


#### 多线程运行的测试部分。循环3次,每次间隔0到2的随机秒数,
#### 等待后打印,运行总次数,线程数和循环值
def test_func(thread_number,sequence):
    print "thread %s" % thread_number
    tcpCliSock=socket(AF_INET, SOCK_STREAM)
    tcpCliSock.connect(ADDR)
    tcpCliSock.send("hi")
    data=tcpCliSock.recv(BUFSIZE)
    print data
    sleep(180)
    #tcpCliSock.send("hello")
    ##data=tcpCliSock.recv(BUFSIZE)
    #print data
    tcpCliSock.close()

def main():
    #### 定义循环序列,就是一个线程池
    threads = []
    #### 定义总共运行的次数
    all_number = 100000
    #### 定义运行所使用的线程数
    thread_lines = 300
    #### 定义开始线程数
    start_line = 0
    #### 首先构建线程池
    for i in range(0,thread_lines):  
        t = threading.Thread(target=test_func, args=(i,start_line,))  
        threads.append(t)
        start_line +=1
    #### 运行第一批线程的任务
    for t in threads:  
        t.start()  
    #### 循环运行全部任务
    for number_line in xrange(start_line,all_number):
        #### 初始化当前线程的状态
        thread_status = False
        #### 初始化检查循环线程的开始值
        loop_line = 0
        #### 开始循环检查线程池中的线程状态
        while thread_status == False :
            #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
            #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
            #### 如果线程池中线程全部在运行,则开始从头检查
            if threads[loop_line].isAlive() == False :
                t = threading.Thread(target=test_func, args=(loop_line,number_line,))
                threads[loop_line]=t
                threads[loop_line].start()
                thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                    loop_line=0
                else:
                    loop_line+=1

if __name__ == "__main__":
    main()

  测试过程。服务端放在阿里云服务器上,单CPU,内存1G,带宽3M,CentOS6.5 64位。使用pypy启动。操作系统参数如下:
net.ipv4.conf.default.arp_announce = 2
net.ipv4.conf.all.arp_announce=2
vm.swappiness = 50
net.core.netdev_max_backlog = 2048
net.core.somaxconn = 250000
net.ipv4.tcp_max_tw_buckets = 5000
net.ipv4.tcp_max_syn_backlog = 1024
net.ipv4.tcp_synack_retries = 2
net.ipv4.conf.lo.arp_announce=2
fs.file-max=12000000
fs.nr_open=11000000

ulimit -n 为 65535

  客户端使用自己笔记本的虚拟机、raspberry pi和一台云主机。然后用bash脚本调用多个客户端,虚拟机上python启动10个客户端,在raspberry pi上使用pypy启动6个客户端。云主机上一个200连接。

  启动后服务端系统指标如下图。
tcpevenlet-top
tcpeventlet-count
  使用telnet连接1300端口信息回复正常。
  查看系统日志如下,被认为是洪水攻击
Sep 13 22:34:06 iZ23i076qv9Z kernel: possible SYN flooding on port 1300. Sending cookies.
修改系统参数如下,问题暂时环节。
net.ipv4.tcp_max_syn_backlog = 4096

  总结eventlet的性能确实不一般,因为连接后操作并不多,所以cpu基本没有使用,跑了一会,内存使用也下降到800M。操作系统使用内存300M到400M。
  综上所述,当前连接1万连接应该没有问题。实际应用中根据单个务处理需要的CPU和并发数算出CPU需要量,内存为1G可以1到2万并发,C10K问题解决。golang也支持协程,但是不知道能不能高出一个数量级。

5月 23

python 生成统计图

  想用python+matplotlib生成每天用户使用的统计图。测试环境,CentOS6.5,数据存放于mysql中,python2.7,matplotlib使用easy_install安装。

  首先建一个表。

CREATE TABLE `member` (
  `id` int(1) NOT NULL AUTO_INCREMENT,
  `user` varchar(15) DEFAULT NULL,
  `access_date` datetime DEFAULT NULL,
  `mark` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  测试数据部分还是写个python脚本生成,这样批量做就方便了。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    main.py
# Revision:    1.0
# Date:        2012-06-14
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import time
import random

# 基础设置
dict = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']
# 生成日期的unix起始时间
create_unix_time = 1396281600
# 生成每次日期的间隔时间。单位是小时。
hour_skip_start = 1
hour_skip_end = 12

def main():
    start_unix_time = create_unix_time 
    f = open('create.sql', 'wb')
    # 开始循环生成sql
    for i in range(280):
        # 生成用户名
        user_name = ''
        for j in range(random.randint(1,5)):
            user_name = user_name+random.choice(dict)
        # 生成时间
        start_unix_time = start_unix_time+(random.randint(hour_skip_start, hour_skip_end)*3600)
        get_create_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_unix_time))
        f.write("INSERT INTO `member` VALUES ('%s', '%s', '%s', null);\n" % (i, user_name, get_create_time))
    f.close()

if __name__ == '__main__':
    main()

  开始正式部分,获取数据并生成图片。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    main.py
# Revision:    1.0
# Date:        2012-06-14
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import time
import MySQLdb
from pylab import *
from matplotlib.font_manager import FontProperties

#### set
nearest_day = 32

def create_pic(memberData):
    print memberData
    # 将数据x,y整理两个列表
    x_date = []
    y_count = []
    for i in memberData:
        x_date.append(i[2])
        y_count.append(i[1])
    # 将x轴日期元组化
    zu = []
    da = []
    for j in range(1, len(x_date)/5+1):
        zu.append(j*5)
        da.append(x_date[j*5])
    # 开始生成表
    font = FontProperties(fname=r"/usr/share/fonts/truetype/freefont/msyh.ttf", size=10) 
    cla()
    label = u'测试' 
    plot(y_count)
    # x和y轴定义
    xlabel(u'日期', fontproperties=font)
    ylabel(u'统计数量', fontproperties=font)
    xticks(tuple(zu), tuple(da), fontproperties=font)
    title(u'统计', fontproperties=font)
    grid(True)
    legend(loc = 'lower right')
    savefig("test1.png")

def main():
    try: 
        conn=MySQLdb.connect(host='115.28.42.253',user='test',passwd='123456',db='pymatplotlib',port=3306,charset='utf8')
        cur=conn.cursor()
        start_date = time.strftime('%Y-%m-%d', time.localtime(time.time()-3600*24*nearest_day))
        end_date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
        my_sql = "SELECT id,count(*),DATE_FORMAT(access_date, '%Y-%m-%d' ) \
                  FROM member \
                  WHERE access_date BETWEEN '"+start_date+"' and '"+end_date+"' \
                  GROUP BY DATE_FORMAT(access_date, '%Y-%m-%d' ) "
        print my_sql
        cur.execute(my_sql)
        cds=cur.fetchall()
        conn.close()
    except MySQLdb.Error,e:
        print "Mysql Error %d: %s" % (e.args[0], e.args[1])
    create_pic(cds)


if __name__ == '__main__':
    main()

  运行脚本完成,生成图片如下:
mysql数据生成图片

试验代码部分