4月 08

python的聪明组合

  一直在买双色就是没有中过,看过高人指点,根据“聪明组合”写了这个脚本。在技术上没有任何难度,都是体力活。为了大家方便。
  运行脚本输入12个红球数,组合成10组。然后在自己加上篮球即可。

#!/bin/env python
# -*- coding:utf-8 -*-
# -------------------------------------------
# Filename:    clever12.py
# Revision:    1.0
# Date:        2012-3-13
# Author:      simonzhang
# WEB:         www.simonzhang.net
# Email:       simon-zzm@163.com
# -------------------------------------------

def run_group(di):
    fen = di.split(' ')
    A = fen[0]
    B = fen[1]
    C = fen[2]
    D = fen[3]
    E = fen[4]
    F = fen[5]
    G = fen[6]
    H = fen[7]
    I = fen[8]
    J = fen[9]
    K = fen[10]
    L = fen[11]
    print("%s %s %s %s %s %s"%(A,B,D,E,K,L))
    print("%s %s %s %s %s %s"%(A,B,E,F,H,I))
    print("%s %s %s %s %s %s"%(A,B,E,G,I,K))
    print("%s %s %s %s %s %s"%(A,B,E,I,J,L))
    print("%s %s %s %s %s %s"%(A,C,D,E,F,L))
    print("%s %s %s %s %s %s"%(A,C,D,G,H,J))
    print("%s %s %s %s %s %s"%(A,C,D,I,K,L))
    print("%s %s %s %s %s %s"%(A,C,F,G,H,L))
    print("%s %s %s %s %s %s"%(A,C,F,H,J,K))
    print("%s %s %s %s %s %s"%(A,D,F,G,J,L))

def main():
    get_list = raw_input("12 number :")
    if len(get_list) == 35:
        run_group(get_list)
    else:
        print "input error"

if __name__ == "__main__":
    main()
3月 23

python 用ping循环监测服务器心跳

        循环用ping命令监测服务器是否在运行。本脚本为循环监测,每次监测间隔3秒钟。当错误达到一定数量时,发邮件告知。为了防止脚本错误退出,建议写一个启停的脚本放在crontab中调用,具体做法参见:http://www.simonzhang.net/?p=697

#!/usr/local/bin/python
# -------------------------------------------------------------------------------
# Filename:    check_ip.py
# Revision:    1.0
# Date:        2012-03-20
# Author:      simonzhang
# web:         www.simonzhang.net
# Email:       simon-zzm@163.com
# -------------------------------------------------------------------------------
import pexpect
import time
import smtplib
from email.mime.text import MIMEText

#### base set
#### 配置要监测的IP地址
check_ip ={"192.168.1.100":0
           ,"192.168.1.101":0
           ,"192.168.1.102":0}
#### 定义几次ping不通发邮件
send_mail_limit = [3,4]
#### 发邮件的邮件服务器配置
mail_host = 'smtp.exmail.qq.com'
mail_user = 'simon-zzm@qq.net'
mail_pwd = 'XXXXXXX'
mail_to = "simon-zzm@139.com"
mail_cc = "simon-zzm@163.com"
####

def mail_warn(error_ip):
    content = 'Ping IP %s is error!'%error_ip
    msg = MIMEText(content)
    msg['From'] = mail_user
    msg['Subject'] = 'warnning %s'%error_ip
    msg['To'] = mail_to
    try:
        s = smtplib.SMTP()
        s.connect(mail_host)
        s.login(mail_user,mail_pwd)
        s.sendmail(mail_user,[mail_to,mail_cc],msg.as_string())
        s.close()
    except Exception ,e:
        print e

def check(get_ip):
    try :
        ping=pexpect.spawn("ping -c1 %s" % (get_ip))
        check_result =ping.expect([pexpect.TIMEOUT,"1 packets transmitted, 1 received, 0% packet loss"],2)
    except :
        check_result = 0
    return check_result

def main():
    while True :
        for i in check_ip:
            check_status = check("%s"%i)
            if check_status == 1:
                check_ip["%s"%i] = 0
            else :
                check_ip["%s"%i] +=1
            if check_ip["%s"%i] in send_mail_limit :
                mail_warn("%s"%i)
        time.sleep(3)

if __name__ == "__main__":
    main()
3月 22

python 多线程程序,控制线程数

python 控制线程数

         用python写多线程的脚本,需要控制线程数。需要自己判断线程是否运行完毕。测试代码如下:

#!/bin/env python
# -------------------------------------------------------------------------------
# Filename:    my-thread.py
# Revision:
# Date:        2012-12-19
# Author:      simonzhang
# web :        www.simonzhang.net
# Email:       simon-zzm@163.com
# ------------------------------------------------------------------------------- 

####加载多线程模块
import threading
####需要个随机数和延迟,为测试用
import random
from time import sleep

#### 多线程运行的测试部分。循环3次,每次间隔0到2的随机秒数,
#### 等待后打印,运行总次数,线程数和循环值
def test_func(thread_number,sequence):
    for i in range(3):
        sleep(random.randint(0,2))
        print('run sequence:%s thread %d is running %d ' % (sequence,thread_number,i))   

def main():
    #### 定义循环序列,就是一个线程池
    threads = []
    #### 定义总共运行的次数
    all_number = 5
    #### 定义运行所使用的线程数
    thread_lines = 3
    #### 定义开始线程数
    start_line = 0
    #### 首先构建线程池
    for i in range(0,thread_lines):
        t = threading.Thread(target=test_func, args=(i,start_line,))
        threads.append(t)
        start_line +=1
    #### 运行第一批线程的任务
    for t in threads:
        t.start()
    #### 循环运行全部任务
    for number_line in xrange(start_line,all_number):
        #### 初始化当前线程的状态
        thread_status = False
        #### 初始化检查循环线程的开始值
        loop_line = 0
        #### 开始循环检查线程池中的线程状态
        while thread_status == False :
            #### 如果检查当前线程,如果线程停止,代表任务完成,则分配给此线程新任务,
            #### 如果检查当先线程正在运行,则开始检查下一个线程,直到分配完新任务。
            #### 如果线程池中线程全部在运行,则开始从头检查
            if threads[loop_line].isAlive() == False :
                t = threading.Thread(target=test_func, args=(loop_line,number_line,))
                threads[loop_line]=t
                threads[loop_line].start()
                thread_status = True
            else:
                if loop_line >= thread_lines-1 :
                    loop_line=0
                else:
                    loop_line+=1
    #### 等待超时
    sleep(30)
    #### 结束所有线程
    for number_line in xrange(start_line,thread_lines):
        thread[number_line].exit()

if __name__ == "__main__":
    main()

          本代码存在一个问题,运行完毕后主进程在运行,程序走到下一步,但是还有线程也在运行。所以需要在调用全部完毕后,有一段检查线程是否全部结束,可以使用jion()进行阻塞判断即可。根据实际需要编写。

3月 19

python 脚本不能并行运行

        写了个脚本,数据需要定时循环处理,但是不能同时重复处理。也就是说,脚本需要单进程运行。脚本定时运行配置在crontab中。如果在脚本开始写状态文件,运行完成后关闭状态文件也可以,但是如果脚本中途退出,状态文件不会更改,下次运行就不容易判断了。使用进程号来判断就比较准确了,所以写了以下代码。
       以下脚本为linux下使用。

#!/usr/local/bin/python
# -------------------------------------------------------------------------------
# Filename:    my-pid.py
# Revision:
# Date:        2012-12-19
# Author:      simonzhang
# Email:       simon-zzm@163.com
# -------------------------------------------------------------------------------
import os

if __name__ == '__main__':
    try :
        #首先查看是否有pid文件
        #读取pid
        now_pid = open("./my_pid.pid","rb").read()
        #通过操作系统命令,统计pid运行的数量
        get_count = os.popen("ps -ef|awk ' ''{print $2}'|grep -w %s|wc -l"%now_pid).read()
    except:
        #如果没有pid文件,则统计为零
        get_count = 0
    #判断pid是否在进程中
    if int(get_count) > 0 :
        #如果系统进程中有pid进程,则打印后推出
        print "run...."
        sys.exit()
    else:
        #如果没有进程则获得后,保存在pid文件中
        w_pid = os.getpid()
        w_pid_file = open("./my_pid.pid","wb")
        w_pid_file.write("%s"%w_pid)
        w_pid_file.close()
        #运行脚本
         main()
3月 15

python中通过 pexpect 使用 rsync

  之前写了脚本,死循环调用inotifywait监控文件夹,如果文件有变动,则启动rsync进行同步。但是当前需求有点变化,文件要按照日期建文件夹进行存储,且文件变化很快,如果直接监控最顶级目录系统资源将消耗很大,所以考虑还是自写一下好,如果系统资源量变化大,且不用时时同步时可以根据文件变动对变动对文件进行记录,可以按照优先规则进行同步。
  官方网站 : http://trac.dbzteam.org/pyinotify
  系统要求:Linux kernel with inotify ≥ 2.6.13
Python ≥ 2.4

  直接使用easy_install安装非常简单
  # sudo easy_install pyinotify
  如果不能联网,则需要直接下载压缩包,进行编译安装。

  首先测试下是否可用:
  使用以下命令监控/tmp文件夹,
  # python -m pyinotify /tmp

在/tmp文件夹下新建1.txt,并随后进行删除。看到显示的记录如下:







  效果不错,继续向下进行。只要有创建、删除和关闭写就打印出变化的文件或目录,代码如下:

import  re
import pyinotify

wm = pyinotify.WatchManager()
mask = pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE
class EventHandler(pyinotify.ProcessEvent):
    def process_IN_CREATE(self, event):
        self.rebuild(event) 

    def process_IN_DELETE(self, event):
        self.rebuild(event) 

    def process_IN_CLOSE_WRITE(self, event):
        self.rebuild(event)

    def rebuild(self, event):
        chang_name=re.compile(".+\.swp$|.+\.swx$|.+\.swpx$")
        if not chang_name.match(event.pathname):
            print event.pathname 

handler = EventHandler()
notifier = pyinotify.Notifier(wm, handler)
wdd = wm.add_watch('/tmp', mask, rec=True,auto_add=True  

notifier.loop()

  代码里使用正则表达式过滤因为使用vim打开文件产生的缓存文件。也可以用exclude_filter方法在官方文档中例子如下,但是我测试多次没有成功,所以直接用正则过滤。

# Exclude patterns from list
excl_lst = ['^/etc/apache[2]?/',
            '^/etc/rc.*',
            '^/etc/hostname',
            '^/etc/hosts',
            '^/etc/(fs|m)tab',
            '^/etc/cron\..*']
excl = pyinotify.ExcludeFilter(excl_lst)
# Add watches
res = wm.add_watch(['/etc/hostname', '/etc/cups', '/etc/rc0.d'],
                   pyinotify.ALL_EVENTS, rec=True, exclude_filter=excl)

  如果监控文件太多,需要对系统做一下修改sysctl -n -w fs.inotify.max_user_watches=16384。