# Source code for hashcrypto

from __future__ import print_function, unicode_literals, absolute_import

import hashlib
from os import urandom
import sys
from struct import Struct

if sys.version_info >= (3, 0):
    unicode = str
    short_map = map
else:
    from itertools import imap as short_map

try:
    from bytesop import op_xor
except ImportError:
    from hashcrypto.bytesop_fallback import op_xor


# Version string of this module; it is the first field written into the
# file header built by HashCrypt.header().
__version__ = "0.4"


# Pre-compiled struct formats, both little-endian:
# Q packs an unsigned 64-bit integer (used for the CTR block counter).
Q = Struct(b"<Q")
# B packs a single unsigned byte (used for the pack_plus length prefix).
B = Struct(b"<B")


def pack_plus(b):
    """Serialize a byte string as a chain of length-prefixed segments.

    Every segment is one length byte followed by that many data bytes.
    A length byte of 0xFF means "255 data bytes follow and another
    segment comes after them"; any smaller value marks the last segment.
    An input whose length is an exact multiple of 255 therefore ends
    with an empty segment (a single 0x00 byte).

    :param b: the bytes to serialize; text is encoded as ASCII first.
    :returns: the serialized byte string.
    :raises UnicodeEncodeError: if text input contains non-ASCII characters.
    """
    if isinstance(b, unicode):
        b = b.encode("ascii")

    # Built iteratively: the recursive formulation used one stack frame
    # per 255 bytes and re-sliced the remaining tail on every call, so
    # long inputs hit the recursion limit and cost quadratic time.
    pieces = []
    offset = 0
    while len(b) - offset >= 0xFF:
        pieces.append(B.pack(0xFF))
        pieces.append(b[offset:offset + 0xFF])
        offset += 0xFF

    tail = b[offset:]
    pieces.append(B.pack(len(tail)))
    pieces.append(tail)
    return b"".join(pieces)
def unpack_plus(b):
    """Decode the first value stored in the ``pack_plus`` format.

    Segments are read from the start of *b*: one length byte, then that
    many data bytes. A length byte of 0xFF means another segment
    follows; the first smaller length byte ends the value. Bytes after
    the final segment are ignored.

    :param b: a byte string beginning with a ``pack_plus`` encoded value.
    :returns: the decoded bytes.
    :raises struct.error: if *b* ends where a length byte is expected.
    """
    pieces = []
    offset = 0
    while True:
        # Slice rather than index: on Python 3 ``b[offset]`` is an int,
        # which Struct.unpack() rejects; a one-byte slice works on both
        # Python 2 and Python 3.
        size = B.unpack(b[offset:offset + 1])[0]
        pieces.append(b[offset + 1:offset + 1 + size])
        offset += 1 + size
        if size < 0xFF:
            break
    return b"".join(pieces)
def read_plus(f):
    """Read one ``pack_plus`` encoded value from a binary file object.

    Repeatedly reads a length byte followed by that many data bytes. A
    length byte of 0xFF means another segment follows; the first
    smaller length byte ends the value. The file position is left just
    after the final segment.

    :param f: object with a ``read(n)`` method returning bytes.
    :returns: the decoded bytes.
    :raises struct.error: if the stream ends where a length byte is expected.
    """
    # Iterative on purpose: the stream contents decide how many
    # segments there are, and one recursion level per segment would let
    # a long run of 0xFF length bytes exhaust the interpreter stack.
    pieces = []
    while True:
        size = B.unpack(f.read(1))[0]
        pieces.append(f.read(size))
        if size < 0xFF:
            break
    return b"".join(pieces)
class IVError(ValueError):
    """Raised when an IV is required but none was passed in or stored."""
# Maps algorithm names to the hashlib constructors that exist as direct
# module attributes, so these names do not need to go through hashlib.new().
fast_lookup = {
    name: getattr(hashlib, name)
    for name in ("md5", "sha1", "sha224", "sha256", "sha384", "sha512")
}
def read_file(infile, block_size):
    """Yield successive chunks read from *infile* until a read comes back empty.

    :param infile: object with a ``read(n)`` method.
    :param block_size: number of bytes requested per read.
    :returns: generator of the non-empty chunks, in order.
    """
    while True:
        chunk = infile.read(block_size)
        if not chunk:
            return
        yield chunk
class HashCrypt(object):
    """Common base for the hash-based cipher modes in this module.

    It stores the key and the hash constructor, derives keystream
    blocks as ``hash(key + data)`` and provides the file/stream
    plumbing. It does not define ``encrypt`` or ``decrypt`` itself; the
    stream helpers call ``self.encrypt`` / ``self.decrypt``, which a
    subclass has to provide.
    """

    def __init__(self, key, hash_constructor=hashlib.sha512):
        """Store the key and resolve the hash constructor.

        :param key: byte string prepended to every hashed block.
        :param hash_constructor: a hashlib-style constructor, or the
            name of an algorithm as text or bytes.
        """
        if isinstance(hash_constructor, (unicode, bytes)):
            name = hash_constructor
            if isinstance(name, bytes):
                # fast_lookup is keyed by text and Python 3's
                # hashlib.new() rejects bytes names, so normalise here.
                name = name.decode("ascii")
            hash_constructor = fast_lookup.get(name)
            if hash_constructor is None:
                hash_constructor = self._named_constructor(name)
        self.hash = hash_constructor
        self.key = key
        # The keystream is produced one digest at a time, so the digest
        # size is also the cipher's block size.
        self.block_size = hash_constructor().digest_size

    @staticmethod
    def _named_constructor(name):
        """Return a constructor for *name* backed by ``hashlib.new``.

        The data argument is optional because the constructor is also
        called without arguments (to read ``digest_size`` and ``name``);
        a constructor that required data made every algorithm outside
        ``fast_lookup`` unusable.
        """
        def construct(data=b""):
            return hashlib.new(name, data)
        return construct

    def block(self, b):
        """Return the digest of ``self.key + b``."""
        return self.hash(self.key + b).digest()

    def encrypt_file(self, infile, outfile, *args, **kwargs):
        """Write the header to *outfile*, then the encrypted stream.

        Extra arguments are forwarded to ``encrypt_stream``.
        """
        outfile.write(self.header())
        self.encrypt_stream(infile, outfile, *args, **kwargs)

    def encrypt_stream(self, infile, outfile, *args, **kwargs):
        """Encrypt *infile* in ``block_size`` chunks and write them to *outfile*.

        Extra arguments are forwarded to ``self.encrypt``.
        """
        chunks = read_file(infile, self.block_size)
        for encrypted in self.encrypt(chunks, *args, **kwargs):
            outfile.write(encrypted)

    def decrypt_stream(self, infile, outfile, *args, **kwargs):
        """Decrypt *infile* in ``block_size`` chunks and write them to *outfile*.

        Extra arguments are forwarded to ``self.decrypt``.
        """
        chunks = read_file(infile, self.block_size)
        for decrypted in self.decrypt(chunks, *args, **kwargs):
            outfile.write(decrypted)

    def header(self):
        """Return the file header.

        The header is the magic ``b"HASHCRYPT"`` followed by three
        ``pack_plus`` fields: module version, class name and hash name.
        """
        fields = (__version__, self.__class__.__name__, self.hash_name())
        return b"HASHCRYPT" + b"".join(pack_plus(field) for field in fields)

    def hash_name(self):
        """Return the ``name`` attribute of a fresh hash object."""
        return self.hash().name

    @classmethod
    def suggest_key_size(cls, hash_constructor):
        """Return the hash's internal block size in bytes, but at least 32.

        :param hash_constructor: callable returning a hashlib-style object.
        """
        return max(hash_constructor().block_size, 32)
class WithIV(HashCrypt):
    """HashCrypt variant that carries a starting IV.

    The IV is kept in ``self.start_iv`` and appended to the file header.
    """

    def __init__(self, key, hash_constructor=hashlib.sha512, start_iv=None):
        """Initialise the cipher, generating a random IV when none is given.

        :param key: byte string prepended to every hashed block.
        :param hash_constructor: hash constructor or algorithm name.
        :param start_iv: IV bytes; ``None`` means generate one with
            ``make_iv``.
        """
        super(WithIV, self).__init__(key, hash_constructor)
        if start_iv is None:
            # Use the resolved constructor: the raw argument may be an
            # algorithm *name*, which make_iv() would try to call.
            start_iv = self.make_iv(self.hash)
        self.start_iv = start_iv

    def header(self):
        """Return the base header followed by the ``pack_plus`` encoded IV."""
        return super(WithIV, self).header() + pack_plus(self.start_iv)

    @classmethod
    def suggest_iv_size(cls, hash_constructor):
        """Return the hash's internal block size in bytes, but at least 32.

        :param hash_constructor: callable returning a hashlib-style object.
        """
        return max(hash_constructor().block_size, 32)

    @classmethod
    def make_iv(cls, hash_constructor):
        """Return ``suggest_iv_size(hash_constructor)`` bytes from ``os.urandom``.

        :param hash_constructor: callable returning a hashlib-style object.
        """
        return urandom(cls.suggest_iv_size(hash_constructor))
class WithNonce(HashCrypt):
    """HashCrypt variant that carries a nonce.

    The nonce is kept in ``self.nonce`` and appended to the file header.
    ``make_nonce`` relies on a ``suggest_nonce_size`` classmethod that
    this class does not define; a subclass has to supply it.
    """

    def __init__(self, key, hash_constructor=hashlib.sha512, nonce=None):
        """Initialise the cipher, generating a random nonce when none is given.

        :param key: byte string prepended to every hashed block.
        :param hash_constructor: hash constructor or algorithm name.
        :param nonce: nonce bytes; ``None`` means generate one with
            ``make_nonce``.
        """
        super(WithNonce, self).__init__(key, hash_constructor)
        if nonce is None:
            # Use the resolved constructor: the raw argument may be an
            # algorithm *name*, which make_nonce() would try to call.
            nonce = self.make_nonce(self.hash)
        self.nonce = nonce

    def header(self):
        """Return the base header followed by the ``pack_plus`` encoded nonce."""
        return super(WithNonce, self).header() + pack_plus(self.nonce)

    @classmethod
    def make_nonce(cls, hash_constructor=hashlib.sha512):
        """Return ``suggest_nonce_size(hash_constructor)`` bytes from ``os.urandom``.

        :param hash_constructor: callable returning a hashlib-style object.
        """
        return urandom(cls.suggest_nonce_size(hash_constructor))
class CTR(WithNonce):
    """Counter mode.

    Keystream block *i* is ``block(nonce + Q.pack(i))``; data blocks are
    combined with the keystream through ``op_xor``. Encryption and
    decryption are the same operation.
    """

    def _counter_block(self, counter):
        """Return the keystream block belonging to one counter value."""
        return self.block(self.nonce + Q.pack(counter))

    def keystream(self, counter_start):
        """Yield keystream blocks endlessly, starting at *counter_start*."""
        position = counter_start
        while True:
            yield self._counter_block(position)
            position += 1

    def encrypt(self, plain_blocks, counter_start=0):
        """Lazily combine each input block with the matching keystream block.

        :param plain_blocks: iterable of byte blocks.
        :param counter_start: counter value used for the first block.
        :returns: iterator over the ``op_xor`` results.
        """
        pads = self.keystream(counter_start)
        return short_map(op_xor, plain_blocks, pads)

    decrypt = encrypt

    def encrypt_block(self, b, counter):
        """Process one full-size block at an explicit counter position.

        :raises ValueError: if ``len(b)`` differs from ``self.block_size``.
        """
        size = len(b)
        if size != self.block_size:
            raise ValueError("Wrong size for input", size)
        return op_xor(b, self._counter_block(counter))

    decrypt_block = encrypt_block

    @classmethod
    def suggest_nonce_size(cls, hash_constructor):
        """Return the hash block size minus the counter size, but at least 32."""
        hash_block = hash_constructor().block_size
        return max(hash_block - Q.size, 32)
class OFB(WithIV):
    """Output-feedback mode.

    Each keystream block is ``block()`` applied to the previous
    keystream block, starting from the IV. Encryption and decryption
    are the same operation.
    """

    def keystream(self, iv=None):
        """Yield keystream blocks endlessly.

        :param iv: starting value; ``None`` falls back to ``self.start_iv``.
        :raises IVError: on first iteration, if no IV is available.
        """
        feedback = self.start_iv if iv is None else iv
        if feedback is None:
            raise IVError(
                "If iv is omitted or None, self.start_iv must be set.")
        while True:
            feedback = self.block(feedback)
            yield feedback

    def encrypt(self, plain_blocks, iv=None):
        """Lazily combine each input block with the keystream via ``op_xor``.

        :param plain_blocks: iterable of byte blocks.
        :param iv: starting value; ``None`` falls back to ``self.start_iv``.
        :raises IVError: immediately, if no IV is available.
        """
        start = self.start_iv if iv is None else iv
        if start is None:
            raise IVError(
                "If iv is omitted or None, self.start_iv must be set.")
        return short_map(op_xor, plain_blocks, self.keystream(start))

    decrypt = encrypt
class CFB(WithIV):
    """Cipher-feedback mode.

    The pad for each block is ``block()`` of the previous ciphertext
    block (the IV for the first one); the pad is combined with the data
    through ``op_xor``.
    """

    def encrypt(self, plain_blocks, iv=None):
        """Yield ciphertext blocks for *plain_blocks*.

        :param iv: starting value; ``None`` falls back to ``self.start_iv``.
        :raises IVError: on first iteration, if no IV is available.
        """
        feedback = self.start_iv if iv is None else iv
        if feedback is None:
            raise IVError(
                "If iv is omitted or None, self.start_iv must be set.")
        for plain in plain_blocks:
            # The ciphertext just produced becomes the next feedback value.
            feedback = op_xor(plain, self.block(feedback))
            yield feedback

    def decrypt(self, cipher_blocks, iv=None):
        """Yield plaintext blocks for *cipher_blocks*.

        :param iv: starting value; ``None`` falls back to ``self.start_iv``.
        :raises IVError: on first iteration, if no IV is available.
        """
        feedback = self.start_iv if iv is None else iv
        if feedback is None:
            raise IVError(
                "If iv is omitted or None, self.start_iv must be set.")
        for cipher in cipher_blocks:
            pad = self.block(feedback)
            # The received ciphertext feeds the next block's pad.
            feedback = cipher
            yield op_xor(cipher, pad)

    def decrypt_block(self, b, iv):
        """Decrypt a single block, given the value that feeds its pad."""
        pad = self.block(iv)
        return op_xor(b, pad)
# Maps the mode name read from a file header to the class implementing it.
# decrypt_file() looks the header's mode field up here; HashCrypt.header()
# writes the class's __name__ into that field.
MODES = {"CTR": CTR, "OFB": OFB, "CFB": CFB}
def decrypt_file(infile, outfile, key):
    """Decrypt a file that starts with a ``HASHCRYPT`` header.

    After the 9-byte magic, the header holds four length-prefixed
    fields: version, mode name, hash name and the IV or nonce. The mode
    name selects the class from ``MODES``; the rest of *infile* is then
    decrypted into *outfile*.

    :param infile: binary file object positioned at the header.
    :param outfile: binary file object receiving the decrypted data.
    :param key: key bytes handed to the cipher class.
    :raises ValueError: if the file does not start with ``b"HASHCRYPT"``.
    :raises KeyError: if the mode name is not in ``MODES``.
    """
    magic = infile.read(9)
    if magic != b"HASHCRYPT":
        raise ValueError("Bad Header")

    # The three text fields come first, in this order; the version is
    # read to advance the stream but is not otherwise used.
    _version, mode_name, hash_name = [
        read_plus(infile).decode("ascii") for _ in range(3)
    ]
    iv_or_nonce = read_plus(infile)

    cipher = MODES[mode_name](key, hash_name, iv_or_nonce)
    cipher.decrypt_stream(infile, outfile)