Compare Page Revisions
« Older Revision - Back to Page History - Newer Revision »
public abstract class BulkDataCopierBase { private string _sourceOdbcConn; private string _targetAdoConn; //private readonly NLog.Logger _nlog; public BulkDataCopierBase( string sourceOdbcConn, string targetAdoConn ) { _sourceOdbcConn = sourceOdbcConn; _targetAdoConn = targetAdoConn; //_nlog = NLogBuilder.ConfigureNLog("nlog.config").GetCurrentClassLogger(); } public async Task<long> ExecuteAsync() { RowCount = await GetRowCount(); var segments = new RowNumberRange(RowCount) .BreakIntoSegments(GetRowsPerSegment()); var opt = new ParallelOptions { MaxDegreeOfParallelism = GetMaxDegreesOfParallelism() }; var targetTable = GetTargetTable(); await Parallel.ForEachAsync(segments, opt, async (segment, ct) => { var sourceSql = GetSql(segment); await CopySegmentAsync(sourceSql, targetTable); }); return RowCount; } public long RowCount { get; set; } /// <summary> /// This is expected to return the SQL statement to retrieve data for copying, /// based on the row number range. The order of columns is expected to match /// the order of columns for the table returned by the /// <seealso cref="GetTargetTable()"/> method. /// </summary> /// <param name="range"></param> /// <returns></returns> protected abstract string GetSql(RowNumberRange range); /// <summary> /// This is expected to return a SQL statement that returns a single /// row with a single field that will indicate the total number of /// records that will be copied. /// </summary> /// <returns></returns> protected abstract string GetRowCountSql(); protected abstract long GetRowsPerSegment(); protected abstract int GetMaxDegreesOfParallelism(); /// <summary> /// This is expected to return the table in the target database to copy data into. /// The order of columns is expected to match the order of columns returned by the /// <seealso cref="GetSql(RowNumberRange)"/> method. /// </summary> /// <returns></returns> protected abstract string GetTargetTable(); /// <summary> /// Does a SqlBulkCopy of data from the ODBC source to the target SQL Server table. /// </summary> /// <param name="sourceSql">SQL statement against the ODBC source</param> /// <param name="targetTable">SQL Server table name</param> private async Task<long> CopySegmentAsync( string sourceSql, string targetTable ) { try { //_nlog.Info($"Bulk copying {targetTable} started. ====="); long recordCount = 0; var sw = Stopwatch.StartNew(); using (var srcOdbcConn = new OdbcConnection(_sourceOdbcConn)) { await srcOdbcConn.OpenAsync(); var selectCmd = new OdbcCommand(sourceSql, srcOdbcConn); selectCmd.CommandTimeout = 300; // 300 sec = 5 min var reader = await selectCmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess); using (var tgtAdoConn = new SqlConnection(_targetAdoConn)) { await tgtAdoConn.OpenAsync(); using (var bc = new SqlBulkCopy(tgtAdoConn)) { bc.DestinationTableName = targetTable; bc.BulkCopyTimeout = 300; // 300 sec = 5 min await bc.WriteToServerAsync(reader); recordCount = bc.RowsCopied; bc.Close(); } await tgtAdoConn.CloseAsync(); } await reader.CloseAsync(); await srcOdbcConn.CloseAsync(); sw.Stop(); var msg = $@"Loaded {recordCount:#,##0} row(s) into '{targetTable}' in {sw.Elapsed:h\:mm\:ss}"; Debug.Print(msg); //_nlog.Info(msg); } //_nlog.Info($"Bulk copying {targetTable} finished. -----"); return recordCount; } catch (Exception ex) { Debug.Print(ex.Message); Debug.Assert(false); //_nlog.Info($"Error while bulk copying {targetTable}. {ex.Message}"); throw; } } private async Task<long> GetRowCount() { long result = -1; if (Config.Current == null) { return result; } using (var conn = new OdbcConnection(Config.Current.SourceOdbcConnection)) { await conn.OpenAsync(); var sql = GetRowCountSql(); using (var cmd = new OdbcCommand(sql, conn)) { var reader = await cmd.ExecuteReaderAsync(); if (reader.Read()) { result = reader.GetInt64(0); } } } return result; } }
[DebuggerDisplay("{DisplayText}")] public class RowNumberRange { public RowNumberRange() { } public RowNumberRange(long rowCount) { FirstRow = 1; LastRow = rowCount; } public long FirstRow { get; set; } public long LastRow { get; set; } public string DisplayText { get { return $"Row {FirstRow:#,##0} to {LastRow:#,##0}"; } } public string SqlCondition { get { return $"RowNum between {FirstRow} and {LastRow}"; } } public List<RowNumberRange> BreakIntoSegments( long rowsPerSegment ) { var result = new List<RowNumberRange>(); var row1 = FirstRow; long row2; do { row2 = row1 + rowsPerSegment - 1; if (row2 > LastRow) { row2 = LastRow; } result.Add(new RowNumberRange { FirstRow = row1, LastRow = row2 }); row1 = row2 + 1; } while (row2 < LastRow); return result; } }
ScrewTurn Wiki version 3.0.1.400. Some of the icons created by FamFamFam. Except where noted, all contents Copyright © 1999-2024, Patrick Jasinski.