前言

在使用ES的时候,有些配置一旦确定是不支持修改的,例如以下场景:

1、修改mappings配置,如某个字段的类型或属性:默认情况下,动态mapping支持新增字段,不允许修改。
2、创建索引时主分片数一旦确定也是不能修改的。
3、索引的分词规则:使用了新的分词器或修改了后,之前索引的数据是使用老的分词器索引的,此时也需要索引重建。

以上场景,都需要重建索引,ES也提供了_reindex的api,应对以上场景。


概念

_reindex api 官方地址传送门>

功能就是将一个索引的文档copy到另一个索引上,它是以一种快照的方式进行的。

底层使用了两个api:先通过scroll api将文档从源索引中查询出来,再通过bulk api将文档写入新的索引中。

所以,对于源索引的mappings的配置以及主副分片的数量等是不会被reindex到新的索引上的,需要提前按需创建新的索引,如果不创建目标索引,那么会按集群或模板自动完成索引的创建。

最基础的示例:
1、先创建源索引 my_products

PUT http://127.0.0.1:9200/my_products
{
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0     
    }
}

2、索引部分文档

POST http://127.0.0.1:9200/my_products/_bulk

{ "index": { "_id": 1 }}
{ "productID" : "XHDK-A-1293-#fJ3","desc":"My little iPhone", "createTime":"2023-08-16", "price": 10, "status":true}
{ "index": { "_id": 2 }}
{ "productID" : "KDKE-B-9947-#kL5","desc":"My big iPad", "createTime":"2023-08-18", "price": 20, "status":true}
{ "index": { "_id": 3 }}
{ "productID" : "JODL-X-1937-#pV7","desc":"My medium MBP", "createTime":"2023-08-20", "price": 30.5, "status":false}

3、创建目标索引 products_new

PUT http://127.0.0.1:9200/products_new
{
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0     
    }
}

4、使用 _reindex api

POST http://127.0.0.1:9200/_reindex
{
  "source": {
     "index": "my_products",
  },
  "dest": {
     "index": "products_new"
  }
}

source:指定源索引信息;dest指定目标索引信息。

5、查看索引 products_new 数量:

GET http://127.0.0.1:9200/products_new/_count

下面演示示例,没有特别说明,均在1、2、3的基础上进行。
第3步也可以省略,可以配置个index template,在reindex自动创建目标索引。


常见场景

文档处理相关

1、reindex特定文档

通过query查询源索引特定的文档进行reindex。

POST http://127.0.0.1:9200/_reindex
{
  "source": {
     "index": "my_products",
      "query":{
              "term":{
                  "status": true
              }
          }
  },
  "dest": {
     "index": "products_new"
  }
}

只reindex源索引中文档的status为true的文档。


2、reindex部分字段

可以通过 _source 过滤源索引的部分字段。

记得先删除原来的目标索引并重新创建。

POST http://127.0.0.1:9200/_reindex

{
  "source": {
    "index": "my_products",
    "_source": ["status", "productID"]
  },
  "dest": {
    "index": "products_new"
  }
}

通过 _search api查看全部文档:

GET http://127.0.0.1:9200/products_new/_search

{
    "query": {
        "match_all":{}
    }
}

搜索显示只索引了status和productID两个字段;另外_id也同步了。


3、修改字段名或新增字段

可以使用 script 进行字段和逻辑处理。

例如:需要将status 修改名称为 state,可以按如下处理:

POST http://127.0.0.1:9200/_reindex

{
  "source": {
    "index": "my_products"
  },
  "dest": {
    "index": "products_new"
  },
  "script": {
    "source": "ctx._source.state = ctx._source.remove(\"status\")"
  }
}

通过 _search api查看,部分文档如下:

{
	"_index": "products_new",
	"_id": "1",
	"_score": 1,
	"_source": {
		"productID": "XHDK-A-1293-#fJ3",
		"createTime": "2023-08-16",
		"price": 10,
		"state": true,
		"desc": "My little iPhone"
	}
}

status 变成了 state字段

如果要新增字段,script可以定义如下source:

"source": "ctx._source.newField = ctx._source.desc + '_' + ctx._source.status"

多个字段写法:

"source": 
	"""
		ctx._source.newField = ctx._source.desc + '_' + ctx._source.status;
		ctx._source.newField2 = ctx._source.price + '_' + ctx._source.status;
	"""

4、版本冲突

如果dest索引已经有了部分数据,可以指定不同的策略应对version上的冲突,通过指定dest索引的version_type,如下:

  • version_type省略或者设置internal ,可以直接覆盖具有相同id的文档,同时目标索引文档的版本号加1。
  • version_type设置external,目标索引文档会使用源索引的文档版本,id对应文档不存在时,进行创建;id对应文档存在时,如果目标索引文档版本更低,则更新,否则会提示冲突(默认当前批次处理完,中断后续批次处理)。

1)、演示 version_type = internal
先进行一次reindex:

POST http://127.0.0.1:9200/_reindex
{
  "source": {
    "index": "my_products"
  },
  "dest": {
    "index": "products_new"
  }
}

查看目标索引 products_new 文档版本:

增加参数 ?version=true

GET http://127.0.0.1:9200/products_new/_search?version=true
{
  "query": {
    "match_all": {}
  }
}

部分结果如下:

{
	"_index": "products_new",
	"_id": "1",
	"_version": 1,
	"_score": 1,
	"_source": {
		"productID": "XHDK-A-1293-#fJ3",
		"desc": "My little iPhone",
		"createTime": "2023-08-16",
		"price": 10,
		"status": true
	}
}

版本号为1

再指定 version_type = internal,进行一次reindex:

可以看到 _reindex 返回显示是更新了文档,没有创建文档。
再次 _search?version=true 搜索,部分结果如下,文档version变成2,在原来基础上增加了1,因为文档执行了更新。

{
	"_index": "products_new",
	"_id": "1",
	"_version": 2,
	"_score": 1,
	"_source": {
		"productID": "XHDK-A-1293-#fJ3",
		"desc": "My little iPhone",
		"createTime": "2023-08-16",
		"price": 10,
		"status": true
	}
},

2)、演示 version_type = external

先删除演示1中的目标索引

执行如下reindex,指定version_type:

POST http://127.0.0.1:9200/_reindex

{
  "source": {
    "index": "my_products"
  },
  "dest": {
    "index": "products_new",
    "version_type": "external"
  }
}

查询目标索引及数据,成功,文档version均为1

此时目标索引已存在,相同的body,再次reindex,所有文档报错如下,版本冲突,因为verison相等了:

{
	"index": "products_new",
	"id": "1",
	"cause": {
		"type": "version_conflict_engine_exception",
		"reason": "[1]: version conflict, current version [1] is higher or equal to the one provided [1]",
		"index_uuid": "zG2deNneRwWVpvnl4bMUSw",
		"shard": "0",
		"index": "products_new"
	},
	"status": 409
}

更新源索引 my_products 文档id=1的文档,其版本更新为2。

相同的body(version_type = external),再次执行reindex,此时id=1的文档reindex成功,另外2个仍然冲突(源索引共3个文档)。

默认情况下,某批次的文档出现版本冲突会中断reindex后续批次处理,可以通过配置 conflicts 来忽略冲突,继续执行,如下所示:

POST http://127.0.0.1:9200/_reindex

{
  "conflicts": "proceed",
  "source": {
    "index": "my_products"
  },
  "dest": {
    "index": "products_new",
    "version_type": "external"
  }
}

conflicts默认是“abort”配置


性能相关

1、指定每批次查询的数据量

默认size大小为1000,可以通过指定size来提高reindex的速度。

POST http://127.0.0.1:9200/_reindex

{
  "source": {
    "index": "my_products",
    "size": 1
  },
  "dest": {
    "index": "products_new",
  }
}

返回结果如下:

{
	"took": 1407,
	"timed_out": false,
	"total": 3,
	"updated": 0,
	"created": 3,
	"deleted": 0,
	"batches": 3,
	"version_conflicts": 0,
	"noops": 0,
	"retries": {
		"bulk": 0,
		"search": 0
	},
	"throttled_millis": 0,
	"requests_per_second": -1,
	"throttled_until_millis": 0,
	"failures": []
}

可以看到 batches 的值为3(源索引共3个文档,每批次1个,共3/1批次)


2、异步reindex

reindex默认超时时间1m(分钟),如果数据量比较大,可能会导致超时中断,可以通过指定 rul参数 wait_for_completion = false 进行异步处理,如下:

POST http://127.0.0.1:9200/_reindex?wait_for_completion=false

{
  "source": {
    "index": "my_products",
    "size": 1
  },
  "dest": {
    "index": "products_new"
  }
}

返回任务id:

{
	"task": "Y7r8Iz33QHCi3oDq3HCwEg:1475243"
}

通过 _task/<taskId> 查看任务进度:

GET http://127.0.0.1:9200/_tasks/Y7r8Iz33QHCi3oDq3HCwEg:1475243

返回如下:

{
	"completed": true,
	"task": {
		"node": "Y7r8Iz33QHCi3oDq3HCwEg",
		"id": 1475243,
		"type": "transport",
		"action": "indices:data/write/reindex",
		"status": {
			"total": 3,
			"updated": 0,
			"created": 3,
			"deleted": 0,
			"batches": 3,
			"version_conflicts": 0,
			"noops": 0,
			"retries": {
				"bulk": 0,
				"search": 0
			},
			"throttled_millis": 0,
			"requests_per_second": -1,
			"throttled_until_millis": 0
		},
		"description": "reindex from [my_products] to [products_new]",
		"start_time_in_millis": 1694449113170,
		"running_time_in_nanos": 1679822000,
		"cancellable": true,
		"cancelled": false,
		"headers": {}
	},
	"response": {
		"took": 1408,
		"timed_out": false,
		"total": 3,
		"updated": 0,
		"created": 3,
		"deleted": 0,
		"batches": 3,
		"version_conflicts": 0,
		"noops": 0,
		"retries": {
			"bulk": 0,
			"search": 0
		},
		"throttled": "0s",
		"throttled_millis": 0,
		"requests_per_second": -1,
		"throttled_until": "0s",
		"throttled_until_millis": 0,
		"failures": []
	}
}

可以看到,任务成功,创建了3条文档,分了3个批次等信息。


3、Slicing 切片查询

reindex支持分片滚动来并行化处理过程,这种并行可以提高效率,为将请求分解为更小的部分提供了一种方便的方法。
它分为手动和自动两种:手动和自动

1)、手动切片 Manual slicing
在source索引中增加配置如下:

// 第一个
{
  "source": {
    "index": "my_products",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "products_new"
  }
}

// 第二个
{
  "source": {
    "index": "my_products",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "products_new"
  }
}

slice.id 指定分片id;slice.max 指定分片数量。
注意:slice.id 需要 小于slice.max字段,slice.max 必须大于1。

如果将slice.max设置成2,那么会分成两个切片进行获取目标索引文档,两个切片的数据并集等价于不分片的数据。

1)、自动切片 Automatic slicing
通过指定 url参数 ?slices={num} 指定切片数,进行自动完成切片处理,一次请求即可,如下所示:

POST http://127.0.0.1:9200/_reindex?slices=3

{
  "source": {
    "index": "my_products"
  },
  "dest": {
    "index": "products_new"
  }
}

结果如下:

{
	"took": 1637,
	"timed_out": false,
	"total": 3,
	"updated": 0,
	"created": 3,
	"deleted": 0,
	"batches": 2,
	"version_conflicts": 0,
	"noops": 0,
	"retries": {
		"bulk": 0,
		"search": 0
	},
	"throttled_millis": 0,
	"requests_per_second": -1,
	"throttled_until_millis": 0,
	"slices": [
		{
			"slice_id": 0,
			"total": 2,
			"updated": 0,
			"created": 2,
			"deleted": 0,
			"batches": 1,
			"version_conflicts": 0,
			"noops": 0,
			"retries": {
				"bulk": 0,
				"search": 0
			},
			"throttled_millis": 0,
			"requests_per_second": -1,
			"throttled_until_millis": 0
		},
		{
			"slice_id": 1,
			"total": 1,
			"updated": 0,
			"created": 1,
			"deleted": 0,
			"batches": 1,
			"version_conflicts": 0,
			"noops": 0,
			"retries": {
				"bulk": 0,
				"search": 0
			},
			"throttled_millis": 0,
			"requests_per_second": -1,
			"throttled_until_millis": 0
		},
		{
			"slice_id": 2,
			"total": 0,
			"updated": 0,
			"created": 0,
			"deleted": 0,
			"batches": 0,
			"version_conflicts": 0,
			"noops": 0,
			"retries": {
				"bulk": 0,
				"search": 0
			},
			"throttled_millis": 0,
			"requests_per_second": -1,
			"throttled_until_millis": 0
		}
	],
	"failures": []
}

可以看到,分了3个切片进行reindex,每个slice都包括了对应的处理数据量。

使用slice时注意点
1、当slice切片数等于索引的分片数时,处理效率最高。
2、如果切片数很大(如500),远大于索引分片数时,反而会影响效率。


总结

ES提供了reindex的api在一些场景还是比较有用的,能够可配置且高效的完成数据的重新索引,对于只有一个1主分片,文档大概6w+的源索引, reindex时size改成5千,其它默认,基本上秒级别可以完成处理。

reindex支持做字段的特殊处理,支持只reindex特定文档,支持目标索引 ingest_pipeline处理等等多个功能。

但reindex是一个一次性的动作,其基于源索引文档快照完成。