"""Report how many pages of each local Elasticsearch shard are in the page cache."""
from argparse import ArgumentParser
from contextlib import contextmanager
from glob import glob
import os
import sys

import elasticsearch

from ctypes import CDLL, POINTER, cast, c_int, c_size_t, c_ssize_t, c_ubyte, c_void_p
from mmap import MAP_SHARED, PROT_READ, PAGESIZE

libc = CDLL('libc.so.6')
assert libc

# mmap(2) returns (void *)-1 on failure.  With restype=c_void_p, ctypes hands
# back a plain int, so we must compare against the integer value — a c_void_p
# *instance* never compares equal to an int, which made the original check
# (`addr != c_void_p(-1)`) always succeed.
MAP_FAILED = c_void_p(-1).value

# mmap/munmap/mincore definitions mostly copied from
# https://github.com/spotify/mlockexec/blob/master/mlockexec/libc.py

# set correct parameter types
c_off_t = c_ssize_t

# void *mmap(void *addr, size_t length, int prot,
#            int flags, int fd, off_t offset);
mmap = libc.mmap
mmap.restype = c_void_p
# Six argtypes for six parameters: the original omitted the c_int for `fd`,
# so `fd` was marshaled as c_off_t and `offset` with default conversion.
mmap.argtypes = [c_void_p, c_size_t, c_int, c_int, c_int, c_off_t]

# int munmap(void *addr, size_t length);
munmap = libc.munmap
munmap.restype = c_int  # munmap(2) returns int, not a pointer
munmap.argtypes = [c_void_p, c_size_t]

# int mincore(void *addr, size_t length, unsigned char *vec);
libc.mincore.restype = c_int
libc.mincore.argtypes = [c_void_p, c_size_t, POINTER(c_ubyte)]


def mincore(addr, length):
    """Return the mincore(2) residency vector for [addr, addr + length).

    Each entry is non-zero iff the corresponding page is resident in the
    page cache.  `addr` must be a page-aligned mmap'd address.
    """
    pages = (length + PAGESIZE - 1) // PAGESIZE
    vec = (c_ubyte * pages)()
    res = libc.mincore(addr, length, cast(vec, POINTER(c_ubyte)))
    assert res == 0
    return vec


@contextmanager
def open_fd(path: str, flags: int):
    """Context manager yielding an OS-level file descriptor, closed on exit."""
    fd = os.open(path, flags)
    try:
        yield fd
    finally:
        os.close(fd)


@contextmanager
def mmap_cm(addr, length, prot, flags, fd, offset):
    """Context manager that mmaps `fd` and munmaps on exit.

    Raises:
        OSError: if mmap(2) fails.
    """
    addr_mmap = mmap(addr, length, prot, flags, fd, offset)
    if addr_mmap == MAP_FAILED:
        raise OSError('mmap failed for fd {}'.format(fd))
    try:
        yield addr_mmap
    finally:
        munmap(addr_mmap, length)


def file_mincore(path: str):
    """Return the page-cache residency vector for the file at `path`.

    Returns an empty list for zero-length files and for files deleted
    between glob and open.
    """
    try:
        with open_fd(path, os.O_RDONLY) as fd:
            length = os.fstat(fd).st_size
            # cannot mmap a zero size file
            if length == 0:
                return []
            with mmap_cm(0, length, PROT_READ, MAP_SHARED, fd, 0) as file_mmap:
                return list(mincore(file_mmap, length))
    except FileNotFoundError:
        # Most likely the file was deleted, at least with current callers
        return []


def file_pages_cached(path: str):
    """Number of pages of `path` currently resident in the page cache."""
    return sum(x > 0 for x in file_mincore(path))


def glob_pages_cached(glob_pattern):
    """Total resident page count across all files matching `glob_pattern`."""
    return sum(file_pages_cached(fname) for fname in glob(glob_pattern))


def shard_paths(client, indices):
    """Yield (index_name, shard_id, shard_path) for shards of `indices` on this node.

    Assumes the conventional on-disk layout under /srv/elasticsearch for a
    single-node-per-host deployment (nodes/0).
    """
    cluster_name = client.cluster.health()['cluster_name']
    index_base_path = '/srv/elasticsearch/{}/nodes/0/indices'.format(cluster_name)
    assert os.path.exists(index_base_path)
    for index_meta in client.cat.indices(indices, format='json', local=True):
        index_path = os.path.join(index_base_path, index_meta['uuid'])
        if not os.path.exists(index_path):
            # Index has no shards allocated on this node
            continue
        for shard_path in glob(os.path.join(index_path, '*')):
            basename = os.path.basename(shard_path)
            if basename == '_state':
                # Cluster-state metadata directory, not a shard
                continue
            yield index_meta['index'], int(basename), shard_path


def arg_parser():
    """Build the CLI parser: one or more index names, optional --port."""
    parser = ArgumentParser()
    parser.add_argument('indices', nargs='+')
    parser.add_argument('--port', default=9200, type=int)
    return parser


def main(indices, port):
    """Print per-shard page-cache usage in MB for the requested indices."""
    client = elasticsearch.Elasticsearch('localhost:{}'.format(port))
    for index_name, shard_id, shard_path in shard_paths(client, ','.join(indices)):
        shard_pages = glob_pages_cached(os.path.join(shard_path, 'index/*'))
        print('{} {} {} MB'.format(
            index_name, shard_id, shard_pages * PAGESIZE / 1024 / 1024))


if __name__ == '__main__':
    sys.exit(main(**dict(vars(arg_parser().parse_args()))))