"""Decoder for word-level quality estimation."""
# OpenKiwi: Open-Source Machine Translation Quality Estimation
# Copyright (C) 2019 Unbabel <openkiwi@unbabel.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import numpy as np
from .sequence_parts import SequenceBigramPart, SequenceUnigramPart
from .structured_decoder import StructuredDecoder
def logzero():
    """Return the log of zero (negative infinity).

    Used to initialize Viterbi trellis scores so that unreached states
    never win a max.
    """
    return -np.inf
class LinearWordQEDecoder(StructuredDecoder):
    """A decoder for word-level quality estimation.

    Finds the highest-scoring assignment of OK/BAD labels for the words
    of a sentence, either independently per word (unigram model) or via
    a Viterbi search over label transitions (bigram model).

    Args:
        estimator: the linear QE model being decoded; must provide
            ``labels``, ``get_coarse_label`` and ``use_bigrams``.
        cost_false_positives: penalty ``a`` for predicting BAD when the
            gold label is OK (used by the cost-augmented MIRA decoder).
        cost_false_negatives: penalty ``b`` for predicting OK when the
            gold label is BAD.
    """

    def __init__(
        self, estimator, cost_false_positives=0.5, cost_false_negatives=0.5
    ):
        StructuredDecoder.__init__(self)
        self.estimator = estimator
        self.cost_false_positives = cost_false_positives
        self.cost_false_negatives = cost_false_negatives

    def decode_mira(
        self, instance, parts, scores, gold_outputs, old_mira=False
    ):
        """Cost-augmented decoder. Allows a compromise between precision
        and recall. In general:
            p = a - (a+b)*z0
            q = b*sum(z0)
            p'*z + q = a*sum(z) - (a+b)*z0'*z + b*sum(z0)
                     = a*(1-z0)'*z + b*(1-z)'*z0
        a => penalty for predicting 1 when it is 0 (false positive)
        b => penalty for predicting 0 when it is 1 (false negative)
        F1: a = 0.5, b = 0.5
        recall: a = 0, b = 1

        Returns:
            tuple (predicted_outputs, cost, loss).
        """
        a = self.cost_false_positives
        b = self.cost_false_negatives
        # Allow multiple fine-grained labels whose coarse label is BAD.
        bad = {
            self.estimator.labels[label]
            for label in self.estimator.labels
            if self.estimator.get_coarse_label(label) == 'BAD'
        }
        # Indices of the unigram parts carrying a BAD label; only these
        # contribute to the cost.
        index_parts = [
            r
            for r, part in enumerate(parts)
            if isinstance(part, SequenceUnigramPart) and part.label in bad
        ]
        p = np.zeros(len(parts))
        gold_bad = gold_outputs[index_parts]
        p[index_parts] = a - (a + b) * gold_bad
        # Gold outputs are 0/1 indicators, so this sum is exact.
        q = b * gold_bad.sum()
        if old_mira:
            # Plain decoding, without cost augmentation.
            predicted_outputs = self.decode(instance, parts, scores)
        else:
            predicted_outputs = self.decode(instance, parts, scores + p)
        cost = p.dot(predicted_outputs) + q
        loss = cost + scores.dot(predicted_outputs - gold_outputs)
        return predicted_outputs, cost, loss

    def decode(self, instance, parts, scores):
        """Decoder. Return the most likely sequence of OK/BAD labels."""
        if self.estimator.use_bigrams:
            return self.decode_with_bigrams(instance, parts, scores)
        return self.decode_with_unigrams(instance, parts, scores)

    def decode_with_unigrams(self, instance, parts, scores):
        """Decoder for a non-sequential model (unigrams only).

        Independently for each word, selects the label part with the
        highest score and marks it in the 0/1 output vector.
        """
        predicted_output = np.zeros(len(scores))
        # Group part indices by the word they refer to.
        parts_by_word = [[] for _ in range(instance.num_words())]
        for r, part in enumerate(parts):
            parts_by_word[part.index].append(r)
        for word_parts in parts_by_word:
            # Like np.argmax, max() keeps the first candidate on ties.
            best = max(word_parts, key=lambda r: scores[r])
            predicted_output[best] = 1.0
        return predicted_output

    def decode_with_bigrams(self, instance, parts, scores):
        """Decoder for a sequential model (with bigrams).

        Builds initial/transition/final/emission score tables from the
        parts, runs Viterbi, and converts the best label path back into
        a 0/1 indicator vector over parts.
        """
        num_labels = len(self.estimator.labels)
        num_words = instance.num_words()
        initial_scores = np.zeros(num_labels)
        transition_scores = np.zeros((num_words - 1, num_labels, num_labels))
        final_scores = np.zeros(num_labels)
        emission_scores = np.zeros((num_words, num_labels))
        # Maps from (position, label[s]) back to the part index.
        indexed_unigram_parts = [{} for _ in range(num_words)]
        indexed_bigram_parts = [{} for _ in range(num_words + 1)]
        for r, part in enumerate(parts):
            if isinstance(part, SequenceUnigramPart):
                indexed_unigram_parts[part.index][part.label] = r
                emission_scores[part.index, part.label] = scores[r]
            elif isinstance(part, SequenceBigramPart):
                key = (part.label, part.previous_label)
                indexed_bigram_parts[part.index][key] = r
                if part.previous_label < 0:
                    # START -> label transition.
                    assert part.index == 0
                    initial_scores[part.label] = scores[r]
                elif part.label < 0:
                    # label -> STOP transition.
                    assert part.index == num_words
                    final_scores[part.previous_label] = scores[r]
                else:
                    transition_scores[
                        part.index - 1, part.label, part.previous_label
                    ] = scores[r]
            else:
                raise NotImplementedError
        best_path, _ = self.run_viterbi(
            initial_scores, transition_scores, final_scores, emission_scores
        )
        # Mark the unigram and bigram parts along the best path.
        predicted_output = np.zeros(len(scores))
        previous_label = -1
        for i, label in enumerate(best_path):
            predicted_output[indexed_unigram_parts[i][label]] = 1.0
            r = indexed_bigram_parts[i][(label, previous_label)]
            predicted_output[r] = 1.0
            previous_label = label
        # Closing bigram into the STOP state.
        r = indexed_bigram_parts[num_words][(-1, previous_label)]
        predicted_output[r] = 1.0
        return predicted_output

    def run_viterbi(
        self, initial_scores, transition_scores, final_scores, emission_scores
    ):
        """Computes the viterbi trellis for a given sequence.

        Receives:
            - Initial scores: (num_states) array
            - Transition scores: (length-1, num_states, num_states) array,
              indexed as [position, current_state, previous_state]
            - Final scores: (num_states) array
            - Emission scores: (length, num_states) array.

        Returns:
            tuple (best_path, best_score): the (length) int array of the
            most likely state sequence and its total score.
        """
        length = np.size(emission_scores, 0)  # Length of the sequence.
        num_states = np.size(initial_scores)  # Number of states.
        # Trellis of best partial-path scores, initialized to log(0).
        viterbi_scores = np.full([length, num_states], logzero())
        # Back-pointers used to recover the best path.
        viterbi_paths = -np.ones([length, num_states], dtype=int)
        # Most likely sequence.
        best_path = -np.ones(length, dtype=int)
        # Initialization.
        viterbi_scores[0, :] = emission_scores[0, :] + initial_scores
        # Forward pass. candidates[c, p] is the score of reaching state c
        # at `pos` coming from state p at `pos - 1` (broadcast over the
        # previous-state axis); one max/argmax per position instead of a
        # Python loop over states.
        for pos in range(1, length):
            candidates = (
                transition_scores[pos - 1] + viterbi_scores[pos - 1, :]
            )
            viterbi_paths[pos, :] = np.argmax(candidates, axis=1)
            viterbi_scores[pos, :] = (
                np.max(candidates, axis=1) + emission_scores[pos, :]
            )
        # Termination: fold in the final transition scores.
        terminal_scores = viterbi_scores[length - 1, :] + final_scores
        best_score = np.max(terminal_scores)
        best_path[length - 1] = np.argmax(terminal_scores)
        # Backtrack along the stored back-pointers.
        for pos in range(length - 2, -1, -1):
            best_path[pos] = viterbi_paths[pos + 1, best_path[pos + 1]]
        return best_path, best_score