tokuhirom's blog

SparkSQL のクエリをユニットテストしたい

品質向上のために Spark クエリのユニットテストを実施したいという場合、JVM 言語で開発している場合には、Spark/hive をライブラリとしてロードできるから、容易に実装することができる。

dependencies {
    implementation 'org.apache.spark:spark-core_2.12:3.0.0'
    implementation 'org.apache.spark:spark-sql_2.12:3.0.0'
}

のように、関連するモジュールを依存に追加する。

以下のような、テストに利用するデータを json 形式などで用意する(spark は CSV, TSV などの形式も利用可能だから、好きなものを使えばよい)

{"name": "Nick",    "age":35,   "extra_fields": "{\"interests\":[\"car\", \"golf\"]}"}
{"name": "John",    "age":23}
{"name":"Cathy",    "age":44,   "extra_fields":"{\"interests\":[\"cooking\"]}"}

あとは、実際に spark session を作成し、local モードで spark を起動させれば良い。

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

// test without
class SimpleTest {
    fun run() {
        val spark: SparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example") // your application name
            .config("spark.master", "local")  // run on local machine, single thread.
            .config("spark.ui.enabled", false)
            .getOrCreate()

        val resourcePath = javaClass.classLoader.getResource("test-data/people.json")!!.toString()
        println("++++ read csv from: $resourcePath")

        val df = spark.read()
            .json(resourcePath)
        df.show()
        df.printSchema()

        println("++++ create table")
        df.createTempView("people")

        println("++++ select")
        val sqlDF: Dataset<Row> = spark.sql("SELECT * FROM people")
        sqlDF.show(false)

        println("++++ select 2nd")
        val sqlDF2: Dataset<Row> = spark.sql("SELECT name, get_json_object(extra_fields, '$.interests') interests FROM people")
        sqlDF2.show()

        println("++++ select 3rd")
        val sqlDF3: Dataset<Row> = spark.sql("SELECT avg(age) avg_age FROM people")
        sqlDF3.show()
    }
}

fun main() {
    SimpleTest().run()
}

hive を使う場合

実際に動かすクエリが select * from your_db_name.your_table のようにDB 名を指定していて、そのクエリ自体を変えずにテストしたいという場合には、hive サポートを有効にする必要がある。

hive を使う場合、spark-hive を依存に追加する。

dependencies {
    implementation 'org.apache.spark:spark-core_2.12:3.0.0'
    implementation 'org.apache.spark:spark-sql_2.12:3.0.0'
    implementation 'org.apache.spark:spark-hive_2.12:3.0.0'
}

あとは以下のように DB を作って入れるだけ。

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

class TestClass {
    fun run() {
        val warehouseLocation = createTempDir()
        println("++++ warehouseLocation=$warehouseLocation")
        val spark: SparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example") // your application name
            .config("spark.master", "local")  // run on local machine, single thread.
            .config("spark.sql.warehouse.dir", warehouseLocation.toString())
            .config("spark.ui.enabled", false)
            .enableHiveSupport()
            .getOrCreate()

        val resourcePath = javaClass.classLoader.getResource("test-data/people.json")!!.toString()
        println("++++ read csv from: $resourcePath")

        val df = spark.read()
            .json(resourcePath)
        df.show()
        df.printSchema()

        println("++++ create table")
        spark.sql("create database if not exists foo")
        df.write().mode(SaveMode.Overwrite).saveAsTable("foo.people")
        spark.sql("show tables").show()
        spark.sql("show create table foo.people").show(false)

        // If the type of data is the important thing, you need to write your schema by yourself.
        //        spark.sql("""drop table if exists `foo`.`people`""")
//        spark.sql("""
//        CREATE TABLE `foo`.`people` (
//            `name` STRING,
//            `age` long,
//            `extra_fields` STRING)
//        USING parquet""".trimIndent())
//        df.write().insertInto("foo.people")


        println("++++ select")
        val sqlDF: Dataset<Row> = spark.sql("SELECT * FROM foo.people")
        sqlDF.show(false)

        println("++++ select 2nd")
        val sqlDF2: Dataset<Row> = spark.sql("SELECT name, get_json_object(extra_fields, '$.interests') interests FROM foo.people")
        sqlDF2.show()

        println("++++ select 3rd")
        val sqlDF3: Dataset<Row> = spark.sql("SELECT avg(age) avg_age FROM foo.people")
        sqlDF3.show()
    }
}

fun main() {
    TestClass().run()
}

クエリを変更しなくていいというメリットがある一方で、hive にアクセスするので依存も増えるし、実行もめちゃくちゃ遅くなります。

df.write().mode(SaveMode.Overwrite).saveAsTable("foo.people")

のようにすると、df 側の型をみていい感じにテーブル定義してくれて便利だが、明示的に create table したいときは以下のようにしたほうがいいかも。

        spark.sql("""drop table if exists `foo`.`people`""")
        spark.sql("""
        CREATE TABLE `foo`.`people` (
            `name` STRING,
            `age` long,
            `extra_fields` STRING)
        USING parquet""".trimIndent())
        df.write().insertInto("foo.people")

両者の比較

hive を利用しない場合、上記コードは 4.427 sec 程度で終わりますが、hive を利用する場合は 19.676 sec 程度かかるようになります。 プロダクションコードのテストをする場合はこの差はそこそこでかいかも。

sample code

https://github.com/tokuhirom/sparksql-unittest

Created: 2020-08-07 08:26:56 +0900
Updated: 2020-08-07 08:26:56 +0900

curl で silence したいけどエラーはみたい。

       -s, --silent
              Silent or quiet mode. Don't show progress meter or error messages.  Makes Curl mute.

で silence できるが、これを入れると、error message も抑制されてしまう。

       -S, --show-error
              When used with -s it makes curl show an error message if it fails.

-S を追加で入れると、エラーは stderr に出るようになるのでちょうどいい感じになる。

Created: 2020-08-05 10:20:01 +0900
Updated: 2020-08-05 10:20:01 +0900

SystemRules ではなく SystemLambda を使う

https://github.com/stefanbirkner/system-rules

stdout/stderr の出力をキャプチャするのに systemrules が便利だが、junit5 対応はしていない。 junit5 に対応するためには system-labmda を使う。

https://github.com/stefanbirkner/system-lambda

インターフェースも junit に密結合していなくて、きれい。

Created: 2020-07-21 15:10:25 +0900
Updated: 2020-07-21 15:10:25 +0900

適当に JDBC でデータとってきてダンプするスニペット

生JDBCで適当にデータ出してデバッグしたいって時につかうやつです。

    protected void selectAndPrint(Connection connection, String query) {
        log.info("======> dumpTable: {} <====", query);
        try (PreparedStatement preparedStatement = connection.prepareStatement(query)) {
            try (ResultSet rs = preparedStatement.executeQuery()) {
                ResultSetMetaData metaData = rs.getMetaData();
                log.info("| {} |", IntStream.range(0, metaData.getColumnCount())
                                            .mapToObj(i -> {
                                                try {
                                                    return metaData.getColumnName(i + 1);
                                                } catch (SQLException e) {
                                                    throw new RuntimeException(e);
                                                }
                                            }).collect(Collectors.joining(" | ")));
                while (rs.next()) {
                    log.info("| {} |", IntStream.range(0, metaData.getColumnCount())
                                                .mapToObj(i -> {
                                                    try {
                                                        return rs.getString(i + 1);
                                                    } catch (SQLException e) {
                                                        throw new RuntimeException(e);
                                                    }
                                                }).collect(Collectors.joining(" | ")));
                }
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
Created: 2020-06-26 18:49:46 +0900
Updated: 2020-06-26 18:49:46 +0900

Swift で Foundation の data をバイト列で初期化したい

Data([0xDE, 0xAD, 0xBE, 0xEF]) こんな感じ。Data とか、一般的な名前すぎてググってもなんか見つけにくい。

Created: 2020-06-18 10:39:21 +0900
Updated: 2020-06-18 10:39:21 +0900

thrift compiler のバイナリを static build したい

http://archive.apache.org/dist/thrift/ からダウンロードする。

https://stackoverflow.com/questions/20068947/how-to-static-link-linux-software-that-uses-configure

linux の場合

sudo yum install glibc-static -y
./configure --enable-static --without-ruby --without-nodejs --without-php --without-python --without-c_glib --without-go --without-nodejs --without-lua CFLAGS="-static"
make -j9 LDFLAGS="-all-static"

osx の場合

osx では static build ができないので諦める。

brew install bison
export PATH="/usr/local/opt/bison/bin:$PATH"
export LDFLAGS="-L/usr/local/opt/bison/lib"
./configure --without-ruby --without-nodejs --without-php --without-python --without-c_glib --without-go --without-nodejs --without-lua
make -j 9
Created: 2020-06-17 18:58:44 +0900
Updated: 2020-06-17 18:58:44 +0900

[C#] C# で byte 列を16進数に変換したい

https://stackoverflow.com/questions/623104/byte-to-hex-string

の通りにやればいい。

byte[] data = { 1, 2, 4, 8, 16, 32 };
string hex = BitConverter.ToString(data);

// Result: 01-02-04-08-10-20

標準ライブラリでこういう表現できるのは便利だなーという感想。

Created: 2020-06-12 23:41:06 +0900
Updated: 2020-06-12 23:41:06 +0900

Gradle の dependency locking について

昔の gradle には dependency locking 機能がなかった。ビルドするタイミングによって、別の依存モジュールが利用されたりしていた。。 最近、gradle に dependency locking 機能がついたので試してみた。 carton.lock とか package-lock.json とか、そういうのと同じようなことができるようになる。 同じレポジトリからビルドしたら同じ jar が生成されるようになる。便利。

dependency locking を利用すると gradle.lockfile というファイルが生成される。

デフォルトだとフェーズ単位でファイルが生成されるから enableFeaturePreview('ONE_LOCKFILE_PER_PROJECT') を settings.gradle に書いて1ファイルにまとめるようにしたほうが良い。gradle 7.0 ではこの方式がデフォルトになる予定なので、最初からこの feature flag は enabled にしたほうが良いです。管理上も、そのほうが便利。

  • ./gradlew dependencies --write-locks ってするとロックファイルが書かれる
  • ./gradlew classes --update-locks org.apache.commons:commons-lang3,org.slf4j:slf4j-api とかで特定のモジュールだけアップデートできる

たぶんもう普通に使えるけど、まだ開発途中って感じはする。./gradlew dependencies してもサブプロジェクトのぶんを一括で作れない、とか。。

↓実際に line-bot-sdk-java を利用して試しに生成してみたやつがこれ。 https://github.com/tokuhirom/line-bot-sdk-java/commit/08a53ed86eedcf1072e7c12e77d7e1777f54c933

Created: 2020-05-13 10:00:39 +0900
Updated: 2020-05-13 10:00:39 +0900

PowerShell に git branch 情報を表示する

https://github.com/dahlbyk/posh-git

を利用すればいい。https://www.powershellgallery.com/packages/posh-git/1.0.0-beta4 PowerShell gallery からインストールすればいいです。

profile.ps1 に以下のように設定した。

Import-Module posh-git

function prompt {
    $prompt = & $GitPromptScriptBlock
    if ($prompt) { "$prompt " } else { " " }
}

$global:GitPromptSettings.DefaultPromptAbbreviateHomeDirectory = $true
$global:GitPromptSettings.EnableFileStatus = $false
Created: 2020-03-20 23:20:33 +0900
Updated: 2020-03-20 23:20:33 +0900

jacoco で lombok で生成されたコードを無視したい

Created: 2020-03-07 09:06:53 +0900
Updated: 2020-03-07 09:06:53 +0900

/actuator/configprops で kotlin の @ConfigurationProperties つけてるビーンが出ないよってとき

configprops は constructor の引数を見ている。何も指定しないと java と同じく arg0 とかになって acutuator で設定が見れない。

↓gradle の場合は以下のようにオプションを指定しよう。

            compileKotlin {
                kotlinOptions {
                    jvmTarget = "1.8"
                    allWarningsAsErrors = true
                    javaParameters = true // ←これ!
                }
            }
Created: 2020-02-27 15:43:49 +0900
Updated: 2020-02-27 15:43:49 +0900

NullPointerException の stacktrace が出ないケース

https://stackoverflow.com/questions/2411487/nullpointerexception-in-java-with-no-stacktrace https://stackoverflow.com/questions/4659151/recurring-exception-without-a-stack-trace-how-to-reset

hotspot だと最適化の都合で、出ないケースがあるらしい。

-XX:-OmitStackTraceInFastThrow を入れると全部出るようになるが、パフォーマンスに若干の影響があるのかな。試してない。

→ パフォーマンスはあんま問題なさそうだが、ディスク溢れないように気をつけよう、という識者の意見。

Created: 2020-02-27 10:30:43 +0900
Updated: 2020-02-27 10:30:43 +0900

Skitch が catalina で使えなくなったので頑張って OSX のスクショ機能を使っていた

OSX のスクショ機能なんであんな使いにくいの。。モーダルにしなくてよくない?? って苦しんでたけど、

↓これで skitch をまた使えるようになった。安心。 https://neko11.com/macos-catalina%E3%81%AB%E3%81%97%E3%81%A6%E3%81%8B%E3%82%89skitch%E3%81%A7%E3%82%B9%E3%82%AF%E3%83%AA%E3%83%BC%E3%83%B3%E3%82%B7%E3%83%A7%E3%83%83%E3%83%88%E3%81%A7%E3%81%8D%E3%81%AA%E3%81%8F%E3%81%AA/

Created: 2020-02-26 09:13:10 +0900
Updated: 2020-02-26 09:13:10 +0900

compressed oops

https://blog.tiqwab.com/2019/07/21/openjdk-oop.html http://itdoc.hitachi.co.jp/manuals/link/cosmi_v0970/03Y1660D/EY160244.HTM https://www.baeldung.com/jvm-compressed-oops

Created: 2020-01-29 11:32:29 +0900
Updated: 2020-01-29 11:32:29 +0900

cloc みたいな行数適当に数える奴

#!/usr/bin/env perl
use strict;

&main; exit;

sub main {
    my %white = map { $_ => 1 } qw/
        kt java pl js rb py
    /;

    my %result;

    for my $file (split /\n/, `git ls-files`) {
        my ($ext,) = ($file =~ m{\.([^./_-]{1,4}$)});
        $ext //= '-';
        my $lines = do {
            open my $fh, '<', $file;
            my $n = 0;
            $n++ while <$fh>;
            close $fh;
            $n;
        };
        $result{$ext} += $lines;
    }

    my $total = 0;
    for my $ext (sort { $result{$a} <=> $result{$b} } keys %result) {
        if (%white) {
            next unless $white{$ext};
        }

        my $r = $result{$ext};
        printf("%5s %10s\n", $ext, commify($r));
        $total += $r;
    }
    print "\n";
    printf("%5s %10s\n", "Total", commify($total));
}

sub commify {
    local $_ = shift;
    1 while s/^([-+]?\d+)(\d{3})/$1,$2/;
    return $_;
}

Created: 2020-01-25 08:11:20 +0900
Updated: 2020-01-25 08:11:20 +0900

[kotlin] lateinit が初期化されてるか確認する

lateinit var foo: String である時に、lateinit 変数が初期化ずみか調べるには ::foo.isInitialized とすると良い

Created: 2020-01-11 10:18:34 +0900
Updated: 2020-01-11 10:18:34 +0900

mysql 8.0.17 から 8.0.19 までは member は予約語

MEMBER; added in 8.0.17 (reserved); became nonreserved in 8.0.19

Created: 2020-01-10 00:07:49 +0900
Updated: 2020-01-10 00:07:49 +0900

レポジトリの中に含まれる kotlin/java ファイル数を歴史込みで集計する

TSV が出るので、出たら tableau とかで集計する。

# XXX git checkout --force するんで手元がぶっ壊れるので注意してね! XXX
import subprocess
import os
from datetime import datetime, timedelta, date


print("ago\tlanguage\tfiles\tlines")

subprocess.run(['git', 'stash', '--quiet'])
subprocess.run(['git', 'checkout', '--quiet', 'master'])

for ago in range(1, 500, 10):
    cmd = '''git log --before '%d day ago' -n 1 --oneline "--format=format:%%H %%aI" | sed -e "s/T.*//" ''' % ago
    res = subprocess.run(['/bin/sh', '-c', cmd], capture_output=True)
    splitted = res.stdout.decode().rstrip().split(' ')
    commit = splitted[0]
    date = splitted[1]
    subprocess.run(['git', 'checkout', '--force', '--quiet', commit])

    kotlin_cnt = 0
    kotlin_lines = 0
    java_cnt = 0
    java_lines = 0
    for dirName, subdirList, fileList in os.walk('.'):
        for fname in fileList:
            if fname.endswith('.kt'):
                kotlin_cnt += 1
                kotlin_lines += sum(1 for line in open(f"{dirName}/{fname}"))
            elif fname.endswith('.java'):
                java_cnt += 1
                java_lines += sum(1 for line in open(f"{dirName}/{fname}"))

    print(f"{date}\tkotlin\t{kotlin_cnt}\t{kotlin_lines}")
    print(f"{date}\tjava\t{java_cnt}\t{java_lines}")
Created: 2019-12-12 15:56:49 +0900
Updated: 2019-12-12 15:56:49 +0900

mutable なクラスがどのぐらい DI 対象になってるか調べたい

BeanPostProcessor で探せる。

import org.springframework.beans.factory.config.BeanPostProcessor
import org.springframework.context.annotation.Profile
import org.springframework.stereotype.Component
import java.lang.reflect.Modifier

@Component
@Profile("!real")
class MyProcessor : BeanPostProcessor {
    override fun postProcessAfterInitialization(bean: Any, beanName: String): Any? {
        val fields = bean.javaClass.declaredFields
        val packageName = bean.javaClass.packageName
        if ((!packageName.startsWith("org.springframework"))
            && fields.filter { !Modifier.isFinal(it.modifiers) }.count() > 0) {
            println("$bean has mutable field.")
        }
        return bean;
    }
}
Created: 2019-12-10 15:57:28 +0900
Updated: 2019-12-10 15:57:28 +0900

[spring] @Configuration と @Component は違う、あるいは @Bean lite mode について

Spring の @Configuration は @Component の stereotype だと思っていたが、挙動が違うとのこと。

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/annotation/Bean.html

@Bean Lite Mode というものがあって、@Component を利用すると CGLIB proxy を利用した aspect 系の処理が動かないらしい。

https://github.com/spring-projects/spring-framework/issues/14061

歴史を振り返ると、spring 3 の頃にバグ報告されたから lite mode ということにしたようにもみえる(深追いしてない)。

(全く知らなくてまつださんに教えてもらった)


【20191123 追記】

Spring framework 5.2 以後では、@Configuration(proxyBeanMethods=false) と書けるようになっているから、これを利用するのが良さそう(意図が明確なので)。

Created: 2019-11-19 10:22:12 +0900
Updated: 2019-11-19 10:22:12 +0900

Spring framework 5.2 以後=Spring Boot 2.2 以後では WebClient のレスポンスの取り扱いが便利になっている件

https://twitter.com/making/status/1192216994147270656?s=20

WebClient からの response status などを取得しやすくなっている。 便利。

    val logger = LoggerFactory.getLogger(WebfluxSampleApplication::class.java)

    val url = "http://example.com/"

    val webClient = WebClient.builder()
            .build()
    val entityMono = webClient.get().uri("http://example.com/")
            .retrieve()
            .toEntity(String::class.java)

    logger.info("Sent http request. url=$url")

    val entity = entityMono.block()!!

    if (entity.statusCode.is2xxSuccessful) {
        val body = entity.body
        logger.info("HTTP request succeeded. url=$url status=${entity.statusCodeValue} length=${body?.length}")

        // process response data...
        println(body)
    } else {
        logger.info("HTTP request failed. url=$url status=${entity.statusCodeValue} body=${entity.body}")
    }
Created: 2019-11-07 08:58:18 +0900
Updated: 2019-11-07 08:58:18 +0900
Next page