首页 行业资讯 宠物日常 宠物养护 宠物健康 宠物故事
您的当前位置:首页正文

python3结合pymysql模块对数据库进行建表,增, 改,查

2023-11-13 来源:画鸵萌宠网

# !/usr/bin/python3# -*- coding: utf-8 -*-import pymysqlOPTION = { "check_sql_data": "0", "create_tables": "1", "add_sql_data": "2", "put_sql_data": "3",}def __get_connect(): # 连接数据库 connect = pymysql.Connect( host=host, port=port, user=user, passwd=‘password‘, db="db_name", charset=‘utf8‘ ) return connectdef create_tables(): tables = "5" # 创建数据库表 create_table = """ create table my{}( id int unsigned primary key auto_increment, first_name char(10) not null, last_name char(10) not null, age int unsigned, sex tinyint, money float ) """.format(tables) try: connect = __get_connect() cursor = connect.cursor() cursor.execute(create_table) except Exception as e: connect.rollback() # 事务回滚 print(‘事务处理失败‘, e) else: connect.commit() # 事务提交 print(‘事务处理成功‘, cursor.rowcount) # 关闭连接 cursor.close() connect.close()def check_sql_data(): # 数据库查询 check_sql = "select * from my23" try: connect = __get_connect() cursor = connect.cursor() cursor.execute(check_sql) data = cursor.fetchmany(11) except Exception as e: connect.rollback() # 事务回滚 print(‘数据查询失败‘, e) return "数据查询失败" else: connect.commit() # 事务提交 print("查询到{}条数据," .format(cursor.rowcount), data) # 关闭连接 cursor.close() connect.close()def add_sql_data(): # 往数据库表增加数据 add_sql = "insert into myt033(first_name,last_name,age,sex,money) values(%s,%s,%s,%s,%s)" try: connect = __get_connect() cursor = connect.cursor() cursor.executemany(add_sql, [("Liu", "Mick", 90, 1, 9.9), ("L", "M", 12, 3, 44)]) except Exception as e: connect.rollback() # 事务回滚 print(‘数据新增失败‘, e) else: connect.commit() # 事务提交 print("新增{}数据成功".format(cursor.rowcount)) # 关闭连接 cursor.close() connect.close()def put_sql_data(): # 修改数据库数据 put_sql = "update my23 set first_name = %s where id = %s" try: connect = __get_connect() cursor = connect.cursor() cursor.executemany(put_sql, [("test11", 1), ("test211", 2)]) print("修改{}条数据成功".format(cursor.rowcount)) except Exception as e: connect.rollback() # 事务回滚 print(‘数据修改失败‘, e) else: connect.commit() # 事务提交 print("{}条数据成功".format(cursor.rowcount)) # 关闭连接 cursor.close() connect.close()def option_in(arg1): if arg1 == OPTION.get("check_sql_data"): check_sql_data() elif arg1 == OPTION.get("create_tables"): create_tables() elif arg1 == OPTION.get("add_sql_data"): add_sql_data() elif arg1 == OPTION.get("put_sql_data"): put_sql_data() else: print("您输入的参数无效,请输入数字, 只支持0~3")if __name__ == "__main__": arg = input("请输入:") option_in(arg) # python3交流群: 305357273

python3结合pymysql模块对数据库进行建表,增, 改,查

标签:ble   int   creat   man   pytho   return   not   数据库   charset   

小编还为您整理了以下内容,可能对您也有帮助:

python对数据库表格里面的内容增删查改怎么写

本文主要给大家介绍了关于python模拟sql语句对员工表格进行增删改查的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍:
具体需求:
员工信息表程序,实现增删改查操作:
可进行模糊查询,语法支持下面3种:
select name,age from staff_data where age > 22 多个查询参数name,age 用','分割
select * from staff_data where dept = 人事
select * from staff_data where enroll_date like 2013
查到的信息,打印后,最后面还要显示查到的条数
可创建新员工纪录,以phone做唯一键,phone存在即提示,staff_id需自增,添加多个记录record1/record2中间用'/'分割
insert into staff_data values record1/record2
可删除指定员工信息纪录,输入员工id,即可删除
delete from staff_data where staff_id>=5andstaff_id<=10
可修改员工信息,语法如下:
update staff_table set dept=Market,phone=13566677787 where dept = 运维 多个set值用','分割
使用re模块,os模块,充分使用函数精简代码,熟练使用 str.split()来解析格式化字符串
由于,sql命令中的几个关键字符串有一定规律,只出现一次,并且有顺序!!!
按照key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit']的元素顺序分割sql.
分割元素作为sql_dic字典的key放进字典中.分割后的列表为b,如果len(b)>1,说明sql字符串中含有分割元素,同时b[0]对应上一个分割元素的值,b[-1]为下一次分割对象!
这样不断迭代直到把sql按出现的所有分割元素分割完毕,但注意这里每次循环都是先分割后赋值!!!当前分割元素比如'select'对应的值,需要等到下一个分割元素
比如'from'执行分割后的列表b,其中b[0]的值才会赋值给sql_dic['select'] ,所以最后一个分割元素的值,不能通过上述循环来完成,必须先处理可能是最后一个分割元素,再正常循环!!
在这sql语句中,有可能成为最后一个分割元素的 'limit' ,'values', 'where', 按优先级别,先处理'limit' ,再处理'values'或 'where'.....
处理完得到sql_dic后,就是你按不同命令执行,对数据文件的增删改查,最后返回处理结果!!
示例代码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308

# _*_coding:utf-8_*_# Author:Jaye Heimport reimport os def sql_parse(sql, key_lis): ''' 解析sql命令字符串,按照key_lis列表里的元素分割sql得到字典形式的命令sql_dic :param sql: :param key_lis: :return: ''' sql_list = [] sql_dic = {} for i in key_lis: b = [j.strip() for j in sql.split(i)] if len(b) > 1: if len(sql.split('limit')) > 1: sql_dic['limit'] = sql.split('limit')[-1] if i == 'where' or i == 'values': sql_dic[i] = b[-1] if sql_list: sql_dic[sql_list[-1]] = b[0] sql_list.append(i) sql = b[-1] else: sql = b[0] if sql_dic.get('select'): if not sql_dic.get('from') and not sql_dic.get('where'): sql_dic['from'] = b[-1] if sql_dic.get('select'): sql_dic['select'] = sql_dic.get('select').split(',') if sql_dic.get('where'): sql_dic['where'] = where_parse(sql_dic.get('where')) return sql_dic def where_parse(where): ''' 格式化where字符串为列表where_list,用'and', 'or', 'not'分割字符串 :param where: :return: ''' casual_l = [where] logic_key = ['and', 'or', 'not'] for j in logic_key: for i in casual_l: if i not in logic_key: if len(i.split(j)) > 1: ele = i.split(j) index = casual_l.index(i) casual_l.pop(index) casual_l.insert(index, ele[0]) casual_l.insert(index+1, j) casual_l.insert(index+2, ele[1]) casual_l = [k for k in casual_l if k] where_list = three_parse(casual_l, logic_key) return where_list def three_parse(casual_l, logic_key): ''' 处理临时列表casual_l中具体的条件,'staff_id>5'-->['staff_id','>','5'] :param casual_l: :param logic_key: :return: ''' where_list = [] for i in casual_l: if i not in logic_key: b = i.split('like') if len(b) > 1: b.insert(1, 'like') where_list.append(b) else: key = ['<', '=', '>'] new_lis = [] opt = '' lis = [j for j in re.split('([=<>])', i) if j] for k in lis: if k in key: opt += k else: new_lis.append(k) new_lis.insert(1, opt) where_list.append(new_lis) else: where_list.append(i) return where_list def sql_action(sql_dic, title): ''' 把解析好的sql_dic分发给相应函数执行处理 :param sql_dic: :param title: :return: ''' key = {'select': select, 'insert': insert, 'delete': delete, 'update': update} res = [] for i in sql_dic: if i in key: res = key[i](sql_dic, title) return res def select(sql_dic, title): ''' 处理select语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as fh: filter_res = where_action(fh, sql_dic.get('where'), title) limit_res = limit_action(filter_res, sql_dic.get('limit')) search_res = search_action(limit_res, sql_dic.get('select'), title) return search_res def insert(sql_dic, title): ''' 处理insert语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r+', encoding='utf-8') as f: data = f.readlines() phone_list = [i.strip().split(',')[4] for i in data] ins_count = 0 if not data: new_id = 1 else: last = data[-1] last_id = int(last.split(',')[0]) new_id = last_id+1 record = sql_dic.get('values').split('/') for i in record: if i.split(',')[3] in phone_list: print('\033[1;31m%s 手机号已存在\033[0m' % i) else: new_record = '%s,%s\n' % (str(new_id), i) f.write(new_record) new_id += 1 ins_count += 1 f.flush() return ['insert successful'], [str(ins_count)] def delete(sql_dic, title): ''' 处理delete语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: del_count = 0 for line in r_file: dic = dict(zip(title.split(','), line.split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if not filter_res: w_file.write(line) else: del_count += 1 w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['delete successful'], [str(del_count)] def update(sql_dic, title): ''' 处理update语句命令 :param sql_dic: :param title: :return: ''' set_l = sql_dic.get('set').strip().split(',') set_list = [i.split('=') for i in set_l] update_count = 0 with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: for line in r_file: dic = dict(zip(title.split(','), line.strip().split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if filter_res: for i in set_list: k = i[0] v = i[-1] dic[k] = v line = [dic[i] for i in title.split(',')] update_count += 1 line = ','.join(line)+'\n' w_file.write(line) w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['update successful'], [str(update_count)] def where_action(fh, where_list, title): ''' 具体处理where_list里的所有条件 :param fh: :param where_list: :param title: :return: ''' res = [] if len(where_list) != 0: for line in fh: dic = dict(zip(title.split(','), line.strip().split(','))) if dic['name'] != 'name': logic_res = logic_action(dic, where_list) if logic_res: res.append(line.strip().split(',')) else: res = [i.split(',') for i in fh.readlines()] return res pass def logic_action(dic, where_list): ''' 判断数据文件中每一条是否符合where_list条件 :param dic: :param where_list: :return: ''' logic = [] for exp in where_list: if type(exp) is list: exp_k, opt, exp_v = exp if exp[1] == '=': opt = '==' logical_char = "'%s'%s'%s'" % (dic[exp_k], opt, exp_v) if opt != 'like': exp = str(eval(logical_char)) else: if exp_v in dic[exp_k]: exp = 'True' else: exp = 'False' logic.append(exp) res = eval(' '.join(logic)) return res def limit_action(filter_res, limit_l): ''' 用列表切分处理显示符合条件的数量 :param filter_res: :param limit_l: :return: ''' if limit_l: index = int(limit_l[0]) res = filter_res[:index] else: res = filter_res return res def search_action(limit_res, select_list, title): ''' 处理需要查询并显示的title和相应数据 :param limit_res: :param select_list: :param title: :return: ''' res = [] fields_list = title.split(',') if select_list[0] == '*': res = limit_res else: fields_list = select_list for data in limit_res: dic = dict(zip(title.split(','), data)) r_l = [] for i in fields_list: r_l.append((dic[i].strip())) res.append(r_l) return fields_list, res if __name__ == '__main__': with open('staff_data', 'r', encoding='utf-8') as f: title = f.readline().strip() key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit'] while True: sql = input('请输入sql命令,退出请输入exit:').strip() sql = re.sub(' ', '', sql) if len(sql) == 0:continue if sql == 'exit':break sql_dict = sql_parse(sql, key_lis) fields_list, fields_data = sql_action(sql_dict, title) print('\033[1;33m结果如下:\033[0m') print('-'.join(fields_list)) for data in fields_data: print('-'.join(data))

总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

python对数据库表格里面的内容增删查改怎么写

本文主要给大家介绍了关于python模拟sql语句对员工表格进行增删改查的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍:
具体需求:
员工信息表程序,实现增删改查操作:
可进行模糊查询,语法支持下面3种:
select name,age from staff_data where age > 22 多个查询参数name,age 用','分割
select * from staff_data where dept = 人事
select * from staff_data where enroll_date like 2013
查到的信息,打印后,最后面还要显示查到的条数
可创建新员工纪录,以phone做唯一键,phone存在即提示,staff_id需自增,添加多个记录record1/record2中间用'/'分割
insert into staff_data values record1/record2
可删除指定员工信息纪录,输入员工id,即可删除
delete from staff_data where staff_id>=5andstaff_id<=10
可修改员工信息,语法如下:
update staff_table set dept=Market,phone=13566677787 where dept = 运维 多个set值用','分割
使用re模块,os模块,充分使用函数精简代码,熟练使用 str.split()来解析格式化字符串
由于,sql命令中的几个关键字符串有一定规律,只出现一次,并且有顺序!!!
按照key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit']的元素顺序分割sql.
分割元素作为sql_dic字典的key放进字典中.分割后的列表为b,如果len(b)>1,说明sql字符串中含有分割元素,同时b[0]对应上一个分割元素的值,b[-1]为下一次分割对象!
这样不断迭代直到把sql按出现的所有分割元素分割完毕,但注意这里每次循环都是先分割后赋值!!!当前分割元素比如'select'对应的值,需要等到下一个分割元素
比如'from'执行分割后的列表b,其中b[0]的值才会赋值给sql_dic['select'] ,所以最后一个分割元素的值,不能通过上述循环来完成,必须先处理可能是最后一个分割元素,再正常循环!!
在这sql语句中,有可能成为最后一个分割元素的 'limit' ,'values', 'where', 按优先级别,先处理'limit' ,再处理'values'或 'where'.....
处理完得到sql_dic后,就是你按不同命令执行,对数据文件的增删改查,最后返回处理结果!!
示例代码

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308

# _*_coding:utf-8_*_# Author:Jaye Heimport reimport os def sql_parse(sql, key_lis): ''' 解析sql命令字符串,按照key_lis列表里的元素分割sql得到字典形式的命令sql_dic :param sql: :param key_lis: :return: ''' sql_list = [] sql_dic = {} for i in key_lis: b = [j.strip() for j in sql.split(i)] if len(b) > 1: if len(sql.split('limit')) > 1: sql_dic['limit'] = sql.split('limit')[-1] if i == 'where' or i == 'values': sql_dic[i] = b[-1] if sql_list: sql_dic[sql_list[-1]] = b[0] sql_list.append(i) sql = b[-1] else: sql = b[0] if sql_dic.get('select'): if not sql_dic.get('from') and not sql_dic.get('where'): sql_dic['from'] = b[-1] if sql_dic.get('select'): sql_dic['select'] = sql_dic.get('select').split(',') if sql_dic.get('where'): sql_dic['where'] = where_parse(sql_dic.get('where')) return sql_dic def where_parse(where): ''' 格式化where字符串为列表where_list,用'and', 'or', 'not'分割字符串 :param where: :return: ''' casual_l = [where] logic_key = ['and', 'or', 'not'] for j in logic_key: for i in casual_l: if i not in logic_key: if len(i.split(j)) > 1: ele = i.split(j) index = casual_l.index(i) casual_l.pop(index) casual_l.insert(index, ele[0]) casual_l.insert(index+1, j) casual_l.insert(index+2, ele[1]) casual_l = [k for k in casual_l if k] where_list = three_parse(casual_l, logic_key) return where_list def three_parse(casual_l, logic_key): ''' 处理临时列表casual_l中具体的条件,'staff_id>5'-->['staff_id','>','5'] :param casual_l: :param logic_key: :return: ''' where_list = [] for i in casual_l: if i not in logic_key: b = i.split('like') if len(b) > 1: b.insert(1, 'like') where_list.append(b) else: key = ['<', '=', '>'] new_lis = [] opt = '' lis = [j for j in re.split('([=<>])', i) if j] for k in lis: if k in key: opt += k else: new_lis.append(k) new_lis.insert(1, opt) where_list.append(new_lis) else: where_list.append(i) return where_list def sql_action(sql_dic, title): ''' 把解析好的sql_dic分发给相应函数执行处理 :param sql_dic: :param title: :return: ''' key = {'select': select, 'insert': insert, 'delete': delete, 'update': update} res = [] for i in sql_dic: if i in key: res = key[i](sql_dic, title) return res def select(sql_dic, title): ''' 处理select语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as fh: filter_res = where_action(fh, sql_dic.get('where'), title) limit_res = limit_action(filter_res, sql_dic.get('limit')) search_res = search_action(limit_res, sql_dic.get('select'), title) return search_res def insert(sql_dic, title): ''' 处理insert语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r+', encoding='utf-8') as f: data = f.readlines() phone_list = [i.strip().split(',')[4] for i in data] ins_count = 0 if not data: new_id = 1 else: last = data[-1] last_id = int(last.split(',')[0]) new_id = last_id+1 record = sql_dic.get('values').split('/') for i in record: if i.split(',')[3] in phone_list: print('\033[1;31m%s 手机号已存在\033[0m' % i) else: new_record = '%s,%s\n' % (str(new_id), i) f.write(new_record) new_id += 1 ins_count += 1 f.flush() return ['insert successful'], [str(ins_count)] def delete(sql_dic, title): ''' 处理delete语句命令 :param sql_dic: :param title: :return: ''' with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: del_count = 0 for line in r_file: dic = dict(zip(title.split(','), line.split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if not filter_res: w_file.write(line) else: del_count += 1 w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['delete successful'], [str(del_count)] def update(sql_dic, title): ''' 处理update语句命令 :param sql_dic: :param title: :return: ''' set_l = sql_dic.get('set').strip().split(',') set_list = [i.split('=') for i in set_l] update_count = 0 with open('staff_data', 'r', encoding='utf-8') as r_file,\ open('staff_data_bak', 'w', encoding='utf-8') as w_file: for line in r_file: dic = dict(zip(title.split(','), line.strip().split(','))) filter_res = logic_action(dic, sql_dic.get('where')) if filter_res: for i in set_list: k = i[0] v = i[-1] dic[k] = v line = [dic[i] for i in title.split(',')] update_count += 1 line = ','.join(line)+'\n' w_file.write(line) w_file.flush() os.remove('staff_data') os.rename('staff_data_bak', 'staff_data') return ['update successful'], [str(update_count)] def where_action(fh, where_list, title): ''' 具体处理where_list里的所有条件 :param fh: :param where_list: :param title: :return: ''' res = [] if len(where_list) != 0: for line in fh: dic = dict(zip(title.split(','), line.strip().split(','))) if dic['name'] != 'name': logic_res = logic_action(dic, where_list) if logic_res: res.append(line.strip().split(',')) else: res = [i.split(',') for i in fh.readlines()] return res pass def logic_action(dic, where_list): ''' 判断数据文件中每一条是否符合where_list条件 :param dic: :param where_list: :return: ''' logic = [] for exp in where_list: if type(exp) is list: exp_k, opt, exp_v = exp if exp[1] == '=': opt = '==' logical_char = "'%s'%s'%s'" % (dic[exp_k], opt, exp_v) if opt != 'like': exp = str(eval(logical_char)) else: if exp_v in dic[exp_k]: exp = 'True' else: exp = 'False' logic.append(exp) res = eval(' '.join(logic)) return res def limit_action(filter_res, limit_l): ''' 用列表切分处理显示符合条件的数量 :param filter_res: :param limit_l: :return: ''' if limit_l: index = int(limit_l[0]) res = filter_res[:index] else: res = filter_res return res def search_action(limit_res, select_list, title): ''' 处理需要查询并显示的title和相应数据 :param limit_res: :param select_list: :param title: :return: ''' res = [] fields_list = title.split(',') if select_list[0] == '*': res = limit_res else: fields_list = select_list for data in limit_res: dic = dict(zip(title.split(','), data)) r_l = [] for i in fields_list: r_l.append((dic[i].strip())) res.append(r_l) return fields_list, res if __name__ == '__main__': with open('staff_data', 'r', encoding='utf-8') as f: title = f.readline().strip() key_lis = ['select', 'insert', 'delete', 'update', 'from', 'into', 'set', 'values', 'where', 'limit'] while True: sql = input('请输入sql命令,退出请输入exit:').strip() sql = re.sub(' ', '', sql) if len(sql) == 0:continue if sql == 'exit':break sql_dict = sql_parse(sql, key_lis) fields_list, fields_data = sql_action(sql_dict, title) print('\033[1;33m结果如下:\033[0m') print('-'.join(fields_list)) for data in fields_data: print('-'.join(data))

总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

Python之MySQL操作

MySQL 是目前使用最广泛的数据库之一,它有着良好的性能,能够跨平台,支持分布式,能够承受高并发。下载地址: MySQL :: Download MySQL Community Server 安装参考: 图解MySQL5.7.20免安装版配置方法-百度经验 (.com)

Python 大致有如下 5 种方式操作 MySQL。

先使用如下建表语句创建一张简单的数据库表。

2.1 mysqlclient

执行 pip install mysqlclient 进行安装,看一下具体操作。

新增

查询

cursor 查看方法

修改

删除

2.2 PyMySQL

执行 pip install pymysql 进行安装,使用方式与 mysqlclient 基本类似。

2.3 peewee

执行 pip install peewee 进行安装,看一下具体操作。

定义映射类

新增

查询

修改

删除

2.4 SQLAlchemy

执行 pip install sqlalchemy 进行安装,看一下具体操作。

定义映射类

新增

查询

修改

删除

Python学习日记

在python3下怎样用flask-sqlalchemy对mysql数据库操作

以 Debian/Ubuntu 为例(请确保有管理员权限):

1.MySQL

代码如下:

apt-get install mysql-server

apt-get install mysql-client

apt-get install libmysqlclient15-dev

2.python-mysqldb

代码如下:

apt-get install python-mysqldb

3.easy_install

代码如下:

wget http:// peak.telecommunity.com/dist/ez_setup.py

python ez_setup.py

4.MySQL-Python

代码如下:

easy_install MySQL-Python

5.SQLAlchemy

代码如下:

easy_install SQLAlchemy

6、安装完成后使用下面代码测试连接

代码如下:

from sqlalchemy import create_engine

from sqlalchemy.orm import sessionmaker

DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'

engine = create_engine(DB_CONNECT_STRING, echo=True)

DB_Session = sessionmaker(bind=engine)

session = DB_Session()

7、数据操作(增删改查)

代码如下:

from sqlalchemy import func, or_, not_

user = User(name='a')

session.add(user)

user = User(name='b')

session.add(user)

user = User(name='a')

session.add(user)

user = User()

session.add(user)

session.commit()

query = session.query(User)

print query # 显示SQL 语句

print query.statement # 同上

for user in query: # 遍历时查询

print user.name

print query.all() # 返回的是一个类似列表的对象

print query.first().name # 记录不存在时,first() 会返回 None

# print query.one().name # 不存在,或有多行记录时会抛出异常

print query.filter(User.id == 2).first().name

print query.get(2).name # 以主键获取,等效于上句

print query.filter('id = 2').first().name # 支持字符串

query2 = session.query(User.name)

print query2.all() # 每行是个元组

print query2.limit(1).all() # 最多返回 1 条记录

print query2.offset(1).all() # 从第 2 条记录开始返回

print query2.order_by(User.name).all()

print query2.order_by('name').all()

print query2.order_by(User.name.desc()).all()

print query2.order_by('name desc').all()

print session.query(User.id).order_by(User.name.desc(), User.id).all()

print query2.filter(User.id == 1).scalar() # 如果有记录,返回第一条记录的第一个元素

print session.query('id').select_from(User).filter('id = 1').scalar()

print query2.filter(User.id > 1, User.name != 'a').scalar() # and

query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and

query3 = query3.filter(User.name != 'a')

print query3.scalar()

print query2.filter(or_(User.id == 1, User.id == 2)).all() # or

print query2.filter(User.id.in_((1, 2))).all() # in

query4 = session.query(User.id)

print query4.filter(User.name == None).scalar()

print query4.filter('name is null').scalar()

print query4.filter(not_(User.name == None)).all() # not

print query4.filter(User.name != None).all()

print query4.count()

print session.query(func.count('*')).select_from(User).scalar()

print session.query(func.count('1')).select_from(User).scalar()

print session.query(func.count(User.id)).scalar()

print session.query(func.count('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表

print session.query(func.count('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以用 limit() count() 的返回数

print session.query(func.sum(User.id)).scalar()

print session.query(func.now()).scalar() # func 后可以跟任意函数名,只要该数据库支持

print session.query(func.current_timestamp()).scalar()

print session.query(func.md5(User.name)).filter(User.id == 1).scalar()

query.filter(User.id == 1).update({User.name: 'c'})

user = query.get(1)

print user.name

user.name = 'd'

session.flush() # 写数据库,但并不提交

print query.get(1).name

session.delete(user)

session.flush()

print query.get(1)

session.rollback()

print query.get(1).name

query.filter(User.id == 1).delete()

session.commit()

print query.get(1)

显示全文