[AWS][Elasticsearch] Another Elasticsearch Log-Capture Problem and Its Fix

This has happened once before; see "[AWS] Fixing Elasticsearch failing to capture Fargate logs".

Symptoms

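My first step was to check whether the Lambda had logged any errors. It turned out that more detailed logging has to be enabled in the Lambda first, by setting logFailedResponses to true: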

// Set this to true if you want to debug why data isn't making it to 
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

With that enabled, an error like the following showed up:

this action would add [100] total shards, but this cluster currently has [4231]/[4000] maximum shards open

Root cause

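Starting with Elasticsearch 7, each node is allowed at most 1,000 shards by default, and the cluster-wide limit is that number multiplied by the number of data nodes. The error means the cluster has run out of shard capacity: it already has 4231 open shards against a maximum of 4000, so the new indices are rejected.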

By default on Amazon ES, each index gets 5 primary shards, each with 1 replica copy, which is 10 shards per index.
To list the ES indices:
GET _cat/indices
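The numbers in the error message can be checked with a little arithmetic. The sketch below is only an illustration: the function names are made up for this post, and the figure of 4 data nodes is inferred from the [4000] limit in the message (4 x 1000), not read from the cluster.

```python
def cluster_shard_limit(data_nodes, max_shards_per_node=1000):
    """Cluster-wide limit: per-node limit times the number of data nodes."""
    return data_nodes * max_shards_per_node


def shards_per_index(primaries=5, replicas=1):
    """Total shards of one index: each primary plus its replica copies."""
    return primaries * (1 + replicas)


def would_be_rejected(open_shards, new_shards, limit):
    """True when adding new_shards would push the cluster past the limit."""
    return open_shards + new_shards > limit


limit = cluster_shard_limit(data_nodes=4)    # assumption: 4 data nodes
print(limit)                                 # 4000
print(shards_per_index())                    # 10
print(would_be_rejected(4231, 100, limit))   # True
```

With 10 shards per daily index, a few hundred retained indices are enough to reach a 4000-shard limit.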

Fixes

Temporary fix 1: raise max_shards_per_node to 10000
PUT /_cluster/settings
{
  "transient": {
    "cluster": {
      "max_shards_per_node":10000
    }
  }
}

Unfortunately, this setting is not among the Supported Elasticsearch Operations of AWS Managed ES.

Temporary fix 2: add more nodes

To list the ES indices and check:
GET _cat/indices
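To estimate how many shards the indices add up to, the output of GET _cat/indices?format=json can be summed. This is a minimal sketch, assuming the rows have already been fetched and parsed into a list of dicts. The helper names and the sample rows are hypothetical, and the result is an estimate rather than the exact count Elasticsearch enforces. In the JSON output the pri and rep columns come back as strings.

```python
import math


def total_shards(cat_indices_rows):
    """Sum primaries plus replica copies over all rows of _cat/indices."""
    total = 0
    for row in cat_indices_rows:
        primaries = int(row["pri"])   # values arrive as strings
        replicas = int(row["rep"])
        total += primaries * (1 + replicas)
    return total


def data_nodes_needed(shards, max_shards_per_node=1000):
    """Smallest number of data nodes whose combined limit covers the shards."""
    return math.ceil(shards / max_shards_per_node)


# Hypothetical sample rows, trimmed to the fields used here.
sample_rows = [
    {"index": "cwl-2020.01.01", "pri": "5", "rep": "1"},
    {"index": "cwl-2020.01.02", "pri": "5", "rep": "1"},
    {"index": ".kibana_1", "pri": "1", "rep": "1"},
]
print(total_shards(sample_rows))       # 22
print(data_nodes_needed(4231 + 100))   # 5
```

For the numbers in the error message above, at least 5 data nodes would be needed at the default per-node limit.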

Long-term improvements

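Going one step further, a long-term improvement has to deal with how the system-created Lambda generates ES indices (this requires reading its code).
See the Put index template API.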

Option 1: add or modify a template

In this case, a template can reduce the number of primary shards per index from 5 to 2:

PUT _template/template_1
{
  "index_patterns": ["*"],
  "settings": {
    "number_of_shards": 2
  }
}
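
The same template could also be applied from Python. This is a rough sketch, assuming a 7.x elasticsearch-py client like the one used in the Lambda code later in this post, where indices.put_template accepts name and body. build_template_body and apply_shard_template are hypothetical helper names. A template only affects indices created after it is installed. Existing indices keep their shard count.

```python
def build_template_body(number_of_shards=2, patterns=("*",)):
    """Build the same body as the PUT _template/template_1 request above."""
    return {
        "index_patterns": list(patterns),
        "settings": {"number_of_shards": number_of_shards},
    }


def apply_shard_template(es, name="template_1", number_of_shards=2):
    """Install the template through an already-built Elasticsearch client."""
    body = build_template_body(number_of_shards)
    return es.indices.put_template(name=name, body=body)
```

With 1 replica, each new index would then use 4 shards instead of 10.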

Option 2: the shrink API (not tried yet; I still need to read up on it)

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html

Option 3: use ES Curator for Index Lifecycle Management

See "Using Curator to Rotate Data in Amazon Elasticsearch Service" (that article also covers snapshot deletion, which I have not tried yet).
When running Curator in Lambda, note:

  1. Configure the Lambda with the same VPC and subnet as the ES domain, and the corresponding security group
  2. “Both Lambda functions in this section need the basic logging permissions that all Lambda functions need, plus HTTP method permissions for the Amazon ES domain”
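
The "HTTP method permissions" in the quote translate into an IAM policy attached to the Lambda execution role. The following is only a sketch: the helper name is hypothetical, and the region, account ID and domain name in the ARN are placeholders. The default action list is deliberately broad and should be checked against the AWS article. It can probably be narrowed.

```python
import json


def curator_es_policy(region, account_id, domain_name,
                      actions=("es:ESHttpGet", "es:ESHttpPost",
                               "es:ESHttpPut", "es:ESHttpDelete")):
    """Build an IAM policy document granting HTTP access to one ES domain."""
    resource = "arn:aws:es:%s:%s:domain/%s/*" % (region, account_id, domain_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(actions),
                "Resource": resource,
            }
        ],
    }


# Placeholder values only; substitute the real account and domain.
policy = curator_es_policy("ap-southeast-1", "123456789012", "my-domain")
print(json.dumps(policy, indent=2))
```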

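My sample code below is lightly modified from the original: it deletes indices older than 90 days, logs the number of indices, and picks the ES host automatically based on the AWS account.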

import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator
import logging

region = 'ap-southeast-1' # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Lambda execution starts here.
def lambda_handler(event, context):
    account_id = boto3.client('sts').get_caller_identity().get('Account')
    logger.info("ACCOUNT ID IS: "+str(account_id))

    if account_id == '0xxxxxxx':
        host = 'vpc-nm-dev-logging-xxxxxxxxxxxxxxxxxx4.ap-southeast-1.es.amazonaws.com'
    elif account_id == '4yyyyyy':
        host = 'vpc-nm-prod-logging-yyyyyyyyyyyyyyyyyy.ap-southeast-1.es.amazonaws.com'
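    else:
        # Fail fast: without a known account there is no host to connect to.
        raise ValueError("Unknown account ID: " + str(account_id))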

    # Build the Elasticsearch client.
    es = Elasticsearch(
        hosts = [{'host': host, 'port': 443}],
        http_auth = awsauth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )

    index_list = curator.IndexList(es)
    logger.info("Before Deletion %s indices" % len(index_list.indices))

    # Filters by age, anything with a time stamp older than 90 days in the index name.
    index_list.filter_by_age(source='name', direction='older', timestring='%Y.%m.%d', unit='days', unit_count=90)

    logger.info("Found %s indices to delete" % len(index_list.indices))

    # Filters by naming prefix.
    # index_list.filter_by_regex(kind='prefix', value='my-logs-2017')

    # # Filters by age, anything created more than one month ago.
    # # index_list.filter_by_age(source='creation_date', direction='older', unit='months', unit_count=1)

    # print("Found %s indices to delete" % len(index_list.indices))

    # If our filtered list contains any indices, delete them.
    if index_list.indices:
        curator.DeleteIndices(index_list).do_action()
        logger.info("Deletion Complete")
        # Re-read the index list; the filtered list still holds the deleted names.
        logger.info("After Deletion %s indices" % len(curator.IndexList(es).indices))
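
To make the age filter concrete, here is a plain-Python approximation of what filter_by_age with source='name' and timestring='%Y.%m.%d' selects. It is not Curator's implementation, only a sketch of the idea. The helper name and the cwl- index names are made up for illustration.

```python
import re
from datetime import datetime, timedelta

# Matches a date such as 2020.01.15 anywhere in an index name.
DATE_IN_NAME = re.compile(r"\d{4}\.\d{2}\.\d{2}")


def older_than(index_names, days, now):
    """Return the names whose embedded date is more than `days` before `now`."""
    cutoff = now - timedelta(days=days)
    selected = []
    for name in index_names:
        match = DATE_IN_NAME.search(name)
        if not match:
            continue  # no date in the name, so it is never selected
        stamp = datetime.strptime(match.group(0), "%Y.%m.%d")
        if stamp < cutoff:
            selected.append(name)
    return selected


names = ["cwl-2020.01.01", "cwl-2020.05.20", ".kibana_1"]
print(older_than(names, days=90, now=datetime(2020, 6, 1)))
# ['cwl-2020.01.01']
```

Indices without a date in their name, such as .kibana_1, are left alone by a name-based filter.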

Note:

Since March 2020, Amazon ES has supported automated index management.
Automate index management with Amazon Elasticsearch Service
Index State Management

Reference:

What causes "maximum shards open"
Sizing Amazon ES Domains
Notes on the Elasticsearch "maximum shards open" problem