11月 10

python 版 crontab tornado调用

  当前使用tornado框架,但是系统里有很多需要定时执行的报表,为了系统迁移方便,并且定时任务随web启停,系统的Crontab不能满足需求。
  通过网上查找,虽然python里有sched模块和第三方apsched,但是感觉不能满足当前需要。自己动手写一个类似crontab。
  代码如下,功能基本和语法与系统crontab相似,只是处理方式不同,我命名为pycrontab.py。处理到秒怕精度不够,所以也只是到分。有需要的自行修改。
  最后用tornado框架的callback机制定时调用pycrontab.py处理。

  pycrontab.conf为配置文件。pycrontab.py处理配置文件里的内容。tornado用的是官方的演示代码。

#!/bin/env python
# -*- coding:utf-8 -*-
# Filename:
# Revision:  
# Date:        2014-11-10
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import re
import sys
import time

# 时间范围。格式为:分、小时、日、月、周
time_frame = [[0, 60], [0, 24], [1, 32], [1, 13], [0, 7]]

# 检查时间是否满足运行条件
def check_time(set_time, now_time, time_frame_num):
    # 初始化数据
    _skip = 1
    _time = []
    _status = "NO"
    _start_time = time_frame[time_frame_num][0]
    _end_time = time_frame[time_frame_num][1]
    # 按优先级处理标示符
    # 处理时间分组
    for list_time in set_time.split(','):
        # 处理判断间隔
        if list_time.find('/') > -1:
            _skip = int(list_time.split('/')[1])
        time_loop = list_time.split('/')[0]
        # 处理时间范围
        if time_loop.find('-') > -1:
            cut_time = time_loop.split('-')
            fri_time = int(cut_time[0])
            sec_time = int(cut_time[1])
            if fri_time > sec_time:
                for add_time_1 in xrange(fri_time, _end_time, _skip):
                    _time.append(add_time_1)
                for add_time_2 in xrange(_start_time, sec_time, _skip):
                    _time.append(add_time_2)
            else:
                for add_time_3 in xrange(fri_time, sec_time, _skip):
                    _time.append(add_time_3)
        else:
            # 如果为*放在最后判断
            if str(time_loop) != "*":
                _time.append(int(time_loop))
    if (int(now_time) in _time) or (str(set_time) == "*"):
        _status = "OK"
    return _status


# 运行命令行
def run_comm(comm_line):
    cut_comm_line = comm_line.split()
    # 组装运行命令
    _run_comm_line = ""
    for lo in xrange(5,len(cut_comm_line)):
        _run_comm_line = "%s %s" % (_run_comm_line, cut_comm_line[lo])
    from os import system
    system(_run_comm_line)


def loop_pycrontab_line(pycrontab_lines):
    # 初始化当前时间 
    get_now_time_list = []
    get_now_time = time.localtime()
    get_now_mon = get_now_time[1]
    get_now_day = get_now_time[2]
    get_now_hou = get_now_time[3]
    get_now_min = get_now_time[4]
    get_now_sec = get_now_time[5]
    get_now_wek = get_now_time[6]+1
    if get_now_wek == 7:
        get_now_wek = 0
    get_now_time_list = [get_now_min, \
                         get_now_hou, \
                         get_now_day, \
                         get_now_mon, \
                         get_now_wek]
    # 循环处理每条定时任务
    for single_line in pycrontab_lines:
        run_status = "OK"
        # 判断回车
        if single_line[-1] == '\n':
            single_line = single_line[:-1]
        # 判断每行时间格式是否正确
        split_pycrontab_line = single_line.split()
        for single_char in xrange(5):
            if len(re.findall("[\*|0-9|\/|\,|-]{0,20}", split_pycrontab_line[single_char])) > 2:
                run_status = "Error"
        if run_status == "OK":
            # 判断是否需要执行
            for lo in xrange(5):
                if check_time(split_pycrontab_line[lo], get_now_time_list[lo], lo) == "NO":
                    run_status = "NO"
            # 判断是否运行
            if run_status == "OK":
                run_comm(single_line)
        else:
            print "Error %s" % single_line
 

def main():
    # 
    try:
        with open("pycrontab.conf") as pycrontab_conf_file:  
            pycrontab_conf = pycrontab_conf_file.readlines()
    except:
        print "pycrontab.conf file ERROR!"
        sys.exit(1)
    loop_pycrontab_line(pycrontab_conf)



if __name__ == "__main__":
    main()
#!/bin/env python
# -*- coding:utf-8 -*-
# Date:        2014-11-10
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
from tornado import ioloop, web

def pycrontab():            
   import time
   print time.time()

class MainHandler(web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8891)
    ioloop.PeriodicCallback(pycrontab, 60000).start()
    ioloop.IOLoop.instance().start()

python版crontab源码下载

11月 06

Linux下用python处理excel

安装模块部分
pip install xlrd
pip install xlwt

读取常用部分
导入模块
import xlrd

打开Excel文件读取数据
data = xlrd.open_workbook(‘excelFile.xlsx’)

需要修改coding的,普通文件默认即可
data = xlrd.open_workbook(‘excelFile.xlsx’, encoding=’cp1242′)

打印文件sheet数量
data.nsheets

打印文件sheet名字
data.sheet_names()

根据ID获取一个工作表
table = data.sheets()[0]

根据名称获取工作表
table = data.sheet_by_name(u’Sheet2′)

获取整行或整列的值,返回值为数组
table.row_values(i)
table.col_values(i)

获取行数或列数
nrows = table.nrows
ncols = table.ncols

循环行列表数据
for i in range(nrows):
print table.row_values(i)

单元格
cell_A1 = table.cell(0,0).value
cell_C4 = table.cell(2,3).value

使用行列索引
cell_A1 = table.row(0)[0].value
cell_A2 = table.col(1)[0].value

打印单元格值
print cell_A1.value

数据类型(ctype)
0 empty
1 text
2 number
3 date
4 boolean
5 error
简单写入,xf为扩展
table.put_cell(row, col, ctype, value, xf)
table.put_cell(0,1,1,’test’,0)

写入常用部分
创建文件
from xlwt import Workbook
data = Workbook()

如需修改编码
data = Workbook(encoding=’utf-8′)

添加sheet
sheet = data.add_sheet(‘test sheet’)

写入数据
写法一
sheet.write(0, 0, ‘A1’)
写法二
row = sheet.row(1)
row.write(1, ‘B2′)

写入不同格式的数据
row.write(0, u’\xa3’)
row.write(1, ‘text’)
row.write(2, 3.14159)
row.write(3, 15)
row.write(4, 265L)
row.write(5, Decimal(‘3.65’))
row.write(6, date(214,11,5))
row.write(7, datetime(2014,11,5,17,0,1))
row.write(8, time(17,1))
row.write(9, False)
row.write(10, ”, Style.easyxf(‘pattern:pattern solid, fore_colour green;’))
row.write(11, Formula(“$A$!+SUM(‘testsheet’!$A$!:$B$2)”))

写入数据
sheet.flush_row_data()

保存数据
data.save(‘testExcel.xlsx’)

更多高级功能,比如对齐、链接、增加图片等详见pdf文件

测试脚本

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:
# Revision:   
# Date:        2014-11-5
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import xlrd
from xlwt import Workbook

out_data = xlrd.open_workbook('/root/ab.xlsx')
in_data = Workbook()

# 获取第一张表
out_table = out_data.sheet_by_index(0)
# 获取列数
ncols = out_table.ncols
# 获取行数
nrows = out_table.nrows
# 创建sheet
in_table = in_data.add_sheet('test sheet')
# 循环写入数据(不判断输出格式,某些情况会有问题。比如时间)
for r in range(nrows):
    for c in range(ncols):
        in_table.write(r,c,out_table.cell(r,c).value)
# 写入文件
in_data.save('ab2.xls')

参考:
http://www.simplistix.co.uk/presentations/python-excel.pdf
http://www.cnblogs.com/lhj588/archive/2012/01/06/2314181.html

10月 29

splinter自动化测试工具学习

splinter自动化测试工具学习。模拟浏览器操作。

官方标文档:http://splinter.cobrateam.info/docs/

测试环境如下:
windows 8 64位操作系统安装python2.7.8

window按顺序安装下面的包,也可以直接用pip安装。
https://bootstrap.pypa.io/ez_setup.py
https://pypi.python.org/packages/source/p/pip/pip-1.5.6.tar.gz
https://pypi.python.org/packages/source/s/selenium/selenium-2.44.0.tar.gz
https://pypi.python.org/packages/source/s/splinter/splinter-0.6.0.tar.gz
下面库是后台使用也可以在后面安装
https://pypi.python.org/packages/source/z/zope.testbrowser/zope.testbrowser-4.0.4.zip

学习部分,简化实例
实例化
from splinter import Browser
browser = Browser()

基础部分:
访问部分 browser.visit(‘www.xxx.com’)
填表部分 browser.fill(‘hello’)
单击按钮 browser.find_by_name(‘button’).click()
获取返回值(用if判断) browser.is_text_present(‘OK’)
退出部分 browser.quit()
选择不同浏览器 browser = Browser(‘firefox’)
重新导入 browser.reload()
后退 browser.back()
前进 browser.forward()
获取title browser.title
获取页面 browser.html
获取当前url browser.url
模拟浏览器头 browser = Browser(user_agent=”Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en)”)

常用部分:
查找相关元素,可以使用火狐的F12(大家应该懂)。
browser.find_by_css(‘h1’)
browser.find_by_xpath(‘//h1’)
browser.find_by_tag(‘h1’)
browser.find_by_name(‘name’)
browser.find_by_id(‘firstheader’)
browser.find_by_value(‘query’)

获取该元素的开始 first_found = browser.find_by_name(‘name’).first
获取该元素的结尾 last_found = browser.find_by_name(‘name’).last
获取该元素的索引 second_found = browser.find_by_name(‘name’)[1]

查找相关连接
links_found = browser.find_link_by_text(‘Link for Example.com’)
links_found = browser.find_link_by_partial_text(‘for Example’)
links_found = browser.find_link_by_href(‘http://example.com’)
links_found = browser.find_link_by_partial_href(‘example’)
获取某个元素的值 browser.find_by_css(‘h1’).first.value

查找div
divs = browser.find_by_tag(“div”)
within_elements = divs.first.find_by_name(“name”)

鼠标移动到某资源 browser.find_by_tag(‘h1’).mouse_over()
鼠标移开某资源 browser.find_by_tag(‘h1’).mouse_out()
双击某元素 browser.find_by_tag(‘h1’).double_click()
鼠标右键某资源 browser.find_by_tag(‘h1’).right_click()
点击相关连接
browser.click_link_by_href(‘http://www.the_site.com/my_link’)
browser.click_link_by_partial_href(‘my_link’)
browser.click_link_by_text(‘my link’)
browser.click_link_by_partial_text(‘part of link text’)

选择 browser.choose(‘some-radio’, ‘radio-value’)
选择项目 browser.check(‘some-check’)
取消选项 browser.uncheck(‘some-check’)
选择下拉菜单 browser.select(‘uf’, ‘rj’)

针对ajax部分
对某个焦点 browser.is_text_present(‘splinter’, wait_time=10)
browser.is_text_not_present(‘text not present’, wait_time=10)
browser.is_text_not_present(‘text not present’, wait_time=10)
直接执行脚本

browser.execute_script('document.getElementById("body").innerHTML="

Hello world!

"')

cookie部分
添加cookie browser.cookies.add({‘whatever’: ‘and ever’})
获取所有cookie browser.cookies.all()
删除cookie值 browser.cookies.delete(‘mwahahahaha’)
删除所有cookie browser.cookies.delete() #deletes all cookies

浏览器状态 browser.status_code.code
获取异常部分 HttpResponseError, e:

后台执行部分
http://splinter.cobrateam.info/docs/drivers/zope.testbrowser.html

实例代码,模拟51job刷新部分(比较复杂,基本模拟了主流的点击操作)。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    grab-centreon.py
# Revision:    
# Date:        2014-10-29
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
from splinter import Browser
from time import sleep

url = 'www.51job.com'
name = 'kkkkkkk'
passwd = '9999999'
mybr = Browser()
 
# 按照id发送账号密码
mybr.visit("http://%s" % url)
sleep(2)
mybr.find_by_id('username').fill(name)    
mybr.find_by_id('userpwd').fill(passwd)
# 按照tag顺序点击
mybr.find_by_tag('input').click()
sleep(15)
mybr.find_by_id('window_fans_close').click()
sleep(3)
# 按照css查找按钮,点击开刷新页面
aa = mybr.find_by_css('a.refresh')
aa.click()
sleep(3)
# 直接执行刷新脚本,此部分需要根据自己编号进行修改
mybr.execute_script("Save_RefreshResume('000000','0','http://my.51job.com');")
# 直接找到刷新关闭页面
mybr.execute_script("javascript:Refresh_success.close();Refresh_success_LastUpdate(mystr[1]);")
# 按照连接找到位置进行点击
mybr.click_link_by_href('http://my.51job.com/my/My_SignOut.php')
sleep(2)
# 关闭浏览器
mybr.quit()