Skip to content

[Modification: minor Bug Fix] Multi-container aware code #1

@MPagel

Description

@MPagel

If your SSIS pathway has multiple containers, there's a strong possibility that you will have the same column number used by multiple objects. Thus, it's necessary to add a layer in the hierarchy. My final table has the following fields:

  • DataFlowTask
  • ColumnID
  • ColumnName
  • LineageFileName

I couldn't figure out how to get it to return the object variable such that I could use it later in the SSIS pathway, but by going through a temporary file intermediate, I was able to address this. This code allows for multiple containers that have overlapping column ID numbers.

#region References and Compilation Notes
/* Note: References should be from DTS's SQL_Main version e.g.  
C:\Program Files (x86)\Microsoft Visual Studio\2017\Community\Common7\IDE\PublicAssemblies\SSIS\140\
 *    lest MainPipe as InnerObject of TaskHost be an object of type System.__ComObject and 
 *    uncastable as anything other than null (i.e. if using SQLServer version of dll 
 *    you will get a runtime error when attempting to look inside the object).

 * EXTENSIONS REFERENCES USED 
 * Microsoft.SqlServer.DTSPipelineWrap
 * Microsoft.SqlServer.DTSRuntimeWrap
 * Microsoft.SqlServer.ManagedDTS
 * Microsoft.SqlServer.PipelineHost
 * Microsoft.SqlServer.ScriptTask   // as SQL Server Integrated Services Script Task
 * ------------------------------
 * FRAMEWORK REFERENCES USED
 * System
 * System.Data
 * System.Windows.Forms             // may not be needed
 * System.Xml                       // may not be needed
*/
#endregion

#region Input/Output
/* ReadOnlyVariables:
 *    @[lin::savedFile] = 
 *       @[User::ImportFolder]+
 *       "\\eTable_"+
 *       REPLACE(REPLACE(REPLACE(REPLACE
 *          ((DT_WSTR,20)@[System::ContainerStartTime]
 *                                      ," ","")
 *                               ,":","")
 *                       ,"-","")
 *               ,"/","")+
 *       ".csv"
        
 * ReadWriteVariables:   @[lin::lineageIds]
*/
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using System.Windows.Forms;
using System.IO;
using System.Collections.Generic;
#endregion

namespace ST_f7b6d543345c47af9516aefa125beca9
{
	[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
   public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
   {
      Dictionary<string, object> pipeDict = null;
      IDictionary<Tuple<string, int>, string> lineageIDs = null;
      public void Main()
      {
         pipeDict = new Dictionary<string, object>();
         Dts.Variables["lin::lineageIds"].Value = new Dictionary<Tuple<string, int>, string>();
         lineageIDs = (Dictionary<Tuple<string, int>, string>)Dts.Variables["lin::lineageIds"].Value;
         Executables execs = ((Package)Dts.Variables["lin::lineageIds"].Parent).Executables;
         ReadExecutables(execs);
         string snapTime = (string)Dts.Variables["lin::savedFile"].Value;
         // Note: There is probably a better solution out there bypassing the file intermediate, 
         // but I wasn't able to this Task to work by passing an object back as the value of the lineages.
         using (StreamWriter writetext = new StreamWriter(snapTime, true))
         {
            foreach (var kvp in pipeDict)
               foreach (var kvcol in (Dictionary<int, string>)kvp.Value)
               {
                  Tuple<string, int> ky = new Tuple<string, int>(kvp.Key, kvcol.Key);
                  lineageIDs.Add(ky, kvcol.Value);
                  string outLine = kvp.Key.Replace(",", "_") + "," + kvcol.Key.ToString().Replace(",", "") + "," + kvcol.Value.Replace(",", "_");
                  writetext.WriteLine(outLine);
               }
         }
         Dts.Variables["lin::lineageIds"].Value = lineageIDs;
         Dts.TaskResult = (int)ScriptResults.Success;
      }

      private void ReadExecutables(Executables executables)
      {
         foreach (Executable pkgExecutable in executables)
         {
            if (object.ReferenceEquals(pkgExecutable.GetType(), typeof(Microsoft.SqlServer.Dts.Runtime.TaskHost)))
            {
               TaskHost pkgExecTaskHost = (TaskHost)pkgExecutable;
               string sp = pkgExecTaskHost.CreationName;
               if (sp.StartsWith("SSIS.Pipeline."))
               {
                  pipeDict.Add(pkgExecTaskHost.ID, ProcessDataFlowTask(pkgExecTaskHost));
               }
            }
            else if (object.ReferenceEquals(pkgExecutable.GetType(), typeof(Microsoft.SqlServer.Dts.Runtime.ForEachLoop)))
            {
               ForEachLoop fel = (ForEachLoop)pkgExecutable;
               ReadExecutables(fel.Executables);
            }
         }
      }

      // if returning componentDict from ProcessDataFlowTask, use <int, object> as output type
      private Dictionary<int, string> ProcessDataFlowTask(TaskHost currentDataFlowTask)
      {
         object obj = currentDataFlowTask.InnerObject;
         MainPipe currentDataFlow = ((MainPipe)obj);
         Dictionary<int, string> columnDict = new Dictionary<int, string> { { 0, currentDataFlowTask.Name } };
         if (currentDataFlow is null) return columnDict;
         foreach (IDTSComponentMetaData100 currentComponent in currentDataFlow.ComponentMetaDataCollection)
         {
            // columnDict = new Dictionary<int, string> { { currentComponent.ID, currentComponent.Name } };
            // had 0, cc.Name, but numbering for ID seems to be by pipenum.ID
            columnDict.Add(currentComponent.ID, currentComponent.Name); 
            foreach (IDTSInput100 currentInput in currentComponent.InputCollection)
               foreach (IDTSInputColumn100 currentInputColumn in currentInput.InputColumnCollection)
                     columnDict.Add(currentInputColumn.ID, currentInputColumn.Name);
            foreach (IDTSOutput100 currentOutput in currentComponent.OutputCollection)
               foreach (IDTSOutputColumn100 currentoutputColumn in currentOutput.OutputColumnCollection)
                     columnDict.Add(currentoutputColumn.ID, currentoutputColumn.Name);
         }
         return columnDict;
      }
#region ScriptResults declaration
      enum ScriptResults
      {
         Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
         Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
      };
#endregion
   }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions