I am trying to do a bulk insert using the .NET API in Elasticsearch, and this is the error that I am getting while performing the operation:

Error   {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService$6@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""}   Nest.BulkError

Is it due to low disk space on my system, or is the bulk insert function itself not working? My NEST version is 5.0 and my Elasticsearch version is also 5.0.

Here is the code for the bulk insert logic:

public void bulkInsert(List<BaseData> recordList, List<string> listOfIndexName) {
    BulkDescriptor descriptor = new BulkDescriptor();
    foreach (var j in Enumerable.Range(0, recordList.Count)) {
        descriptor.Index<BaseData>(op => op.Document(recordList[j])
                                           .Index(listOfIndexName[j]));
    }
    var result = clientConnection.Bulk(descriptor);
}
  • That error means that you're probably sending data faster than your cluster can handle. Try adding a small delay (your mileage may vary) between each bulk call and see if that helps. Commented Apr 18, 2017 at 8:07
  • Hi Val, thanks for the suggestion, but could you please elaborate on where I should add it? I am calling this bulk function only once! Commented Apr 18, 2017 at 8:22
  • I see 4 active threads which means that your pool of 4 threads is exhausted which in turn means that there are at least 4 calls being handled at the moment you get the error. And 50 other bulk tasks have been queued and are waiting to be processed. Commented Apr 18, 2017 at 8:23
  • OK Val, but is it by any chance a memory issue? My system doesn't have more than 500 MB of memory available, and I am trying to insert documents of 6 MB at most. Commented Apr 18, 2017 at 8:34
  • I edited your question title to make it easier for others with a similar issue in the future to find the answer :) Commented Apr 18, 2017 at 9:38

1 Answer

As Val said in the comments, you're likely sending more data at a time than your cluster can handle. It looks like you might be trying to send all your documents in one bulk request, which for a lot of documents or large documents may not work.

With _bulk, you need to send your data to the cluster in several bulk requests and find the optimum number of documents that you can send in each bulk request, in addition to the number of bulk requests that you can send concurrently to your cluster.

There are no hard and fast rules here for the optimum size because it can vary depending on the complexity of your documents, how they are analyzed, the cluster hardware, cluster settings, index settings, etc.

The best thing to do is start with a reasonable number, say 500 documents (or some number that makes sense in your context) per request, and go from there. Calculating the total size in bytes of each bulk request is also a good approach. If performance and throughput are insufficient, increase the number of documents, the request byte size, or the number of concurrent requests until you start seeing es_rejected_execution_exception.
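To illustrate the batching idea, here is a minimal sketch (the BulkBatcher helper and the 1,250-record list are made up for the example; in real code each batch would be passed to client.Bulk(...) rather than printed):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BulkBatcher
{
    // Split a list into consecutive batches of at most batchSize items.
    public static IEnumerable<List<T>> Chunk<T>(IReadOnlyList<T> items, int batchSize)
    {
        for (var i = 0; i < items.Count; i += batchSize)
        {
            yield return items.Skip(i).Take(batchSize).ToList();
        }
    }
}

public class Program
{
    public static void Main()
    {
        // Stand-in for the question's recordList; each batch would be sent
        // as one bulk request (e.g. client.Bulk(...)) instead of printed.
        var records = Enumerable.Range(1, 1250).ToList();

        foreach (var batch in BulkBatcher.Chunk(records, 500))
        {
            Console.WriteLine(batch.Count);
        }
    }
}
```

With 1,250 records and a batch size of 500, this yields three batches of 500, 500, and 250 documents, each of which would become a separate bulk request.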

NEST 5.x ships with a handy helper to make bulk requests much easier, using an IObservable<T> and the Observer design pattern:

void Main()
{
    var client = new ElasticClient();

    // can cancel the operation by calling .Cancel() on this
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and 
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // do something with exception thrown
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait for handle to be set.
    waitHandle.WaitOne();

    if (ex != null)
    {
        throw ex;
    }
}

// Ideally, the documents to index should be a lazily enumerated collection
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}" 
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}

3 Comments

Thank you Russ :)
@RussCam for large document collections (millions of docs), is it a good idea to launch BulkAll in a Parallel.ForEach? Otherwise, what is your recommendation for increasing performance, other than tweaking sizeMbytes, batchSize, or the general "tune for indexing" recommendations?
@FlorinVîrdol the internals of BulkAll run concurrent bulk requests based on the .MaxDegreeOfParallelism(...) parameter. Running multiple BulkAll calls in a concurrent loop like Parallel.ForEach() therefore wouldn't have any benefit over a single call with a higher .MaxDegreeOfParallelism(...) value, as far as I can see; it would, however, increase complexity. For millions of documents, the IEnumerable<T> passed to BulkAll should enumerate documents from a source and yield them to BulkAll, i.e. avoid attempting to load the entire collection into memory.
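To make that last point concrete, here is a sketch of lazy enumeration (StreamDocuments and the in-memory source are hypothetical; a real source might be File.ReadLines or a database reader), so that BulkAll never needs the whole collection in memory:

```csharp
using System;
using System.Collections.Generic;

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public class Program
{
    // Yield documents one at a time from a source rather than materializing
    // the full collection; BulkAll pulls from this enumerable as it sends
    // each bulk request.
    public static IEnumerable<Document> StreamDocuments(IEnumerable<string> source)
    {
        var id = 0;
        foreach (var name in source)
        {
            id++;
            yield return new Document { Id = id, Name = name };
        }
    }

    public static void Main()
    {
        // In-memory stand-in for a real streaming source.
        foreach (var d in StreamDocuments(new[] { "a", "b", "c" }))
        {
            Console.WriteLine($"{d.Id}:{d.Name}");
        }
    }
}
```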
