每天迁移MySQL历史数据到历史库Python脚本
本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下
#!/usr/bin/env python
# coding:utf-8
__author__ = 'John'
import MySQLdb
import sys
import datetime
import time
class ClassMigrate(object):
def _get_argv(self):
self.usage = """
usage():
python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
--dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
--delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
"""
if len(sys.argv) == 1:
print self.usage
sys.exit(1)
elif sys.argv[1] == '--help' or sys.argv[1] == '-h':
print self.usage
sys.exit()
elif len(sys.argv) > 2:
for i in sys.argv[1:]:
_argv = i.split('=')
if _argv[0] == '--source':
_list = _argv[1].split('/')
self.source_host = _list[0].split(':')[0]
self.source_port = int(_list[0].split(':')[1])
self.source_db = _list[1].split(':')[0]
self.source_tab = _list[1].split(':')[1]
self.source_user = _list[2]
self.source_password = _list[3]
elif _argv[0] == '--dest':
_list = _argv[1].split('/')
self.dest_host = _list[0].split(':')[0]
self.dest_port = int(_list[0].split(':')[1])
self.dest_db = _list[1].split(':')[0]
self.dest_tab = _list[1].split(':')[1]
self.dest_user = _list[2]
self.dest_password = _list[3]
elif _argv[0] == '--delete_strategy':
self.deleteStrategy = _argv[1]
if self.deleteStrategy not in ('delete', 'drop'):
print (self.usage)
sys.exit(1)
elif _argv[0] == '--primary_key':
self.pk = _argv[1]
elif _argv[0] == '--date_col':
self.date_col = _argv[1]
elif _argv[0] == '--time_interval':
self.interval = _argv[1]
else:
print (self.usage)
sys.exit(1)
def __init__(self):
self._get_argv()
## --------------------------------------------------------------------
self.sourcedb_conn_str = MySQLdb.connect(host=self.source_host, port=self.source_port, user=self.source_user, passwd=self.source_password, db=self.source_db, charset='utf8')
self.sourcedb_conn_str.autocommit(True)
self.destdb_conn_str = MySQLdb.connect(host=self.dest_host, port=self.dest_port, user=self.dest_user, passwd=self.dest_password, db=self.dest_db, charset='utf8')
self.destdb_conn_str.autocommit(True)
## --------------------------------------------------------------------
self.template_tab = self.source_tab + '_template'
self.step_size = 20000
## --------------------------------------------------------------------
self._migCompleteState = False
self._deleteCompleteState = False
## --------------------------------------------------------------------
self.source_cnt = ''
self.source_min_id = ''
self.source_max_id = ''
self.source_checksum = ''
self.dest_cn = ''
## --------------------------------------------------------------------
self.today = time.strftime("%Y-%m-%d")
# self.today = '2016-05-30 09:59:40'
def sourcedb_query(self, sql, sql_type):
try:
cr = self.sourcedb_conn_str.cursor()
cr.execute(sql)
if sql_type == 'select':
return cr.fetchall()
elif sql_type == 'dml':
rows = self.sourcedb_conn_str.affected_rows()
return rows
else:
return True
except Exception, e:
print (str(e) + "<br>")
return False
finally:
cr.close()
def destdb_query(self, sql, sql_type, values=''):
try:
cr = self.destdb_conn_str.cursor()
if sql_type == 'select':
cr.execute(sql)
return cr.fetchall()
elif sql_type == 'insertmany':
cr.executemany(sql, values)
rows = self.destdb_conn_str.affected_rows()
return rows
else:
cr.execute(sql)
return True
except Exception, e:
print (str(e) + "<br>")
return False
finally:
cr.close()
def create_table_from_source(self):
'''''因为tab_name表的数据需要迁移到archive引擎表,所以不适合使用这种方式。 预留作其他用途。'''
try:
sql = "show create table %s;" % self.source_tab
create_str = self.sourcedb_query(sql, 'select')[0][1]
create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
self.destdb_query(create_str, 'ddl')
return True
except Exception, e:
print (str(e) + "<br>")
return False
def create_table_from_template(self):
try:
sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
state = self.destdb_query(sql, 'ddl')
if state:
return True
else:
return False
except Exception, e:
print (str(e + "<br>") + "<br>")
return False
def get_min_max(self):
""" 创建目标表、并获取源表需要迁移的总条数、最小id、最大id """
try:
print ("\nStarting Migrate at -- %s <br>") % (datetime.datetime.now().__str__())
sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
% (self.pk, self.pk, self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
q = self.sourcedb_query(sql, 'select')
self.source_cnt = q[0][0]
self.source_min_id = q[0][1]
self.source_max_id = q[0][2]
self.source_checksum = str(self.source_cnt) + '_' + str(self.source_min_id) + '_' + str(self.source_max_id)
if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
print ("There is 0 record in source table been matched! <br>")
return False
else:
return True
except Exception, e:
print (str(e) + "<br>")
return False
def migrate_2_destdb(self):
try:
get_min_max_id = self.get_min_max()
if get_min_max_id:
k = self.source_min_id
desc_sql = "desc %s;" % self.source_tab
# self.filed = []
cols = self.sourcedb_query(desc_sql, 'select')
# for j in cols:
# self.filed.append(j[0])
fileds = "%s," * len(cols) # 源表有多少个字段,就拼凑多少个%s,拼接到insert语句
fileds = fileds.rstrip(',')
while k <= self.source_max_id:
sql = """select * from %s where %s >= %d and %s< %d \
and %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """\
% (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
print ("\n%s <br>") % sql
starttime = datetime.datetime.now()
results = self.sourcedb_query(sql, 'select')
insert_sql = "insert into " + self.dest_tab + " values (%s)" % fileds
rows = self.destdb_query(insert_sql, 'insertmany', results)
if rows == False:
print ("Insert failed!! <br>")
else:
print ("Inserted %s rows. <br>") % rows
endtime = datetime.datetime.now()
timeinterval = endtime - starttime
print("Elapsed :" + str(timeinterval.seconds) + '.' + str(timeinterval.microseconds) + " seconds <br>")
k += self.step_size
print ("\nInsert complete at -- %s <br>") % (datetime.datetime.now().__str__())
return True
else:
return False
except Exception, e:
print (str(e) + "<br>")
return False
def verify_total_cnt(self):
try:
sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
% (self.pk, self.pk, self.dest_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
dest_result = self.destdb_query(sql, 'select')
self.dest_cnt = dest_result[0][0]
dest_checksum = str(self.dest_cnt) + '_' + str(dest_result[0][1]) + '_' + str(dest_result[0][2])
print ("source_checksum: %s, dest_checksum: %s <br>") % (self.source_checksum, dest_checksum)
if self.source_cnt == dest_result[0][0] and dest_result[0][0] != 0 and self.source_checksum == dest_checksum:
self._migCompleteState = True
print ("Verify successfully !!<br>")
else:
print ("Verify failed !!<br>")
sys.exit(77)
except Exception, e:
print (str(e) + "<br>")
def drop_daily_partition(self):
try:
if self._migCompleteState:
sql = """explain partitions select * from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00')
and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """\
% (self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
partition_name = self.sourcedb_query(sql, 'select')
partition_name = partition_name[0][3]
sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s partition (%s)""" \
% (self.pk, self.pk, self.source_tab, partition_name)
q = self.sourcedb_query(sql, 'select')
source_cnt = q[0][0]
source_min_id = q[0][1]
source_max_id = q[0][2]
checksum = str(source_cnt) + '_' + str(source_min_id) + '_' + str(source_max_id)
if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
print ("There is 0 record in source PARTITION been matched! <br>")
else:
if checksum == self.source_checksum:
drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
droped = self.sourcedb_query(drop_par_sql, 'ddl')
if droped:
print (drop_par_sql + " <br>")
print ("\nDrop partition complete at -- %s <br>") % (datetime.datetime.now().__str__())
self._deleteCompleteState = True
else:
print (drop_par_sql + " <br>")
print ("Drop partition failed.. <br>")
else:
print ("The partition %s checksum failed !! Drop failed !!") % partition_name
sys.exit(77)
except Exception, e:
print (str(e) + "<br>")
def delete_data(self):
try:
if self._migCompleteState:
k = self.source_min_id
while k <= self.source_max_id:
sql = """delete from %s where %s >= %d and %s< %d \
and %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
% (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
print ("\n%s <br>") % sql
starttime = datetime.datetime.now()
rows = self.sourcedb_query(sql, 'dml')
if rows == False:
print ("Delete failed!! <br>")
else:
print ("Deleted %s rows. <br>") % rows
endtime = datetime.datetime.now()
timeinterval = endtime - starttime
print("Elapsed :" + str(timeinterval.seconds) + '.' + str(timeinterval.microseconds) + " seconds <br>")
time.sleep(1)
k += self.step_size
print ("\nDelete complete at -- %s <br>") % (datetime.datetime.now().__str__())
self._deleteCompleteState = True
except Exception, e:
print (str(e) + "<br>")
def do(self):
tab_create = self.create_table_from_template()
if tab_create:
migration = self.migrate_2_destdb()
if migration:
self.verify_total_cnt()
if self._migCompleteState:
if self.deleteStrategy == 'drop':
self.drop_daily_partition()
else:
self.delete_data()
print ("\n<br>")
print ("====="*5 + '<br>')
print ("source_total_cnt: %s <br>") % self.source_cnt
print ("dest_total_cnt: %s <br>") % self.dest_cnt
print ("====="*5 + '<br>')
if self._deleteCompleteState:
print ("\nFinal result: Successfully !! <br>")
sys.exit(88)
else:
print ("\nFinal result: Failed !! <br>")
sys.exit(254)
else:
print ("Create table failed ! Exiting. . .")
sys.exit(255)
f = ClassMigrate()
f.do()
以上是 每天迁移MySQL历史数据到历史库Python脚本 的全部内容, 来源链接: utcz.com/z/342344.html