-Поиск по дневнику

Поиск сообщений в rss_rss_hh_new

 -Подписка по e-mail

 

 -Статистика

Статистика LiveInternet.ru: показано количество хитов и посетителей
Создан: 17.03.2011
Записей:
Комментариев:
Написано: 51


SQL Server Integration Services (SSIS) для начинающих – часть 3

Среда, 14 Июня 2017 г. 16:01 + в цитатник

-> Часть 1
-> Часть 2

В этой части я расскажу о работе с параметрами и переменными внутри SSIS-пакета. Узнаем, как можно задавать и отслеживать значения переменных во время выполнения пакета.

Также рассмотрим вызов одного пакета из другого при помощи «Execute Package Task» и некоторые дополнительные компоненты и решения.

Здесь тоже будет много картинок.

Продолжим знакомство с SSIS


Создадим в трех демонстрационных базах новую таблицу ProductResidues, которая будет содержать информацию об остатках на каждый день:
USE DemoSSIS_SourceA
GO

CREATE TABLE ProductResidues(
  ResidueDate date NOT NULL,
  ProductID int NOT NULL,
  ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO

USE DemoSSIS_SourceB
GO

CREATE TABLE ProductResidues(
  ResidueDate date NOT NULL,
  ProductID int NOT NULL,
  ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO

USE DemoSSIS_Target
GO

CREATE TABLE ProductResidues(
  ResidueDate date NOT NULL,
  ProductID int NOT NULL,
  ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO

И наполним таблицы в источниках тестовыми данными, поочередно выполнив на базах DemoSSIS_SourceA и DemoSSIS_SourceB следующий скрипт:
USE DemoSSIS_SourceA
--USE DemoSSIS_SourceB
GO

DECLARE @MinDate date=DATEADD(MONTH,-2,GETDATE())
DECLARE @MaxDate date=GETDATE()

;WITH dayCTE AS(
  SELECT CAST(@MinDate AS date) ResidueDate,10000 ResidueAmount

  UNION ALL

  SELECT DATEADD(DAY,1,ResidueDate),ResidueAmount-1
  FROM dayCTE
  WHERE ResidueDate<@MaxDate
)
INSERT ProductResidues(ResidueDate,ProductID,ResidueAmount)
SELECT
  d.ResidueDate,
  p.ID,
  d.ResidueAmount
FROM dayCTE d
CROSS JOIN Products p
OPTION(MAXRECURSION 0)

Допустим, что в таблице ProductResidues будет очень много строк, и чтобы каждый раз не перезагружать всю информацию и упростить процедуру интеграции, логику загрузки в таблицу ProductResidues в базе DemoSSIS_Target реализуем следующую:
  1. Если в принимающей БД еще нет записей, то загрузим все данные;
  2. Если в принимающей БД есть данные, то будем удалять данные за неделю (последние 7 дней) от последней загруженной даты и загружать данные из источника начиная от этой даты снова.

Неудобство интеграции таблиц типа ProductResidues в том, что в ней нет полей, по которым можно было бы однозначно определить, когда появилась запись, в ней нет никакого идентификатора типа ID, нет ни поля типа UpdatedOn, которое содержало бы дату/время последнего обновления записи (т.е. зацепиться особо не за что), иначе бы мы, например, могли вычислить для каждого источника, по данным базы DemoSSIS_Target, последний ID или последнюю UpdatedOn и уже загружать данные из источников начиная от этих стартовых значений. К тому же не всегда есть возможность внести изменения в структуру источников, подстроив их под себя, т.к. это могут быть вообще чужие базы, к которым у нас нет полного доступа.

В нашем же случае еще допустим, что пользователи могут менять данные задним числом и могут вообще удалить некоторые ранее загруженные в базу DemoSSIS_Target строки из источников. Поэтому здесь обновление делается как-бы внахлёст, данные последней недели полностью перезаписываются. Здесь неделя берется условно, об этом минимальном сроке, например, мы могли бы условиться с заказчиком (он мог подтвердить, что данные обычно меняются максимум в течении недели). Конечно это не самый надежный способ, и иногда могут возникнуть расхождения, например, в том случае, когда пользователь поменял данные месячной давности и здесь стоит предусмотреть возможность перезагрузки данных начиная с более ранней даты, мы сделаем это при помощи параметра, в котором будем указывать нужное нам количество дней назад.

Создадим новый SSIS-пакет и назовем его «LoadResidues.dtsx».

При помощи контекстного меню, отобразим область ввода переменных пакета (это можно сделать также при помощи меню «SSIS -> Variables»):


В данном пакете, нажав на кнопку «Add Variable», создадим переменную LoadFromDate типа DateTime:

По умолчанию переменной присваивается текущее значение даты/времени, т.к. мы будем переопределять значение внутри пакета, нам это не важно.
Так же стоит обратить внимание на Expression – если прописать в этом поле выражение, то переменная будет работать как формула и мы не сможем изменять ее значение при помощи присваивания. При каждом обращении к такой переменной ее значение будет рассчитываться согласно указанному выражению.

Значения переменных можно задавать при помощи компонента «Expression Task». Давайте рассмотрим, как это делается. Создадим элемент «Expression Task»:


Двойным щелчком откроем редактор данного элемента:

Пропишем следующее выражение:
@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) GETDATE()

Здесь так же была применена двойная конвертация типа, чтобы избавиться от составляющей времени и оставить только дату.

Давайте так же посмотрим как можно отследить значение переменной во время выполнения пакета.
Создадим точку останова на элементе «Expression Task»:


Укажем, что точка должна срабатывать по окончанию выполнения данного блока:


Запустим пакет на выполнение (F5) и после остановке в нашей точке, перейдем на вкладку «Locals»:


Раскроем список Variables и найдем в нем свою переменную:


Для интереса можем поменять выражение элементе «Expression Task» на следующее:
@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) DATEADD("DAY", -7, GETDATE())
и также поэкспериментировать:


Точку останова убирается таким же образом каким и была установлена. Либо можно удалить сразу все точки останова, если их было несколько:


Создадим параметр, который будет отвечать за количество дней назад.
Параметр можно создать, как глобальный для всего проекта:


Так и локальный, внутри конкретного пакета:


Параметр может быть обязательный для задания – за это отвечает флаг Required. Если этот флаг установлен, то при создании задачи или при вызове пакета из другого пакета нужно будет определить входящее значение параметра (мы рассмотрим это далее).
В отличие от переменных значение параметров при помощи «Expression Task» менять нельзя.

Сохраним параметры и снова зайдем в редактор «Expression Task»:

Для примера я поменял выражение на следующее:
@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) DATEADD("DAY", - @[$Package::DateOffset] , GETDATE())

Думаю, на этом суть параметров и переменных ясна, и мы можем продолжить.

После того как мы поигрались с «Expression Task» мы его удалим.

Создадим «Execute SQL Task»:


Настроим его следующим образом:


Пропишем в SQLStatement следующий запрос:
SELECT ISNULL(DATEADD(DAY,-?,MAX(ResidueDate)),'19000101') FromDate
FROM ProductResidues

Т.к. данный запрос возвращает одну строку, установим ResultSet = «Single Row» и ниже на вкладке «Result Set» сохраним результат в значение переменной LoadFromDate.

На вкладке «Parameter Mapping» зададим значения параметров, которые в запросе обозначены знаком вопроса (?):

Параметры нумеруются, начиная с нуля.
Стоит отметить, что если при создании соединения воспользоваться другим видом провайдера, например, «ADO» или «ADO.Net», то вместо вопросов мы сможем использовать именованные параметры типа @ParamName и в качестве «Parameter Name» тоже могли бы указывать @ParamName, а не его номер. Но увы типом соединения с другим провайдером мы сможем воспользоваться не во всех случаях.

Теперь на вкладке «Result Set» укажем в какую переменную нужно записать результат выполнения запроса:


Здесь так же можем продебажить установив у этого компонента точку останова на «Break when the container receives the OnPostExecute event» и запустив пакет на выполнение:

Здесь я для удобства мониторинга значения переменной прописал название переменной в «Watch», чтобы не искать ее в блоке «Locals».

Как видим все верно в переменную LoadFromDate записалась дата «01.01.1900», т.к. строк в таблице ProductResidues на Target еще нет.

Переименую для наглядности «Execute SQL Task» в «Set LoadFromDate».

Создадим еще один элемент «Execute SQL Task» и назовем его «Delete Old Rows»:


Настроим его следующим образом:

SQLStatement содержит следующий запрос:
DELETE ProductResidues
WHERE ResidueDate>=?

И зададим значение параметра на вкладке «Parameters Mapping»:


Все, удаление старых данных за указанный конечный период у нас реализовано.

Теперь сделаем часть отвечающую загрузку свежих данных. Для этого воспользуемся компонентом «Data Flow Task»:


Зайдем в область данного компонента и создадим «Source Assistant»:


Настроим его следующим образом:


Нажав на кнопку «Parameters…» зададим значение параметра:


Для записи новых данных воспользуемся уже знакомым компонентом «Destination Assistant»:


Протянем стрелку от «Source Assistant» и настроим его:






Все, пакет для переноса данных с источника SourceA у нас готов, можем запустить его на выполнение:


Запустим еще раз:


Как видим при повторном запуске удалились и залились только данные за указанный период. Ну вроде все работает так как мы и хотели. На всякий случай сверимся, что данные получателя по количеству равны данным источника:


Дойдя до сюда, я понял, что я допустил ошибку. Кто понял в чем дело, молодец!

Но может это и хорошо, т.к. пример получился не таким перегруженным.

Ошибка в том, что я забыл учесть, что при интеграции данных таблицы Products у нас в Target формируются свои идентификаторы (поле ID с флагом IDENTITY)!

Давайте переделаем, чтобы все было правильно. Ничего страшного повторим, зато лучше запомним.

Забежим чуть вперед и добавим в пакет еще один параметр, который назовем «SourceID»:


Перенастроим «Set LoadFromDate»:


В SQLStatement пропишем новый запрос с учетом SourceID:
SELECT ISNULL(DATEADD(DAY,-?,MAX(res.ResidueDate)),'19000101') FromDate
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
WHERE prod.SourceID=?

Настроим второй параметр:


Теперь перенастроим «Delete Old Rows» аналогичным образом, чтобы учитывался SourceID:


В SQLStatement пропишем новый запрос с учетов SourceID:
DELETE res
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
WHERE ResidueDate>=?
  AND prod.SourceID=?

Настроим второй параметр:


Теперь зайдем в «Data Flow Task», удалим цепочку и добавим «Derived Column»:


Настроим его следующим образом:


Здесь я намеренно оставил тип «Unicode string», а не сделал преобразование как в первой части. Давайте за одно рассмотрим компонент «Data Conversion»:


Настроим его:


Теперь при помощи Lookup сделаем сопоставление и получим нужные нам идентификаторы продуктов:


Настроим его:






Теперь протянем синюю стрелку от Lookup к «OLE DB Destination»:


Выберем поток «Lookup Match Output»:


Настроим «OLE DB Destination», нужно перестроить Mappings:


Все, сделаем очистку таблицы от неправильно загруженных данных:
TRUNCATE TABLE DemoSSIS_Target.dbo.ProductResidues

И запустим пакет на выполнение:


И еще раз:

Похоже на правду. Можете самостоятельно проверить правильно ли разнеслись идентификаторы продуктов.

Так как структура DemoSSIS_SourceA и DemoSSIS_SourceB одинакова и нам нужно сделать для DemoSSIS_SourceB, то же самое, то мы можем при создании задачи создать два шага для пакета «LoadResidues.dtsx», в первом шаге настроить подключение к базе DemoSSIS_SourceA, а на втором шаге DemoSSIS_SourceB.

Откомпилируем и передеплоим SSIS проект:






Давайте теперь создадим новое задание в SQL Agent:


На вкладке Steps создадим шаг 1 для загрузки продуктов:


Создадим шаг 2 для загрузки остатков из SourceA:


На вкладке Configuration можем увидеть наши параметры:


Здесь от нас требуют ввести SourceID, т.к. мы указали его как Required. Зададим его:

Данные вкладки «Connection Managers» изменять для этого шаге не будем.

Создадим шаг 3 для загрузки остатков из SourceB:


Зададим параметр SourceID:


И изменим данные соединения SourceA таким образом, чтобы оно ссылалось на базу DemoSSIS_SourceB:

В данном случае мне достаточно было изменить ConnectionString и InitialCatalog, теперь они указывают на DemoSSIS_SourceB.

В итоге мы должны получить следующее – три шага:


Запустим эту задачу на выполнение:


Выполним запрос:
USE DemoSSIS_Target
GO

SELECT prod.SourceID,COUNT(*)
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
GROUP BY prod.SourceID

И убедимся, что все отработало как надо:


Теперь допустим, что базы DemoSSIS_SourceA и DemoSSIS_SourceB расположены на одном экземпляре SQL Server. Давайте переделаем «OLE DB Source»:



Текст команды:
DECLARE @SourceID char(1)=?

IF(@SourceID='A') USE DemoSSIS_SourceA
ELSE  USE DemoSSIS_SourceB

SELECT
  ResidueDate,
  ProductID,
  ResidueAmount
FROM ProductResidues
WHERE ResidueDate>=?

Зададим параметры:

Теперь наш пакет в зависимости от значения параметра SourceID будет брать данные либо из SourceA, либо из SourceB.

Для теста можете изменить значение параметра SourceB на «B» и запустить проект на выполнение:




Давайте теперь создадим новый пакет «LoadAll.dtsx» и создадим в нем «Execute Package Task» (переименуем его в «Load Products»):


Настроим «Load Products»:


Создадим в новом пакете 2 параметра:


В области «Control Flow» создадим еще 2 компонента «Execute Package Task» которые назовем «Load Resudues A» и «Load Resudues B»:


Настроим их задав у обоих название пакета «LoadResidues.dtsx»:


Зададим обязательный параметр SourceID для «Load Resudues A»:


Зададим обязательный параметр SourceID для «Load Resudues B»:


Обратите внимание, что у стрелок тоже есть свои свойства, например, мы можем поменять свойство Value на Completion, что будет означать, что следующий шаг будет выполнен даже в том случае если на шаге «Load Resudues A» произойдет ошибка:


Все, можем запустить пакет на выполнение:


Думаю, объяснять, что здесь произошло нет смысла.

Иногда параметры для конкретного пакета удобно хранить в вспомогательной таблице и считывать их оттуда в переменные пакета, например, используя для поиска глобальную переменную «System::PackageName». Для демонстрации, давайте переделаем наш пакет таким образом.

Создадим таблицу с параметрами:
USE DemoSSIS_Target
GO

CREATE TABLE IntegrationPackageParams(
  PackageName nvarchar(128) NOT NULL,
  DateOffset int NOT NULL,
CONSTRAINT PK_IntegrationPackageParams PRIMARY KEY(PackageName)
)
GO

INSERT IntegrationPackageParams(PackageName,DateOffset)VALUES
(N'LoadResidues',7)
GO

Удалим параметр DateOffset из пакета «LoadResidues.dtsx»:


Создадим в пакете переменную DateOffset:


В область «Control Flow» добавим еще один элемент «Execute SQL Task» и переименуем его «Load Params»:


Настроим его:


Запрос в SQLStatement пропишем следующий:
SELECT DateOffset
FROM IntegrationPackageParams
WHERE PackageName=?

Настроим параметр запроса используя системную переменную «System::PackageName»:


Осталось сбросить результат выполнения запроса в переменную:


Теперь осталось перенастроить «Set LoadFromDate», чтобы в нем использовалась теперь переменная:




Все, можем тестировать новую версию пакета.

Вот мы и добрались до финиша. Мои поздравления!

Заключение по третьей части


Уважаемые читатели, эта часть будет заключительной.

В данном цикле статей, я постарался продумать примеры таким образом, чтобы сделать их как можно короче и в свою очередь охватить как можно больше полезных и важных деталей.

Думаю, освоив это, далее вы уже без особого труда сможете освоить работу с остальными компонентами SSIS. В данных статьях я рассмотрел только самые важные компоненты (наиболее часто применяемые на моей практике), но зная только это вы уже можно сделать очень многое. По мере надобности изучайте самостоятельно другие компоненты, в первую очередь порекомендовал бы посмотреть следующее:
  • компоненты-контейнеры (Sequence Container, For Loop Container, Foreach Loop Container);
  • «Merge Join», который позволяет сделать операции JOIN, LEFT JOIN, FULL JOIN на стороне SSIS (это требует предварительно отсортировать два набора при помощи «Sort»);
  • «Conditional Split», который позволяет разбить один поток на несколько в зависимости от условий;
  • так же рассмотрите вкладку «Event Handlers», которая позволяет создать дополнительные области для определенного вида события пакета.

Доступного материала на эту тему очень много, используйте MSDN, Youtube и прочие источники.

Я не старался сделать подробный учебник (думаю, все это уже есть), а старался сделать такой материал, который позволит начинающим шаг за шагом создать все с самого нуля и делая все своими руками увидеть всю картину в целом, а после уже имея основу идти дальше самостоятельно. Очень надеюсь, что у меня это получилось и материал окажется полезен именно в таком ключе.

Я очень рад, что мне хватило сил осуществить задуманное и описать все так как я это хотел, даже получилось сделать большее, так как из-за допущенной в этой части ошибки возникли неожиданные повороты сюжета, но так я думаю, стало даже интересней. ;)

Спасибо за внимание! Удачи!

И возможно, до встреч в новых статьях…
Original source: habrahabr.ru (comments, light).

https://habrahabr.ru/post/330840/

Метки:  

 

Добавить комментарий:
Текст комментария: смайлики

Проверка орфографии: (найти ошибки)

Прикрепить картинку:

 Переводить URL в ссылку
 Подписаться на комментарии
 Подписать картинку