Frank Pantaleo, FPantaleo@rochgrp.com
The body of work that follows was born out of a desire to replicate data out of production. Of course, we needed to do this while minimizing the effect on production. The best way to do this was to leverage APIs that Oracle made available in 8i. Some or all of the logic provided becomes unnecessary in 9i with the introduction of Oracle Streams. However, I would contend that the procedures below provide a better level of control. I made a pitch to a technical community that I am involved in. The pitch was based on an article from George Jucan at www.opendatasys.com. The article was called "Using Oracle LogMiner as a Data Replication Utility". This article explains the way a program or set of programs could make use of this API. Our business had already leveraged information available in the logminer to track activity in an application. There are some C applications involved in this as well. I am only supplying the PL/SQL and C Oracle external functions, as I feel this is enough to get started. Some of these file and directory procs were created to reproduce functions that are now available in 9i.
Dir_proc – C Oracle external application to get a list of all files in a unix directory
File_proc – C Oracle external application to determine the existence of a unix file
File_del_proc – C oracle external application to delete a file in /tmp
Get_ora_tab – Function that drives the diy$oratab view
Logminer_stats – Procedure that returns counts of local logmnr_contents store
Load_logminer – workhorse of the application. Identifies each archive log, extracts it into a local store, and deletes it.
So how does it plug together? I have another program that creates a known state. A known state is required because of a deficiency in 8i logminer: 8i logminer does not capture key information for deletes and updates. So what does it capture? Oracle 8i logminer captures the rowid for updates and deletes. This is addressed in 9i, where logminer captures key information on delete and update if the table being captured has a primary/unique key. I did not have this luxury in 8i. So the initial table state is captured from the source instance, including the rowid of each row from the source table. I capture this in a column called drowid. The drowid is used to apply the update or delete to the destination table data. This was a hack, but a necessary hack based on what was available. Once a state is captured we can then move on to the logic supplied here and extract changes from a set of archive logs. So the process is:
Enable archive logging in the source instance.
Create a known state – this is only necessary in 8i and then only if you need to worry about update/deletes.
Copy all archive logs from source to the machine where the destination instance lives – this is ongoing afterward.
Change the init.ora of the destination instance to have log_archive_dest point to the archive logs that have been captured from the source instance.
At regular intervals: Run the load_logminer proc provided. Apply the DML in logmnr_contents to the destination instance. Update checkpoint_change# in lmstate_checkpoint as each DML statement is applied. Commit each time the SCN changes in the logmnr_contents table.
PL/SQL Source
DROP VIEW diy$oratab;
-- Drop the pieces of the diy$oratab chain from the outside in: the function
-- returns oratab_type, and oratab_type is a table of oratab_row_type.
DROP FUNCTION get_oratab;
DROP TYPE oratab_type;
DROP TYPE oratab_row_type;
DROP LIBRARY os_lib;
-- os_lib is the library named by the dir_proc, file_proc and file_del_proc
-- call specifications; the path below is specific to this installation.
CREATE LIBRARY os_lib IS '/u01/apps/oracle/product/8.1.7/lib/osfunc.so';
/
-- dir_proc: call specification that publishes the C routine "dir_func"
-- from library os_lib.
--   filename  passed to C as a STRING (first argument of dir_func)
--   PATH      passed to C as a STRING (second argument of dir_func)
CREATE OR REPLACE PROCEDURE dir_proc (filename IN CHAR, PATH IN CHAR)
IS
   LANGUAGE C
   LIBRARY os_lib
   NAME "dir_func"
   PARAMETERS (filename STRING, PATH STRING);
/
--
-- file_proc: call specification that publishes the C routine "file_func"
-- from library os_lib. The OCI external-procedure context is passed as the
-- first C argument (WITH CONTEXT / CONTEXT).
--   filename     IN   passed to C as a STRING
--   bexists      OUT  mapped to a C INT
--   file_size    OUT  mapped to a C INT
--   block_count  OUT  mapped to a C INT
CREATE OR REPLACE PROCEDURE file_proc (
   filename      IN    CHAR,
   bexists       OUT   BINARY_INTEGER,
   file_size     OUT   BINARY_INTEGER,
   block_count   OUT   BINARY_INTEGER
)
IS
   LANGUAGE C
   LIBRARY os_lib
   NAME "file_func"
   WITH CONTEXT
   PARAMETERS (CONTEXT, filename STRING, bexists INT, file_size INT, block_count INT);
/
-- file_del_proc: call specification that publishes the C routine
-- "file_del_func" from library os_lib.
--   filename  passed to C as a STRING
CREATE OR REPLACE PROCEDURE file_del_proc (filename IN CHAR)
IS
   LANGUAGE C
   LIBRARY os_lib
   NAME "file_del_func"
   PARAMETERS (filename STRING);
/
-- lmstate_checkpoint: the change number applied so far. load_logminer reads
-- it with a single-row SELECT ... INTO, so it is expected to hold one row.
DROP TABLE lmstate_checkpoint;
CREATE TABLE lmstate_checkpoint (checkpoint_change# NUMBER);
--
-- lmsubscribe: owner/table pairs that logminer_stats joins against
-- logmnr_contents (seg_owner/seg_name) when counting operations.
DROP TABLE lmsubscribe;
CREATE TABLE lmsubscribe (
owner VARCHAR2 (30),
table_name VARCHAR2 (30) );
--
-- lmtables: owner/table pairs with a load order (default 10).
-- NOTE(review): not referenced by the PL/SQL shown here; presumably used by
-- the separate apply/known-state programs - confirm.
DROP TABLE lmtables;
CREATE TABLE lmtables (
owner VARCHAR2 (30),
table_name VARCHAR2 (30),
load_order NUMERIC DEFAULT 10 NOT NULL);
--
-- lm_log: progress messages written by load_logminer ("start loading ..." /
-- "end loading ..." per archive log); emptied at the start of every run.
DROP TABLE lm_log;
CREATE TABLE lm_log (lm_state VARCHAR2(2000));
--
-- logmnr_contents: local store of mined redo. load_logminer fills it with
-- INSERT ... SELECT a.* FROM SYS.v_$logmnr_contents, so the column list and
-- column ORDER here have to stay aligned with that view.
DROP TABLE logmnr_contents;
CREATE TABLE logmnr_contents (
SCN NUMBER,
TIMESTAMP DATE,
thread# NUMBER,
log_id NUMBER,
xidusn NUMBER,
xidslt NUMBER,
xidsqn NUMBER,
rbasqn NUMBER,
rbablk NUMBER,
rbabyte NUMBER,
ubafil NUMBER,
ubablk NUMBER,
ubarec NUMBER,
ubasqn NUMBER,
abs_file# NUMBER,
rel_file# NUMBER,
data_blk# NUMBER,
data_obj# NUMBER,
data_objd# NUMBER,
-- seg_owner / seg_name identify the table; logminer_stats joins them to lmsubscribe
seg_owner VARCHAR2(32),
seg_name VARCHAR2(32),
seg_type NUMBER,
seg_type_name VARCHAR2(32),
table_space VARCHAR2(32),
-- row_id is appended to INSERT statements as the drowid value by load_logminer
row_id VARCHAR2(19),
session# NUMBER,
serial# NUMBER,
username VARCHAR2(32),
session_info VARCHAR2(4000),
ROLLBACK NUMBER,
-- operation is filtered on 'INSERT' / 'UPDATE' / 'DELETE' by the procedures
operation VARCHAR2(32),
sql_redo VARCHAR2(4000),
sql_undo VARCHAR2(4000),
rs_id VARCHAR2(32),
ssn NUMBER,
csf NUMBER,
info VARCHAR2(32),
status NUMBER,
ph1_name VARCHAR2(32),
-- ph1_redo is reused by load_logminer to hold the prepped SQL for the apply step
ph1_redo VARCHAR2(2000),
ph1_undo VARCHAR2(2000),
ph2_name VARCHAR2(32),
ph2_redo VARCHAR2(2000),
ph2_undo VARCHAR2(2000),
ph3_name VARCHAR2(32),
ph3_redo VARCHAR2(2000),
ph3_undo VARCHAR2(2000),
ph4_name VARCHAR2(32),
ph4_redo VARCHAR2(2000),
ph4_undo VARCHAR2(2000),
ph5_name VARCHAR2(32),
ph5_redo VARCHAR2(2000),
ph5_undo VARCHAR2(2000)
);
--
-- get_oratab_setting: holds the directory that get_oratab lists.
-- load_logminer inserts the log_archive_dest value here before it queries
-- diy$oratab. Rows are kept across commits (ON COMMIT PRESERVE ROWS), which
-- matters because load_logminer commits right after priming the table.
DROP TABLE get_oratab_setting;
CREATE GLOBAL TEMPORARY TABLE get_oratab_setting
(thedir VARCHAR2(200))
ON COMMIT PRESERVE ROWS;
--
-- oratab_row_type: one row of the directory listing returned by get_oratab.
-- file_name holds one line of the listing file (at most 100 characters).
CREATE TYPE oratab_row_type AS OBJECT (
file_name VARCHAR2 (100)
);
/
-- oratab_type: nested table of listing rows; the diy$oratab view casts the
-- result of get_oratab to this type.
CREATE TYPE oratab_type IS TABLE OF oratab_row_type;
/
--
-- get_oratab: returns the files of the directory stored in
-- get_oratab_setting.thedir as an oratab_type collection.
--
-- How it works: dir_proc (C external procedure) writes the listing to a
-- temporary file under /tmp, UTL_FILE reads it back one line per file, and
-- file_del_proc removes the temporary file again.
--
-- Returns: one oratab_row_type per listing line. The collection is seeded
-- with a single NULL row, so when no directory is configured or the
-- directory is empty the caller sees exactly one row whose file_name is
-- NULL (load_logminer skips NULL names).
--
-- Raises: UTL_FILE exceptions are not translated; they propagate to the
-- caller after the file handle has been closed.
-- NOTE(review): UTL_FILE.fopen on '/tmp' presumably needs /tmp listed in
-- utl_file_dir - confirm for the target instance.
CREATE OR REPLACE FUNCTION get_oratab
   RETURN oratab_type
IS
   ora_tab       oratab_type := oratab_type (oratab_row_type (NULL));
   f_handle      UTL_FILE.file_type;
   v_file_name   VARCHAR2 (100);
   b_first       BOOLEAN := TRUE;
   tmp_file      VARCHAR2 (50);
   mydir         VARCHAR2 (200);
BEGIN
   -- 24-hour clock plus minutes and seconds, so names generated at different
   -- times of day cannot collide.
   tmp_file := 'oracle_' || TO_CHAR (SYSDATE, 'yyyymmddhh24miss');

   BEGIN
      SELECT thedir
        INTO mydir
        FROM get_oratab_setting;
   EXCEPTION
      WHEN NO_DATA_FOUND
      THEN
         mydir := NULL;
   END;

   IF mydir IS NOT NULL
   THEN
      dir_proc ('/tmp/' || tmp_file, mydir);
      f_handle := UTL_FILE.fopen ('/tmp', tmp_file, 'r');

      LOOP
         BEGIN
            UTL_FILE.get_line (f_handle, v_file_name);

            -- The first line reuses the seeded NULL row; later lines extend.
            IF b_first
            THEN
               b_first := FALSE;
            ELSE
               ora_tab.EXTEND;
            END IF;

            ora_tab (ora_tab.LAST) := oratab_row_type (RTRIM (v_file_name));
         EXCEPTION
            WHEN NO_DATA_FOUND
            THEN
               -- get_line signals end of file with NO_DATA_FOUND.
               EXIT;
         END;
      END LOOP;

      UTL_FILE.fclose (f_handle);
   END IF;

   -- file_del_proc prefixes /tmp/ itself, so only the bare name is passed.
   file_del_proc (tmp_file);
   RETURN ora_tab;
EXCEPTION
   WHEN OTHERS
   THEN
      -- Do not leak the file handle when reading fails part-way through.
      IF UTL_FILE.is_open (f_handle)
      THEN
         UTL_FILE.fclose (f_handle);
      END IF;

      RAISE;
END;
/
-- diy$oratab: exposes the collection returned by get_oratab as a relational
-- view with a single column, file_name (the only attribute of
-- oratab_row_type).
CREATE OR REPLACE VIEW diy$oratab
AS
   SELECT t.file_name
     FROM TABLE (CAST (get_oratab () AS oratab_type)) t;
/
-- LOGMINER_STATS: counts the rows of the local logmnr_contents store that
-- belong to subscribed tables (lmsubscribe), split by operation.
--
--   insert_count  OUT  number of 'INSERT' rows
--   delete_count  OUT  number of 'DELETE' rows
--   update_count  OUT  number of 'UPDATE' rows
--   total_count   OUT  sum of the three counts above
--
-- All four outputs are set to 0 first, so an operation with no rows reports
-- 0. NO_DATA_FOUND and ORA-01306 are treated as "nothing to count"; any
-- other error is re-raised as -20000 with the original error text appended.
CREATE OR REPLACE PROCEDURE "LOGMINER_STATS" (
   insert_count   OUT   INTEGER,
   delete_count   OUT   INTEGER,
   update_count   OUT   INTEGER,
   total_count    OUT   INTEGER
)
IS
   -- ORA-01306: dbms_logmnr.start_logmnr() must be invoked before selecting
   -- from v$logmnr_contents.
   empty_logmnr_contents   EXCEPTION;
   PRAGMA EXCEPTION_INIT (empty_logmnr_contents, -1306);

   CURSOR the_csr
   IS
      SELECT   COUNT (*) the_count, c.operation
          FROM logmnr_contents c, lmsubscribe b
         WHERE c.seg_owner = b.owner
           AND c.seg_name = b.table_name
           AND c.operation IN ('INSERT', 'DELETE', 'UPDATE')
      GROUP BY c.operation;
BEGIN
   insert_count := 0;
   update_count := 0;
   delete_count := 0;
   total_count := 0;

   FOR the_rec IN the_csr
   LOOP
      IF the_rec.operation = 'INSERT'
      THEN
         insert_count := the_rec.the_count;
      ELSIF the_rec.operation = 'DELETE'
      THEN
         delete_count := the_rec.the_count;
      ELSIF the_rec.operation = 'UPDATE'
      THEN
         update_count := the_rec.the_count;
      END IF;
   END LOOP;

   total_count := insert_count + delete_count + update_count;
EXCEPTION
   WHEN NO_DATA_FOUND OR empty_logmnr_contents
   THEN
      NULL;
   WHEN OTHERS
   THEN
      -- Keep the underlying error text so the caller can see what failed.
      raise_application_error (-20000,
                               'Error in LOGMNR_CONTENTS View ' || SQLERRM
                              );
END logminer_stats;
/
-- LOAD_LOGMINER: workhorse of the application.
--
-- 1. Empties logmnr_contents, lm_log and get_oratab_setting.
-- 2. Stores the instance's log_archive_dest in get_oratab_setting so that
--    diy$oratab lists the archive logs in that directory.
-- 3. For every listed file: adds it to LogMiner, starts LogMiner with a
--    fixed dictionary file, copies v_$logmnr_contents into the local
--    logmnr_contents table, ends LogMiner and deletes the archive log.
--    Progress is written to lm_log.
-- 4. Rewrites sql_redo into ph1_redo in the form the apply program expects
--    and discards rows that produced no ph1_redo.
--
-- Raises: -20010 when anything inside the per-file loop fails. Errors first
-- raised there as -20000 (start_logmnr) or -20002 (copy) are caught by the
-- loop's outer handler and re-raised as -20010 with their text appended.
-- NO_DATA_FOUND / TOO_MANY_ROWS propagate if log_archive_dest or
-- lmstate_checkpoint does not yield exactly one row.
CREATE OR REPLACE PROCEDURE "LOAD_LOGMINER"
AS
   -- ORA-01306: dbms_logmnr.start_logmnr() must be invoked before selecting
   -- from v$logmnr_contents.
   empty_logmnr_contents   EXCEPTION;
   PRAGMA EXCEPTION_INIT (empty_logmnr_contents, -1306);
   -- Sized like get_oratab_setting.thedir, which receives this value.
   arch_dir                VARCHAR2 (200);
   work_file               VARCHAR2 (200);
   start_scn_local         NUMBER;

   CURSOR the_csr
   IS
      SELECT file_name
        FROM diy$oratab;
BEGIN
   /* clean the slate of all prior activity */
   DELETE FROM logmnr_contents;

   DELETE FROM lm_log;

   DELETE FROM get_oratab_setting;

   COMMIT;

   /* determine location of archive logs so we can get */
   /* a directory of log_archive_dest */
   SELECT VALUE
     INTO arch_dir
     FROM SYS.v_$parameter
    WHERE NAME = 'log_archive_dest';

   /* prime get_oratab_setting with directory name of archive logs */
   INSERT INTO get_oratab_setting
               (thedir
               )
        VALUES (arch_dir
               );

   COMMIT;

   /* determine checkpoint of what has been applied to date */
   /* NOTE(review): the value is read but not used to filter below; the */
   /* SELECT still fails fast when no checkpoint row exists.            */
   SELECT checkpoint_change#
     INTO start_scn_local
     FROM lmstate_checkpoint;

   /* go through each archive log and add to the local */
   /* logmnr_contents table where applicable */
   BEGIN
      FOR the_rec IN the_csr
      LOOP
         work_file := the_rec.file_name;

         /* diy$oratab returns one NULL row when there is nothing to load */
         IF work_file IS NOT NULL
         THEN
            INSERT INTO lm_log
                        (lm_state
                        )
                 VALUES ('start loading archive log ' || work_file
                        );

            COMMIT;
            SYS.DBMS_LOGMNR.add_logfile (logfilename      => work_file,
                                         options          => SYS.DBMS_LOGMNR.NEW
                                        );

            BEGIN
               /* use logfile generated from local or foreign database */
               SYS.DBMS_LOGMNR.start_logmnr
                  (dictfilename      => '/u01/apps/oracle/product/8.1.7/dbs/SEED_dict.ora'
                  );
               COMMIT;
            EXCEPTION
               WHEN OTHERS
               THEN
                  raise_application_error
                                     (-20000,
                                         'Error in LOAD_LOGMINER start_logmnr '
                                      || SQLERRM
                                      || SQLCODE
                                     );
            END;

            BEGIN
               /* strip what we want out of v_$logmnr_contents into our local copy */
               INSERT /*+ APPEND */INTO logmnr_contents
                  SELECT a.*
                    FROM SYS.v_$logmnr_contents a;

               /* Add any conditional logic here, e.g. */
               /* where seg_owner = 'SOME_OWNER' and seg_name = 'SOME_TABLE' */
               COMMIT;
            EXCEPTION
               WHEN NO_DATA_FOUND
               THEN
                  NULL;
               WHEN empty_logmnr_contents
               THEN
                  NULL;
               WHEN OTHERS
               THEN
                  raise_application_error (-20002,
                                              'Error in LOGMNR_CONTENTS View'
                                           || SQLERRM
                                           || SQLCODE
                                          );
            END;

            BEGIN
               /* end for this log and delete it */
               SYS.DBMS_LOGMNR.end_logmnr;

               INSERT INTO lm_log
                           (lm_state
                           )
                    VALUES ('end loading archive log ' || work_file
                           );

               COMMIT;
               SYS.DBMS_BACKUP_RESTORE.deletefile (work_file);
            EXCEPTION
               WHEN OTHERS
               THEN
                  /* deliberate best effort: a failed cleanup must not */
                  /* stop the remaining logs from being loaded         */
                  NULL;
            END;
         END IF;
      END LOOP;
   EXCEPTION
      WHEN OTHERS
      THEN
         raise_application_error (-20010,
                                     'Error in LOAD_LOGMINER add '
                                  || work_file
                                  || SQLERRM
                                  || SQLCODE
                                 );
   END;

   /* ok now we have our local store of activity clean it up and */
   /* prep it to be used in the apply program. Put the prepped sql */
   /* in ph1_redo column which is unused */
   BEGIN
      /* updates and deletes: address rows through the captured DROWID */
      /* column and strip double quotes and the trailing semicolon     */
      UPDATE logmnr_contents
         SET ph1_redo =
                REPLACE (REPLACE (REPLACE (sql_redo, 'ROWID', 'DROWID'),
                                  '"',
                                  ''
                                 ),
                         ';',
                         ''
                        )
       WHERE ph1_redo IS NULL AND operation IN ('UPDATE', 'DELETE');

      /* inserts for SOME_SCHEMA: add a drowid column to the column list */
      /* and the source rowid, quoted with CHR (39), to the value list   */
      /* NOTE(review): the search text is taken to be ') values' on one  */
      /* line; the published listing had it split by a line break.       */
      UPDATE logmnr_contents
         SET ph1_redo =
                REPLACE
                   (REPLACE (REPLACE (REPLACE (sql_redo,
                                               ') values',
                                               ',drowid) values'
                                              ),
                                      ');',
                                         ','
                                      || CHR (39)
                                      || ROWIDTOCHAR (row_id)
                                      || CHR (39)
                                      || ');'
                                     ),
                             '"',
                             ''
                            ),
                    ';',
                    ''
                   )
       WHERE ph1_redo IS NULL
         AND seg_owner = 'SOME_SCHEMA'
         AND operation = 'INSERT';

      /* inserts for SOME_OTHER_OWNER: only strip quotes and semicolon */
      UPDATE logmnr_contents
         SET ph1_redo = REPLACE (REPLACE (sql_redo, '"', ''), ';', '')
       WHERE ph1_redo IS NULL
         AND seg_owner = 'SOME_OTHER_OWNER'
         AND operation = 'INSERT';

      /* whatever was not prepped above is of no interest to the apply step */
      DELETE FROM logmnr_contents
            WHERE ph1_redo IS NULL;

      COMMIT;
   END;
END load_logminer;
/
C Source for Oracle External Functions
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <dirent.h>
#include <sys/stat.h>
#include <unistd.h>
#ifndef OCI_ORACLE
# include <oci.h>
#endif
/*
 * Entry points of the shared library. The names match the NAME clauses of
 * the PL/SQL call specifications dir_proc, file_proc and file_del_proc.
 */
void dir_func(char *FileName,char *Path);
/* Arguments: OCI context, file name, then three int outputs. */
void file_func(OCIExtProcContext *,char *,int *,int *,int *);
void file_del_func(char *FileName);
/*
 * dir_func - write the contents of a directory to a text file.
 *
 * FileName  path of the listing file to create (truncated if it exists)
 * Path      directory to list
 *
 * Every entry except "." and ".." is written as "<Path>/<entry>\n", in the
 * order readdir() returns them. Entries are not filtered by type.
 *
 * There is no return value. If an argument is NULL or the listing file
 * cannot be opened, nothing is written. If the directory cannot be opened,
 * the listing file is left empty.
 */
void dir_func(char *FileName,char *Path) {
    FILE *listing;
    DIR *dir;
    struct dirent *entry;

    if (FileName == NULL || Path == NULL) {
        return;
    }

    listing = fopen(FileName, "w");
    if (listing == NULL) {
        return;
    }

    dir = opendir(Path);
    if (dir != NULL) {
        while ((entry = readdir(dir)) != NULL) {
            if (strcmp(entry->d_name, ".") == 0 ||
                strcmp(entry->d_name, "..") == 0) {
                continue;
            }
            fprintf(listing, "%s/%s\n", Path, entry->d_name);
        }
        closedir(dir);
    }

    fclose(listing);
}
/*
 * file_func - report stat() information about a file.
 *
 * with_context  OCI external-procedure context (not used here)
 * FileName      path handed to stat()
 * exists        receives the stat() return value: 0 when the call
 *               succeeded, otherwise the failure value stat() returned
 * filesize      receives st_size on success, 0 otherwise
 * block_count   receives st_blocks on success, 0 otherwise
 *
 * NOTE(review): despite its name, *exists is 0 when the file is found.
 * st_size and st_blocks are narrowed to int.
 */
void file_func(OCIExtProcContext *with_context,char *FileName,
int *exists,int *filesize,int *block_count) {
    struct stat info;
    int status = stat(FileName, &info);

    if (status != 0) {
        *filesize = 0;
        *block_count = 0;
    } else {
        *filesize = info.st_size;
        *block_count = info.st_blocks;
    }

    /* The raw stat() result is what the caller finally sees. */
    *exists = status;
}
/*
 * file_del_func - delete a file below /tmp.
 *
 * FileName  name relative to /tmp; "/tmp/" is prefixed before use
 *
 * The file is unlinked only when stat() succeeds on the resulting path.
 * There is no return value. A NULL name, or a name too long for the
 * 200-byte path buffer, is ignored instead of overflowing the buffer.
 *
 * NOTE(review): path components such as "../" are not rejected, so a caller
 * can still reach outside /tmp - restrict if callers are untrusted.
 */
void file_del_func(char *FileName) {
    char target[200];
    struct stat info;
    int len;

    if (FileName == NULL) {
        return;
    }

    len = snprintf(target, sizeof target, "/tmp/%s", FileName);
    if (len < 0 || (size_t)len >= sizeof target) {
        return;
    }

    if (stat(target, &info) == 0) {
        unlink(target);
    }
}