作者:容 易 2015-12-18 15:07:37

版权所有未经允许请勿转载


   学习了几天elk,感觉非常不错。由于实际业务需求的关系,Logstash对我们显得太过庞大,而且相对学习成本也比较高。为了能够简单高效的实现日志解析并且写入elasticsearch。花了点时间写了一个简单的python并行日志处理的脚本。该脚本的并行处理框架已经写好了,理论上只需要简单修改就可以实现各种数据库的数据抽取处理并且插入elasticsearch。现在分享给大家,希望看了后不是一味的复制了事,能够将更完善的和更高效的优化代码进行反馈和分享谢谢,我的QQ46053710。

   该脚本以分析WEB应用日志为例,包括了Geoip信息的获取和geo_point类型的mappings,以及KIBANA的UTC时间的处理,另外还有读取和插入内容的统计等。大家可以根据自己的实际需求去修改和调整。

   另外进行批量插入的时候需要对elasticsearch的某些默认参数进行调整,否则可能会出现elasticsearch处理不过来等原因造成部分插入失败。以下参数是我在批量插入时elasticsearch调整的参数,可以根据实际的情况进行调整。


index.store.type: niofs

index.cache.field.type: soft

bootstrap.mlockall: true

threadpool.index.size: 8

threadpool.index.queue_size: 1200

threadpool.search.size: 8

threadpool.search.queue_size: 1000

threadpool.get.type: fixed

threadpool.get.size: 8            

threadpool.get.queue_size: 1000

threadpool.bulk.type: fixed

threadpool.bulk.size: 8

threadpool.bulk.queue_size: 1200

threadpool.flush.size: 8

threadpool.flush.queue_size: 1000

indices.store.throttle.type: merge

indices.store.throttle.max_bytes_per_sec: 100mb

index.merge.scheduler.max_thread_count: 16

index.translog.flush_threshold_size: 1024MB

index.gateway.local.sync: 30s

index.refresh_interval: 30s

index.translog.interval: 15s

index.translog.flush_threshold_ops: 50000

index.translog.flush_threshold_period: 30s


经过处理写入elasticsearch的基本数据结构如下

   res = {"@timestamp": ‘2015-12-09T12:31:22.000+08:00’,

         "remote_ip": ‘112.64.186.119’,

         "rel_ip": ’10.98.50.131’,

         "rel_port": ‘8010’,

         "domain": ‘www.3qos.com’,

         "v_ip": ’10.98.3.162’,

         "r_time": 30,

         "status": 200,

         "bytes": 128,

         "r_url": ‘/mas/’,

         "r_method": ‘GET’,

         "user_agent": ‘Apache-HttpClient/4.3.3 (java 1.5)’,

"geoip":geoip,                                                        

"location": {"lon":geoip["longitude"], "lat": geoip["latitude"]}

              }

   脚本内容

__author__ = 'tiger'

# -*- coding: utf-8 -*-

#!/usr/bin/env python

import multiprocessing

import time, pygeoip, re,  sys

import datetime, requests, json

from elasticsearch import Elasticsearch

from elasticsearch import helpers


reload(sys)

sys.setdefaultencoding("utf8")


def read_file(data_queue, source_file, rsize, insert_process):

   total_count = 0

   try:

       stdin_log = open(source_file, 'r')

   except Exception, err_info:

       print err_info

   lines = stdin_log.readlines(rsize)

   while lines:

       total_count = total_count + len(lines)

       data_queue.put(lines)

       #print data_queue.qsize()

       lines = stdin_log.readlines(rsize)

   stdin_log.close()

   for i in range(insert_process):

       data_queue.put('ok')

   data_queue.task_done()

   print "read lines %s." % total_count


def insert_es(data_queue, es_info, index_name, type_name, commplete_queue, geodb_path):

   es_info = es_info

   try:

       es = Elasticsearch([es_info])

   except Exception, err:

       print "conn es error " + str(err)

   total_count = 0

   #正则匹配""包含的内容

   regex_str = re.compile('\B".*?"\B')

   #获取源IP的地域信息

   geodb = geodb_path

   gi = pygeoip.GeoIP(geodb, pygeoip.MEMORY_CACHE)

   while 1:

       res_list = data_queue.get()

       values_list = []

       if res_list == 'ok':

           break

       rows = len(res_list)

       error_rows = 0

       for line in res_list:

           #字符集的特殊处理,否则可能会出错,可以根据实际情况去调整

           try:

               line = line.encode('utf-8')

           except:

               line = line.decode('GB2312','ignore').encode('utf-8')

           line_format = line.strip().split()

           r_time = line_format[9][1:]

           date = datetime.datetime.strptime(r_time, '%d/%b/%Y:%H:%M:%S')

           x = date.strftime('%Y-%m-%dT%H:%M:%S.000+08:00')

           res_info = regex_str.findall(line)

           try:

               geoip = gi.record_by_addr(line_format[0])

           except Exception, err:

               print err + ' ip:' + str(line_format[0])

               geoip = 0

           try:

               url_method = res_info[0].replace('"', "").strip().split()

               res = {"@timestamp": x, "remote_ip": line_format[0],

                      "rel_ip": line_format[1], "rel_port": line_format[4],

                      "domain": line_format[2], "v_ip": line_format[3],

                      "r_time": int(line_format[5]), "status": int(line_format[14]),

                      "bytes": int(line_format[15]), "r_url": url_method[1].split('?')[0],

                      "r_method": url_method[0], "user_agent": res_info[2].replace('"', ""),

                      "referer_url": res_info[1].replace('"', "").strip().split('?')[0]}

           except Exception, err:

               error_rows = error_rows + 1

               print "res " + str(err)

           if geoip:

               res["geoip"] = geoip

               res["location"] = {"lon": geoip["longitude"], "lat": geoip["latitude"]}

           values_list.append({"_index": index_name, "_type": type_name, "_source": res})

       try:

           helpers.bulk(es, values_list)

           total_count = total_count + rows - error_rows

       except Exception, error_info:

           print "insert error " + str(error_info)

   commplete_queue.put(total_count)

   print "insert process commplete will stop."



def main():

   #定义日志文件的绝对路径

   source_file = '/root/chinapnr151209.log.5'

   #需要导入的日志文件

   #es服务器的IP,默认端口9200

   es_info = "192.168.4.181"

   #插入指定的索引和type

   index_name = 'pnr_netscaler-map-log'

   type_name = 'maplog'

   #索引是否存在,如果为0代表需要创建索引

   index_create = 0

   #URL需要map的索引

   url = 'http://192.168.4.181:9200/' + index_name

   #每次读取文件的SIZE,例如每次读取512K   1024 * 512 * 1

   rsize = 1024 * 1024 * 1

   start_time = time.time()

   #定义执行批量insert的进程数

   insert_process = 16

   #总读取的行数

   total_row = 0

   #geoip 数据的路径

   geodb = '/root/test/GeoLiteCity.dat'

   data_manager = multiprocessing.Manager()

   data_queue = data_manager.Queue(maxsize=40)

   commplete_manager = multiprocessing.Manager()

   pro_pool = multiprocessing.Pool(processes=insert_process + 1)

   commplete_queue = commplete_manager.Queue(maxsize=insert_process)

   #索引是否存在,如果为0代表需要创建索引

   if index_create == 0:

       try:

           #es_create = Elasticsearch([es_info])

           #es_create.indices.create(index=index_name, ignore=400)

           values = {"mappings": {"maplog": {"properties": {"location": {"type": "geo_point"}}}}}

           r = requests.put(url, json.dumps(values))

           print r

       except Exception, err:

           print err

   print "start insert es process."

   for i in xrange(insert_process):

       pro_pool.apply_async(insert_es, args=(data_queue, es_info, index_name, type_name, commplete_queue, geodb))

   print "start select db process."

   pro_pool.apply_async(read_file, args=(data_queue, source_file, rsize, insert_process))

   pro_pool.close()

   pro_pool.join()

   while insert_process > 0:

       single_count = commplete_queue.get()

       total_row = total_row + single_count

       insert_process = insert_process - 1

   print "insert lines:%s." % total_row

   elapsed_time = time.time() - start_time

   print "The elapsed time is %s seconds." % elapsed_time


if __name__ == "__main__":

   main()


One Response


    还没有评论!
1  

Leave your comment

请留下您的姓名(*)

请输入正确的邮箱地址(*)

请输入你的评论(*)


感谢开源 © 2016. All rights reserved.&3Q Open Source&^_^赣ICP备15012863^_^
乐于分享共同进步 KreativeThemes