Tuesday, January 9, 2007

Faster Filtered SQL Server Imports

If you have ever tried to load large amounts of data using SQL inserts, you know how slow this method is. A million records loaded this way can take over 40 minutes, which is why the various database providers supply their own bulk-loading tools to speed up the task (SQL Server DTS, for example). If, however, you use Perl for all your data parsing, and would also like to perform database imports directly from Perl, how do you get around the speed issue?
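
For reference, the slow row-by-row approach looks something like the sketch below. It is only an illustration: the table, column names, and file path are hypothetical, and it assumes an ODBC DSN called NWIND like the one used later in this article.

#!/usr/local/bin/perl
# Illustrative only: one INSERT statement per record, which is what makes
# large loads so slow. Table, columns and file path are hypothetical.
use strict;
use DBI;

my $dbh = DBI->connect("DBI:ODBC:NWIND", "", "", { RaiseError => 1 });
my $sth = $dbh->prepare("insert into Imports (id, name) values (?, ?)");

open my $fh, '<', 'c:/temp/imports.txt' or die "Cannot open data file: $!";
while (<$fh>) {
    chomp;
    my ($id, $name) = split /,/;
    $sth->execute($id, $name);   # one round trip to the server per record
}
close $fh;
$dbh->disconnect;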

Using Win32::OLE to tap into the DTS object model allows you to combine Perl's file handling and data parsing capabilities with the bulk loading speed of DTS to load data at the rate of over a million records per minute.

Tip: When loading large datasets into SQL Server, set the recovery model to BULK_LOGGED, as the FULL recovery model builds up massive log files for large imports.
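
If you prefer to make that change from Perl rather than Enterprise Manager, something like the following should do it. This is just a sketch: it assumes the NWIND DSN described below and a login with permission to run ALTER DATABASE.

# Sketch: switch the Northwind database to the BULK_LOGGED recovery model
# before a large import. Assumes the NWIND DSN and ALTER DATABASE permission.
use strict;
use DBI;

my $dbh = DBI->connect("DBI:ODBC:NWIND", "", "", { RaiseError => 1 });
$dbh->do("ALTER DATABASE Northwind SET RECOVERY BULK_LOGGED");
$dbh->disconnect;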

Running the Test Script

First, download the test script dtsBulkLoad.pl into your scripts directory and the Perl module dtsBulkLoad.pm into your lib/ directory (plus the data file customers.txt into C:\temp\ if you want to try the test example).

Then check that you have the necessary dependencies installed and configured on your PC:

1. A Win32 version of Perl, such as ActiveState v5.8.6.
2. The Win32::OLE, DBI, and DBD::ODBC modules from CPAN.
3. A System DSN for your database. For the test example, create one called NWIND for the Northwind database that comes with SQL Server.

Talk to your local IT support if you are unsure about any of the above.
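
A quick way to confirm that the Perl modules in step 2 are installed is to try loading each one; a throwaway check script such as this will do:

# Throwaway check: reports any of the required modules that fail to load.
use strict;
foreach my $module (qw(Win32::OLE Win32::OLE::Variant DBI DBD::ODBC)) {
    eval "require $module";
    print $@ ? "MISSING: $module\n" : "OK: $module\n";
}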

You can now import data into the Customers table. If, for example, your SQL Server database runs on a server called Server01, use the command:-

C:\> dtsBulkLoad.pl Server01 NWIND Customers c:\temp\customers.txt

By calling this script with your own server, database, table, and data-file options, you can bulk load data into your SQL Server database.

How does it work?

The test script dtsBulkLoad.pl walks through the process, while the module, dtsBulkLoad.pm, provides the subroutines that do the main work.

#!/usr/local/bin/perl
#
# Program dtsBulkLoad.pl for using Win32::OLE to create and run a
# DTS package for importing a text file into SQL Server

use strict;
use dtsBulkLoad;
use Win32::OLE qw(in);
use Win32::OLE::Variant;

my ($server, $database, $table, $dataFile);

# Check for 4 arguments: server, database, table and data-file
if (@ARGV == 4) {
    ($server, $database, $table, $dataFile) = @ARGV;
} else {
    die "Call with 4 arguments: server, database, table and data-file\n";
}

# Define variables
my $dataTable = "[$database].[dbo].[$table]";
my $sourceId  = 1;
my $destId    = 2;
my $rowDelim  = "\r\n";
my $colDelim  = ",";
my $textQual  = "\"";

dtsBulkLoad.pl starts by loading the dtsBulkLoad.pm module, checking for four command-line arguments (server, database, table, and data-file), then defining and setting some variables. It then makes a series of calls to dtsBulkLoad.pm to perform the following tasks:

1. Create a new Win32::OLE 'DTS.Package' object.

my $Package = dtsBulkLoad::getPackage("DataLoader");

This calls the getPackage() subroutine to initialise a Package object called DataLoader.

sub getPackage {
    my ($name) = @_;
    my $Package = Win32::OLE->new('DTS.Package');
    $Package->{Name} = $name;
    $Package->{FailOnError} = 0;   # report step errors after Execute() rather than aborting
    return $Package;
}

2. Create a source connection object for the text file.

dtsBulkLoad::connectToFile($Package, "Source", $sourceId, $dataFile,
                           $rowDelim, $colDelim, $textQual);

This calls the connectToFile() subroutine to create a DTSFlatFile source connection object and configure various parameters such as Row Delimiter, Column Delimiter, and Text Qualifier:

sub connectToFile {
    my ($Package, $name, $id, $dataFile, $rowDelim, $colDelim, $textQual) = @_;
    my $Conn = $Package->Connections->New("DTSFlatFile");
    $Conn->{Name} = $name;
    $Conn->{ID} = $id;
    $Conn->{DataSource} = $dataFile;
    $Conn->{Reusable} = 1;
    $Conn->{ConnectImmediate} = 0;
    $Conn->{ConnectionTimeout} = 600;
    $Conn->{UseTrustedConnection} = 0;
    $Conn->{UseDSL} = 0;
    $Conn->ConnectionProperties->{"Data Source"}->{Value} = $dataFile;
    $Conn->ConnectionProperties->{"File Format"}->{Value} = 1;   # 1 = delimited file
    $Conn->ConnectionProperties->{"Row Delimiter"}->{Value} = $rowDelim || "\r\n";
    $Conn->ConnectionProperties->{"Column Delimiter"}->{Value} = $colDelim || ",";
    $Conn->ConnectionProperties->{"Text Qualifier"}->{Value} = $textQual || "\"";
    $Package->Connections->Add($Conn);
}

3. Create a destination connection object for the database.

dtsBulkLoad::connectToDb($Package,"Destination",$destId,$server,$database);

This calls the connectToDb() subroutine to create a SQLOLEDB destination connection object and configure various parameters such as Data Source, Initial Catalog, and Integrated Security:

sub connectToDb {
    my ($Package, $name, $id, $server, $database) = @_;
    my $Conn = $Package->Connections->New("SQLOLEDB");
    $Conn->{Name} = $name;
    $Conn->{ID} = $id;
    $Conn->{DataSource} = $server;
    $Conn->{Catalog} = $database;
    $Conn->{UseTrustedConnection} = 1;
    $Conn->ConnectionProperties->{"Data Source"}->{Value} = $server;
    $Conn->ConnectionProperties->{"Initial Catalog"}->{Value} = $database;
    $Conn->ConnectionProperties->{"Integrated Security"}->{Value} = "SSPI";
    $Conn->ConnectionProperties->{"Persist Security Info"}->{Value} = 1;
    $Package->Connections->Add($Conn);
}

4. Add a step object to the package.

dtsBulkLoad::addStep($Package,"Step1","CopyColumns");

This calls the addStep() subroutine to create a step called CopyColumns:

sub addStep {
    my ($Package, $stepName, $taskName) = @_;
    my $Step = $Package->Steps->New();
    $Step->{Name} = $stepName;
    $Step->{TaskName} = $taskName;
    $Step->{ExecuteInMainThread} = 1;
    $Package->Steps->Add($Step);
}

5. Get the column definitions for the table.

(my $rDefs = dtsBulkLoad::getColumnName($database, $table)) || die "No def\n";

This calls the getColumnName() subroutine, which uses the DBI module to return a reference to an array of column definitions for the database table. This is used when configuring the Transformation object in the next stage.

sub getColumnName {
    my ($database, $tableName) = @_;
    my @columnDefs = ();
    my $dbh = DBI->connect("DBI:ODBC:$database") or return 0;
    my $sql = "select * from $tableName";
    my $sth = $dbh->prepare($sql) or return 0;
    $sth->execute();
    # Record the lower-case name and DBI type code of each column
    for (my $count = 0; $count < @{$sth->{NAME}}; $count++) {
        push @columnDefs, [$sth->{NAME_lc}[$count], $sth->{TYPE}[$count]];
    }
    $sth->finish();
    $dbh->disconnect();
    if (@columnDefs > 0) {
        return \@columnDefs;
    } else {
        return 0;
    }
}

6. Add a task object and configure CustomTask and Transformation objects.

dtsBulkLoad::addTask($Package, "CopyColumns", $dataFile, $sourceId,
                     $dataTable, $destId, $rDefs);

This calls the addTask() subroutine, which in turn calls the addColumns() subroutine. addTask() creates a new DTSDataPumpTask called CopyColumns and a CustomTask of the same name. It configures the source and destination connections in the CustomTask, and then creates a Transform object. It then calls the addColumns() subroutine, passing in the Transform object and the column definitions from the previous step. This simplifies the trickiest part of the procedure by using the column definitions to correctly configure column name and data type information in the Transform object.

sub addTask {
    my ($Package, $taskName, $SourceObj, $sourceId, $DestObj, $destId, $rColDefs) = @_;
    my $Task = $Package->Tasks->New("DTSDataPumpTask");
    $Task->{Name} = $taskName;
    my $CustomTask = $Task->{CustomTask};
    $CustomTask->{Name} = $taskName;
    $CustomTask->{SourceConnectionID} = $sourceId;
    $CustomTask->{SourceObjectName} = $SourceObj;
    $CustomTask->{DestinationConnectionID} = $destId;
    $CustomTask->{DestinationObjectName} = $DestObj;

    # Create a transformation object, with column details, and add it to the custom task
    my $Transform = $CustomTask->Transformations->New("DTS.DataPumpTransformCopy");
    $Transform->{Name} = "TransformData";
    $Transform->{TransformFlags} = 20;   # AllowNullChange and AllowStringTruncation
    addColumns($Transform, $rColDefs);   # Add columns using information in $rColDefs
    $CustomTask->Transformations->Add($Transform);

    # Add task to package
    $Package->Tasks->Add($Task);
}

sub addColumns {
    my ($Transform, $rColDefs) = @_;
    my $colNum = 1;

    # For each entry from the column definitions array
    foreach my $def (@{$rColDefs}) {

        # Add source column info for the text file (flat-file columns are always strings)
        my $colName = sprintf("Col%03d", $colNum);
        my $SourceColumn = $Transform->SourceColumns->New($colName, 1);
        $SourceColumn->{Name} = $colName;
        $SourceColumn->{DataType} = 129;   # 129 = DBTYPE_STR
        $Transform->SourceColumns->Add($SourceColumn);

        # Add destination column info for the database:
        # DBI type 4 (SQL_INTEGER) maps to DTS type 3 (DBTYPE_I4), everything else is a string
        my $dataType = ($def->[1] == 4) ? 3 : 129;
        my $DestColumn = $Transform->DestinationColumns->New($def->[0], 1);
        $DestColumn->{DataType} = $dataType;
        $DestColumn->{Nullable} = 1;
        $Transform->DestinationColumns->Add($DestColumn);

        # Increment column number
        $colNum++;
    }
}
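
The two-way split in addColumns() between integers (DTS type 3) and strings (DTS type 129) is enough for the Northwind Customers table, but your own tables may use other column types. A hypothetical extension of the mapping might look like the sketch below; the DBI codes and OLE DB type indicators shown are a starting point only, so check them against the values $sth->{TYPE} actually returns for your columns.

# Hypothetical extension of the type mapping used in addColumns().
# Keys are DBI/ODBC SQL type codes, values are OLE DB (DTS) type indicators.
my %dtsType = (
    4 => 3,    # SQL_INTEGER  -> DBTYPE_I4
    5 => 2,    # SQL_SMALLINT -> DBTYPE_I2
    8 => 5,    # SQL_DOUBLE   -> DBTYPE_R8
);

sub dtsTypeFor {
    my ($sqlType) = @_;
    return $dtsType{$sqlType} || 129;   # default to DBTYPE_STR, as addColumns() does
}

Inside addColumns(), the line that sets $dataType could then become my $dataType = dtsTypeFor($def->[1]);.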

The script finally calls $Package->Execute() to run the package and performs some basic error checking:-

# Execute the package
$Package->Execute();

# Check for errors
my ($ErrMsg, $Err) = StepErrors();

if ($Err == -1) {
    print "$ErrMsg\n";
} else {
    print "Package Completed Successfully\n";
}
$Package = undef;
exit $Err;

# Loop through the steps and look for errors based on the ExecutionResult
sub StepErrors {
    my ($ErrMsg, $Err) = ("", 0);
    my $ErrorCode = Variant(VT_I4|VT_BYREF, "-1");
    my $Source = Variant(VT_BSTR|VT_BYREF, "");
    my $Description = Variant(VT_BSTR|VT_BYREF, "");
    foreach my $Step (in($Package->{Steps})) {
        if ($Step->{ExecutionStatus} == 4) {        # step has completed
            if ($Step->{ExecutionResult} == 1) {    # step failed
                $Err = -1;
                $Step->GetExecutionErrorInfo($ErrorCode, $Source, $Description);
                $ErrMsg = "$ErrMsg \n Step $Step->{Name} failed, error:\n $ErrorCode \n $Description \n";
            }
        }
    }
    return ($ErrMsg, $Err);
}

With an understanding of how the test script works, you can modify your own Perl scripts to use dtsBulkLoad.pm for loading any delimited text file into any SQL Server table at bulk-load speed.
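
A common pattern is to let Perl tidy up a raw extract first and then hand the cleaned, comma-delimited result to the bulk loader. The sketch below is purely illustrative: the file names, pipe delimiter, and cleaning rules are hypothetical, and the final step is simply the same sequence of dtsBulkLoad.pm calls shown in dtsBulkLoad.pl.

#!/usr/local/bin/perl
# Rough sketch: clean a raw, pipe-delimited extract with Perl, then bulk load
# the result. File names, delimiters and cleaning rules are illustrative only.
use strict;

my ($raw, $clean) = ('c:/temp/raw_extract.txt', 'c:/temp/clean.txt');

open my $in,  '<', $raw   or die "Cannot read $raw: $!";
open my $out, '>', $clean or die "Cannot write $clean: $!";
while (<$in>) {
    chomp;
    next if /^\s*$/;                   # skip blank lines
    my @fields = split /\|/;           # the raw extract is pipe-delimited
    s/^\s+|\s+$//g for @fields;        # trim leading and trailing whitespace
    print $out join(',', map { qq{"$_"} } @fields), "\r\n";
}
close $in;
close $out;

# $clean is now in the comma-delimited, text-qualified format expected by
# dtsBulkLoad.pl, so it can be loaded with the same calls described above.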