言語処理100本ノック第2章に挑戦
https://www.du-soleil.com/entry/nlp-100-01
前回に引き続いて『言語処理100本ノック第2章: UNIXコマンド』のための環境構築と実装を行っていきたい。今回の章はタイトルの通り UNIX コマンドを実行し、その通りに動くプログラムを作って突合する。
外部ダウンロードファイルを含む場合の GitHub Actions
UNIXコマンドの挙動は実装系によって異なるし、ファイル読み書きが必要となるため、Dockerによる環境の固定とコンテナ化が必要となる。GitHub Actions で Docker コンテナを利用することでサーバーサイドの自動テストも行えるようした。
(前略) jobs: unit-test: timeout-minutes: 15 runs-on: ubuntu-latest container: ghcr.io/${{ github.repository }}:latest steps: - run: pwd - uses: actions/checkout@v2 - run: cp -r . /data - id: data-download uses: actions/cache@v2 with: path: /data/work/popular-names.txt key: /data/work/popular-names.txt - if: steps.data-download.outputs.cache-hit != 'true' run: curl https://nlp100.github.io/data/popular-names.txt > /data/work/popular-names.txt - run: pytest -s /data/tests/ working-directory: /data
popular-names.txt
というテストファイルが前提となるためテスト前にダウンロードしているが、actions/cache@v2
で永続キャッシュに入れてキャッシュ復元に成功したらダウンロードしないようにしている。
UNIXコマンドを任意に実行して結果を文字列で取得する
(前略) def unix_cmd(cmd): res = subprocess.run(cmd, shell=True, text=True, stdout=subprocess.PIPE) return str(res.stdout)
今回は UNIX コマンドとの突合が必要となるため上記のプログラムを使い回すこととした。詳細仕様は『subprocess --- サブプロセス管理 — Python 3.11.4 ドキュメント』を読んでもらうとして、Shell文字列を実行可能にして、パイプラインの標準出力結果をテキストとして取得。それを文字列に変換して返却している。
10. 行数のカウント
行数をカウントせよ.確認にはwcコマンドを用いよ.
def test_execute(): input = "./work/popular-names.txt" expected = util.unix_cmd(f"cat {input} | wc -l").rstrip() actual = str(nlp_010.execute(input)) assert expected == actual
テストコードは上記。 cat ./work/popular-names.txt | wc -l
で行数を取得。比較のために改行を除去している。
def execute(path): with open(path, "r") as f: return len(f.read().splitlines())
プログラム的には上記。
11. タブをスペースに置換
タブ1文字につきスペース1文字に置換せよ.確認にはsedコマンド,trコマンド,もしくはexpandコマンドを用いよ.
def test_sed(): input = "./work/popular-names.txt" expected = util.unix_cmd(f"sed -e 's/\t/ /g' {input}") actual = nlp_011.execute(input) assert expected == actual def test_tr(): input = "./work/popular-names.txt" expected = util.unix_cmd(f"cat {input} | tr '\t' ' '") actual = nlp_011.execute(input) assert expected == actual def test_expand(): input = "./work/popular-names.txt" expected = util.unix_cmd(f"expand -t 1 {input}") actual = nlp_011.execute(input) assert expected == actual
タブをスペースに変換するコマンド三兄弟。
def execute(path): with open(path, "r") as f: return f.read().replace("\t", " ")
文字列として変換するだけとした。
12. 1列目をcol1.txtに,2列目をcol2.txtに保存
各行の1列目だけを抜き出したものをcol1.txtに,2列目だけを抜き出したものをcol2.txtとしてファイルに保存せよ.確認にはcutコマンドを用いよ.
def cut(col, input, output): expected = util.unix_cmd(f"cut -f {col} {input}") nlp_012.out(col, input, output) actual = util.unix_cmd(f"cat {output}") return expected, actual def test_col1(): col = 1 input = "./work/popular-names.txt" output = "./work/col1.txt" expected, actual = cut(col, input, output) assert expected == actual def test_col2(): col = 2 input = "./work/popular-names.txt" output = "./work/col2.txt" expected, actual = cut(col, input, output) assert expected == actual
cut
の結果と Python で作った列抜き出し処理が出力したファイルの cat
結果を突合させている。
def execute(field, input, output): text = cut(field, input) with open(output, "w") as f: f.write(text) def cut(field, path): with open(path, "r") as f: r = [ff.split("\t")[field - 1] for ff in f.read().splitlines()] return "\n".join(r) + "\n"
行ごとにタブで分割して指定列だけを抜き出して改行結合。
13. col1.txtとcol2.txtをマージ
12で作ったcol1.txtとcol2.txtを結合し,元のファイルの1列目と2列目をタブ区切りで並べたテキストファイルを作成せよ.確認にはpasteコマンドを用いよ.
def test_execute(): arg1 = "./work/col1.txt" arg2 = "./work/col2.txt" expected = util.unix_cmd(f"paste {arg1} {arg2}") actual = nlp_013.execute(arg1, arg2) assert expected == actual
paste
コマンドをそのまま利用。
def execute(path1, path2): df1 = pandas.read_csv(path1, names=["col1"]) df2 = pandas.read_csv(path2, names=["col2"]) df = pandas.concat([df1, df2], axis=1) return df.to_csv(sep="\t", header=None, index=False)
ここから pandas
を利用。ファイルを取得して、 concat
を列方向で実施してヘッダーなし、インデックスなしの tsv を文字列として返却。
14. 先頭からN行を出力
自然数Nをコマンドライン引数などの手段で受け取り,入力のうち先頭のN行だけを表示せよ.確認にはheadコマンドを用いよ.
def head(input, num): expected = util.unix_cmd(f"head -n {num} {input}") actual = nlp_014.execute(input, num) return expected, actual def test_head1(): expected, actual = head("./work/popular-names.txt", 1) assert expected == actual def test_head3(): expected, actual = head("./work/popular-names.txt", 3) assert expected == actual def test_head10(): expected, actual = head("./work/popular-names.txt", 10) assert expected == actual
head
コマンドを実施。関数としてはファイル名、行数の順番にしたいのでコマンドとは逆になる。
def execute(input, num): df = pandas.read_csv( input, sep="\t", header=None, names=["name", "gender", "num", "year"] ) return df.head(num).to_csv(sep="\t", header=None, index=False)
pandas.DataFrame
に head
関数があるのでそのまま利用。
15. 末尾のN行を出力
自然数Nをコマンドライン引数などの手段で受け取り,入力のうち末尾のN行だけを表示せよ.確認にはtailコマンドを用いよ.
def tail(input, num): expected = util.unix_cmd(f"tail -n {num} {input}") actual = nlp_015.execute(input, num) return expected, actual def test_tail1(): expected, actual = tail("./work/popular-names.txt", 1) assert expected == actual def test_tail3(): expected, actual = tail("./work/popular-names.txt", 3) assert expected == actual def test_tail10(): expected, actual = tail("./work/popular-names.txt", 10) assert expected == actual
今度は tail
。あまり変わらない。
def execute(input, num): df = pandas.read_csv( input, sep="\t", header=None, names=["name", "gender", "num", "year"] ) return df.tail(num).to_csv(sep="\t", header=None, index=False)
こちらも tail
関数を利用。
16. ファイルをN分割する
自然数Nをコマンドライン引数などの手段で受け取り,入力のファイルを行単位でN分割せよ.同様の処理をsplitコマンドで実現せよ.
def split(input, num): exp_out = f"./work/split_expected_{num}_" act_out = f"./work/split_actual_{num}_" util.unix_cmd( f"split -l $((`cat {input} | wc -l` / {num} + 1 )) {input} -d {exp_out}" ) nlp_016.execute(input, num, act_out) return exp_out, act_out def test_split1(): exp_out, act_out = split("./work/popular-names.txt", 1) assert filecmp.cmp(f"{exp_out}00", f"{act_out}00") def test_split3(): exp_out, act_out = split("./work/popular-names.txt", 3) assert filecmp.cmp(f"{exp_out}00", f"{act_out}00") assert filecmp.cmp(f"{exp_out}01", f"{act_out}01") assert filecmp.cmp(f"{exp_out}02", f"{act_out}02") def test_split5(): exp_out, act_out = split("./work/popular-names.txt", 5) assert filecmp.cmp(f"{exp_out}00", f"{act_out}00") assert filecmp.cmp(f"{exp_out}01", f"{act_out}01") assert filecmp.cmp(f"{exp_out}02", f"{act_out}02") assert filecmp.cmp(f"{exp_out}03", f"{act_out}03") assert filecmp.cmp(f"{exp_out}04", f"{act_out}04")
実は一番難しく感じた。 split -n
で分割数を指定できるが、こちらは文字単位なので問題文の「行単位」ではない。結局のところで行数を分割数でわって +1 した行数で分割するようにした。-l
は本体行数を超えても問題ないので、num
が 1 の場合も動作する。-d
オプションをつけることで分割ファイルの末尾が数値形式となる。
def execute(input, number, out): with open(input, "r") as f: text = f.read().splitlines() flagsize = len(text) // number + 1 for n in range(number): with open(f"{out}{n:02d}", "w") as w: start = n * flagsize end = min((n + 1) * flagsize, len(text)) w.write("\n".join(text[start:end]) + "\n")
基本的には同じように分割数で割った結果を整数にして +1。取得した行数の範囲を指定してファイルに出力。この時に最終ファイルはテキストの行数を超えてしまう可能性があるため、最大行数を取得している。
17. 1列目の文字列の異なり
1列目の文字列の種類(異なる文字列の集合)を求めよ.確認にはcut, sort, uniqコマンドを用いよ.
def test_word_uniq(): input = "./work/popular-names.txt" expected = util.unix_cmd(f"cut -f 1 {input} | sort | uniq") actual = nlp_017.execute(input) assert actual == expected
特定列を取得しての重複除去はよくあるイデオム。 uniq
は sort
をしないと動かない。
def execute(input): df = pandas.read_csv( input, sep="\t", header=None, names=["name", "gender", "num", "year"] ) return "\n".join(df["name"].sort_values().unique()) + "\n"
Dataframe から Series を取り出してソートして重複除去。pandas.Series
は List と同様に join
で区切り文字付きの文字列に変換できる。
18. 各行を3コラム目の数値の降順にソート
各行を3コラム目の数値の逆順で整列せよ(注意: 各行の内容は変更せずに並び替えよ).確認にはsortコマンドを用いよ(この問題はコマンドで実行した時の結果と合わなくてもよい).
def test_word_sort(): input = "./work/popular-names.txt" expected = util.unix_cmd(f"sort -k 3nr -k 1 {input}") actual = nlp_018.execute(input) assert actual == expected
sort
コマンドの連休。 -k
で順にソートキーの順番を指定。 -n
は数値としてのソート、 -r
は逆順ソート。3列目だけでソートするコマンドとプログラムの結果が合わなくなって検証ができないので、1行目でもソートしている。
def execute(path): df = pandas.read_csv( path, sep="\t", header=None, names=["name", "gender", "num", "year"] ) df = df.sort_values(["num", "name"], ascending=[False, True]).reset_index(drop=True) return df.to_csv(sep="\t", index=False, header=None)
pandas.DataFrame
のソートをした後に、 reset_index
でインデックスを捨てて、ヘッダーなし、インデックスなしの tsv を文字列として返却。
19. 各行の1コラム目の文字列の出現頻度を求め,出現頻度の高い順に並べる
各行の1列目の文字列の出現頻度を求め,その高い順に並べて表示せよ.確認にはcut, uniq, sortコマンドを用いよ.
def test_word_freq(): input = "./work/popular-names.txt" cmd = f"cut -f 1 {input} | sort | uniq -c | sort -k 1nr -k 2 | sed -e 's/^ *[0-9]* //'" expected = util.unix_cmd(cmd) actual = nlp_019.execute(input) assert expected == actual
sort
までは応用問題であるが、高い順に並べる対象を名前だけにする場合は UNIX コマンドだとやや面倒。 uniq -c
は以下のような出力である。
108 Robert 92 Mary 75 Charles 74 Michael
スペース区切りでフィールドを取得しようとすると字下げのスペースと区別がつけられず cut -d
などで名前だけを切り出すことができない。先の sor
t 必須といいどうにも uniq
の入出力にはセンスがないような気がする。結局は sed
で行頭のスペース、数値、スペースを除去することにしたが、sed
も \s*\d+\s
みたいに使い慣れた拡張正規表現記法に対応しておらず難儀した。
実装系によってもオプションが異なるため、クラシックな正規表現としている。
def execute(path): df = pandas.read_csv( path, sep="\t", header=None, names=["name", "gender", "num", "year"] ) df = df.groupby("name").size().reset_index(name="freq") df = df.sort_values(["freq", "name"], ascending=[False, True]) return df[["name"]].to_csv(sep="\t", index=False, header=None)
フリークエンシーを取得して並び替え出力。
基本的にはこちらで作った処理と同様。
言語処理100本ノック第2章まとめ
今回までで言語処理100本ノック第2章を完了した。 Python としては簡単な問題ばかりだったけど UNIX コマンドで考えるとちょっとした面倒さを感じたりもした。UNIXの基本的な考え方として単機能の処理をパイプで繋げて応用できる良さがある。
ちょっとした処理については、可能な限りコマンドラインで済ませておいた方が楽だし、バグの入り込む余地も少ない。また pytest でUNIXコマンドと結果と Python 処理の突合をしたり、それを GitHub Actions 上でも動かせるようにするのはなかなか面白い経験であった。