Python数据库访问公共组件及模拟Http请求
模拟Http请求
在请求别人接口时,我们最常使用的是模拟Http请求。在python中有许多方式,我选用了新版的httplib2。有兴趣的可以查看一下其他文档。
# encoding: utf-8 __author__ = 'changyang' ''' @author: changyang @software: PyCharm @file: httphelper.py @time: 2015/12/14 10:48 @function:http请求操作 ''' import httplib2,json #get def get(url): return handler(url,None,'GET') #post def post(url,data): return handler(url,data,'POST') #统一处理http函数 def handler(url,data,method): try: httpClient=httplib2.Http() headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"} if data!=None: data=json.dumps(data) response,content=httpClient.request(uri=url,method=method,body=data,headers=headers) return content.decode('utf-8') except Exception as e: print(e) if __name__=='__main__': print('choice http method...')
Mysql数据库访问类
由于使用.net习惯了,还真不知道怎样描述,大家理解意思就行。是在不知道怎样说了,直接上代码。
# encoding: utf-8 __author__ = 'changyang' ''' @author: changyang @software: PyCharm @file: mysql_helper.py @time: 2015/12/24 16:15 @function:数据库访问帮助类 ''' import mysql.connector class MySqlHelper(object): def __init__(self,config_mysql): self.create_connector(config_mysql) #创建数据库连接 def create_connector(self,config_mysql): try: self.connector= mysql.connector.connect( host=config_mysql['host'], user=config_mysql['user'], password=config_mysql['password'], database=config_mysql['database'], port=config_mysql['port'], charset='utf8', buffered=True ) self.cursor=self.connector.cursor(buffered=True) except Exception as e: print('myql connector is error:%s' % e) #插入单条信息,parameters为元组,sql语句中的占位符必须与参数的顺序相同,且sql语句中以‘%s’进行占位 def insert(self,sql,parameters): try: if sql==None or sql=='': return 0 self.cursor.execute(sql,parameters) self.connector.commit() return self.cursor.rowcount except Exception as e: print('insert is error:%s' % e) finally: self.cursor.close() self.connector.close()
#一次性插入多条数据,parameters为数组,每个元素都是一个元组,元组内容的顺序必须与sql语句中的占位符相同,且sql语句中以‘%s’进行占位
def multiinsert(self,sql,parameters): try: if sql==None or sql=='': return 0 self.cursor.executemany(sql,parameters) self.connector.commit() return self.cursor.rowcount except Exception as e: print('multiinsert is error:%s' % e) finally: self.cursor.close() self.connector.close() #分页查询,parameters为元组,sql语句中的占位符必须与参数的顺序相同,且sql语句中以‘%s’进行占位 #可用于分页查询,但是需要在sql语句中进行分页 def findlimit(self,sql,parameters,size): try: if sql==None or sql=='': return 0 self.cursor.execute(sql,parameters) allcount=self.cursor.rowcount list=None if size!=0: list= self.cursor.fetchmany(size) else: list= self.cursor.fetchall() return list,allcount except Exception as e: print('findlimit is error:%s' % e) finally: self.connector.commit() self.cursor.close() self.connector.close() #查询全部,parameters为元组,sql语句中的占位符必须与参数的顺序相同,且sql语句中以‘%s’进行占位 def findall(self,sql,parameters): return self.findlimit(sql,parameters,0) 这里我使用了配置文件,便于后期管理,其实说白了,也就是一个数组。直接上配置 configs_mysql={ 'host':'ip地址', 'user':'账号', 'password':'密码', 'database':'数据库', 'port':端口号 }
其他比较重要的访问类
xml和json相互转化
:
# encoding: utf-8 __author__ = 'changyang' ''' @author: changyang @software: PyCharm @file: json_to_xml.py @time: 2015/12/15 9:57 @function:json转化为xml ''' import xmltodict,json #xml转化为json def xml_to_json(str): if str=='': raise 'str is null' str=xmltodict.parse(str) return json.dumps(str)
#
json转化为xml
def json_to_xml(str): if str=='': raise 'str is null' str={ 'Ticket':json.loads(str) } return xmltodict.unparse(str,encoding='utf-8',full_document=True) if __name__=='__main__': xml = """ 10213 name xxx@xxx.com male math 90 english 88 """ result=xml_to_json(xml) print(result) print(json_to_xml(result))
文件操作
# encoding: utf-8 __author__ = 'changyang' ''' @author: changyang @software: PyCharm @file: file_helper.py @time: 2015/12/15 8:49 @function:文件操作 ''' import sys,time,os,shutil #保存xml文件并写入内容 def save(path_type,filename,content): try: path=get_common_path(path_type) if not os.path.exists(path): os.makedirs(path) filename='%s\%s' % (path,filename) if os.path.exists(filename): os.remove(filename) with open(filename, "w",encoding='utf-8') as f: f.write(content) except Exception as e: print(e) #移除文件类型下的所有文件 def remove(path_type): try: path=get_common_path(path_type) if os.path.exists(path): shutil.rmtree(path,True) except Exception as e: print(e) #获取当前门票xml路径 def getpath(xml,path_type): return get_common_path(path_type,xml)