tokuhirom's blog.

'; DROP DATABASE database();

MySQL binlog API は row based mode でこそ、その真価を発揮する!!

空前の MySQL binlog API ブームですが、みなさん libreplication の examples/basic-[12] を実行するだけで満足してしまっているようです。しかし、libreplication のおもしろいのは examples/mysql2lucene の方なんです。

3つのロギングモード

普段はあまり意識しないかもしれないですが mysql の binlog には statement based, row based, mixed の3種類があります。

statement based は、実際に実行された SQL が記録されます。一部の関数でちょっと危険です。

row based では、実際に変更された行のデータが記録されます。

mixed では、危険な関数をつかった場合などには row based で記録し、そうでなければ statement based で記録します。最近はこれがデフォルトです。

詳細は http://thinkit.co.jp/article/95/1?page=0,2 このへんなどをご欄ください。

ロギングモードを row based にしよう

SET GLOBAL binlog_format = 'ROW';

とかやると、binlog_format が row based に変更されます。

この状態で binlog API をたたくと、TABLE_MAP_EVENT, WRITE_ROWS_EVENT, UPDATE_ROWS_EVENT, DELETE_ROWS_EVENT の4種類のイベントがおくられてくるようになります。

行データが一緒におくられてくるので、それをつかって手元のデータをアップデートすればいいのです。SQL statement を解析したりする必要はありません。

実際に MySQL::BinLog でためしてみる

以下のようなプログラムで簡単に行のデータをとれます。

use 5.016000;
use MySQL::BinLog;

my %table_map;
my $url = shift or pod2usage;
my $binlog = MySQL::BinLog->new(MySQL::BinLog::create_transport($url));
$binlog->connect();
$binlog->set_position(4);
say("connected: $binlog");
while (my $event = $binlog->wait_for_next_event()) {
    my $type = $event->get_event_type();
    if ($type eq TABLE_MAP_EVENT) {
        $table_map{$event->table_id} = $event;
    } elsif ($type ~~ [WRITE_ROWS_EVENT, UPDATE_ROWS_EVENT, DELETE_ROWS_EVENT]) {
        my $table_event = $table_map{$event->table_id}
            or die "Unknown table: " . $event->table_id;
        my $rows = MySQL::BinLog::Row_event_set->new($event, $table_event);
        my $iter = $rows->begin();
        while (my $row = $iter->next()) {
            if ($type eq WRITE_ROWS_EVENT) {
                say("INSERT");
                show_row($row);
            } elsif ($type eq UPDATE_ROWS_EVENT) {
                say("UPDATE BEFORE");
                show_row($row);
                say("UPDATE AFTER");
                show_row($iter->next());
            } elsif ($type eq DELETE_ROWS_EVENT) {
                say("DELETE");
                show_row($row);
            }
        }
    }
}

sub show_row {
    my $row = shift;
    my $fields_iter = $row->begin;
    while (my $field = $fields_iter->next) {
        printf("       TYPE: %-10s STR: %s\n", $field->type_str, $field->as_string);
    }
}

この状態で以下のような SQL を発行します。

INSERT INTO hoi (id, name) values (1, "HOGEHOGE");
UPDATE hoi SET name="FUGAFUGA" WHERE id=1;
DELETE FROM hoi WHERE id=1;

すると、以下のような結果がえられます。

INSERT 
       TYPE: LONG       STR: 1
       TYPE: VARCHAR    STR: HOGEHOGE
       TYPE: TIMESTAMP  STR: 1341954455
UPDATE BEFORE
       TYPE: LONG       STR: 1
       TYPE: VARCHAR    STR: HOGEHOGE
       TYPE: TIMESTAMP  STR: 1341954455
UPDATE AFTER
       TYPE: LONG       STR: 1
       TYPE: VARCHAR    STR: FUGAFUGA
       TYPE: TIMESTAMP  STR: 1341954455
DELETE 
       TYPE: LONG       STR: 1
       TYPE: VARCHAR    STR: FUGAFUGA
       TYPE: TIMESTAMP  STR: 1341954455

まとめ

row based で binlog を記録することにより、行のデータを簡単に取得できます。

binlog API をつかうことで lucene, solr, groonga, rast などの全文検索エンジンにデータをうつすことが容易にできます。通常 solr を運用する場合は minutely で SELECT かけてデータをうつしたりすることが多いとおもいますが、binlog API をつかえばリアルタイムにデータをコピーできます。

mroonga のようなアプローチもありますが、こういうアプローチも面白いのではないでしょうか。mysql storage engine にくらべ制約がすくなく、開発期間も格段に短縮できそうですね。

このような、任意のストレージに replication できるというのが binlog API の主目的だとおもいますが、その他にもデータのモニタリングやらなんやら、アイディアはいろいろ考えられるのではないでしょうか。

エキスパートのためのMySQL[運用+管理]トラブルシューティングガイド

エキスパートのためのMySQL[運用+管理]トラブルシューティングガイド

TheSchwartz のような RDBMS をつかったジョブキューをリアルタイムに処理するアイディア

TheSchwartz のような RDBMS をつかった job queue は、新規に daemon をたてたりする必要がないので楽でいいのですが、一方で job の追加の timing が storage から push でおくられてこないので、若干の delay が生じてしまうのが難点でした。

この問題を解決するために、mysql の binlog API を用いて、処理してみるハックを考えてみました。
binlog API を利用すると更新クエリを streaming で処理できるので、こういったハックも簡単にできちゃいます。おもしろい。

use 5.016000;
use MySQL::BinLog;
use TheSchwartz;

my $sch = TheSchwartz->new(...) or die;
$sch->can_do($_) for ...;
$sch->work_until_done();

my $binlog = MySQL::BinLog->new(MySQL::BinLog::create_transport(...));
$binlog->connect();
$sch->work_until_done(); # and once more.

while (my $event = $binlog->wait_for_next_event()) {
    my $type = $event->get_event_type();
    if ($type eq QUERY_EVENT) {
        if ($event->query =~ /INSERT\s*INTO\s*job/i) {
            say("WORK!!!!!");
            $sch->work_until_done(); # ←  ここでたまってるジョブを一気にやっつける
        }
    }
}

mysql の binlog api を利用するための Perl Binding を作りました

mysql の binlog api をあつかうためのライブラリである libreplication というものがあるのですが、これの perl binding を開発しました。

https://github.com/tokuhirom/MySQL-BinLog
とりあえず、examples/basic-2.cpp でやっていることができるところまでつくっておきました。

バイナリログの内容を streaming で処理できるのがおもしろいです。

以下のような用途での応用が考えられるようです。夢がひろがりんぐですね。


libreplication のセットアップ方法については riywo さんの記事をごらんください。
http://blog.riywo.com/2012/07/08/133005


はまりポイントは、streaming 開始時に ROTATE_EVENT が発生して、最新のログをみにいかされるので、これを skip するために

$binlog->set_position(4);

としなくてはいけないことぐらいでしょうか。



主だったイベントをラップしてますが、まだ不十分なので、patches welcome です。



このブログがくわしいね。
http://intuitive-search.blogspot.jp/2011/07/binary-log-api-and-replication-listener.html

Inserting row without parameters

% mbp mysql -uroot  foo                                            [~] Tue 1 6:47
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 13
Server version: 5.5.16 MySQL Community Server (GPL)

Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create table x ( id int unsigned auto_increment not null );
ERROR 1075 (42000): Incorrect table definition; there can be only one auto column and it must be defined as a key
mysql> 
mysql> create table x ( id int unsigned auto_increment not null primary key);
Query OK, 0 rows affected (0.15 sec)

mysql> insert into x () values ();
Query OK, 1 row affected (0.05 sec)

mysql> insert into x default values;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'default values' at line 1
mysql> select * from x;
+----+
| id |
+----+
|  1 |
+----+
1 row in set (0.03 sec)
mysql> Bye
mbp sqlite3 foo.db                                               [~] Tue 1 6:48
SQLite version 3.6.22
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> create table x ( id integer not null primary key );
sqlite> insert into x () values ();
Error: near ")": syntax error
sqlite> insert into x default values;
sqlite> select * from x;
1
sqlite> 
Copyright (c) 2000, 2011, Oracle and/or its affiliate

MySQL のレプリケーションが動いているのか監視するスクリプト

【注意】
この記事は 2005-08-19 17:29:10 に自分がかいた記事の再掲です。


レプリケーションが止まってて悲しいことになることがあるので、監視することにした。

#!/usr/bin/perl
use strict;
use Data::Dumper;
use DBI;

my $dbh = DBI->connect('dbi:mysql:database;hostname=localhost','user','password', {AutoCommit => 1});
my $sth = $dbh->prepare(q{SHOW SLAVE STATUS});
$sth->execute();

while (my $c = $sth->fetchrow_hashref) {
        my $le = $c->{'Last_Error'};
        print "replication error : $le\n" if $le;

        if ($c->{'Slave_IO_Running'} ne 'Yes') {
                print "Slave IO is not running\n";
        }

        if ($c->{'Slave_SQL_Running'} ne 'Yes') {
                print "Slave SQL is not running\n";
        }

        my $rmlp = $c->{'Read_Master_Log_Pos'};
        my $emlp = $c->{'Exec_Master_Log_Pos'};
        if ($rmlp != $emlp) {
                print "considerable delay : $rmlp $emlp\n";
        }
}

5min ごとに MAILTO を設定した cron で動かしてやると、よさげ。

MAILTO=alert@example.com
*/5 * * * * 

実運用する時には、Read_Master_Log_Pos と Exec_Master_Log_Pos については、閾値をあげてやる方がよいかな。

mysqldump から、綺麗なスキーマを得る方法

下記のような簡単なワンライナーを書くだけで、綺麗な SQL ファイルを得られます!

ポイントは --quote-names=FALSE というオプション。これつけると、テーブル名が ` で囲まれなくなります。すっきり!

mysqldump --quote-names=FALSE --add-drop-table=FALSE -u hogehoge -p fugafuga -d | perl -pe 's!(.+ )(int\(|varchar\(|enum\(|datetime|timestamp)!sprintf("%-20s%s", $1, $2)!ge;s!(.+)(NOT NULL)!sprintf "%-40s%s", $1, $2!eg;s!(.+)( default )!sprintf("%-60s%s", $1, $2)!ge;s!(.+)( auto_increment)!sprintf("%-60s%s", $1, $2)!ge'

出力結果は下記のような感じになります。

CREATE TABLE `comment` (
  id                int(10) unsigned    NOT NULL             auto_increment,
  rid               varchar(10)         NOT NULL             default '',
  member_id         int(10) unsigned    NOT NULL             default '0',
  task_id           int(10) unsigned    NOT NULL             default '0',
  body text                             NOT NULL,
  created_on        datetime            NOT NULL             default '0000-00-00 00:00:00',
  `timestamp`       timestamp           NOT NULL             default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
  PRIMARY KEY  (id),
  UNIQUE KEY rid (rid)
) ENGINE=MyISAM DEFAULT CHARSET=ujis;