数据增量抽取的模拟实现──脚本实现:
实现的环境:
业务数据库:Oracle数据库9i
数据仓库数据库:SQL Server 2000数据库
1、前提条件:SQL Server服务器必须已经安装Oracle驱动
2、创建链接数据库
打开企业管理器->安全性-链接服务器-右键新建
通常情况当链接数据库创建好,进行打开的时候都会弹出一下错误窗口:
一般情况下运行C:PRogram FilesCommon FilesSystemOle DBmtxoci81_win2k.reg该文件后,重启SQLServer数据库,再重新连接;
如果仍有问题,重启操作系统,即可OK。
3、创建Oracle环境脚本
--创建Oracle业务系统表结构
CREATE TABLE SourceTable
(
ID1 VARCHAR2(50),
ID2 VARCHAR2(50),
Measure1 INTEGER,
Measure2 INTEGER,
CloseDate DATE
)
--创建测试数据
DECLARE
-- Local variables here
i INTEGER;
BEGIN
-- Test statements here
FOR i IN 1..365 LOOP
INSERT INTO SourceTable
VALUES(i,i,i,i,TO_DATE('2006-01-01','yyyy-mm-dd')+i);
INSERT INTO SourceTable
VALUES(i,i,i,i,TO_DATE('2006-01-01 12:00:00','yyyy-mm-dd hh24:mi:ss')+i);
END LOOP
COMMIT;
END;
4、创建SQLServer数据仓库环境脚本
--创建系统参数表内
CREATE TABLE ExtractTaskList (
TaskName VARCHAR(32) ,
TargetTable VARCHAR(32) ,
TargetFieldList VARCHAR(500) ,
SourceTable VARCHAR(32) ,
SourceFieldList VARCHAR(500) ,
WhereFieldName VARCHAR(32) ,
IncType INT ,
TransType INT ,
TargetDate DATETIME ,
SourceDate DATETIME ,
Flag INT ,
Note VARCHAR (500)
)
GO
--创建数据仓库目标表
CREATE TABLE TargetTable (
ID1 VARCHAR(50) ,
ID2 VARCHAR(50) ,
Measure1 DECIMAL(18, 0) ,
Measure2 DECIMAL(18, 0) ,
CloseDate DATETIME
)
GO
5、创建SQLServer数据仓库ETL脚本
脚本考虑到现实的问题,已经做了许多取舍,不再追求全部动态实现,旨在给定一个模板,在有限的范围内可以更改每次抽取的周期,每次时间的跨度,抽取的字段,表等等;数据字典表仅仅利用了其中的四个字段:任务名称,当前抽取时间、结束时间、抽取状态。 CREATE PROCEDURE p_org_Extract
AS
DECLARE @sql VARCHAR(3000)
BEGIN
DECLARE @BeginDate DATETIME,
@EndDate DATETIME,
@TaskName VARCHAR(32),
@Flag INTEGER,
@Num INTEGER,
@CurrDate DATETIME
SELECT @Num = COUNT(TaskName) FROM ExtractTaskList
WHERE UPPER(TaskName) = UPPER('test')
IF @Num != 1
INSERT INTO ExtractTaskList(TaskName,IncType,TransType) VALUES('test',2,2)
--获取列表中的当前任务的时间戳和状态
SELECT @BeginDate = SourceDate,@Flag = Flag FROM ExtractTaskList WHERE TaskName='TEST'
--如果上次执行未成功,这样取值效率会高一些,则从数据仓库表中直接读取
--TargetDate和SourceDate可能会不一致
IF @Flag = 2 OR @Flag IS NULL
SELECT @BeginDate = DATEADD(ss,1,MAX(closedate)) FROM TargetTable
--如果数据仓库无数据,则从业务系统中直接读取,也可以设置一个默认的初始化时间
IF @BeginDate IS NULL
SELECT @BeginDate = MinLogDate FROM OPENQUERY(SOURCE,'SELECT MIN(CloseDate) AS MinLogDate FROM SourceTable')
--如果仍无数据,则表示无数据可抽取,退出执行
IF @BeginDate IS NULL
RETURN
--抽取结束时间为当前时间前一天,每次循环抽取1天数据,可以更改dd为hh,变成按小时抽取
--通常业务系统是连续的,如果有疑问也可以从业务系统中获取最大时间
SELECT @EndDate = CONVERT(DATETIME,LEFT(CONVERT(VARCHAR,GETDATE(),120),10)+' 00:00:00')
--更新当前开始时间和结束时间
UPDATE ExtractTaskList
SET TargetDate = @BeginDate,
SourceDate = @EndDate
WHERE UPPER(TaskName) = UPPER('test')
WHILE @BeginDate < @EndDate
BEGIN
SELECT @sql = ' INSERT INTO TargetTable
(
ID1,
ID2,
Measure1,
Measure2,
CloseDate
)SELECT * FROM OPENQUERY(SOURCE,''select
ID1,
ID2,
Measure1,
Measure2,
CloseDate
FROM SourceTable
WHERE CloseDate >= TO_DATE(''''' + CONVERT(varchar,@BeginDate,120) + ''''', ''''YYYY-MM-DD HH24:MI:SS'
+ ''''') AND CloseDate < TO_DATE(''''' + CONVERT(varchar,DATEADD(day,1,@BeginDate),120) + ''''', ''''yyyy-mm-dd HH24:MI:SS'
+ ''''') AND CloseDate < TO_DATE(''''' + CONVERT(varchar,@EndDate,120) + ''''', ''''YYYY-MM-DD HH24:MI:SS'
+ ''''')'')'
--PRINT @sql
EXEC (@sql)
--获取本次任务运行抽取的最大时间
IF DATEADD(day,1,@BeginDate)>@EndDate
SELECT @CurrDate = @EndDate
ELSE
SELECT @CurrDate = DATEADD(day,1,@BeginDate)
--如果@sql执行失败,同样记录状态和时间
IF @@ERROR <> 0
GOTO FAIL
--记录每次运行的时间运行情况,可提供相应参考
UPDATE ExtractTaskList
SET TargetDate = @CurrDate,
Flag = 1
WHERE UPPER(TaskName) = UPPER('test')
SELECT @BeginDate = DATEADD(DD,1,@BeginDate)
END
RETURN
FAIL:
--记录错误
UPDATE ExtractTaskList
SET TargetDate = @CurrDate,
Flag = 2
WHERE UPPER(TaskName) = UPPER('test')
RETURN 0
END