mirror of
https://github.com/Ikatono/CommTools.git
synced 2025-10-28 20:45:33 -05:00
comvolutional encoder/viterbi
This commit is contained in:
276
commtools.py
276
commtools.py
@@ -5,7 +5,9 @@ from math import log2
|
||||
#import sympy as sp
|
||||
from sympy.polys.domains import ZZ
|
||||
from sympy.polys.galoistools import gf_factor
|
||||
import itertools as it
|
||||
|
||||
#TODO rewrite this, no longer accurate
|
||||
#basic structure of objects used in this package
|
||||
#feed() is used to send data to the object
|
||||
#it stores whatever state data it needs to determine future bits
|
||||
@@ -17,8 +19,8 @@ from sympy.polys.galoistools import gf_factor
|
||||
#Hard decoders return 1 and -1. True and False are interpretted as 1 and -1 respectively
|
||||
#some objects allow other data types, but they may behave strangely if you use Iterables as single data points
|
||||
#because of this, DO NOT USE STRINGS AS DATA POINTS
|
||||
class CommObject:
|
||||
def feed(self):
|
||||
class CommObject(object):
|
||||
def feed(self, data=()):
|
||||
raise NotImplementedError()
|
||||
|
||||
def clear(self):
|
||||
@@ -27,6 +29,13 @@ class CommObject:
|
||||
def getinverse(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def load(self, data):
|
||||
self.buff.add(data)
|
||||
|
||||
class Decoder(object):
|
||||
def geterrors(self):
|
||||
return self.errors.pop(-1)
|
||||
|
||||
#turns a sequence of CommObjects into a single CommObject
|
||||
class Chain(CommObject):
|
||||
def __init__(self, comms):
|
||||
@@ -38,11 +47,10 @@ class Chain(CommObject):
|
||||
self.comms = list(comms)
|
||||
|
||||
#feeds data through each CommObject in sequence
|
||||
def feed(self, data):
|
||||
if not isinstance(data, Iterable):
|
||||
out = (data,)
|
||||
else:
|
||||
out = data
|
||||
def feed(self, data=()):
|
||||
#putsdata through a buffer first because all CommObjects should support load()
|
||||
self.load(data)
|
||||
out = self.buff.pop(-1)
|
||||
|
||||
for i in self.comms:
|
||||
out = i.feed(out)
|
||||
@@ -71,7 +79,7 @@ class LinearBlockEncoder(CommObject):
|
||||
self.buff = Queue()
|
||||
self.dist = self.distance(self.gen)
|
||||
|
||||
def feed(self, data):
|
||||
def feed(self, data=()):
|
||||
self.buff.add(data)
|
||||
out = []
|
||||
while len(self.buff) >= self.datalen:
|
||||
@@ -86,7 +94,6 @@ class LinearBlockEncoder(CommObject):
|
||||
def checkmat(self):
|
||||
if not self.standard(self.gen):
|
||||
raise ValueError('Currently only standard matrices can automatically generate check matrices')
|
||||
raise ValueError('Currently only standard matrices can automatically generate check matrices')
|
||||
(h,w) = self.gen.shape
|
||||
return np.concatenate((self.gen[:,h:].T, np.identity(w-h)), 1)
|
||||
|
||||
@@ -109,6 +116,7 @@ class LinearBlockEncoder(CommObject):
|
||||
|
||||
#literally useless, why did I make this?
|
||||
#LINEAR ALGEBRA MAKES LINEAR CODES!
|
||||
#TODO rework this to test linear independence of rows (copy, tostandard()?)
|
||||
@staticmethod
|
||||
def validate(mat):
|
||||
if not isinstance(mat, np.matrix):
|
||||
@@ -126,10 +134,133 @@ class LinearBlockEncoder(CommObject):
|
||||
return False
|
||||
return True
|
||||
|
||||
#implements single-shift register convolutional encoder
|
||||
#each row of mat represents an output
|
||||
#columns represent input, delay1, delay2... in that order
|
||||
#recursive follows the same structure, always start it with a 1
|
||||
class ConvolutionalEncoder(CommObject):
|
||||
def __init__(self, mat, recursive=None):
|
||||
if isinstance(mat, np.matrix):
|
||||
self.mat = mat
|
||||
else:
|
||||
self.mat = np.matrix(mat)
|
||||
self.shiftlen = len(self.mat.T)-1
|
||||
if recursive is None:
|
||||
self.recursive = np.matrix([1] + [0]*self.shiftlen)
|
||||
else:
|
||||
self.recursive = np.matrix(recursive)
|
||||
self.reg = deque([0]*self.shiftlen,self.shiftlen)
|
||||
|
||||
def feed(self, data=()):
|
||||
if not isinstance(data, Iterable):
|
||||
data = (data,)
|
||||
#TODO replace with empty list of correct size
|
||||
out = deque([])
|
||||
for i in data:
|
||||
state = np.matrix([i]+list(self.reg)).T
|
||||
outs = mul(self.mat, state)
|
||||
self.reg.appendleft(mul(self.recursive, state)[0,0])
|
||||
#turn vertical vector into list
|
||||
outs = [i[0] for i in outs.tolist()]
|
||||
out.extend(outs)
|
||||
return list(out)
|
||||
|
||||
def flush(self):
|
||||
return self.feed([0]*self.shiftlen)
|
||||
|
||||
#relatively slow maximum-likelyhood decoder for convolutional codes
|
||||
#ViterbiDecoder( convEncoder ) automatically creates a decoder for convEncoder
|
||||
#ViterbiDecoder( matrix [, recursive [, puncturing ]])
|
||||
class ViterbiDecoder(CommObject):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.buff = Queue()
|
||||
if len(args) < 1:
|
||||
raise TypeError('class ViterbiDecoder requires at least one argument, received 0')
|
||||
#TODO change this logic for new init format
|
||||
if isinstance(args[0], ConvolutionalEncoder):
|
||||
self.mat = args[0].mat
|
||||
self.recursive = args[0].recursive
|
||||
self.shiftlen = args[0].shiftlen
|
||||
self.block = args[0].block
|
||||
else:
|
||||
self.mat = np.matrix(args[0])
|
||||
if len(args) >=2:
|
||||
self.recursive = np.matrix(args[1])
|
||||
if not self.recursive:
|
||||
self.recursive = np.matrix([1] + [0]*self.mat.shape[1])
|
||||
else:
|
||||
self.recursive = np.matrix([1] + [0]*self.mat.shape[1])
|
||||
self.shiftlen = self.mat.shape[1]-1
|
||||
#trellis forward and reverse links
|
||||
self.tf = [None]*self.shiftlen
|
||||
self.tr = [[]]*self.shiftlen
|
||||
self.datawords = [None] * 2**self.shiftlen
|
||||
self.codewords = list(self.datawords)
|
||||
for i in range(2**self.shiftlen):
|
||||
imat = tobin(i)
|
||||
datawords[i] = tobin(i, self.shiftlen+1).T[0]
|
||||
codewords[i] = mul(self.mat, datawords[i])
|
||||
#bit entering shift register if input is 0
|
||||
shiftin = mul(self.recursive[0,1:],imat)
|
||||
if shiftin:
|
||||
self.tf[i] = [i//2, i//2 + 2**(self.shiftlen-1)]
|
||||
self.tr[i//2].append(i)
|
||||
self.tr[i//2 + 2**(self.shiftlen-1)].append(i)
|
||||
else:
|
||||
self.tf[i] = [i//2 + 2**(self.shiftlen-1), i//2]
|
||||
self.tr[i//2].append(i)
|
||||
self.tr[i//2 + 2**(self.shiftlen-1)].append(i)
|
||||
self.dists = [(0,None)] + [(float('inf'),None)]*(2**self.shiftlen - 2)
|
||||
#use deque for efficient appends, plus it works without specified block length
|
||||
self.trell = deque()
|
||||
#self.trellc = deque()
|
||||
|
||||
#TODO make all data go through same trellis-building code
|
||||
#TODO make single function to build maximum-likelyhood list
|
||||
#TODO make trellv only record links forward, store a SINGLE column of net distances elsewhere
|
||||
def feed(self, data):
|
||||
self.buff.add(data)
|
||||
out = []
|
||||
while self.block and len(self.buff) + len(self.trellc) >= self.block*self.mat.size[0]:
|
||||
l = self.buff.pop(self.block*self.mat.size[0] - len(self.buff) - len(self.trellc))
|
||||
blockdata = [l[i:i + n] for i in range(0, len(l), n)]
|
||||
for dat in len(blockdata):
|
||||
trellcol = [None]*len(self.datawords)
|
||||
#TODO implement more efficient euclidean distance (calculate distance from 1/0 beforehand)
|
||||
for num in range(len(datawords)):
|
||||
#chooses reverse link with least distance
|
||||
choices = [self.trell[-1][i] for i in self.tr[num]]
|
||||
#TODO implement random decision if reverse links have equal distance
|
||||
choice = self.tr[num][1 * (choices[1] > choices[0])]
|
||||
dist = edist(tobinl(num), blockdata)
|
||||
dist = dist + self.trell[-1][choice]
|
||||
trellcol[num] = (dist, choice)
|
||||
self.trell.append(trellcol)
|
||||
#TODO make this better
|
||||
links = [self.trell[-1][i] for i in self.tr[self.trell[-1].index(min(self.trell[-1]))]]
|
||||
statelist = [None]*len(self.block)
|
||||
statelist[0] = index(min(self.trell[-1]))
|
||||
ind = min(self.trell[-1])[1]
|
||||
|
||||
for i, col in it.izip(it.count(), reversed(self.trell)):
|
||||
#TODO find a better way to skip first index of trell
|
||||
if not i:
|
||||
continue
|
||||
statelist[i] = col[ind][1]
|
||||
ind = col[ind][1]
|
||||
|
||||
|
||||
def clear(self):
|
||||
self.buff = Queue()
|
||||
trellfirst = [0] + [float('inf')]*(2**self.shiftlen - 2)
|
||||
self.trell = deque((trellfirst,))
|
||||
#self.trellc = deque()
|
||||
|
||||
|
||||
#return behavior is only defined for syndromes with weight <= (d-1)/2
|
||||
#where d is the min Hamming distance of the generator matrix
|
||||
class LinearBlockDecoder(CommObject):
|
||||
def __init__(self, chk, error=False):
|
||||
def __init__(self, gen=None, chk=None, error=False):
|
||||
if not isinstance(chk, np.matrix):
|
||||
chk = np.matrix(chk)
|
||||
self.chk = chk
|
||||
@@ -137,12 +268,14 @@ class LinearBlockDecoder(CommObject):
|
||||
self.codelen = self.gen.shape[1]
|
||||
self.datalen = self.codelen - self.gen.shape[0]
|
||||
|
||||
def feed(self, data):
|
||||
def feed(self, data=()):
|
||||
self.buff.append(data)
|
||||
out = Queue()
|
||||
while len(self.buff) >= self.codelen:
|
||||
code = np.matrix(self.buff.pop(self.codelen)).T
|
||||
synd = self.chk * code
|
||||
#data =
|
||||
#TODO finish this
|
||||
|
||||
#creates a cyclic encoder, a type of linear block code
|
||||
#can be used in two different ways:
|
||||
#CyclicEncoder(n, k) creates an [n,k] code from a code polynomial of max distance
|
||||
@@ -158,6 +291,10 @@ class CyclicEncoder(LinearBlockEncoder):
|
||||
tostand = kwargs['standard']
|
||||
else:
|
||||
tostand = True
|
||||
if 'left' in kwargs:
|
||||
left = kwargs['left']
|
||||
else:
|
||||
left = False
|
||||
if isinstance(args[0], list):
|
||||
#TODO implement polynomial loading
|
||||
pass
|
||||
@@ -166,8 +303,6 @@ class CyclicEncoder(LinearBlockEncoder):
|
||||
outerpoly = [1] + [0]*(n-1) + [1]
|
||||
factors = gf_factor(ZZ.map(outerpoly), 2, ZZ)
|
||||
doms = [len(factor[0])-1 for factor in factors[1]]
|
||||
#print(factors)
|
||||
#print(doms)
|
||||
if n-k not in doms:
|
||||
raise ValueError('there is no cyclic code with these values')
|
||||
#list of all factors of the correct order
|
||||
@@ -191,15 +326,22 @@ class CyclicEncoder(LinearBlockEncoder):
|
||||
self.buff = Queue()
|
||||
self.datalen = self.gen.shape[0]
|
||||
self.codelen = self.gen.shape[1]
|
||||
if tostand:
|
||||
tostandard(self.gen, left)
|
||||
|
||||
def getinverse(self, errors=False):
|
||||
pass
|
||||
|
||||
#TODO more efficiect, cycleic-specific feee() algorithm
|
||||
|
||||
#TODO make this
|
||||
class HammingEncoder(LinearBlockEncoder):
|
||||
def __init__(self, paritybits=3):
|
||||
pass
|
||||
|
||||
|
||||
#basic queue structure for CommObject buffers
|
||||
class Queue:
|
||||
#basic deque-based FIFO structure for CommObject buffers
|
||||
class Queue(object):
|
||||
def __init__(self, data=()):
|
||||
self.data = deque(data)
|
||||
|
||||
@@ -220,6 +362,9 @@ class Queue:
|
||||
self.data.append(data)
|
||||
|
||||
def pop(self, num=1):
|
||||
#-1 pops all, -2 all but one, etc.
|
||||
if num < 0:
|
||||
num = max(0, len(self.data)+num+1)
|
||||
if num > len(self.data):
|
||||
raise IndexError('Cannot pop more values than are in queue')
|
||||
|
||||
@@ -242,6 +387,37 @@ def mul(x1, x2):
|
||||
x2 = np.matrix(x2)
|
||||
return x1.dot(x2) % 2
|
||||
|
||||
#DEPRECATED: I'm actually better off not indexing so a deque is fine
|
||||
#very basic shift register for use in convolutional codes
|
||||
#size is determined by the size of the iterable passed at initialization
|
||||
#supports indexing arbitrary values by list or tuple
|
||||
#pushing data is O(1), indexing is O(1)
|
||||
class ShiftRegister(object):
|
||||
def __init__(self, data):
|
||||
self.length = len(data)
|
||||
self.data = [i for i in data]
|
||||
self.ind = 0
|
||||
|
||||
def __len__(self):
|
||||
return self.length
|
||||
|
||||
def __getitem__(self, key):
|
||||
if isinstance(key, int):
|
||||
return self.data[(key+self.ind)%len(self)]
|
||||
elif isinstance(key, (tuple, list)):
|
||||
return [self.data[(i+self.ind)%len(self)] for i in key]
|
||||
#TODO implement slice indexing
|
||||
|
||||
|
||||
def __iter__(self):
|
||||
yield [self.data[(i+self.ind)%len(self)] for i in range(len(self))]
|
||||
|
||||
def push(self, datum):
|
||||
self.ind -= 1
|
||||
if self.ind == -1:
|
||||
self.ind = len(self)-1
|
||||
self.data[self.ind] = datum
|
||||
|
||||
#converts each value to 1 (if True or >0) or 0
|
||||
#for use with hard decoders
|
||||
#TODO finish this
|
||||
@@ -269,22 +445,78 @@ def tobin(num, bits=0):
|
||||
num -= 2**i
|
||||
return np.matrix(out).T
|
||||
|
||||
def tobinl(num, bits=0):
|
||||
if num == 0:
|
||||
return np.matrix([0] * max(bits,1)).T
|
||||
if not isinstance(num, int):
|
||||
raise TypeError('tobin only takes integers')
|
||||
bitsmax = int(log2(num))+1
|
||||
if not bits:
|
||||
bits = bitsmax
|
||||
if bitsmax > bits:
|
||||
raise ValueError('insufficient bits')
|
||||
out = [0] * bits
|
||||
for i in range(bitsmax-1,-1,-1):
|
||||
if num >= 2**i:
|
||||
out[i] = 1
|
||||
num -= 2**i
|
||||
return out
|
||||
|
||||
#I stole this from stackoverflow
|
||||
#https://stackoverflow.com/questions/5488307/numpy-array-in-python-list/
|
||||
#thanks Sven Marnach!
|
||||
def arreq_in_list(myarr, list_arrays):
|
||||
return any((myarr == x).all() for x in list_arrays)
|
||||
|
||||
#adds row a to row b, mod 2
|
||||
#adds row a to row b, mod 2, in place
|
||||
def rowadd(mat, a, b):
|
||||
mat[a] += mat[b]
|
||||
mat %= 2
|
||||
|
||||
#calculates euclidean distance for two iterables
|
||||
def edist(x1, x2):
|
||||
if not len(x1) == len(x2):
|
||||
raise ValueError('iterables must have same length')
|
||||
dist = 0
|
||||
for i in range(x1):
|
||||
dist += (x1[i]-x2[i])**2
|
||||
return dist**.5
|
||||
|
||||
#TODO finish this
|
||||
#left=True if identity matrix is on the left side
|
||||
def standard(mat, left):
|
||||
k = mat.shape[1]
|
||||
for i in mat.T[:k]:
|
||||
if not i.any():
|
||||
pass
|
||||
def tostandard(mat, left):
|
||||
k, n = mat.shape
|
||||
#TODO validate input
|
||||
#for i in mat.T:
|
||||
#if not i.any():
|
||||
#pass
|
||||
#first column of identity matrix
|
||||
s = (n-k) * (not left)
|
||||
for i in range(k):
|
||||
#places a 1 in the appropriate cell if necessary
|
||||
if not mat[i,s+i]:
|
||||
for j in [l for l in range(k) if l != i]:
|
||||
if mat[i,s+j]:
|
||||
rowadd(mat, j, i)
|
||||
break
|
||||
#places 0s in the rest of the column
|
||||
for j in [l for l in range(k) if l != i]:
|
||||
if mat[i,s+j]:
|
||||
rowadd(mat, i, j)
|
||||
|
||||
#0 no; 1 left; 2 right
|
||||
def isstandard(mat):
|
||||
n, k = mat.shape
|
||||
#test left
|
||||
left = True
|
||||
for i in range(k):
|
||||
if not (mat[i,i] and mat[:,i].sum() == 1):
|
||||
left = False
|
||||
break
|
||||
if left:
|
||||
return 1
|
||||
s = n-k
|
||||
for i in range(k):
|
||||
if not (mat[i,s+i] and mat[:,s+i].sum() == 1):
|
||||
return 0
|
||||
return 2
|
||||
|
||||
Reference in New Issue
Block a user