from __future__ import print_function, unicode_literals, absolute_import
import hashlib
from os import urandom
import sys
from struct import Struct
if sys.version_info >= (3, 0):
unicode = str
short_map = map
else:
from itertools import imap as short_map
try:
from bytesop import op_xor
except ImportError:
from hashcrypto.bytesop_fallback import op_xor
__version__ = "0.4"
Q = Struct(b"<Q")
B = Struct(b"<B")
[docs]def pack_plus(b):
if isinstance(b, unicode):
b = b.encode("ascii")
if len(b) >= 0xFF:
return B.pack(0xFF) + b[:0xFF] + pack_plus(b[0xFF:])
return B.pack(len(b)) + b
[docs]def unpack_plus(b):
size = B.unpack(b[0])[0]
if size >= 0xFF:
return b[1:256] + unpack_plus(b[256:])
return b[1:size + 1]
[docs]def read_plus(f):
size = B.unpack(f.read(1))[0]
if size >= 0xFF:
return f.read(size) + read_plus(f)
return f.read(size)
[docs]class IVError(ValueError):
pass
fast_lookup = {"md5": hashlib.md5, "sha1": hashlib.sha1, "sha224": hashlib.sha224,
"sha256": hashlib.sha256, "sha384": hashlib.sha384, "sha512": hashlib.sha512}
[docs]def read_file(infile, block_size):
b = infile.read(block_size)
while b:
yield b
b = infile.read(block_size)
[docs]class HashCrypt(object):
def __init__(self, key, hash_constructor=hashlib.sha512):
if isinstance(hash_constructor, (unicode, bytes)):
h = hash_constructor
n = hashlib.new
hash_constructor = fast_lookup.get(h, (lambda b: n(h, b)))
self.hash = hash_constructor
self.key = key
self.block_size = hash_constructor().digest_size
[docs] def block(self, b):
return self.hash(self.key + b).digest()
[docs] def encrypt_file(self, infile, outfile, *args, **kwargs):
outfile.write(self.header())
self.encrypt_stream(infile, outfile, *args, **kwargs)
[docs] def encrypt_stream(self, infile, outfile, *args, **kwargs):
for block in self.encrypt(read_file(infile, self.block_size), *args, **kwargs):
outfile.write(block)
[docs] def decrypt_stream(self, infile, outfile, *args, **kwargs):
for block in self.decrypt(read_file(infile, self.block_size), *args, **kwargs):
outfile.write(block)
[docs] def hash_name(self):
return self.hash().name
@classmethod
[docs] def suggest_key_size(cls, hash_constructor):
return max(hash_constructor().block_size, 32)
[docs]class WithIV(HashCrypt):
def __init__(self, key, hash_constructor=hashlib.sha512, start_iv=None):
super(WithIV, self).__init__(key, hash_constructor)
if start_iv is None:
start_iv = self.make_iv(hash_constructor)
self.start_iv = start_iv
@classmethod
[docs] def suggest_iv_size(cls, hash_constructor):
return max(hash_constructor().block_size, 32)
@classmethod
[docs] def make_iv(cls, hash_constructor):
return urandom(cls.suggest_iv_size(hash_constructor))
[docs]class WithNonce(HashCrypt):
def __init__(self, key, hash_constructor=hashlib.sha512, nonce=None):
super(WithNonce, self).__init__(key, hash_constructor)
if nonce is None:
nonce = self.make_nonce(hash_constructor)
self.nonce = nonce
@classmethod
[docs] def make_nonce(cls, hash_constructor=hashlib.sha512):
return urandom(cls.suggest_nonce_size(hash_constructor))
[docs]class CTR(WithNonce):
[docs] def keystream(self, counter_start):
counter = counter_start
while True:
yield self.block(self.nonce + Q.pack(counter))
counter += 1
[docs] def encrypt(self, plain_blocks, counter_start=0):
return short_map(op_xor, plain_blocks, self.keystream(counter_start))
decrypt = encrypt
[docs] def encrypt_block(self, b, counter):
if len(b) != self.block_size:
raise ValueError("Wrong size for input", len(b))
d = self.block(self.nonce + Q.pack(counter))
return op_xor(b, d)
decrypt_block = encrypt_block
@classmethod
[docs] def suggest_nonce_size(cls, hash_constructor):
return max(hash_constructor().block_size - Q.size, 32)
[docs]class OFB(WithIV):
[docs] def keystream(self, iv=None):
if iv is None:
iv = self.start_iv
if iv is None:
raise IVError(
"If iv is omitted or None, self.start_iv must be set.")
while True:
iv = self.block(iv)
yield iv
[docs] def encrypt(self, plain_blocks, iv=None):
if iv is None:
iv = self.start_iv
if iv is None:
raise IVError(
"If iv is omitted or None, self.start_iv must be set.")
return short_map(op_xor, plain_blocks, self.keystream(iv))
decrypt = encrypt
[docs]class CFB(WithIV):
[docs] def encrypt(self, plain_blocks, iv=None):
if iv is None:
iv = self.start_iv
if iv is None:
raise IVError(
"If iv is omitted or None, self.start_iv must be set.")
for b in plain_blocks:
d = self.block(iv)
iv = op_xor(b, d)
yield iv
[docs] def decrypt(self, cipher_blocks, iv=None):
if iv is None:
iv = self.start_iv
if iv is None:
raise IVError(
"If iv is omitted or None, self.start_iv must be set.")
for b in cipher_blocks:
d = self.block(iv)
yield op_xor(b, d)
iv = b
[docs] def decrypt_block(self, b, iv):
d = self.block(iv)
return op_xor(b, d)
MODES = {"CTR": CTR, "OFB": OFB, "CFB": CFB}
[docs]def decrypt_file(infile, outfile, key):
if infile.read(9) != b"HASHCRYPT":
raise ValueError("Bad Header")
ver = read_plus(infile).decode("ascii")
mode = read_plus(infile).decode("ascii")
h = read_plus(infile).decode("ascii")
iv_nonce = read_plus(infile)
obj = MODES[mode](key, h, iv_nonce)
obj.decrypt_stream(infile, outfile)