apache-spark - 无法使用 Spark 从 AWS EMR 集群执行 mtermvectors elasticsearch 查询
问题描述
我正在尝试通过 spark 执行此 elasticsearch 查询:
POST /aa6/_mtermvectors
{
"ids": [
"ABC",
"XYA",
"RTE"
],
"parameters": {
"fields": [
"attribute"
],
"term_statistics": true,
"offsets": false,
"payloads": false,
"positions": false
}
}
我在 Zeppelin 中编写的代码是:
def createString():String = {
return s"""_mtermvectors {
"ids": [
"ABC",
"XYA",
"RTE"
],
"parameters": {
"fields": [
"attribute"
],
"term_statistics": true,
"offsets": false,
"payloads": false,
"positions": false
}
}"""
}
import org.elasticsearch.spark._
sc.esRDD("aa6", "?q="+createString).count
我得到错误:
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:org.elasticsearch.hadoop.rest.EsHadoopRemoteException:parse_exception:parse_exception:在第 1 行第 22 列遇到“<RANGE_GOOP>”[“RTE”“XYA”“ABC”“” . 期待:“TO” ...
{"query":{"query_string":{"query":"_mtermvectors {\"ids\": [\"RTE\",\"ABC\",\"XYA\"], \"parameters\": {\"fields\": [\"attribute\"], \"term_statistics\": true, \"offsets\": false, \"payloads\": false, \"positions\": false } }"}}}
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:408)
这可能很简单,但我无法在进行 spark 调用时找到设置请求正文的方法
解决方案
我不确定,但我认为 package.json 目前不支持此功能es-Spark
。您可以通过ofthis link
查看哪些选项可用。sparkContext
esRDD
您可以做的是利用 Elasticsearch 的高级 Rest Client并在 List 或 Seq 或任何文件中获取详细信息,然后将其加载到 Spark RDD 中。
这是环游世界的方式,但不幸的是,这是我认为的唯一方式。为了有所帮助,我创建了以下代码段,因此您至少拥有来自 Elasticsearch 的与上述查询相关的所需数据。
import org.apache.http.HttpHost
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.MultiTermVectorsRequest
import org.elasticsearch.client.core.TermVectorsRequest
import org.elasticsearch.client.core.TermVectorsResponse
object SampleSparkES {
/**
* Main Class where program starts
*/
def main(args: Array[String]) = {
val termVectorsResponse = elasticRestClient
println(termVectorsResponse.size)
}
/**
* Scala client code to retrieve the response of mtermVectors
*/
def elasticRestClient : java.util.List[TermVectorsResponse] = {
val client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http")))
val tvRequestTemplate = new TermVectorsRequest("aa6","ids");
tvRequestTemplate.setFields("attribute");
//Set the document ids you want for collecting the term Vector information
val ids = Array("1", "2", "3");
val request = new MultiTermVectorsRequest(ids, tvRequestTemplate);
val response = client.mtermvectors(request, RequestOptions.DEFAULT)
//get the response
val termVectorsResponse = response.getTermVectorsResponses
//close RestHighLevelClient
client.close();
//return List[TermVectorsResponse]
termVectorsResponse
}
}
例如,您可以通过以下方式获取第一个文档的 sumDocFreq
println(termVectorsResponse.iterator.next.getTermVectorsList.iterator.next.getFieldStatistics.getSumDocFreq)
您现在需要做的就是找到一种方法,以一种可以加载到 RDD 中的方式将集合转换为 Seq。
推荐阅读
- alert - bosun 从不发送未知通知
- wpf - Avalonia Ui 相当于 ImageResource
- c# - Console.Writeline 变量中的转义字符
- c# - .NET HttpClient 的 Http 连接变慢或死锁
- amazon-web-services - 从 cloudformation 堆栈中检索 lambda 函数的 arn
- reactjs - 两个 FlatList 之一未在同一组件中呈现项目
- jpa - @Transactional 和可序列化级别问题
- netbeans-8 - NetBeans - 在保存时用空格替换选项卡
- javascript - 如何使用 axios 拦截器?
- azure-functions - 每十分钟运行一次的时间触发功能