9月 23

python到mysql 2006问题解决

  tornado和Django框架都是使用MySQLdb,在长时间不与数据库有交互,如采集、SQL运行数据过大而数据库等待时间过短等,这时会报“Error 2006: MySQL Server has gone away”错误。收集了网上的三个解决方案是,
  一、修改数据库的参数wait_timeout,interactive_timeout到2880000,
  二、在每次连接时做判断,如果报错将再连接一次(http://xulizhao.com/blog/python_db)。

class DB:
    conn = None
    def connect(self):
        self.conn = MySQLdb.connect(localhost,root,pass,test)
    def cursor(self):
        try:
            return self.conn.cursor()
        except (AttributeError, MySQLdb.OperationalError):
            self.connect()
            return self.conn.cursor()

  三、定时进行连接,以防断开

from tornado.ioloop import PeriodicCallback

#torndb.Connection(host, database, user=None, password=None, max_idle_time=25200, connect_timeout=0, time_zone='+0:00')
db = torndb.Connection(db_ip, db_database, db_user, db_pass, db_conn_time)
# https://github.com/felinx/poweredsites/blob/master/poweredsites/db/mysql.py
# ping db periodically to avoid mysql go away
PeriodicCallback(db.query(show variables), 3600*4*1000).start()

我的测试,第一种方法比较好用,但是在某些情况下会造成其它问题。第三种效果不理想,需要继续研究。第二种正在试验中。

8月 16

raspberry pi 使用mpu 6050三轴加速度 陀螺仪模块

连接线图。

raspberry pi 使用mpu 6050三轴加速度 陀螺仪模块

raspberry pi 使用mpu 6050三轴加速度 陀螺仪模块


raspberry pi 使用mpu 6050三轴加速度 陀螺仪模块

raspberry pi 使用mpu 6050三轴加速度 陀螺仪模块


  安装工具
sudo apt-get install -y python-smbus i2c-tools

编辑文件
$sudo vim /etc/modules
内容如下
i2c-bcm2708
i2c-dev

$vim /etc/modprobe.d/raspi-blacklist.conf
内容如下,直接注释掉
#blacklist spi-bcm2708
#blacklist i2c-bcm2708

测试一下
512M Pi’s use i2c port 1, 256M ones use i2c port 0!

pi@raspberrypi ~ $ sudo i2cdetect -y 1
0 1 2 3 4 5 6 7 8 9 a b c d e f
00: — — — — — — — — — — — — —
10: — — — — — — — — — — — — — — — —
20: — — — — — — — — — — — — — — — —
30: — — — — — — — — — — — — — — — —
40: — — — — — — — — — — — — — — — —
50: — — — — — — — — — — — — — — — —
60: — — — — — — — — 68 — — — — — — —
70: — — — — — — — —

  一切正常。地址68在代码中会用到。源码如下。

#! /usr/bin/python
# -*- coding:utf-8 -*-
# Revision:
# Date:        2013-08-16
# Author:      simonzhang
# Email:       simon-zzm@163.com
# Web:         www.simonzhang.net
# -------------------------------

from time import sleep
from math import sqrt, atan

class i2c(object):
    # MPU6050内部地址,需查手册获得
    def __init__(self):
        # raspberry pi A is 0,B is 1
        self.pi = 1
        # iic address of MCP23017
        # 上面测试时候的地址
        self.address = 0x68
        # 
        import smbus
        self.bus = smbus.SMBus(self.pi)
        # 电源管理,正常启动接触休眠值0x00
        self.bus.write_byte_data(self.address, 0x6B, 0x00)
        # 陀螺仪采样率,典型值:0x07(125Hz)
        self.bus.write_byte_data(self.address, 0x19, 0x07)
        # 低通滤波频率,典型值:0x06(5Hz)
        self.bus.write_byte_data(self.address, 0x1A, 0x06)
        # 陀螺仪自检及测量范围,典型值:0x18(不自检,2000deg/s)
        self.bus.write_byte_data(self.address, 0x1B, 0x18)
        # 加速计自检、测量范围及高通滤波频率,典型值:0x01(不自检,2G,5Hz)
        self.bus.write_byte_data(self.address, 0x1C, 0x01)
    # 3轴的加速度
    def read_accel(self):
        # x轴 
        self.accel_x_h = self.bus.read_byte_data(self.address, 0x3B)
        self.accel_x_l = self.bus.read_byte_data(self.address, 0x3C)
        # y轴
        self.accel_y_h = self.bus.read_byte_data(self.address, 0x3D)
        self.accel_y_l = self.bus.read_byte_data(self.address, 0x3E)
        # z轴
        self.accel_z_h = self.bus.read_byte_data(self.address, 0x3E)
        self.accel_z_l = self.bus.read_byte_data(self.address, 0x3F)
        return self.accel_x_h, self.accel_x_l, \
               self.accel_y_h, self.accel_y_l, \
               self.accel_z_h, self.accel_z_l, \
    # 陀螺仪3轴角加速度,每秒多少度
    def read_gyro(self):
        # x轴角速度
        self.gyro_x_h = self.bus.read_byte_data(self.address, 0x43)
        self.gyro_x_l = self.bus.read_byte_data(self.address, 0x44)
        # y轴角速度
        self.gyro_y_h = self.bus.read_byte_data(self.address, 0x45)
        self.gyro_y_l = self.bus.read_byte_data(self.address, 0x46)
        # z轴角速度
        self.gyro_z_h = self.bus.read_byte_data(self.address, 0x47)
        self.gyro_z_l = self.bus.read_byte_data(self.address, 0x48)
        return self.gyro_x_h, self.gyro_x_l, \
               self.gyro_y_h, self.gyro_y_l, \
               self.gyro_z_h, self.gyro_z_l, \
    # 3轴与自然轴角度,参考他人的算法,也不是很明白
    def read_nature_axle_angle(self):
        # 先开方,然后在算出弧度,最后把弧度转换为角度
        # 使用try 防止分母为0
        # x轴
        tmp = self.accel_x_h/int(sqrt((self.accel_y_h*self.accel_y_h+self.accel_z_h*self.accel_z_h)))
        self.x_nature_axle_angle = int(atan(tmp)*1800/3.14)
        # y轴
        tmp = self.accel_y_h/int(sqrt((self.accel_x_h*self.accel_x_h+self.accel_z_h*self.accel_z_h)))
        self.y_nature_axle_angle = int(atan(tmp)*1800/3.14)
        # z轴
        try:
            tmp = int(sqrt((self.accel_x_h*self.accel_x_h+self.accel_y_h*self.accel_y_h)))/self.accel_z_h
        except:
            pass
        self.z_nature_axle_angle = int(atan(tmp)*1800/3.14)
        return self.x_nature_axle_angle,self.y_nature_axle_angle, self.z_nature_axle_angle
    # 芯片温度
    def read_temp(self):
        self.temp_out_h = self.bus.read_byte_data(self.address, 0x41)
        self.temp_out_l = self.bus.read_byte_data(self.address, 0x42)
        return self.temp_out_h, self.temp_out_l
        
def main():
    i = i2c()
    while 1:
        print i.read_accel()
        print i.read_gyro()
        print i.read_temp()
        print i.read_nature_axle_angle()
        print "=" * 20
        sleep(0.5)
   
if __name__ == "__main__":
   main()

  因为要在管理员环境下运行。命令如下
sudo python iic_python.py

开始收集测试数据。
测试源码

参考
http://wiki.erazor-zone.de/wiki:linux:python:smbus:doc
http://learn.adafruit.com/adafruits-raspberry-pi-lesson-4-gpio-setup/configuring-i2c
http://blog.sina.com.cn/s/blog_8240cbef01018i10.html
http://www.geek-workshop.com/thread-2328-1-1.html

7月 03

在linux下用python读取其他操作系统编写的配置文件

  在windows、mac、linux下编写的配置文件会有头部或者换行有区别。为了分析方便简单做个记录,只为演示,代码并不完整,需要自己修改

def conf(conf_context):
    # 替换window或mac操作系统下的换行符
    import re
    _get_file = re.sub(r'(\r\n|\r|\n)', '\n', conf_context)
    # 将配置放在内存中
    import StringIO
    _tmp_file = StringIO.StringIO()
    # 将文件写入内存
    _tmp_file.write(_get_file)
    _tmp_file.seek(0)
    # 如果是在window下编辑的文件将,utf-8 BOM开始的头替换掉
    if _tmp_file.read(3).startswith('\xef\xbb\xbf'):
        _tmp_file.seek(3)
    else:
        _tmp_file.seek(0)
    # 导入配置
    import ConfigParser
    try:
        cf = ConfigParser.SafeConfigParser()
        cf.readfp(_tmp_file)
        config_list = cf.sections()
    except:
        _tmp_file.close()
        return "error"
    _tmp_file.close()
    return "ok"
6月 24

监控mysql从机同步状态脚本1.1

  之前写了个一个检查mysql从机的脚本(http://www.simonzhang.net/?p=1823),但是在使用中发现一个问题。如果数据库被重启了,但是同步的没有启动,此脚本检查还是正常,不会进行报警,数据不会同步。
  我做了个调整,每次检查同步主机的pos,通过crontab进行调用,如果多次都没有变化则进行告警。如果10分钟调用一次,设为3次,就是半个小时内没有更新则报警。
crontab配置如下:
*/10 * * * * /bin/bash /script/check_mysql_slave/check_mysql_slave.sh start >/dev/null 2>&1
部分代码如下:

#!/usr/local/bin/python
# -*- coding:utf-8 -*-
# -------------------------------------------------------------------------------
# Filename:    check_nagios.py
# Revision:    1.1
# Date:        2013-06-24
# Author:      simonzhang
# Email:       simon-zzm@163.com
# -------------------------------------------------------------------------------
import os
import pexpect
import time
import smtplib
from email.mime.text import MIMEText

#### base se
mysql_bin = '/program/mysql5/bin/mysql'
mysql_user = 'checkslavestatus'
mysql_pass = 'xxxxxxxxxx'
#设置错多少次开始告警
max_error = 3
mail_host = 'smtp.exmail.qq.com'
mail_user = 'warning@xxx.net'
mail_pwd = 'xxxxxxxxx'
mail_cc = "simon-zzm@163.com"
####

def mail_warn(error_ip):
    content = 'IP %s mysql slave is error!'%error_ip
    msg = MIMEText(content)
    msg['From'] = mail_user
    msg['Subject'] = 'mysql warnning %s'%error_ip
    msg['To'] = mail_to
    try:
        s = smtplib.SMTP()
        s.connect(mail_host)
        s.login(mail_user,mail_pwd)
        s.sendmail(mail_user,[mail_to],msg.as_string())
        s.close()
    except Exception ,e:
        print e

def main():
    error_context = ''
    #读取上次检查master同步点的记录
    try:
        f = open('MasterPos.txt', 'rb').read()
        try:
            old_master_pos = f.split(':')[0]
            error_count = f.split(':')[1]
        except:
            old_master_pos = 0
            error_count = 0
    except:
        old_master_pos = 0
        error_count = 0
        pass
    # 获得数据库同步状态
    status = os.popen("%s -u%s -p%s -e 'show slave status\G'"%
                      (mysql_bin,mysql_user,mysql_pass)).readlines()
    # 查看同步主节点数据
    for status_l in status:
        if status_l.find('Read_Master_Log_Pos: ') > 0:
            f = open('MasterPos.txt', 'wb')
            # 防止出现空值
            try:
                new_master_pos = int(status_l.split(': ')[1])
            except:
                new_master_pos = 0
            if int(new_master_pos) == int(old_master_pos) or int(old_master_pos):
                f.write('%s:%s' % (new_master_pos, int(error_count)+1))
            else:
                f.write('%s:0' % new_master_pos)
            f.close()
            if int(error_count)+1 > max_error:
                error_context += 'slave error!'
    # 判断是否报警
    print error_context:
    if len(error_context) > 1:
        ip = os.popen("/sbin/ifconfig|grep 'inet addr'|awk '{print $2}'").read()
        get_local_ip = ip[ip.find(':')+1:ip.find('n')]
        mail_warn("%s"%get_local_ip)

if __name__ == "__main__":
    main()

源代码