C# fastest way to insert data into SQL database
I am receiving (streamed) data from an external source (over Lightstreamer) in my C# application. The application receives the data from a listener, and the listener's data is stored in a queue (ConcurrentQueue). Every 0.5 seconds the queue is emptied with TryDequeue into a DataTable. The DataTable is then copied into the SQL database using SqlBulkCopy. The SQL database processes the newly arrived data from the staging table into the final table. I currently receive around 300,000 rows per day (this may increase strongly within the next weeks), and my goal is to stay under 1 second from the time I receive the data until it is available in the final SQL table. The maximum rate I currently have to process is around 50 rows per second.
Unfortunately, since I am receiving more and more data, my logic is getting slower (still far under 1 second, but I want to keep improving). The main bottleneck (so far) is the processing of the staging data (on the SQL database) into the final table. To improve performance, I would like to switch the staging table to a memory-optimized table. The final table is already a memory-optimized table, so the two will work together fine for sure.
My questions:
- Is there a way to use SqlBulkCopy (out of C#) with memory-optimized tables? (As far as I know there is no way yet.)
- Any suggestions for the fastest way to write the received data from my C# application into the memory-optimized staging table?
Edit (with solution):
After the comments/answers and some performance evaluations, I decided to give up the bulk insert and use a SqlCommand to hand over an IEnumerable with my data as a table-valued parameter to a natively compiled stored procedure, which stores the data directly in my memory-optimized final table (as well as a copy in the "staging" table, which now serves as an archive). Performance increased, even though I have not yet considered parallelizing the inserts (I will look at that at a later stage).
Here is part of the code:
Memory-optimized user-defined table type (to hand over the data from C# to the SQL stored procedure):
    CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE
    (
        [CityIndexInstrumentID] [int] NOT NULL,
        [CityIndexTimeStamp] [bigint] NOT NULL,
        [BidPrice] [numeric](18, 8) NOT NULL,
        [AskPrice] [numeric](18, 8) NOT NULL,
        INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
        (
            [CityIndexInstrumentID] ASC,
            [CityIndexTimeStamp] ASC,
            [BidPrice] ASC,
            [AskPrice] ASC
        )
    )
    WITH ( MEMORY_OPTIMIZED = ON )
Natively compiled stored procedure to insert the data into the final table and into the staging table (which serves as an archive in this case):
    CREATE PROCEDURE [Staging].[spProcessCityIndexIntradayLivePricesStaging]
    (
        @ProcessingID int,
        @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices READONLY
    )
    WITH NATIVE_COMPILATION, SCHEMABINDING, EXECUTE AS OWNER
    AS
    BEGIN ATOMIC WITH (TRANSACTION ISOLATION LEVEL = SNAPSHOT, LANGUAGE = N'us_english')

        -- Store prices in the final table
        INSERT INTO TimeSeries.CityIndexIntradayLivePrices
        (
            ObjectID,
            PerDateTime,
            BidPrice,
            AskPrice,
            ProcessingID
        )
        SELECT Objects.ObjectID,
               CityIndexTimeStamp,
               CityIndexIntradayLivePricesStaging.BidPrice,
               CityIndexIntradayLivePricesStaging.AskPrice,
               @ProcessingID
        FROM @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
             Objects.Objects
        WHERE Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID

        -- Store data in the staging table (archive copy)
        INSERT INTO Staging.CityIndexIntradayLivePricesStaging
        (
            ImportProcessingID,
            CityIndexInstrumentID,
            CityIndexTimeStamp,
            BidPrice,
            AskPrice
        )
        SELECT @ProcessingID,
               CityIndexInstrumentID,
               CityIndexTimeStamp,
               BidPrice,
               AskPrice
        FROM @CityIndexIntradayLivePrices

    END
IEnumerable filled from the queue:
    // Namespaces needed (with System.Data.SqlClient): System, System.Collections.Generic,
    // System.Data, Microsoft.SqlServer.Server
    private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
    {
        // Set columns (the order is important: it must match the column order of the table-valued parameter)
        SqlMetaData metaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
        SqlMetaData metaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
        SqlMetaData metaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8
        SqlMetaData metaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8

        // Define the SQL data record with these columns (one instance, reused for every row)
        SqlDataRecord dataRecord = new SqlDataRecord(new SqlMetaData[] { metaDataCol1, metaDataCol2, metaDataCol3, metaDataCol4 });

        // Remove each price row from the queue and copy it into the SQL data record
        LightstreamerAPI.PriceDTO priceDto = new LightstreamerAPI.PriceDTO();

        while (IntradayQuotesQueue.TryDequeue(out priceDto))
        {
            dataRecord.SetInt32(0, priceDto.MarketId); // City Index market id
            // Strip the \/Date( and )\/ wrappers (@ is used to avoid problems with the \ escape sequence)
            dataRecord.SetInt64(1, Convert.ToInt64(priceDto.TickDate.Replace(@"\/Date(", "").Replace(@")\/", "")));
            dataRecord.SetDecimal(2, priceDto.Bid); // bid price
            dataRecord.SetDecimal(3, priceDto.Offer); // ask price

            yield return dataRecord;
        }
    }
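The timestamp conversion in the iterator relies on two Replace calls and will throw from Convert.ToInt64 if a tick date arrives in any other shape. Below is a minimal sketch of a more defensive variant, assuming the value always has the form \/Date(<integer>)\/ as the Replace calls imply. The class and method names (TickDateParser, TryParseTickDate) are hypothetical and not part of the original code.

```csharp
using System;
using System.Globalization;

public static class TickDateParser
{
    // Wrappers assumed from the Replace calls in the question's code.
    private const string Prefix = @"\/Date(";
    private const string Suffix = @")\/";

    // Returns true and the integer between the wrappers, or false if the
    // input does not have the assumed form \/Date(<integer>)\/.
    public static bool TryParseTickDate(string tickDate, out long value)
    {
        value = 0;

        if (tickDate == null
            || tickDate.Length <= Prefix.Length + Suffix.Length
            || !tickDate.StartsWith(Prefix, StringComparison.Ordinal)
            || !tickDate.EndsWith(Suffix, StringComparison.Ordinal))
        {
            return false;
        }

        // Cut out only the part between the prefix and the suffix.
        string digits = tickDate.Substring(
            Prefix.Length,
            tickDate.Length - Prefix.Length - Suffix.Length);

        return long.TryParse(
            digits,
            NumberStyles.AllowLeadingSign,
            CultureInfo.InvariantCulture,
            out value);
    }
}
```

With a helper like this, the iterator could skip or log a malformed row instead of aborting the whole batch with an exception.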
Handling the data every 0.5 seconds:
    public static void ChildThreadIntradayQuotesHandler(Int32 cityIndexInterfaceProcessingID)
    {
        try
        {
            // Open new SQL connection
            using (SqlConnection timeSeriesDatabaseSqlConnection = new SqlConnection("Data Source=xxx;Initial Catalog=xxx;Integrated Security=SSPI;MultipleActiveResultSets=false"))
            {
                // Open connection
                timeSeriesDatabaseSqlConnection.Open();

                // Endless loop to keep the thread alive
                while (true)
                {
                    // Ensure the queue has rows to process (otherwise there is no need to continue)
                    if (IntradayQuotesQueue.Count > 0)
                    {
                        // Define the stored procedure as the SQL command
                        using (SqlCommand insertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", timeSeriesDatabaseSqlConnection))
                        {
                            // Set command type to stored procedure
                            insertCommand.CommandType = CommandType.StoredProcedure;

                            // Define SQL parameters (the table-valued parameter gets its data from CreateSqlDataRecords())
                            SqlParameter parameterCityIndexIntradayLivePrices = insertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
                            SqlParameter parameterProcessingID = insertCommand.Parameters.AddWithValue("@ProcessingID", cityIndexInterfaceProcessingID); // processing id parameter

                            // Set SQL DB type to Structured for the table-valued parameter
                            // (Structured = special data type for structured data contained in table-valued parameters)
                            parameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;

                            // Execute the stored procedure
                            insertCommand.ExecuteNonQuery();
                        }
                    }

                    // Wait 0.5 seconds
                    Thread.Sleep(500);
                }
            }
        }
        catch (Exception e)
        {
            // Handle error (standard error messages and update processing)
            ThreadErrorHandling(cityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
        }
    }
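The handler above loops forever and blocks its thread in Thread.Sleep, so it can only be stopped by an exception. As a rough sketch of an alternative, the loop below takes the flush step as a delegate and waits with Task.Delay so that it can be cancelled cleanly. PeriodicFlusher and RunAsync are hypothetical names introduced for illustration; in the real handler, the flush delegate would be whatever executes the stored procedure.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PeriodicFlusher
{
    // Calls flush, then waits for the interval, and repeats until the
    // token is cancelled. The flush delegate stands in for the code that
    // executes the stored procedure (an assumption of this sketch).
    public static async Task RunAsync(Action flush, TimeSpan interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            flush();

            try
            {
                // Unlike Thread.Sleep, this wait ends early on cancellation
                // and does not block a thread while waiting.
                await Task.Delay(interval, token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation was requested during the wait
            }
        }
    }
}
```

A caller would create a CancellationTokenSource, pass its Token together with TimeSpan.FromMilliseconds(500), and call Cancel() on shutdown.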
Use SQL Server 2016 (it's not RTM yet, but it's better than 2014 when it comes to memory-optimized tables). Then use either a memory-optimized table variable or just blast a whole lot of native stored procedure calls in a transaction, each doing one insert, depending on what's faster in your scenario (this varies). A few things to watch out for:
- Doing multiple inserts in one transaction is vital to save on network roundtrips. While in-memory operations are fast, SQL Server still needs to confirm every operation.
- Depending on how you're producing data, you may find that parallelizing the inserts can speed things up (don't overdo it; you'll hit the saturation point). Don't try to be clever yourself here; leverage async/await and/or Parallel.ForEach.
- If you're passing a table-valued parameter, the easiest way of doing it is to pass a DataTable as the parameter value, but this is not the most efficient way of doing it; that would be passing an IEnumerable<SqlDataRecord>. You can use an iterator method to generate the values, so only a constant amount of memory is allocated.
You'll have to experiment a bit to find the optimal way of passing through data; this depends a lot on the size of your data and how you're getting it.
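The iterator point can be illustrated without a database. The sketch below drains a ConcurrentQueue lazily and caps the number of items per batch, so that one burst of data does not turn into one very large call. QueueDrain and DrainUpTo are hypothetical names, and the generic element type stands in for the SqlDataRecord rows a real table-valued parameter would carry.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class QueueDrain
{
    // Lazily yields at most maxItems items from the queue. Nothing is
    // buffered: an item is dequeued only when the consumer asks for the
    // next element, so memory use stays constant regardless of queue size.
    public static IEnumerable<T> DrainUpTo<T>(ConcurrentQueue<T> queue, int maxItems)
    {
        int yielded = 0;
        T item;

        // The count check comes first, so no extra item is dequeued once
        // the cap is reached.
        while (yielded < maxItems && queue.TryDequeue(out item))
        {
            yielded++;
            yield return item;
        }
    }
}
```

Whether capping batches helps at all is something to measure; at around 50 rows per second the cap would rarely be reached, but it bounds the worst case after a stall.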