第3章 声質変換のための様々なモジュールの実装

この章では、声質変換のために必要となる様々なモジュールの実装と簡単な解説を行う。

3.1 STFファイルの読み書き

声質変換を行うためには、まずTANDEM-STRAIGHTで抽出された特徴量をPythonで扱える形式に変換しなければならない。しかし、PythonからTANDEM-STRAIGHTをライブラリとして呼び出し、結果を受け取るのはかなり煩雑なので、TANDEM-STRAIGHTの音声データから抽出した特徴量をSTFファイルという独自形式で保存する機能を利用する。

まず、音声データをSTFファイルに変換するには、配布されているTANDEM-STRAIGHTライブラリ内のGenerateSTFという実行ファイルを利用できる。そうして得られるSTFファイルを、Pythonで扱いやすいようデータを読み込んで、numpyの配列に変換するようなライブラリを実装する。

STFファイルについての具体的な仕様は配布されているライブラリ内のドキュメントに詳しく記載されているが、簡単に説明すると、1つのSTFファイルは複数のチャンクからなり、それぞれのチャンクは格納しているデータの種類を表すチャンク名、チャンクサイズ、そしてデータから構成されている。チャンクのうち、特徴量に直接関係のない制御用のものは以下の3つである。

ただし、GenerateSTFで得られるSTFファイルではNXFLチャンクは用いられないので、STFTとCHKLのみに対して処理を行えばよい。また、特徴量を表すチャンクはいくつかあるが、以下の3つのチャンクが存在していれば合成が可能なので、これら以外は無視してもよい。

これをPythonで実装したのが、以下のソースコードである。

リスト3.1: stf.py

 1: #!/usr/bin/env python
 2: 
 3: import numpy
 4: import os
 5: import struct
 6: import sys
 7: 
 8: class STF:
 9:     def __init__(self, filename = None):
10:         self.endian = '>'
11:         self.chunks = ['APSG', 'F0  ', 'SPEC']
12: 
13:     def loadfile(self, filename):
14:         with open(filename, 'rb') as stf_file:
15:             self.load(stf_file)
16: 
17:     def load(self, stf_file):
18:         filesize = os.fstat(stf_file.fileno()).st_size
19: 
20:         while stf_file.tell() < filesize:
21:             chunk = stf_file.read(4)
22: 
23:             if chunk == 'STRT':
24:                 if stf_file.read(2) == '\xff\xfe':
25:                     self.endian = '<'
26:                 chunk_size, self.version, self.channel, self.frequency \
27:                     = struct.unpack(self.endian + 'IHHI', stf_file.read(12))
28:             else:
29:                 chunk_size, = struct.unpack(self.endian + 'I', stf_file.read(4))
30: 
31:                 if chunk == 'CHKL' or chunk == 'NXFL':
32:                     data = stf_file.read(chunk_size)
33:                     if chunk == 'CHKL':
34:                         self.chunks += [data[i: i + 4] \
35:                             for i in range(0, chunk_size, 4) \
36:                             if data[i: i + 4] not in self.chunks]
37:                 else:
38:                     self.shift_length, frame_count, argument, \
39:                     self.bit_size, self.weight, data_size \
40:                         = struct.unpack(self.endian + 'dIIHdI', stf_file.read(30))
41:                     data = stf_file.read(data_size)
42: 
43:                     element = data_size / (self.bit_size / 8)
44:                     matrix = numpy.fromstring(data, count = element)
45: 
46:                     for c in self.chunks:
47:                         if chunk == c:
48:                             if element / frame_count == 1:
49:                                 self.__dict__[c.strip()] = matrix
50:                             else:
51:                                 self.__dict__[c.strip()] \
52:                                     = matrix.reshape( \
53:                                         (frame_count, element / frame_count))
54:                             break
55: 
56:         for c in self.chunks:
57:             if c.strip() not in self.__dict__:
58:                 self.__dict__[c.strip()] = None
59: 
60:     def savefile(self, filename):
61:         with open(filename, 'wb') as stf_file:
62:             self.save(stf_file)
63: 
64:     def save(self, stf_file):
65:         stf_file.write('STRT')
66:         if self.endian == '>':
67:             stf_file.write('\xfe\xff')
68:         elif self.endian == '<':
69:             stf_file.write('\xff\xfe')
70:         stf_file.write(struct.pack(self.endian + 'IHHI', 8, \
71:             self.version, self.channel, self.frequency))
72: 
73:         stf_file.write('CHKL')
74:         stf_file.write(struct.pack(self.endian + 'I', \
75:             len(''.join(self.chunks))) + ''.join(self.chunks))
76: 
77:         for c in self.chunks:
78:             if self.__dict__[c.strip()] is None:
79:                 continue
80: 
81:             matrix = self.__dict__[c.strip()]
82:             if len(matrix.shape) == 1:
83:                 argument = 1
84:             else:
85:                 argument = matrix.shape[1]
86:             data_size = matrix.shape[0] * argument * 8
87: 
88:             header = struct.pack(self.endian + 'dIIHdI', self.shift_length, \
89:                 matrix.shape[0], argument, self.bit_size, self.weight, data_size)
90:             stf_file.write(c + \
91:                 struct.pack(self.endian + 'I', len(header) + data_size) + header)
92: 
93:             for i in xrange(matrix.shape[0]):
94:                 if argument == 1:
95:                     stf_file.write(struct.pack(self.endian + 'd', matrix[i]))
96:                 else:
97:                     for j in xrange(matrix.shape[1]):
98:                         stf_file.write(struct.pack(self.endian + 'd', \
99:                                                             matrix[i, j]))
100: 
101: if __name__ == '__main__':
102:     if len(sys.argv) < 2:
103:         print 'Usage: %s <stf_file>' % sys.argv[0]
104:         sys.exit()
105: 
106:     stf = STF()
107:     stf.loadfile(sys.argv[1])
108:     print stf.F0

基本的に、structモジュールを用いてデータの読み込み・書き込みを行っている。STFクラスのself.chunksに含まれているチャンクは、インスタンス変数としてnumpyの配列に変換されるようになっている。

3.2 MFCCの抽出

次に、STFファイルから読み込んだスペクトル包絡からMFCCを抽出するライブラリを実装する。また、声質変換後の音声合成のために、MFCCからスペクトル包絡を復元する機能も実装する。なお、実装にあたっては、「人工知能に関する断創録[4]」を参考にした。

まず、周波数をMFCCの算出に用いるメル尺度という音高の知覚的尺度に変換する関数と、その逆の処理をする関数を用意する。hz2melによって周波数をメル尺度に、mel2hzによってメル尺度を周波数に変換する。

周波数とメル尺度の変換

    def hz2mel(self, f):
        return 1127.01048 * numpy.log(f / 700.0 + 1.0)

    def mel2hz(self, m):
        return 700.0 * (numpy.exp(m / 1127.01048) - 1.0)

次に、メルフィルタバンクを計算する。これは、メル尺度上で等間隔にならぶバンドパスフィルタを並べたものであり、このフィルタをスペクトル包絡に適用することで人間が知覚しにくい領域の重みが小さくなる。

メルフィルタバンクの導出

    def melFilterBank(self):
        # サンプリング周波数の半分(ナイキスト周波数)までを対象とする
        fmax = self.frequency / 2
        melmax = self.hz2mel(fmax)

        # 周波数に合わせて、サンプル数の半分の標本数で計算する
        nmax = self.nfft / 2
        df = self.frequency / self.nfft

        # フィルタごとの中心となるメル尺度を計算する
        dmel = melmax / (self.channels + 1)
        melcenters = numpy.arange(1, self.channels + 1) * dmel
        fcenters = self.mel2hz(melcenters)

        # それぞれの標本が対象とする周波数の範囲を計算する
        indexcenter = numpy.round(fcenters / df)
        indexstart = numpy.hstack(([0], indexcenter[0: self.channels - 1]))
        indexstop = numpy.hstack((indexcenter[1: self.channels], [nmax]))

        # フィルタごとにindexstartを始点、indexcenterを頂点、
        # indexstopを終点とする三角形のグラフを描くように計算する
        filterbank = numpy.zeros((self.channels, nmax))
        for c in numpy.arange(0, self.channels):
            increment = 1.0 / (indexcenter[c] - indexstart[c])
            for i in numpy.arange(indexstart[c], indexcenter[c]):
                filterbank[c, i] = (i - indexstart[c]) * increment
            decrement = 1.0 / (indexstop[c] - indexcenter[c])
            for i in numpy.arange(indexcenter[c], indexstop[c]):
                filterbank[c, i] = 1.0 - ((i - indexcenter[c]) * decrement)
            filterbank[c] /= (indexstop[c] - indexstart[c]) / 2

        return filterbank, fcenters

ここで、self.channelsはメルフィルタバンクに用いられるバンドパスフィルタの数を示しており、後に出てくるMFCCの次元数と同じか、より大きくする必要がある。

このメルフィルタバンクをスペクトル包絡に適用した後に、離散コサイン変換によって得られる係数がMFCCである。つまり、MFCCを得る関数は以下のようになる。

MFCCの導出

    def mfcc(self, spectrum):
        # スペクトル包絡として負の値が与えられた場合は、0として扱う
        spectrum = numpy.maximum(numpy.zeros(spectrum.shape), spectrum)
        # スペクトル包絡とメルフィルタバンクの積の対数を取る
        mspectrum = numpy.log10(numpy.dot(spectrum, self.filterbank.transpose()))
        # scipyを用いて離散コサイン変換をする
        return scipy.fftpack.dct(mspectrum, norm = 'ortho')[:self.dimension]

ここで、self.dimensionは離散コサイン変換の結果のうち低次の係数から何次元だけ用いるかを示しており、一般には16次元の係数を用いることが多いが、より精度を求める場合には32次元や64次元と設定することもある。

また、MFCCからスペクトル包絡への逆変換は、離散コサイン変換の逆変換を用いて、以下のように実装できる。この実装においては、逆変換の後にscipy.interpolateを用いてスプライン補間をしている。

MFCCからのスペクトル包絡の導出

    def imfcc(self, mfcc):
        # MFCCの削られた部分に0を代入した上で、逆離散コサイン変換をする
        mfcc = numpy.hstack([mfcc, [0] * (self.channels - self.dimension)])
        mspectrum = scipy.fftpack.idct(mfcc, norm = 'ortho')
        # 得られた離散的な値をスプライン補間によって連続的にする
        tck = scipy.interpolate.splrep(self.fcenters, numpy.power(10, mspectrum))
        return scipy.interpolate.splev(self.fscale, tck)

そして、今後のために動的変化量を求める関数を実装しておく。これは、あるフレームの前後数フレームでのMFCCの変化量を微分したもの(回帰係数)であり、ここでは簡単のため、1つ前のフレームと1つ後のフレームの差を2で割ったものを用いることとする。

MFCCの動的変化量の導出

    def delta(self, mfcc):
        # データの開始部と終了部は同じデータが続いているものとする
        mfcc = numpy.concatenate([[mfcc[0]], mfcc, [mfcc[-1]]])

        delta = None
        for i in xrange(1, mfcc.shape[0] - 1):
            # 前後のフレームの差を2で割ったものを動的変化量とする
            slope = (mfcc[i + 1] - mfcc[i - 1]) / 2
            if delta is None:
                delta = slope
            else:
                delta = numpy.vstack([delta, slope])

        return delta

これまでのソースコードを全部組み合わせたmfcc.pyは以下の通りである。なお、このモジュールをコマンドライン引数にSTFファイルを与えて実行すると、MFCCの出力及び元データとMFCCからの逆変換で得られるデータの波形の違いを確認することができる。

リスト3.2: mfcc.py

 1: #!/usr/bin/env python
 2: 
 3: import numpy
 4: import scipy.fftpack
 5: import scipy.interpolate
 6: import scipy.linalg
 7: import sys
 8: 
 9: from stf import STF
10: 
11: class MFCC:
12:     '''
13:     MFCC computation from spectrum information
14: 
15:     Reference
16:     ---------
17:      - http://aidiary.hatenablog.com/entry/20120225/1330179868
18:     '''
19: 
20:     def __init__(self, nfft, frequency, dimension = 16, channels = 20):
21:         self.nfft = nfft
22:         self.frequency = frequency
23:         self.dimension = dimension
24:         self.channels = channels
25: 
26:         self.fscale = \
27:             numpy.fft.fftfreq(self.nfft, d = 1.0 / self.frequency)[: self.nfft / 2]
28:         self.filterbank, self.fcenters = self.melFilterBank()
29: 
30:     def hz2mel(self, f):
31:         return 1127.01048 * numpy.log(f / 700.0 + 1.0)
32: 
33:     def mel2hz(self, m):
34:         return 700.0 * (numpy.exp(m / 1127.01048) - 1.0)
35: 
36:     def melFilterBank(self):
37:         <省略>
38: 
39:     def mfcc(self, spectrum):
40:         <省略>
41: 
42:     def delta(self, mfcc):
43:         <省略>
44: 
45:     def imfcc(self, mfcc):
46:         <省略>
47: 
48: if __name__ == '__main__':
49:     if len(sys.argv) < 2:
50:         print 'Usage: %s <stf_file>' % sys.argv[0]
51:         sys.exit()
52: 
53:     stf = STF()
54:     stf.loadfile(sys.argv[1])
55: 
56:     mfcc = MFCC(stf.SPEC.shape[1] * 2, stf.frequency)
57:     res = mfcc.mfcc(stf.SPEC[stf.SPEC.shape[0] / 5])
58:     spec = mfcc.imfcc(res)
59: 
60:     print res
61: 
62:     import pylab
63: 
64:     pylab.subplot(211)
65:     pylab.plot(stf.SPEC[stf.SPEC.shape[0] / 5])
66:     pylab.ylim(0, 1.2)
67:     pylab.subplot(212)
68:     pylab.plot(spec)
69:     pylab.ylim(0, 1.2)
70:     pylab.show()

以下は、同じ音声データをMFCCに変換し、逆変換した結果を、16次元まで求めたものと、64次元まで求めたもので比べたものである。16次元でも低周波数ではある程度復元できているものの、64次元の方がより元のスペクトル包絡に近くなっていることがわかる。また、64次元のものでも、高周波数帯になるにつれて線が滑らかになってしまっているのがわかるだろう。

MFCCの次元数による精度の違い

図3.1: MFCCの次元数による精度の違い

3.3 動的時間伸縮の実装

GMMを使った学習処理を行う前に、もう1つ実装しておくべき機能が、動的時間伸縮(Dynamic Time Warping、以下、「DTW」という)である。これは、学習に使うパラレルデータを作るために欠かせない処理であり、同じ発話内容の音声データであっても話すスピードや間の長さの違いによってフレームごとでは対応が取れていない場合が多いので、DPマッチングに基づいてデータを伸縮させるという処理である。つまり、以下の図のように、同じ音を発している部分の対応を見つけ、両方が同じフレーム数になるように伸縮させるという処理である。

話者による波形の違い(<a href="bib.html#bib-dtw-ut">[5]</a>より引用)

図3.2: 話者による波形の違い([5]より引用)

そのためには、まずDPマッチングを実装する必要がある。DPマッチングは、その名の通り動的計画法を用いた手法で、以下のように説明できる。

  1. 1つ目のデータをa_1, a_2, ..., a_N、2つ目のデータをb_1, b_2, ..., b_Mとして、N×Mのグリッドグラフを用意する。
  2. 頂点(i, j)に対し、a_ib_jの類似度をコストとして設定する。
  3. 頂点(1, 1)から頂点(N, M)への最小コストのパスを検索する。
  4. 最小コストの値からパスを逆順に求めていき、頂点(i, j)を通過したならば、a_ib_jが対応していることがわかる。

図で表すと以下の様なイメージとなる。同じ内容の発話をしているフレームに対応する頂点を、赤いパスが通過していることがわかる。

DPマッチングのイメージ(<a href="bib.html#bib-dtw-ut">[5]</a>より引用)

図3.3: DPマッチングのイメージ([5]より引用)

Pythonでの実装は以下の通りとなる。なお、self.distanceはフレーム間の類似度を算出する関数である。

DPマッチング

    def dtw(self):
        M, N = len(self.source), len(self.target)
        cost = sys.maxint * numpy.ones((M, N))

        # グリッドグラフの1行目、1列目だけ先に処理しておく
        cost[0, 0] = self.distance(self.source[0], self.target[0])
        for i in range(1, M):
            cost[i, 0] = cost[i - 1, 0] + self.distance(self.source[i], self.target[0])
        for i in range(1, N):
            cost[0, i] = cost[0, i - 1] + self.distance(self.source[0], self.target[i])

        # 各頂点までの最短パスの長さを計算する
        for i in range(1, M):
            # 各フレームの前後self.windowフレームだけを参照する
            for j in range(max(1, i - self.window), min(N, i + self.window)):
                cost[i, j] = \
                        min(cost[i - 1, j - 1], cost[i, j - 1], cost[i - 1, j]) + \
                        self.distance(self.source[i], self.target[j])

        m, n = M - 1, N - 1
        self.path = []

        # 最短パスの経路を逆順に求めていく
        while (m, n) != (0, 0):
            self.path.append((m, n))
            m, n = min((m - 1, n), (m, n - 1), (m - 1, n - 1), \
                                    key = lambda x: cost[x[0], x[1]])
            if m < 0 or n < 0:
                break

        self.path.append((0, 0))

ここで注意すべき点は、self.windowによる探索範囲の制限である。これは、あるフレームに対して比較対象を前後self.windowフレームのみに限定するというもので、探索時間と、同じ発話内容が繰り返し現れた時に異なる繰り返しを参照してしまう可能性の低減を目的としている。これまで使ってみた限りでは、self.windowを2つのデータのフレーム数の差の2倍程度にちょうど設定するくらいがよいであろう。

次に、DPマッチングの結果に応じてデータを伸縮させる処理を実装する。この際、単に対応するフレームを用いるのではなく、1つ前のフレームが対応するフレームとの間で最も類似度が高いものを選ぶようにしている。つまり、DPマッチングの説明の時と同じ記号を用いると、パスが(i, j)の次に(i + 1, k)を通るとすると、a_{i+1}に対応するフレームは、b_j, b_{j+1}, ..., b_kの中で最もa_{i+1}に類似度が高いフレームを選ぶ。

DPマッチングによる伸縮

    def align(self, data, reverse = False):
        # reverse = Trueの時は、targetのフレーム数に合わせるようにする
        if reverse:
            path = [(t[1], t[0]) for t in self.path]
            source = self.target
            target = self.source
        else:
            path = self.path
            source = self.source
            target = self.target

        path.sort(key = lambda x: (x[1], x[0]))

        shape = tuple([path[-1][1] + 1] + list(data.shape[1:]))
        alignment = numpy.ndarray(shape)

        idx = 0
        frame = 0
        candicates = []

        while idx < len(path) and frame < target.shape[0]:
            if path[idx][1] > frame:
                # 候補となっているフレームから最も類似度が高いフレームを選ぶ
                candicates.sort(key = lambda x: \
                                    self.distance(source[x], target[frame]))
                alignment[frame] = data[candicates[0]]

                candicates = [path[idx][0]]
                frame += 1
            else:
                candicates.append(path[idx][0])
                idx += 1

        if frame < target.shape[0]:
            candicates.sort(key = lambda x: self.distance(source[x], target[frame]))
            alignment[frame] = data[candicates[0]]

        return alignment

DTWの実装をすべてまとめてモジュールとしたものが以下のソースコードである。この実装では、ユークリッド距離とコサイン距離のどちらかから類似度計算関数を選択できるようにしている。また、このモジュールをコマンドライン引数に2つのSTFファイルを与えて実行すると、DTW前と後での2つのMFCCの第1次係数のズレの違いが確認できるようになっている。

リスト3.3: dtw.py

 1: #!/usr/bin/env python
 2: 
 3: import numpy
 4: import scipy
 5: import scipy.linalg
 6: import sys
 7: 
 8: class DTW:
 9:     def __getstate__(self):
10:         d = self.__dict__.copy()
11: 
12:         if self.distance == self.cosine:
13:             d['distance'] = 'cosine'
14:         elif self.distance == self.euclidean:
15:             d['distance'] = 'euclidean'
16: 
17:         return d
18: 
19:     def __setstate__(self, dict):
20:         self.__dict__ = dict
21: 
22:         if dict['distance'] == 'cosine':
23:             self.distance = self.cosine
24:         elif dict['distance'] == 'euclidean':
25:             self.distance = self.euclidean
26: 
27:     def cosine(self, A, B):
28:         return scipy.dot(A, B.transpose()) / scipy.linalg.norm(A) \
29:                                                 / scipy.linalg.norm(B)
30: 
31:     def euclidean(self, A, B):
32:         return scipy.linalg.norm(A - B)
33: 
34:     def __init__(self, source, target, distance = None, window = sys.maxint):
35:         self.window = window
36:         self.source = source
37:         self.target = target
38: 
39:         if distance:
40:             self.distance = distance
41:         else:
42:             self.distance = self.euclidean
43: 
44:         self.dtw()
45: 
46:     def dtw(self):
47:         <省略>
48: 
49:     def align(self, data, reverse = False):
50:         <省略>
51: 
52: if __name__ == '__main__':
53:     if len(sys.argv) < 3:
54:         print 'Usage: %s <source stf> <target stf>' % sys.argv[0]
55:         sys.exit()
56: 
57:     from stf import STF
58:     source, target = STF(), STF()
59:     source.loadfile(sys.argv[1])
60:     target.loadfile(sys.argv[2])
61: 
62:     from mfcc import MFCC
63:     mfcc = MFCC(source.SPEC.shape[1] * 2, source.frequency)
64:     source_mfcc = numpy.array([mfcc.mfcc(source.SPEC[frame]) \
65:                         for frame in xrange(source.SPEC.shape[0])])
66:     mfcc = MFCC(target.SPEC.shape[1] * 2, target.frequency)
67:     target_mfcc = numpy.array([mfcc.mfcc(target.SPEC[frame]) \
68:                         for frame in xrange(target.SPEC.shape[0])])
69: 
70:     dtw = DTW(source_mfcc, target_mfcc, \
71:         window = abs(source_mfcc.shape[0] - target_mfcc.shape[0]) * 2)
72:     warp_mfcc = dtw.align(source_mfcc)
73: 
74:     import pylab
75:     pylab.subplot(211)
76:     pylab.plot(source_mfcc[:, 0])
77:     pylab.plot(target_mfcc[:, 0])
78:     pylab.subplot(212)
79:     pylab.plot(warp_mfcc[:, 0])
80:     pylab.plot(target_mfcc[:, 0])
81:     pylab.show()

このモジュールを同じ発話内容を異なる話者が発声したデータを使って実行した結果である。上のグラフだと、最初から大きくずれが発生し、終了位置も異なるが、下のグラフだと、波形がマッチしており、長さも等しくなっていることが分かる。

DTWの実行結果

図3.4: DTWの実行結果