9月 13

eventlet 学习测试

  eventlet是一款使用Python编写的为高并发的网络编程而设计的库。它通过greenlet提供的协程功能,让开发者可以不用将以往的多线程等并发程序的开发方式转变成异步状态机模型,就能直接使用select/epoll/kqueue等操作系统提供的支持高并发IO接口,并且能尽可能地发挥它们在并发上的优势。
  我用eventlet写一个服务端的socket接口,客户端的测试使用thread进行连接测试。代码如下。

服务端

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    teve.py
# Revision:    1.0
# Date:        2014-09-13
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import eventlet
from string import strip

# 根据客户端的发送进行返回
def welcome(str):
    _str = str
    hui = ""
    if "%s" % _str == "hi":
        hui = "你好"
    elif "%s" % _str == "hello":
        hui = "hi"
    return hui

def handle(client, add):
    error_count = 0
    while True:
        c = ""
        c = client.recvfrom(2048)[0].strip()
		# 如果循环10次没有取到数据则断开
        if len(c) == 0:
            pass
            error_count += 1
            if error_count > 10:
                client.shutdown(2)
                break
        else:
            _re = welcome(c)
            error_count = 0
            client.sendto(_re, add)

# 启动监听1300端口
server = eventlet.listen(('0.0.0.0', 1300))
# 创建5000线程,如果不填默认1000
pool = eventlet.GreenPool(5000)
# 循环监听
while True:
    new_sock, address = server.accept()
    pool.spawn_n(handle, new_sock, address)

客户端

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    tcptest.py
# Revision: 
# Date:        2014-09-13
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO

####加载多线程模块
import threading
####需要个随机数和延迟,为测试用
import random
from time import sleep
from socket import *

HOST='192.168.1.112'
#HOST='192.168.1.109'
PORT=1300
BUFSIZE=1024
ADDR=(HOST, PORT)


#### 多线程运行的测试部分。循环3次,每次间隔0到2的随机秒数,
#### 等待后打印,运行总次数,线程数和循环值
def test_func(thread_number,sequence):
    print "thread %s" % thread_number
    tcpCliSock=socket(AF_INET, SOCK_STREAM)
    tcpCliSock.connect(ADDR)
    tcpCliSock.send("hi")
    data=tcpCliSock.recv(BUFSIZE)
    print data
    sleep(180)
    #tcpCliSock.send("hello")
    ##data=tcpCliSock.recv(BUFSIZE)
    #print data
    tcpCliSock.close()

def main():
    #### 定义循环序列,就是一个线程池
    threads = []
    #### 定义总共运行的次数
    all_number = 100000
    #### 定义运行所使用的线程数
    thread_lines = 300
    #### 定义开始线程数
    start_line = 0
    #### 首先构建线程池
    for i in range(0,thread_lines):  
        t = threading.Thread(target=test_func, args=(i,start_line,))  
        threads.append(t)
        start_line +=1
    #### 运行第一批线程的任务
    for t in threads:  
        t.start()  
    #### 循环运行全部任务
    for number_line in xrange(start_line,all_number):
        #### 初始化当前线程的状态
        thread_status = False
        #### 初始化检查循环线程的开始值
        loop_line = 0
        #### 开始循环检查线程池中的线程状态
        while thread_status == False :
            #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
            #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
            #### 如果线程池中线程全部在运行,则开始从头检查
            if threads[loop_line].isAlive() == False :
                t = threading.Thread(target=test_func, args=(loop_line,number_line,))
                threads[loop_line]=t
                threads[loop_line].start()
                thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                    loop_line=0
                else:
                    loop_line+=1

if __name__ == "__main__":
    main()

  测试过程。服务端放在阿里云服务器上,单CPU,内存1G,带宽3M,CentOS6.5 64位。使用pypy启动。操作系统参数如下:
net.ipv4.conf.default.arp_announce = 2
net.ipv4.conf.all.arp_announce=2
vm.swappiness = 50
net.core.netdev_max_backlog = 2048
net.core.somaxconn = 250000
net.ipv4.tcp_max_tw_buckets = 5000
net.ipv4.tcp_max_syn_backlog = 1024
net.ipv4.tcp_synack_retries = 2
net.ipv4.conf.lo.arp_announce=2
fs.file-max=12000000
fs.nr_open=11000000

ulimit -n 为 65535

  客户端使用自己笔记本的虚拟机、raspberry pi和一台云主机。然后用bash脚本调用多个客户端,虚拟机上python启动10个客户端,在raspberry pi上使用pypy启动6个客户端。云主机上一个200连接。

  启动后服务端系统指标如下图。
tcpevenlet-top
tcpeventlet-count
  使用telnet连接1300端口信息回复正常。
  查看系统日志如下,被认为是洪水攻击
Sep 13 22:34:06 iZ23i076qv9Z kernel: possible SYN flooding on port 1300. Sending cookies.
修改系统参数如下,问题暂时环节。
net.ipv4.tcp_max_syn_backlog = 4096

  总结eventlet的性能确实不一般,因为连接后操作并不多,所以cpu基本没有使用,跑了一会,内存使用也下降到800M。操作系统使用内存300M到400M。
  综上所述,当前连接1万连接应该没有问题。实际应用中根据单个务处理需要的CPU和并发数算出CPU需要量,内存为1G可以1到2万并发,C10K问题解决。golang也支持协程,但是不知道能不能高出一个数量级。

5月 23

python 生成统计图

  想用python+matplotlib生成每天用户使用的统计图。测试环境,CentOS6.5,数据存放于mysql中,python2.7,matplotlib使用easy_install安装。

  首先建一个表。

CREATE TABLE `member` (
  `id` int(1) NOT NULL AUTO_INCREMENT,
  `user` varchar(15) DEFAULT NULL,
  `access_date` datetime DEFAULT NULL,
  `mark` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  测试数据部分还是写个python脚本生成,这样批量做就方便了。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    main.py
# Revision:    1.0
# Date:        2012-06-14
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import time
import random

# 基础设置
dict = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']
# 生成日期的unix起始时间
create_unix_time = 1396281600
# 生成每次日期的间隔时间。单位是小时。
hour_skip_start = 1
hour_skip_end = 12

def main():
    start_unix_time = create_unix_time 
    f = open('create.sql', 'wb')
    # 开始循环生成sql
    for i in range(280):
        # 生成用户名
        user_name = ''
        for j in range(random.randint(1,5)):
            user_name = user_name+random.choice(dict)
        # 生成时间
        start_unix_time = start_unix_time+(random.randint(hour_skip_start, hour_skip_end)*3600)
        get_create_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_unix_time))
        f.write("INSERT INTO `member` VALUES ('%s', '%s', '%s', null);\n" % (i, user_name, get_create_time))
    f.close()

if __name__ == '__main__':
    main()

  开始正式部分,获取数据并生成图片。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    main.py
# Revision:    1.0
# Date:        2012-06-14
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO
import time
import MySQLdb
from pylab import *
from matplotlib.font_manager import FontProperties

#### set
nearest_day = 32

def create_pic(memberData):
    print memberData
    # 将数据x,y整理两个列表
    x_date = []
    y_count = []
    for i in memberData:
        x_date.append(i[2])
        y_count.append(i[1])
    # 将x轴日期元组化
    zu = []
    da = []
    for j in range(1, len(x_date)/5+1):
        zu.append(j*5)
        da.append(x_date[j*5])
    # 开始生成表
    font = FontProperties(fname=r"/usr/share/fonts/truetype/freefont/msyh.ttf", size=10) 
    cla()
    label = u'测试' 
    plot(y_count)
    # x和y轴定义
    xlabel(u'日期', fontproperties=font)
    ylabel(u'统计数量', fontproperties=font)
    xticks(tuple(zu), tuple(da), fontproperties=font)
    title(u'统计', fontproperties=font)
    grid(True)
    legend(loc = 'lower right')
    savefig("test1.png")

def main():
    try: 
        conn=MySQLdb.connect(host='115.28.42.253',user='test',passwd='123456',db='pymatplotlib',port=3306,charset='utf8')
        cur=conn.cursor()
        start_date = time.strftime('%Y-%m-%d', time.localtime(time.time()-3600*24*nearest_day))
        end_date = time.strftime('%Y-%m-%d', time.localtime(time.time()))
        my_sql = "SELECT id,count(*),DATE_FORMAT(access_date, '%Y-%m-%d' ) \
                  FROM member \
                  WHERE access_date BETWEEN '"+start_date+"' and '"+end_date+"' \
                  GROUP BY DATE_FORMAT(access_date, '%Y-%m-%d' ) "
        print my_sql
        cur.execute(my_sql)
        cds=cur.fetchall()
        conn.close()
    except MySQLdb.Error,e:
        print "Mysql Error %d: %s" % (e.args[0], e.args[1])
    create_pic(cds)


if __name__ == '__main__':
    main()

  运行脚本完成,生成图片如下:
mysql数据生成图片

试验代码部分

5月 15

centos 6.5 下编译测试pypy2.3

  编译环境使用dell服务器,CPU主频2.6GHz,内存32G,操作系统使用CentOS6.5 64位,python为2.7。简单操作记录如下。

yum install gcc* make libffi libffi-devel expat expat-devel \
       bzip2 bzip2-devel libncurses-devel libssl-dev libgc-dev
wget https://bitbucket.org/pypy/pypy/downloads/pypy-2.3-src.tar.bz2
tar jxvf pypy-2.3-src.tar.bz2
mv pypy-pypy-394146e9bb67 /usr/local/pypy
cd /usr/local/pypy
python rpython/bin/rpython -Ojit pypy/goal/targetpypystandalone.py

  耐心等待一个多小时(104m56.034s)编译完成。到pypy目录下看到pypy-c的可执行文件。

rpyhon 编译参数
-O : 编译为字节码后的优化级别。默认为2,此处使用的jit。

级别说明:
0 – 关闭优化功能,编译速度最快。使用的是系统垃圾回收器(Debian package libgc-dev)。0和1级别外都是使用的内置垃圾回收器。0和1运行速度会非常慢。
1 – 使用非耗时的优化
size – 最大限度的减小可执行文件的体积
mem – 最大程度减小内存消耗
2 – 开启所有优化,获得好的运行性能
3 – 同2类似,使用gcc重新编译pypy部分。
jit – 开启所有优化,使用jit即时编译器。jit是将字节码转换成可以直接发送给处理器的指令的程序。

  编译是单核进行的,所以多核是没有意义,如果想快就要主频高,内存高。如果是用云主机2G内存的话估计要一、两天时间。
  注:操作系统CentOS 6.3编译有问题。具体问题没有详细查。
  计算速度测试代码位置http://www.simonzhang.net/?p=1844
  使用python运行 14.9753940105秒,pyp运行3.56524395943秒。

  安装pip,后用pip安装tornado和flask框架。

tornado测试代码

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(5000)
    tornado.ioloop.IOLoop.instance().start()

flask测试代码

from flask import Flask
app = Flask(__name__)

@app.route('/')
def hello_world():
    return 'Hello World!'

if __name__ == '__main__':
    app.run(host='192.168.6.250')

  为了方便使用将命令放到系统环境中。
ln -s /usr/local/pypy/pypy-c /usr/local/bin/pypy
ln -s /usr/local/pypy/bin/pypy-c /usr/local/bin/pip

参考网页
http://pypy.readthedocs.org/en/latest/config/opt.html

3月 31

用户自行修改svn密码的简单服务

svn做好,最基础的使用方法。将来用户要修改密码或忘记密码总要来问还是挺麻烦。不想处理这种简单问题。所以在python+tornado的框架上写了个页面,直接python跑起来。省了自己不少的事。

#!/bin/python
#-*- coding:utf-8 -*-
# Filename:    websvn.py
# Revision:    
# Date:        2014-03-27
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
### END INIT INFO

import tornado.ioloop
import tornado.web

import smtplib
from email.mime.text import MIMEText 

from string import strip

# base set
webport = '88'
passfile = '/program/svn/conf/passwd'
# svn start comm
svn_start_comm = '/program/svn/bin/svnserve -d -r /program/svn/ --listen-port 59999'
# use send user info
mail_host = 'smtp.simonzhang.net'
mail_user = 'test@simonzhang.net'
mail_pwd = '123456'

def mail_send(content, mailto):
     msg = MIMEText(content.encode('utf8'), _subtype='html',  _charset='utf8')
     msg['From'] = mail_user
     msg['Subject'] = u'svninfo'
     msg['To'] = mailto
     try:
         s = smtplib.SMTP()
         s.connect(mail_host)
         s.login(mail_user,mail_pwd)
         s.sendmail(mail_user,[mailto],msg.as_string())
         s.close()
     except Exception ,e:
         print e 

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        index_html = '''
用户名 老密码 新密码 再次输入
用户邮箱
''' self.write(index_html) class EditPassHandler(tornado.web.RequestHandler): def get(self): # 清理临时文件 try: import os os.remove('%s.tmp' % passfile) except: pass # 是否重启的状态 reboot_status = 0 user = strip(self.get_argument('user')) oldpass = strip(self.get_argument('oldpass')) newpass1 = strip(self.get_argument('newpass')) newpass2 = strip(self.get_argument('newpass2')) if (len(user) == 0) or (len(oldpass) == 0) or \ (len(newpass1) == 0) or (len(newpass2) == 0): html = '输入信息错误' else: user_info_list = open(passfile, 'rb').readlines() tmp_file = open('%s.tmp' % passfile, 'wb') # 如果是用户配置则查看是否为用户,如果不是直接写入临时文件 for li in xrange(len(user_info_list)): if user_info_list[li].find('=') > 0 : u = strip(user_info_list[li].split('=')[0]) p = strip(user_info_list[li].split('=')[1]) #e = strip(user_info_list[li+1].split('#')[1]) if (u == user) and (p == oldpass): tmp_file.write('%s=%s\n' % (u, newpass1)) # tmp_file.write('#%s\n' % (e)) reboot_status = 1 # li = li+1 else: tmp_file.write(user_info_list[li]) else: tmp_file.write(user_info_list[li]) if reboot_status == 1: import os import shutil shutil.move('%s.tmp' % passfile, passfile) os.system("killall svnserve && %s" % svn_start_comm) html = '处理完成请重试' self.write(html) class SendUserInfoHandler(tornado.web.RequestHandler): def get(self): email = self.get_argument('email') user_info_list = open(passfile, 'rb').readlines() # 循环读取每一行 html = '没有找到相关信息' for li in xrange(len(user_info_list)): context = user_info_list[li][:-1] # 如果有用户配置的则分解 if context.find('=') > 0 : u = strip(user_info_list[li].split('=')[0]) p = strip(user_info_list[li].split('=')[1]) e = strip(user_info_list[li+1].split('#')[1]) # 如果用户邮箱与输入邮箱相同则发邮件 if e == strip(email): mail_send("user:%s. passwd:%s" % (u, p), email) html = '邮件发送注意查收' self.write(html) application = tornado.web.Application([ (r"/", MainHandler), (r"/editpass/", EditPassHandler), (r"/senduserinfo/", SendUserInfoHandler), ]) if __name__ == "__main__": application.listen(webport) tornado.ioloop.IOLoop.instance().start()

源码包websvn