I am receiving streamed data from an external source (over Lightstreamer) in my C# application. The application receives the data through a listener and stores it in a queue (ConcurrentQueue). Every 0.5 seconds the queue is drained with TryDequeue into a DataTable, which is then copied into a SQL database using SqlBulkCopy. The SQL database then processes the newly arrived data from the staging table into the final table. I currently receive around 300'000 rows per day (this may increase strongly within the next weeks), and my goal is to stay under 1 second from the time I receive the data until it is available in the final SQL table. Currently, the peak rate I have to process is around 50 rows per second.
Unfortunately, as I receive more and more data, my logic is getting slower (still far under 1 second, but I want to keep improving). The main bottleneck (so far) is the processing of the staging data (on the SQL database) into the final table. To improve performance, I would like to switch the staging table to a memory-optimized table. The final table is already memory-optimized, so the two should work well together.
My questions:
- Is there a way to use SqlBulkCopy (from C#) with memory-optimized tables? (As far as I know, there is no way yet.)
- Any suggestions for the fastest way to write the received data from my C# application into the memory-optimized staging table?
EDIT (with solution):
After the comments/answers and performance evaluations, I decided to give up the bulk insert and instead use SqlCommand to hand over an IEnumerable with my data as a table-valued parameter to a natively compiled stored procedure, which stores the data directly in my memory-optimized final table (and also writes a copy into the "staging" table, which now serves as an archive). Performance increased significantly (even though I have not yet parallelized the inserts; that will come at a later stage).
Here is part of the code:
Memory-optimized user-defined table type (to hand over the data from C# to the SQL stored procedure):
    CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
        [CityIndexInstrumentID] [int] NOT NULL,
        [CityIndexTimeStamp] [bigint] NOT NULL,
        [BidPrice] [numeric](18, 8) NOT NULL,
        [AskPrice] [numeric](18, 8) NOT NULL,
        INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
        (
            [CityIndexInstrumentID] ASC,
            [CityIndexTimeStamp] ASC,
            [BidPrice] ASC,
            [AskPrice] ASC
        )
    )
    WITH ( MEMORY_OPTIMIZED = ON )
Natively compiled stored procedure to insert the data into the final table and the staging table (which serves as an archive in this case):
    create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
    (
        @ProcessingID int,
        @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
    )
    with native_compilation, schemabinding, execute as owner
    as
    begin atomic
    with (transaction isolation level=snapshot, language=N'us_english')
        -- store prices
        insert into TimeSeries.CityIndexIntradayLivePrices
        (
            ObjectID,
            PerDateTime,
            BidPrice,
            AskPrice,
            ProcessingID
        )
        select Objects.ObjectID,
               CityIndexTimeStamp,
               CityIndexIntradayLivePricesStaging.BidPrice,
               CityIndexIntradayLivePricesStaging.AskPrice,
               @ProcessingID
        from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
             Objects.Objects
        where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID
        -- store data in staging table
        insert into Staging.CityIndexIntradayLivePricesStaging
        (
            ImportProcessingID,
            CityIndexInstrumentID,
            CityIndexTimeStamp,
            BidPrice,
            AskPrice
        )
        select @ProcessingID,
               CityIndexInstrumentID,
               CityIndexTimeStamp,
               BidPrice,
               AskPrice
        from @CityIndexIntradayLivePrices
    end
IEnumerable filled with the data from the queue:
    private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
    {
        // set columns (the order is important: it must match the column order of the table-valued parameter)
        SqlMetaData MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
        SqlMetaData MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
        SqlMetaData MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8
        SqlMetaData MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, scale 8
        // define sql data record with the columns
        SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });
        // remove each price row from the queue and copy it into the sql data record
        LightstreamerAPI.PriceDTO PriceDTO; // assigned by TryDequeue, so no instance needs to be created here
        while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
        {
            DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
            // strip the \/Date( ... )\/ wrapper; verbatim strings (@) keep the backslashes from being read as escape sequences
            DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", "")));
            DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
            DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
            yield return DataRecord;
        }
    }
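The timestamp conversion above throws if a tick date ever arrives in an unexpected shape. A minimal sketch of a more defensive variant, assuming the wrapper is always exactly `\/Date(` ... `)\/` as in the code above; `TickDateParser` and `TryParseTickDate` are hypothetical names, not part of the Lightstreamer API or of my code:

```csharp
using System;
using System.Globalization;

public static class TickDateParser
{
    // Wrapper around the numeric value, as it appears in PriceDTO.TickDate.
    private const string Prefix = @"\/Date(";
    private const string Suffix = @")\/";

    // Returns true and the numeric value if the input has the expected wrapper,
    // otherwise returns false and sets milliseconds to 0.
    public static bool TryParseTickDate(string tickDate, out long milliseconds)
    {
        milliseconds = 0;
        if (tickDate == null
            || tickDate.Length <= Prefix.Length + Suffix.Length
            || !tickDate.StartsWith(Prefix, StringComparison.Ordinal)
            || !tickDate.EndsWith(Suffix, StringComparison.Ordinal))
        {
            return false;
        }

        // Take only the characters between the prefix and the suffix.
        string digits = tickDate.Substring(
            Prefix.Length,
            tickDate.Length - Prefix.Length - Suffix.Length);

        return long.TryParse(
            digits,
            NumberStyles.AllowLeadingSign,
            CultureInfo.InvariantCulture,
            out milliseconds);
    }
}
```

With a helper like this, a malformed row could be logged and skipped instead of aborting the whole batch. Whether skipping is acceptable depends on the feed, so treat it as one option rather than a recommendation.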
Handling the data every 0.5 seconds:
    public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
    {
        try
        {
            // open new sql connection
            using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
            {
                // open connection
                TimeSeriesDatabaseSQLConnection.Open();
                // endless loop to keep thread alive
                while (true)
                {
                    // ensure queue has rows to process (otherwise no need to continue)
                    if (!IntradayQuotesQueue.IsEmpty)
                    {
                        // define stored procedure for sql command (disposed after each execution)
                        using (SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection))
                        {
                            // set command type to stored procedure
                            InsertCommand.CommandType = CommandType.StoredProcedure;
                            // define sql parameters (table-valued parameter gets data from CreateSqlDataRecords())
                            SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
                            SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter
                            // set sql db type to structured for the table-valued parameter (structured = special data type for specifying structured data contained in table-valued parameters)
                            ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;
                            // execute stored procedure
                            InsertCommand.ExecuteNonQuery();
                        }
                    }
                    // wait 0.5 seconds
                    Thread.Sleep(500);
                }
            }
        }
        catch (Exception e)
        {
            // handle error (standard error messages and update processing)
            ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
        }
    }
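Note that the handler passes a lazy iterator to the command, so rows are dequeued from the live queue while the stored procedure call is running. If a bounded batch per call is preferred, one option is to drain a snapshot into a list first. The following is only a sketch of that idea; `QueueBatching`, `DrainBatch` and `maxBatchSize` are hypothetical names, not part of my actual code:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class QueueBatching
{
    // Removes up to maxBatchSize items from the queue and returns them in dequeue order.
    // Items that arrive after the limit is reached stay in the queue for the next call.
    public static List<T> DrainBatch<T>(ConcurrentQueue<T> queue, int maxBatchSize)
    {
        var batch = new List<T>();
        T item;
        while (batch.Count < maxBatchSize && queue.TryDequeue(out item))
        {
            batch.Add(item);
        }
        return batch;
    }
}
```

The handler could then call the stored procedure only when the returned list is non-empty and build the SqlDataRecord rows from that list. This trades a little extra memory for a predictable upper bound on the size of each call.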