-- 1. [stg].[ExternalSystem]
-- One row per external system, keyed by its code.
-- stg.BatchHeader.ExternalSystemCode references this table.
CREATE TABLE [stg].[ExternalSystem]
(
    [ExternalSystemCode] varchar(50)    NOT NULL,
    [ExternalSystem]     nvarchar(1000) NOT NULL,
    CONSTRAINT [PK_ExternalSystem]
        PRIMARY KEY CLUSTERED ([ExternalSystemCode] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
              ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY]
) ON [PRIMARY]
-- 2. [stg].[BatchStatus]
-- One row per batch status; both the code and the status text are unique.
-- stg.BatchHeader.StatusCode references this table.
CREATE TABLE [stg].[BatchStatus]
(
    [StatusCode] varchar(50) NOT NULL,
    [Status]     varchar(50) NOT NULL,
    CONSTRAINT [PK_BatchStatus]
        PRIMARY KEY CLUSTERED ([StatusCode] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
              ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY],
    CONSTRAINT [UK_BatchStatus]
        UNIQUE NONCLUSTERED ([Status] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
              ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY]
) ON [PRIMARY]
-- 3. [stg].[BatchHeader]
-- One row per measure batch: which external system and measure it belongs to,
-- its status, record counts, run bookkeeping and the optional date range.
-- Defaults and foreign keys are declared inline; names and definitions are
-- the same as the separate ALTER TABLE statements they replace.
CREATE TABLE [stg].[BatchHeader]
(
    [BatchId]              int IDENTITY(1,1) NOT NULL,
    [ExternalSystemCode]   varchar(50)       NOT NULL,
    -- NOTE(review): numeric default on a varchar column that is also a foreign
    -- key to stg.BatchStatus; the constraint name suggests a former int
    -- StatusId column. Confirm that a '0' status row exists.
    [StatusCode]           varchar(50)       NOT NULL
        CONSTRAINT [DF_BatchHeader_StatusId] DEFAULT ((0)),
    [TotalRecordCount]     int               NULL,
    [ErrorRecordCount]     int               NULL,
    [LastRunStartDateTime] datetime          NULL,
    [LastRunEndDateTime]   datetime          NULL,
    [LastRunUser]          varchar(50)       NULL,
    [RunCount]             int               NOT NULL
        CONSTRAINT [DF_BatchHeader_RunCount] DEFAULT ((0)),
    [CreateDateTime]       datetime          NOT NULL
        CONSTRAINT [DF_BatchHeader_CreateDateTime] DEFAULT (getdate()),
    [CreateUser]           varchar(50)       NOT NULL
        CONSTRAINT [DF_BatchHeader_CreateUser] DEFAULT (user_name()),
    [MeasureCode]          varchar(50)       NOT NULL,
    [ClearMeasure]         int               NOT NULL
        CONSTRAINT [DF_BatchHeader_ClearMeasure] DEFAULT ((0)),
    [StartDate]            datetime          NULL,
    [EndDate]              datetime          NULL,
    CONSTRAINT [PK_BatchHeader]
        PRIMARY KEY CLUSTERED ([BatchId] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
              ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY],
    CONSTRAINT [FK_BatchHeader_BatchStatus]
        FOREIGN KEY ([StatusCode])
        REFERENCES [stg].[BatchStatus] ([StatusCode]),
    CONSTRAINT [FK_BatchHeader_ExternalSystem]
        FOREIGN KEY ([ExternalSystemCode])
        REFERENCES [stg].[ExternalSystem] ([ExternalSystemCode])
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
-- 4. [stg].[DimensionBatchHeader]
-- One row per dimension batch: dimension name, valid-from date, expected
-- record and level counts, status and run bookkeeping.
-- Defaults are declared inline with the same names and definitions as the
-- separate ALTER TABLE statements they replace.
CREATE TABLE [stg].[DimensionBatchHeader]
(
    [DimensionBatchId]     int IDENTITY(1,1) NOT NULL,
    [DimensionName]        varchar(50)       NOT NULL,
    [ValidFromDate]        datetime          NOT NULL,
    [TotalRecordCount]     int               NOT NULL
        CONSTRAINT [DF_DimensionHeader_RecordCount] DEFAULT ((0)),
    [LevelCount]           int               NOT NULL,
    [StatusCode]           varchar(50)       NOT NULL
        CONSTRAINT [DF_DimensionHeader_StatusCode] DEFAULT ('E'),
    [LastRunStartDateTime] datetime          NULL,
    [LastRunEndDateTime]   datetime          NULL,
    [LastRunUser]          varchar(50)       NULL,
    [RunCount]             int               NOT NULL
        CONSTRAINT [DF_DimensionHeader_RunCount] DEFAULT ((0)),
    [CreateDateTime]       datetime          NOT NULL
        CONSTRAINT [DF_DimensionHeader_CreateDateTime] DEFAULT (getdate()),
    [CreateUser]           varchar(50)       NOT NULL
        CONSTRAINT [DF_DimensionHeader_CreateUser] DEFAULT (user_name()),
    CONSTRAINT [PK_DimensionHeader]
        PRIMARY KEY CLUSTERED ([DimensionBatchId] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF,
              ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
        ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
-- 5. [stg].[DimensionBatchData]
-- Detail records of a dimension batch: one row per member (or member
-- attribute) with its level, code, description and parent code.
CREATE TABLE [stg].[DimensionBatchData]
(
[RecordId] [int] IDENTITY(1,1) NOT NULL,
[DimensionBatchId] [int] NOT NULL,
[LevelCode] [nvarchar](100) NOT NULL,
[LevelOrdinal] [int] NOT NULL,
[MemberCode] [nvarchar](100) NOT NULL,
[MemberDescription] [nvarchar](256) NOT NULL,
[ParentCode] [nvarchar](100) NULL,
[AttributeName] [nvarchar](100) NULL,
[AttributeValue] [nvarchar](100) NULL,
CONSTRAINT [PK_DimensionData] PRIMARY KEY CLUSTERED
(
[RecordId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
-- Every data record must belong to an existing dimension batch header.
-- The ALTER TABLE line was missing, which left an orphan REFERENCES clause
-- and a CHECK CONSTRAINT statement for a constraint that was never created.
ALTER TABLE [stg].[DimensionBatchData] WITH CHECK ADD CONSTRAINT [FK_DimensionData_DimensionHeader] FOREIGN KEY([DimensionBatchId])
REFERENCES [stg].[DimensionBatchHeader] ([DimensionBatchId])
GO
ALTER TABLE [stg].[DimensionBatchData] CHECK CONSTRAINT [FK_DimensionData_DimensionHeader]
GO
-- 6. [dbo].[Log]
-- Log table written by the log_* procedures in this script.
-- LogDateTime, LogUser and LogType are filled by defaults when omitted.
-- NOTE(review): no primary key or index is declared on this table.
CREATE TABLE [dbo].[Log]
(
    [LogId]         int IDENTITY(1,1) NOT NULL,
    [ProcedureName] nvarchar(100)     NULL,
    [PackageName]   nvarchar(50)      NULL,
    [SourceName]    nvarchar(50)      NULL,
    [TaskName]      nvarchar(50)      NULL,
    [Message]       nvarchar(1000)    NOT NULL,
    [LogDateTime]   datetime          NOT NULL
        CONSTRAINT [DF_Log_LogDateTime] DEFAULT (getdate()),
    [LogUser]       varchar(50)       NOT NULL
        CONSTRAINT [DF_Log_LogUser] DEFAULT (user_name()),
    [LogType]       varchar(50)       NOT NULL
        CONSTRAINT [DF_Log_LogType] DEFAULT ('INFO')
) ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
-- Stored Procedures
-- SP 1. [dbo].[log_ssis_error]
-- Description: Logs an error reported by an SSIS package to the Log table
--   @PackageName, @SourceName, @TaskName : origin of the error
--   @Message                             : error text
-- =============================================
CREATE PROCEDURE [dbo].[log_ssis_error]
    @PackageName nvarchar(150),
    @SourceName  nvarchar(150),
    @TaskName    nvarchar(150),
    @Message     nvarchar(500)
AS
BEGIN
    SET NOCOUNT ON;
    -- The Log columns PackageName/SourceName/TaskName are nvarchar(50) while
    -- the parameters allow 150 characters. Trim explicitly so that a long
    -- name cannot make the logging call itself fail with a truncation error.
    INSERT INTO dbo.[Log] (PackageName, SourceName, TaskName, [Message], LogType)
    VALUES (LEFT(@PackageName, 50),
            LEFT(@SourceName, 50),
            LEFT(@TaskName, 50),
            @Message,
            'ERROR');
END
GO
-- SP 2. [dbo].[log_info]
-- Description: Writes an informational message to the Log table
--   @ProcId  : object id of the calling procedure (callers pass @@PROCID);
--              stored as its object name
--   @Message : text to log
-- =============================================
CREATE PROCEDURE [dbo].[log_info]
    @ProcId  int,
    @Message nvarchar(1000)
AS
BEGIN
    SET NOCOUNT ON;

    INSERT INTO dbo.[Log] (ProcedureName, [Message], LogType)
    SELECT OBJECT_NAME(@ProcId), @Message, 'INFO';
END
GO
-- SP 3. [dbo].[log_error]
-- Description: Writes an error message to the Log table
--   @ProcId  : object id of the calling procedure (callers pass @@PROCID);
--              stored as its object name
--   @Message : text to log
-- =============================================
CREATE PROCEDURE [dbo].[log_error]
    @ProcId  int,
    @Message nvarchar(1000)
AS
BEGIN
    SET NOCOUNT ON;

    INSERT INTO dbo.[Log] (ProcedureName, [Message], LogType)
    SELECT OBJECT_NAME(@ProcId), @Message, 'ERROR';
END
GO
--SP4. [dbo].[create_batch]
-- Description: Create a batch from existing measure, to send to an external system
--   @MeasureId          : measure to export
--   @ExternalSystemCode : target external system
--   @StartDate/@EndDate : date range; NULL is replaced by the earliest/latest
--                         known date
-- Steps: validate the inputs, build the dynamic INSERT for stg.BatchData,
-- merge pending writebacks, then create the header and the detail rows in
-- one transaction. Only a committed batch is counted and flagged 'S'.
-- Note: log rows written inside the transaction are undone by a ROLLBACK.
-- =============================================
CREATE PROCEDURE [dbo].[create_batch]
    @MeasureId int,
    @ExternalSystemCode varchar(50),
    @StartDate datetime,
    @EndDate datetime
AS
DECLARE @FactTable nvarchar(1000),
        @MeasureName nvarchar(1000),
        @MeasureCode nvarchar(50),
        @ExternalSystem nvarchar(1000),
        @CodeFieldList nvarchar(1000),
        @LevelCodeFieldList nvarchar(1000),
        @DimensionJoin nvarchar(1000),
        @DimensionFrom nvarchar(1000),
        @InsertStatement nvarchar(4000),
        @TotalRecordCount int,
        @BatchId int,
        @msg nvarchar(1000),
        @ErrorCode int;
DECLARE @BatchIdTable table(id int);
BEGIN
    SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
    -- logging
    exec log_info @@PROCID, 'Batch creation started';
    -- initialise batch variables
    set @FactTable = dbo.get_fact_table_name(@MeasureId);
    set @MeasureName = dbo.get_measure_name(@MeasureId);
    set @MeasureCode = dbo.get_measure_code(@MeasureId);
    set @ExternalSystem = dbo.get_external_system_name(@ExternalSystemCode);

    IF (@FactTable is null or @MeasureCode is null)
    BEGIN
        -- no factablename found
        -- isnull keeps the message non-NULL (Log.Message is NOT NULL)
        set @msg = 'No FactTable or MeasureCode defined. MeasureId '
                 + isnull(CAST(@MeasureId as varchar(12)), 'NULL') + ' is not valid.';
        exec log_error @@PROCID, @msg;
    END
    ELSE IF (@ExternalSystem is null)
    BEGIN
        -- no external system found for supplied code
        -- varchar(50): a bare CAST(... as varchar) would cut the code at 30 characters
        set @msg = 'No External System defined for ExternalSystemCode '
                 + isnull(CAST(@ExternalSystemCode as varchar(50)), 'NULL');
        exec log_error @@PROCID, @msg;
    END
    ELSE
    BEGIN
        -- validate input date range
        if @StartDate is null set @StartDate = dbo.get_earliest_date();
        if @EndDate is null set @EndDate = dbo.get_latest_date();

        IF (@StartDate <= @EndDate)
        BEGIN
            set @msg = 'Date range determined : ' + convert(nvarchar, @StartDate, 104) + ' -> '
                     + convert(nvarchar, @EndDate, 104);
            exec log_info @@PROCID, @msg;

            set @CodeFieldList = dbo.get_measure_codefield_list (@MeasureId, 0);
            set @LevelCodeFieldList = dbo.get_measure_codefield_list (@MeasureId, 1);
            set @DimensionJoin = dbo.get_measure_dimension_join (@MeasureId);
            set @DimensionFrom = dbo.get_measure_dimension_from (@MeasureId);

            -- create detail records in BatchData
            set @InsertStatement =
                'INSERT INTO stg.BatchData ' +
                '(' + @CodeFieldList + ', Quantity, BatchId) ' +
                'SELECT ' + @LevelCodeFieldList + ', Quantity, @BatchId ' +
                'FROM ' + @DimensionFrom + ' ' +
                'WHERE ' + @DimensionJoin + ' ' +
                'AND Date BETWEEN @StartDate AND @EndDate';

            IF @InsertStatement is null
            BEGIN
                -- One NULL fragment makes the whole concatenation NULL.
                -- Running it would silently insert nothing and still flag
                -- the batch as ready to send, so stop here instead.
                set @msg = 'Could not build the BatchData insert statement for MeasureId '
                         + isnull(CAST(@MeasureId as varchar(12)), 'NULL')
                         + ': field list or dimension join/from is not configured.';
                exec log_error @@PROCID, @msg;
            END
            ELSE
            BEGIN
                exec log_info @@PROCID, @InsertStatement;
                exec dbo.writeback_measure @MeasureId;

                BEGIN TRANSACTION;

                -- create header record in BatchHeader
                INSERT INTO stg.BatchHeader (ExternalSystemCode, StatusCode, MeasureCode, StartDate, EndDate )
                OUTPUT inserted.batchid into @BatchIdTable
                VALUES (@ExternalSystemCode, 'C', @MeasureCode, @StartDate, @EndDate);
                set @ErrorCode = @@ERROR;
                select @BatchId = id from @BatchIdTable;

                IF @ErrorCode = 0
                begin
                    -- logged only on success: with a failed insert @BatchId is
                    -- NULL and the NULL message would itself fail to log
                    set @msg = 'Header record created with BatchId ' + convert(nvarchar, @BatchId);
                    exec log_info @@PROCID, @msg;

                    exec sp_executesql @InsertStatement,
                        N'@BatchId int, @StartDate datetime, @EndDate datetime',
                        @BatchId, @StartDate, @EndDate;
                    set @ErrorCode = @@ERROR;

                    if @ErrorCode = 0
                        exec log_info @@PROCID, 'Batch Data generated';
                end;

                IF @ErrorCode = 0
                begin
                    COMMIT;

                    -- postprocessing: only a committed batch is counted and released
                    select @TotalRecordCount = COUNT(1)
                    from stg.BatchData
                    where BatchId = @BatchId;

                    -- update BachHeader information
                    update stg.BatchHeader
                    set StatusCode = 'S', -- Ready to send
                        TotalRecordCount = @TotalRecordCount
                    where BatchId = @BatchId;
                    exec log_info @@PROCID, 'Batch Header updated';
                end
                ELSE
                begin
                    ROLLBACK;
                    -- logged after the rollback so the message survives it
                    set @msg = 'Batch creation failed with error ' + convert(nvarchar, @ErrorCode)
                             + ', transaction rolled back.';
                    exec log_error @@PROCID, @msg;
                end;
            END
        END
        ELSE -- invalid date range
        BEGIN
            set @msg = 'Invalid date range. Startdate (' + isnull(CAST(@Startdate as varchar), 'NULL') +
                       ') > Enddate (' + isnull(CAST(@Enddate as varchar), 'NULL') + ')';
            exec log_error @@PROCID, @msg;
        END
    END

    set @msg = 'Batch generation ended';
    exec log_info @@PROCID, @msg;
END
GO
-- SP5. [dbo].[create_batch_header]
-- Description: Create a batch header record, returning the BatchId
--   Inserts one stg.BatchHeader row with the status returned by
--   dbo.get_constructing_batch_status() and returns the generated BatchId
--   as a one-column result set.
--   The unused locals @msg and @ErrorCode have been removed.
-- =============================================
CREATE PROCEDURE [dbo].[create_batch_header]
    @MeasureCode nvarchar(50),
    @ExternalSystemCode varchar(50),
    @TotalRecordCount int,
    @StartDate datetime,
    @EndDate datetime
AS
-- receives the identity value generated by the insert
DECLARE @BatchIdTable table(BatchId int);
BEGIN
    INSERT INTO stg.BatchHeader
        (ExternalSystemCode,
         StatusCode,
         TotalRecordCount,
         MeasureCode,
         StartDate,
         EndDate)
    OUTPUT inserted.BatchId INTO @BatchIdTable
    VALUES
        (@ExternalSystemCode,
         dbo.get_constructing_batch_status(),
         @TotalRecordCount,
         @MeasureCode,
         @StartDate,
         @EndDate);

    -- result set consumed by the caller
    SELECT BatchId FROM @BatchIdTable;
END
GO
-- SP6. [dbo].[create_dimension_batch_header]
-- Description: Create a batch header record for dimension data, returning the BatchId
--   Inserts one stg.DimensionBatchHeader row with the status returned by
--   dbo.get_constructing_batch_status() and returns the generated
--   DimensionBatchId as a one-column result set (column name BatchId).
--   The unused locals @msg and @ErrorCode have been removed.
-- =============================================
CREATE PROCEDURE [dbo].[create_dimension_batch_header]
    @DimensionName nvarchar(50),
    @TotalRecordCount int,
    @LevelCount INT,
    @ValidFromDate datetime
AS
-- receives the identity value generated by the insert
DECLARE @BatchIdTable table(BatchId int);
BEGIN
    INSERT INTO stg.DimensionBatchHeader
        (DimensionName,
         ValidFromDate,
         TotalRecordCount,
         LevelCount,
         StatusCode)
    OUTPUT inserted.DimensionBatchId INTO @BatchIdTable
    VALUES
        (@DimensionName,
         @ValidFromDate,
         @TotalRecordCount,
         @LevelCount,
         dbo.get_constructing_batch_status());

    -- result set consumed by the caller
    SELECT BatchId FROM @BatchIdTable;
END
GO
-- SP7. [dbo].[delete_batch]
-- Description: Delete a specific Batch of data
--   Deletes the stg.BatchData rows and then the stg.BatchHeader row of
--   @BatchId in one transaction, and logs the outcome.
-- =============================================
CREATE PROCEDURE [dbo].[delete_batch]
    @BatchId int
AS
DECLARE @RecordsDeleted int,
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
    SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
    -- logging
    set @msg = 'Batch deletion started for BatchId ' + cast(@BatchId as varchar);
    exec log_info @@PROCID, @msg;

    BEGIN TRANSACTION;

    -- delete batch data detail records
    delete from stg.BatchData
    where BatchId = @BatchId;
    -- Capture both values in ONE statement: any statement in between
    -- (including a SET) resets @@ROWCOUNT, which made the reported count
    -- always 1.
    select @ErrorCode = @@ERROR,
           @RecordsDeleted = @@ROWCOUNT;

    if @ErrorCode = 0
    begin
        -- delete batch header record
        delete from stg.BatchHeader
        where BatchId = @BatchId;
        set @ErrorCode = @@ERROR;
    end

    if @ErrorCode = 0
    begin
        COMMIT;
        set @msg = cast(@RecordsDeleted as varchar) + ' records deleted.';
        exec log_info @@PROCID, @msg;
    end
    else
    begin
        ROLLBACK;
        -- a failed deletion is an error, so it is logged with LogType ERROR
        set @msg = 'Error during deletion, operation rolled back.';
        exec log_error @@PROCID, @msg;
    end;

    set @msg = 'Batch deletion ended for BatchId ' + cast(@BatchId as varchar) + '.';
    exec log_info @@PROCID, @msg;
END
GO
-- SP8. [dbo].[merge_batch_to_dimension]
-- Description: Core procedure for merging dimension batch to dimension table
--   @DimensionBatchId : batch in stg.DimensionBatchData to merge
--   @DimensionId      : dimension whose level tables are the merge targets
-- Walks the levels of the dimension (cfg.V_LevelDimension) in LevelOrdinal
-- order and runs one dynamic MERGE per level table inside one transaction.
-- From the second level on, each member's ParentCode is resolved to the key
-- of the previously processed level.
-- Returns 0 when the transaction was committed, otherwise the @@ERROR value
-- that caused the rollback.
-- =============================================
CREATE PROCEDURE [dbo].[merge_batch_to_dimension]
    @DimensionBatchId int,
    @DimensionId int
AS
-- LOCAL: the cursor goes away with the procedure. A global cursor that is
-- left open by an aborted run makes the next call fail on the DECLARE.
DECLARE LevelTableCursor CURSOR LOCAL FOR
    select quotename(LevelTable),
           LevelOrdinal,
           quotename(CodeField),
           quotename(DescriptionField),
           quotename(KeyField)
    from cfg.V_LevelDimension
    where DimensionId = @DimensionId
    order by LevelOrdinal;
-- nvarchar(258) is the size QUOTENAME returns; nvarchar(50) silently cut
-- longer quoted names and produced invalid SQL.
DECLARE @LevelTable nvarchar(258), @PreviousLevelTable nvarchar(258),
        @LevelOrdinal int,
        @KeyField nvarchar(258), @PreviousKeyField nvarchar(258),
        @CodeField nvarchar(258), @PreviousCodeField nvarchar(258),
        @DescriptionField nvarchar(258),
        @sql nvarchar(max),          -- was nvarchar(2000): avoid cutting the statement
        @ParentLookupSql nvarchar(1000),
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
    SET NOCOUNT ON;
    SET @ParentLookupSql = NULL;
    SET @PreviousKeyField = NULL;

    exec log_info @@PROCID, 'Starting dimension merge transaction';
    BEGIN TRANSACTION;
    set @ErrorCode = @@ERROR;

    OPEN LevelTableCursor;
    FETCH NEXT FROM LevelTableCursor into @LevelTable, @LevelOrdinal, @CodeField, @DescriptionField, @KeyField;

    WHILE (@@FETCH_STATUS = 0 and @ErrorCode = 0)
    BEGIN
        -- For the first level there is no parent: ParentId is NULL and the
        -- parent column is left out of the UPDATE/INSERT lists.
        set @sql = 'MERGE ' + @LevelTable + ' as target ' +
            'USING (SELECT MemberCode, MemberDescription, ' +
            isnull(@ParentLookupSql, 'NULL') + ' ParentId ' +
            'FROM stg.DimensionBatchData ' +
            'WHERE DimensionBatchId = @DimensionBatchId ' +
            'AND LevelOrdinal = @LevelOrdinal) as source '+
            'ON (target.' + @CodeField + ' = source.MemberCode) ' +
            'WHEN MATCHED THEN ' +
            'UPDATE set ' + @DescriptionField + ' = source.MemberDescription ' +
            isnull(', ' + @PreviousKeyField + ' = ParentId ', ' ') +
            'WHEN NOT MATCHED THEN ' +
            'INSERT (' + @CodeField + ',' + @DescriptionField + isnull(',' + @PreviousKeyField, ' ') + ') ' +
            'VALUES (MemberCode, MemberDescription' + case when (@PreviousKeyField IS not null) then ', ParentId)' else ')' end + ';' ;

        EXEC sp_executesql @sql,
            N'@DimensionBatchId int, @LevelOrdinal int',
            @DimensionBatchId, @LevelOrdinal;
        SET @ErrorCode = @@ERROR;

        set @msg = 'Merge of level ' + CAST(@LevelOrdinal as varchar) + ' performed.'
        exec log_info @@PROCID, @msg;

        -- remember this level: it is the parent of the next one
        SELECT @PreviousLevelTable = @LevelTable,
               @PreviousCodeField = @CodeField,
               @PreviousKeyField = @KeyField;
        SET @ParentLookupSql =
            ' (SELECT ' + @PreviousKeyField +
            ' FROM ' + @PreviousLevelTable +
            ' WHERE ' + @PreviousCodeField + ' = ParentCode) ';

        FETCH NEXT FROM LevelTableCursor into @LevelTable, @LevelOrdinal, @CodeField, @DescriptionField, @KeyField;
    END;

    CLOSE LevelTableCursor;
    DEALLOCATE LevelTableCursor;

    if @ErrorCode = 0
    begin
        COMMIT;
        exec log_info @@PROCID, 'Committed dimension merge transaction';
    end
    else
    begin
        ROLLBACK;
        -- a rollback is a failure: log it (and the offending statement) as ERROR
        exec log_error @@PROCID, 'Rolled back dimension merge transaction';
        set @msg = isnull(left(@sql, 1000), 'No merge statement available');
        exec log_error @@PROCID, @msg;
    end;

    -- report the outcome to the caller (0 = committed)
    RETURN @ErrorCode;
END
GO
-- SP9 [dbo].[delete_measure]
-- Description: Delete data from measure between Start and End date.
-- If not provided, it's substituted with respectively
-- the earliest and latest date known.
--   @MeasureId : measure whose fact table is cleaned
--   @StartDate : first date to delete (NULL = earliest Date in VD_Date)
--   @EndDate   : last date to delete  (NULL = latest Date in VD_Date)
-- =============================================
CREATE PROCEDURE [dbo].[delete_measure]
    @MeasureId int,
    @StartDate datetime = null,
    @EndDate datetime = null
AS
DECLARE @MeasureName nvarchar(1000),
        @FactTable nvarchar(1000),
        @DeleteStatement nvarchar(4000),
        @DateSelection nvarchar(1000),
        @msg nvarchar(1000);
BEGIN
    SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
    exec log_info @@PROCID, 'Started';

    set @MeasureName = dbo.get_measure_name(@MeasureId);
    set @FactTable = dbo.get_fact_table_name(@MeasureId);
    -- leading space: keeps WHERE separated from the table name
    set @DateSelection =
        N' WHERE DateId in ' +
        '(SELECT DateId FROM VD_Date WHERE Date ' +
        'BETWEEN isnull(@StartDate,(select MIN(Date) from VD_Date)) ' +
        'AND isnull(@EndDate,(select MAX(Date) from VD_Date)))';

    -- merge writebacks with fact data for this measure
    -- this ensures no partial data remains when deleting from the fact table
    execute dbo.writeback_measure @MeasureId;

    -- delete target measure
    set @DeleteStatement = N'DELETE FROM ' + quotename(@FactTable) + @DateSelection;

    IF @DeleteStatement is null
    begin
        -- NULL fact table name (or one QUOTENAME cannot quote): executing a
        -- NULL statement would silently delete nothing, so report it.
        set @msg = 'No valid FactTable defined for MeasureId '
                 + isnull(cast(@MeasureId as varchar(12)), 'NULL') + ', nothing deleted.';
        exec log_error @@PROCID, @msg;
    end
    ELSE
    begin
        execute sp_executesql @DeleteStatement,
            N'@StartDate datetime, @EndDate datetime',
            @StartDate, @EndDate;
    end

    exec log_info @@PROCID, 'Ended';
END
GO
-- SP10. [dbo].[writeback_measure]
-- Description: Merge input data from writeback table to fact table for a specific measure
--   @MeasureId : measure whose writeback view is merged into its fact table
-- When the writeback view contains rows they are summed per key and merged
-- into the fact table (matched: quantity added, not matched: inserted).
-- The writeback view is then emptied and the counters on cfg.Measure are
-- updated, all in one transaction.
-- NOTE(review): process_batch calls delete_measure, which calls this
-- procedure, inside its own transaction. A ROLLBACK here also undoes the
-- caller's open work - confirm that this is intended.
-- =============================================
CREATE PROCEDURE [dbo].[writeback_measure]
    @MeasureId int
AS
DECLARE @WritebackView nvarchar(1000),
        @FactTable nvarchar(1000),
        @MeasureName nvarchar(1000),
        @FieldList nvarchar(2000),
        @PrefixFieldList nvarchar(1000),
        @MergeJoin nvarchar(2000),
        @MergeStatement nvarchar(4000),
        @CountStatement nvarchar(1000),
        @SourcePrefix nvarchar(1) = 'w',
        @TargetPrefix nvarchar(1) = 'f',
        @inserted int,
        @updated int,
        @merged int,
        @writebacks int,
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
    SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
    exec log_info @@PROCID, 'Started';

    set @MeasureName = dbo.get_measure_name(@MeasureId);
    set @FactTable = dbo.get_fact_table_name(@MeasureId);
    set @WritebackView = dbo.get_writeback_view_name(@MeasureId);

    if @WritebackView is not null
    begin
        if @FactTable is not null
        begin
            set @CountStatement = N'select @WritebackCount = COUNT(1) from ' + @WritebackView;
            EXECUTE sp_executesql @CountStatement,
                N'@WritebackCount int OUTPUT',
                @WritebackCount = @writebacks OUTPUT;

            IF @writebacks > 0
            BEGIN
                set @FieldList = dbo.get_measure_keyfield_list(@MeasureId, '');
                set @PrefixFieldList = dbo.get_measure_keyfield_list(@MeasureId, @SourcePrefix);
                set @MergeJoin = dbo.get_measure_merge_join(@MeasureId, @SourcePrefix, @TargetPrefix);

                -- The OUTPUT clause records one row per affected fact row so
                -- the update/insert counts can be returned to this procedure.
                set @MergeStatement =
                    N'DECLARE @MergeSummary TABLE(Change VARCHAR(20));' +
                    'MERGE ' + @FactTable + ' AS ' + @TargetPrefix +
                    ' USING (SELECT ' + @FieldList + ', sum(Quantity) Quantity FROM ' + @WritebackView +
                    ' GROUP BY ' + @FieldList + ') AS ' + @SourcePrefix +
                    ' ON (' + @MergeJoin + ')' +
                    ' WHEN MATCHED THEN ' +
                    ' UPDATE SET ' + @TargetPrefix + '.Quantity = ' +
                    @TargetPrefix + '.Quantity + ' +
                    @SourcePrefix + '.Quantity ' +
                    ' WHEN NOT MATCHED THEN ' +
                    ' INSERT (' + @FieldList + ', Quantity) ' +
                    ' VALUES (' + @PrefixFieldList + ', ' + @SourcePrefix + '.Quantity)' +
                    ' OUTPUT $action INTO @MergeSummary ;'+
                    ' select @updateCount = COUNT(1) from @MergeSummary where Change = ''UPDATE''; ' +
                    ' select @insertCount = COUNT(1) from @MergeSummary where Change = ''INSERT''; ' +
                    ' select @allCount = COUNT(1) from @MergeSummary; ' ;

                BEGIN TRANSACTION;

                EXECUTE sp_executesql @MergeStatement,
                    N'@updateCount int OUTPUT, @insertCount int OUTPUT, @allCount int OUTPUT',
                    @updateCount = @updated OUTPUT,
                    @insertCount = @inserted OUTPUT,
                    @allCount = @merged OUTPUT;
                set @ErrorCode = @@ERROR;

                IF @ErrorCode = 0
                begin
                    -- the merged writebacks are no longer needed
                    exec (N'DELETE FROM ' + @WritebackView);
                    set @ErrorCode = @@ERROR;
                end

                IF @ErrorCode = 0
                begin
                    UPDATE cfg.Measure
                    SET LastWritebackMergeTime = GETDATE(),
                        LastUpdatedCount = @updated,
                        LastInsertedCount = @inserted,
                        LastWritebackCount = @merged
                    WHERE MeasureId = @MeasureId;
                    set @ErrorCode = @@ERROR;
                end

                IF @ErrorCode = 0
                    COMMIT;
                ELSE
                begin
                    ROLLBACK;
                    -- previously a failed merge left no trace in the log
                    set @msg = 'Writeback merge failed with error ' + cast(@ErrorCode as nvarchar(50))
                             + ' for MeasureId ' + isnull(cast(@MeasureId as nvarchar(50)), 'NULL')
                             + ', transaction rolled back.';
                    exec log_error @@PROCID, @msg;
                end
            END
            ELSE
            begin
                -- isnull: a NULL measure name would make the whole message
                -- NULL, which Log.Message (NOT NULL) rejects
                set @msg = 'No writeback records found for measure ''' + isnull(@MeasureName, '')
                         + ''' (MeasureId=' + isnull(cast(@MeasureId as nvarchar(50)), 'NULL') + ')';
                exec dbo.log_info @@PROCID, @msg;
            end
        end
        else
        begin
            set @msg = 'FactTable for measure ''' + isnull(@MeasureName, '')
                     + ''' (MeasureId=' + isnull(cast(@MeasureId as nvarchar(50)), 'NULL') + ') is not configured';
            exec log_error @@PROCID, @msg;
        end
    end
    else
    begin
        set @msg = 'WritebackView for measure ''' + isnull(@MeasureName, '')
                 + ''' (MeasureId=' + isnull(cast(@MeasureId as nvarchar(50)), 'NULL') + ') is not configured';
        exec log_error @@PROCID, @msg;
    end;

    exec log_info @@PROCID, 'Ended';
END
GO
-- SP11. [dbo].[process_batch]
-- Description: Process the data from a batch in the staging table into the forecast database
--   @BatchId : batch (stg.V_BatchHeader) to load
-- For a batch in ready-to-load status: registers the run on the header,
-- determines the date range, then in one transaction clears the target
-- range of the measure and copies the valid staging rows into the fact
-- table. Afterwards the error codes of the staging rows and the header
-- (counts, status, end time) are updated.
-- The batch gets the success status only when the load was committed AND no
-- error records were found; a rolled-back load keeps the error status.
-- =============================================
CREATE PROCEDURE [dbo].[process_batch]
    @BatchId int
AS
DECLARE @FactTable nvarchar(1000),
        @StatusCode varchar(50),
        @FieldList nvarchar(2000),
        @InsertStatement nvarchar(4000),
        @MeasureId int,
        @ClearMeasure int,
        @ErrorRecordCount int,
        @TotalRecordCount int,
        @StartDate datetime,
        @EndDate datetime,
        @LoadStartDate datetime,
        @LoadEndDate datetime,
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
    SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
    -- logging
    set @msg = 'Batch processing started for BatchId ' + cast(@BatchId as varchar);
    exec log_info @@PROCID, @msg;

    -- initialise batch variables
    select @StatusCode = StatusCode,
           @MeasureId = MeasureId,
           @ClearMeasure = ClearMeasure,
           @StartDate = StartDate,
           @EndDate = EndDate,
           @FactTable = dbo.get_fact_table_name(MeasureId),
           @FieldList = dbo.get_measure_keyfield_list(MeasureId,'')
    from stg.V_BatchHeader
    where BatchId = @BatchId;

    IF @@ROWCOUNT = 1
    begin
        If @StatusCode = dbo.get_ready_to_load_status()
        BEGIN
            -- update BachHeader information
            update stg.BatchHeader
            set LastRunStartDateTime = GETDATE(),
                LastRunUser = USER_NAME(),
                RunCount = RunCount + 1
            where BatchId = @BatchId;
            exec log_info @@PROCID, 'Header updated';

            select @LoadStartDate = MIN(Date),
                   @LoadEndDate = MAX(Date)
            from stg.BatchData
            where BatchId = @BatchId;

            -- if explicity Start and End dates are provided, use these
            if @StartDate is not null set @LoadStartDate = @StartDate;
            if @EndDate is not null set @LoadEndDate = @EndDate;

            -- isnull: an empty batch without explicit dates has no range;
            -- the NULL message would be rejected by Log.Message (NOT NULL)
            set @msg = 'Date range determined : ' + isnull(convert(nvarchar, @LoadStartDate, 104), 'NULL') + ' -> '
                     + isnull(convert(nvarchar, @LoadEndDate, 104), 'NULL');
            exec log_info @@PROCID, @msg;

            BEGIN TRANSACTION;

            -- delete data range to load in target measure
            if @ClearMeasure = 1 -- delete entire measure,
                exec delete_measure @MeasureId, null, null;
            else
                exec delete_measure @MeasureId, @LoadStartDate, @LoadEndDate;
            set @ErrorCode = @@ERROR;

            if @ErrorCode = 0
            begin
                -- copy valid records
                set @InsertStatement =
                    N'INSERT INTO ' + @FactTable + ' (' + @FieldList + ', Quantity) ' +
                    'SELECT ' + @FieldList + ', Quantity ' +
                    'FROM stg.V_BatchData ' +
                    'WHERE BatchId = @BatchId '+
                    'AND DeducedError = 0 ' +
                    'AND Date BETWEEN @StartDate AND @EndDate;';
                exec sp_executesql @InsertStatement,
                    N'@BatchId int, @StartDate datetime, @EndDate datetime',
                    @BatchId, @LoadStartDate, @LoadEndDate;
                set @ErrorCode = @@ERROR;
                exec log_info @@PROCID, 'Copy performed';
            end

            -- delete_measure runs writeback_measure, which may ROLLBACK on
            -- its own; that ends this transaction as well. Guard on
            -- @@TRANCOUNT so COMMIT/ROLLBACK is never issued without an
            -- open transaction, and treat that situation as a failed load.
            IF @ErrorCode = 0 and @@TRANCOUNT > 0
                COMMIT;
            ELSE
            begin
                IF @@TRANCOUNT > 0 ROLLBACK;
                IF @ErrorCode = 0 set @ErrorCode = -1; -- transaction ended by a nested rollback
                set @msg = 'Load of BatchId ' + cast(@BatchId as varchar)
                         + ' failed with error ' + cast(@ErrorCode as varchar) + ', transaction rolled back.';
                exec log_error @@PROCID, @msg;
            end

            -- update the erronerous records
            update stg.V_Batchdata
            set ErrorCode = DeducedError
            where BatchId = @BatchId;
            exec log_info @@PROCID, 'Update the errorcode for the batchdata';

            -- update batch information
            set @StatusCode = dbo.get_error_status();
            set @ErrorRecordCount = dbo.get_batch_error_count(@BatchId);
            set @TotalRecordCount = dbo.get_batch_total_count(@BatchId);
            -- success requires a committed load; previously a rolled-back
            -- load without error records was still flagged as success
            if @ErrorRecordCount = 0 and @ErrorCode = 0
                set @StatusCode = dbo.get_success_status();

            UPDATE stg.BatchHeader
            SET ErrorRecordCount = @ErrorRecordCount,
                TotalRecordCount = @TotalRecordCount,
                StatusCode = @StatusCode,
                LastRunEndDateTime = GETDATE()
            WHERE BatchId = @BatchId;
            exec log_info @@PROCID, 'Batch Header updated';
        END
        ELSE
        BEGIN
            set @msg = 'Batch should have status ''' + dbo.get_ready_to_load_Status() +
                       ''' (Ready to Load). Status ''' + @StatusCode + ''' was found.';
            exec log_error @@PROCID, @msg;
        END
    END
    ELSE
    BEGIN
        set @msg = 'No batchheader record found with BatchId=' + CAST(@BatchId as varchar);
        exec log_error @@PROCID, @msg;
    END

    set @msg = 'Batch processing ended for BatchId ' + cast(@BatchId as varchar);
    exec log_info @@PROCID, @msg;
END
GO
-- SP12. [dbo].[process_batch_all]
-- Description: Import all unprocessed batches
--   Calls dbo.process_batch for every row of stg.BatchHeader in creation
--   order. The cursor does not filter on status; process_batch itself
--   rejects (and logs) batches that are not ready to load.
--   An error in one batch is logged and does not stop the remaining batches.
-- =============================================
CREATE PROCEDURE [dbo].[process_batch_all]
AS
DECLARE @BatchId int,
        @msg varchar(1000),
        @EntryTranCount int;
-- LOCAL: a global cursor left behind by an aborted run would make the next
-- call fail on the DECLARE.
DECLARE BatchCursor CURSOR LOCAL FOR
    SELECT BatchId
    FROM stg.BatchHeader
    ORDER BY CreateDateTime;
BEGIN
    SET NOCOUNT ON;
    set @EntryTranCount = @@TRANCOUNT;
    exec log_info @@PROCID, 'Started';

    OPEN BatchCursor;
    FETCH NEXT FROM BatchCursor into @BatchId;
    WHILE @@FETCH_STATUS = 0
    BEGIN
        BEGIN TRY
            exec dbo.[process_batch] @BatchId;
        END TRY
        BEGIN CATCH
            select @msg = ERROR_MESSAGE();
            -- process_batch opens a transaction; when it is interrupted that
            -- transaction stays open and would swallow all following
            -- batches. Roll it back before logging, otherwise the log row
            -- is lost (or cannot be written at all). Only done when this
            -- procedure was not itself called inside a transaction.
            IF XACT_STATE() <> 0 AND @EntryTranCount = 0
                ROLLBACK TRANSACTION;
            exec log_error @@PROCID, @msg;
        END CATCH
        FETCH NEXT FROM BatchCursor into @BatchId;
    END;
    CLOSE BatchCursor;
    DEALLOCATE BatchCursor;

    exec log_info @@PROCID, 'Ended';
END
GO
-- SP13. [dbo].[process_dimension]
-- Description: Process the dimension data from a batch in the staging table into the forecast database
--   @DimensionBatchId : batch (stg.DimensionBatchHeader) to load
-- Registers the run on the header, validates the batch (record count, level
-- counts, status, valid-from date) and, when every check passes, merges it
-- into the dimension tables through dbo.merge_batch_to_dimension.
-- The header status is changed only when a merge was attempted: success when
-- the merge reports no error, otherwise the error status. A batch that fails
-- validation or is not yet valid keeps its status (previously every
-- processed batch was flagged as success, even when nothing was loaded).
-- =============================================
CREATE PROCEDURE [dbo].[process_dimension]
    @DimensionBatchId int
AS
DECLARE @StatusCode varchar(50),
        @NewStatusCode varchar(50) = NULL,  -- NULL = leave the header status unchanged
        @MergeResult int,
        @TotalRecordCount int,
        @DimensionId int,
        @DimensionName nvarchar(50),
        @ValidFromDate datetime,
        @LevelCount int,
        @BatchDataLevelCount int,
        @DimensionLevelCount int,
        @DataRecordCount int,
        @msg nvarchar(1000);
BEGIN
    SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
    -- logging
    set @msg = 'Batch processing started for DimensionBatchId ' + cast(@DimensionBatchId as varchar);
    exec log_info @@PROCID, @msg;

    -- initialise batch variables
    select @StatusCode = StatusCode,
           @DimensionName = DimensionName,
           @DimensionId = dbo.get_dimension_id(DimensionName),
           @ValidFromDate = ValidFromDate,
           @TotalRecordCount = TotalRecordCount,
           @LevelCount = LevelCount
    from stg.DimensionBatchHeader
    where DimensionBatchId = @DimensionBatchId;

    IF @@ROWCOUNT = 1 -- Batch exists
    BEGIN
        -- indicate start
        update stg.DimensionBatchHeader
        set LastRunStartDateTime = GETDATE(),
            LastRunUser = USER_NAME(),
            RunCount = RunCount + 1
        where DimensionBatchId = @DimensionBatchId;

        select @BatchDataLevelCount = dbo.get_dimension_batch_level_count(@DimensionBatchId),
               @DimensionLevelCount = dbo.get_dimension_level_count(@DimensionId),
               @DataRecordCount = dbo.get_dimension_batch_total_record_count(@DimensionBatchId);

        -- validate input
        if @DataRecordCount = @TotalRecordCount -- same amount of records in data records as in header
        begin
            exec log_info @@PROCID, 'Amount of Data records as indicated in header';
            if @BatchDataLevelCount = @LevelCount -- same amount of levels in data records as in header
            begin
                exec log_info @@PROCID, 'Amount of levels in batch as indicated in header';
                if @DimensionLevelCount = @LevelCount -- same amount of levels in Batch as in Dimension
                begin
                    exec log_info @@PROCID, 'Amount of levels in dimension as indicated in header';
                    if @StatusCode = dbo.get_ready_to_load_status()
                    begin
                        exec log_info @@PROCID, 'Ready for processing status';
                        if @ValidFromDate < GETDATE() -- already allowed to load new dimension information
                        begin
                            exec log_info @@PROCID, 'Dimension data became actual';
                            -- a non-zero return code, if the callee reports
                            -- one, is treated as a failed merge
                            exec @MergeResult = dbo.merge_batch_to_dimension @DimensionBatchId, @DimensionId;
                            if isnull(@MergeResult, 0) = 0
                                set @NewStatusCode = dbo.get_success_status();
                            else
                            begin
                                set @NewStatusCode = dbo.get_error_status();
                                set @msg = 'Merge of DimensionBatchId ' + cast(@DimensionBatchId as varchar)
                                         + ' failed with error ' + cast(@MergeResult as varchar) + '.';
                                exec log_error @@PROCID, @msg;
                            end
                        end
                        else
                        begin
                            set @msg = 'ValidfromDate ' + convert(varchar,@ValidFromDate,104) + ' is later than now. ' +
                                       'Dimension should not be loaded yet.';
                            exec log_error @@PROCID, @msg;
                        end
                    end
                    else
                    begin
                        set @msg = 'Batch has invalid StatusCode';
                        exec log_error @@PROCID, @msg;
                    end
                end
                else
                begin
                    set @msg = 'Amount of levels in Dimension configuration (=' + cast(@DimensionLevelCount as varchar) + ') ' +
                               'different from indicated amount of levels in DimensionBatchHeader (=' + cast(@LevelCount as varchar) + ' levels) ' +
                               ' for ' + @DimensionName + ' dimension.';
                    exec log_error @@PROCID, @msg;
                end
            end
            else
            begin
                set @msg = 'Amount of levels in DimensionBatchData (=' + cast(@BatchDataLevelCount as varchar) + ' levels) ' +
                           'different from indicated amount of levels in DimensionBatchHeader (=' + cast(@LevelCount as varchar) + ' levels).';
                exec log_error @@PROCID, @msg;
            end
        end
        else
        begin
            set @msg = 'Amount of records in batch (=' + cast(@DataRecordCount as varchar) + ') ' +
                       'different from indicated amount in DimensionBatchHeader (=' + cast(@TotalRecordCount as varchar) + ').';
            exec log_error @@PROCID, @msg;
        end

        -- indicate end; the status changes only when a merge was attempted
        update stg.DimensionBatchHeader
        set LastRunEndDateTime = GETDATE(),
            StatusCode = isnull(@NewStatusCode, StatusCode)
        where DimensionBatchId = @DimensionBatchId;
    end
    else
    begin
        set @msg = 'No batchheader record found with DimensionBatchId=' + CAST(@DimensionBatchId as varchar);
        exec log_error @@PROCID, @msg;
    END

    set @msg = 'Batch processing ended for DimensionBatchId ' + cast(@DimensionBatchId as varchar);
    exec log_info @@PROCID, @msg;
END
GO
-- SP15. [dbo].[process_dimension_all]
-- Description: Import all unprocessed dimensions
--   Calls dbo.process_dimension for every row of stg.DimensionBatchHeader in
--   creation order. The cursor does not filter on status; process_dimension
--   performs the status check itself.
--   An error in one batch is logged and does not stop the remaining batches.
-- =============================================
CREATE PROCEDURE [dbo].[process_dimension_all]
AS
DECLARE @DimensionsBatchId int,
        @msg varchar(1000),
        @EntryTranCount int;
-- LOCAL: a global cursor left behind by an aborted run would make the next
-- call fail on the DECLARE.
DECLARE DimensionBatchCursor CURSOR LOCAL FOR
    SELECT DimensionBatchId
    FROM stg.DimensionBatchHeader
    ORDER BY CreateDateTime;
BEGIN
    SET NOCOUNT ON;
    set @EntryTranCount = @@TRANCOUNT;
    exec log_info @@PROCID, 'Started';

    OPEN DimensionBatchCursor;
    FETCH NEXT FROM DimensionBatchCursor into @DimensionsBatchId;
    WHILE @@FETCH_STATUS = 0
    BEGIN
        BEGIN TRY
            exec dbo.[process_dimension] @DimensionsBatchId;
        END TRY
        BEGIN CATCH
            select @msg = ERROR_MESSAGE();
            -- The merge called by process_dimension opens a transaction;
            -- when it is interrupted that transaction stays open and would
            -- swallow all following batches. Roll it back before logging,
            -- otherwise the log row is lost (or cannot be written at all).
            -- Only done when this procedure was not itself called inside a
            -- transaction.
            IF XACT_STATE() <> 0 AND @EntryTranCount = 0
                ROLLBACK TRANSACTION;
            exec log_error @@PROCID, @msg;
        END CATCH
        FETCH NEXT FROM DimensionBatchCursor into @DimensionsBatchId;
    END;
    CLOSE DimensionBatchCursor;
    DEALLOCATE DimensionBatchCursor;

    exec log_info @@PROCID, 'Ended';
END
GO
/*
ETL Flow
1. Create the batch header:
   exec dbo.create_batch_header @MeasureCode = ?, @ExternalSystemCode = ?, @TotalRecordCount = ?, @StartDate = ?, @EndDate = ?
2. Transfer the data records from source to destination.
3. Finalise the batch header:
   exec dbo.finalise_batch_header @BatchId = ?, @TotalRecordCount = ?
4. Log a download error:
   exec dbo.log_ssis_error @PackageName = ?, @SourceName = '', @TaskName = ?, @Message = 'BI download failed';
5. Load the data from staging into the associated measure:
   exec dbo.process_batch @BatchId = ?
6. Log an upload error:
   exec dbo.log_ssis_error @PackageName = ?, @SourceName = '', @TaskName = ?, @Message = 'FCST upload failed';
No comments:
Post a Comment
*/