C# fastest way to insert data into SQL database


I am receiving (streamed) data from an external source (over Lightstreamer) in my C# application. The C# application receives the data through a listener. The data from the listener is stored in a queue (ConcurrentQueue). The queue is drained every 0.5 seconds with TryDequeue into a DataTable. The DataTable is then copied to the SQL database using SqlBulkCopy. The SQL database processes the newly arrived data from the staging table into the final table. I currently receive around 300,000 rows per day (this can increase strongly within the next weeks), and my goal is to stay under 1 second from the time I receive the data until it is available in the final SQL table. The maximum number of rows per second I have to process is currently around 50.
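For reference, the drain-and-bulk-copy step described above might look roughly like the following. This is only a sketch under assumptions: the `PriceRow` type, the column names, and the destination table name `staging.prices` are placeholders that do not appear in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Data.SqlClient;

// Hypothetical row type standing in for the streamed price object.
public sealed class PriceRow
{
    public int InstrumentId;
    public long Timestamp;
    public decimal Bid;
    public decimal Ask;
}

public static class StagingPipeline
{
    // Moves everything currently in the queue into a DataTable.
    public static DataTable DrainToDataTable(ConcurrentQueue<PriceRow> queue)
    {
        var table = new DataTable();
        table.Columns.Add("InstrumentId", typeof(int));
        table.Columns.Add("Timestamp", typeof(long));
        table.Columns.Add("Bid", typeof(decimal));
        table.Columns.Add("Ask", typeof(decimal));

        PriceRow row;
        while (queue.TryDequeue(out row))
        {
            table.Rows.Add(row.InstrumentId, row.Timestamp, row.Bid, row.Ask);
        }
        return table;
    }

    // Copies one batch to a staging table. The table name is a placeholder;
    // columns are matched by ordinal position, which is SqlBulkCopy's default.
    public static void BulkCopy(DataTable table, string connectionString)
    {
        if (table.Rows.Count == 0) return;

        using (var connection = new SqlConnection(connectionString))
        using (var bulkCopy = new SqlBulkCopy(connection))
        {
            connection.Open();
            bulkCopy.DestinationTableName = "staging.prices";
            bulkCopy.WriteToServer(table);
        }
    }
}
```

A timer or worker thread would call `DrainToDataTable` and then `BulkCopy` every 0.5 seconds.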

Unfortunately, since I am receiving more and more data, my logic is getting slower (it is still far under 1 second, but I want to keep improving). The main bottleneck (so far) is the processing of the staging data (on the SQL database) into the final table. In order to improve performance, I would like to switch the staging table to a memory-optimized table. The final table is already a memory-optimized table, so the two will work together fine for sure.

My questions:

  1. Is there a way to use SqlBulkCopy (out of C#) with memory-optimized tables? (As far as I know, there is no way yet.)
  2. Any suggestions for the fastest way to write the received data from my C# application into the memory-optimized staging table?

Edit (with solution):

After the comments/answers and performance evaluations, I decided to give up the bulk insert and use SqlCommand to hand over an IEnumerable with my data as a table-valued parameter to a natively compiled stored procedure, which stores the data directly in my memory-optimized final table (as well as a copy in the "staging" table, which now serves as an archive). Performance increased (even though I have not considered parallelizing the inserts yet; I will look at that at a later stage).

Here is part of the code:

Memory-optimized user-defined table type (to hand over the data from C# to SQL (stored procedure)):

    CREATE TYPE [staging].[cityindexintradayliveprices] AS TABLE
    (
        [cityindexinstrumentid] [int] NOT NULL,
        [cityindextimestamp] [bigint] NOT NULL,
        [bidprice] [numeric](18, 8) NOT NULL,
        [askprice] [numeric](18, 8) NOT NULL,
        INDEX [indexcityindexintradayliveprices] NONCLUSTERED
        (
            [cityindexinstrumentid] ASC,
            [cityindextimestamp] ASC,
            [bidprice] ASC,
            [askprice] ASC
        )
    )
    WITH ( MEMORY_OPTIMIZED = ON )

Natively compiled stored procedure to insert the data into the final table and into staging (which serves as an archive in this case):

    CREATE PROCEDURE [staging].[spprocesscityindexintradaylivepricesstaging]
    (
        @processingid INT,
        @cityindexintradayliveprices staging.cityindexintradayliveprices READONLY
    )
    WITH NATIVE_COMPILATION, SCHEMABINDING, EXECUTE AS OWNER
    AS
    BEGIN ATOMIC WITH (TRANSACTION ISOLATION LEVEL = SNAPSHOT, LANGUAGE = N'us_english')

        -- store prices
        INSERT INTO timeseries.cityindexintradayliveprices
        (
            objectid,
            perdatetime,
            bidprice,
            askprice,
            processingid
        )
        SELECT objects.objectid,
               cityindextimestamp,
               cityindexintradaylivepricesstaging.bidprice,
               cityindexintradaylivepricesstaging.askprice,
               @processingid
        FROM @cityindexintradayliveprices cityindexintradaylivepricesstaging,
             objects.objects
        WHERE objects.cityindexinstrumentid = cityindexintradaylivepricesstaging.cityindexinstrumentid

        -- store data in staging table
        INSERT INTO staging.cityindexintradaylivepricesstaging
        (
            importprocessingid,
            cityindexinstrumentid,
            cityindextimestamp,
            bidprice,
            askprice
        )
        SELECT @processingid,
               cityindexinstrumentid,
               cityindextimestamp,
               bidprice,
               askprice
        FROM @cityindexintradayliveprices

    END

IEnumerable filled from the queue:

    // Requires: using System; using System.Collections.Generic; using System.Data;
    //           using Microsoft.SqlServer.Server;
    private static IEnumerable<SqlDataRecord> createsqldatarecords()
    {
        // set the columns (the sequence is important: it must match the sequence of columns in the table-valued parameter)
        SqlMetaData metadatacol1;
        SqlMetaData metadatacol2;
        SqlMetaData metadatacol3;
        SqlMetaData metadatacol4;

        metadatacol1 = new SqlMetaData("cityindexinstrumentid", SqlDbType.Int);
        metadatacol2 = new SqlMetaData("cityindextimestamp", SqlDbType.BigInt);
        metadatacol3 = new SqlMetaData("bidprice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8
        metadatacol4 = new SqlMetaData("askprice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8

        // define the SQL data record with the columns (one instance is reused for every row)
        SqlDataRecord datarecord = new SqlDataRecord(new SqlMetaData[] { metadatacol1, metadatacol2, metadatacol3, metadatacol4 });

        // remove each price row from the queue and add it to the SQL data record
        lightstreamerapi.pricedto pricedto = new lightstreamerapi.pricedto();

        while (intradayquotesqueue.TryDequeue(out pricedto))
        {
            datarecord.SetInt32(0, pricedto.marketid); // city index market id
            datarecord.SetInt64(1, Convert.ToInt64((pricedto.tickdate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problems with the / escape sequence
            datarecord.SetDecimal(2, pricedto.bid); // bid price
            datarecord.SetDecimal(3, pricedto.offer); // ask price

            yield return datarecord;
        }
    }
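The timestamp conversion inside the loop strips the `\/Date(` prefix and `)\/` suffix from the tick date string. A slightly more defensive variant could look like the sketch below. It assumes the value between the parentheses is a plain millisecond count with no time zone suffix; the `TickDate` helper is hypothetical and not part of the original code.

```csharp
using System;
using System.Globalization;

public static class TickDate
{
    // Extracts the millisecond value from a string such as "\/Date(1455701400000)\/".
    // Assumption: no time zone offset (e.g. "+0100") follows the number.
    public static long ToUnixMilliseconds(string tickDate)
    {
        int open = tickDate.IndexOf('(');
        int close = open < 0 ? -1 : tickDate.IndexOf(')', open + 1);
        if (open < 0 || close < 0)
        {
            throw new FormatException("Unexpected tick date format: " + tickDate);
        }

        string digits = tickDate.Substring(open + 1, close - open - 1);
        return long.Parse(digits, CultureInfo.InvariantCulture);
    }
}
```

Inside the iterator this would replace the nested `Replace` calls, for example `datarecord.SetInt64(1, TickDate.ToUnixMilliseconds(pricedto.tickdate));`.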

Handling the data every 0.5 seconds:

    // Requires: using System; using System.Data; using System.Data.SqlClient; using System.Threading;
    public static void childthreadintradayquoteshandler(Int32 cityindexinterfaceprocessingid)
    {
        try
        {
            // open new SQL connection
            using (SqlConnection timeseriesdatabasesqlconnection = new SqlConnection("data source=xxx;initial catalog=xxx;integrated security=sspi;multipleactiveresultsets=false"))
            {
                // open connection
                timeseriesdatabasesqlconnection.Open();

                // endless loop to keep the thread alive
                while (true)
                {
                    // ensure that the queue has rows to process (otherwise there is no need to continue)
                    if (intradayquotesqueue.Count > 0)
                    {
                        // define the stored procedure for the SQL command (disposed after each execution)
                        using (SqlCommand insertcommand = new SqlCommand("staging.spprocesscityindexintradaylivepricesstaging", timeseriesdatabasesqlconnection))
                        {
                            // set the command type to stored procedure
                            insertcommand.CommandType = CommandType.StoredProcedure;

                            // define the SQL parameters (the table-valued parameter gets its data from createsqldatarecords())
                            SqlParameter parametercityindexintradayliveprices = insertcommand.Parameters.AddWithValue("@cityindexintradayliveprices", createsqldatarecords()); // table-valued parameter
                            SqlParameter parameterprocessingid = insertcommand.Parameters.AddWithValue("@processingid", cityindexinterfaceprocessingid); // processing id parameter

                            // set the SQL DB type to Structured for the table-valued parameter (Structured = special data type for specifying structured data contained in table-valued parameters)
                            parametercityindexintradayliveprices.SqlDbType = SqlDbType.Structured;

                            // execute the stored procedure
                            insertcommand.ExecuteNonQuery();
                        }
                    }

                    // wait 0.5 seconds
                    Thread.Sleep(500);
                }
            }
        }
        catch (Exception e)
        {
            // handle error (standard error messages and update processing)
            threaderrorhandling(cityindexinterfaceprocessingid, "childthreadintradayquoteshandler (handler stopped now)", e);
        }
    }

Use SQL Server 2016 (it's not RTM yet, but it's already much better than 2014 when it comes to memory-optimized tables). Then use either a memory-optimized table variable or just blast a whole lot of native stored procedure calls in a transaction, each doing one insert, depending on what's faster in your scenario (this varies). A few things to watch out for:

  • Doing multiple inserts in one transaction is vital to save on network roundtrips. While in-memory operations are very fast, SQL Server still needs to confirm every operation.
  • Depending on how you're producing data, you may find that parallelizing the inserts can speed things up (don't overdo it; you'll quickly hit the saturation point). Don't try to be too clever yourself here; leverage async/await and/or Parallel.ForEach.
  • If you're passing a table-valued parameter, the easiest way of doing it is to pass a DataTable as the parameter value, but this is not the most efficient way of doing it; that would be passing an IEnumerable<SqlDataRecord>. You can use an iterator method to generate the values, so only a constant amount of memory is allocated.
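As an illustration of the first point, a batch of single-row procedure calls wrapped in one transaction might be sketched as follows. The procedure name `dbo.InsertPriceTick`, its parameter names, and the `PriceTick` type are assumptions made up for this example. Each `ExecuteNonQuery` call is still sent to the server on its own, but the whole batch is committed only once.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;

// Hypothetical row type; not part of the original post.
public struct PriceTick
{
    public int InstrumentId;
    public long Timestamp;
    public decimal Bid;
    public decimal Ask;
}

public static class TransactionalInsert
{
    // Builds one reusable command for a hypothetical single-row insert procedure.
    public static SqlCommand CreateInsertCommand(SqlConnection connection, SqlTransaction transaction)
    {
        var command = new SqlCommand("dbo.InsertPriceTick", connection, transaction);
        command.CommandType = CommandType.StoredProcedure;
        command.Parameters.Add("@InstrumentId", SqlDbType.Int);
        command.Parameters.Add("@Timestamp", SqlDbType.BigInt);

        SqlParameter bid = command.Parameters.Add("@Bid", SqlDbType.Decimal);
        bid.Precision = 18;
        bid.Scale = 8;

        SqlParameter ask = command.Parameters.Add("@Ask", SqlDbType.Decimal);
        ask.Precision = 18;
        ask.Scale = 8;

        return command;
    }

    // Executes the procedure once per row and commits all rows together.
    // If an exception escapes, disposing the transaction rolls it back.
    public static void InsertAll(string connectionString, IEnumerable<PriceTick> ticks)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();
            using (SqlTransaction transaction = connection.BeginTransaction())
            using (SqlCommand command = CreateInsertCommand(connection, transaction))
            {
                foreach (PriceTick tick in ticks)
                {
                    command.Parameters["@InstrumentId"].Value = tick.InstrumentId;
                    command.Parameters["@Timestamp"].Value = tick.Timestamp;
                    command.Parameters["@Bid"].Value = tick.Bid;
                    command.Parameters["@Ask"].Value = tick.Ask;
                    command.ExecuteNonQuery();
                }
                transaction.Commit();
            }
        }
    }
}
```

Whether this beats a single call with a table-valued parameter depends on the batch size and the network latency, so it is worth measuring both.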

You'll have to experiment a bit to find the optimal way of passing through data; this depends a lot on the size of your data and how you're getting it.
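One simple way to run such an experiment is a small timing harness that pushes the same rows through a given strategy with different batch sizes. The sketch below is an assumption-laden illustration: `InsertBenchmark` and the `sendBatch` delegate are hypothetical, and the delegate is where the actual database call (table-valued parameter, per-row calls in a transaction, and so on) would go.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class InsertBenchmark
{
    // Splits rows into consecutive batches of at most batchSize items.
    public static List<List<T>> Split<T>(IEnumerable<T> rows, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batches = new List<List<T>>();
        var current = new List<T>(batchSize);
        foreach (T row in rows)
        {
            current.Add(row);
            if (current.Count == batchSize)
            {
                batches.Add(current);
                current = new List<T>(batchSize);
            }
        }
        if (current.Count > 0) batches.Add(current);
        return batches;
    }

    // Sends every batch through sendBatch and returns the elapsed wall-clock time.
    // Batches are built before the stopwatch starts, so only the sends are timed.
    public static TimeSpan Measure<T>(IEnumerable<T> rows, int batchSize, Action<List<T>> sendBatch)
    {
        List<List<T>> batches = Split(rows, batchSize);
        Stopwatch stopwatch = Stopwatch.StartNew();
        foreach (List<T> batch in batches)
        {
            sendBatch(batch);
        }
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }
}
```

Calling `Measure` with batch sizes such as 1, 50, and 500 against a test database gives a rough comparison; repeated runs are advisable because the first call includes connection and plan warm-up.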

