scrapyでmysql入れたデータをlogstashを用いてElasticsearchと同期
まずは必要なものをインストール
使用したものはこんな感じです.
- pyenv
- python3.6.2
- MySQL
- mysqlclient
- scrapy
- Elasticsearch
- Kibana
- logstash
pyenvを使ってscrapyを使用するための環境を作成するのは過去の記事でやりました.
http://dmchrt.hatenablog.com/entry/2017/09/08/python導入&scrapy環境構築
今回は実際にscrapyを使用してgunosyニュースを取得し,最終的にはElasticsearchにデータを突っ込んだ手順を追っていきたいと思います.
Scrapyを使用してクローリングし,MySQLにデータを入れるまで
まずはMySQLと,pythonでMySQLを操作するために必要なmysqlclientをダウンロードします.
$ brew update $ brew install mysql $ brew info mysql
mysqlclientをインストールするpyenvの環境のディレクトリで下記を実行.
$ pip install mysqlclient
とりあえずこれでMySQLにデータを入れるのに必要なものは揃いました.
pythonを使ってプログラムを書いていきます.
pyenvで開発環境を整えたディレクトリでプロジェクトを作成し,treeコマンドで確認します.
$ scrapy startproject gunosy $ cd gunosy $ scrapy genspider gunosy_spider gunosy.com $ tree . ├── gunosy │ ├── __init__.py │ ├── __pycache__ │ │ ├── __init__.cpython-36.pyc │ │ └── settings.cpython-36.pyc │ ├── items.py │ ├── middlewares.py │ ├── pipelines.py │ ├── settings.py │ └── spiders │ ├── __init__.py │ ├── __pycache__ │ │ └── __init__.cpython-36.pyc │ └── gunosy_spider.py └── scrapy.cfg
まずはitems.pyとsettings.pyを編集します.
特にsettings.pyでダウンロード間隔を設定しないとサイトに迷惑がかかってしまうので必ずしましょう.
#items.py import scrapy class GunosyItem(scrapy.Item): url = scrapy.Field() タイトル = scrapy.Field() 元記事サイト名 = scrapy.Field() 更新日 = scrapy.Field() 取得日時 = scrapy.Field()
#settigs.py DOWNLOAD_DELAY = 3 #コメント化されてるのでコメントを外す.
ここまでできたら次は実際にspiderのコードを書いていきます.書き方は様々でしょうが,私は下記を参考にこのように書きました.
Python3でMySQLを使う – 基本操作からエラー処理までサンプルコード付 | Crane & to.
#gunosy_spider.py #必要なものをインポート import scrapy from gunosy.items import GunosyItem import datetime import MySQLdb import atexit #終了時に接続を切るのに使用 #データベース接続とカーソル作成. connection = MySQLdb.connect( host='localhost', user='root', passwd='*****', db='sample_db', charset='utf8') cursor = connection.cursor() cursor.execute("DROP TABLE IF EXISTS `gunosy_data`") class GunosySpiderSpider(scrapy.Spider): name = 'gunosy_spider' allowed_domains = ['gunosy.com'] start_urls = ['https://gunosy.com/categories/14'] def parse(self, response): #詳細ページをパルス for href in response.xpath('//div[@class="list_title"]/a/@href'): yield response.follow(href, self.parseItem) #次のページをパルス for href in response.xpath('//div[@class="pager-link-option"]/a/@href'): yield response.follow(href, self.parse) #詳細ページの処理 def parseItem(self, response): item = GunosyItem() item['url'] = response.url item['タイトル'] = response.xpath('//h1[@class="article_header_title"]/text()').extract_first() item['元記事サイト名'] = response.xpath('//ul[@class="article_header_lead"]/li[@class="article_header_lead_by"]/text()').extract_first() item['更新日'] = response.xpath('//ul[@class="article_header_lead"]/li[@class="article_header_lead_date"]/@content').extract_first() item['取得日時'] = datetime.datetime.utcnow() + datetime.timedelta(hours=9) # エラー処理 try: # CREATE cursor.execute("""CREATE TABLE IF NOT EXISTS `gunosy_data` ( `id` int(11) NOT NULL auto_increment primary key, `url` text not null, `タイトル` text NOT NULL, `元記事サイト名` text, `更新日` text, `取得日時` datetime ) """) # INSERT cursor.execute( "INSERT INTO gunosy_data (url, タイトル, 元記事サイト名, 更新日,取得日時)" "VALUES (%s, %s, %s, %s, %s)", (item['url'], item['タイトル'], item['元記事サイト名'], item['更新日'], item['取得日時'])) except MySQLdb.Error as e: print('MySQLdb.Error: ', e) yield item #プログラム終了時に呼び出され,接続を閉じるメソッド. def endMethod(): print ('\n\n\n\nFINISHED\n\n\n') # 保存を実行(忘れると保存されないので注意) connection.commit() # 接続を閉じる connection.close() atexit.register(endMethod)
これをmysqlで確認してみるとちゃんとデータが入っていました.
MySQLのデータをElasticsearchに突っ込む
次はMySQLに入れたデータをlogstashを用いてElasticsearchに入れていきます.
まずは必要なElastichsearch,Kibana,logstashをインストールします.
Download Elasticsearch Free • Get Started Now | Elastic
Download Kibana Free • Get Started Now | Elastic
Download Logstash Free • Get Started Now | Elastic
インストールしたら,logstashの設定ファイルを書きます.書いたのですが,正直詳しいことはまだ理解していません.; ;
とりあえずこのように書いて,ファイル名はgunosy.confにしてlogstashディレクトリに置きます.
input { jdbc { jdbc_driver_library => "mysql-connector-java-5.1.44/mysql-connector-java-5.1.44-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/sample_db" jdbc_user => "root" jdbc_password => "dnt2626h8" statement => "SELECT * FROM gunosy_data" type => "gunosy" use_column_value => true tracking_column => "id" tracking_column_type => "numeric" clean_run => true } } filter { } output { elasticsearch { index => "gunosy_index" hosts => ["localhost:9200"] document_id => "%{id}" document_type => "gunosy_type" } stdout {codec => rubydebug {metadata => true }} }
そしてそれぞれ起動します.
elasricsearchディレクトリで下記を実行
$ bin/elasticsearch
kibanaディレクトリで下記を実行
$ bin/kibana
logstashディレクトリで下記を実行
$ bin/logstash -f gunosy.conf
データが入っているか確認.
kibanaでdev toolsで確認してみます.
ブラウザでhttp://localhost:5601/にアクセスしてDev Toolsで下記を実行
GET gunosy_index/_search { "query": {"match_all": {} }, "size": 30, "sort": [ { "id": { "order": "asc" } } ] }
これでElasticsearchにデータが入ったこともわかると思います.
elasticsearch-headを使って確認するのも便利かもしれません.
これからやること
logstashの設定がいまいちよくわかってないので,スケジューリングとか,そこらへんを色々やってみたいと思います.