The input to the bulk API doesn't look correctly constructed for the low-level client. Each bulk operation should consist of two objects, each serialized on its own line of the newline-delimited JSON request body:
- an object describing the bulk operation to perform (e.g. `index`) and its associated metadata
- an object representing the document itself
The example in the question appears to combine both into a single object, which probably results in an error; the bulk response will contain more details.
As asked in the comments: is there a particular reason you're using the low-level client? The high-level client has a bulk observable helper, `BulkAll`, that helps with indexing a large number of documents, which is especially useful when those documents come from another source such as a file or database.
For example, here is how to index all questions and answers from Stack Overflow's Posts.xml archive:
/// <summary>
/// A question row (PostTypeId 1) from Posts.xml. Questions are the parent side of the
/// question/answer join relation.
/// </summary>
public class Question : Post
{
    /// <summary>The question title.</summary>
    public string Title { get; set; }

    /// <summary>Completion-suggester input built from the title.</summary>
    public CompletionField TitleSuggest { get; set; }

    /// <summary>Id of the accepted answer; <c>null</c> when the row has no AcceptedAnswerId.</summary>
    public int? AcceptedAnswerId { get; set; }

    public int ViewCount { get; set; }

    public string LastEditorDisplayName { get; set; }

    /// <summary>Tag names parsed from the row's Tags attribute; <c>null</c> when absent.</summary>
    public List<string> Tags { get; set; }

    public int AnswerCount { get; set; }

    public int FavoriteCount { get; set; }

    public DateTimeOffset? CommunityOwnedDate { get; set; }

    /// <summary>Discriminator value; always "Question".</summary>
    public override string Type
    {
        get { return nameof(Question); }
    }
}
/// <summary>
/// An answer row (PostTypeId 2) from Posts.xml; the child side of the question/answer join relation.
/// </summary>
public class Answer : Post
{
    /// <summary>Discriminator value; always "Answer".</summary>
    public override string Type
    {
        get { return nameof(Answer); }
    }
}
/// <summary>
/// Fields shared by questions and answers. Both subtypes are stored in the same index and
/// related to each other through <see cref="ParentId"/>.
/// </summary>
public class Post
{
    public int Id { get; set; }

    /// <summary>
    /// Join field: the root relation for a question, or a link to the parent question for an answer.
    /// </summary>
    public JoinField ParentId { get; set; }

    public DateTimeOffset CreationDate { get; set; }

    public int Score { get; set; }

    /// <summary>Post body (HTML in the dump; analyzed with the "html" analyzer).</summary>
    public string Body { get; set; }

    public int? OwnerUserId { get; set; }

    public string OwnerDisplayName { get; set; }

    public int? LastEditorUserId { get; set; }

    public DateTimeOffset? LastEditDate { get; set; }

    public DateTimeOffset? LastActivityDate { get; set; }

    public int CommentCount { get; set; }

    /// <summary>
    /// Discriminator that each subtype overrides with its own name; <c>null</c> on the base class.
    /// </summary>
    public virtual string Type => null;
}
/// <summary>
/// LINQPad-style entry point. Creates the "posts" index (with a question/answer join mapping)
/// if it does not already exist, then bulk-indexes every post yielded by
/// <see cref="GetQuestionsAndAnswers"/>.
/// </summary>
void Main()
{
    var indexName = "posts";
    var node = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var settings = new ConnectionSettings(node)
        .RequestTimeout(TimeSpan.FromMinutes(10))
        .DefaultMappingFor(new ClrTypeMapping[] {
            new ClrTypeMapping(typeof(Post)) { IndexName = indexName },
            new ClrTypeMapping(typeof(Question)) { IndexName = indexName, RelationName = "question" },
            new ClrTypeMapping(typeof(Answer)) { IndexName = indexName },
        })
        // Log the outcome of every request so progress and failures are visible.
        .OnRequestCompleted(response =>
        {
            if (response.Success)
            {
                Console.WriteLine($"Status: {response.HttpStatusCode}");
            }
            else
            {
                Console.WriteLine($"Error: {response.DebugInformation}");
            }
        });
    var client = new ElasticClient(settings);

    // Don't bulk-index when index creation failed: writing to the missing index could let
    // Elasticsearch auto-create it (if automatic index creation is enabled) with dynamic
    // mappings, i.e. without the join field, custom analyzers or required routing.
    if (!CreateIndexIfMissing(client, indexName))
    {
        return;
    }

    IndexPosts(client, indexName);
}

/// <summary>
/// Creates <paramref name="indexName"/> with the analysis settings and Post/Question/Answer
/// mapping, unless an index with that name already exists.
/// </summary>
/// <returns>
/// <c>true</c> if the index already existed or was created; <c>false</c> if creation failed
/// (the failure is written to the console).
/// </returns>
private bool CreateIndexIfMissing(IElasticClient client, string indexName)
{
    if (client.Indices.Exists(indexName).Exists)
    {
        return true;
    }

    var characterFilterMappings = CreateCharacterFilterMappings();
    var createIndexResponse = client.Indices.Create(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
            .Analysis(a => a
                .CharFilters(cf => cf
                    .Mapping("programming_language", mca => mca
                        .Mappings(characterFilterMappings)
                    )
                )
                // The "standard" token filter was a no-op and was removed in Elasticsearch 7.0
                // (this code targets the NEST 7 `client.Indices` API). Referencing it makes index
                // creation fail, so only the lowercase and stop filters are used.
                .Analyzers(an => an
                    .Custom("html", ca => ca
                        .CharFilters("html_strip", "programming_language")
                        .Tokenizer("standard")
                        .Filters("lowercase", "stop")
                    )
                    .Custom("expand", ca => ca
                        .CharFilters("programming_language")
                        .Tokenizer("standard")
                        .Filters("lowercase", "stop")
                    )
                )
            )
        )
        .Map<Post>(u => u
            .RoutingField(r => r.Required())
            .AutoMap<Question>()
            .AutoMap<Answer>()
            .SourceField(s => s
                .Excludes(new[] { "titleSuggest" })
            )
            .Properties<Question>(p => p
                .Join(j => j
                    .Name(f => f.ParentId)
                    .Relations(r => r
                        .Join<Question, Answer>()
                    )
                )
                .Text(s => s
                    .Name(n => n.Title)
                    .Analyzer("expand")
                    .Norms(false)
                    .Fields(f => f
                        .Keyword(ss => ss
                            .Name("raw")
                        )
                    )
                )
                .Keyword(s => s
                    .Name(n => n.OwnerDisplayName)
                )
                .Keyword(s => s
                    .Name(n => n.LastEditorDisplayName)
                )
                .Keyword(s => s
                    .Name(n => n.Tags)
                )
                .Keyword(s => s
                    .Name(n => n.Type)
                )
                .Text(s => s
                    .Name(n => n.Body)
                    .Analyzer("html")
                    .SearchAnalyzer("expand")
                )
                .Completion(co => co
                    .Name(n => n.TitleSuggest)
                )
            )
        )
    );

    if (!createIndexResponse.IsValid)
    {
        Console.WriteLine($"invalid response creating index. {createIndexResponse.DebugInformation}");
        return false;
    }

    return true;
}

/// <summary>
/// Streams all questions and answers into <paramref name="indexName"/> using BulkAll. Blocks
/// until the observable completes, then rethrows any error it reported with the original
/// stack trace.
/// </summary>
private void IndexPosts(IElasticClient client, string indexName)
{
    const int size = 1000;
    var seenPages = 0;
    ExceptionDispatchInfo exception = null;

    using (var handle = new ManualResetEvent(false))
    {
        var observableBulk = client.BulkAll<Post>(GetQuestionsAndAnswers(), f => f
            .MaxDegreeOfParallelism(16)
            .BackOffTime(TimeSpan.FromSeconds(10))
            .BackOffRetries(2)
            .Size(size)
            // Add each post as its concrete type (Question/Answer) rather than as Post.
            .BufferToBulk((bulk, posts) =>
            {
                foreach (var post in posts)
                {
                    switch (post)
                    {
                        case Question question:
                            bulk.AddOperation(new BulkIndexOperation<Question>(question));
                            break;
                        case Answer answer:
                            bulk.AddOperation(new BulkIndexOperation<Answer>(answer));
                            break;
                        default:
                            throw new InvalidOperationException(
                                $"Unexpected post type: {post?.GetType().Name ?? "null"}");
                    }
                }
            })
            .RefreshOnCompleted()
            .Index(indexName)
        );

        var bulkObserver = new BulkAllObserver(
            onError: e =>
            {
                // Captured so it can be rethrown on this thread with its original stack trace.
                exception = ExceptionDispatchInfo.Capture(e);
                handle.Set();
            },
            onCompleted: () => handle.Set(),
            onNext: b =>
            {
                // Print the value returned by Interlocked.Increment; re-reading seenPages
                // could observe an increment made by another concurrent callback.
                var pages = Interlocked.Increment(ref seenPages);
                Console.WriteLine($"indexed {pages} pages");
            }
        );

        observableBulk.Subscribe(bulkObserver);
        handle.WaitOne();
    }

    exception?.Throw();
}
/// <summary>
/// Lazily streams the questions and answers in a Stack Overflow <c>Posts.xml</c> dump.
/// Rows are read one at a time with an <see cref="XmlReader"/>, so the file is never loaded
/// into memory in full. Rows whose PostTypeId is neither 1 (question) nor 2 (answer) are skipped.
/// </summary>
/// <param name="path">Path of the Posts.xml file; defaults to the original relative location.</param>
/// <returns>A <see cref="Question"/> or <see cref="Answer"/> per matching row, in file order.</returns>
public IEnumerable<Post> GetQuestionsAndAnswers(string path = @"stackoverflow_data\Posts.xml")
{
    using (var stream = File.OpenRead(path))
    using (var reader = XmlReader.Create(stream))
    {
        foreach (var row in ReadPostRows(reader))
        {
            switch ((int)row.Attribute("PostTypeId"))
            {
                case 1:
                    yield return ToQuestion(row);
                    break;
                case 2:
                    yield return ToAnswer(row);
                    break;
                default:
                    // Other post types are not indexed.
                    break;
            }
        }
    }
}

// Yields each <row> element under the <posts> root; yields nothing if either is missing.
private static IEnumerable<XElement> ReadPostRows(XmlReader reader)
{
    if (!reader.ReadToDescendant("posts") || !reader.ReadToDescendant("row"))
    {
        yield break;
    }

    do
    {
        // XNode.ReadFrom consumes the whole <row> and leaves the reader on the node after it.
        yield return (XElement)XNode.ReadFrom(reader);
    }
    // The node after a row is normally whitespace, but if rows are not separated by whitespace
    // the reader is already on the next <row>, and ReadToNextSibling would skip it.
    while ((reader.NodeType == XmlNodeType.Element && reader.Name == "row")
           || reader.ReadToNextSibling("row"));
}

// Builds a Question from a PostTypeId 1 row.
private static Question ToQuestion(XElement row)
{
    var question = ReadCommonPostFields(new Question
    {
        ParentId = JoinField.Root<Question>(),
        Title = (string)row.Attribute("Title"),
        AcceptedAnswerId = (int?)row.Attribute("AcceptedAnswerId"),
        ViewCount = (int)row.Attribute("ViewCount"),
        LastEditorDisplayName = (string)row.Attribute("LastEditorDisplayName"),
        Tags = ParseTags((string)row.Attribute("Tags")),
        AnswerCount = (int)row.Attribute("AnswerCount"),
        FavoriteCount = (int?)row.Attribute("FavoriteCount") ?? 0,
        CommunityOwnedDate = ParseOptionalDate(row.Attribute("CommunityOwnedDate")),
    }, row);

    // Only build a suggestion when there is a title, so no completion input is ever null.
    // Negative scores are clamped to weight 0 because completion weights cannot be negative.
    if (question.Title != null)
    {
        question.TitleSuggest = new CompletionField
        {
            Input = new[] { question.Title },
            Weight = Math.Max(question.Score, 0),
        };
    }

    return question;
}

// Builds an Answer from a PostTypeId 2 row, linking it to its parent question via ParentId.
private static Answer ToAnswer(XElement row) =>
    ReadCommonPostFields(new Answer
    {
        ParentId = JoinField.Link<Answer>((int)row.Attribute("ParentId")),
    }, row);

// Sets the fields shared by questions and answers (including Score, previously never set
// on answers) and returns the same instance. Missing required attributes throw.
private static T ReadCommonPostFields<T>(T post, XElement row) where T : Post
{
    post.Id = (int)row.Attribute("Id");
    post.CreationDate = ParseDate(row.Attribute("CreationDate").Value);
    post.Score = (int)row.Attribute("Score");
    post.Body = (string)row.Attribute("Body");
    post.OwnerUserId = (int?)row.Attribute("OwnerUserId");
    post.OwnerDisplayName = (string)row.Attribute("OwnerDisplayName");
    post.LastEditorUserId = (int?)row.Attribute("LastEditorUserId");
    post.LastEditDate = ParseOptionalDate(row.Attribute("LastEditDate"));
    post.LastActivityDate = ParseOptionalDate(row.Attribute("LastActivityDate"));
    post.CommentCount = (int)row.Attribute("CommentCount");
    return post;
}

// Tags are stored as "<tag1><tag2>"; strip '<' and split on '>' to get ["tag1", "tag2"].
// Returns null when the attribute is absent.
private static List<string> ParseTags(string tags) =>
    tags?.Replace("<", string.Empty)
        .Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries)
        .ToList();

// Dump timestamps carry no UTC offset (Stack Exchange timestamps are UTC). Parsing with the
// invariant culture and AssumeUniversal keeps the result independent of the indexing
// machine's culture and time zone.
private static DateTimeOffset ParseDate(string value) =>
    DateTimeOffset.Parse(
        value,
        System.Globalization.CultureInfo.InvariantCulture,
        System.Globalization.DateTimeStyles.AssumeUniversal);

// Parses an optional timestamp attribute; null when the attribute is absent.
private static DateTimeOffset? ParseOptionalDate(XAttribute attribute) =>
    attribute == null ? (DateTimeOffset?)null : ParseDate(attribute.Value);
/*
* Simple character filter mappings that rewrite common
* programming language names containing symbols as words,
* e.g. c# => csharp, C++ => cplusplus
*/
/// <summary>
/// Builds the rules for the "programming_language" mapping character filter. For each letter,
/// the lower- and upper-case letter followed by "#" map to "{letter}sharp", and the letter
/// followed by "++" maps to "{letter}plusplus".
/// </summary>
/// <returns>The "from => to" rules: all "#" rules first, then all "++" rules.</returns>
private IList<string> CreateCharacterFilterMappings()
{
    // Each letter contributes a lower-case and an upper-case rule, both mapping to the lower-case word.
    var sharpRules = new[] { "c", "f", "m", "j", "s", "a", "k", "t" }
        .SelectMany(letter => new[]
        {
            $"{letter}# => {letter}sharp",
            $"{letter.ToUpperInvariant()}# => {letter}sharp",
        });

    var plusPlusRules = new[] { "g", "m", "c", "s", "a", "d" }
        .SelectMany(letter => new[]
        {
            $"{letter}++ => {letter}plusplus",
            $"{letter.ToUpperInvariant()}++ => {letter}plusplus",
        });

    return sharpRules.Concat(plusPlusRules).ToList();
}
`IEnumerable<Post> GetQuestionsAndAnswers()` lazily yields questions and answers from the large Posts.xml file (~50 GB, if I recall correctly) and feeds them to `BulkAll`, which makes up to 16 concurrent bulk requests to Elasticsearch, each indexing 1,000 documents. See this GitHub repository for a more comprehensive example.
Is there a particular reason to use `client.LowLevel.Bulk<>` instead of `client.BulkAll` or `client.IndexMany`?