言語処理100本ノック第2章を実装しながら pytest で Python 処理と UNIX コマンド結果を自動突合する




 前回に引き続いて『言語処理100本ノック第2章: UNIXコマンド』のための環境構築と実装を行っていきたい。今回の章はタイトルの通り UNIX コマンドを実行し、その通りに動くプログラムを作って突合する。

外部ダウンロードファイルを含む場合の GitHub Actions

 UNIXコマンドの挙動は実装系によって異なるし、ファイル読み書きが必要となるため、Dockerによる環境の固定とコンテナ化が必要となる。GitHub Actions で Docker コンテナを利用することでサーバーサイドの自動テストも行えるようした。

    timeout-minutes: 15
    runs-on: ubuntu-latest
    container: ghcr.io/${{ github.repository }}:latest
      - run: pwd
      - uses: actions/checkout@v2
      - run: cp -r . /data
      - id: data-download
        uses: actions/cache@v2
          path: /data/work/popular-names.txt
          key: /data/work/popular-names.txt
      - if: steps.data-download.outputs.cache-hit != 'true'
        run: curl https://nlp100.github.io/data/popular-names.txt > /data/work/popular-names.txt
      - run: pytest -s /data/tests/
        working-directory: /data

 popular-names.txt というテストファイルが前提となるためテスト前にダウンロードしているが、actions/cache@v2 で永続キャッシュに入れてキャッシュ復元に成功したらダウンロードしないようにしている。


def unix_cmd(cmd):
    res = subprocess.run(cmd, shell=True, text=True, stdout=subprocess.PIPE)
    return str(res.stdout)

 今回は UNIX コマンドとの突合が必要となるため上記のプログラムを使い回すこととした。詳細仕様は『subprocess --- サブプロセス管理 — Python 3.11.4 ドキュメント』を読んでもらうとして、Shell文字列を実行可能にして、パイプラインの標準出力結果をテキストとして取得。それを文字列に変換して返却している。

10. 行数のカウント


def test_execute():
    input = "./work/popular-names.txt"
    expected = util.unix_cmd(f"cat {input} | wc -l").rstrip()
    actual = str(nlp_010.execute(input))
    assert expected == actual

 テストコードは上記。 cat ./work/popular-names.txt | wc -l で行数を取得。比較のために改行を除去している。

def execute(path):
    with open(path, "r") as f:
        return len(f.read().splitlines())


11. タブをスペースに置換


def test_sed():
    input = "./work/popular-names.txt"
    expected = util.unix_cmd(f"sed -e 's/\t/ /g' {input}")
    actual = nlp_011.execute(input)
    assert expected == actual

def test_tr():
    input = "./work/popular-names.txt"
    expected = util.unix_cmd(f"cat {input} | tr '\t' ' '")
    actual = nlp_011.execute(input)
    assert expected == actual

def test_expand():
    input = "./work/popular-names.txt"
    expected = util.unix_cmd(f"expand -t 1 {input}")
    actual = nlp_011.execute(input)
    assert expected == actual


def execute(path):
    with open(path, "r") as f:
        return f.read().replace("\t", " ")


12. 1列目をcol1.txtに,2列目をcol2.txtに保存


def cut(col, input, output):
    expected = util.unix_cmd(f"cut -f {col} {input}")
    nlp_012.out(col, input, output)
    actual = util.unix_cmd(f"cat {output}")
    return expected, actual

def test_col1():
    col = 1
    input = "./work/popular-names.txt"
    output = "./work/col1.txt"
    expected, actual = cut(col, input, output)
    assert expected == actual

def test_col2():
    col = 2
    input = "./work/popular-names.txt"
    output = "./work/col2.txt"
    expected, actual = cut(col, input, output)
    assert expected == actual

 cut の結果と Python で作った列抜き出し処理が出力したファイルの cat 結果を突合させている。

def execute(field, input, output):
    text = cut(field, input)
    with open(output, "w") as f:

def cut(field, path):
    with open(path, "r") as f:
        r = [ff.split("\t")[field - 1] for ff in f.read().splitlines()]
        return "\n".join(r) + "\n"


13. col1.txtとcol2.txtをマージ


def test_execute():
    arg1 = "./work/col1.txt"
    arg2 = "./work/col2.txt"
    expected = util.unix_cmd(f"paste {arg1} {arg2}")
    actual = nlp_013.execute(arg1, arg2)
    assert expected == actual

 paste コマンドをそのまま利用。

def execute(path1, path2):
    df1 = pandas.read_csv(path1, names=["col1"])
    df2 = pandas.read_csv(path2, names=["col2"])
    df = pandas.concat([df1, df2], axis=1)
    return df.to_csv(sep="\t", header=None, index=False)

 ここから pandas を利用。ファイルを取得して、 concat を列方向で実施してヘッダーなし、インデックスなしの tsv を文字列として返却。

14. 先頭からN行を出力


def head(input, num):
    expected = util.unix_cmd(f"head -n {num} {input}")
    actual = nlp_014.execute(input, num)
    return expected, actual

def test_head1():
    expected, actual = head("./work/popular-names.txt", 1)
    assert expected == actual

def test_head3():
    expected, actual = head("./work/popular-names.txt", 3)
    assert expected == actual

def test_head10():
    expected, actual = head("./work/popular-names.txt", 10)
    assert expected == actual

 head コマンドを実施。関数としてはファイル名、行数の順番にしたいのでコマンドとは逆になる。

def execute(input, num):
    df = pandas.read_csv(
        input, sep="\t", header=None, names=["name", "gender", "num", "year"]
    return df.head(num).to_csv(sep="\t", header=None, index=False)

 pandas.DataFramehead 関数があるのでそのまま利用。

15. 末尾のN行を出力


def tail(input, num):
    expected = util.unix_cmd(f"tail -n {num} {input}")
    actual = nlp_015.execute(input, num)
    return expected, actual

def test_tail1():
    expected, actual = tail("./work/popular-names.txt", 1)
    assert expected == actual

def test_tail3():
    expected, actual = tail("./work/popular-names.txt", 3)
    assert expected == actual

def test_tail10():
    expected, actual = tail("./work/popular-names.txt", 10)
    assert expected == actual

 今度は tail。あまり変わらない。

def execute(input, num):
    df = pandas.read_csv(
        input, sep="\t", header=None, names=["name", "gender", "num", "year"]
    return df.tail(num).to_csv(sep="\t", header=None, index=False)

 こちらも tail 関数を利用。

16. ファイルをN分割する


def split(input, num):
    exp_out = f"./work/split_expected_{num}_"
    act_out = f"./work/split_actual_{num}_"
        f"split -l $((`cat {input} | wc -l` / {num} + 1 )) {input} -d {exp_out}"
    nlp_016.execute(input, num, act_out)
    return exp_out, act_out

def test_split1():
    exp_out, act_out = split("./work/popular-names.txt", 1)
    assert filecmp.cmp(f"{exp_out}00", f"{act_out}00")

def test_split3():
    exp_out, act_out = split("./work/popular-names.txt", 3)
    assert filecmp.cmp(f"{exp_out}00", f"{act_out}00")
    assert filecmp.cmp(f"{exp_out}01", f"{act_out}01")
    assert filecmp.cmp(f"{exp_out}02", f"{act_out}02")

def test_split5():
    exp_out, act_out = split("./work/popular-names.txt", 5)
    assert filecmp.cmp(f"{exp_out}00", f"{act_out}00")
    assert filecmp.cmp(f"{exp_out}01", f"{act_out}01")
    assert filecmp.cmp(f"{exp_out}02", f"{act_out}02")
    assert filecmp.cmp(f"{exp_out}03", f"{act_out}03")
    assert filecmp.cmp(f"{exp_out}04", f"{act_out}04")

 実は一番難しく感じた。 split -n で分割数を指定できるが、こちらは文字単位なので問題文の「行単位」ではない。結局のところで行数を分割数でわって +1 した行数で分割するようにした。-l は本体行数を超えても問題ないので、num が 1 の場合も動作する。-d オプションをつけることで分割ファイルの末尾が数値形式となる。

def execute(input, number, out):
    with open(input, "r") as f:
        text = f.read().splitlines()
        flagsize = len(text) // number + 1
        for n in range(number):
            with open(f"{out}{n:02d}", "w") as w:
                start = n * flagsize
                end = min((n + 1) * flagsize, len(text))
                w.write("\n".join(text[start:end]) + "\n")

 基本的には同じように分割数で割った結果を整数にして +1。取得した行数の範囲を指定してファイルに出力。この時に最終ファイルはテキストの行数を超えてしまう可能性があるため、最大行数を取得している。

17. 1列目の文字列の異なり

1列目の文字列の種類(異なる文字列の集合)を求めよ.確認にはcut, sort, uniqコマンドを用いよ.

def test_word_uniq():
    input = "./work/popular-names.txt"
    expected = util.unix_cmd(f"cut -f 1 {input} | sort | uniq")
    actual = nlp_017.execute(input)
    assert actual == expected

 特定列を取得しての重複除去はよくあるイデオム。 uniqsort をしないと動かない。

def execute(input):
    df = pandas.read_csv(
        input, sep="\t", header=None, names=["name", "gender", "num", "year"]
    return "\n".join(df["name"].sort_values().unique()) + "\n"

 Dataframe から Series を取り出してソートして重複除去。pandas.Series は List と同様に join で区切り文字付きの文字列に変換できる。

18. 各行を3コラム目の数値の降順にソート

各行を3コラム目の数値の逆順で整列せよ(注意: 各行の内容は変更せずに並び替えよ).確認にはsortコマンドを用いよ(この問題はコマンドで実行した時の結果と合わなくてもよい).

def test_word_sort():
    input = "./work/popular-names.txt"
    expected = util.unix_cmd(f"sort -k 3nr -k 1 {input}")
    actual = nlp_018.execute(input)
    assert actual == expected

 sort コマンドの連休。 -k で順にソートキーの順番を指定。 -n は数値としてのソート、 -r は逆順ソート。3列目だけでソートするコマンドとプログラムの結果が合わなくなって検証ができないので、1行目でもソートしている。

def execute(path):
    df = pandas.read_csv(
        path, sep="\t", header=None, names=["name", "gender", "num", "year"]
    df = df.sort_values(["num", "name"], ascending=[False, True]).reset_index(drop=True)
    return df.to_csv(sep="\t", index=False, header=None)

 pandas.DataFrame のソートをした後に、 reset_index でインデックスを捨てて、ヘッダーなし、インデックスなしの tsv を文字列として返却。

19. 各行の1コラム目の文字列の出現頻度を求め,出現頻度の高い順に並べる

各行の1列目の文字列の出現頻度を求め,その高い順に並べて表示せよ.確認にはcut, uniq, sortコマンドを用いよ.

def test_word_freq():
    input = "./work/popular-names.txt"
    cmd = f"cut -f 1 {input} | sort | uniq  -c | sort -k 1nr -k 2 | sed -e 's/^ *[0-9]* //'"
    expected = util.unix_cmd(cmd)
    actual = nlp_019.execute(input)
    assert expected == actual

 sort までは応用問題であるが、高い順に並べる対象を名前だけにする場合は UNIX コマンドだとやや面倒。 uniq -c は以下のような出力である。

108 Robert
  92 Mary
  75 Charles
  74 Michael

 スペース区切りでフィールドを取得しようとすると字下げのスペースと区別がつけられず cut -d などで名前だけを切り出すことができない。先の sort 必須といいどうにも uniq の入出力にはセンスがないような気がする。結局は sed で行頭のスペース、数値、スペースを除去することにしたが、sed\s*\d+\s みたいに使い慣れた拡張正規表現記法に対応しておらず難儀した。


def execute(path):
    df = pandas.read_csv(
        path, sep="\t", header=None, names=["name", "gender", "num", "year"]
    df = df.groupby("name").size().reset_index(name="freq")
    df = df.sort_values(["freq", "name"], ascending=[False, True])
    return df[["name"]].to_csv(sep="\t", index=False, header=None)




 今回までで言語処理100本ノック第2章を完了した。 Python としては簡単な問題ばかりだったけど UNIX コマンドで考えるとちょっとした面倒さを感じたりもした。UNIXの基本的な考え方として単機能の処理をパイプで繋げて応用できる良さがある。

 ちょっとした処理については、可能な限りコマンドラインで済ませておいた方が楽だし、バグの入り込む余地も少ない。また pytest でUNIXコマンドと結果と Python 処理の突合をしたり、それを GitHub Actions 上でも動かせるようにするのはなかなか面白い経験であった。