2

I am trying to perform actions in Elasticsearch. So far I believe I have established the connection correctly using Jest (HTTP requests), and now I am trying to create a new index and post some information to it so that it becomes visible through the elasticsearch-head plugin. When I run my code I don't receive any exception, but nothing happens either:

   public class ElasticSearch {

  private String ES_HOST = "localhost";
  private String ES_PORT = "9200";
  private static JestClient jestClient = null;

  /**
   * Returns the shared Jest client.
   *
   * @return the client stored in the static {@code jestClient} field; {@code null} until
   *     {@link #connectToElasticSearch()} has assigned one
   */
  public JestClient getElasticSearchClient() {
    return ElasticSearch.jestClient;
  }

  /**
   * Builds a multi-threaded Jest HTTP client for {@code http://ES_HOST:ES_PORT} and stores it in
   * the static {@code jestClient} field.
   *
   * <p>Any exception raised while building the client is printed to stderr and otherwise
   * ignored, in which case {@code jestClient} keeps its previous value.
   */
  public void connectToElasticSearch() {
    final String serverUri = "http://" + ES_HOST + ":" + ES_PORT;
    try {
      // Pool sizing is left at the defaults (no more than 2 concurrent connections per route,
      // no more than 20 in total); override with defaultMaxTotalConnectionPerRoute(...) and
      // maxTotalConnection(...) on the builder if more concurrency is needed.
      HttpClientConfig clientConfig =
              new HttpClientConfig.Builder(serverUri)
                      .multiThreaded(true)
                      .build();

      JestClientFactory clientFactory = new JestClientFactory();
      clientFactory.setHttpClientConfig(clientConfig);
      jestClient = clientFactory.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Creates the index if it does not exist yet and registers a mapping for {@code indexType}
   * containing a single stored text field named {@code message}.
   *
   * <p>Jest reports server-side failures through the returned result object rather than by
   * throwing, so every result is checked here and turned into an {@link IOException}; otherwise
   * a rejected request (for example an invalid index name) goes completely unnoticed.
   *
   * @param indexName name of the index; Elasticsearch requires it to be lowercase and free of
   *     spaces
   * @param indexType mapping type to register; it is inserted verbatim into the mapping JSON,
   *     so it must not contain quotes or backslashes
   * @throws IOException if the request cannot be sent or Elasticsearch rejects it
   * @throws IllegalStateException if no client has been created yet
   */
  public void createIndex(String indexName, String indexType) throws IOException {
    if (jestClient == null) {
      throw new IllegalStateException("connectToElasticSearch() must succeed before createIndex()");
    }

    // A mapping can only be put on an existing index, so create the index first when needed.
    io.searchbox.client.JestResult existsResult =
            jestClient.execute(new io.searchbox.indices.IndicesExists.Builder(indexName).build());
    if (!existsResult.isSucceeded()) {
      io.searchbox.client.JestResult createResult =
              jestClient.execute(new io.searchbox.indices.CreateIndex.Builder(indexName).build());
      if (!createResult.isSucceeded()) {
        throw new IOException(
                "Could not create index '" + indexName + "': " + createResult.getErrorMessage());
      }
    }

    // The mapping is keyed by the requested type (previously a hard-coded "my_type"). The field
    // uses "text" and a real boolean because 6.x servers no longer accept "string" or "yes".
    String mappingSource =
            "{ \"" + indexType + "\" : { \"properties\" : { \"message\" : "
                    + "{\"type\" : \"text\", \"store\" : true} } } }";
    PutMapping putMapping = new PutMapping.Builder(indexName, indexType, mappingSource).build();

    io.searchbox.client.JestResult mappingResult = jestClient.execute(putMapping);
    if (!mappingResult.isSucceeded()) {
      throw new IOException(
              "Could not put mapping for '" + indexName + "/" + indexType + "': "
                      + mappingResult.getErrorMessage());
    }
  }

  /**
   * Indexes one sample document into index {@code es_test}, type {@code test}.
   *
   * <p>The previous version only built the JSON string and never sent it, so nothing ever
   * reached Elasticsearch. The document is now submitted through the Jest client and the
   * result is checked, because Jest signals a rejected request through the result object
   * instead of throwing.
   *
   * @throws IOException if the request cannot be sent or Elasticsearch rejects the document
   * @throws IllegalStateException if no client has been created yet
   */
  public void postInES() throws IOException {
    if (jestClient == null) {
      throw new IllegalStateException("connectToElasticSearch() must succeed before postInES()");
    }

    // Index names must be lowercase and must not contain spaces.
    final String targetIndex = "es_test";
    final String targetType = "test";

    String source = jsonBuilder()
            .startObject()
            .field("user", "kimchy")
            .field("postDate", "date")
            .field("message", "trying out Elastic Search")
            .endObject().string();

    io.searchbox.client.JestResult indexResult =
            jestClient.execute(
                    new io.searchbox.core.Index.Builder(source)
                            .index(targetIndex)
                            .type(targetType)
                            .build());
    if (!indexResult.isSucceeded()) {
      throw new IOException(
              "Could not index document into '" + targetIndex + "/" + targetType + "': "
                      + indexResult.getErrorMessage());
    }
  }

  /**
   * Demo entry point: connects, prepares the index and posts one document.
   *
   * @param args unused
   * @throws IOException if a request to Elasticsearch fails
   */
  public static void main(String[] args) throws IOException {
    ElasticSearch es = new ElasticSearch();
    es.connectToElasticSearch();

    // connectToElasticSearch() only prints its failure, so verify that a client really exists
    // instead of discarding the getter's result.
    if (es.getElasticSearchClient() == null) {
      System.err.println("Could not create the Elasticsearch client, aborting.");
      return;
    }

    // "ES TEST" is not a valid index name: index names must be lowercase without spaces.
    es.createIndex("es_test", "test");
    es.postInES();
  }

I am using:

<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>5.3.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>

I would appreciate your help.

thanks

1

1 Answer 1

1

Thanks.

I found a few problems in my code above and was able to fix them. First, the Java TransportClient talks to the transport port 9300, not the HTTP port 9200 (9200 is only correct for HTTP clients such as Jest). I ended up rewriting my code to use TransportClient instead of JestClient, which solved it for me. In case anyone else has a similar problem, I am sharing my code here in the hope that it helps:

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Locale;
import java.util.Map;

/**
 * Thin convenience wrapper around the Elasticsearch {@link TransportClient}: connect, check
 * cluster health, create an index, bulk-insert, delete by query and scroll through results.
 *
 * <p>One transport client is kept per instance and reused by all operations. Call
 * {@link #close()} (or use try-with-resources) when done so the client is released.
 * Instances are not synchronized.
 *
 * <p>Index names are lowercased before use in every operation, because Elasticsearch only
 * accepts lowercase index names.
 *
 * @author YoavT @Date 6/26/2018 @Time 9:20 AM
 */
public class ElasticSearch implements AutoCloseable {

  private static final String ES_HOST = "localhost";
  /** Transport-protocol port used by the TransportClient (not the HTTP port). */
  private static final int ES_PORT = 9300;

  /** Currently open client, or {@code null} when not connected. */
  private TransportClient client = null;
  /** Cluster name {@link #client} was opened for; lets repeated calls reuse the client. */
  private String connectedClusterName = null;

  /**
   * Connects to the cluster, reusing the existing client when it was opened for the same
   * cluster name. A client opened for a different cluster is closed first, so at most one
   * client is alive per instance.
   *
   * @param clusterName name of the cluster to connect to
   * @return {@code true} if a client connected to at least one node is available afterwards,
   *     {@code false} if the client could not be built or no node was reachable
   */
  protected boolean connectToElasticSearch(String clusterName) {
    if (client != null && clusterName != null && clusterName.equals(connectedClusterName)) {
      return true;
    }
    // Release the previous client (if any) before opening a new one.
    close();

    TransportClient candidate = null;
    boolean connected = false;
    try {
      Settings settings =
              Settings.builder()
                      .put("cluster.name", clusterName)
                      .put("client.transport.ignore_cluster_name", true)
                      .put("client.transport.sniff", true)
                      .build();

      // create connection
      candidate = new PreBuiltTransportClient(settings);
      candidate.addTransportAddress(new TransportAddress(InetAddress.getByName(ES_HOST), ES_PORT));

      // Adding an address does not fail when the node is down, so check explicitly.
      connected = !candidate.connectedNodes().isEmpty();
      if (connected) {
        client = candidate;
        connectedClusterName = clusterName;
        System.out.println(
                "Connection " + clusterName + "@" + ES_HOST + ":" + ES_PORT + " established!");
      } else {
        System.err.println(
                "No Elasticsearch node reachable for " + clusterName + "@" + ES_HOST + ":" + ES_PORT);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (!connected && candidate != null) {
        candidate.close();
      }
    }
    return connected;
  }

  /**
   * Closes the transport client if one is open. Safe to call repeatedly.
   */
  @Override
  public void close() {
    if (client != null) {
      client.close();
      client = null;
      connectedClusterName = null;
    }
  }

  /**
   * Returns a connected client for the given cluster or fails with a clear message.
   *
   * @throws IllegalStateException if no connection could be established
   */
  private TransportClient requireClient(String clusterName) {
    if (!connectToElasticSearch(clusterName) || client == null) {
      throw new IllegalStateException(
              "Could not connect to Elasticsearch cluster '"
                      + clusterName + "' at " + ES_HOST + ":" + ES_PORT);
    }
    return client;
  }

  /** Lowercases an index name independent of the JVM default locale. */
  private static String normalizeIndexName(String indexName) {
    return indexName.toLowerCase(Locale.ROOT);
  }

  /**
   * Check the health status of the cluster: waits up to two seconds for green status.
   *
   * @param clusterName name of the cluster to connect to
   * @return {@code true} if the wait completed in time, {@code false} if it timed out
   * @throws IllegalStateException if no connection could be established
   */
  public boolean isClusterHealthy(String clusterName) {
    final TransportClient es = requireClient(clusterName);
    final ClusterHealthResponse response =
            es
                    .admin()
                    .cluster()
                    .prepareHealth()
                    .setWaitForGreenStatus()
                    .setTimeout(TimeValue.timeValueSeconds(2))
                    .execute()
                    .actionGet();

    if (response.isTimedOut()) {
      System.out.println("The cluster is unhealthy: " + response.getStatus());
      return false;
    }

    System.out.println("The cluster is healthy: " + response.getStatus());
    return true;
  }

  /**
   * Previous step is (check if cluster is healthy) The cluster is ready now and we can start with
   * creating an index. Before that, we check that the same index was not created previously.
   *
   * @param indexName index to look for (lowercased before the lookup)
   * @param clusterName name of the cluster to connect to
   * @return {@code true} if the index already exists
   * @throws IllegalStateException if no connection could be established
   */
  public boolean isIndexRegistered(String indexName, String clusterName) {
    final TransportClient es = requireClient(clusterName);
    // check if index already exists
    final IndicesExistsResponse ieResponse =
            es.admin()
                    .indices()
                    .prepareExists(normalizeIndexName(indexName))
                    .get(TimeValue.timeValueSeconds(1));

    // index not there
    if (!ieResponse.isExists()) {
      return false;
    }

    System.out.println("Index already created!");
    return true;
  }

  /**
   * If the index does not exist already, we create the index.
   *
   * @param indexName index to create (lowercased before creation)
   * @param numberOfShards value for {@code index.number_of_shards}
   * @param numberOfReplicas value for {@code index.number_of_replicas}
   * @param clusterName name of the cluster to connect to
   * @return {@code true} if the creation was acknowledged; {@code false} if it was not or if
   *     the request failed (the failure is printed to stderr)
   * @throws IllegalStateException if no connection could be established
   */
  public boolean createIndex(String indexName, String numberOfShards, String numberOfReplicas, String clusterName) {
    final TransportClient es = requireClient(clusterName);
    try {
      CreateIndexResponse createIndexResponse =
              es
                      .admin()
                      .indices()
                      .prepareCreate(normalizeIndexName(indexName))
                      .setSettings(
                              Settings.builder()
                                      .put("index.number_of_shards", numberOfShards)
                                      .put("index.number_of_replicas", numberOfReplicas))
                      .get();

      if (createIndexResponse.isAcknowledged()) {
        System.out.println(
                "Created Index with "
                        + numberOfShards
                        + " Shard(s) and "
                        + numberOfReplicas
                        + " Replica(s)!");
        return true;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

    return false;
  }

  /**
   * Demo run: connect, check health, create the index when missing, insert one document and
   * delete it again. The client is closed when the run ends.
   *
   * @param args unused
   * @throws IOException if building the bulk document fails
   */
  public static void main(String[] args) throws IOException {
    final String clusterName = "elasticsearch";
    final String indexName = "TestIndex";

    try (ElasticSearch elasticSearch = new ElasticSearch()) {
      if (!elasticSearch.connectToElasticSearch(clusterName)) {
        System.err.println("Could not connect to cluster " + clusterName + ", aborting.");
        return;
      }
      boolean isHealthy = elasticSearch.isClusterHealthy(clusterName);
      System.out.println("is cluster healthy= " + isHealthy);

      // Check the same index that is created below; creating an existing index fails.
      boolean indexExists = elasticSearch.isIndexRegistered(indexName, clusterName);
      System.out.println("is index exsist = " + indexExists);
      if (!indexExists) {
        boolean createIndex = elasticSearch.createIndex(indexName, "3", "1", clusterName);
        System.out.println("Is index created = " + createIndex);
      }

      boolean bulkInsert = elasticSearch.bulkInsert(indexName, "Json", clusterName);
      System.out.println("Bulk insert = " + bulkInsert);
      long deleteBulk = elasticSearch.deleteBulk(indexName, "name", "Mark Twain", clusterName);
      System.out.println("Delete bulk = " + deleteBulk);
    }
  }

  /**
   * Inserts a sample document (properties {@code name} and {@code age}) through a bulk request.
   *
   * <p>In our tests it happened that the data was not yet searchable when we ran a search/delete
   * query directly after the insert. Consequently the request uses
   * {@code RefreshPolicy.IMMEDIATE} to make the server refresh the index after the request, so
   * the data can be queried directly afterwards.
   *
   * @param indexName index to insert into (lowercased before use)
   * @param indexType mapping type of the document
   * @param clusterName name of the cluster to connect to
   * @return {@code true} if the bulk request reported no failures
   * @throws IOException if the JSON document cannot be built
   * @throws IllegalStateException if no connection could be established
   */
  public boolean bulkInsert(String indexName, String indexType, String clusterName) throws IOException {
    final TransportClient es = requireClient(clusterName);
    BulkRequestBuilder bulkRequest = es.prepareBulk();

    // To insert several documents, add one prepareIndex(...) request per document here.
    bulkRequest
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .add(
                    es
                            .prepareIndex(normalizeIndexName(indexName), indexType, null)
                            .setSource(
                                    XContentFactory.jsonBuilder()
                                            .startObject()
                                            .field("name", "Mark Twain")
                                            .field("age", 75)
                                            .endObject()));

    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
      System.out.println("Bulk insert failed!");
      // Say why, otherwise the failure cannot be diagnosed.
      System.out.println(bulkResponse.buildFailureMessage());
      return false;
    }

    return true;
  }

  /**
   * After successfully querying data, we try to delete documents using a key-value pair to get
   * deeper into the Elasticsearch behavior.
   *
   * @param indexName index to delete from (lowercased before use)
   * @param key field to match
   * @param value value the field must match
   * @param clusterName name of the cluster to connect to
   * @return number of deleted documents
   * @throws IllegalStateException if no connection could be established
   */
  public long deleteBulk(String indexName, String key, String value, String clusterName) {
    final TransportClient es = requireClient(clusterName);
    BulkByScrollResponse response =
            DeleteByQueryAction.INSTANCE
                    .newRequestBuilder(es)
                    .filter(QueryBuilders.matchQuery(key, value))
                    .source(normalizeIndexName(indexName))
                    .refresh(true)
                    .get();

    System.out.println("Deleted " + response.getDeleted() + " element(s)!");

    return response.getDeleted();
  }

  /**
   * To query the data, we use a SearchResponse in combination with a scroll. A scroll is basically
   * the Elasticsearch counterpart to a cursor in a traditional SQL database. Using that sort of
   * query is quite an overkill for our example and just for demonstration purposes. It is rather
   * used to query large amounts of data (not like five documents in our case) and not intended for
   * real-time user requests.
   *
   * <p>Every field of every hit is printed, prefixed with a hit number that runs across all
   * scroll pages. The scroll context is cleared on the server when the method finishes.
   *
   * @param indexName index to search (lowercased before use)
   * @param from lower bound of the range filter
   * @param to upper bound of the range filter
   * @param clusterName name of the cluster to connect to
   * @param filterField field the range filter is applied to
   * @throws IllegalStateException if no connection could be established
   */
  public void queryResultsWithFilter(String indexName, int from, int to, String clusterName, String filterField) {
    final TransportClient es = requireClient(clusterName);
    // keep results for 60 seconds
    final TimeValue keepAlive = new TimeValue(60000);

    SearchResponse scrollResp =
            es
                    .prepareSearch(normalizeIndexName(indexName))
                    // sort order
                    .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
                    .setScroll(keepAlive)
                    // range filter on the requested field
                    .setPostFilter(QueryBuilders.rangeQuery(filterField).from(from).to(to))
                    // maximum of 100 hits will be returned for each scroll
                    .setSize(100)
                    .get();

    String scrollId = scrollResp.getScrollId();
    try {
      // Declared outside the loop so the numbering continues across scroll pages.
      int count = 1;

      // zero hits mark the end of the scroll
      while (scrollResp.getHits().getHits().length != 0) {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
          Map<String, Object> res = hit.getSourceAsMap();

          // print results
          for (Map.Entry<String, Object> entry : res.entrySet()) {
            System.out.println("[" + count + "] " + entry.getKey() + " --> " + entry.getValue());
          }
          count++;
        }

        scrollResp =
                es
                        .prepareSearchScroll(scrollResp.getScrollId())
                        .setScroll(keepAlive)
                        .execute()
                        .actionGet();
        scrollId = scrollResp.getScrollId();
      }
    } finally {
      // Free the server-side scroll context instead of waiting for the keep-alive to expire.
      if (scrollId != null) {
        try {
          es.prepareClearScroll().addScrollId(scrollId).get();
        } catch (Exception e) {
          // Best effort: the context expires on its own; do not mask an earlier failure.
          e.printStackTrace();
        }
      }
    }
  }
}
Sign up to request clarification or add additional context in comments.

1 Comment

This is awesome, a concise package of everything I needed. Thank you!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.