未定

大学4回生です。調べた事をまとめたり、日記も書くかもしれません。

scrapyでmysql入れたデータをlogstashを用いてElasticsearchと同期

まずは必要なものをインストール

使用したものはこんな感じです.

  • pyenv
  • python3.6.2
  • MySQL
    • mysqlclient
    • scrapy
  • Elasticsearch
  • Kibana
  • logstash

pyenvを使ってscrapyを使用するための環境を作成するのは過去の記事でやりました.

http://dmchrt.hatenablog.com/entry/2017/09/08/python導入&scrapy環境構築


今回は実際にscrapyを使用してgunosyニュースを取得し,最終的にはElasticsearchにデータを突っ込んだ手順を追っていきたいと思います.

Scrapyを使用してクローリングし,MySQLにデータを入れるまで

まずはMySQLと,pythonMySQLを操作するために必要なmysqlclientをダウンロードします.

MySQL.

$ brew update
$ brew install mysql
$ brew info mysql 

mysqlclientをインストールするpyenvの環境のディレクトリで下記を実行.

$ pip install mysqlclient

とりあえずこれでMySQLにデータを入れるのに必要なものは揃いました.
pythonを使ってプログラムを書いていきます.
pyenvで開発環境を整えたディレクトリでプロジェクトを作成し,treeコマンドで確認します.

$ scrapy startproject gunosy
$ cd gunosy
$ scrapy genspider gunosy_spider gunosy.com
$ tree

.
├── gunosy
│   ├── __init__.py
│   ├── __pycache__
│   │   ├── __init__.cpython-36.pyc
│   │   └── settings.cpython-36.pyc
│   ├── items.py
│   ├── middlewares.py
│   ├── pipelines.py
│   ├── settings.py
│   └── spiders
│       ├── __init__.py
│       ├── __pycache__
│       │   └── __init__.cpython-36.pyc
│       └── gunosy_spider.py
└── scrapy.cfg

まずはitems.pyとsettings.pyを編集します.
特にsettings.pyでダウンロード間隔を設定しないとサイトに迷惑がかかってしまうので必ずしましょう.

#items.py
import scrapy
class GunosyItem(scrapy.Item):
    url = scrapy.Field()
    タイトル = scrapy.Field()
    元記事サイト名 = scrapy.Field()
    更新日 = scrapy.Field()
    取得日時 = scrapy.Field()
#settigs.py
DOWNLOAD_DELAY = 3 #コメント化されてるのでコメントを外す.

ここまでできたら次は実際にspiderのコードを書いていきます.書き方は様々でしょうが,私は下記を参考にこのように書きました.
Python3でMySQLを使う – 基本操作からエラー処理までサンプルコード付 | Crane & to.

#gunosy_spider.py

#必要なものをインポート
import scrapy
from gunosy.items import GunosyItem
import datetime
import MySQLdb
import atexit #終了時に接続を切るのに使用

#データベース接続とカーソル作成.
connection = MySQLdb.connect(
    host='localhost', user='root', passwd='*****', db='sample_db', charset='utf8')
cursor = connection.cursor()
cursor.execute("DROP TABLE IF EXISTS `gunosy_data`")

class GunosySpiderSpider(scrapy.Spider):
    name = 'gunosy_spider'
    allowed_domains = ['gunosy.com']
    start_urls = ['https://gunosy.com/categories/14']

    def parse(self, response):
        #詳細ページをパルス
        for href in response.xpath('//div[@class="list_title"]/a/@href'):
            yield response.follow(href, self.parseItem)

        #次のページをパルス
        for href in response.xpath('//div[@class="pager-link-option"]/a/@href'):
            yield response.follow(href, self.parse)

    #詳細ページの処理
    def parseItem(self, response):
        item = GunosyItem()
        item['url'] = response.url
        item['タイトル'] = response.xpath('//h1[@class="article_header_title"]/text()').extract_first()
        item['元記事サイト名'] = response.xpath('//ul[@class="article_header_lead"]/li[@class="article_header_lead_by"]/text()').extract_first()
        item['更新日'] = response.xpath('//ul[@class="article_header_lead"]/li[@class="article_header_lead_date"]/@content').extract_first()
        item['取得日時'] = datetime.datetime.utcnow() + datetime.timedelta(hours=9)

        # エラー処理
        try:
            # CREATE
            cursor.execute("""CREATE TABLE IF NOT EXISTS `gunosy_data` (
            `id` int(11) NOT NULL auto_increment primary key,
            `url` text not null,
            `タイトル` text  NOT NULL,
            `元記事サイト名` text,
            `更新日` text,
            `取得日時` datetime
          ) """)

            # INSERT
            cursor.execute(
            "INSERT INTO gunosy_data (url, タイトル, 元記事サイト名, 更新日,取得日時)"
            "VALUES (%s, %s, %s, %s, %s)",
            (item['url'], item['タイトル'], item['元記事サイト名'], item['更新日'], item['取得日時']))

        except MySQLdb.Error as e:
            print('MySQLdb.Error: ', e)

        yield item

    #プログラム終了時に呼び出され,接続を閉じるメソッド.
    def endMethod():
        print ('\n\n\n\nFINISHED\n\n\n')
        # 保存を実行(忘れると保存されないので注意)
        connection.commit()
        # 接続を閉じる
        connection.close()

    atexit.register(endMethod)

これをmysqlで確認してみるとちゃんとデータが入っていました.

MySQLのデータをElasticsearchに突っ込む

次はMySQLに入れたデータをlogstashを用いてElasticsearchに入れていきます.
まずは必要なElastichsearch,Kibana,logstashをインストールします.
Download Elasticsearch Free • Get Started Now | Elastic
Download Kibana Free • Get Started Now | Elastic
Download Logstash Free • Get Started Now | Elastic

インストールしたら,logstashの設定ファイルを書きます.書いたのですが,正直詳しいことはまだ理解していません.; ;
とりあえずこのように書いて,ファイル名はgunosy.confにしてlogstashディレクトリに置きます.

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/sample_db"
    jdbc_user => "root"
    jdbc_password => "dnt2626h8"
    statement => "SELECT * FROM gunosy_data"
    type => "gunosy"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    clean_run => true
  }
}
filter {
}
output {
    elasticsearch {
      index => "gunosy_index"
      hosts => ["localhost:9200"]
      document_id => "%{id}"
      document_type => "gunosy_type"
  }
  stdout {codec => rubydebug {metadata => true }}
}

そしてそれぞれ起動します.
elasricsearchディレクトリで下記を実行

$ bin/elasticsearch

kibanaディレクトリで下記を実行

$ bin/kibana

logstashディレクトリで下記を実行

$ bin/logstash -f gunosy.conf 

データが入っているか確認.
kibanaでdev toolsで確認してみます.
ブラウザでhttp://localhost:5601/にアクセスしてDev Toolsで下記を実行

GET gunosy_index/_search
{
  "query": {"match_all": {} },
  "size": 30,
  "sort": [
    {
      "id": {
        "order": "asc"
      }
    }
  ]
}

これでElasticsearchにデータが入ったこともわかると思います.

elasticsearch-headを使って確認するのも便利かもしれません.

これからやること

logstashの設定がいまいちよくわかってないので,スケジューリングとか,そこらへんを色々やってみたいと思います.