Tuesday, 2 July 2013

Generic Staging System Design for ETL

-- 1. [stg].[ExternalSystem]

CREATE TABLE [stg].[ExternalSystem]

(
    [ExternalSystemCode] [varchar](50) NOT NULL,
    [ExternalSystem] [nvarchar](1000) NOT NULL,
 CONSTRAINT [PK_ExternalSystem] PRIMARY KEY CLUSTERED
(
    [ExternalSystemCode] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]






 -- 2. [stg].[BatchStatus]

CREATE TABLE [stg].[BatchStatus]

(
    [StatusCode] [varchar](50) NOT NULL,
    [Status] [varchar](50) NOT NULL,
 CONSTRAINT [PK_BatchStatus] PRIMARY KEY CLUSTERED
(
    [StatusCode] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
 CONSTRAINT [UK_BatchStatus] UNIQUE NONCLUSTERED
(
    [Status] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]









-- 3. [stg].[BatchHeader]

CREATE TABLE [stg].[BatchHeader]

(
    [BatchId] [int] IDENTITY(1,1) NOT NULL,
    [ExternalSystemCode] [varchar](50) NOT NULL,
    [StatusCode] [varchar](50) NOT NULL,
    [TotalRecordCount] [int] NULL,
    [ErrorRecordCount] [int] NULL,
    [LastRunStartDateTime] [datetime] NULL,
    [LastRunEndDateTime] [datetime] NULL,
    [LastRunUser] [varchar](50) NULL,
    [RunCount] [int] NOT NULL,
    [CreateDateTime] [datetime] NOT NULL,
    [CreateUser] [varchar](50) NOT NULL,
    [MeasureCode] [varchar](50) NOT NULL,
    [ClearMeasure] [int] NOT NULL,
    [StartDate] [datetime] NULL,
    [EndDate] [datetime] NULL,
 CONSTRAINT [PK_BatchHeader] PRIMARY KEY CLUSTERED
(
    [BatchId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO

SET ANSI_PADDING OFF
GO

ALTER TABLE [stg].[BatchHeader] ADD  CONSTRAINT [DF_BatchHeader_StatusId]  DEFAULT ((0)) FOR [StatusCode]
GO

ALTER TABLE [stg].[BatchHeader] ADD  CONSTRAINT [DF_BatchHeader_RunCount]  DEFAULT ((0)) FOR [RunCount]
GO

ALTER TABLE [stg].[BatchHeader] ADD  CONSTRAINT [DF_BatchHeader_CreateDateTime]  DEFAULT (getdate()) FOR [CreateDateTime]
GO

ALTER TABLE [stg].[BatchHeader] ADD  CONSTRAINT [DF_BatchHeader_CreateUser]  DEFAULT (user_name()) FOR [CreateUser]
GO

ALTER TABLE [stg].[BatchHeader] ADD  CONSTRAINT [DF_BatchHeader_ClearMeasure]  DEFAULT ((0)) FOR [ClearMeasure]
GO

ALTER TABLE [stg].[BatchHeader]  WITH CHECK ADD  CONSTRAINT [FK_BatchHeader_BatchStatus] FOREIGN KEY([StatusCode])
REFERENCES [stg].[BatchStatus] ([StatusCode])
GO

ALTER TABLE [stg].[BatchHeader] CHECK CONSTRAINT [FK_BatchHeader_BatchStatus]
GO

ALTER TABLE [stg].[BatchHeader]  WITH CHECK ADD  CONSTRAINT [FK_BatchHeader_ExternalSystem] FOREIGN KEY([ExternalSystemCode])
REFERENCES [stg].[ExternalSystem] ([ExternalSystemCode])
GO

ALTER TABLE [stg].[BatchHeader] CHECK CONSTRAINT [FK_BatchHeader_ExternalSystem]
GO








-- 4. [stg].[DimensionBatchHeader]

CREATE TABLE [stg].[DimensionBatchHeader]

(
    [DimensionBatchId] [int] IDENTITY(1,1) NOT NULL,
    [DimensionName] [varchar](50) NOT NULL,
    [ValidFromDate] [datetime] NOT NULL,
    [TotalRecordCount] [int] NOT NULL,
    [LevelCount] [int] NOT NULL,
    [StatusCode] [varchar](50) NOT NULL,
    [LastRunStartDateTime] [datetime] NULL,
    [LastRunEndDateTime] [datetime] NULL,
    [LastRunUser] [varchar](50) NULL,
    [RunCount] [int] NOT NULL,
    [CreateDateTime] [datetime] NOT NULL,
    [CreateUser] [varchar](50) NOT NULL,
 CONSTRAINT [PK_DimensionHeader] PRIMARY KEY CLUSTERED
(
    [DimensionBatchId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO

SET ANSI_PADDING OFF
GO

ALTER TABLE [stg].[DimensionBatchHeader] ADD  CONSTRAINT [DF_DimensionHeader_RecordCount]  DEFAULT ((0)) FOR [TotalRecordCount]
GO

ALTER TABLE [stg].[DimensionBatchHeader] ADD  CONSTRAINT [DF_DimensionHeader_StatusCode]  DEFAULT ('E') FOR [StatusCode]
GO

ALTER TABLE [stg].[DimensionBatchHeader] ADD  CONSTRAINT [DF_DimensionHeader_RunCount]  DEFAULT ((0)) FOR [RunCount]
GO

ALTER TABLE [stg].[DimensionBatchHeader] ADD  CONSTRAINT [DF_DimensionHeader_CreateDateTime]  DEFAULT (getdate()) FOR [CreateDateTime]
GO

ALTER TABLE [stg].[DimensionBatchHeader] ADD  CONSTRAINT [DF_DimensionHeader_CreateUser]  DEFAULT (user_name()) FOR [CreateUser]
GO

















-- 5. [stg].[DimensionBatchData]
CREATE TABLE [stg].[DimensionBatchData]

(
    [RecordId] [int] IDENTITY(1,1) NOT NULL,
    [DimensionBatchId] [int] NOT NULL,
    [LevelCode] [nvarchar](100) NOT NULL,
    [LevelOrdinal] [int] NOT NULL,
    [MemberCode] [nvarchar](100) NOT NULL,
    [MemberDescription] [nvarchar](256) NOT NULL,
    [ParentCode] [nvarchar](100) NULL,
    [AttributeName] [nvarchar](100) NULL,
    [AttributeValue] [nvarchar](100) NULL,
 CONSTRAINT [PK_DimensionData] PRIMARY KEY CLUSTERED
(
    [RecordId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

GO


ALTER TABLE [stg].[DimensionBatchData]  WITH CHECK ADD  CONSTRAINT [FK_DimensionData_DimensionHeader] FOREIGN KEY([DimensionBatchId])
REFERENCES [stg].[DimensionBatchHeader] ([DimensionBatchId])
GO

ALTER TABLE [stg].[DimensionBatchData] CHECK CONSTRAINT [FK_DimensionData_DimensionHeader]
GO









-- 6. [dbo].[Log]

CREATE TABLE [dbo].[Log](
    [LogId] [int] IDENTITY(1,1) NOT NULL,
    [ProcedureName] [nvarchar](100) NULL,
    [PackageName] [nvarchar](50) NULL,
    [SourceName] [nvarchar](50) NULL,
    [TaskName] [nvarchar](50) NULL,
    [Message] [nvarchar](1000) NOT NULL,
    [LogDateTime] [datetime] NOT NULL,
    [LogUser] [varchar](50) NOT NULL,
    [LogType] [varchar](50) NOT NULL
) ON [PRIMARY]

GO

SET ANSI_PADDING OFF
GO

ALTER TABLE [dbo].[Log] ADD  CONSTRAINT [DF_Log_LogDateTime]  DEFAULT (getdate()) FOR [LogDateTime]
GO

ALTER TABLE [dbo].[Log] ADD  CONSTRAINT [DF_Log_LogUser]  DEFAULT (user_name()) FOR [LogUser]
GO

ALTER TABLE [dbo].[Log] ADD  CONSTRAINT [DF_Log_LogType]  DEFAULT ('INFO') FOR [LogType]
GO




Stored Procedures

 

-- SP 1. [dbo].[log_ssis_error]



CREATE PROCEDURE [dbo].[log_ssis_error]
    -- Add the parameters for the stored procedure here
    @PackageName nvarchar(150),
    @SourceName nvarchar(150),
    @TaskName nvarchar(150),
    @Message nvarchar(500)
AS
BEGIN
    SET NOCOUNT ON;
    insert into dbo.Log (PackageName, SourceName, TaskName, [Message], LogType)
    values (@PackageName, @SourceName, @TaskName, @Message, 'ERROR') 
END


--SP 2 [dbo].[log_info]


-- Description:    Logs information message to Log table
-- =============================================
CREATE PROCEDURE [dbo].[log_info]
    -- Add the parameters for the stored procedure here
    @ProcId int,
    @Message nvarchar(1000)
AS
BEGIN
    SET NOCOUNT ON;
    insert into dbo.[Log] (ProcedureName, Message, LogType)
    values (OBJECT_NAME(@ProcId), @Message, 'INFO');   
END

GO



--SP3.  [dbo].[log_error]
 

-- Description:    Logs error message to Log table
-- =============================================
CREATE PROCEDURE [dbo].[log_error]
    -- Add the parameters for the stored procedure here
    @ProcId int,
    @Message nvarchar(1000)
AS
BEGIN
    SET NOCOUNT ON;
    insert into dbo.[Log] (ProcedureName, Message, LogType)
    values (OBJECT_NAME(@ProcId), @Message, 'ERROR');   
END

GO




--SP4. [dbo].[create_batch]



-- Description:    Create a batch from existing measure, to send to an external system
-- =============================================
CREATE PROCEDURE [dbo].[create_batch]
    @MeasureId int,
    @ExternalSystemCode varchar(50),
    @StartDate datetime,
    @EndDate datetime   
AS
DECLARE @FactTable nvarchar(1000),
        @MeasureName nvarchar(1000),
        @MeasureCode nvarchar(50),
        @ExternalSystem nvarchar(1000),
        @KeyFieldList nvarchar(1000),
        @CodeFieldList nvarchar(1000),
        @LevelCodeFieldList nvarchar(1000),
        @DimensionJoin nvarchar(1000),
        @DimensionFrom nvarchar(1000),
        @InsertStatement nvarchar(4000),
        @TotalRecordCount int,
        @BatchId int,
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
  SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
 
  -- logging
  exec log_info @@PROCID, 'Batch creation started';
 
  -- initialise batch variables
  set @FactTable = dbo.get_fact_table_name(@MeasureId);
  set @MeasureName = dbo.get_measure_name(@MeasureId);
  set @MeasureCode = dbo.get_measure_code(@MeasureId);
  set @ExternalSystem = dbo.get_external_system_name(@ExternalSystemCode);
 
  IF (@FactTable is not null and @MeasureCode is not null)
  begin
    IF (@ExternalSystem is not null)
    begin
      -- validate input date range
      if @StartDate is null set @StartDate = dbo.get_earliest_date();
      if @EndDate is null set @EndDate = dbo.get_latest_date();
     
      IF (@StartDate <= @EndDate)
      begin
        set @msg = 'Date range determined : ' + convert(nvarchar, @StartDate, 104) + ' -> '
                                              + convert(nvarchar, @EndDate, 104);
        exec log_info @@PROCID, @msg;
       
        set @CodeFieldList = dbo.get_measure_codefield_list    (@MeasureId, 0);
        set @LevelCodeFieldList = dbo.get_measure_codefield_list (@MeasureId, 1);
        set @DimensionJoin = dbo.get_measure_dimension_join (@MeasureId);
        set @DimensionFrom = dbo.get_measure_dimension_from (@MeasureId);
       
        -- create detail records in BatchData
        set @InsertStatement =
              'INSERT INTO stg.BatchData ' +
              '(' + @CodeFieldList + ', Quantity, BatchId) ' +
              'SELECT ' + @LevelCodeFieldList + ', Quantity, @BatchId ' +
                'FROM ' + @DimensionFrom + ' ' +
               'WHERE ' + @DimensionJoin + ' ' +
                 'AND Date BETWEEN @StartDate AND @EndDate';
        exec log_info @@PROCID, @InsertStatement;
               
        exec dbo.writeback_measure @MeasureId;
       
        -- create header record in BatchHeader
        BEGIN TRANSACTION
        begin
          declare @BatchIdTable table(id int);
          INSERT INTO stg.BatchHeader (ExternalSystemCode, StatusCode, MeasureCode, StartDate, EndDate )
               OUTPUT inserted.batchid into @BatchIdTable
               VALUES (@ExternalSystemCode, 'C', @MeasureCode, @StartDate, @EndDate);
          set @ErrorCode = @@ERROR;
          select @BatchId = id from @BatchIdTable;
        end;

        set @msg = 'Header record created with BatchId ' + convert(nvarchar, @BatchId);
        exec log_info @@PROCID, @msg;
   
        IF @ErrorCode = 0
        begin
          exec sp_executesql @InsertStatement,
                             N'@BatchId int, @StartDate datetime, @EndDate datetime',
                             @BatchId, @StartDate, @EndDate;
          set @ErrorCode = @@ERROR;                  
          exec log_info @@PROCID, 'Batch Data generated';
        end;
       
        IF @ErrorCode = 0
          COMMIT
        ELSE
          ROLLBACK
       
        -- postprocessing
        select @TotalRecordCount = COUNT(1)
          from stg.BatchData
         where BatchId = @BatchId;
       
        -- update BachHeader information
        update stg.BatchHeader
           set StatusCode = 'S',  -- Ready to send
               TotalRecordCount = @TotalRecordCount
         where BatchId = @BatchId;
          
        exec log_info @@PROCID, 'Batch Header updated';
       
      END
      ELSE -- invalid date range
      BEGIN
        set @msg = 'Invalid date range. Startdate (' + CAST(@Startdate as varchar) +
                                  ')  > Enddate   (' + CAST(@Enddate as varchar)   + ')';
        exec log_error @@PROCID, @msg;
      END
    END     
    ELSE -- no external system found for supplied code
    BEGIN
      set @msg = 'No External System defined for ExternalSystemCode ' + CAST(@ExternalSystemCode as varchar);
      exec log_error @@PROCID, @msg;
    END
  END     
  ELSE -- no factablename found
  BEGIN
    set @msg = 'No FactTable or MeasureCode defined. MeasureId ' + CAST(@MeasureId as varchar) + ' is not valid.';
    exec log_error @@PROCID, @msg;
  END
 
  set @msg = 'Batch generation ended';
  exec log_info @@PROCID, @msg;
  
  END

GO


-- SP5. [dbo].[create_batch_header]


-- Description:    Create a batch header record, returning the BatchId
-- =============================================
CREATE PROCEDURE [dbo].[create_batch_header]
    @MeasureCode nvarchar(50),
    @ExternalSystemCode varchar(50),
    @TotalRecordCount int,
    @StartDate datetime,
    @EndDate datetime
AS
declare @BatchIdTable table(BatchId int);
declare @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
  INSERT INTO stg.BatchHeader
              (ExternalSystemCode,
               StatusCode,
               TotalRecordCount,
               MeasureCode,
               StartDate,
               EndDate)
       OUTPUT inserted.BatchId into @BatchIdTable
       VALUES (@ExternalSystemCode,
               dbo.get_constructing_batch_status(),
               @TotalRecordCount,
               @MeasureCode,
               @StartDate,
               @EndDate);
 
  SELECT BatchId from @BatchIdTable;
END

GO



-- SP6. [dbo].[create_dimension_batch_header]



-- Description:    Create a batch header record for dimension data, returning the BatchId
-- =============================================
CREATE PROCEDURE [dbo].[create_dimension_batch_header]
    @DimensionName nvarchar(50),
    @TotalRecordCount int,
    @LevelCount INT,
    @ValidFromDate datetime
AS
declare @BatchIdTable table(BatchId int);
declare @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
  INSERT INTO stg.DimensionBatchHeader
              (DimensionName,
               ValidFromDate,
               TotalRecordCount,
               LevelCount,
               StatusCode
               )
       OUTPUT inserted.DimensionBatchId into @BatchIdTable
       VALUES (@DimensionName,
               @ValidFromDate,
               @TotalRecordCount,
               @LevelCount,
               dbo.get_constructing_batch_status());
 
  SELECT BatchId from @BatchIdTable;
END

GO




-- SP7.  [dbo].[delete_batch]



-- Description:    Delete a specific Batch of data
-- =============================================
CREATE PROCEDURE [dbo].[delete_batch]
    @BatchId int
AS
DECLARE @RecordsDeleted int,
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
  SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
 
  -- logging
  set @msg = 'Batch deletion started for BatchId ' + cast(@BatchId as varchar);
  exec log_info @@PROCID, @msg;
 
  BEGIN TRANSACTION
  -- delete batch data detail records
  delete from stg.BatchData
   where BatchId = @BatchId;
  
  set @ErrorCode = @@ERROR;
  set @RecordsDeleted = @@ROWCOUNT;
 
  if @ErrorCode = 0
  begin
    -- delete batch header record
    delete from stg.BatchHeader
     where BatchId = @BatchId;
    
    set @ErrorCode = @@ERROR;
  end
 
  if @ErrorCode = 0
  begin
    COMMIT
    set @msg = cast(@RecordsDeleted as varchar) + ' records deleted.';
    exec log_info @@PROCID, @msg;
  end
  else
  begin
    ROLLBACK
    set @msg = 'Error during deletion, operation rolled back.';
    exec log_info @@PROCID, @msg;
  end;
 
  set @msg = 'Batch deletion ended for BatchId ' + cast(@BatchId as varchar) + '.';
  exec log_info @@PROCID, @msg;
  
  END

GO





-- SP8. [dbo].[merge_batch_to_dimension]



-- Description:    Core procedure for merging dimension batch to dimension table
-- =============================================
CREATE PROCEDURE [dbo].[merge_batch_to_dimension]
    @DimensionBatchId int,
    @DimensionId int
AS
DECLARE LevelTableCursor cursor for
                select quotename(LevelTable),
                       LevelOrdinal,
                       quotename(CodeField),
                       quotename(DescriptionField),
                       quotename(KeyField)
                  from cfg.V_LevelDimension
                 where DimensionId = @DimensionId
              order by LevelOrdinal;
DECLARE @LevelTable nvarchar(50), @PreviousLevelTable nvarchar(50),
        @LevelOrdinal int,
        @KeyField nvarchar(50), @PreviousKeyField nvarchar(50),
        @CodeField nvarchar(50), @PreviousCodeField nvarchar(50),
        @DescriptionField nvarchar(50),
        @sql nvarchar(2000),
        @ParentLookupSql nvarchar(1000),
        @msg varchar(1000),
        @ErrorCode int;
BEGIN
  SET NOCOUNT ON;

  SET @ParentLookupSql = NULL;
  SET @PreviousKeyField = NULL;
 
  exec log_info @@PROCID, 'Starting dimension merge transaction';
  BEGIN TRANSACTION
  set @ErrorCode = @@ERROR;

  OPEN LevelTableCursor;
  FETCH NEXT FROM LevelTableCursor into @LevelTable, @LevelOrdinal, @CodeField, @DescriptionField, @KeyField;
  WHILE (@@FETCH_STATUS = 0 and @ErrorCode = 0)
  BEGIN
     set @sql = 'MERGE ' + @LevelTable + ' as target ' +
           'USING (SELECT MemberCode, MemberDescription, ' +
           isnull(@ParentLookupSql, 'NULL') + ' ParentId ' +
                    'FROM stg.DimensionBatchData ' +
                   'WHERE DimensionBatchId = @DimensionBatchId ' +
                     'AND LevelOrdinal = @LevelOrdinal) as source '+
              'ON (target.' + @CodeField + ' = source.MemberCode) ' +
           'WHEN MATCHED THEN ' +
             'UPDATE set ' + @DescriptionField + ' = source.MemberDescription ' +
                             isnull(', ' + @PreviousKeyField + ' = ParentId ', ' ') +
           'WHEN NOT MATCHED THEN ' +
             'INSERT (' + @CodeField + ',' + @DescriptionField + isnull(',' + @PreviousKeyField, ' ') + ') ' +
             'VALUES (MemberCode, MemberDescription' + case when (@PreviousKeyField IS not null) then ', ParentId)' else ')' end + ';' ;
   
    EXEC sp_executesql @sql,
                     N'@DimensionBatchId int, @LevelOrdinal int',
                       @DimensionBatchId,     @LevelOrdinal;
   
    SET @ErrorCode = @@ERROR;      
   
    set @msg = 'Merge of level ' + CAST(@LevelOrdinal as varchar) + ' performed.'
    exec log_info @@PROCID, @msg;
   
    SELECT @PreviousLevelTable = @LevelTable,
           @PreviousCodeField = @CodeField,
           @PreviousKeyField = @KeyField;
   
    SET @ParentLookupSql =
           ' (SELECT ' + @PreviousKeyField +
              ' FROM ' + @PreviousLevelTable +
             ' WHERE ' + @PreviousCodeField + ' = ParentCode) ';
     
    FETCH NEXT FROM LevelTableCursor into @LevelTable, @LevelOrdinal, @CodeField, @DescriptionField, @KeyField;
 END;
 CLOSE LevelTableCursor;
 DEALLOCATE LevelTableCursor;

 if @ErrorCode =0
 begin
   COMMIT;
   exec log_info @@PROCID, 'Committed dimension merge transaction';
 end
 else
 begin
   ROLLBACK;
   exec log_info @@PROCID, 'Rolled back dimension merge transaction';
   exec log_info @@PROCID, @sql;
 end; 
END

GO










-- SP9 [dbo].[delete_measure]


-- Description:    Delete data from measure between Start and End date.
--              If not provided, it's substituted with respectively
--              the earliest and latest date known.
-- =============================================
CREATE PROCEDURE [dbo].[delete_measure]
    @MeasureId int,
    @StartDate datetime = null,
    @EndDate datetime = null
AS
DECLARE @MeasureName nvarchar(1000),
        @FactTable nvarchar(1000),
        @DeleteStatement nvarchar(4000),
        @DateSelection nvarchar(1000),
        @FieldList nvarchar(2000),
        @ErrorCode int;
BEGIN
  SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution

  exec log_info @@PROCID, 'Started';

  set @MeasureName = dbo.get_measure_name(@MeasureId);
  set @FactTable = dbo.get_fact_table_name(@MeasureId);
  set @DateSelection =
           N'WHERE DateId in ' +
            '(SELECT DateId FROM VD_Date WHERE Date ' +
             'BETWEEN isnull(@StartDate,(select MIN(Date) from VD_Date)) ' +
             'AND     isnull(@EndDate,(select MAX(Date) from VD_Date)))';
 
  -- merge writebacks with fact data for this measure
  -- this ensures no partial data remains when deleting from the fact table
  execute dbo.writeback_measure @MeasureId
 
  -- delete target measure
  set @DeleteStatement = N'DELETE FROM ' + quotename(@FactTable) + @DateSelection;
 
  execute sp_executesql @DeleteStatement,
                        N'@StartDate datetime, @EndDate datetime',
                        @StartDate, @EndDate;
 
  exec log_info @@PROCID, 'Ended';

END







-- SP10. [dbo].[writeback_measure]


 


-- Description:    Merge input data from writeback table to fact table for a specific measure
-- =============================================
CREATE PROCEDURE [dbo].[writeback_measure]
    @MeasureId int
AS
DECLARE @WritebackView nvarchar(1000),
        @FactTable nvarchar(1000),
        @MeasureName nvarchar(1000),
        @FieldList nvarchar(2000),
        @PrefixFieldList nvarchar(1000),
        @MergeJoin nvarchar(2000),
        @MergeStatement nvarchar(4000),
        @CountStatement nvarchar(1000),
        @SourcePrefix nvarchar(1) = 'w',
        @TargetPrefix nvarchar(1) = 'f',
        @inserted int,
        @updated int,
        @merged int,
        @writebacks int,
        @msg nvarchar(1000),
        @ErrorCode int;
;
BEGIN
  SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution

  exec log_info @@PROCID, 'Started';

  set @MeasureName = dbo.get_measure_name(@MeasureId);
  set @FactTable = dbo.get_fact_table_name(@MeasureId);
  set @WritebackView = dbo.get_writeback_view_name(@MeasureId);
 
  if @WritebackView is not null
    if @FactTable is not null
    begin
    set @CountStatement = N'select @WritebackCount = COUNT(1) from ' + @WritebackView;
      EXECUTE sp_executesql @CountStatement,
              N'@WritebackCount int OUTPUT',
              @WritebackCount = @writebacks OUTPUT;
     
      IF @writebacks > 0
      BEGIN
        set @FieldList = dbo.get_measure_keyfield_list(@MeasureId, '');
        set @PrefixFieldList = dbo.get_measure_keyfield_list(@MeasureId, @SourcePrefix);
        set @MergeJoin = dbo.get_measure_merge_join(@MeasureId, @SourcePrefix, @TargetPrefix);
   
        set @MergeStatement =
         N'DECLARE @MergeSummary TABLE(Change VARCHAR(20));' +
          'MERGE ' + @FactTable + ' AS ' + @TargetPrefix +
          ' USING (SELECT ' + @FieldList + ', sum(Quantity) Quantity FROM ' + @WritebackView +
          ' GROUP BY ' + @FieldList + ') AS ' + @SourcePrefix +
          ' ON (' + @MergeJoin + ')' +
          ' WHEN MATCHED THEN ' +
          '  UPDATE SET ' + @TargetPrefix + '.Quantity = ' +
                            @TargetPrefix + '.Quantity + ' +
                            @SourcePrefix + '.Quantity ' +
          '    WHEN NOT MATCHED THEN ' +
          '  INSERT (' + @FieldList + ', Quantity) ' +
          '  VALUES (' + @PrefixFieldList + ', ' + @SourcePrefix + '.Quantity)' +
          ' OUTPUT $action INTO @MergeSummary ;'+
          ' select @updateCount = COUNT(1) from @MergeSummary where Change = ''UPDATE''; ' +
          ' select @insertCount = COUNT(1) from @MergeSummary where Change = ''INSERT''; ' +
          ' select @allCount = COUNT(1) from @MergeSummary; ' ;
         
          BEGIN TRANSACTION
          EXECUTE sp_executesql  @MergeStatement,
            N'@updateCount int OUTPUT, @insertCount int OUTPUT, @allCount int OUTPUT',
              @updateCount = @updated OUTPUT,
              @insertCount = @inserted OUTPUT,
              @allCount = @merged OUTPUT;

            set @ErrorCode = @@ERROR
           
            IF @ErrorCode =0
            begin
              exec (N'DELETE FROM ' + @WritebackView);
              set @ErrorCode = @@ERROR
            end
           
            IF @ErrorCode =0
            begin
              UPDATE cfg.Measure
                 SET LastWritebackMergeTime=GETDATE(),
                     LastUpdatedCount=@updated,
                     LastInsertedCount=@inserted,
                     LastWritebackCount=@merged
                WHERE MeasureId = @MeasureId;
              set @ErrorCode = @@ERROR
            end
           
            IF @ErrorCode = 0
              COMMIT
            ELSE
              ROLLBACK  
        END
        ELSE
        begin
          set @msg = 'No writeback records found for measure ''' + @MeasureName + ''' (MeasureId=' + cast(@MeasureId as nvarchar(50)) + ')';
          exec dbo.log_info @@PROCID, @msg;
        end 
      end
    else
    begin
      set @msg = 'FactTable for measure ''' + @MeasureName + ''' (MeasureId=' + cast(@MeasureId as nvarchar(50)) + ') is not configured';
      exec log_error @@PROCID, @msg;
    end  
    else
    begin
      set @msg = 'WritebackView for measure ''' + @MeasureName + ''' (MeasureId=' + cast(@MeasureId as nvarchar(50)) + ') is not configured';
      exec log_error @@PROCID, @msg;
    end;

  exec log_info @@PROCID, 'Ended';

END







-- SP11. [dbo].[process_batch]


-- Description:    Process the data from a batch in the staging table into the forecast database
-- =============================================
CREATE PROCEDURE [dbo].[process_batch]
    @BatchId int
AS
DECLARE @FactTable nvarchar(1000),
        @MeasureName nvarchar(1000),
        @StatusCode varchar(50),
        @FieldList nvarchar(2000),
        @InsertStatement nvarchar(4000),
        @MeasureId int,
        @ClearMeasure int,
        @ErrorRecordCount int,
        @TotalRecordCount int,
        @StartDate datetime,
        @EndDate datetime,
        @LoadStartDate datetime,
        @LoadEndDate datetime,
        @msg nvarchar(1000),
        @ErrorCode int;
;
BEGIN
  SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
 
  -- logging
  set @msg = 'Batch processing started for BatchId ' + cast(@BatchId as varchar);
  exec log_info @@PROCID, @msg;
 
  -- initialise batch variables
  select @StatusCode = StatusCode, 
         @MeasureId = MeasureId,
         @ClearMeasure = ClearMeasure,
         @StartDate = StartDate,
         @EndDate = EndDate,
         @FactTable = dbo.get_fact_table_name(MeasureId),
         @FieldList = dbo.get_measure_keyfield_list(MeasureId,'')       
    from stg.V_BatchHeader
   where BatchId = @BatchId;
 
  IF @@ROWCOUNT = 1
  begin
      If @StatusCode = dbo.get_ready_to_load_status()
      BEGIN
        -- update BachHeader information
        update stg.BatchHeader
           set LastRunStartDateTime = GETDATE(),
               LastRunUser = USER_NAME(),
               RunCount = RunCount + 1
         where BatchId = @BatchId;
     
        exec log_info @@PROCID, 'Header updated';
     
        select @LoadStartDate = MIN(Date),
               @LoadEndDate = MAX(Date)
          from stg.BatchData
         where BatchId = @BatchId;
   
        -- if explicity Start and End dates are provided, use these
        if @StartDate is not null set @LoadStartDate = @StartDate;
        if @EndDate is not null set @LoadEndDate = @EndDate;
       
        set @msg = 'Date range determined : ' + convert(nvarchar, @LoadStartDate, 104) + ' -> '
                                              + convert(nvarchar, @LoadEndDate, 104);
        exec log_info @@PROCID, @msg;
   
        BEGIN TRANSACTION
        -- delete data range to load in target measure
        if @ClearMeasure = 1 -- delete entire measure,
          exec delete_measure @MeasureId, null, null;
        else
          exec delete_measure @MeasureId, @LoadStartDate, @LoadEndDate
       
        set @ErrorCode = @@ERROR
       
        if @ErrorCode = 0
        begin
          -- copy valid records
         set @InsertStatement =
          N'INSERT INTO ' + @FactTable + ' (' + @FieldList + ', Quantity) ' +
                'SELECT ' + @FieldList + ', Quantity ' +
                  'FROM stg.V_BatchData ' +
                 'WHERE BatchId = @BatchId '+
                   'AND DeducedError = 0 ' +
                   'AND Date BETWEEN @StartDate AND @EndDate;';
       
          exec sp_executesql @InsertStatement,
                             N'@BatchId int, @StartDate datetime, @EndDate datetime',
                             @BatchId, @LoadStartDate, @LoadEndDate;
   
          set @ErrorCode = @@ERROR
          exec log_info @@PROCID, 'Copy performed';
        end
       
        IF @ErrorCode = 0
          COMMIT
        ELSE
          ROLLBACK
       
        -- update the erronerous records
        update stg.V_Batchdata
           set ErrorCode = DeducedError
         where BatchId = @BatchId;
       
        exec log_info @@PROCID, 'Update the errorcode for the batchdata';
       
        -- update batch information
        set @StatusCode = dbo.get_error_status();
        set @ErrorRecordCount = dbo.get_batch_error_count(@BatchId);
        set @TotalRecordCount = dbo.get_batch_total_count(@BatchId);
        if @ErrorRecordCount = 0 set @StatusCode = dbo.get_success_status();
        UPDATE stg.BatchHeader
           SET ErrorRecordCount = @ErrorRecordCount,
               TotalRecordCount = @TotalRecordCount,
               StatusCode = @StatusCode,
               LastRunEndDateTime = GETDATE()
         WHERE BatchId = @BatchId;
          
        exec log_info @@PROCID, 'Batch Header updated';
       
      END
      ELSE
      BEGIN
        set @msg = 'Batch should have status ''' + dbo.get_ready_to_load_Status() +
                   ''' (Ready to Load). Status ''' + @StatusCode + ''' was found.';
        exec log_error @@PROCID, @msg;
      END
  END     
  ELSE
  BEGIN
    set @msg = 'No batchheader record found with BatchId=' + CAST(@BatchId as varchar);
    exec log_error @@PROCID, @msg;
  END
 
  set @msg = 'Batch processing ended for BatchId ' + cast(@BatchId as varchar);
  exec log_info @@PROCID, @msg;
  
  END

GO





-- SP12. [dbo].[process_batch_all]









-- Description:    Import all unprocessed batches
-- =============================================
CREATE PROCEDURE [dbo].[process_batch_all]
AS
DECLARE @BatchId int,
        @ErrorCode int;
DECLARE BatchCursor CURSOR FOR
          SELECT BatchId
            FROM stg.BatchHeader 
        ORDER BY CreateDateTime;
BEGIN
  SET NOCOUNT ON;
 
  exec log_info @@PROCID, 'Started';

  OPEN BatchCursor;
  FETCH NEXT FROM BatchCursor into @BatchId;
  WHILE @@FETCH_STATUS = 0
   BEGIN
      BEGIN TRY
        exec dbo.[process_batch] @BatchId;
      END TRY
      BEGIN CATCH
        Declare @msg varchar(1000);
        select @msg = ERROR_MESSAGE();
        exec log_error @@PROCID, @msg;
      END CATCH 
      FETCH NEXT FROM BatchCursor into @BatchId;
   END;
  CLOSE BatchCursor;
  DEALLOCATE BatchCursor;
 
  exec log_info @@PROCID, 'Ended';

END

GO





-- SP13. [dbo].[process_dimension]



-- Description:    Process the dimension data from a batch in the staging table into the forecast database
-- =============================================
CREATE PROCEDURE [dbo].[process_dimension]
    @DimensionBatchId int
AS
DECLARE @StatusCode varchar(50),
        @TotalRecordCount int,
        @DimensionId int,
        @DimensionCode nvarchar(50),
        @DimensionName nvarchar(50),
        @ValidFromDate datetime,
        @LevelCount int,
        @BatchDataLevelCount int,
        @DimensionLevelCount int,
        @DataRecordCount int,
        @msg nvarchar(1000),
        @ErrorCode int;
BEGIN
  SET NOCOUNT ON; -- do not return the rowcount as result of store procedure execution
 
  -- logging
  set @msg = 'Batch processing started for DimensionBatchId ' + cast(@DimensionBatchId as varchar);
  exec log_info @@PROCID, @msg;
 
  -- initialise batch variables
  select @StatusCode = StatusCode, 
         @DimensionName = DimensionName,
         @DimensionId = dbo.get_dimension_id(DimensionName),
         @ValidFromDate = ValidFromDate,
         @TotalRecordCount = TotalRecordCount,
         @LevelCount = LevelCount          
    from stg.DimensionBatchHeader
   where DimensionBatchId = @DimensionBatchId;
 
  IF @@ROWCOUNT = 1 -- Batch exists
  BEGIN
    -- indicate start
    update stg.DimensionBatchHeader
       set LastRunStartDateTime = GETDATE(),
           LastRunUser = USER,
           RunCount = RunCount + 1
     where DimensionBatchId = @DimensionBatchId;
   
    select @BatchDataLevelCount = dbo.get_dimension_batch_level_count(@DimensionBatchId),
           @DimensionLevelCount = dbo.get_dimension_level_count(@DimensionId),
           @DataRecordCount = dbo.get_dimension_batch_total_record_count(@DimensionBatchId);
 
    -- validate input
    if  @DataRecordCount = @TotalRecordCount -- same amount of records in data records as in header
    begin
      exec log_info @@PROCID, 'Amount of Data records as indicated in header';
      if  @BatchDataLevelCount = @LevelCount -- same amount of levels in data records as in header
      begin
        exec log_info @@PROCID, 'Amount of levels in batch as indicated in header';
        if  @DimensionLevelCount = @LevelCount  -- same amount of levels in Batch as in Dimension
        begin
          exec log_info @@PROCID, 'Amount of levels in dimension as indicated in header';
          if @StatusCode = dbo.get_ready_to_load_status()
          begin
            exec log_info @@PROCID, 'Ready for processing status';
            if  @ValidFromDate < GETDATE() -- already allowed to load new dimension information
            begin
               exec log_info @@PROCID, 'Dimension data became actual';
               exec dbo.merge_batch_to_dimension @DimensionBatchId, @DimensionId    
            end
            else
            begin
              set @msg = 'ValidfromDate ' + convert(varchar,@ValidFromDate,104) + ' is later than now. ' +
                         'Dimension should not be loaded yet.';
              exec log_error @@PROCID, @msg;
            end
          end   
          else
          begin
            set @msg = 'Batch has invalid StatusCode';
            exec log_error @@PROCID, @msg;
          end
        end
        else 
        begin
          set @msg = 'Amount of levels in Dimension configuration (=' + cast(@DimensionLevelCount as varchar) + ') ' +
                     'different from indicated amount of levels in DimensionBatchHeader (=' + cast(@LevelCount as varchar) + ' levels) ' +
                     ' for ' + @DimensionName + ' dimension.';
          exec log_error @@PROCID, @msg;
        end
      end
      else
      begin
        set @msg = 'Amount of levels in DimensionBatchData (=' + cast(@BatchDataLevelCount as varchar) + ' levels) ' +
                   'different from indicated amount of levels in DimensionBatchHeader (=' + cast(@LevelCount as varchar) + ' levels).';
        exec log_error @@PROCID, @msg;
      end
    end
    else
    begin
      set @msg = 'Amount of records in batch (=' + cast(@DataRecordCount as varchar) + ') ' +
                 'different from indicated amount in DimensionBatchHeader (=' + cast(@TotalRecordCount as varchar) + ').';
      exec log_error @@PROCID, @msg;
    end
    -- indicate end
    update stg.DimensionBatchHeader
       set LastRunEndDateTime = GETDATE(),
           StatusCode = dbo.get_success_status()
     where DimensionBatchId = @DimensionBatchId;   
  end     
  else
  begin
    set @msg = 'No batchheader record found with DimensionBatchId=' + CAST(@DimensionBatchId as varchar);
    exec log_error @@PROCID, @msg;
  END
 
  set @msg = 'Batch processing ended for DimensionBatchId ' + cast(@DimensionBatchId as varchar);
  exec log_info @@PROCID, @msg;  
END

GO







-- SP15. [dbo].[process_dimension_all]








-- Description:    Import all unprocessed dimensions
-- =============================================
CREATE PROCEDURE [dbo].[process_dimension_all]
AS
DECLARE @DimensionsBatchId int,
        @ErrorCode int;
DECLARE DimensionBatchCursor CURSOR FOR
          SELECT DimensionBatchId
            FROM stg.DimensionBatchHeader 
        ORDER BY CreateDateTime;
BEGIN
  SET NOCOUNT ON;
 
  exec log_info @@PROCID, 'Started';

  OPEN DimensionBatchCursor;
  FETCH NEXT FROM DimensionBatchCursor into @DimensionsBatchId;
  WHILE @@FETCH_STATUS = 0
   BEGIN
      BEGIN TRY
        exec dbo.[process_dimension] @DimensionsBatchId;
      END TRY
      BEGIN CATCH
        Declare @msg varchar(1000);
        select @msg = ERROR_MESSAGE();
        exec log_error @@PROCID, @msg;
      END CATCH 
      FETCH NEXT FROM DimensionBatchCursor into @DimensionsBatchId;
   END;
  CLOSE DimensionBatchCursor;
  DEALLOCATE DimensionBatchCursor;
 
  exec log_info @@PROCID, 'Ended';

END

GO




ETL Flow



1. Create Batch Header:
             exec dbo.create_batch_header @MeasureCode = ?, @ExternalSystemCode = ?, @TotalRecordCount = ?,    @StartDate = ?, @EndDate = ?

2. Transfer data records from source to destination.


3. Finalizing Batch Header
                        exec dbo.finalise_batch_header @BatchId = ?, @TotalRecordCount = ?

4. Log Download Error
                       exec dbo.log_ssis_error @PackageName = ?, @SourceName = '', @TaskName = ?, @Message = 'BI download failed';


5. Load Data from staging to associated measure.

                        exec dbo.process_batch @BatchId = ?

6. Log Upload Error
                        exec dbo.log_ssis_error @PackageName = ?, @SourceName = '', @TaskName = ?, @Message = 'FCST upload failed';

No comments:

Post a Comment