ハニーポットのログをELKで一元管理する

先日の第3回ハニーポッター技術交流会で,ハニーポットのログの可視化に関連したLTをしていた方が何名かいらっしゃいました.

私は今までハニーポットのログを収集してもテキストベースの分析しかしたことがなく,資料としてグラフ等が必要なときはExcelを使って作成していました.生のログを分析する力ももちろん大事だとは思いますが,T-PotMHN(Modern Honey Network)のようにブラウザからアクセスするだけでリアルタイムに分析結果が可視化されるといった環境に非常に魅力を感じていました.

というのもテキストベースのログ分析だと,特定の国からの攻撃が多かったなどの攻撃の動向を調べるだけでもパソコンに向かってある程度の時間をかける必要があり,毎日行うのは難しいです.それに対してログ分析結果の可視化環境が整っていれば,毎日ブラウザからアクセスするだけで攻撃の動向がチェックでき,その上で新たな視点からログ分析を行うこともできます.

しかしT-PotやMHNは構築が簡単な反面,要求スペックは高く利用しない機能などもついています.そこで第3回ハニーポッター技術交流会のにほんももんがさんの発表を参考に,サーバにELKを導入しハニーポットのログの一元管理及び可視化を行いたいと思います.

ELKとは

ELKとは,分散処理マルチテナント対応の検索エンジンであるElasticsearch,サーバサイドデータ処理パイプラインのLogstash,Elasticsearchのデータの可視化ツールであるKibanaの頭文字を取ったもので,これらを組み合わせてログ集中管理を行う環境を俗にELK Stackと呼びます.

ELKの導入

ELKのインストール

ELKはJava環境で動作するため,サーバにJavaをインストールする必要があります.

Javaのインストール

# yum install java-1.8.0-openjdk

Elasticsearchリポジトリの追加

ELKをインストールするために,リポジトリを追加します.今回は最新版の6.x系をインストールします.(2018年3月現在)

# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

Elasticsearch, Logstash, Kibanaをインストール

# yum install elasticsearch logstash kibana

ELKの設定

Elasticsearchの設定

# vim /etc/elasticsearch/elasticsearch.yml
network.host: xxx.xxx.xxx.xxx    // サーバのIPアドレス
http.port: 9200    // ポートを指定
http.cors.enabled: true    // 最後に追記

Kibanaの設定

# vim /etc/kibana/kibana.yml
server.port: 5601    // ポートを指定
server.host: "xxx.xxx.xxx.xxx"    // サーバのIPアドレス
elasticsearch.url: "http://xxx.xxx.xxx.xxx:9200"    // Elasticsearchを指定

Logstashの設定

今回はCowrieとGlastopfのログをElasticsearchに格納するという前提で,設定ファイルを記述します.この部分は自分が管理したいハニーポットなどのログファイルの書式に応じて適宜変更を行ってください.

# vim /etc/logstash/conf.d/logstash.conf
# Input Section
input {

  # Cowrie
    file {
      path => ["/home/cowrie/cowrie/log/cowrie.json"]
      start_position => beginning
      codec => json
      type => "Cowrie"
    }

  # Glastopf
    file {
      path => ["/opt/glastopf/log/glastopf.log"]
      start_position => beginning
      type => "Glastopf"
    }
}

# Filter Section
filter {

  # Cowrie
  if [type] == "Cowrie" {
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    mutate {
      rename => {
        "dst_port" => "dest_port"
        "dst_ip" => "dest_ip"
      }
    }
  }

  # Glastopf
  if [type] == "Glastopf" {
    grok {
      match => [ "message", "\A%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{NOTSPACE}%{SPACE}%{IP:src_ip}%{SPACE}%{WORD}%{SPACE}%{URIPROTO:http_method}%{SPACE}%{NOTSPACE:http_uri}%{SPACE}%{NOTSPACE}%{SPACE}%{HOSTNAME}:%{NUMBER:dest_port:integer}" ]
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
      remove_field => ["timestamp"]
    }
  }

  # Add geo coordinates / ASN info /IP rep.
  if [src_ip] {
    geoip {
      cache_size => 10000
      source => "src_ip"
      database => "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"
    }
    geoip {
      cache_size => 10000
      source => "src_ip"
      database => "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"
    }
    translate {
      refresh_interval => 86400
      field => "src_ip"
      destination => "ip_rep"
      dictionary_path => "/opt/listbot/iprep.yaml"
    }
  }

  # In some rare conditions dest_port, src_port is indexed as string, forcing integer for now
  if [dest_port] {
    mutate {
      convert => { "dest_port" => "integer" }
    }
  }
  if [src_port] {
    mutate {
      convert => { "src_port" => "integer" }
    }
  }
}

# Output Section
output {
  elasticsearch {
    hosts => ["xxx.xxx.xxx.xxx:9200"]    // Elasticsearchを指定
  }
}

その他設定

Cowrieのホームディレクトリのパーミッション変更

これについてはもっと利口な方法があると思いますが,LogstashがCowrieのログファイルを読み込むためにとりあえずパーミッションを変更します.

chmod 755 /home/cowrie/

Listbotの導入

T-Potが収集し公開している,攻撃元IPアドレスのデータベースを利用することで,頻繁に攻撃活動を行うIPアドレスからの通信数をグラフ化できるようにします.またデータベースは頻繁に更新されるため,crondに自動更新を登録しておきます.

# cd /opt/
# git clone https://github.com/dtag-dev-sec/listbot

# vim ip_rep-update.sh
#!/bin/bash

cd /opt/listbot/
git pull

exit 0

# vim /etc/crontab
0 2 * * * root bash /root/bin/ip_rep-update.sh    // 追記

firewallの設定

# firewall-cmd --permanent --add-service=kibana
# firewall-cmd --reload

ELKの起動と自動起動登録

# systemctl start elasticsearch
# systemctl start logstash
# systemctl start kibana
# systemctl enable elasticsearch
# systemctl enable logstash
# systemctl enable kibana

各種サービスが正常に動作しているかを確認

Elasticsearchの起動確認

curlコマンドを用いてElasticsearchにアクセスし,次のような応答が返ってくれば正常の動作しています.

# curl "http://xxx.xxx.xxx.xxx:9200"
{
  "name" : "VRUYKr6",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "N8UqpWXvSJWcm3SMzR6zdQ",
  "version" : {
    "number" : "6.2.2",
    "build_hash" : "10b1edd",
    "build_date" : "2018-02-16T19:01:30.685723Z",
    "build_snapshot" : false,
    "lucene_version" : "7.2.1",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

Logstashの起動確認

Logstashについては/var/log/logstash/logstash-plain.logにログが出力されるので,ここにエラーが出力されていないかチェックしてください.

Kibanaの起動確認

ブラウザからhttp://xxx.xxx.xxx.xxx:5601にアクセスして次のようなページが表示されたら正常に動作しています.

Index patternの作成

  1. Kibanaの左側メニューのManagementをクリック
  2. Index Patternsをクリック
  3. +Create Index Patternをクリック
  4. Index Patternにlogstash-*と入力し,Next Stepをクリック
  5. Time Filter field nameに@timestampを選択し,Create Index Patternをクリック

後はGUIの指示に従って,Visualizeでグラフや表を作成し,お好みのDashboardを作成すれば見事リアルタイムログ分析環境の完成です!

おわりに

今回はELKを用いたハニーポットのログ一元管理化及び可視化を行いました.

先日参加した第3回ハニーポッター技術交流会で得た知識を活かして,より良いハニーポット環境を構築することができました.交流会は本当に得られたものが大きく参加してよかったと感じています.まだまだやりたいことがたくさんあるので少しづつ実現していきたいです^^

参考文献

http://onthesoup.hatenablog.com/entry/2018/02/11/061500
https://knowledge.sakura.ad.jp/2736/