网络编程
位置:首页>> 网络编程>> 数据库>> 抽取oracle数据到mysql数据库的实现过程

抽取oracle数据到mysql数据库的实现过程

作者:skillfulit  发布时间:2024-01-14 03:07:12 

标签:oracle,mysql,数据库

在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款数据迁移程序,性能都不是很好的,于是自己动手写一个针对于oracle数据库数据迁移到mysql数据程序,其具体过程如下:

1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql

2、建立一个目录ETL_DIR

3、运行oracle数据库程序P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql

4、导入mysql数据,文件内容如下


load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";

附:数据库脚本P_ETL_ORA_DATA


CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA
(
 P_ORA_DIR  VARCHAR2,
 P_DATA_PATH VARCHAR2
) IS
 TYPE T_REC IS RECORD(
   TBN VARCHAR2(40),
   WHR VARCHAR2(4000));
 TYPE T_TABS IS TABLE OF T_REC;
 V_TABS   T_TABS := T_TABS();
 V_ETL_DIR  VARCHAR2(40) := P_ORA_DIR;
 V_LOAD_FILE UTL_FILE.FILE_TYPE;
 PROCEDURE ETL_DATA
 (
   P_SQL_STMT VARCHAR2,
   P_DATA_PATH VARCHAR2,
   P_TB_NAME  VARCHAR2
 ) IS
 BEGIN
   DECLARE
     V_VAR_COL  VARCHAR2(32767);
     V_NUM_COL  NUMBER;
     V_DATE_COL DATE;
     V_TMZ    TIMESTAMP;
     V_COLS   NUMBER;
     V_COLS_DESC DBMS_SQL.DESC_TAB;
     V_ROW_STR  VARCHAR2(32767);
     V_COL_STR  VARCHAR2(32767);
     V_SQL_ID  NUMBER;
     V_SQL_REF  SYS_REFCURSOR;
     V_EXP_FILE UTL_FILE.FILE_TYPE;
     V_DATA_PATH VARCHAR2(200);
   BEGIN
     V_DATA_PATH := P_DATA_PATH;
     IF REGEXP_SUBSTR(V_DATA_PATH, '\\$') IS NULL
     THEN
       V_DATA_PATH := V_DATA_PATH || '\';
     END IF;
     V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');
     OPEN V_SQL_REF FOR P_SQL_STMT;
     V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
     DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);
     FOR I IN V_COLS_DESC.FIRST .. V_COLS_DESC.LAST
     LOOP
       CASE
         WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
           DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
         WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
           DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
         WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
           DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
         WHEN V_COLS_DESC(I).COL_TYPE = 180 THEN
           DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
       END CASE;
     END LOOP;
     DECLARE
       V_FLUSH_OVER PLS_INTEGER := 1;
       V_FILE_OVER PLS_INTEGER := 1;
       V_FILE_NO  PLS_INTEGER := 1;
       V_FILE_NAME VARCHAR2(200);
       V_LINE    VARCHAR2(400);
     BEGIN
       WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0
       LOOP
         IF V_FILE_OVER = 1
         THEN
           V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv';
           V_EXP_FILE := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => 'w', MAX_LINESIZE => 32767);
         END IF;
         V_ROW_STR := '';
         FOR I IN 1 .. V_COLS
         LOOP
           V_COL_STR := '\N';
           BEGIN
             CASE
               WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
                 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
                 IF V_VAR_COL IS NOT NULL
                 THEN
                   V_COL_STR := '^' || V_VAR_COL || '^';
                 END IF;
               WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
                 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
                 IF V_NUM_COL IS NOT NULL
                 THEN
                   V_COL_STR := V_NUM_COL;
                 END IF;
               WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
                 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
                 IF V_DATE_COL IS NOT NULL
                 THEN
                   V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
                 END IF;
               WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
                 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
                 IF V_TMZ IS NOT NULL
                 THEN
                   V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
                 END IF;
             END CASE;
             IF I = 1
             THEN
               V_ROW_STR := V_COL_STR;
             ELSE
               V_ROW_STR := V_ROW_STR || ',' || V_COL_STR;
             END IF;
           END;
         END LOOP;
         UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
         IF V_FILE_OVER > 200000 /*每200000条记录就产生一个新的文件*/
         THEN
           V_FILE_OVER := 1;
           V_FLUSH_OVER := 1;
           V_FILE_NO  := V_FILE_NO + 1;
           UTL_FILE.FCLOSE(V_EXP_FILE);
           V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
           V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
           UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
           UTL_FILE.FFLUSH(V_LOAD_FILE);
           CONTINUE;
         END IF;
         V_FILE_OVER := V_FILE_OVER + 1;
         IF V_FLUSH_OVER > 2000 /*每2000条记录就刷新缓存,写到文件中 */
         THEN
           UTL_FILE.FFLUSH(V_EXP_FILE);
           V_FLUSH_OVER := 1;
         ELSE
           V_FLUSH_OVER := V_FLUSH_OVER + 1;
         END IF;
       END LOOP;
       DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
       IF UTL_FILE.IS_OPEN(V_EXP_FILE)
       THEN
         UTL_FILE.FCLOSE(V_EXP_FILE);
         V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
         V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
         UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
         UTL_FILE.FFLUSH(V_LOAD_FILE);
       END IF;
     END;
   EXCEPTION
     WHEN OTHERS THEN
       IF DBMS_SQL.IS_OPEN(V_SQL_ID)
       THEN
         DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
       END IF;
       IF UTL_FILE.IS_OPEN(V_EXP_FILE)
       THEN
         UTL_FILE.FCLOSE(V_EXP_FILE);
       END IF;
       DBMS_OUTPUT.PUT_LINE(SQLERRM);
       DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
   END;
 END;
BEGIN
 BEGIN
   EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ';
 EXCEPTION
   WHEN OTHERS THEN
     NULL;
 END;
 EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';
 DECLARE
   V_CI    PLS_INTEGER;
   V_CN    VARCHAR2(40);
   V_ETL_COLS VARCHAR2(32767);
   V_TBN   VARCHAR2(30);
   V_ETL_CFG VARCHAR2(32767);
   V_CNF_FILE UTL_FILE.FILE_TYPE;
   V_FROM_POS PLS_INTEGER;
 BEGIN
   V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'ETL_TABS.CNF', 'r', 32767);
   LOOP
     UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);
     V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, 'from', 1, 1, 0, 'i');
     V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
     V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);
     V_TBN   := REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2);
     V_TBN   := UPPER(V_TBN);
     V_TABS.EXTEND();
     V_TABS(V_TABS.LAST).TBN := V_TBN;
     V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where .+', 1, 1, 'i');
     V_CI := 1;
     LOOP
       V_CN := REGEXP_SUBSTR(V_ETL_COLS, '\S+', 1, V_CI);
       EXIT WHEN V_CN IS NULL;
       V_CN := UPPER(V_CN);
       EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
         USING V_TBN, V_CN, V_CI;
       COMMIT;
       V_CI := V_CI + 1;
     END LOOP;
   END LOOP;
 EXCEPTION
   WHEN UTL_FILE.INVALID_PATH THEN
     DBMS_OUTPUT.PUT_LINE('指定的目录:ETL_DIR"' || '"无效!');
     RETURN;
   WHEN UTL_FILE.INVALID_FILENAME THEN
     DBMS_OUTPUT.PUT_LINE('指定的文件:" ETL_TABS.CNF' || '"无效!');
     RETURN;
   WHEN NO_DATA_FOUND THEN
     UTL_FILE.FCLOSE(V_CNF_FILE);
   WHEN OTHERS THEN
     DBMS_OUTPUT.PUT_LINE(SQLERRM);
     RETURN;
 END;
 DECLARE
   V_CUR_MATCH  SYS_REFCURSOR;
   V_SQL_SMT   VARCHAR2(32767);
   V_TN     VARCHAR2(40);
   V_CN     VARCHAR2(40);
   V_CI     PLS_INTEGER;
   V_COLUMN_NAME VARCHAR2(40);
   V_ETL_COLS  VARCHAR2(32767);
   V_LINE    VARCHAR2(4000);
   V_TBN     VARCHAR2(40);
 BEGIN
   V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'load_data.sql', OPEN_MODE => 'w', MAX_LINESIZE => 32767);
   FOR T_IX IN V_TABS.FIRST .. V_TABS.LAST
   LOOP
     V_SQL_SMT := 'select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn='':tbn:'' ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci';
     V_TBN   := V_TABS(T_IX).TBN;
     V_SQL_SMT := REPLACE(V_SQL_SMT, ':tbn:', V_TBN);
     V_ETL_COLS := NULL;
     OPEN V_CUR_MATCH FOR V_SQL_SMT;
     LOOP
       FETCH V_CUR_MATCH
         INTO V_TN, V_CN, V_COLUMN_NAME, V_CI;
       EXIT WHEN V_CUR_MATCH%NOTFOUND;
       IF V_CI > 1
       THEN
         V_ETL_COLS := V_ETL_COLS || ' , ';
       END IF;
       IF V_COLUMN_NAME IS NULL
       THEN
         V_ETL_COLS := V_ETL_COLS || ' cast(null as number) ' || V_CN;
       ELSE
         V_ETL_COLS := V_ETL_COLS || V_CN;
       END IF;
     END LOOP;
     CLOSE V_CUR_MATCH;
     V_TBN   := LOWER(V_TBN);
     V_SQL_SMT := 'select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR;
     ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN);
   END LOOP;
   IF UTL_FILE.IS_OPEN(V_LOAD_FILE)
   THEN
     UTL_FILE.FCLOSE(V_LOAD_FILE);
   END IF;
 END;
END P_ETL_ORA_DATA;

来源:https://blog.csdn.net/skillfulit/article/details/82767042

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com