Muitos sistemas que levam dados ao Elasticsearch aproveitam os valores de _id gerados automaticamente para documentos recém-inseridos. No entanto, se a fonte de dados enviar acidentalmente o mesmo documento várias vezes, e se esses valores de _id gerados automaticamente forem usados ​​para cada documento que o Elasticsearch insere, esse mesmo documento será armazenado diversas vezes no Elasticsearch com valores _iddiferentes. Se isso ocorrer, é preciso detectar e remover os documentos duplicados usando o Logstash ou um código personalizado escrito em Python.

Leia também: Passo a passo para instalar o Stack ELK

Conheça o Semantix OpenGalaxy, uma plataforma de dados que possibilita subir o Stack ELK em 1-Click. 

Exemplo de documento

{
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Dado o exemplo de estrutura de documento acima, vamos assumir que, se vários documentos tiverem os mesmos valores para os campos [“CAC”, “FTSE”, “SMI”], eles estão duplicados.

Leia também: Organize seus dados com o novo Kibana Spaces

Remover documentos duplicados com o Logstash

O Logstash pode ser usado para detectar e remover documentos duplicados de um índice Elasticsearch. No exemplo abaixo, há uma configuração simples do Logstash que lê documentos de um índice em um cluster do Elasticsearch, depois usa o fingerprint filter para calcular um valor _id único para cada documento com base em um hash dos campos [“CAC”, “FTSE”, “SMI”] e, finalmente, grava cada documento em um novo índice no mesmo cluster do Elasticsearch, de modo que os documentos duplicados sejam gravados no mesmo _id e, portanto, eliminados.

Além disso, com pequenas modificações, o mesmo filtro Logstash também pode ser aplicado a futuros documentos gravados no índice recém-criado, a fim de garantir que as duplicatas sejam removidas quase em tempo real. Isso pode ser feito alterando a seção de entrada (input) para aceitar documentos de sua fonte de entrada em tempo real em vez de extrair documentos de um índice existente.

Esteja ciente de que o uso de valores _id personalizados (ou seja, um _id que não seja gerado pelo Elasticsearch) terá algum impacto no desempenho de gravação de suas operações de índices.

Além disso, vale a pena notar que, dependendo do algoritmo de hash usado, essa abordagem pode gerar um hash já utilizado para o valor _id, o que poderia resultar em dois documentos diferentes sendo mapeados para o mesmo _id e um deles poderia ser eliminado. Para a maioria dos casos práticos, a probabilidade de gerar um hash já utilizado é provavelmente muito baixa.

Configuração do Logstash usando o fingerprint filter:

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

Script Python personalizado para remoção de documentos duplicados no Elasticsearch

Uma abordagem de memória eficiente

Outra opção para a remoção de documentos duplicados é por meio de um script python personalizado. Para essa abordagem, calculamos o hash dos campos [“CAC”, “FTSE”, “SMI”] que definimos para identificar exclusivamente um documento. Em seguida, usamos esse hash como chave em um dicionário python, em que o valor associado de cada entrada do dicionário será uma matriz dos _ids dos documentos que mapeiam para o mesmo hash.

Se mais de um documento tiver o mesmo hash, os documentos duplicados mapeados para o mesmo hash poderão ser excluídos. Como alternativa, se você estiver preocupado com a possibilidade de colisões de hash, o conteúdo de documentos que mapeiam para o mesmo hash pode ser examinado para ver se eles são realmente idênticos e, em caso afirmativo, as duplicatas podem ser eliminadas.

Leia também: Conheça o curso online de Introdução a Stack ELK

Análise de algoritmo de detecção

Para um índice de 50 GB, se assumirmos que o índice contém documentos com um tamanho médio de 0,4 kB, então haveria 125 milhões de documentos no índice. Nesse caso, a quantidade de memória necessária para armazenar as estruturas de dados de deduplicação ao usar um hash de 128 bits md5 seria da ordem de 128 bits x 125 Milhões = 2GB de memória, e os _ids bit 160 exigirão outros 160 bits x 125 Milhões = 2,5 GB de memória. Esse algoritmo, portanto, exigirá 4,5 GB de RAM para manter todas as estruturas de dados relevantes na memória.

Aprimoramento de algoritmo

Se você estiver armazenando dados temporais e souber que documentos duplicados ocorrerão apenas em um pequeno intervalo de tempo um do outro, talvez seja possível aprimorar a memória executando repetidamente o algoritmo em um subconjunto dos documentos no índice, com cada subconjunto correspondendo a uma janela de tempo diferente. Por exemplo, se você tiver um ano de dados, poderá usar range queries em seu campo de data e hora (dentro de um filter context para obter um melhor desempenho) para percorrer seu conjunto de dados uma semana por vez. Isso exigiria que o algoritmo fosse executado 52 vezes (uma vez por semana).

Nesse exemplo, você pode estar preocupado em não detectar documentos duplicados que se estendem por semanas. Vamos supor que os documentos duplicados não podem ocorrer com mais de 2 horas de diferença. Então você precisaria garantir que cada execução do algoritmo inclua documentos que se sobreponham por 2 horas com o último conjunto de documentos analisados ​​pela execução anterior do algoritmo. Para o exemplo semanal, você precisaria consultar 170 horas (1 semana + 2 horas) no valor de documentos temporais para garantir que nenhuma duplicata seja perdida.

Se você desejar limpar periodicamente documentos duplicados de seus índices, poderá executar esse algoritmo em documentos recebidos recentemente. Pode-se aplicar a mesma lógica – garanta que os documentos recebidos recentemente sejam incluídos na análise junto com uma sobreposição suficiente com documentos antigos para garantir que as duplicatas não sejam perdidas.

Código Python para detectar documentos duplicados

O código a seguir demonstra como os documentos podem ser avaliados com eficiência para verificar se são idênticos e, em seguida, eliminados. No entanto, para evitar a exclusão acidental de documentos, não executamos uma operação de exclusão.

O código para desduplicar documentos do Elasticsearch também pode ser encontrado no github .

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In this example, we just print the duplicate docs.
            # This code could be easily modified to delete duplicates
            # here instead of printing them
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()