每天迁移MySQL历史数据到历史库Python脚本

本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下

#!/usr/bin/env python

# coding:utf-8

__author__ = 'John'

import MySQLdb

import sys

import datetime

import time

class ClassMigrate(object):
    """Copy one day of rows from a source MySQL table to a destination table, then purge them.

    The target day is ``today - time_interval`` days, matched on ``date_col``.
    Rows are copied in primary-key ranges of ``step_size``, verified with a
    ``count_min_max`` checksum, and finally removed from the source either by
    chunked ``DELETE`` or by dropping the matching partition.

    All settings come from ``sys.argv`` (see ``self.usage``).  The code is
    written in the syntax subset shared by Python 2.6+ and Python 3.
    """

    def _get_argv(self):
        """Parse ``sys.argv`` into connection and migration settings.

        Sets ``source_*`` / ``dest_*`` (host, port, db, tab, user, password),
        ``deleteStrategy``, ``pk``, ``date_col`` and ``interval`` (an int).

        Prints the usage text and exits with status 1 when no argument is
        given, when an option is unknown or malformed, or when any of the six
        options is missing.  ``--help`` / ``-h`` prints usage and exits 0.
        """
        self.usage = """
usage():
python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
--dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
--delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
"""
        args = sys.argv[1:]
        if not args:
            print(self.usage)
            sys.exit(1)
        if args[0] in ('--help', '-h'):
            print(self.usage)
            sys.exit()

        for arg in args:
            # partition() splits on the first '=' only, so values (notably
            # passwords) may themselves contain '='.
            name, _, value = arg.partition('=')
            try:
                if not value:
                    raise ValueError(arg)
                if name == '--source':
                    (self.source_host, self.source_port, self.source_db,
                     self.source_tab, self.source_user,
                     self.source_password) = self._parse_endpoint(value)
                elif name == '--dest':
                    (self.dest_host, self.dest_port, self.dest_db,
                     self.dest_tab, self.dest_user,
                     self.dest_password) = self._parse_endpoint(value)
                elif name == '--delete_strategy':
                    if value not in ('delete', 'drop'):
                        raise ValueError(value)
                    self.deleteStrategy = value
                elif name == '--primary_key':
                    self.pk = value
                elif name == '--date_col':
                    self.date_col = value
                elif name == '--time_interval':
                    # The interval is spliced into SQL text, so insist on a
                    # plain non-negative integer.
                    self.interval = int(value)
                    if self.interval < 0:
                        raise ValueError(value)
                else:
                    raise ValueError(arg)
            except ValueError:
                print(self.usage)
                sys.exit(1)

        # Every option is needed later on; fail here with the usage text
        # rather than with an AttributeError half-way through the run.
        required = ('source_host', 'dest_host', 'deleteStrategy',
                    'pk', 'date_col', 'interval')
        if any(not hasattr(self, attr) for attr in required):
            print(self.usage)
            sys.exit(1)

    @staticmethod
    def _parse_endpoint(spec):
        """Split ``host:port/db:table/user/password`` into a 6-tuple.

        Returns ``(host, port, db, table, user, password)`` with ``port`` as
        an int.  Raises ``ValueError`` when the spec does not have that shape.
        """
        # maxsplit=3 keeps any '/' inside the password intact.
        hostport, dbtab, user, password = spec.split('/', 3)
        host, port = hostport.split(':')
        db, tab = dbtab.split(':')
        return host, int(port), db, tab, user, password

    @staticmethod
    def _format_elapsed(delta):
        """Render a ``timedelta`` as ``seconds.microseconds`` (microseconds zero-padded)."""
        return "%d.%06d" % (delta.days * 86400 + delta.seconds, delta.microseconds)

    def __init__(self):
        """Parse the command line, open both autocommit connections and reset run state."""
        self._get_argv()

        # -- connections ------------------------------------------------------
        self.sourcedb_conn_str = MySQLdb.connect(
            host=self.source_host, port=self.source_port,
            user=self.source_user, passwd=self.source_password,
            db=self.source_db, charset='utf8')
        self.sourcedb_conn_str.autocommit(True)

        self.destdb_conn_str = MySQLdb.connect(
            host=self.dest_host, port=self.dest_port,
            user=self.dest_user, passwd=self.dest_password,
            db=self.dest_db, charset='utf8')
        self.destdb_conn_str.autocommit(True)

        # -- fixed settings ---------------------------------------------------
        # Template table name used by create_table_from_template().
        self.template_tab = self.source_tab + '_template'
        # Width of each primary-key range copied / deleted per statement.
        self.step_size = 20000

        # -- progress flags ---------------------------------------------------
        self._migCompleteState = False
        self._deleteCompleteState = False

        # -- statistics collected during the run ------------------------------
        self.source_cnt = ''
        self.source_min_id = ''
        self.source_max_id = ''
        self.source_checksum = ''
        # Was misspelled ``dest_cn``; do() reads ``dest_cnt`` even when
        # verification never ran, so it must exist from the start.
        self.dest_cnt = ''

        # -- reference date ---------------------------------------------------
        self.today = time.strftime("%Y-%m-%d")

    def sourcedb_query(self, sql, sql_type):
        """Run ``sql`` on the source connection.

        Returns the fetched rows for ``sql_type == 'select'``, the affected
        row count for ``'dml'``, ``True`` for anything else, and ``False``
        (after printing the error) when the statement fails.  Note that a
        ``'dml'`` result of 0 is a success; compare with ``is False``.
        """
        cr = None
        try:
            cr = self.sourcedb_conn_str.cursor()
            cr.execute(sql)
            if sql_type == 'select':
                return cr.fetchall()
            if sql_type == 'dml':
                return self.sourcedb_conn_str.affected_rows()
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            # cursor() itself may have failed, in which case there is
            # nothing to close.
            if cr is not None:
                cr.close()

    def destdb_query(self, sql, sql_type, values=''):
        """Run ``sql`` on the destination connection.

        Returns the fetched rows for ``'select'``, the affected row count for
        ``'insertmany'`` (executed with ``values`` via ``executemany``),
        ``True`` for anything else, and ``False`` (after printing the error)
        when the statement fails.
        """
        cr = None
        try:
            cr = self.destdb_conn_str.cursor()
            if sql_type == 'select':
                cr.execute(sql)
                return cr.fetchall()
            if sql_type == 'insertmany':
                cr.executemany(sql, values)
                return self.destdb_conn_str.affected_rows()
            cr.execute(sql)
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            if cr is not None:
                cr.close()

    def _date_window(self):
        """Return the SQL predicate matching ``date_col`` against the target day.

        The target day is ``today`` minus ``interval`` days, from 00:00:00 to
        23:59:59 inclusive.  The result contains literal ``%Y-%m-%d`` and must
        only be inserted as a format *argument*, never used as a format string.
        """
        return (
            "%s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') "
            "and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59')"
            % (self.date_col, self.today, self.interval,
               self.date_col, self.today, self.interval)
        )

    def _stats_sql(self, table):
        """Return the count / min-id / max-id query for ``table`` over the target day."""
        return ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s"
                % (self.pk, self.pk, table, self._date_window()))

    def create_table_from_source(self):
        """Create the destination table from the source table's ``SHOW CREATE TABLE``.

        Original author's note: because the data has to be migrated into an
        ARCHIVE-engine table this approach is not suitable; the method is
        reserved for other uses.  Returns ``True`` unless an exception occurs.
        """
        try:
            sql = "show create table %s;" % self.source_tab
            create_str = self.sourcedb_query(sql, 'select')[0][1]
            create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
            self.destdb_query(create_str, 'ddl')
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def create_table_from_template(self):
        """Create ``dest_tab`` on the destination as a copy of ``template_tab``'s structure.

        Returns ``True`` when the ``CREATE TABLE IF NOT EXISTS ... LIKE``
        statement succeeded, ``False`` otherwise.
        """
        try:
            sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
            return bool(self.destdb_query(sql, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def get_min_max(self):
        """Fetch count, min id and max id of the source rows to migrate.

        Stores them in ``source_cnt`` / ``source_min_id`` / ``source_max_id``
        and builds ``source_checksum`` as ``"<count>_<min>_<max>"``.  Returns
        ``False`` when the query fails or nothing matches, ``True`` otherwise.
        """
        try:
            print("\nStarting Migrate at -- %s <br>" % datetime.datetime.now())
            q = self.sourcedb_query(self._stats_sql(self.source_tab), 'select')
            if q is False:
                # sourcedb_query already printed the error.
                return False

            self.source_cnt, self.source_min_id, self.source_max_id = q[0][0], q[0][1], q[0][2]
            self.source_checksum = "%s_%s_%s" % (
                self.source_cnt, self.source_min_id, self.source_max_id)

            if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
                print("There is 0 record in source table been matched! <br>")
                return False
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def migrate_2_destdb(self):
        """Copy the target day's rows to the destination in primary-key chunks.

        Returns ``False`` when there is nothing to migrate or setup fails, and
        ``True`` once every chunk has been attempted.  A failed chunk is
        reported but does not stop the loop; verify_total_cnt() is what
        decides whether the copy is complete.
        """
        try:
            if not self.get_min_max():
                return False

            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            if cols is False:
                return False
            # One %s placeholder per source column, spliced into the insert statement.
            placeholders = ",".join(["%s"] * len(cols))
            insert_sql = "insert into " + self.dest_tab + " values (%s)" % placeholders
            window = self._date_window()

            k = self.source_min_id
            while k <= self.source_max_id:
                sql = "select * from %s where %s >= %d and %s < %d and %s" % (
                    self.source_tab, self.pk, k, self.pk, k + self.step_size, window)
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()

                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    rows = False
                elif not results:
                    # Gap in the id range: nothing to insert, and calling
                    # executemany with no rows would report a stale count.
                    rows = 0
                else:
                    rows = self.destdb_query(insert_sql, 'insertmany', results)

                # ``is False``: 0 inserted rows is not a failure.
                if rows is False:
                    print("Insert failed!! <br>")
                else:
                    print("Inserted %s rows. <br>" % rows)

                elapsed = datetime.datetime.now() - starttime
                print("Elapsed :" + self._format_elapsed(elapsed) + " seconds <br>")
                k += self.step_size

            print("\nInsert complete at -- %s <br>" % datetime.datetime.now())
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def verify_total_cnt(self):
        """Compare the destination's count/min/max checksum with the source's.

        Sets ``dest_cnt`` and, on a match with a non-zero count, sets
        ``_migCompleteState``.  On a mismatch the process exits with status 77
        (``SystemExit`` is not caught by the ``except Exception`` below).
        """
        try:
            dest_result = self.destdb_query(self._stats_sql(self.dest_tab), 'select')
            self.dest_cnt = dest_result[0][0]
            dest_checksum = "%s_%s_%s" % (self.dest_cnt, dest_result[0][1], dest_result[0][2])
            print("source_checksum: %s, dest_checksum: %s <br>"
                  % (self.source_checksum, dest_checksum))

            if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                    and self.source_checksum == dest_checksum):
                self._migCompleteState = True
                print("Verify successfully !!<br>")
            else:
                print("Verify failed !!<br>")
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def drop_daily_partition(self):
        """Drop the source partition holding the migrated day, if it matches exactly.

        Only acts when ``_migCompleteState`` is set.  The partition name is
        read from column index 3 of the first ``EXPLAIN PARTITIONS`` row; the
        partition is dropped only when its own count/min/max checksum equals
        ``source_checksum``.  A checksum mismatch exits with status 77.
        """
        try:
            if not self._migCompleteState:
                return

            explain_sql = "explain partitions select * from %s where %s" % (
                self.source_tab, self._date_window())
            explain_rows = self.sourcedb_query(explain_sql, 'select')
            if not explain_rows:
                return
            # NOTE(review): presumably a single partition name; confirm what
            # the server returns when the window spans several partitions.
            partition_name = explain_rows[0][3]

            stats_sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) "
                         "from %s partition (%s)"
                         % (self.pk, self.pk, self.source_tab, partition_name))
            q = self.sourcedb_query(stats_sql, 'select')
            if q is False:
                return
            source_cnt, source_min_id, source_max_id = q[0][0], q[0][1], q[0][2]
            checksum = "%s_%s_%s" % (source_cnt, source_min_id, source_max_id)

            if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
                print("There is 0 record in source PARTITION been matched! <br>")
            elif checksum == self.source_checksum:
                drop_par_sql = "alter table %s drop partition %s;" % (
                    self.source_tab, partition_name)
                print(drop_par_sql + " <br>")
                if self.sourcedb_query(drop_par_sql, 'ddl'):
                    print("\nDrop partition complete at -- %s <br>" % datetime.datetime.now())
                    self._deleteCompleteState = True
                else:
                    print("Drop partition failed.. <br>")
            else:
                # The partition holds rows that were not migrated; refuse to drop it.
                print("The partition %s checksum failed !! Drop failed !!" % partition_name)
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def delete_data(self):
        """Delete the migrated rows from the source in primary-key chunks.

        Only acts when ``_migCompleteState`` is set.  Sleeps one second
        between chunks.  ``_deleteCompleteState`` is set only when every
        chunk's ``DELETE`` succeeded.
        """
        try:
            if not self._migCompleteState:
                return

            window = self._date_window()
            all_ok = True
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = "delete from %s where %s >= %d and %s < %d and %s" % (
                    self.source_tab, self.pk, k, self.pk, k + self.step_size, window)
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()

                rows = self.sourcedb_query(sql, 'dml')
                # ``is False``: a chunk that deletes 0 rows is not a failure.
                if rows is False:
                    all_ok = False
                    print("Delete failed!! <br>")
                else:
                    print("Deleted %s rows. <br>" % rows)

                elapsed = datetime.datetime.now() - starttime
                print("Elapsed :" + self._format_elapsed(elapsed) + " seconds <br>")
                time.sleep(1)
                k += self.step_size

            if all_ok:
                print("\nDelete complete at -- %s <br>" % datetime.datetime.now())
                self._deleteCompleteState = True
        except Exception as e:
            print(str(e) + "<br>")

    def do(self):
        """Run the whole pipeline and exit with a status describing the outcome.

        Exit status: 88 when migration and purge both completed, 254 when
        either did not, 255 when the destination table could not be created.
        (verify_total_cnt / drop_daily_partition may exit with 77 earlier.)
        """
        if not self.create_table_from_template():
            print("Create table failed ! Exiting. . .")
            sys.exit(255)

        if self.migrate_2_destdb():
            self.verify_total_cnt()
            if self._migCompleteState:
                if self.deleteStrategy == 'drop':
                    self.drop_daily_partition()
                else:
                    self.delete_data()

        print("\n<br>")
        print("=====" * 5 + '<br>')
        print("source_total_cnt: %s <br>" % self.source_cnt)
        print("dest_total_cnt: %s <br>" % self.dest_cnt)
        print("=====" * 5 + '<br>')

        if self._deleteCompleteState:
            print("\nFinal result: Successfully !! <br>")
            sys.exit(88)
        else:
            print("\nFinal result: Failed !! <br>")
            sys.exit(254)

# Script entry point: build the migrator from the command line and run it.
# Guarded so that importing this module (e.g. from a test) does not open
# database connections or call sys.exit().
if __name__ == '__main__':
    f = ClassMigrate()
    f.do()

以上是 每天迁移MySQL历史数据到历史库Python脚本 的全部内容, 来源链接: utcz.com/z/342344.html

回到顶部