Elasticsearch

首先数据有结构化数据和非结构化数据还有半结构化数据。

结构化数据一般为二维的表结构,一般是用sql来查询,可以用索引增加效率,但是扩展方面很麻烦。

非结构化数据是不能用二维表来记录的数据,比如报表,一般用key -value 结构来存储,相对来说快。

半结构化数据就是类似XML,把内容和结构混在一起,但是查询内容不容易

现在就是为了解决这个实时数据的分析和采集就用Elasticsearch

是什么

是一个开源的高扩展的分布式全文搜索引擎,全站搜索,整个网站匹配的文章。

那么同类型的还有Solr,然后ES一般实时数据,Solr用静态数据,变动不大的查询。

喜欢监控和指标还有分布式索引那么勇ES。

入门

安装

Elasticsearch 分为 Linux 和 Windows 版本,基于我们主要学习的是Elasticsearch的Java 客户端的使用,所以课程中使用的是安装较为简便的Windows版本。

目录 含义
bin 可执行脚本目录
config 配置目录
jdk 内置JDK目录
lib 类库
logs 日志目录
modules 模块目录
plugins 插件目录

解压后,进入bin文件目录,点击elasticsearch.bat文件启动ES服务

注意:9300端口为Elasticsearch集群间组件的通信端口(内部的端口),9200端口为浏览器访问的http 协议RESTful端口。

打开浏览器(推荐使用谷歌浏览器),输入地址:http://localhost:9200,测试结果

基本操作

先理解一些概念

RESTful

REST 指的是一组架构约束条件和原则。满足这些约束条件和原则的应用程序或设计就 是 RESTful。Web 应用程序最重要的 REST 原则是,客户端和服务器之间的交互在请求之 间是无状态的

我们的HTTP协议就遵循了REST原则,比如在web中资源的唯一标识是URI,我们可以在网上搜索这个路径来查找资源,比如localhost:8000/test/test.txt这个路径中不应该包含对资源的操作的。我们要遵循统一的接口,也就是GET,POST,PUT,DELETE,HEAD,遵循这些方法,路径是资源的定位,方法是对资源的操作。

如果按照HTTP的方法来暴露资源,接口具有安全性和幂等性的特性,比方说GET和HEAD请求都是安全的,其他的请求都是幂等性的,但是POST不是幂等性的。

那么请求和响应的数据都是JSON格式的

这里回顾一下JSON
1
2
3
//表示的是特殊标记的javaScript对象
var obj = {"name" : "zhangsan" , "age" : 1 , "info" : {"email" : "xxx"}}
var objs = [obj,obj]

JSON字符串,网络中传递的字符串格式,符合JSON格式。

ES的数据格式

Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档。为了方便大家理解, 我们将Elasticsearch 里存储文档数据和关系型数据库MySQL存储数据的概念进行一个类比

image-20260121023305032

Elasticsearch 7.X 中, Type 的概念已经被删除了。后面引用了一个概念,倒排索引,正向索引

正排索引,就是通过key找value

1
2
3
4
id    content
-------------
1001 my name is zs
1002 my name is ls

倒排索引,引入了keyword,关键字和表的联系,类型的概念就没那么重要了

1
2
3
4
keyword    id
----------------
name 1001,1002
zhang 1001

索引操作

现在我们需要创建数据库,在ES里面称为索引 Index

创建索引

利用Apifox,向ES服务器发送PUT请求 http://127.0.0.1:9200/shopping

1
2
3
4
5
6
 {
"acknowledged"【响应结果】: true, # true操作成功
"shards_acknowledged"【分片结果】: true, # 分片操作成功
"index"【索引名称】: "shopping"
}
# 注意:创建索引库的分片数默认1片,在7.0.0之前的Elasticsearch版本中,默认5

如果重复添加索引,会返回错误信息

查看所有索引

向ES服务器发GET请求http://127.0.0.1:9200/_cat/indices?v

1
2
health status index    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open shopping 92tbSXcHQhil8KnB70CpzA 1 1 0 0 208b 208b
表头 含义
health 当前服务器健康状态: • green(集群完整) • yellow(单点正常、集群不完整) • red(单点不正常)
status 索引打开、关闭状态
index 索引名
uuid 索引统一编号
pri 主分片数量
rep 副本数量
docs.count 可用文档数量
docs.deleted 文档删除状态(逻辑删除)
store.size 主分片和副分片整体占空间大小
pri.store.size 主分片占空间大小

这里请求路径中的_cat表示查看的意思,indices表示索引,所以整体含义就是查看当前ES 服务器中的所有索引,就好像MySQL中的show tables的感觉,服务器响应结果如下

查看一个就是发送get请求http://127.0.0.1:9200/shopping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 {
"shopping"【索引名】: {
"aliases"【别名】: {},
"mappings"【映射】: {},
"settings"【设置】: {
"index"【设置 - 索引】: {
"creation_date"【设置 - 索引 - 创建时间】: "1614265373911",
"number_of_shards"【设置 - 索引 - 主分片数量】: "1",
"number_of_replicas"【设置 - 索引 - 副分片数量】: "1",
"uuid"【设置 - 索引 - 唯一标识】: "eI5wemRERTumxGCc1bAk2A",
"version"【设置 - 索引 - 版本】: {
"created": "7080099"
},
"provided_name"【设置 - 索引 - 名称】: "shopping"
}
}
}
}
删除索引

发送DELETE请求,http://127.0.0.1:9200/shopping

文档操作

创建文档

索引已经创建好了,接下来我们来创建文档,并添加数据。这里的文档可以类比为关系型数 据库中的表数据,添加的数据格式为JSON格式

向ES服务器发POST请求 :http://127.0.0.1:9200/shopping/_doc

请求体内容为:

1
2
3
4
5
6
{ 
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":3999.00
}

此处发送请求的方式必须为POST,不能是PUT,否则会发生错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{ 
"_index"【索引】: "shopping",
"_type"【类型-文档】: "_doc",
"_id"【唯一标识】: "Xhsa2ncBlvF_7lxyCE9G", #可以类比为MySQL中的主键,随机生成
"_version"【版本】: 1,
"result"【结果】: "created", #这里的create表示创建成功
"_shards"【分片】: {
"total"【分片 - 总数】: 2,
"successful"【分片 - 成功】: 1,
"failed"【分片 - 失败】: 0
},
"_seq_no": 0,
"_primary_term": 1
}

上面的数据创建后,由于没有指定数据唯一性标识(ID),默认情况下,ES服务器会随机 生成一个。 如果想要自定义唯一性标识,需要在创建时指定:http://127.0.0.1:9200/shopping/_doc/1

此处需要注意:如果增加数据时明确数据主键,那么请求方式也可以为PUT

查看文档

查看文档时,需要指明文档的唯一性标识,类似于MySQL中数据的主键查询

向ES服务器发GET请求 :http://127.0.0.1:9200/shopping/_doc/1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 {
"_index"【索引】: "shopping",
"_type"【文档类型】: "_doc",
"_id": "1",
"_version": 2,
"_seq_no": 2,
"_primary_term": 2,
"found"【查询结果】: true, # true表示查找到,false表示未查找到
"_source"【文档源信息】: {
"title": "华为手机",
"category": "华为",
"images": "http://www.gulixueyuan.com/hw.jpg",
"price": 4999.00
}
}
修改文档

和新增文档一样,输入相同的URL地址请求,如果请求体变化,会将原有的数据内容覆盖

向ES服务器发POST请求 :http://127.0.0.1:9200/shopping/_doc/1

修改成功后,服务器相应结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{ 
"_index": "shopping",
"_type": "_doc",
"_id": "1",
"_version"【版本】: 2,
"result"【结果】: "updated", # updated表示数据被更新
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 2
}
修改字段

修改数据时,也可以只修改某一给条数据的局部信息

向ES服务器发POST请求 :http://127.0.0.1:9200/shopping/_update/1

请求体内容为:

1
2
3
4
5
{  
"doc": {
"price":3000.00
}
}

根据唯一性标识,查询文档数据,文档数据已经更新

删除文档

删除一个文档不会立即从磁盘上移除,它只是被标记成已删除(逻辑删除)。

向ES服务器发DELETE请求 :http://127.0.0.1:9200/shopping/_doc/1

删除成功,服务器响应结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{ 
"_index": "shopping",
"_type": "_doc",
"_id": "1",
"_version"【版本】: 4, #对数据的操作,都会更新版本
"result"【结果】: "deleted", # deleted表示数据被标记为删除
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 4,
"_primary_term": 2
}

如果删除一个不存在的文档会显示not_found

条件删除文档

一般删除数据都是根据文档的唯一性标识进行删除,实际操作时,也可以根据条件对多条数 据进行删除 首先分别增加多条数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
{ 
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":4000.00
}

{
"title":"华为手机",
"category":"华为",
"images":"http://www.gulixueyuan.com/hw.jpg",
"price":4000.00
}

向ES服务器发POST请求 :http://127.0.0.1:9200/shopping/_delete_by_query

请求体内容为:

1
2
3
4
5
6
7
{ 
"query":{
"match":{
"price":4000.00
}
}
}

响应结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{ 
"took"【耗时】: 175,
"timed_out"【是否超时】: false,
"total"【总数】: 2,
"deleted"【删除数量】: 2,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"failures": []
}

映射操作

有了索引库,等于有了数据库中的database。

接下来就需要建索引库(index)中的映射了,类似于数据库(database)中的表结构(table)。 创建数据库表需要设置字段名称,类型,长度,约束等;索引库也一样,需要知道这个类型 下有哪些字段,每个字段有哪些约束信息,这就叫做映射(mapping)。

创建映射

向ES服务器发PUT请求 :http://127.0.0.1:9200/student/_mapping

请求体内容为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{ 
"properties": {
"name":{
"type": "text",
"index": true
},
"sex":{
"type": "text",
"index": false
},
"age":{
"type": "long",
"index": false
}
}
}

映射数据说明:

字段名:任意填写,下面指定许多属性,例如:title、subtitle、images、price

type:类型,Elasticsearch中支持的数据类型非常丰富,说几个关键的:

String类型,又分两种:text:可分词 keyword:不可分词,数据会作为完整字段进行匹配

Numerical:数值类型,分两类

基本数据类型:long、integer、short、byte、double、float、half_float

浮点数的高精度类型:scaled_float

Date:日期类型

Array:数组类型

Object:对象

index:是否索引,默认为true,也就是说你不进行任何配置,所有字段都会被索引。 true:字段会被索引,则可以用来进行搜索 false:字段不会被索引,不能用来搜索

store:是否将数据进行独立存储,默认为false

原始的文本会存储在_source里面,默认情况下其他提取出来的字段都不是独立存储 的,是从_source里面提取出来的。当然你也可以独立的存储某个字段,只要设置 “store”: true 即可,获取独立存储的字段要比从_source中解析快得多,但是也会占用 更多的空间,所以要根据实际业务需求来设置。

analyzer:分词器,这里的ik_max_word即使用ik分词器,后面会有专门的章节学习

查看映射

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_mapping

索引映射关联

上面的操作是把创建索引和创建映射分为了两步,下面这种是一个请求,及创建索引又创建映射

向ES服务器发PUT请求 :http://127.0.0.1:9200/student1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{ 
"settings": {},
"mappings": {
"properties": {
"name":{
"type": "text",
"index": true

},
"sex":{
"type": "text",
"index": false
},
"age":{
"type": "long",
"index": false
}
}
}
}

高级查询

现在我们有了索引也有了映射,并且也学会了传入文档,那么现在就需要进行查询了。

Elasticsearch提供了基于JSON提供完整的查询DSL来定义查询

首先我们先定义文档,也就是装入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# POST /student/_doc/1001 
{
"name":"zhangsan",
"nickname":"zhangsan",
"sex":"男",
"age":30
}
# POST /student/_doc/1002
{
"name":"lisi",
"nickname":"lisi",
"sex":"男",
"age":20
}
# POST /student/_doc/1003
{
"name":"wangwu",
"nickname":"wangwu",
"sex":"女",
"age":40
}
# POST /student/_doc/1004
{
"name":"zhangsan1",
"nickname":"zhangsan1",
"sex":"女",
"age":50
}
# POST /student/_doc/1005
{
"name":"zhangsan2",
"nickname":"zhangsan2",
"sex":"女",
"age":30
}
查询所有文档

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"query": {
"match_all": {}
}
}
# "query":这里的query代表一个查询对象,里面可以有不同的查询属性
# "match_all":查询类型,例如:match_all(代表查询所有), match,term , range 等等
# {查询条件}:查询条件会根据类型的不同,写法也有差异

响应结果格式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{ 
"took【查询花费时间,单位毫秒】" : 1116,
"timed_out【是否超时】" : false,
"_shards【分片信息】" : {
"total【总数】" : 1,
"successful【成功】" : 1,
"skipped【忽略】" : 0,
"failed【失败】" : 0
},
"hits【搜索命中结果】" : {
"total"【搜索条件匹配的文档总数】: {
"value"【总命中计数的值】: 3,
"relation"【计数规则】: "eq" # eq 表示计数准确, gte表示计数不准确
},
"max_score【匹配度分值】" : 1.0,
"hits【命中结果集合】" : [
。。。
}
]
}
}
匹配查询

match 匹配类型查询,会把查询条件进行分词,然后进行查询,多个词条之间是or的关系

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
{ 
"query": {
"match": {
"name":"zhangsan"
}
}
}
字段匹配查询

multi_match与match类似,不同的是它可以在多个字段中查询。

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"query": {
"multi_match": {
"query": "zhangsan",
"fields": ["name","nickname"]
}
}
}
关键字精确查询

term查询,精确的关键词匹配查询,不对查询条件进行分词。

之前用match,会进行分词也就是zhang san会分成zhang 和 san,然后查询的时候能找到,但是term是要完全一致才能找到。

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
{ 
"query": {
"term": {
"name": {
"value": "zhangsan"
}
}
}
}
多关键字精确查询

terms 查询和 term 查询一样,但它允许你指定多值进行匹配。 如果这个字段包含了指定值中的任何一个值,那么这个文档满足条件,类似于mysql的in

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
{ 
"query": {
"terms": {
"name": ["zhangsan","lisi"]
}
}
}
指定查询字段

默认情况下,Elasticsearch在搜索的结果中,会把文档中保存在_source的所有字段都返回。 如果我们只想获取其中的部分字段,我们可以添加_source的过滤

也就是说不是返回整个文档信息,而是按照source的信息来返回

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"_source": ["name","nickname"],
"query": {
"terms": {
"nickname": ["zhangsan"]
}
}
}
过滤字段

我们也可以通过:

includes:来指定想要显示的字段

excludes:来指定不想要显示的字段

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
{ 
"_source": {
"includes": ["name","nickname"]
},
"query": {
"terms": {
"nickname": ["zhangsan"]
}
}
}
组合查询

bool把各种其它查询通过must(必须 )、must_not(必须不)、should(应该)的方 式进行组合

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{ 
"query": {
"bool": {
"must": [
{
"match": {
"name": "zhangsan"
}
}
],
"must_not": [
{
"match": {
"age": "40"
}
}
],
"should": [
{
"match": {
"sex": "男"
}
}
]
}
}
}
范围查询

range 查询找出那些落在指定区间内的数字或者时间。range查询允许以下字符

操作符 说明
gt 大于 >
gte 大于等于 >=
lt 小于 <
lte 小于等于 <=

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
{ 
"query": {
"range": {
"age": {
"gte": 30,
"lte": 35
}
}
}
}
模糊查询

返回包含与搜索字词相似的字词的文档。 编辑距离是将一个术语转换为另一个术语所需的一个字符更改的次数。这些更改可以包括:

更改字符(box → fox)

删除字符(black → lack)

插入字符(sic → sick)

转置两个相邻字符(act → cat)

为了找到相似的术语,fuzzy查询会在指定的编辑距离内创建一组搜索词的所有可能的变体 或扩展。然后查询返回每个扩展的完全匹配。 通过fuzziness修改编辑距离。一般使用默认值AUTO,根据术语的长度生成编辑距离。

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
{ 
"query": {
"fuzzy": {
"title": {
"value": "zhangsan"
}
}
}
}

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
{ 
"query": {
"fuzzy": {
"title": {
"value": "zhangsan",
"fuzziness": 2
}
}
}
}
单字段排序

sort 可以让我们按照不同的字段进行排序,并且通过order指定排序的方式。desc降序,asc 升序。

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
11
12
{ 
"query": {
"match": {
"name":"zhangsan"
}
},
"sort": [{
"age": {
"order":"desc"
}
}]
}
多字段排序

假定我们想要结合使用 age和 _score进行查询,并且匹配的结果首先按照年龄排序,然后 按照相关性得分排序

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{ 
"query": {
"match_all": {}
},
"sort": [
{
"age": {
"order": "desc"
}
},
{
"_score":{
"order": "desc"
}
}
]
}
高亮查询

在进行关键字搜索时,搜索出的内容中的关键字会显示不同的颜色,称之为高亮。

Elasticsearch 可以对查询内容中的关键字部分,进行标签和样式(高亮)的设置。 在使用match查询的同时,加上一个highlight属性:

pre_tags:前置标签

post_tags:后置标签

fields:需要高亮的字段

title:这里声明title 字段需要高亮,后面可以为这个字段设置特有配置,也可以空

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{ 
"query": {
"match": {
"name": "zhangsan"
}
},
"highlight": {
"pre_tags": "<font color='red'>",
"post_tags": "</font>",
"fields": {
"name": {}
}
}
}
分页查询

from:当前页的起始索引,默认从0开始。 from = (pageNum - 1) * size size:每页显示多少条

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{ 
"query": {
"match_all": {}
},
"sort": [
{
"age": {
"order": "desc"
}
}
],
"from": 0,
"size": 2
}
聚合查询

聚合允许使用者对es文档进行统计分析,类似与关系型数据库中的group by,当然还有很 多其他的聚合,例如取最大值、平均值等等。

对某个字段取最大值max

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"max_age":{
"max":{"field":"age"}
}
},
"size":0
}

对某个字段取最小值min

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"min_age":{
"min":{"field":"age"}
}
},
"size":0
}

对某个字段求和sum

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"sum_age":{
"sum":{"field":"age"}
}
},
"size":0
}

对某个字段取平均值avg

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"avg_age":{
"avg":{"field":"age"}
}
},
"size":0
}

对某个字段的值进行去重之后再取总数

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"distinct_age":{
"cardinality":{"field":"age"}
}
},
"size":0
}

State聚合

stats聚合,对某个字段一次性返回count,max,min,avg和sum五个指标

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"stats_age":{
"stats":{"field":"age"}
}
},
"size":0
}
桶聚合查询

桶聚和相当于sql中的group by语句

terms聚合,分组统计

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"age_groupby":{
"terms":{"field":"age"}
}
},
"size":0
}

在terms分组下再进行聚合

向ES服务器发GET请求 :http://127.0.0.1:9200/student/_search

1
2
3
4
5
6
7
8
{ 
"aggs":{
"age_groupby":{
"terms":{"field":"age"}
}
},
"size":0
}

Java API操作

Elasticsearch 软件是由Java语言开发的,所以也可以通过Java API的方式对Elasticsearch 服务进行访问

创建Maven项目

改POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<dependencies> 
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<!-- junit单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
客户端对象

创建客户端类,,代码中创建Elasticsearch客户端对象 因为早期版本的客户端对象已经不再推荐使用,且在未来版本中会被删除,所以这里我们采 用高级REST客户端对象

1
2
3
4
5
6
7
8
9
// 创建客户端对象 
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);

...

// 关闭客户端连接
client.close();

注意:9200端口为Elasticsearch的Web通信端口,localhost为启动ES服务的主机名

索引操作

ES服务器正常启动后,可以通过Java API 客户端对象对ES索引进行操作

1、创建索引

1
2
3
4
5
6
7
8
// 创建索引 - 请求对象 
CreateIndexRequest request = new CreateIndexRequest("user");
// 发送请求,获取响应
CreateIndexResponse response = client.indices().create(request,
RequestOptions.DEFAULT);
boolean acknowledged = response.isAcknowledged();
// 响应状态
System.out.println("操作状态 = " + acknowledged);

2、查看索引

1
2
3
4
5
6
7
8
// 查询索引 - 请求对象 
GetIndexRequest request = new GetIndexRequest("user");
// 发送请求,获取响应
GetIndexResponse response = client.indices().get(request,
RequestOptions.DEFAULT);
System.out.println("aliases:"+response.getAliases());
System.out.println("mappings:"+response.getMappings());
System.out.println("settings:"+response.getSettings());

3、删除索引

1
2
3
4
5
6
7
// 删除索引 - 请求对象 
DeleteIndexRequest request = new DeleteIndexRequest("user");
// 发送请求,获取响应
AcknowledgedResponse response = client.indices().delete(request,
RequestOptions.DEFAULT);
// 操作结果
System.out.println("操作结果 : " + response.isAcknowledged());
文档操作

1、新增文档

创建数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class User {                          
private String name;
private Integer age;
private String sex;

public String getName() {
return name;
}
public void setName(String name)
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
}

创建数据,添加到文档中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 新增文档 - 请求对象 
IndexRequest request = new IndexRequest();
// 设置索引及唯一性标识
request.index("user").id("1001");
// 创建数据对象
User user = new User();
user.setName("zhangsan");
user.setAge(30);
user.setSex("男");
ObjectMapper objectMapper = new ObjectMapper();
String productJson = objectMapper.writeValueAsString(user);
// 添加文档数据,数据格式为JSON格式
request.source(productJson,XContentType.JSON);
// 客户端发送请求,获取响应对象
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
////3.打印结果信息
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());

2、修改文档

1
2
3
4
5
6
7
8
9
10
11
// 修改文档 - 请求对象 
UpdateRequest request = new UpdateRequest();
// 配置修改参数
request.index("user").id("1001");
// 设置请求体,对数据进行修改
request.doc(XContentType.JSON, "sex", "女");
// 客户端发送请求,获取响应对象
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
System.out.println("_index:" + response.getIndex());
System.out.println("_id:" + response.getId());
System.out.println("_result:" + response.getResult());

3、查询文档

1
2
3
4
5
6
7
8
9
//1.创建请求对象 
GetRequest request = new GetRequest().index("user").id("1001");
//2.客户端发送请求,获取响应对象
GetResponse response = client.get(request, RequestOptions.DEFAULT);
////3.打印结果信息
System.out.println("_index:" + response.getIndex());
System.out.println("_type:" + response.getType());
System.out.println("_id:" + response.getId());
System.out.println("source:" + response.getSourceAsString());

4、删除文档

1
2
3
4
5
6
//创建请求对象 
DeleteRequest request = new DeleteRequest().index("user").id("1");
//客户端发送请求,获取响应对象
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
//打印信息
System.out.println(response.toString());

5、批量操作

批量新增

1
2
3
4
5
6
7
8
9
10
11
//创建批量新增请求对象 
BulkRequest request = new BulkRequest();
request.add(new IndexRequest().index("user").id("1001").source(XContentType.JSON, "name", "zhangsan"));
request.add(new IndexRequest().index("user").id("1002").source(XContentType.JSON, "name", "lisi"));
request.add(new IndexRequest().index("user").id("1003").source(XContentType.JSON, "name",
"wangwu"));
//客户端发送请求,获取响应对象
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
//打印结果信息
System.out.println("took:" + responses.getTook());
System.out.println("items:" + responses.getItems());

批量删除

1
2
3
4
5
6
7
8
9
10
//创建批量删除请求对象 
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest().index("user").id("1001"));
request.add(new DeleteRequest().index("user").id("1002"));
request.add(new DeleteRequest().index("user").id("1003"));
//客户端发送请求,获取响应对象
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
//打印结果信息
System.out.println("took:" + responses.getTook());
System.out.println("items:" + responses.getItems());
高级查询

1、请求体查询

查询所有索引数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 查询所有数据
sourceBuilder.query(QueryBuilders.matchAllQuery());
request.source(sourceBuilder);

SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

term查询,查询条件为关键字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("age", "30"));
request.source(sourceBuilder);

SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

分页查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());

// 分页查询
// 当前页其实索引(第一条数据的顺序号),from
sourceBuilder.from(0);
// 每页显示多少条size
sourceBuilder.size(2);

request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

数据排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 构建查询的请求体 
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());

// 排序
sourceBuilder.sort("age", SortOrder.ASC);

request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

过滤字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());

//查询字段过滤
String[] excludes = {};
String[] includes = {"name", "age"};
sourceBuilder.fetchSource(includes, excludes);

request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

Bool 查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 必须包含
boolQueryBuilder.must(QueryBuilders.matchQuery("age", "30"));
// 一定不含
boolQueryBuilder.mustNot(QueryBuilders.matchQuery("name", "zhangsan"));
// 可能包含
boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "男"));

sourceBuilder.query(boolQueryBuilder);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

范围查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
// 大于等于
rangeQuery.gte("30");
// 小于等于
rangeQuery.lte("40");

sourceBuilder.query(rangeQuery);
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

模糊查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建搜索请求对象 
SearchRequest request = new SearchRequest();
request.indices("student");

// 构建查询的请求体
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

sourceBuilder.query(QueryBuilders.fuzzyQuery("name","zhangsan").fuzziness(Fu
zziness.ONE));
request.source(sourceBuilder);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 查询匹配
SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");

高亮查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 高亮查询 
SearchRequest request = new SearchRequest().indices("student");
//2.创建查询请求体构建器
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//构建查询方式:高亮查询
TermsQueryBuilder termsQueryBuilder =
QueryBuilders.termsQuery("name","zhangsan");
//设置查询方式
sourceBuilder.query(termsQueryBuilder);
//构建高亮字段
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>");//设置标签前缀
highlightBuilder.postTags("</font>");//设置标签后缀
highlightBuilder.field("name");//设置高亮字段
//设置高亮构建对象
sourceBuilder.highlighter(highlightBuilder);
//设置请求体
request.source(sourceBuilder);
//3.客户端发送请求,获取响应对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);

//4.打印响应结果
SearchHits hits = response.getHits();
System.out.println("took::"+response.getTook());
System.out.println("time_out::"+response.isTimedOut());
System.out.println("total::"+hits.getTotalHits());
System.out.println("max_score::"+hits.getMaxScore());
System.out.println("hits::::>>");
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
//打印高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
System.out.println(highlightFields);
}
System.out.println("<<::::");

聚合查询

最大年龄

1
2
3
4
5
6
7
8
9
10
11
12
13
// 高亮查询 
SearchRequest request = new SearchRequest().indices("student");

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.max("maxAge").field("age"));
//设置请求体
request.source(sourceBuilder);
//3.客户端发送请求,获取响应对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);

//4.打印响应结果
SearchHits hits = response.getHits();
System.out.println(response);

分组统计

1
2
3
4
5
6
7
8
9
10
11
// 高亮查询 
SearchRequest request = new SearchRequest().indices("student");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(AggregationBuilders.terms("age_groupby").field("age"));
//设置请求体
request.source(sourceBuilder);
//3.客户端发送请求,获取响应对象
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.打印响应结果
SearchHits hits = response.getHits();
System.out.println(response);

ES环境

相关概念

单机 和 集群

单台Elasticsearch 服务器提供服务,往往都有最大的负载能力,超过这个阈值,服务器 性能就会大大降低甚至不可用,所以生产环境中,一般都是运行在指定服务器集群中。

除了负载能力,单点服务器也存在其他问题:

单台机器存储容量有限

单服务器容易出现单点故障,无法实现高可用

单服务的并发处理能力有限

配置服务器集群时,集群中节点数量没有限制,大于等于2个节点就可以看做是集群了。一 般出于高性能及高可用方面来考虑集群中节点数量都是3个以上。

集群 Cluster

一个集群就是由一个或多个服务器节点组织在一起,共同持有整个的数据,并一起提供 索引和搜索功能。一个 Elasticsearch 集群有一个唯一的名字标识,这个名字默认就 是”elasticsearch”。这个名字是重要的,因为一个节点只能通过指定某个集群的名字,来加入 这个集群。

节点 Node

集群中包含很多服务器,一个节点就是其中的一个服务器。作为集群的一部分,它存储 数据,参与集群的索引和搜索功能。

一个节点也是由一个名字来标识的,默认情况下,这个名字是一个随机的漫威漫画角色 的名字,这个名字会在启动的时候赋予节点。这个名字对于管理工作来说挺重要的,因为在 这个管理过程中,你会去确定网络中的哪些服务器对应于Elasticsearch集群中的哪些节点。

一个节点可以通过配置集群名称的方式来加入一个指定的集群。默认情况下,每个节点 都会被安排加入到一个叫做“elasticsearch”的集群中,这意味着,如果你在你的网络中启动了 若干个节点,并假定它们能够相互发现彼此,它们将会自动地形成并加入到一个叫做 “elasticsearch”的集群中。

在一个集群里,只要你想,可以拥有任意多个节点。而且,如果当前你的网络中没有运 行任何Elasticsearch节点,这时启动一个节点,会默认创建并加入一个叫做“elasticsearch”的 集群。

Windows集群

部署集群

1、创建elasticsearch-cluster文件夹,在内部复制三个elasticsearch服务

image-20260122050240544

2、修改集群文件目录中每个节点的 config/elasticsearch.yml配置文件

node-1001 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#节点1的配置信息: 
#集群名称,节点之间要保持一致
cluster.name: my-elasticsearch
#节点名称,集群内要唯一
node.name: node-1001
node.master: true
node.data: true

#ip地址
network.host: localhost
#http端口
http.port: 1001
#tcp监听端口
transport.tcp.port: 9301

#discovery.seed_hosts: ["localhost:9301", "localhost:9302","localhost:9303"]
#discovery.zen.fd.ping_timeout: 1m
#discovery.zen.fd.ping_retries: 5

#集群内的可以被选为主节点的节点列表
#cluster.initial_master_nodes: ["node-1", "node-2","node-3"]

#跨域配置
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"

node-1002 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#节点2的配置信息: 
#集群名称,节点之间要保持一致
cluster.name: my-elasticsearch
#节点名称,集群内要唯一
node.name: node-1002
node.master: true
node.data: true

#ip地址
network.host: localhost
#http端口
http.port: 1002
#tcp监听端口
transport.tcp.port: 9302

discovery.seed_hosts: ["localhost:9301"]
discovery.zen.fd.ping_timeout: 1m
discovery.zen.fd.ping_retries: 5

#集群内的可以被选为主节点的节点列表
#cluster.initial_master_nodes: ["node-1", "node-2","node-3"]

#跨域配置
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"

node-1003 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#节点3的配置信息: 
#集群名称,节点之间要保持一致
cluster.name: my-elasticsearch
#节点名称,集群内要唯一
node.name: node-1003
node.master: true
node.data: true

#ip地址
network.host: localhost
#http端口
http.port: 1003
#tcp监听端口
transport.tcp.port: 9303
#候选主节点的地址,在开启服务后可以被选为主节点
discovery.seed_hosts: ["localhost:9301", "localhost:9302"]
discovery.zen.fd.ping_timeout: 1m
discovery.zen.fd.ping_retries: 5

#集群内的可以被选为主节点的节点列表
#cluster.initial_master_nodes: ["node-1", "node-2","node-3"]

#跨域配置
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"

启动集群

1、启动前先删除每个节点中的data目录中所有内容(如果存在)

image-20260122051354039

2、分别双击执行 bin/elasticsearch.bat, 启动节点服务器,启动后,会自动加入指定名称的 集群

测试集群

查看集群状态

node-1001节点

image-20260122051915758

node-1002节点

image-20260122051928710

node-1003节点

image-20260122051953414

image-20260122052151048

向集群中的node-1001节点增加索引

PUT方法: http://127.0.0.1:1001/user

向集群中的node-1002节点查询索引

GET方法 : http://127.0.0.1:1001/user

Linux 单机

软件下载

软件安装

1、解压软件

将下载的软件解压缩

1
2
3
4
# 解压缩 
tar -zxvf elasticsearch-7.8.0-linux-x86_64.tar.gz -C /opt/module
# 改名
mv elasticsearch-7.8.0 es

2、创建用户

因为安全问题,Elasticsearch不允许root用户直接运行,所以要创建新用户,在root用 户中创建新用户

1
2
3
4
5
useradd es #新增es用户 
passwd es #为es用户设置密码

userdel -r es #如果错了,可以删除再加
chown -R es:es /opt/module/es #文件夹所有者

3、修改配置文件

修改/opt/module/es/config/elasticsearch.yml文件

1
2
3
4
5
6
# 加入如下配置 
cluster.name: elasticsearch
node.name: node-1
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["node-1"]

修改/etc/security/limits.conf

1
2
3
4
# 在文件末尾中增加下面内容 
# 每个进程可以打开的文件数的限制
es soft nofile 65536
es hard nofile 65536

修改/etc/security/limits.d/20-nproc.conf

1
2
3
4
5
6
7
# 在文件末尾中增加下面内容 
# 每个进程可以打开的文件数的限制
es soft nofile 65536
es hard nofile 65536
# 操作系统级别对每个用户创建的进程数的限制
* hard nproc 4096
# 注:* 带表Linux所有用户名称

修改/etc/sysctl.conf

1
2
3
# 在文件中增加下面内容 
# 一个进程可以拥有的VMA(虚拟内存区域)的数量,默认值为65536
vm.max_map_count=655360

重新加载

1
sysctl -p

启动软件

使用ES用户启动

1
2
3
4
5
cd /opt/module/es/ 
#启动
bin/elasticsearch
#后台启动
bin/elasticsearch -d

启动时,会动态生成文件,如果文件所属用户不匹配,会发生错误,需要重新进行修改用户 和用户组

关闭防火墙

1
2
3
4
5
6
#暂时关闭防火墙 
systemctl stop firewalld

#永久关闭防火墙
systemctl enable firewalld.service #打开放货抢永久性生效,重启后不会复原
systemctl disable firewalld.service #关闭防火墙,永久性生效,重启后不会复原

测试软件

浏览器中输入地址:http://linux1:9200/

Linux集群

软件安装

1、解压软件

将下载的软件解压缩

1
2
3
4
# 解压缩 
tar -zxvf elasticsearch-7.8.0-linux-x86_64.tar.gz -C /opt/module
# 改名
mv elasticsearch-7.8.0 es-cluster

将软件分发到其他节点:linux2, linux3

2、创建用户

因为安全问题,Elasticsearch不允许root用户直接运行,所以要在每个节点中创建新用 户,在root用户中创建新用户

1
2
3
4
5
useradd es #新增es用户 
passwd es #为es用户设置密码

userdel -r es #如果错了,可以删除再加
chown -R es:es /opt/module/es-cluster #文件夹所有者

3、修改配置文件

修改/opt/module/es/config/elasticsearch.yml文件,分发文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 加入如下配置 
#集群名称
cluster.name: cluster-es
#节点名称,每个节点的名称不能重复
node.name: node-1
#ip地址,每个节点的地址不能重复
network.host: linux1
#是不是有资格主节点
node.master: true
node.data: true
http.port: 9200
# head 插件需要这打开这两个配置
http.cors.allow-origin: "*"
http.cors.enabled: true
http.max_content_length: 200mb
#es7.x 之后新增的配置,初始化一个新的集群时需要此配置来选举master
cluster.initial_master_nodes: ["node-1"]
#es7.x 之后新增的配置,节点发现
discovery.seed_hosts: ["linux1:9300","linux2:9300","linux3:9300"]
gateway.recover_after_nodes: 2
network.tcp.keep_alive: true
network.tcp.no_delay: true
transport.tcp.compress: true
#集群内同时启动的数据任务个数,默认是2个
cluster.routing.allocation.cluster_concurrent_rebalance: 16
#添加或删除节点及负载均衡时并发恢复的线程个数,默认4个
cluster.routing.allocation.node_concurrent_recoveries: 16
#初始化数据恢复时,并发恢复线程的个数,默认4个
cluster.routing.allocation.node_initial_primaries_recoveries: 16

修改/etc/security/limits.conf,分发文件

1
2
3
# 在文件末尾中增加下面内容 
es soft nofile 65536
es hard nofile 65536

修改/etc/security/limits.d/20-nproc.conf,分发文件

1
2
3
4
5
# 在文件末尾中增加下面内容 
es soft nofile 65536
es hard nofile 65536
* hard nproc 4096
# 注:* 带表Linux所有用户名称

修改/etc/sysctl.conf

1
2
# 在文件中增加下面内容 
vm.max_map_count=655360

重新加载

1
sysctl -p 

启动软件

分别在不同节点上启动ES软件

1
2
3
4
5
cd /opt/module/es-cluster 
#启动
bin/elasticsearch
#后台启动
bin/elasticsearch -d

ES进阶

核心概念

索引

一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的 索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必 须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时 候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。

能搜索的数据必须索引,这样的好处是可以提高查询速度,比如:新华字典前面的目录 就是索引的意思,目录可以提高查询速度。

Elasticsearch 索引的精髓:一切设计都是为了提高搜索的性能。

类型

在一个索引中,你可以定义一种或多种类型。

一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具 有一组共同字段的文档定义一个类型。不同的版本,类型发生了不同的变化

版本 Type 支持情况
5.x 支持多种 type(一个索引中可定义多个类型)
6.x 一个索引最多只能有一种 type
7.x 默认不再支持自定义索引类型,统一使用默认类型 _doc(创建映射时无需指定 type)

文档

一个文档是一个可被索引的基础信息单元,也就是一条数据

比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个 订单的一个文档。文档以JSON(Javascript Object Notation)格式来表示,而JSON是一个 到处存在的互联网数据交互格式。

在一个index/type 里面,你可以存储任意多的文档。

字段

相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。

映射

mapping 是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、 分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理ES里面数据的一 些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射, 并且需要思考如何建立映射才能对性能更好。

分片

一个索引可以存储超出单个节点硬件限制的大量数据。比如,一个具有10亿文档数据 的索引占据1TB 的磁盘空间,而任一节点都可能没有这样大的磁盘空间。或者单个节点处 理搜索请求,响应太慢。为了解决这个问题,Elasticsearch提供了将索引划分成多份的能力, 每一份就称之为分片。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分 片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点 上。

分片很重要,主要有两方面的原因:

1)允许你水平分割 / 扩展你的内容容量。

2)允许你在分片之上进行分布式的、并行的操作,进而提高性能/吞吐量。

至于一个分片怎样分布,它的文档怎样聚合和搜索请求,是完全由Elasticsearch管理的, 对于作为用户的你来说,这些都是透明的,无需过分关心。

被混淆的概念是,一个 Lucene 索引 我们在 Elasticsearch 称作 分片 。 一个 Elasticsearch 索引 是分片的集合。 当 Elasticsearch 在索引中搜索的时候, 他发送查询 到每一个属于索引的分片(Lucene 索引),然后合并每个分片的结果到一个全局的结果集。

ES中的分片就是Lucene索引,分片一个是索引,但是ES的索引是分片集合。当ES在索引中搜索的时候,会分发到每一个分片上,然后在合并结果集。

副本

在一个网络 / 云的环境里,失败随时都可能发生,在某个分片/节点不知怎么的就处于 离线状态,或者由于任何原因消失了,这种情况下,有一个故障转移机制是非常有用并且是 强烈推荐的。为此目的,Elasticsearch 允许你创建分片的一份或多份拷贝,这些拷贝叫做复 制分片(副本)。

复制分片之所以重要,有两个主要原因:

在分片/节点失败的情况下,提供了高可用性。因为这个原因,注意到复制分片从不与 原/主(original/primary)分片置于同一节点上是非常重要的。

扩展你的搜索量/吞吐量,因为搜索可以在所有的副本上并行运行。(副本是“可读”的!ES 默认会在主分片和所有副本之间轮询分配搜索请求。)

总之,每个索引可以被分成多个分片。一个索引也可以被复制0次(意思是没有复制) 或多次。一旦复制了,每个索引就有了主分片(作为复制源的原来的分片)和复制分片(主 分片的拷贝)之别。分片和复制的数量可以在索引创建的时候指定。在索引创建之后,你可 以在任何时候动态地改变复制的数量,但是你事后不能改变分片的数量。默认情况下, Elasticsearch 中的每个索引被分片1个主分片和1个复制,这意味着,如果你的集群中至少 有两个节点,你的索引将会有1个主分片和另外1个复制分片(1个完全拷贝),这样的话 每个索引总共就有2个分片,我们需要根据索引需要确定分片个数。

分配

将分片分配给某个节点的过程,包括分配主分片或者副本。如果是副本,还包含从主分 片复制数据的过程。这个过程是由master节点完成的。

系统架构

image-20260122095745297

一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者 从集群中移除节点时,集群将会重新平均分布所有的数据。

当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、 删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操 作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节 点都可以成为主节点。我们的示例集群就只有一个节点,所以它同时也成为了主节点。

作为用户,我们可以将请求发送到集群中的任何节点 ,包括主节点。 每个节点都知道 任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论 我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将 最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。

分布式集群

单节点集群

​ 我们在包含一个空节点的集群内创建名为 users 的索引,为了演示目的,我们将分配3 个主分片和一份副本(每个主分片拥有一个副本分片)

1
2
3
4
5
6
{ 
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}

image-20260122100348115

我们的集群现在是拥有一个索引的单节点集群。所有3个主分片都被分配在 node-1 。

通过elasticsearch-head 插件查看集群情况

image-20260122100554884

集群健康值:yellow( 3 of 6 ) : 表示当前集群的全部主分片都正常运行,但是副本分片没有全部处在正常状 态

3 个副本分片都是 Unassigned —— 它们都没有被分配到任何节点。 在同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了那个节点,我们也将丢失该节点 上的所有副本数据。

故障转移

当集群中只有一个节点在运行时,意味着会有一个单点故障问题——没有冗余。 幸运 的是,我们只需再启动一个节点即可防止数据丢失。当你在同一台机器上启动了第二个节点 时,只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。 但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播 主机列表。之所以配置为使用单播发现,以防止节点无意中加入集群。只有在同一台机器上 运行的节点才会自动组成集群。

如果启动了第二个节点,我们的集群将会拥有两个节点的集群 : 所有主分片和副本分 片都已被分配

image-20260122101115914

当第二个节点加入到集群后,3 个副本分片将会分配到这个节点上——每个主分片对应一个副本分片。这意味着当集群内任何一个节点出现问题时,我们的数据都完好无损。所 有新近被索引的文档都将会保存在主分片上,然后被并行的复制到对应的副本分片上。这就保证了我们 既可以从主分片又可以从副本分片上获得文档。

水平扩容

怎样为我们的正在增长中的应用程序按需扩容呢?当启动了第三个节点,我们的集群将 会拥有三个节点的集群 : 为了分散负载而对分片进行重新分配

image-20260122101221704

通过elasticsearch-head插件查看集群情况

Node 1 和 Node 2 上各有一个分片被迁移到了新的 Node 3 节点,现在每个节点上都拥有2个分片, 而不是之前的3个。 这表示每个节点的硬件资源(CPU, RAM, I/O)将被更少的分片所共享,每个分片 的性能将会得到提升。

分片是一个功能完整的搜索引擎,它拥有使用一个节点上的所有资源的能力。 我们这个拥有6个分 片(3个主分片和3个副本分片)的索引可以最大扩容到6个节点,每个节点上存在一个分片,并且每个 分片拥有所在节点的全部资源。

但是如果我们想要扩容超过6个节点怎么办呢?

主分片的数目在索引创建时就已经确定了下来。实际上,这个数目定义了这个索引能够 存储 的最大数据量。(实际大小取决于你的数据、硬件和使用场景。) 但是,读操作—— 搜索和返回数据——可以同时被主分片 或 副本分片所处理,所以当你拥有越多的副本分片 时,也将拥有越高的吞吐量。

在运行中的集群上是可以动态调整副本分片数目的,我们可以按需伸缩集群。让我们把 副本数从默认的 1 增加到 2

1
2
3
{ 
"number_of_replicas" : 2
}

image-20260122101511755

users 索引现在拥有9个分片:3个主分片和6个副本分片。 这意味着我们可以将集群 扩容到9个节点,每个节点上一个分片。相比原来3个节点时,集群搜索性能可以提升 3 倍。

应对故障

我们关闭第一个节点,这时集群的状态为:关闭了一个节点后的集群。

我们关闭的节点是一个主节点。而集群必须拥有一个主节点来保证正常工作,所以发生 的第一件事情就是选举一个新的主节点: Node 2 。在我们关闭 Node 1 的同时也失去了主 分片 1 和 2 ,并且在缺失主分片的时候索引也不能正常工作。 如果此时来检查集群的状况,我们看到的状态将会为 red :不是所有主分片都在正常工作。

幸运的是,在其它节点上存在着这两个主分片的完整副本, 所以新的主节点立即将这 些分片在 Node 2 和 Node 3 上对应的副本分片提升为主分片, 此时集群的状态将会为 yellow。这个提升主分片的过程是瞬间发生的,如同按下一个开关一般。

为什么我们集群状态是 yellow 而不是 green 呢?

虽然我们拥有所有的三个主分片,但是同时设置了每个主分片需要对应2份副本分片,而此 时只存在一份副本分片。 所以集群不能为 green 的状态,不过我们不必过于担心:如果我 们同样关闭了 Node 2 ,我们的程序 依然 可以保持在不丢任何数据的情况下运行,因为 Node 3 为每一个分片都保留着一份副本。

如果我们重新启动 Node 1 ,集群可以将缺失的副本分片再次进行分配,那么集群的状 态也将恢复成之前的状态。 如果 Node 1 依然拥有着之前的分片,它将尝试去重用它们, 同时仅从主分片复制发生了修改的数据文件。和之前的集群相比,只是Master节点切换了

路由计算

当索引一个文档的时候,文档会被存储到一个主分片中。 Elasticsearch 如何知道一个 文档应该存放到哪个分片中呢?当我们创建文档时,它如何决定这个文档应当被存储在分片 1 还是分片 2 中呢?首先这肯定不会是随机的,否则将来要获取文档的时候我们就不知道 从何处寻找了。实际上,这个过程是根据下面这个公式决定的:

image-20260122102314057

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求 的文档所在分片的位置。

这就解释了为什么我们要在创建索引的时候就确定好主分片的数量 并且永远不会改变 这个数量:因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。

所有的文档 API( get 、 index 、 delete 、 bulk 、 update 以及 mget )都接受一 个叫做 routing 的路由参数 ,通过这个参数我们可以自定义文档到分片的映射。一个自定 义的路由参数可以用来确保所有相关的文档——例如所有属于同一个用户的文档——都被 存储到同一个分片中。

分片控制

我们假设有一个集群由三个节点组成。 它包含一个叫 emps 的索引,有两个主分片, 每个主分片有两个副本分片。相同分片的副本不会放在同一节点。

image-20260122102852954

我们可以发送请求到集群中的任一节点。 每个节点都有能力处理任意请求。 每个节点都知 道集群中任一文档位置,所以可以直接将请求转发到需要的节点上。 在下面的例子中,将 所有的请求发送到 Node 1,我们将其称为 协调节点(coordinating node) 。任何节点都可以成为协调节点

当发送请求的时候, 为了扩展负载,更好的做法是轮询集群中所有的节点。

写流程

新建、索引和删除 请求都是 写 操作, 必须在主分片上面完成之后才能被复制到相关 的副本分片

image-20260122103048442

新建,索引和删除文档所需要的步骤顺序:

1、客户端向 Node 1 发送新建、索引或者删除请求。

2、节点使用文档的 _id 确定文档属于分片 0 。请求会被转发到 Node 3,因为分片 0 的 主分片目前被分配在 Node 3 上。

3、Node 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 Node 1 和 Node 2 的副本分片上。一旦所有的副本分片都报告成功, Node 3 将向协调节点报告成功,协调 节点向客户端报告成功。

在客户端收到成功响应时,文档变更已经在主分片和所有副本分片执行完成,变更是安全的。 有一些可选的请求参数允许您影响这个过程,可能以数据安全为代价提升性能。这些选项很 少使用,因为Elasticsearch 已经很快,但是为了完整起见,请参考下面表格:

参数 含义
consistency 控制写操作(如索引、删除文档)执行前所需活跃分片副本的最小数量,用于保证数据一致性。
可选值:
one:只要主分片可用即可执行写操作
quorum(默认):需要大多数分片(主 + 副本)处于活跃状态
all:必须主分片和所有副本分片都活跃才允许写操作 “规定数量”计算公式int((primary + number_of_replicas) / 2) + 1
⚠️ 注意:number_of_replicas 是索引设置中配置的副本数,不是当前实际活跃的副本数。
示例:若设置 number_of_replicas = 3,则规定数量 = int((1 + 3) / 2) + 1 = 3
timeout 当活跃分片数量未达到 consistency 要求时,Elasticsearch 会等待更多分片上线,默认最多等待 1 分钟
可通过 timeout 参数自定义等待时间,例如:
100100ms → 等待 100 毫秒
30s → 等待 30 秒
超时后若仍未满足条件,则写操作失败。

新索引默认有 1 个副本分片,这意味着为满足规定数量应该需要两个活动的分片副本。 但是,这些 默认的设置会阻止我们在单一节点上做任何事情(因为单点故障,只有一个节点副本没生效)。为了避免这个问题,要求只有当 number_of_replicas 大 于1的时候,规定数量才会执行(只有大于1)。

读流程

我们可以从主分片或者从其它任意副本分片检索文档

image-20260122110448479

从主分片或者副本分片检索文档的步骤顺序:

1、客户端向 Node 1 发送获取请求。

2、节点使用文档的 _id 来确定文档属于分片 0 。分片 0 的副本分片存在于所有的三个 节点上。 在这种情况下,它将请求转发到 Node 2 。

3、Node 2 将文档返回给 Node 1 ,然后将文档返回给客户端。

在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均 衡。在文档被检索时,已经被索引的文档可能已经存在于主分片上但是还没有复制到副本分 片。 在这种情况下,副本分片可能会报告文档不存在,但是主分片可能成功返回文档。 一 旦索引请求成功返回给用户,文档在主分片和副本分片都是可用的。

更新流程

部分更新一个文档结合了先前说明的读取和写入流程:

image-20260122211442229

部分更新一个文档的步骤如下

1、客户端向 Node1 发送更新请求

2、他将请求转发到主分片所在的Node3

3、Node3从主分片检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片 的文档。 如果文档已经被另一个进程修改,它会重试步骤 3 ,超过 retry_on_conflict 次 后放弃。

4、如果 Node 3 成功地更新文档,它将新版本的文档并行转发到 Node 1 和 Node 2 上的 副本分片,重新建立索引。一旦所有副本分片都返回成功, Node 3 向协调节点也返回 成功,协调节点向客户端返回成功。

当主分片把更改转发到副本分片时, 它不会转发更新请求。 相反,它转发完整文档的新版本。请记住, 这些更改将会异步转发到副本分片,并且不能保证它们以发送它们相同的顺序到达。 如果Elasticsearch仅 转发更改请求,则可能以错误的顺序应用更改,导致得到损坏的文档。

多文档操作流程

mget 和 bulk API 的模式类似于单文档模式。区别在于协调节点知道每个文档存在于 哪个分片中。它将整个多文档请求分解成 每个分片 的多文档请求,并且将这些请求并行转 发到每个参与节点。

协调节点一旦收到来自每个节点的应答,就将每个节点的响应收集整理成单个响应,返 回给客户端

image-20260123152659398

用单个 mget 请求取回多个文档所需的步骤顺序:

1、客户端向 Node 1 发送 mget 请求。

2、Node 1 为每个分片构建多文档获取请求,然后并行转发这些请求到托管在每个所需的 主分片或者副本分片的节点上。一旦收到所有答复, Node 1 构建响应并将其返回给客 户端。

可以对 docs 数组中每个文档设置 routing 参数。

bulk API, 允许在单个批量请求中执行多个创建、索引、删除和更新请求。

image-20260123152804042

bulk API 按如下步骤顺序执行:

1、客户端向 Node 1 发送 bulk 请求。

2、Node 1 为每个节点创建一个批量请求,并将这些请求并行转发到每个包含主分片的节 点主机。

3、主分片一个接一个按顺序执行每个操作。当每个操作成功时,主分片并行转发新文档(或 删除)到副本分片,然后执行下一个操作。 一旦所有的副本分片报告所有操作成功, 该节点将向协调节点报告成功,协调节点将这些响应收集整理并返回给客户端。

分片原理

分片是Elasticsearch 最小的工作单元。但是究竟什么是一个分片,它是如何工作的?

传统的数据库每个字段存储单个值,但这对全文检索并不够。文本字段中的每个单词需 要被搜索,对数据库意味着需要单个字段有索引多值的能力。最好的支持是一个字段多个值 需求的数据结构是倒排索引。

倒排索引

Elasticsearch 使用一种称为倒排索引的结构,它适用于快速的全文搜索。

见其名,知其意,有倒排索引,肯定会对应有正向索引。正向索引(forward index), 反向索引(inverted index)更熟悉的名字是倒排索引。

所谓的正向索引,就是搜索引擎会将待搜索的文件都对应一个文件 ID,搜索时将这个 ID 和搜索关键字进行对应,形成K-V对,然后对关键字进行统计计数

但是互联网上收录在搜索引擎中的文档的数目是个天文数字,这样的索引结构根本无法满足 实时返回排名结果的要求。所以,搜索引擎会将正向索引重新构建为倒排索引,即把文件 ID对应到关键词的映射转换为关键词到文件ID的映射,每个关键词都对应着一系列的文件, 这些文件中都出现这个关键词。

一个倒排索引由文档中所有不重复词的列表构成,对于其中每个词,有一个包含它的文 档列表。例如,假设我们有两个文档,每个文档的 content 域包含如下内容:

The quick brown fox jumped over the lazy dog

Quick brown foxes leap over lazy dogs in summer

为了创建倒排索引,我们首先将每个文档的 content 域拆分成单独的 词(我们称它为 词条 或 tokens ),创建一个包含所有不重复词条的排序列表,然后列出每个词条出现在哪个文 档。结果如下所示:

image-20260123153121564

现在,如果我们想搜索 quick brown ,我们只需要查找包含每个词条的文档:

image-20260123153136838

两个文档都匹配,但是第一个文档比第二个匹配度更高。如果我们使用仅计算匹配词条数量 的简单相似性算法,那么我们可以说,对于我们查询的相关性来讲,第一个文档比第二个文 档更佳。 但是,我们目前的倒排索引有一些问题:

Quick 和 quick 以独立的词条出现,然而用户可能认为它们是相同的词。

fox 和 foxes 非常相似, 就像 dog 和 dogs ;他们有相同的词根。

jumped 和 leap, 尽管没有相同的词根,但他们的意思很相近。他们是同义词。

使用前面的索引搜索 +Quick +fox 不会得到任何匹配文档。(记住,+ 前缀表明这个词必 须存在。)只有同时出现 Quick 和 fox 的文档才满足这个查询条件,但是第一个文档包含 quick fox ,第二个文档包含 Quick foxes 。 我们的用户可以合理的期望两个文档与查询匹配。我们可以做的更好。 如果我们将词条规范为标准模式,那么我们可以找到与用户搜索的词条不完全一致,但具有 足够相关性的文档。例如:

Quick 可以小写化为 quick 。

foxes 可以 词干提取 —变为词根的格式— 为 fox 。类似的, dogs 可以为提取为 dog 。

jumped 和 leap 是同义词,可以索引为相同的单词 jump 。 现在索引看上去像这样:

image-20260123153321583

这还远远不够。我们搜索 +Quick +fox 仍然 会失败,因为在我们的索引中,已经没有 Quick 了。但是,如果我们对搜索的字符串使用与 content 域相同的标准化规则,会变成查询 +quick +fox,这样两个文档都会匹配!分词和标准化的过程称为分析

这非常重要。你只能搜索在索引中出现的词条,所以索引文本和查询字符串必须标准化为相 同的格式。

文档搜索

早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦 新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。

倒排索引被写入磁盘后是 不可改变 的:它永远不会修改。

不变性有重要的价值:

不需要锁。如果你从来不更新索引,你就不需要担心多进程同时修改数据的问题。

一旦索引被读入内核的文件系统缓存,便会留在哪里,由于其不变性。只要文件系统缓存中还有足够 的空间,那么大部分读请求会直接请求内存,而不会命中磁盘。这提供了很大的性能提升。

其它缓存(像filter缓存),在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为 数据不会变化。

写入单个大的倒排索引允许数据被压缩,减少磁盘 I/O 和 需要被缓存到内存的索引的使用量。

当然,一个不变的索引也有不好的地方。主要事实是它是不可变的! 你不能修改它。如 果你需要让一个新的文档 可被搜索,你需要重建整个索引。这要么对一个索引所能包含的 数据量造成了很大的限制要么对索引可被更新的频率造成了很大的限制。

动态更新索引

如何在保留不变性的前提下实现倒排索引的更新? 解决上面的问题

答案是: 用更多的索引。通过增加新的补充索引来反映新近的修改,而不是直接重写整 个倒排索引。每一个倒排索引都会被轮流查询到,从最早的开始查询完后再对结果进行合并。

Elasticsearch 基于 Lucene, 这个 java 库引入了按段搜索的概念。 每一 段 本身都是一 个倒排索引, 但索引在 Lucene 中除表示所有段的集合外, 还增加了提交点的概念 — 一 个列出了所有已知段的文件

image-20260124093708646

按段抖索:

1、新文档被收集到内存索引缓存

image-20260124093750741

2、不时地, 缓存被 提交

(1) 一个新的段—一个追加的倒排索引—被写入磁盘。

(2) 一个新的包含新段名字的 提交点 被写入磁盘

(3) 磁盘进行 同步 — 所有在文件系统缓存中等待的写入都刷新到磁盘,以确保它们 被写入物理文件

3、新的段被开启,让它包含的文档可见以被搜索

4、内存缓存被清空,等待接收新的文档

当一个查询被触发,所有已知的段按顺序被查询。词项统计会对所有段的结果进行聚合,以 保证每个词和每个文档的关联都被准确计算。 这种方式可以用相对较低的成本将新文档添 加到索引。

段是不可改变的,所以既不能从把文档从旧的段中移除,也不能修改旧的段来进行反映文档 的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档 的段信息。

当一个文档被 “删除” 时,它实际上只是在 .del 文件中被 标记 删除。一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。

文档更新也是类似的操作方式:当一个文档被更新时,旧版本文档被标记删除,文档的新版本被索引到一个新的段中。 可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就已经被移除。

总结:ES的动态更新索引首先一个前提是段是不可改变的,其次底层是Lucene,里面有个概念段是倒排索引,提交点是所有段集合。首先新文档会被手机到内存索引缓存,然后不时地缓存会被提交到磁盘里面,也就是一个追加的倒排索引被写入磁盘,然后一个新的提交点也会被写入磁盘,磁盘就会将所有缓存的段和提交点都写进磁盘了,这时候新的段就会被开启,可以被搜索了,然后清空内存。当查询出发的时候会按照顺序查询段,并且由于不可变,提交点会包含一个.del文件,包含被删除的文档的信息,虽然会被查询到,但是返回结果的时候会删除。

近实时搜索

随着按段(per-segment)搜索的发展,一个新的文档从索引到可被搜索的延迟显著降低 了。新文档在几分钟之内即可被检索,但这样还是不够快。磁盘在这里成为了瓶颈。提交 (Commiting)一个新的段到磁盘需要一个 fsync 来确保段被物理性地写入磁盘,这样在断 电的时候就不会丢失数据。 但是 fsync 操作代价很大; 如果每次索引一个文档都去执行一 次的话会造成很大的性能问题。

我们需要的是一个更轻量的方式来使一个文档可被搜索,这意味着 fsync 要从整个过程中 被移除。在Elasticsearch 和磁盘之间是文件系统缓存。 像之前描述的一样, 在内存索引缓 冲区中的文档会被写入到一个新的段中。 但是这里新段会被先写入到文件系统缓存—这一 步代价会比较低,稍后再被刷新到磁盘—这一步代价比较高。不过只要文件已经在缓存中, 就可以像其它文件一样被打开和读取了。

image-20260124100505581

Lucene 允许新段被写入和打开—使其包含的文档在未进行一次完整提交时便对搜索可见。 这种方式比进行一次提交代价要小得多,并且在不影响性能的前提下可以被频繁地执行。

image-20260124100602126

在 Elasticsearch 中,写入和打开一个新段的轻量的过程叫做 refresh 。 默认情况下每个分 片会每秒自动刷新一次。这就是为什么我们说 Elasticsearch 是 近 实时搜索: 文档的变化 并不是立即对搜索可见,但会在一秒之内变为可见。 这些行为可能会对新用户造成困惑: 他们索引了一个文档然后尝试搜索它,但却没有搜到。 这个问题的解决办法是用 refresh API 执行一次手动刷新: /users/_refresh

尽管刷新是比提交轻量很多的操作,它还是会有性能开销。当写测试的时候, 手动刷新很有用,但是不要 在生产环境下每次索引一个文档都去手动刷新。 相反,你的应用需要意识到 Elasticsearch 的近实时的性 质,并接受它的不足。

并不是所有的情况都需要每秒刷新。可能你正在使用 Elasticsearch 索引大量的日志文件, 你可能想优化索引速度而不是近实时搜索, 可以通过设置 refresh_interval , 降低每个索 引的刷新频率

1
2
3
4
5
{ 
"settings": {
"refresh_interval": "30s"
}
}

refresh_interval可以在既存索引上进行动态更新。 在生产环境中,当你正在建立一个大的 新索引时,可以先关闭自动刷新(避免频繁刷新,而是放到缓存中,放完之后再刷新),待开始使用该索引时,再把它们调回来

1
2
3
4
5
6
# 关闭自动刷新 
PUT /users/_settings
{ "refresh_interval": -1 }
# 每一秒刷新
PUT /users/_settings
{ "refresh_interval": "1s" }

持久化变更

如果没有用 fsync 把数据从文件系统缓存刷(flush)到硬盘,我们不能保证数据在断 电甚至是程序正常退出之后依然存在。为了保证 Elasticsearch 的可靠性,需要确保数据变 化被持久化到磁盘。在 动态更新索引,我们说一次完整的提交会将段刷到磁盘,并写入一个包含所有段列表的提交点。Elasticsearch 在启动或重新打开一个索引的过程中使用这个提 交点来判断哪些段隶属于当前分片。

即使通过每秒刷新(refresh)实现了近实时搜索,我们仍然需要经常进行完整提交来确 保能从失败中恢复。但在两次提交之间发生变化的文档怎么办?(也就是说两次提交之间,还未提交的文档没有持久化到磁盘然后断电故障就会丢失了)我们也不希望丢失掉这些数 据。Elasticsearch 增加了一个 translog ,或者叫事务日志,在每一次对 Elasticsearch 进行 操作时均进行了日志记录

整个流程如下:

1、一个文档被索引之后,就会被添加到内存缓冲区,并且追加到了 translog

2、刷新(refresh)使分片每秒被刷新(refresh)一次:

  • 这些在内存缓冲区的文档被写入到一个新的段中,且没有进行 fsync 操作。
  • 这个段被打开,使其可被搜索
  • 进行同步,内存缓冲区被清空

3、这个进程继续工作,更多的文档被添加到内存缓冲区和追加到事务日志

4、每隔一段时间—例如 translog 变得越来越大—索引被刷新(flush);一个新的 translog 被创建,并且一个全量提交被执行

  • 所有在内存缓冲区的文档都被写入一个新的段。
  • 缓冲区被清空。
  • 一个提交点被写入硬盘。
  • 文件系统缓存通过 fsync 被刷新(flush)。
  • 老的 translog 被删除。

translog 提供所有还没有被刷到磁盘的操作的一个持久化纪录。当 Elasticsearch 启动的时 候, 它会从磁盘中使用最后一个提交点去恢复已知的段,并且会重放 translog 中所有在最 后一次提交后发生的变更操作。

translog 也被用来提供实时 CRUD 。当你试着通过 ID 查询、更新、删除一个文档,它会 在尝试从相应的段中检索之前, 首先检查 translog 任何最近的变更。这意味着它总是能够 实时地获取到文档的最新版本。

执行一个提交并且截断 translog 的行为在 Elasticsearch 被称作一次 flush 分片每30分钟被自动刷新(flush),或者在 translog 太大的时候也会刷新

你很少需要自己手动执行 flush 操作;通常情况下,自动刷新就足够了。这就是说,在 重启节点或关闭索引之前执行 flush 有益于你的索引。当 Elasticsearch 尝试恢复或重新打 开一个索引, 它需要重放 translog 中所有的操作,所以如果日志越短,恢复越快。

translog 的目的是保证操作不会丢失,在文件被 fsync 到磁盘前,被写入的文件在重启 之后就会丢失。默认 translog 是每 5 秒被 fsync 刷新到硬盘, 或者在每次写请求完成之 后执行(e.g. index, delete, update, bulk)。这个过程在主分片和复制分片都会发生。最终, 基 本上,这意味着在整个请求被 fsync 到主分片和复制分片的translog之前,你的客户端不会 得到一个 200 OK 响应。

在每次请求后都执行一个 fsync 会带来一些性能损失,尽管实践表明这种损失相对较 小(特别是bulk导入,它在一次请求中平摊了大量文档的开销)。

但是对于一些大容量的偶尔丢失几秒数据问题也并不严重的集群,使用异步的 fsync 还是比较有益的。比如,写入的数据被缓存到内存中,再每 5 秒执行一次 fsync 。如果你 决定使用异步 translog 的话,你需要 保证 在发生 crash 时,丢失掉 sync_interval 时间段 的数据也无所谓。请在决定前知晓这个特性。如果你不确定这个行为的后果,最好是使用默 认的参数( “index.translog.durability”: “request” )来避免数据丢失。

段合并

由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段 数目太多会带来较大的麻烦。 每一个段都会消耗文件句柄、内存和cpu运行周期。更重要 的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。

Elasticsearch 通过在后台进行段合并来解决这个问题。小的段被合并到大的段,然后这些大 的段再被合并到更大的段。

段合并的时候会将那些旧的已删除文档从文件系统中清除。被删除的文档(或被更新文档的 旧版本)不会被拷贝到新的大段中。

启动段合并不需要你做任何事。进行索引和搜索时会自动进行。

1、当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。

2、合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会 中断索引和搜索。

3、一旦合并结束,老的段被删除

  • 新的段被刷新(flush)到了磁盘。 ** 写入一个包含新段且排除旧的和较小的段 的新提交点。
  • 新的段被打开用来搜索。
  • 老的段被删除。

合并大的段需要消耗大量的I/O和CPU资源,如果任其发展会影响搜索性能。Elasticsearch 在默认情况下会对合并流程进行资源限制,所以搜索仍然 有足够的资源很好地执行。

文档分析

分析 包含下面的过程:

  • 将一块文本分成适合于倒排索引的独立的 词条
  • 将这些词条统一化为标准格式以提高它们的“可搜索性”,或者 recall 分析器执行上面的工作。分析器实际上是将三个功能封装到了一个包里

字符过滤器

首先,字符串按顺序通过每个 字符过滤器 。他们的任务是在分词前整理字符串。一个 字符过滤器可以用来去掉HTML,或者将 & 转化成 and。

分词器

其次,字符串被 分词器 分为单个的词条。一个简单的分词器遇到空格和标点的时候, 可能会将文本拆分成词条。

Token过滤器

最后,词条按顺序通过每个 token 过滤器 。这个过程可能会改变词条(例如,小写化 Quick ),删除词条(例如, 像 a, and, the 等无用词),或者增加词条(例如,像 jump 和 leap 这种同义词)。

内置分析器

Elasticsearch 还附带了可以直接使用的预包装的分析器。接下来我们会列出最重要的分 析器。为了证明它们的差异,我们看看每个分析器会从下面的字符串得到哪些词条:

1
"Set the shape to semi-transparent by calling set_trans(5)"

标准分析器

标准分析器是Elasticsearch 默认使用的分析器。它是分析各种语言文本最常用的选择。 它根据 Unicode 联盟 定义的 单词边界 划分文本。删除绝大部分标点。最后,将词条小写。 它会产生:

1
set, the, shape, to, semi, transparent, by, calling, set_trans, 5 

简单分析器

简单分析器在任何不是字母的地方分隔文本,将词条小写。它会产生:

1
set, the, shape, to, semi, transparent, by, calling, set, trans

空格分析器

空格分析器在空格的地方划分文本。它会产生:

1
Set, the, shape, to, semi-transparent, by, calling, set_trans(5)

语言分析器

特定语言分析器可用于 很多语言。它们可以考虑指定语言的特点。例如, 英语 分析 器附带了一组英语无用词(常用单词,例如 and 或者 the ,它们对相关性没有多少影响), 它们会被删除。 由于理解英语语法的规则,这个分词器可以提取英语单词的 词干 。 英语 分词器会产生下面的词条:

1
set, shape, semi, transpar, call, set_tran, 5  

注意看 transparent、 calling 和 set_trans 已经变为词根格式

分析器使用场景

当我们 索引 一个文档,它的全文域被分析成词条以用来创建倒排索引。 但是,当我 们在全文域 搜索 的时候,我们需要将查询字符串通过 相同的分析过程 ,以保证我们搜索 的词条格式与索引中的词条格式一致

全文查询,理解每个域是如何定义的,因此它们可以做正确的事:

  • 当你查询一个 全文 域时, 会对查询字符串应用相同的分析器,以产生正确的搜 索词条列表。
  • 当你查询一个 精确值 域时,不会分析查询字符串,而是搜索你指定的精确值。

测试分析器

有些时候很难理解分词的过程和实际被存储到索引中的词条,特别是你刚接触 Elasticsearch。为了理解发生了什么,你可以使用 analyze API 来看文本是如何被分析的。 在消息体里,指定分析器和要分析的文本

1
2
3
4
5
GET http://localhost:9200/_analyze 
{
"analyzer": "standard",
"text": "Text to analyze"
}

结果中每个元素代表一个单独的词条:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"tokens": [
{
"token": "text",
"start_offset": 0,
"end_offset": 4,
"type": "<ALPHANUM>",
"position": 1
},
{
"token": "to",
"start_offset": 5,
"end_offset": 7,
"type": "<ALPHANUM>",
"position": 2
},
{
"token": "analyze",
"start_offset": 8,
"end_offset": 15,
"type": "<ALPHANUM>",
"position": 3
}
]
}

token 是实际存储到索引中的词条。 position 指明词条在原始文本中出现的位置。 start_offset 和 end_offset 指明字符在原始字符串中的位置。

指定分析器

当Elasticsearch在你的文档中检测到一个新的字符串域,它会自动设置其为一个全文 字 符串 域,使用 标准 分析器对它进行分析。你不希望总是这样。可能你想使用一个不同的 分析器,适用于你的数据使用的语言。有时候你想要一个字符串域就是一个字符串域—不使用分析,直接索引你传入的精确值,例如用户ID或者一个内部的状态域或标签。要做到这 一点,我们必须手动指定这些域的映射。

IK分词器

首先我们通过Postman发送GET请求查询分词效果

1
2
3
4
# GET http://localhost:9200/_analyze 
{
"text":"测试单词"
}

ES的默认分词器无法识别中文中测试、单词这样的词汇,而是简单的将每个字拆完分为一 个词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{ 
"tokens": [
{
"token": "测",
"start_offset": 0,
"end_offset": 1,
"type": "<IDEOGRAPHIC>",
"position": 0
},
{
"token": "试",
"start_offset": 1,
"end_offset": 2,
"type": "<IDEOGRAPHIC>",
"position": 1
},
{
"token": "单",
"start_offset": 2,
"end_offset": 3,
"type": "<IDEOGRAPHIC>",
"position": 2
},
{
"token": "词",
"start_offset": 3,
"end_offset": 4,
"type": "<IDEOGRAPHIC>",
"position": 3
}
]
}

这样的结果显然不符合我们的使用要求,所以我们需要下载ES对应版本的中文分词器。

我们这里采用IK中文分词器,下载地址为: https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.8.0 将解压后的后的文件夹放入ES根目录下的plugins目录下,重启ES即可使用。

我们这次加入新的查询参数"analyzer":"ik_max_word"

1
2
3
4
5
# GET http://localhost:9200/_analyze 
{
"text":"测试单词",
"analyzer":"ik_max_word"
}

ik_max_word:会将文本做最细粒度的拆分

ik_smart:会将文本做最粗粒度的拆分

使用中文分词后的结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{ 
"tokens": [
{
"token": "测试",
"start_offset": 0,
"end_offset": 2,
"type": "CN_WORD",
"position": 0
},
{
"token": "单词",
"start_offset": 2,
"end_offset": 4,
"type": "CN_WORD",
"position": 1
}
]
}

ES中也可以进行扩展词汇,首先查询

1
2
3
4
5
# GET http://localhost:9200/_analyze 
{
"text":"弗雷尔卓德",
"analyzer":"ik_max_word"
}

仅仅可以得到每个字的分词结果,我们需要做的就是使分词器识别到弗雷尔卓德也是一个词 语

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{ 
"tokens": [
{
"token": "弗",
"start_offset": 0,
"end_offset": 1,
"type": "CN_CHAR",
"position": 0
},
{
"token": "雷",
"start_offset": 1,
"end_offset": 2,
"type": "CN_CHAR",
"position": 1
},
{
"token": "尔",
"start_offset": 2,
"end_offset": 3,
"type": "CN_CHAR",
"position": 2
},
{
"token": "卓",
"start_offset": 3,
"end_offset": 4,
"type": "CN_CHAR",
"position": 3
},
{
"token": "德",
"start_offset": 4,
"end_offset": 5,
"type": "CN_CHAR",
"position": 4
}
]
}

首先进入ES根目录中的plugins文件夹下的ik文件夹,进入config目录,创建custom.dic 文件,写入弗雷尔卓德。同时打开IKAnalyzer.cfg.xml文件,将新建的custom.dic配置其中, 重启ES服务器。

image-20260124104014305

自定义分析器

虽然Elasticsearch 带有一些现成的分析器,然而在分析器上Elasticsearch真正的强大之 处在于,你可以通过在一个适合你的特定数据的设置之中组合字符过滤器、分词器、词汇单 元过滤器来创建自定义的分析器。在 分析与分析器 我们说过,一个 分析器 就是在一个包 里面组合了三种函数的一个包装器, 三种函数按照顺序被执行: 字符过滤器 、分词器 、词单元过滤器

接下来,我们看看如何创建自定义的分析器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# PUT http://localhost:9200/my_index 
{
"settings": {
"analysis": {
"char_filter": {
"&_to_and": {
"type": "mapping",
"mappings": [ "&=> and "]
}},
"filter": {
"my_stopwords": {
"type": "stop",
"stopwords": [ "the", "a" ]
}},
"analyzer": {
"my_analyzer": {
"type": "custom",
"char_filter": [ "html_strip", "&_to_and" ],
"tokenizer": "standard",
"filter": [ "lowercase", "my_stopwords" ]
}}
}}}

索引被创建以后,使用 analyze API 来 测试这个新的分析器

1
2
3
4
5
# GET http://127.0.0.1:9200/my_index/_analyze 
{
"text":"The quick & brown fox",
"analyzer": "my_analyzer"
}

文档处理

文档冲突

当我们使用 index API 更新文档 ,可以一次性读取原始文档,做我们的修改,然后重 新索引 整个文档 。 最近的索引请求将获胜(最后的胜利):无论最后哪一个文档被索引,都将被唯一存 储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。

很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数 据复制到 Elasticsearch 中并使其可被搜索。 也许两个人同时更改相同的文档的几率很小。 或者对于我们的业务来说偶尔丢失更改并不是很严重的问题。

但有时丢失了一个变更就是 非常严重的 。试想我们使用 Elasticsearch 存储我们网上 商城商品库存的数量, 每次我们卖一个商品的时候,我们在 Elasticsearch 中将库存数量减 少。有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。 假设有两个 web 程序并行运行,每一个都同时处理所有商品的销售。

这时候就会发生问题了,那么解决这种问题就是用乐观锁或者悲观锁了只能。

悲观就是假定会冲突,每次修改都阻塞,改完再下一个。

乐观就是假定无冲突,先对比版本是不是最新的,如果是就改,不一致就后续操作,重试或者返回错误等。

乐观并发控制

Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集 群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并 且到达目的地时也许 顺序是乱的 。 Elasticsearch 需要一种方法确保文档的旧版本不会覆 盖新的版本。

当我们之前讨论 index , GET 和 delete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增。 Elasticsearch 使用这个 version 号来确保变更 以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

我们可以利用 version 号来确保 应用中相互冲突的变更不会导致数据丢失。我们通过 指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请 求将会失败。

外部系统版本控制

一个常见的设置是使用其它数据库作为主要的数据存储使用 Elasticsearch 做数据检 索, 这意味着主数据库的所有更改发生时都需要被复制到 Elasticsearch ,如果多个进程负 责这一数据同步,你可能遇到类似于之前描述的并发问题。

如果你的主数据库已经有了版本号 — 或一个能作为版本号的字段值比如 timestamp —  那么你就可以在 Elasticsearch 中通过增加 version_type=external 到查询字符串的方式重用 这些相同的版本号, 版本号必须是大于零的整数, 且小于 9.2E+18 — 一个 Java 中 long 类型的正值。

外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前 _version 和请求中指定的版本号是否相同, 而是检查当前 _version 是否 小于 指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储。

外部版本号不仅在索引和删除请求是可以指定,而且在 创建 新文档时也可以指定。

文档展示 Kibana

Kibana 是一个免费且开放的用户界面,能够让你对 Elasticsearch 数据进行可视化,并 让你在 Elastic Stack 中进行导航。你可以进行各种操作,从跟踪查询负载,到理解请求如 何流经你的整个应用,都能轻松完成。

下载地址:https://artifacts.elastic.co/downloads/kibana/kibana-7.8.0-windows-x86_64.zip

1、解压下载zip文件

2、修改config/kibana.yml文件

1
2
3
4
5
6
7
8
# 默认端口 
server.port: 5601
# ES 服务器的地址
elasticsearch.hosts: ["http://localhost:9200"]
# 索引名
kibana.index: ".kibana"
# 支持中文
i18n.locale: "zh-CN"

3、Windows 环境下执行bin/kibana.bat 文件

4、通过浏览器访问 : http://localhost:5601

ES集成

Spring Data框架集成

Spring Data 是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的 开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce 框架和云计 算数据服务。 Spring Data可以极大的简化JPA(Elasticsearch„)的写法,可以在几乎不用 写实现的情况下,实现对数据的访问和操作。除了CRUD外,还包括如分页、排序等一些 常用的功能。

Spring Data 的官网:https://spring.io/projects/spring-data

image-20260124111753275

Elasticsearch7.6.2,Spring boot2.3.x一般可以兼容Elasticsearch7.x

案例

建Module

改POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.6.RELEASE</version>
<relativePath/>
</parent>


<groupId>com.bitzh.es</groupId>
<artifactId>springdata-elasticsearch</artifactId>
<version>1.0</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
</project>

写YML配置

1
2
3
4
5
6
# es服务地址 
elasticsearch.host=127.0.0.1
# es服务端口
elasticsearch.port=9200
# 配置日志级别,开启debug日志
logging.level.com.atguigu.es=debug

主启动

1
2
3
4
5
6
7
@SpringBootApplication 
public class SpringDataElasticSearchMainApplication {
public static void main(String[] args) {

SpringApplication.run(SpringDataElasticSearchMainApplication.class,args);
}
}

业务类

实体类

1
2
3
4
5
6
7
8
9
10
11
@Data 
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Product {
private Long id;//商品唯一标识
private String title;//商品名称
private String category;//分类名称
private Double price;//商品价格
private String images;//图片地址
}

配置类

  • ElasticsearchRestTemplate是spring-data-elasticsearch项目中的一个类,和其他spring项目中的template 类似。
  • 在新版的spring-data-elasticsearch中,ElasticsearchRestTemplate代替了原来ElasticsearchTemplate。
  • 原因是ElasticsearchTemplate基于TransportClient,TransportClient即将在8.x以后的版本中移除。所 以,我们推荐使用ElasticsearchRestTemplate。
  • ElasticsearchRestTemplate基于RestHighLevelClient客户端的。需要自定义配置类,继承 AbstractElasticsearchConfiguration,并实现elasticsearchClient()抽象方法,创建RestHighLevelClient对 象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@ConfigurationProperties(prefix = "elasticsearch") 
@Configuration
@Data
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
private String host ;
private Integer port ;

//重写父类方法
@Override
public RestHighLevelClient elasticsearchClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
RestHighLevelClient restHighLevelClient = new
RestHighLevelClient(builder);
return restHighLevelClient;
}
}

DAO数据访问对象

1
2
3
4
@Repository 
public interface ProductDao extends ElasticsearchRepository<Product,Long> {

}

实体类映射操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Data 
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "shopping", shards = 3, replicas = 1)
public class Product {
//必须有id,这里的id是全局唯一的标识,等同于es中的"_id"
@Id
private Long id;//商品唯一标识
/**
* type : 字段数据类型
* analyzer : 分词器类型
* index : 是否索引(默认:true)
* Keyword : 短语,不进行分词
*/

@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String title;//商品名称
@Field(type = FieldType.Keyword)
private String category;//分类名称
@Field(type = FieldType.Double)
private Double price;//商品价格
@Field(type = FieldType.Keyword, index = false)
private String images;//图片地址
}

索引操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RunWith(SpringRunner.class) 
@SpringBootTest
public class SpringDataESIndexTest {
//注入ElasticsearchRestTemplate
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;

//创建索引并增加映射配置
@Test
public void createIndex(){
//创建索引,系统初始化会自动创建索引
System.out.println("创建索引");
}

@Test
public void deleteIndex(){
//创建索引,系统初始化会自动创建索引
boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
System.out.println("删除索引 = " + flg);
}
}

文档操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@RunWith(SpringRunner.class) 
@SpringBootTest
public class SpringDataESProductDaoTest {
@Autowired
private ProductDao productDao;

/**
* 新增
*/
@Test
public void save(){
Product product = new Product();
product.setId(2L);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("http://www.atguigu/hw.jpg");
productDao.save(product);
}

//修改
@Test
public void update(){
Product product = new Product();
product.setId(1L);
product.setTitle("小米2手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("http://www.atguigu/xm.jpg");
productDao.save(product);
}

//根据id查询
@Test
public void findById(){
Product product = productDao.findById(1L).get();
System.out.println(product);
}

//查询所有
@Test
public void findAll(){
Iterable<Product> products = productDao.findAll();
for (Product product : products) {
System.out.println(product);
}
}

//删除
@Test
public void delete(){
Product product = new Product();
product.setId(1L);
productDao.delete(product);
}

//批量新增
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.atguigu/xm.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}

//分页查询
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage=0;//当前页,第一页从0开始,1表示第二页
int pageSize = 5;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productDao.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
}

文档搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RunWith(SpringRunner.class) 
@SpringBootTest
public class SpringDataESSearchTest {
@Autowired
private ProductDao productDao;

/**
* term查询
* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
*/
@Test
public void termQuery(){
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder);
for (Product product : products) {
System.out.println(product);
}
}

/**
* term查询加分页
*/
@Test
public void termQueryByPage(){
int currentPage= 0 ;
int pageSize = 5;
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
Iterable<Product> products = productDao.search(termQueryBuilder,pageRequest);
for (Product product : products) {
System.out.println(product);
}
}

}

ES面试题

为什么使用ES

系统中的数据,随着业务的发展,时间的推移,将会非常多,而业务中往往采用模糊查询进行数据的 搜索,而模糊查询会导致查询引擎放弃索引,导致系统查询数据时都是全表扫描,在百万级别的数据库中, 查询效率是非常低下的,而我们使用 ES 做一个全文索引,将经常查询的系统功能的某些字段,比如说电 商系统的商品表中商品名,描述、价格还有id这些字段我们放入ES索引库里,可以提高查询速度。

ES的master选举流程

基于 \Raft 协议** 的协调层

底层使用 类 Raft 的共识算法 实现 Master 选举,更安全、更可靠,且无需手动配置法定人数

只有配置了以下角色的节点才能成为 Master

1
node.roles: [ master ] 

选举出发条件,当集群中 没有活跃的 Master 节点 时(如首次启动、Master 宕机),会触发选举。

首先发现候选节点,所有候选节点通过单播也就是一对一连接互相发现,节点会配置在一个文件里面,然后形成一个候选主节点的集合。每个节点发起选举,节点给自己投票,并且向其他候选节点请求投票,当急群众的获得得票数大于半数以上的票就称为主节点,然后向全集群广播,其他就变成了Follower,然后网络分区为了防止形成两个master的情况(也就是说多数派里才会形成Master,少数派不会形成master),也就是脑裂。

由于这个网络分区几乎不会有脑裂的风险。之前旧版本可能会由于网络问题或者节点负载或者内存回收,造成ES进程时区相应导致脑裂,现在网络分区之后不会了。

ES索引文档的流程(新增)

首先会对文档ID进行分片,分片公式就是通过hash文档ID然后对分片数量取模得出分片位置。然后当分片所在的节点接收到来自协议节点(也就是访问的节点,比如访问节点1,分到了节点0,那么节点1就是协议节点)的请求后会将请求写入到内存缓冲区中,然后定时默认每隔1秒写入硬盘,从内存缓冲区写入硬盘的过程叫做刷新refresh,为了保证数据不丢失,加入了transLog,也就是写入缓冲区的过程中也会写入到translog里面,当全部都写入之后才会删除transLog。当完全写入的时候会创建一个新的提交点。全部写入默认是30分钟或者translog大于512M的时候会完全写入。

ES更新和删除文档的流程

删除和更新也都是写操作,但是Elasticsearch中的文档是不可变的,因此不能被删除或者改动以展示 其变更; 磁盘上的每个段都有一个相应的.del 文件。当删除请求发送后,文档并没有真的被删除,而是在.del 文件中被标记为删除。该文档依然能匹配查询,但是会在结果中被过滤掉。当段合并时,在.del文件中被标记为删除的文档将不会被写入新段。 在新的文档被创建时,Elasticsearch会为该文档指定一个版本号,当执行更新时,旧版本的文档在.del 文件中被标记为删除,新版本的文档被索引到一个新段。旧版本的文档依然能匹配查询,但是会在结 果中被过滤掉。

ES搜索的流程

搜索被执行成一个两阶段过程,我们称之为 Query Then Fetch;

在初始查询阶段时,查询会广播到索引中每一个分片拷贝(主分片或者副本分片)。 每个分片在本 地执行搜索并构建一个匹配文档的大小为 from + size 的优先队列(有分数的队列排序的)。PS:在搜索的时候是会查询 Filesystem Cache 的,但是有部分数据还在Memory Buffer,所以搜索是近实时的。

每个分片返回各自优先队列中 所有文档的 ID 和排序值 给协调节点,它合并这些值到自己的优先队 列中来产生一个全局排序后的结果列表。

接下来就是取回阶段,协调节点辨别出哪些文档需要被取回并向相关的分片提交多个 GET 请求。

每 个分片加载并丰富文档,如果有需要的话,接着返回文档给协调节点。一旦所有的文档都被取回了, 协调节点返回结果给客户端。

Query Then Fetch的搜索类型在文档相关性打分的时候参考的是本分片的数据,这样在文档数量较少 的时候可能不够准确,DF S Query Then Fetch 增加了一个预查询的处理,询问 Term 和 Document frequency,这个评分更准确,但是性能会变差。

通俗理解:因为ES是分布式系统,一个索引数据被分成多个分片,每个分片可能在不同的节点上,客户端不知道数据在哪,所有先问所有分片,你们有哪些匹配的文档,再汇总结果,最后决定返回哪些,最后去取完整的文档内容。

ES在部署的时候,对Linux的设置有哪些优化方法?

64 GB 内存的机器是非常理想的, 但是32 GB 和16 GB 机器也是很常见的。少于8 GB 会适得其反。

如果你要在更快的 CPUs 和更多的核心之间选择,选择更多的核心更好。多个内核提供的额外并发远胜过稍微快一点点的时钟频率。

如果你负担得起 SSD(固态硬盘),它将远远超出任何旋转介质。 基于 SSD 的节点,查询和索引性能都有提升。 如果你负担得起,SSD 是一个好的选择。

即使数据中心们近在咫尺,也要避免集群跨越多个数据中心。绝对要避免集群跨越大的地理距离。

请确保运行你应用程序的 JVM 和服务器的 JVM 是完全一样的。 在 Elasticsearch 的几个地方,使 用 Java 的本地序列化。

如果你的搜索结果不需要近实时的准确度,考虑把每个索引的index.refresh_interval 改到30s。

段和合并:Elasticsearch 默认值是 20 MB/s,对机械磁盘应该是个不错的设置。如果你用的是 SSD, 可以考虑提高到 100–200 MB/s。如果你在做批量导入,完全不在意搜索,你可以彻底关掉合并限流。 另外还可以增加 index.translog.flush_threshold_size 设置,从默认的 512 MB 到更大一些的值,比如 1 GB,这可以在一次清空触发的时候在事务日志里积累出更大的段。

GC方面,在使用ES时要注意什么

倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory 段增长趋势。

各类缓存,field cache, filter cache, indexing cache, bulk queue 等等,要设置合理的大小,并且要应该根 据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他 任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。

避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。

cluster stats 驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。

想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。

ES对于大数据量的聚合如何实现

Elasticsearch 提供的首个近似聚合是 cardinality 度量。它提供一个字段的基数,即该字段的 distinct 或者unique 值的数目。它是基于HLL算法的。HLL 会先对我们的输入作哈希运算,然后根据哈希运算的 结果中的 bits 做概率估算从而得到基数。其特点是:可配置的精度,用来控制内存的使用(更精确 = 更 多内存);小的数据集精度是非常高的;我们可以通过配置参数,来设置去重需要的固定内存使用量。无 论数千还是数十亿的唯一值,内存使用量只与你配置的精确度相关

并发情况下,ES如何保证读写一致

可以通过版本号使用乐观并发控制,以确保新版本不会被旧版本覆盖,由应用层来处理具体的冲突;

另外对于写操作,一致性级别支持quorum/one/all,默认为quorum,即只有当大多数分片可用时才允 许写操作。但即使大多数可用,也可能存在因为网络等原因导致写入副本失败,这样该副本被认为故 障,分片将会在一个不同的节点上重建。

对于读操作,可以设置replication为sync(默认),这使得操作在主分片和副本分片都完成后才会返回; 如果设置replication 为 async 时,也可以通过设置搜索请求参数_preference为primary来查询主分片, 确保文档是最新版本。

如何监控ES集群状态

elasticsearch-head 插件

通过Kibana监控ES。你可以实时查看你的集群健康状态和性能,也可以分析过去的集群、 索引和节点指标

是否了解字典树

字典树又称单词查找树,Trie树,是一种树形结构,是一种哈希树的变种。典型应用是用于统计,排 序和保存大量的字符串(但不仅限于字符串),所以经常被搜索引擎系统用于文本词频统计。它的优点是: 利用字符串的公共前缀来减少查询时间,最大限度地减少无谓的字符串比较,查询效率比哈希树高。

Trie的核心思想是空间换时间,利用字符串的公共前缀来降低查询时间的开销以达到提高效率的目的。 它有3个基本性质:

根节点不包含字符,除根节点外每一个节点都只包含一个字符。

从根节点到某一节点,路径上经过的字符连接起来,为该节点对应的字符串。

每个节点的所有子节点包含的字符都不相同。

对于中文的字典树,每个节点的子节点用一个哈希表存储,这样就不用浪费太大的空间,而且查询速度上 可以保留哈希的复杂度O(1)。