本文共 3964 字,大约阅读时间需要 13 分钟。
#File:ezpymysql.py
#作者:\绒毛
\ quot; \ quot; \ quot; \轻便的包装器大约PyMySQL的。
仅\\ n00为\\ n00 python3
\ quot; \ quot; \ quot;
导入\时间
导入\测井
导入\追溯
导入\ pymysql.cursors
版本\ = \ \ quot; 0.7 \\"
version_info \ = \ (0,7,7,0,0)
类连接(对象):
\ quot; \ quot; \ quot; \轻便的包装器大约PyMySQL的。
\ quot; \ quot; \ quot;
def \ __ init __(自己,主机,数据库,用户=无,密码=无,
端口= 0,
max_idle_sp = 3 \
time_zone = \ sp; +0; 00 \ n; b; ; \ quot; utf8mb4",\ sql_mode = \ quot;传统""):
self.host \ = \主办
self.database \ = \数据库
self.max_idle_time \ = \浮动(max_idle_time)
args \ = \ dict(use_unicode = True,\\ charset =字符集,
\ bb;
\ Nbsp; init_command =(" SET \\ time_zone \"%s"%s time_zone),
\ sb =;
\ n;
如果用户\是\不是\\ n没有:
args [\ quot;用户\ quot;] = \用户
如果密码是\不是\\ n没有:
args [\ quot; passwd"]" = \密码
#\我们\接受\路径至\ \ MySQL \博克\ \ bb;
如果\ quot;/\ quot; 在\中主办:
args [\ quot; unix_socket \\" \\\" = \主办
其他:
self.socket \ = \没有
对\ = \ host.split(\ quot ;: \ quot)
如果len(对)\\ n == \ 2:
args [\ quot;主机\\" \\"
args [\ quot;端口\\";" \ (n1sp;)
其他:
args [\ quot;主机\\" \\"
args [\ quot;端口\\";" \ 3
如果港口:
args [" port"] \\ n = \港口
self._db \\ n = \没有
self._db_args \ = \ args
self._last_use_time \\ ubsp; = \ time.time()
尝试:
self.reconnect()
除了例外:
\; logging.error(\ quot;无法将连接到\ n上的MySQL \ n; sp;%\ b;
\ n; exc_info = True)
def \ _ensure_connected(个体经营):
#\ mysql \\\由默认\关闭\\ n客户\连接\那个是\\ n00为\\ n00
#\ 8 \\ n小时,\但\ \客户\库\不会不是\\ n不是\ nb;
#\您\尝试至\执行\查询\\\和它\失败sp; fa \ n;
#\情况\由抢先地关闭\\ n和重新打开\连接
#\如果它\具有\\ n00已\闲置为\\ n00太长\小时\小时\小时\小时\ nb;
如果(self._db \\ n为\\ n \\ n或
(time.time()-self._last_use_time \ gt; \);)
self.reconnect()
self._last_use_time \\ ubsp; = \ time.time()
def \ _cursor(个体):
self._ensure_connected()
返回\ self._db.cursor()
def \ __ del __(个体经营):
self.close()
def \关闭(个体经营):
\ quot; \ quot; \ quot;关闭\\ n这个\数据库\连接。 \ quot; \ quot; \ quot;
如果getattr(自身," _ db","无")是\不是\\ n没有:
self._db.close()
self._db \\ n = \没有
def \重新连接(个体经营):
\ quot; \ quot; \ quot;关闭\\ n \现有\数据库\连接\\ n和重新打开"
self.close()
self._db \\ n = \ pymysql.connect(** self._db_args)
self._db.autocommit(正确)
def \查询(自己,*参数,** ** kw参数):
\ quot; \ quot; \ quot;返回\ \第\行列出\为\\ n00 \给定\ sp;和\ n
游标= \ self._cursor()
尝试:
cursor.execute(查询,\\ kw参数\nbsp;或参数)
结果\ = \ cursor.fetchall()
返回\结果
最后:
cursor.close()
def \获取(自己,查询,*参数,** kw参数):
\ Nbsp; \ nbsp ;; \ nbsp ;; \\\\ nbsp \\ nbsp \ quot; \ quot; \ quot;返回\ \ (单数)第\行返回\由 \给予\\ n00查询。
\ quot; \ quot; \ quot;
游标= \ self._cursor()
尝试:
cursor.execute(查询,\\ kw参数\nbsp;或参数)
返回\ cursor.fetchone()
最后:
cursor.close()
def \执行(自我,查询,*参数,** kwparameters):
\ quot; \ quot; \ quot;执行\ \给予\\ n00查询,返回\ \ row nsp;从
游标= \ self._cursor()
尝试:
cursor.execute(查询,\\ kw参数\nbsp;或参数)
返回\ cursor.lastrowid
除了异常\为 e:
如果e.args [0] \\ n == \ 1062:
通过
其他:
traceback.print_exc()
提高\
最后:
cursor.close()
插入\ = \执行
## \ ===============高\级别方法为\\ n00表\ ================== def \ table_has(自身,table_name,字段,\)值:
如果isinstance(值,str):
值\\ ubsp; = \ value.encode(" utf8")
sql = \ " SELECT \\%s \\ FROM}%s \\%s = \ quot;%b \ ;; bsp;
领域,
table_name,
领域,
值)
d \\ n = \ self.get(SQL)
返回\ d
def \ table_insert(自身,table_name,项):
"""项目"是\\ a字典:密钥是是mysql 表字段" "
栏位= \清单(item.keys())
值\ = \列表(item.values())
fieldstr = \ ","。加入(字段)
valstr = \ ","。加入(["%s"] * *(len(item))
为\\ n00我\在\中范围(len(值)):
如果isinstance(值[i],str):
值[i] = \值")
sql = \ "将INSERT插入到%s(%s)值(%s)中"。 ; fieldstr,\\ ubsp; valstr)
尝试:
last_id \\ n00; = \ self.execute(sql,*值)
返回\ last_id
除了异常\为 e:
如果e.args [0] \\ n == \ 1062:
#\只是\跳过\ npsp; npsp; nbsp; npsp; npsp; npsp
通过
其他:
traceback.print_exc()
打印(" sql:",\\ sql)
打印("项目:")
为\\ n00我\在\ nbs;中()
(n]
\\ ubs(\ b; 300:
\ n;栏位[i],\\ ubsp; " \\ ::\\",\\ len(vs),\类型(值[i])
\ bb;
\ n;栏位[i],\\ ubsp; " \\ ::\\",\\与\\ ubsp;类型(值[i])
提高\
def \ table_update(自身,table_name,更新,
\ n;
""更新\\\\是\的dict \的{field_update:value_update}"""
心烦意乱= \ []
值\ = \ []
为\\ n00 k,\ v 在\中updates.items():
s \ = \ "%s = %% s"
upsets.append(s)
values.append(v)
心烦意乱= \ ","。加入(不安)
sql = \ "更新%s设置SET%s,其中%s =%s。
table_name,
心烦意乱
field_where,\ value_where,
)
self.execute(SQL,*(值))
转载地址:http://ltodl.baihongyu.com/