Source code for galois._code._reed_solomon

import numba
from numba import int64
import numpy as np

from .._factor import factors
from .._field import Field, Poly, matlab_primitive_poly
from .._field._meta_function import UNARY_CALCULATE_SIG, BINARY_CALCULATE_SIG, BERLEKAMP_MASSEY_CALCULATE_SIG, POLY_ROOTS_CALCULATE_SIG, POLY_EVALUATE_CALCULATE_SIG, CONVOLVE_CALCULATE_SIG
from .._overrides import set_module

from ._cyclic import poly_to_generator_matrix, roots_to_parity_check_matrix

__all__ = ["ReedSolomon"]


@set_module("galois")
class ReedSolomon:
    r"""
    Constructs a :math:`\textrm{RS}(n, k)` code.

    A :math:`\textrm{RS}(n, k)` code is a :math:`[n, k, d]_q` linear block code.

    To create the shortened :math:`\textrm{RS}(n-s, k-s)` code, construct the full-sized :math:`\textrm{RS}(n, k)` code
    and then pass :math:`k-s` symbols into :func:`encode` and :math:`n-s` symbols into :func:`decode()`. Shortened codes are only
    applicable for systematic codes.

    Parameters
    ----------
    n : int
        The codeword size :math:`n`, must be :math:`n = q - 1`.
    k : int
        The message size :math:`k`. The error-correcting capability :math:`t` is defined by :math:`n - k = 2t`.
    c : int, optional
        The first consecutive power of :math:`\alpha`. The default is 1.
    primitive_poly : galois.Poly, optional
        Optionally specify the primitive polynomial that defines the extension field :math:`\mathrm{GF}(q)`. The default is
        `None` which uses Matlab's default, see :func:`galois.matlab_primitive_poly`. Matlab tends to use the lexicographically-minimal
        primitive polynomial as a default instead of the Conway polynomial.
    primitive_element : int, galois.Poly, optional
        Optionally specify the primitive element :math:`\alpha` of :math:`\mathrm{GF}(q)` whose powers are roots of the generator polynomial :math:`g(x)`.
        The default is `None` which uses the lexicographically-minimal primitive element in :math:`\mathrm{GF}(q)`, i.e.
        `galois.primitive_element(p, m)`.
    systematic : bool, optional
        Optionally specify if the encoding should be systematic, meaning the codeword is the message with parity
        appended. The default is `True`.

    Examples
    --------
    .. ipython:: python

        rs = galois.ReedSolomon(15, 9)
        GF = rs.field
        m = GF.Random(rs.k); m
        c = rs.encode(m); c
        # Corrupt the first symbol in the codeword
        c[0] ^= 13
        dec_m = rs.decode(c); dec_m
        np.array_equal(dec_m, m)

        # Instruct the decoder to return the number of corrected symbol errors
        dec_m, N = rs.decode(c, errors=True); dec_m, N
        np.array_equal(dec_m, m)
    """
    # pylint: disable=no-member

    def __new__(cls, n, k, c=1, primitive_poly=None, primitive_element=None, systematic=True):
        if not isinstance(n, (int, np.integer)):
            raise TypeError(f"Argument `n` must be an integer, not {type(n)}.")
        if not isinstance(k, (int, np.integer)):
            raise TypeError(f"Argument `k` must be an integer, not {type(k)}.")
        if not isinstance(c, (int, np.integer)):
            raise TypeError(f"Argument `c` must be an integer, not {type(c)}.")
        if not isinstance(primitive_poly, (type(None), int, Poly)):
            raise TypeError(f"Argument `primitive_poly` must be None, an int, or galois.Poly, not {type(primitive_poly)}.")
        if not isinstance(systematic, bool):
            raise TypeError(f"Argument `systematic` must be a bool, not {type(systematic)}.")

        if not (n - k) % 2 == 0:
            raise ValueError("Arguments `n - k` must be even.")
        p, m = factors(n + 1)
        if not (len(p) == 1 and len(m) == 1):
            raise ValueError(f"Argument `n` must have value `q - 1` for a prime power `q`, not {n}.")
        if not c >= 1:
            raise ValueError(f"Argument `c` must be at least 1, not {c}.")
        p, m = p[0], m[0]

        if primitive_poly is None and m > 1:
            primitive_poly = matlab_primitive_poly(p, m)

        obj = super().__new__(cls)

        obj._n = n
        obj._k = k
        obj._c = c
        obj._systematic = systematic

        GF = Field(p**m, irreducible_poly=primitive_poly, primitive_element=primitive_element)
        alpha = GF.primitive_element
        t = (n - k) // 2
        roots = alpha**(c + np.arange(0, 2*t))
        g = Poly.Roots(roots)

        obj._generator_poly = g
        obj._roots = roots
        obj._field = GF
        obj._t = obj.roots.size // 2

        obj._G = poly_to_generator_matrix(n, obj.generator_poly, systematic)
        obj._H = roots_to_parity_check_matrix(n, obj.roots)

        obj._is_narrow_sense = c == 1

        return obj

    def __init__(self, *args, **kwargs):  # pylint: disable=unused-argument
        # Pre-compile the arithmetic methods
        self._add_jit = self.field._calculate_jit("add")
        self._subtract_jit = self.field._calculate_jit("subtract")
        self._multiply_jit = self.field._calculate_jit("multiply")
        self._reciprocal_jit = self.field._calculate_jit("reciprocal")
        self._power_jit = self.field._calculate_jit("power")

        # Pre-compile the JIT functions
        self._berlekamp_massey_jit = self.field._function("berlekamp_massey")
        self._poly_divmod_jit = self.field._function("poly_divmod")
        self._poly_roots_jit = self.field._function("poly_roots")
        self._poly_eval_jit = self.field._function("poly_evaluate")
        self._convolve_jit = self.field._function("convolve")

        # Pre-compile the JIT decoder
        self._decode_jit = numba.jit(DECODE_CALCULATE_SIG.signature, nopython=True, cache=True)(decode_calculate)

    def __str__(self):
        return f"<Reed-Solomon Code: [{self.n}, {self.k}, {self.d}] over {self.field.name}>"

    def __repr__(self):
        return str(self)

[docs] def encode(self, message, parity_only=False): r""" Encodes the message :math:`\mathbf{m}` into the Reed-Solomon codeword :math:`\mathbf{c}`. The message vector :math:`\mathbf{m}` is defined as :math:`\mathbf{m} = [m_{k-1}, \dots, m_1, m_0] \in \mathrm{GF}(q)^k`, which corresponds to the message polynomial :math:`m(x) = m_{k-1} x^{k-1} + \dots + m_1 x + m_0`. The codeword vector :math:`\mathbf{c}` is defined as :math:`\mathbf{c} = [c_{n-1}, \dots, c_1, c_0] \in \mathrm{GF}(q)^n`, which corresponds to the codeword polynomial :math:`c(x) = c_{n-1} x^{n-1} + \dots + c_1 x + c_0`. The codeword vector is computed from the message vector by :math:`\mathbf{c} = \mathbf{m}\mathbf{G}`, where :math:`\mathbf{G}` is the generator matrix. The equivalent polynomial operation is :math:`c(x) = m(x)g(x)`. For systematic codes, :math:`\mathbf{G} = [\mathbf{I}\ |\ \mathbf{P}]` such that :math:`\mathbf{c} = [\mathbf{m}\ |\ \mathbf{p}]`. And in polynomial form, :math:`p(x) = -(m(x) x^{n-k}\ \textrm{mod}\ g(x))` with :math:`c(x) = m(x)x^{n-k} + p(x)`. For systematic and non-systematic codes, each codeword is a multiple of the generator polynomial, i.e. :math:`g(x)\ |\ c(x)`. For the shortened :math:`\textrm{RS}(n-s, k-s)` code (only applicable for systematic codes), pass :math:`k-s` symbols into :func:`encode` to return the :math:`n-s`-symbol codeword. Parameters ---------- message : numpy.ndarray, galois.FieldArray The message as either a :math:`k`-length vector or :math:`(N, k)` matrix, where :math:`N` is the number of messages. For systematic codes, message lengths less than :math:`k` may be provided to produce shortened codewords. parity_only : bool, optional Optionally specify whether to return only the parity symbols. This only applies to systematic codes. The default is `False`. Returns ------- numpy.ndarray, galois.FieldArray The codeword as either a :math:`n`-length vector or :math:`(N, n)` matrix. The return type matches the message type. If `parity_only=True`, the parity symbols are returned as either a :math:`n - k`-length vector or :math:`(N, n-k)` matrix. Examples -------- Encode a single codeword. .. ipython:: python rs = galois.ReedSolomon(15, 9) GF = rs.field m = GF.Random(rs.k); m c = rs.encode(m); c p = rs.encode(m, parity_only=True); p Encode a single, shortened codeword. .. ipython:: python m = GF.Random(rs.k - 4); m c = rs.encode(m); c Encode a matrix of codewords. .. ipython:: python m = GF.Random((5, rs.k)); m c = rs.encode(m); c p = rs.encode(m, parity_only=True); p """ if not isinstance(message, np.ndarray): raise TypeError(f"Argument `message` must be a subclass of np.ndarray (or a galois.GF2 array), not {type(message)}.") if parity_only and not self.systematic: raise ValueError("Argument `parity_only` only applies to systematic codes.") if self.systematic: if not message.shape[-1] <= self.k: raise ValueError(f"For a systematic code, argument `message` must be a 1-D or 2-D array with last dimension less than or equal to {self.k}, not shape {message.shape}.") else: if not message.shape[-1] == self.k: raise ValueError(f"For a non-systematic code, argument `message` must be a 1-D or 2-D array with last dimension equal to {self.k}, not shape {message.shape}.") ks = message.shape[-1] # The number of input message symbols (could be less than self.k for shortened codes) if parity_only: parity = message.view(self.field) @ self.G[-ks:, self.k:] return parity.view(type(message)) elif self.systematic: parity = message.view(self.field) @ self.G[-ks:, self.k:] return np.hstack((message, parity)).view(type(message)) else: codeword = message.view(self.field) @ self.G return codeword.view(type(message))
[docs] def detect(self, codeword): r""" Detects if errors are present in the Reed-Solomon codeword :math:`\mathbf{c}`. The :math:`[n, k, d]_q` Reed-Solomon code has :math:`d_{min} = d` minimum distance. It can detect up to :math:`d_{min}-1` errors. Parameters ---------- codeword : numpy.ndarray, galois.FieldArray The codeword as either a :math:`n`-length vector or :math:`(N, n)` matrix, where :math:`N` is the number of codewords. For systematic codes, codeword lengths less than :math:`n` may be provided for shortened codewords. Returns ------- bool, numpy.ndarray A boolean scalar or array indicating if errors were detected in the corresponding codeword `True` or not `False`. Examples -------- Detect errors in a valid codeword. .. ipython:: python rs = galois.ReedSolomon(15, 9) GF = rs.field # The minimum distance of the code rs.d m = GF.Random(rs.k); m c = rs.encode(m); c rs.detect(c) Detect :math:`d_{min}-1` errors in a received codeword. .. ipython:: python # Corrupt the first `d - 1` symbols in the codeword c[0:rs.d - 1] += GF(13) rs.detect(c) """ if not isinstance(codeword, np.ndarray): raise TypeError(f"Argument `codeword` must be a subclass of np.ndarray (or a galois.GF2 array), not {type(codeword)}.") if self.systematic: if not codeword.shape[-1] <= self.n: raise ValueError(f"For a systematic code, argument `codeword` must be a 1-D or 2-D array with last dimension less than or equal to {self.n}, not shape {codeword.shape}.") else: if not codeword.shape[-1] == self.n: raise ValueError(f"For a non-systematic code, argument `codeword` must be a 1-D or 2-D array with last dimension equal to {self.n}, not shape {codeword.shape}.") codeword_1d = codeword.ndim == 1 ns = codeword.shape[-1] # The number of input codeword symbols (could be less than self.n for shortened codes) # Make codeword 2-D for array processing codeword = np.atleast_2d(codeword) # Compute the syndrome by matrix multiplying with the parity-check matrix syndrome = codeword.view(self.field) @ self.H[:,-ns:].T detected = ~np.all(syndrome == 0, axis=1) if codeword_1d: detected = detected[0] return detected
[docs] def decode(self, codeword, errors=False): r""" Decodes the Reed-Solomon codeword :math:`\mathbf{c}` into the message :math:`\mathbf{m}`. The codeword vector :math:`\mathbf{c}` is defined as :math:`\mathbf{c} = [c_{n-1}, \dots, c_1, c_0] \in \mathrm{GF}(q)^n`, which corresponds to the codeword polynomial :math:`c(x) = c_{n-1} x^{n-1} + \dots + c_1 x + c_0`. The message vector :math:`\mathbf{m}` is defined as :math:`\mathbf{m} = [m_{k-1}, \dots, m_1, m_0] \in \mathrm{GF}(q)^k`, which corresponds to the message polynomial :math:`m(x) = m_{k-1} x^{k-1} + \dots + m_1 x + m_0`. In decoding, the syndrome vector :math:`s` is computed by :math:`\mathbf{s} = \mathbf{c}\mathbf{H}^T`, where :math:`\mathbf{H}` is the parity-check matrix. The equivalent polynomial operation is :math:`s(x) = c(x)\ \textrm{mod}\ g(x)`. A syndrome of zeros indicates the received codeword is a valid codeword and there are no errors. If the syndrome is non-zero, the decoder will find an error-locator polynomial :math:`\sigma(x)` and the corresponding error locations and values. For the shortened :math:`\textrm{RS}(n-s, k-s)` code (only applicable for systematic codes), pass :math:`n-s` symbols into :func:`decode` to return the :math:`k-s`-symbol message. Parameters ---------- codeword : numpy.ndarray, galois.FieldArray The codeword as either a :math:`n`-length vector or :math:`(N, n)` matrix, where :math:`N` is the number of codewords. For systematic codes, codeword lengths less than :math:`n` may be provided for shortened codewords. errors : bool, optional Optionally specify whether to return the nubmer of corrected errors. Returns ------- numpy.ndarray, galois.FieldArray The decoded message as either a :math:`k`-length vector or :math:`(N, k)` matrix. int, np.ndarray Optional return argument of the number of corrected symbol errors as either a scalar or :math:`n`-length vector. Valid number of corrections are in :math:`[0, t]`. If a codeword has too many errors and cannot be corrected, -1 will be returned. Examples -------- Decode a single codeword. .. ipython:: python rs = galois.ReedSolomon(15, 9) GF = rs.field m = GF.Random(rs.k); m c = rs.encode(m); c # Corrupt the first symbol in the codeword c[0] += GF(13) dec_m = rs.decode(c); dec_m np.array_equal(dec_m, m) # Instruct the decoder to return the number of corrected symbol errors dec_m, N = rs.decode(c, errors=True); dec_m, N np.array_equal(dec_m, m) Decode a single, shortened codeword. .. ipython:: python m = GF.Random(rs.k - 4); m c = rs.encode(m); c # Corrupt the first symbol in the codeword c[0] += GF(13) dec_m = rs.decode(c); dec_m np.array_equal(dec_m, m) Decode a matrix of codewords. .. ipython:: python m = GF.Random((5, rs.k)); m c = rs.encode(m); c # Corrupt the first symbol in each codeword c[:,0] += GF(13) dec_m = rs.decode(c); dec_m np.array_equal(dec_m, m) # Instruct the decoder to return the number of corrected symbol errors dec_m, N = rs.decode(c, errors=True); dec_m, N np.array_equal(dec_m, m) """ # pylint: disable=protected-access if not isinstance(codeword, np.ndarray): raise TypeError(f"Argument `codeword` must be a subclass of np.ndarray (or a galois.FieldArray), not {type(codeword)}.") if self.systematic: if not codeword.shape[-1] <= self.n: raise ValueError(f"For a systematic code, argument `codeword` must be a 1-D or 2-D array with last dimension less than or equal to {self.n}, not shape {codeword.shape}.") else: if not codeword.shape[-1] == self.n: raise ValueError(f"For a non-systematic code, argument `codeword` must be a 1-D or 2-D array with last dimension equal to {self.n}, not shape {codeword.shape}.") codeword_1d = codeword.ndim == 1 dtype = codeword.dtype ns = codeword.shape[-1] # The number of input codeword symbols (could be less than self.n for shortened codes) ks = self.k - (self.n - ns) # The equivalent number of input message symbols (could be less than self.k for shortened codes) # Make codeword 2-D for array processing codeword = np.atleast_2d(codeword) # Compute the syndrome by matrix multiplying with the parity-check matrix syndrome = codeword.view(self.field) @ self.H[:,-ns:].T if self.field.ufunc_mode != "python-calculate": dec_codeword = self._decode_jit(codeword.astype(np.int64), syndrome.astype(np.int64), self.t, int(self.field.primitive_element), self._add_jit, self._subtract_jit, self._multiply_jit, self._reciprocal_jit, self._power_jit, self._berlekamp_massey_jit, self._poly_roots_jit, self._poly_eval_jit, self._convolve_jit, self.field.characteristic, self.field.degree, self.field._irreducible_poly_int) N_errors = dec_codeword[:, -1] if self.systematic: message = dec_codeword[:, 0:ks] else: message, _ = self.field._poly_divmod(dec_codeword[:, 0:self.n].view(self.field), self.generator_poly.coeffs) message = message.astype(dtype).view(type(codeword)) else: raise NotImplementedError("Reed-Solomon codes haven't been implemented for extremely large Galois fields.") if codeword_1d: message, N_errors = message[0,:], N_errors[0] if not errors: return message else: return message, N_errors
@property def field(self): r""" galois.FieldClass: The Galois field :math:`\mathrm{GF}(q)` that defines the Reed-Solomon code. """ return self._field @property def n(self): """ int: The codeword size :math:`n` of the :math:`[n, k, d]_q` code. """ return self._n @property def k(self): """ int: The message size :math:`k` of the :math:`[n, k, d]_q` code. """ return self._k @property def d(self): """ int: The design distance :math:`d` of the :math:`[n, k, d]_q` code. The minimum distance of a Reed-Solomon code is exactly equal to the design distance, :math:`d_{min} = d`. """ return 2*self.t + 1 @property def t(self): """ int: The error-correcting capability of the code. The code can correct :math:`t` symbol errors in a codeword. """ return self._t @property def systematic(self): """ bool: Indicates if the code is configured to return codewords in systematic form. """ return self._systematic @property def generator_poly(self): """ galois.Poly: The generator polynomial :math:`g(x)` whose roots are :obj:`ReedSolomon.roots`. """ return self._generator_poly @property def roots(self): r""" galois.FieldArray: The roots of the generator polynomial. These are consecutive powers of :math:`\alpha`. """ return self._roots @property def c(self): """ int: The degree of the first consecutive root. """ return self._c @property def G(self): r""" galois.FieldArray: The generator matrix :math:`\mathbf{G}` with shape :math:`(k, n)`. """ return self._G @property def H(self): r""" galois.FieldArray: The parity-check matrix :math:`\mathbf{H}` with shape :math:`(2t, n)`. """ return self._H @property def is_narrow_sense(self): r""" bool: Indicates if the Reed-Solomon code is narrow sense, meaning the roots of the generator polynomial are consecutive powers of :math:`\alpha` starting at 1, i.e. :math:`\alpha, \alpha^2, \dots, \alpha^{2t - 1}`. """ return self._is_narrow_sense ############################################################################### # JIT-compiled implementation of the specified functions ############################################################################### DECODE_CALCULATE_SIG = numba.types.FunctionType(int64[:,:](int64[:,:], int64[:,:], int64, int64, BINARY_CALCULATE_SIG, BINARY_CALCULATE_SIG, BINARY_CALCULATE_SIG, UNARY_CALCULATE_SIG, BINARY_CALCULATE_SIG, BERLEKAMP_MASSEY_CALCULATE_SIG, POLY_ROOTS_CALCULATE_SIG, POLY_EVALUATE_CALCULATE_SIG, CONVOLVE_CALCULATE_SIG, int64, int64, int64)) def decode_calculate(codeword, syndrome, t, primitive_element, ADD, SUBTRACT, MULTIPLY, RECIPROCAL, POWER, BERLEKAMP_MASSEY, POLY_ROOTS, POLY_EVAL, CONVOLVE, CHARACTERISTIC, DEGREE, IRREDUCIBLE_POLY): # pragma: no cover """ References ---------- * S. Lin and D. Costello. Error Control Coding. Section 7.4. """ args = CHARACTERISTIC, DEGREE, IRREDUCIBLE_POLY dtype = codeword.dtype N = codeword.shape[0] # The number of codewords n = codeword.shape[1] # The codeword size (could be less than the design n for shortened codes) # The last column of the returned decoded codeword is the number of corrected errors dec_codeword = np.zeros((N, n + 1), dtype=dtype) dec_codeword[:, 0:n] = codeword[:,:] for i in range(N): if not np.all(syndrome[i,:] == 0): # The syndrome vector is S = [S0, S1, ..., S2t-1] # The error pattern is defined as the polynomial e(x) = e_j1*x^j1 + e_j2*x^j2 + ... for j1 to jv, # implying there are v errors. And δi = e_ji is the i-th error value and βi = α^ji is the i-th error-locator # value and ji is the error location. # The error-locator polynomial σ(x) = (1 - β1*x)(1 - β2*x)...(1 - βv*x) where βi are the inverse of the roots # of σ(x). # Compute the error-locator polynomial σ(x) its v-reversal σ(x^-v), since the syndrome is passed in backwards sigma = BERLEKAMP_MASSEY(syndrome[i,:], ADD, SUBTRACT, MULTIPLY, RECIPROCAL, *args) sigma_rev = BERLEKAMP_MASSEY(syndrome[i,::-1], ADD, SUBTRACT, MULTIPLY, RECIPROCAL, *args) v = sigma.size - 1 # The number of errors if v > t: dec_codeword[i, -1] = -1 continue # Compute βi, the roots of σ(x^-v) which are the inverse roots of σ(x) degrees = np.arange(sigma_rev.size - 1, -1, -1) results = POLY_ROOTS(degrees, sigma_rev, primitive_element, ADD, MULTIPLY, POWER, *args) beta = results[0,:] # The roots of σ(x^-v) error_locations = results[1,:] # The roots as powers of the primitive element α if np.any(error_locations > n - 1): # Indicates there are "errors" in the zero-ed portion of a shortened code, which indicates there are actually # more errors than alleged. Return failure to decode. dec_codeword[i, -1] = -1 continue if beta.size != v: dec_codeword[i, -1] = -1 continue # Compute σ'(x) sigma_prime = np.zeros(v, dtype=np.int64) for j in range(v): degree = v - j sigma_prime[j] = MULTIPLY(degree % CHARACTERISTIC, sigma[j], *args) # Scalar multiplication # The error-value evalulator polynomial Z0(x) = S0*σ0 + (S1*σ0 + S0*σ1)*x + (S2*σ0 + S1*σ1 + S0*σ2)*x^2 + ... # with degree v-1 Z0 = CONVOLVE(sigma[-v:], syndrome[i,0:v][::-1], ADD, MULTIPLY, *args)[-v:] # The error value δi = -Z0(βi^-1) / σ'(βi^-1) for j in range(v): beta_inv = RECIPROCAL(beta[j], *args) Z0_i = POLY_EVAL(Z0, beta_inv, ADD, MULTIPLY, *args) sigma_prime_i = POLY_EVAL(sigma_prime, beta_inv, ADD, MULTIPLY, *args) delta_i = MULTIPLY(SUBTRACT(0, Z0_i, *args), RECIPROCAL(sigma_prime_i, *args), *args) dec_codeword[i, n - 1 - error_locations[j]] = SUBTRACT(dec_codeword[i, n - 1 - error_locations[j]], delta_i, *args) dec_codeword[i, -1] = v # The number of corrected errors return dec_codeword