Commit e5aafc34 by 赵增煜

新增lib

parent 7ace76e2

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

/* -*- indent-tabs-mode: nil; tab-width: 4; -*- */
/* Greenlet object interface */
#ifndef Py_GREENLETOBJECT_H
#define Py_GREENLETOBJECT_H
#include <Python.h>
#ifdef __cplusplus
extern "C" {
#endif
/* This is deprecated and undocumented. It does not change. */
#define GREENLET_VERSION "1.0.0"
#ifndef GREENLET_MODULE
#define implementation_ptr_t void*
#endif
typedef struct _greenlet {
PyObject_HEAD
PyObject* weakreflist;
PyObject* dict;
implementation_ptr_t pimpl;
} PyGreenlet;
#define PyGreenlet_Check(op) (op && PyObject_TypeCheck(op, &PyGreenlet_Type))
/* C API functions */
/* Total number of symbols that are exported */
#define PyGreenlet_API_pointers 12
#define PyGreenlet_Type_NUM 0
#define PyExc_GreenletError_NUM 1
#define PyExc_GreenletExit_NUM 2
#define PyGreenlet_New_NUM 3
#define PyGreenlet_GetCurrent_NUM 4
#define PyGreenlet_Throw_NUM 5
#define PyGreenlet_Switch_NUM 6
#define PyGreenlet_SetParent_NUM 7
#define PyGreenlet_MAIN_NUM 8
#define PyGreenlet_STARTED_NUM 9
#define PyGreenlet_ACTIVE_NUM 10
#define PyGreenlet_GET_PARENT_NUM 11
#ifndef GREENLET_MODULE
/* This section is used by modules that uses the greenlet C API */
static void** _PyGreenlet_API = NULL;
# define PyGreenlet_Type \
(*(PyTypeObject*)_PyGreenlet_API[PyGreenlet_Type_NUM])
# define PyExc_GreenletError \
((PyObject*)_PyGreenlet_API[PyExc_GreenletError_NUM])
# define PyExc_GreenletExit \
((PyObject*)_PyGreenlet_API[PyExc_GreenletExit_NUM])
/*
* PyGreenlet_New(PyObject *args)
*
* greenlet.greenlet(run, parent=None)
*/
# define PyGreenlet_New \
(*(PyGreenlet * (*)(PyObject * run, PyGreenlet * parent)) \
_PyGreenlet_API[PyGreenlet_New_NUM])
/*
* PyGreenlet_GetCurrent(void)
*
* greenlet.getcurrent()
*/
# define PyGreenlet_GetCurrent \
(*(PyGreenlet * (*)(void)) _PyGreenlet_API[PyGreenlet_GetCurrent_NUM])
/*
* PyGreenlet_Throw(
* PyGreenlet *greenlet,
* PyObject *typ,
* PyObject *val,
* PyObject *tb)
*
* g.throw(...)
*/
# define PyGreenlet_Throw \
(*(PyObject * (*)(PyGreenlet * self, \
PyObject * typ, \
PyObject * val, \
PyObject * tb)) \
_PyGreenlet_API[PyGreenlet_Throw_NUM])
/*
* PyGreenlet_Switch(PyGreenlet *greenlet, PyObject *args)
*
* g.switch(*args, **kwargs)
*/
# define PyGreenlet_Switch \
(*(PyObject * \
(*)(PyGreenlet * greenlet, PyObject * args, PyObject * kwargs)) \
_PyGreenlet_API[PyGreenlet_Switch_NUM])
/*
* PyGreenlet_SetParent(PyObject *greenlet, PyObject *new_parent)
*
* g.parent = new_parent
*/
# define PyGreenlet_SetParent \
(*(int (*)(PyGreenlet * greenlet, PyGreenlet * nparent)) \
_PyGreenlet_API[PyGreenlet_SetParent_NUM])
/*
* PyGreenlet_GetParent(PyObject* greenlet)
*
* return greenlet.parent;
*
* This could return NULL even if there is no exception active.
* If it does not return NULL, you are responsible for decrementing the
* reference count.
*/
# define PyGreenlet_GetParent \
(*(PyGreenlet* (*)(PyGreenlet*)) \
_PyGreenlet_API[PyGreenlet_GET_PARENT_NUM])
/*
* deprecated, undocumented alias.
*/
# define PyGreenlet_GET_PARENT PyGreenlet_GetParent
# define PyGreenlet_MAIN \
(*(int (*)(PyGreenlet*)) \
_PyGreenlet_API[PyGreenlet_MAIN_NUM])
# define PyGreenlet_STARTED \
(*(int (*)(PyGreenlet*)) \
_PyGreenlet_API[PyGreenlet_STARTED_NUM])
# define PyGreenlet_ACTIVE \
(*(int (*)(PyGreenlet*)) \
_PyGreenlet_API[PyGreenlet_ACTIVE_NUM])
/* Macro that imports greenlet and initializes C API */
/* NOTE: This has actually moved to ``greenlet._greenlet._C_API``, but we
keep the older definition to be sure older code that might have a copy of
the header still works. */
# define PyGreenlet_Import() \
{ \
_PyGreenlet_API = (void**)PyCapsule_Import("greenlet._C_API", 0); \
}
#endif /* GREENLET_MODULE */
#ifdef __cplusplus
}
#endif
#endif /* !Py_GREENLETOBJECT_H */
# -*- coding: utf-8 -*-
#
# Cipher/AES.py : AES
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
c_size_t, c_uint8_ptr)
from Crypto.Util import _cpu_features
from Crypto.Random import get_random_bytes
MODE_ECB = 1 #: Electronic Code Book (:ref:`ecb_mode`)
MODE_CBC = 2 #: Cipher-Block Chaining (:ref:`cbc_mode`)
MODE_CFB = 3 #: Cipher Feedback (:ref:`cfb_mode`)
MODE_OFB = 5 #: Output Feedback (:ref:`ofb_mode`)
MODE_CTR = 6 #: Counter mode (:ref:`ctr_mode`)
MODE_OPENPGP = 7 #: OpenPGP mode (:ref:`openpgp_mode`)
MODE_CCM = 8 #: Counter with CBC-MAC (:ref:`ccm_mode`)
MODE_EAX = 9 #: :ref:`eax_mode`
MODE_SIV = 10 #: Synthetic Initialization Vector (:ref:`siv_mode`)
MODE_GCM = 11 #: Galois Counter Mode (:ref:`gcm_mode`)
MODE_OCB = 12 #: Offset Code Book (:ref:`ocb_mode`)
_cproto = """
int AES_start_operation(const uint8_t key[],
size_t key_len,
void **pResult);
int AES_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int AES_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int AES_stop_operation(void *state);
"""
# Load portable AES
_raw_aes_lib = load_pycryptodome_raw_lib("Crypto.Cipher._raw_aes",
_cproto)
# Try to load AES with AES NI instructions
try:
_raw_aesni_lib = None
if _cpu_features.have_aes_ni():
_raw_aesni_lib = load_pycryptodome_raw_lib("Crypto.Cipher._raw_aesni",
_cproto.replace("AES",
"AESNI"))
# _raw_aesni may not have been compiled in
except OSError:
pass
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a handle to a low-level
base cipher. It will absorb named parameters in the process."""
use_aesni = dict_parameters.pop("use_aesni", True)
try:
key = dict_parameters.pop("key")
except KeyError:
raise TypeError("Missing 'key' parameter")
if len(key) not in key_size:
raise ValueError("Incorrect AES key length (%d bytes)" % len(key))
if use_aesni and _raw_aesni_lib:
start_operation = _raw_aesni_lib.AESNI_start_operation
stop_operation = _raw_aesni_lib.AESNI_stop_operation
else:
start_operation = _raw_aes_lib.AES_start_operation
stop_operation = _raw_aes_lib.AES_stop_operation
cipher = VoidPointer()
result = start_operation(c_uint8_ptr(key),
c_size_t(len(key)),
cipher.address_of())
if result:
raise ValueError("Error %X while instantiating the AES cipher"
% result)
return SmartPointer(cipher.get(), stop_operation)
def _derive_Poly1305_key_pair(key, nonce):
"""Derive a tuple (r, s, nonce) for a Poly1305 MAC.
If nonce is ``None``, a new 16-byte nonce is generated.
"""
if len(key) != 32:
raise ValueError("Poly1305 with AES requires a 32-byte key")
if nonce is None:
nonce = get_random_bytes(16)
elif len(nonce) != 16:
raise ValueError("Poly1305 with AES requires a 16-byte nonce")
s = new(key[:16], MODE_ECB).encrypt(nonce)
return key[16:], s, nonce
def new(key, mode, *args, **kwargs):
"""Create a new AES cipher.
Args:
key(bytes/bytearray/memoryview):
The secret key to use in the symmetric cipher.
It must be 16 (*AES-128)*, 24 (*AES-192*) or 32 (*AES-256*) bytes long.
For ``MODE_SIV`` only, it doubles to 32, 48, or 64 bytes.
mode (a ``MODE_*`` constant):
The chaining mode to use for encryption or decryption.
If in doubt, use ``MODE_EAX``.
Keyword Args:
iv (bytes/bytearray/memoryview):
(Only applicable for ``MODE_CBC``, ``MODE_CFB``, ``MODE_OFB``,
and ``MODE_OPENPGP`` modes).
The initialization vector to use for encryption or decryption.
For ``MODE_CBC``, ``MODE_CFB``, and ``MODE_OFB`` it must be 16 bytes long.
For ``MODE_OPENPGP`` mode only,
it must be 16 bytes long for encryption
and 18 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
If not provided, a random byte string is generated (you must then
read its value with the :attr:`iv` attribute).
nonce (bytes/bytearray/memoryview):
(Only applicable for ``MODE_CCM``, ``MODE_EAX``, ``MODE_GCM``,
``MODE_SIV``, ``MODE_OCB``, and ``MODE_CTR``).
A value that must never be reused for any other encryption done
with this key (except possibly for ``MODE_SIV``, see below).
For ``MODE_EAX``, ``MODE_GCM`` and ``MODE_SIV`` there are no
restrictions on its length (recommended: **16** bytes).
For ``MODE_CCM``, its length must be in the range **[7..13]**.
Bear in mind that with CCM there is a trade-off between nonce
length and maximum message size. Recommendation: **11** bytes.
For ``MODE_OCB``, its length must be in the range **[1..15]**
(recommended: **15**).
For ``MODE_CTR``, its length must be in the range **[0..15]**
(recommended: **8**).
For ``MODE_SIV``, the nonce is optional, if it is not specified,
then no nonce is being used, which renders the encryption
deterministic.
If not provided, for modes other than ``MODE_SIV``, a random
byte string of the recommended length is used (you must then
read its value with the :attr:`nonce` attribute).
segment_size (integer):
(Only ``MODE_CFB``).The number of **bits** the plaintext and ciphertext
are segmented in. It must be a multiple of 8.
If not specified, it will be assumed to be 8.
mac_len (integer):
(Only ``MODE_EAX``, ``MODE_GCM``, ``MODE_OCB``, ``MODE_CCM``)
Length of the authentication tag, in bytes.
It must be even and in the range **[4..16]**.
The recommended value (and the default, if not specified) is **16**.
msg_len (integer):
(Only ``MODE_CCM``). Length of the message to (de)cipher.
If not specified, ``encrypt`` must be called with the entire message.
Similarly, ``decrypt`` can only be called once.
assoc_len (integer):
(Only ``MODE_CCM``). Length of the associated data.
If not specified, all associated data is buffered internally,
which may represent a problem for very large messages.
initial_value (integer or bytes/bytearray/memoryview):
(Only ``MODE_CTR``).
The initial value for the counter. If not present, the cipher will
start counting from 0. The value is incremented by one for each block.
The counter number is encoded in big endian mode.
counter (object):
(Only ``MODE_CTR``).
Instance of ``Crypto.Util.Counter``, which allows full customization
of the counter block. This parameter is incompatible to both ``nonce``
and ``initial_value``.
use_aesni: (boolean):
Use Intel AES-NI hardware extensions (default: use if available).
Returns:
an AES object, of the applicable mode.
"""
kwargs["add_aes_modes"] = True
return _create_cipher(sys.modules[__name__], key, mode, *args, **kwargs)
# Size of a data block (in bytes)
block_size = 16
# Size of a key (in bytes)
key_size = (16, 24, 32)
from typing import Dict, Optional, Tuple, Union, overload
from typing_extensions import Literal
Buffer=bytes|bytearray|memoryview
from Crypto.Cipher._mode_ecb import EcbMode
from Crypto.Cipher._mode_cbc import CbcMode
from Crypto.Cipher._mode_cfb import CfbMode
from Crypto.Cipher._mode_ofb import OfbMode
from Crypto.Cipher._mode_ctr import CtrMode
from Crypto.Cipher._mode_openpgp import OpenPgpMode
from Crypto.Cipher._mode_ccm import CcmMode
from Crypto.Cipher._mode_eax import EaxMode
from Crypto.Cipher._mode_gcm import GcmMode
from Crypto.Cipher._mode_siv import SivMode
from Crypto.Cipher._mode_ocb import OcbMode
MODE_ECB: Literal[1]
MODE_CBC: Literal[2]
MODE_CFB: Literal[3]
MODE_OFB: Literal[5]
MODE_CTR: Literal[6]
MODE_OPENPGP: Literal[7]
MODE_CCM: Literal[8]
MODE_EAX: Literal[9]
MODE_SIV: Literal[10]
MODE_GCM: Literal[11]
MODE_OCB: Literal[12]
# MODE_ECB
@overload
def new(key: Buffer,
mode: Literal[1],
use_aesni : bool = ...) -> \
EcbMode: ...
# MODE_CBC
@overload
def new(key: Buffer,
mode: Literal[2],
iv : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
CbcMode: ...
@overload
def new(key: Buffer,
mode: Literal[2],
IV : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
CbcMode: ...
# MODE_CFB
@overload
def new(key: Buffer,
mode: Literal[3],
iv : Optional[Buffer] = ...,
segment_size : int = ...,
use_aesni : bool = ...) -> \
CfbMode: ...
@overload
def new(key: Buffer,
mode: Literal[3],
IV : Optional[Buffer] = ...,
segment_size : int = ...,
use_aesni : bool = ...) -> \
CfbMode: ...
# MODE_OFB
@overload
def new(key: Buffer,
mode: Literal[5],
iv : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
OfbMode: ...
@overload
def new(key: Buffer,
mode: Literal[5],
IV : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
OfbMode: ...
# MODE_CTR
@overload
def new(key: Buffer,
mode: Literal[6],
nonce : Optional[Buffer] = ...,
initial_value : Union[int, Buffer] = ...,
counter : Dict = ...,
use_aesni : bool = ...) -> \
CtrMode: ...
# MODE_OPENPGP
@overload
def new(key: Buffer,
mode: Literal[7],
iv : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
OpenPgpMode: ...
@overload
def new(key: Buffer,
mode: Literal[7],
IV : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
OpenPgpMode: ...
# MODE_CCM
@overload
def new(key: Buffer,
mode: Literal[8],
nonce : Optional[Buffer] = ...,
mac_len : int = ...,
assoc_len : int = ...,
use_aesni : bool = ...) -> \
CcmMode: ...
# MODE_EAX
@overload
def new(key: Buffer,
mode: Literal[9],
nonce : Optional[Buffer] = ...,
mac_len : int = ...,
use_aesni : bool = ...) -> \
EaxMode: ...
# MODE_GCM
@overload
def new(key: Buffer,
mode: Literal[10],
nonce : Optional[Buffer] = ...,
use_aesni : bool = ...) -> \
SivMode: ...
# MODE_SIV
@overload
def new(key: Buffer,
mode: Literal[11],
nonce : Optional[Buffer] = ...,
mac_len : int = ...,
use_aesni : bool = ...) -> \
GcmMode: ...
# MODE_OCB
@overload
def new(key: Buffer,
mode: Literal[12],
nonce : Optional[Buffer] = ...,
mac_len : int = ...,
use_aesni : bool = ...) -> \
OcbMode: ...
block_size: int
key_size: Tuple[int, int, int]
# -*- coding: utf-8 -*-
#
# Cipher/ARC2.py : ARC2.py
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""
Module's constants for the modes of operation supported with ARC2:
:var MODE_ECB: :ref:`Electronic Code Book (ECB) <ecb_mode>`
:var MODE_CBC: :ref:`Cipher-Block Chaining (CBC) <cbc_mode>`
:var MODE_CFB: :ref:`Cipher FeedBack (CFB) <cfb_mode>`
:var MODE_OFB: :ref:`Output FeedBack (OFB) <ofb_mode>`
:var MODE_CTR: :ref:`CounTer Mode (CTR) <ctr_mode>`
:var MODE_OPENPGP: :ref:`OpenPGP Mode <openpgp_mode>`
:var MODE_EAX: :ref:`EAX Mode <eax_mode>`
"""
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util.py3compat import byte_string
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
c_size_t, c_uint8_ptr)
_raw_arc2_lib = load_pycryptodome_raw_lib(
"Crypto.Cipher._raw_arc2",
"""
int ARC2_start_operation(const uint8_t key[],
size_t key_len,
size_t effective_key_len,
void **pResult);
int ARC2_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int ARC2_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int ARC2_stop_operation(void *state);
"""
)
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a handle to a low-level
base cipher. It will absorb named parameters in the process."""
try:
key = dict_parameters.pop("key")
except KeyError:
raise TypeError("Missing 'key' parameter")
effective_keylen = dict_parameters.pop("effective_keylen", 1024)
if len(key) not in key_size:
raise ValueError("Incorrect ARC2 key length (%d bytes)" % len(key))
if not (40 <= effective_keylen <= 1024):
raise ValueError("'effective_key_len' must be at least 40 and no larger than 1024 "
"(not %d)" % effective_keylen)
start_operation = _raw_arc2_lib.ARC2_start_operation
stop_operation = _raw_arc2_lib.ARC2_stop_operation
cipher = VoidPointer()
result = start_operation(c_uint8_ptr(key),
c_size_t(len(key)),
c_size_t(effective_keylen),
cipher.address_of())
if result:
raise ValueError("Error %X while instantiating the ARC2 cipher"
% result)
return SmartPointer(cipher.get(), stop_operation)
def new(key, mode, *args, **kwargs):
"""Create a new RC2 cipher.
:param key:
The secret key to use in the symmetric cipher.
Its length can vary from 5 to 128 bytes; the actual search space
(and the cipher strength) can be reduced with the ``effective_keylen`` parameter.
:type key: bytes, bytearray, memoryview
:param mode:
The chaining mode to use for encryption or decryption.
:type mode: One of the supported ``MODE_*`` constants
:Keyword Arguments:
* **iv** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_CBC``, ``MODE_CFB``, ``MODE_OFB``,
and ``MODE_OPENPGP`` modes).
The initialization vector to use for encryption or decryption.
For ``MODE_CBC``, ``MODE_CFB``, and ``MODE_OFB`` it must be 8 bytes long.
For ``MODE_OPENPGP`` mode only,
it must be 8 bytes long for encryption
and 10 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
If not provided, a random byte string is generated (you must then
read its value with the :attr:`iv` attribute).
* **nonce** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_EAX`` and ``MODE_CTR``).
A value that must never be reused for any other encryption done
with this key.
For ``MODE_EAX`` there are no
restrictions on its length (recommended: **16** bytes).
For ``MODE_CTR``, its length must be in the range **[0..7]**.
If not provided for ``MODE_EAX``, a random byte string is generated (you
can read it back via the ``nonce`` attribute).
* **effective_keylen** (*integer*) --
Optional. Maximum strength in bits of the actual key used by the ARC2 algorithm.
If the supplied ``key`` parameter is longer (in bits) of the value specified
here, it will be weakened to match it.
If not specified, no limitation is applied.
* **segment_size** (*integer*) --
(Only ``MODE_CFB``).The number of **bits** the plaintext and ciphertext
are segmented in. It must be a multiple of 8.
If not specified, it will be assumed to be 8.
* **mac_len** : (*integer*) --
(Only ``MODE_EAX``)
Length of the authentication tag, in bytes.
It must be no longer than 8 (default).
* **initial_value** : (*integer*) --
(Only ``MODE_CTR``). The initial value for the counter within
the counter block. By default it is **0**.
:Return: an ARC2 object, of the applicable mode.
"""
return _create_cipher(sys.modules[__name__], key, mode, *args, **kwargs)
MODE_ECB = 1
MODE_CBC = 2
MODE_CFB = 3
MODE_OFB = 5
MODE_CTR = 6
MODE_OPENPGP = 7
MODE_EAX = 9
# Size of a data block (in bytes)
block_size = 8
# Size of a key (in bytes)
key_size = range(5, 128 + 1)
from typing import Union, Dict, Iterable, Optional
Buffer = bytes|bytearray|memoryview
from Crypto.Cipher._mode_ecb import EcbMode
from Crypto.Cipher._mode_cbc import CbcMode
from Crypto.Cipher._mode_cfb import CfbMode
from Crypto.Cipher._mode_ofb import OfbMode
from Crypto.Cipher._mode_ctr import CtrMode
from Crypto.Cipher._mode_openpgp import OpenPgpMode
from Crypto.Cipher._mode_eax import EaxMode
ARC2Mode = int
MODE_ECB: ARC2Mode
MODE_CBC: ARC2Mode
MODE_CFB: ARC2Mode
MODE_OFB: ARC2Mode
MODE_CTR: ARC2Mode
MODE_OPENPGP: ARC2Mode
MODE_EAX: ARC2Mode
def new(key: Buffer,
mode: ARC2Mode,
iv : Optional[Buffer] = ...,
IV : Optional[Buffer] = ...,
nonce : Optional[Buffer] = ...,
segment_size : int = ...,
mac_len : int = ...,
initial_value : Union[int, Buffer] = ...,
counter : Dict = ...) -> \
Union[EcbMode, CbcMode, CfbMode, OfbMode, CtrMode, OpenPgpMode]: ...
block_size: int
key_size: Iterable[int]
# -*- coding: utf-8 -*-
#
# Cipher/ARC4.py : ARC4
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib, VoidPointer,
create_string_buffer, get_raw_buffer,
SmartPointer, c_size_t, c_uint8_ptr)
_raw_arc4_lib = load_pycryptodome_raw_lib("Crypto.Cipher._ARC4", """
int ARC4_stream_encrypt(void *rc4State, const uint8_t in[],
uint8_t out[], size_t len);
int ARC4_stream_init(uint8_t *key, size_t keylen,
void **pRc4State);
int ARC4_stream_destroy(void *rc4State);
""")
class ARC4Cipher:
"""ARC4 cipher object. Do not create it directly. Use
:func:`Crypto.Cipher.ARC4.new` instead.
"""
def __init__(self, key, *args, **kwargs):
"""Initialize an ARC4 cipher object
See also `new()` at the module level."""
if len(args) > 0:
ndrop = args[0]
args = args[1:]
else:
ndrop = kwargs.pop('drop', 0)
if len(key) not in key_size:
raise ValueError("Incorrect ARC4 key length (%d bytes)" %
len(key))
self._state = VoidPointer()
result = _raw_arc4_lib.ARC4_stream_init(c_uint8_ptr(key),
c_size_t(len(key)),
self._state.address_of())
if result != 0:
raise ValueError("Error %d while creating the ARC4 cipher"
% result)
self._state = SmartPointer(self._state.get(),
_raw_arc4_lib.ARC4_stream_destroy)
if ndrop > 0:
# This is OK even if the cipher is used for decryption,
# since encrypt and decrypt are actually the same thing
# with ARC4.
self.encrypt(b'\x00' * ndrop)
self.block_size = 1
self.key_size = len(key)
def encrypt(self, plaintext):
"""Encrypt a piece of data.
:param plaintext: The data to encrypt, of any size.
:type plaintext: bytes, bytearray, memoryview
:returns: the encrypted byte string, of equal length as the
plaintext.
"""
ciphertext = create_string_buffer(len(plaintext))
result = _raw_arc4_lib.ARC4_stream_encrypt(self._state.get(),
c_uint8_ptr(plaintext),
ciphertext,
c_size_t(len(plaintext)))
if result:
raise ValueError("Error %d while encrypting with RC4" % result)
return get_raw_buffer(ciphertext)
def decrypt(self, ciphertext):
"""Decrypt a piece of data.
:param ciphertext: The data to decrypt, of any size.
:type ciphertext: bytes, bytearray, memoryview
:returns: the decrypted byte string, of equal length as the
ciphertext.
"""
try:
return self.encrypt(ciphertext)
except ValueError as e:
raise ValueError(str(e).replace("enc", "dec"))
def new(key, *args, **kwargs):
"""Create a new ARC4 cipher.
:param key:
The secret key to use in the symmetric cipher.
Its length must be in the range ``[1..256]``.
The recommended length is 16 bytes.
:type key: bytes, bytearray, memoryview
:Keyword Arguments:
* *drop* (``integer``) --
The amount of bytes to discard from the initial part of the keystream.
In fact, such part has been found to be distinguishable from random
data (while it shouldn't) and also correlated to key.
The recommended value is 3072_ bytes. The default value is 0.
:Return: an `ARC4Cipher` object
.. _3072: http://eprint.iacr.org/2002/067.pdf
"""
return ARC4Cipher(key, *args, **kwargs)
# Size of a data block (in bytes)
block_size = 1
# Size of a key (in bytes)
key_size = range(1, 256+1)
from typing import Any, Union, Iterable
Buffer = bytes|bytearray|memoryview
class ARC4Cipher:
block_size: int
key_size: int
def __init__(self, key: Buffer, *args: Any, **kwargs: Any) -> None: ...
def encrypt(self, plaintext: Buffer) -> bytes: ...
def decrypt(self, ciphertext: Buffer) -> bytes: ...
def new(key: Buffer, drop : int = ...) -> ARC4Cipher: ...
block_size: int
key_size: Iterable[int]
# -*- coding: utf-8 -*-
#
# Cipher/Blowfish.py : Blowfish
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""
Module's constants for the modes of operation supported with Blowfish:
:var MODE_ECB: :ref:`Electronic Code Book (ECB) <ecb_mode>`
:var MODE_CBC: :ref:`Cipher-Block Chaining (CBC) <cbc_mode>`
:var MODE_CFB: :ref:`Cipher FeedBack (CFB) <cfb_mode>`
:var MODE_OFB: :ref:`Output FeedBack (OFB) <ofb_mode>`
:var MODE_CTR: :ref:`CounTer Mode (CTR) <ctr_mode>`
:var MODE_OPENPGP: :ref:`OpenPGP Mode <openpgp_mode>`
:var MODE_EAX: :ref:`EAX Mode <eax_mode>`
"""
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer, c_size_t,
c_uint8_ptr)
_raw_blowfish_lib = load_pycryptodome_raw_lib(
"Crypto.Cipher._raw_blowfish",
"""
int Blowfish_start_operation(const uint8_t key[],
size_t key_len,
void **pResult);
int Blowfish_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int Blowfish_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int Blowfish_stop_operation(void *state);
"""
)
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a smart pointer to
a low-level base cipher. It will absorb named parameters in
the process."""
try:
key = dict_parameters.pop("key")
except KeyError:
raise TypeError("Missing 'key' parameter")
if len(key) not in key_size:
raise ValueError("Incorrect Blowfish key length (%d bytes)" % len(key))
start_operation = _raw_blowfish_lib.Blowfish_start_operation
stop_operation = _raw_blowfish_lib.Blowfish_stop_operation
void_p = VoidPointer()
result = start_operation(c_uint8_ptr(key),
c_size_t(len(key)),
void_p.address_of())
if result:
raise ValueError("Error %X while instantiating the Blowfish cipher"
% result)
return SmartPointer(void_p.get(), stop_operation)
def new(key, mode, *args, **kwargs):
"""Create a new Blowfish cipher
:param key:
The secret key to use in the symmetric cipher.
Its length can vary from 5 to 56 bytes.
:type key: bytes, bytearray, memoryview
:param mode:
The chaining mode to use for encryption or decryption.
:type mode: One of the supported ``MODE_*`` constants
:Keyword Arguments:
* **iv** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_CBC``, ``MODE_CFB``, ``MODE_OFB``,
and ``MODE_OPENPGP`` modes).
The initialization vector to use for encryption or decryption.
For ``MODE_CBC``, ``MODE_CFB``, and ``MODE_OFB`` it must be 8 bytes long.
For ``MODE_OPENPGP`` mode only,
it must be 8 bytes long for encryption
and 10 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
If not provided, a random byte string is generated (you must then
read its value with the :attr:`iv` attribute).
* **nonce** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_EAX`` and ``MODE_CTR``).
A value that must never be reused for any other encryption done
with this key.
For ``MODE_EAX`` there are no
restrictions on its length (recommended: **16** bytes).
For ``MODE_CTR``, its length must be in the range **[0..7]**.
If not provided for ``MODE_EAX``, a random byte string is generated (you
can read it back via the ``nonce`` attribute).
* **segment_size** (*integer*) --
(Only ``MODE_CFB``).The number of **bits** the plaintext and ciphertext
are segmented in. It must be a multiple of 8.
If not specified, it will be assumed to be 8.
* **mac_len** : (*integer*) --
(Only ``MODE_EAX``)
Length of the authentication tag, in bytes.
It must be no longer than 8 (default).
* **initial_value** : (*integer*) --
(Only ``MODE_CTR``). The initial value for the counter within
the counter block. By default it is **0**.
:Return: a Blowfish object, of the applicable mode.
"""
return _create_cipher(sys.modules[__name__], key, mode, *args, **kwargs)
MODE_ECB = 1
MODE_CBC = 2
MODE_CFB = 3
MODE_OFB = 5
MODE_CTR = 6
MODE_OPENPGP = 7
MODE_EAX = 9
# Size of a data block (in bytes)
block_size = 8
# Size of a key (in bytes)
key_size = range(4, 56 + 1)
from typing import Union, Dict, Iterable, Optional
Buffer = bytes|bytearray|memoryview
from Crypto.Cipher._mode_ecb import EcbMode
from Crypto.Cipher._mode_cbc import CbcMode
from Crypto.Cipher._mode_cfb import CfbMode
from Crypto.Cipher._mode_ofb import OfbMode
from Crypto.Cipher._mode_ctr import CtrMode
from Crypto.Cipher._mode_openpgp import OpenPgpMode
from Crypto.Cipher._mode_eax import EaxMode
BlowfishMode = int
MODE_ECB: BlowfishMode
MODE_CBC: BlowfishMode
MODE_CFB: BlowfishMode
MODE_OFB: BlowfishMode
MODE_CTR: BlowfishMode
MODE_OPENPGP: BlowfishMode
MODE_EAX: BlowfishMode
def new(key: Buffer,
mode: BlowfishMode,
iv : Optional[Buffer] = ...,
IV : Optional[Buffer] = ...,
nonce : Optional[Buffer] = ...,
segment_size : int = ...,
mac_len : int = ...,
initial_value : Union[int, Buffer] = ...,
counter : Dict = ...) -> \
Union[EcbMode, CbcMode, CfbMode, OfbMode, CtrMode, OpenPgpMode]: ...
block_size: int
key_size: Iterable[int]
# -*- coding: utf-8 -*-
#
# Cipher/CAST.py : CAST
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""
Module's constants for the modes of operation supported with CAST:
:var MODE_ECB: :ref:`Electronic Code Book (ECB) <ecb_mode>`
:var MODE_CBC: :ref:`Cipher-Block Chaining (CBC) <cbc_mode>`
:var MODE_CFB: :ref:`Cipher FeedBack (CFB) <cfb_mode>`
:var MODE_OFB: :ref:`Output FeedBack (OFB) <ofb_mode>`
:var MODE_CTR: :ref:`CounTer Mode (CTR) <ctr_mode>`
:var MODE_OPENPGP: :ref:`OpenPGP Mode <openpgp_mode>`
:var MODE_EAX: :ref:`EAX Mode <eax_mode>`
"""
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util.py3compat import byte_string
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
c_size_t, c_uint8_ptr)
_raw_cast_lib = load_pycryptodome_raw_lib(
"Crypto.Cipher._raw_cast",
"""
int CAST_start_operation(const uint8_t key[],
size_t key_len,
void **pResult);
int CAST_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int CAST_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int CAST_stop_operation(void *state);
""")
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a handle to a low-level
base cipher. It will absorb named parameters in the process."""
try:
key = dict_parameters.pop("key")
except KeyError:
raise TypeError("Missing 'key' parameter")
if len(key) not in key_size:
raise ValueError("Incorrect CAST key length (%d bytes)" % len(key))
start_operation = _raw_cast_lib.CAST_start_operation
stop_operation = _raw_cast_lib.CAST_stop_operation
cipher = VoidPointer()
result = start_operation(c_uint8_ptr(key),
c_size_t(len(key)),
cipher.address_of())
if result:
raise ValueError("Error %X while instantiating the CAST cipher"
% result)
return SmartPointer(cipher.get(), stop_operation)
def new(key, mode, *args, **kwargs):
"""Create a new CAST cipher
:param key:
The secret key to use in the symmetric cipher.
Its length can vary from 5 to 16 bytes.
:type key: bytes, bytearray, memoryview
:param mode:
The chaining mode to use for encryption or decryption.
:type mode: One of the supported ``MODE_*`` constants
:Keyword Arguments:
* **iv** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_CBC``, ``MODE_CFB``, ``MODE_OFB``,
and ``MODE_OPENPGP`` modes).
The initialization vector to use for encryption or decryption.
For ``MODE_CBC``, ``MODE_CFB``, and ``MODE_OFB`` it must be 8 bytes long.
For ``MODE_OPENPGP`` mode only,
it must be 8 bytes long for encryption
and 10 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
If not provided, a random byte string is generated (you must then
read its value with the :attr:`iv` attribute).
* **nonce** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_EAX`` and ``MODE_CTR``).
A value that must never be reused for any other encryption done
with this key.
For ``MODE_EAX`` there are no
restrictions on its length (recommended: **16** bytes).
For ``MODE_CTR``, its length must be in the range **[0..7]**.
If not provided for ``MODE_EAX``, a random byte string is generated (you
can read it back via the ``nonce`` attribute).
* **segment_size** (*integer*) --
(Only ``MODE_CFB``).The number of **bits** the plaintext and ciphertext
are segmented in. It must be a multiple of 8.
If not specified, it will be assumed to be 8.
* **mac_len** : (*integer*) --
(Only ``MODE_EAX``)
Length of the authentication tag, in bytes.
It must be no longer than 8 (default).
* **initial_value** : (*integer*) --
(Only ``MODE_CTR``). The initial value for the counter within
the counter block. By default it is **0**.
:Return: a CAST object, of the applicable mode.
"""
return _create_cipher(sys.modules[__name__], key, mode, *args, **kwargs)
MODE_ECB = 1
MODE_CBC = 2
MODE_CFB = 3
MODE_OFB = 5
MODE_CTR = 6
MODE_OPENPGP = 7
MODE_EAX = 9
# Size of a data block (in bytes)
block_size = 8
# Size of a key (in bytes)
key_size = range(5, 16 + 1)
from typing import Union, Dict, Iterable, Optional
Buffer = bytes|bytearray|memoryview
from Crypto.Cipher._mode_ecb import EcbMode
from Crypto.Cipher._mode_cbc import CbcMode
from Crypto.Cipher._mode_cfb import CfbMode
from Crypto.Cipher._mode_ofb import OfbMode
from Crypto.Cipher._mode_ctr import CtrMode
from Crypto.Cipher._mode_openpgp import OpenPgpMode
from Crypto.Cipher._mode_eax import EaxMode
CASTMode = int
MODE_ECB: CASTMode
MODE_CBC: CASTMode
MODE_CFB: CASTMode
MODE_OFB: CASTMode
MODE_CTR: CASTMode
MODE_OPENPGP: CASTMode
MODE_EAX: CASTMode
def new(key: Buffer,
mode: CASTMode,
iv : Optional[Buffer] = ...,
IV : Optional[Buffer] = ...,
nonce : Optional[Buffer] = ...,
segment_size : int = ...,
mac_len : int = ...,
initial_value : Union[int, Buffer] = ...,
counter : Dict = ...) -> \
Union[EcbMode, CbcMode, CfbMode, OfbMode, CtrMode, OpenPgpMode]: ...
block_size: int
key_size : Iterable[int]
from typing import Union, overload, Optional
Buffer = bytes|bytearray|memoryview
def _HChaCha20(key: Buffer, nonce: Buffer) -> bytearray: ...
class ChaCha20Cipher:
block_size: int
nonce: bytes
def __init__(self, key: Buffer, nonce: Buffer) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def seek(self, position: int) -> None: ...
def new(key: Buffer, nonce: Optional[Buffer] = ...) -> ChaCha20Cipher: ...
block_size: int
key_size: int
from typing import Union, Tuple, overload, Optional
Buffer = bytes|bytearray|memoryview
class ChaCha20Poly1305Cipher:
nonce: bytes
def __init__(self, key: Buffer, nonce: Buffer) -> None: ...
def update(self, data: Buffer) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, received_mac_tag: Buffer) -> None: ...
def hexverify(self, received_mac_tag: str) -> None: ...
def encrypt_and_digest(self, plaintext: Buffer) -> Tuple[bytes, bytes]: ...
def decrypt_and_verify(self, ciphertext: Buffer, received_mac_tag: Buffer) -> bytes: ...
def new(key: Buffer, nonce: Optional[Buffer] = ...) -> ChaCha20Poly1305Cipher: ...
block_size: int
key_size: int
# -*- coding: utf-8 -*-
#
# Cipher/DES.py : DES
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""
Module's constants for the modes of operation supported with Single DES:
:var MODE_ECB: :ref:`Electronic Code Book (ECB) <ecb_mode>`
:var MODE_CBC: :ref:`Cipher-Block Chaining (CBC) <cbc_mode>`
:var MODE_CFB: :ref:`Cipher FeedBack (CFB) <cfb_mode>`
:var MODE_OFB: :ref:`Output FeedBack (OFB) <ofb_mode>`
:var MODE_CTR: :ref:`CounTer Mode (CTR) <ctr_mode>`
:var MODE_OPENPGP: :ref:`OpenPGP Mode <openpgp_mode>`
:var MODE_EAX: :ref:`EAX Mode <eax_mode>`
"""
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util.py3compat import byte_string
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
c_size_t, c_uint8_ptr)
_raw_des_lib = load_pycryptodome_raw_lib(
"Crypto.Cipher._raw_des",
"""
int DES_start_operation(const uint8_t key[],
size_t key_len,
void **pResult);
int DES_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int DES_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int DES_stop_operation(void *state);
""")
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a handle to a low-level
base cipher. It will absorb named parameters in the process."""
try:
key = dict_parameters.pop("key")
except KeyError:
raise TypeError("Missing 'key' parameter")
if len(key) != key_size:
raise ValueError("Incorrect DES key length (%d bytes)" % len(key))
start_operation = _raw_des_lib.DES_start_operation
stop_operation = _raw_des_lib.DES_stop_operation
cipher = VoidPointer()
result = start_operation(c_uint8_ptr(key),
c_size_t(len(key)),
cipher.address_of())
if result:
raise ValueError("Error %X while instantiating the DES cipher"
% result)
return SmartPointer(cipher.get(), stop_operation)
def new(key, mode, *args, **kwargs):
"""Create a new DES cipher.
:param key:
The secret key to use in the symmetric cipher.
It must be 8 byte long. The parity bits will be ignored.
:type key: bytes/bytearray/memoryview
:param mode:
The chaining mode to use for encryption or decryption.
:type mode: One of the supported ``MODE_*`` constants
:Keyword Arguments:
* **iv** (*byte string*) --
(Only applicable for ``MODE_CBC``, ``MODE_CFB``, ``MODE_OFB``,
and ``MODE_OPENPGP`` modes).
The initialization vector to use for encryption or decryption.
For ``MODE_CBC``, ``MODE_CFB``, and ``MODE_OFB`` it must be 8 bytes long.
For ``MODE_OPENPGP`` mode only,
it must be 8 bytes long for encryption
and 10 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
If not provided, a random byte string is generated (you must then
read its value with the :attr:`iv` attribute).
* **nonce** (*byte string*) --
(Only applicable for ``MODE_EAX`` and ``MODE_CTR``).
A value that must never be reused for any other encryption done
with this key.
For ``MODE_EAX`` there are no
restrictions on its length (recommended: **16** bytes).
For ``MODE_CTR``, its length must be in the range **[0..7]**.
If not provided for ``MODE_EAX``, a random byte string is generated (you
can read it back via the ``nonce`` attribute).
* **segment_size** (*integer*) --
(Only ``MODE_CFB``).The number of **bits** the plaintext and ciphertext
are segmented in. It must be a multiple of 8.
If not specified, it will be assumed to be 8.
* **mac_len** : (*integer*) --
(Only ``MODE_EAX``)
Length of the authentication tag, in bytes.
It must be no longer than 8 (default).
* **initial_value** : (*integer*) --
(Only ``MODE_CTR``). The initial value for the counter within
the counter block. By default it is **0**.
:Return: a DES object, of the applicable mode.
"""
return _create_cipher(sys.modules[__name__], key, mode, *args, **kwargs)
MODE_ECB = 1
MODE_CBC = 2
MODE_CFB = 3
MODE_OFB = 5
MODE_CTR = 6
MODE_OPENPGP = 7
MODE_EAX = 9
# Size of a data block (in bytes)
block_size = 8
# Size of a key (in bytes)
key_size = 8
from typing import Union, Dict, Iterable, Optional
Buffer = bytes|bytearray|memoryview
from Crypto.Cipher._mode_ecb import EcbMode
from Crypto.Cipher._mode_cbc import CbcMode
from Crypto.Cipher._mode_cfb import CfbMode
from Crypto.Cipher._mode_ofb import OfbMode
from Crypto.Cipher._mode_ctr import CtrMode
from Crypto.Cipher._mode_openpgp import OpenPgpMode
from Crypto.Cipher._mode_eax import EaxMode
DESMode = int
MODE_ECB: DESMode
MODE_CBC: DESMode
MODE_CFB: DESMode
MODE_OFB: DESMode
MODE_CTR: DESMode
MODE_OPENPGP: DESMode
MODE_EAX: DESMode
def new(key: Buffer,
mode: DESMode,
iv : Optional[Buffer] = ...,
IV : Optional[Buffer] = ...,
nonce : Optional[Buffer] = ...,
segment_size : int = ...,
mac_len : int = ...,
initial_value : Union[int, Buffer] = ...,
counter : Dict = ...) -> \
Union[EcbMode, CbcMode, CfbMode, OfbMode, CtrMode, OpenPgpMode]: ...
block_size: int
key_size: int
# -*- coding: utf-8 -*-
#
# Cipher/DES3.py : DES3
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""
Module's constants for the modes of operation supported with Triple DES:
:var MODE_ECB: :ref:`Electronic Code Book (ECB) <ecb_mode>`
:var MODE_CBC: :ref:`Cipher-Block Chaining (CBC) <cbc_mode>`
:var MODE_CFB: :ref:`Cipher FeedBack (CFB) <cfb_mode>`
:var MODE_OFB: :ref:`Output FeedBack (OFB) <ofb_mode>`
:var MODE_CTR: :ref:`CounTer Mode (CTR) <ctr_mode>`
:var MODE_OPENPGP: :ref:`OpenPGP Mode <openpgp_mode>`
:var MODE_EAX: :ref:`EAX Mode <eax_mode>`
"""
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util.py3compat import byte_string, bchr, bord, bstr
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
c_size_t)
_raw_des3_lib = load_pycryptodome_raw_lib(
"Crypto.Cipher._raw_des3",
"""
int DES3_start_operation(const uint8_t key[],
size_t key_len,
void **pResult);
int DES3_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int DES3_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int DES3_stop_operation(void *state);
""")
def adjust_key_parity(key_in):
"""Set the parity bits in a TDES key.
:param key_in: the TDES key whose bits need to be adjusted
:type key_in: byte string
:returns: a copy of ``key_in``, with the parity bits correctly set
:rtype: byte string
:raises ValueError: if the TDES key is not 16 or 24 bytes long
:raises ValueError: if the TDES key degenerates into Single DES
"""
def parity_byte(key_byte):
parity = 1
for i in range(1, 8):
parity ^= (key_byte >> i) & 1
return (key_byte & 0xFE) | parity
if len(key_in) not in key_size:
raise ValueError("Not a valid TDES key")
key_out = b"".join([ bchr(parity_byte(bord(x))) for x in key_in ])
if key_out[:8] == key_out[8:16] or key_out[-16:-8] == key_out[-8:]:
raise ValueError("Triple DES key degenerates to single DES")
return key_out
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a handle to a low-level base cipher.
It will absorb named parameters in the process."""
try:
key_in = dict_parameters.pop("key")
except KeyError:
raise TypeError("Missing 'key' parameter")
key = adjust_key_parity(bstr(key_in))
start_operation = _raw_des3_lib.DES3_start_operation
stop_operation = _raw_des3_lib.DES3_stop_operation
cipher = VoidPointer()
result = start_operation(key,
c_size_t(len(key)),
cipher.address_of())
if result:
raise ValueError("Error %X while instantiating the TDES cipher"
% result)
return SmartPointer(cipher.get(), stop_operation)
def new(key, mode, *args, **kwargs):
"""Create a new Triple DES cipher.
:param key:
The secret key to use in the symmetric cipher.
It must be 16 or 24 byte long. The parity bits will be ignored.
:type key: bytes/bytearray/memoryview
:param mode:
The chaining mode to use for encryption or decryption.
:type mode: One of the supported ``MODE_*`` constants
:Keyword Arguments:
* **iv** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_CBC``, ``MODE_CFB``, ``MODE_OFB``,
and ``MODE_OPENPGP`` modes).
The initialization vector to use for encryption or decryption.
For ``MODE_CBC``, ``MODE_CFB``, and ``MODE_OFB`` it must be 8 bytes long.
For ``MODE_OPENPGP`` mode only,
it must be 8 bytes long for encryption
and 10 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
If not provided, a random byte string is generated (you must then
read its value with the :attr:`iv` attribute).
* **nonce** (*bytes*, *bytearray*, *memoryview*) --
(Only applicable for ``MODE_EAX`` and ``MODE_CTR``).
A value that must never be reused for any other encryption done
with this key.
For ``MODE_EAX`` there are no
restrictions on its length (recommended: **16** bytes).
For ``MODE_CTR``, its length must be in the range **[0..7]**.
If not provided for ``MODE_EAX``, a random byte string is generated (you
can read it back via the ``nonce`` attribute).
* **segment_size** (*integer*) --
(Only ``MODE_CFB``).The number of **bits** the plaintext and ciphertext
are segmented in. It must be a multiple of 8.
If not specified, it will be assumed to be 8.
* **mac_len** : (*integer*) --
(Only ``MODE_EAX``)
Length of the authentication tag, in bytes.
It must be no longer than 8 (default).
* **initial_value** : (*integer*) --
(Only ``MODE_CTR``). The initial value for the counter within
the counter block. By default it is **0**.
:Return: a Triple DES object, of the applicable mode.
"""
return _create_cipher(sys.modules[__name__], key, mode, *args, **kwargs)
MODE_ECB = 1
MODE_CBC = 2
MODE_CFB = 3
MODE_OFB = 5
MODE_CTR = 6
MODE_OPENPGP = 7
MODE_EAX = 9
# Size of a data block (in bytes)
block_size = 8
# Size of a key (in bytes)
key_size = (16, 24)
from typing import Union, Dict, Tuple, Optional
Buffer = bytes|bytearray|memoryview
from Crypto.Cipher._mode_ecb import EcbMode
from Crypto.Cipher._mode_cbc import CbcMode
from Crypto.Cipher._mode_cfb import CfbMode
from Crypto.Cipher._mode_ofb import OfbMode
from Crypto.Cipher._mode_ctr import CtrMode
from Crypto.Cipher._mode_openpgp import OpenPgpMode
from Crypto.Cipher._mode_eax import EaxMode
def adjust_key_parity(key_in: bytes) -> bytes: ...
DES3Mode = int
MODE_ECB: DES3Mode
MODE_CBC: DES3Mode
MODE_CFB: DES3Mode
MODE_OFB: DES3Mode
MODE_CTR: DES3Mode
MODE_OPENPGP: DES3Mode
MODE_EAX: DES3Mode
def new(key: Buffer,
mode: DES3Mode,
iv : Optional[Buffer] = ...,
IV : Optional[Buffer] = ...,
nonce : Optional[Buffer] = ...,
segment_size : int = ...,
mac_len : int = ...,
initial_value : Union[int, Buffer] = ...,
counter : Dict = ...) -> \
Union[EcbMode, CbcMode, CfbMode, OfbMode, CtrMode, OpenPgpMode]: ...
block_size: int
key_size: Tuple[int, int]
# -*- coding: utf-8 -*-
#
# Cipher/PKCS1_OAEP.py : PKCS#1 OAEP
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
from Crypto.Signature.pss import MGF1
import Crypto.Hash.SHA1
from Crypto.Util.py3compat import _copy_bytes
import Crypto.Util.number
from Crypto.Util.number import ceil_div, bytes_to_long, long_to_bytes
from Crypto.Util.strxor import strxor
from Crypto import Random
from ._pkcs1_oaep_decode import oaep_decode
class PKCS1OAEP_Cipher:
"""Cipher object for PKCS#1 v1.5 OAEP.
Do not create directly: use :func:`new` instead."""
def __init__(self, key, hashAlgo, mgfunc, label, randfunc):
"""Initialize this PKCS#1 OAEP cipher object.
:Parameters:
key : an RSA key object
If a private half is given, both encryption and decryption are possible.
If a public half is given, only encryption is possible.
hashAlgo : hash object
The hash function to use. This can be a module under `Crypto.Hash`
or an existing hash object created from any of such modules. If not specified,
`Crypto.Hash.SHA1` is used.
mgfunc : callable
A mask generation function that accepts two parameters: a string to
use as seed, and the lenth of the mask to generate, in bytes.
If not specified, the standard MGF1 consistent with ``hashAlgo`` is used (a safe choice).
label : bytes/bytearray/memoryview
A label to apply to this particular encryption. If not specified,
an empty string is used. Specifying a label does not improve
security.
randfunc : callable
A function that returns random bytes.
:attention: Modify the mask generation function only if you know what you are doing.
Sender and receiver must use the same one.
"""
self._key = key
if hashAlgo:
self._hashObj = hashAlgo
else:
self._hashObj = Crypto.Hash.SHA1
if mgfunc:
self._mgf = mgfunc
else:
self._mgf = lambda x, y: MGF1(x, y, self._hashObj)
self._label = _copy_bytes(None, None, label)
self._randfunc = randfunc
def can_encrypt(self):
"""Legacy function to check if you can call :meth:`encrypt`.
.. deprecated:: 3.0"""
return self._key.can_encrypt()
def can_decrypt(self):
"""Legacy function to check if you can call :meth:`decrypt`.
.. deprecated:: 3.0"""
return self._key.can_decrypt()
def encrypt(self, message):
"""Encrypt a message with PKCS#1 OAEP.
:param message:
The message to encrypt, also known as plaintext. It can be of
variable length, but not longer than the RSA modulus (in bytes)
minus 2, minus twice the hash output size.
For instance, if you use RSA 2048 and SHA-256, the longest message
you can encrypt is 190 byte long.
:type message: bytes/bytearray/memoryview
:returns: The ciphertext, as large as the RSA modulus.
:rtype: bytes
:raises ValueError:
if the message is too long.
"""
# See 7.1.1 in RFC3447
modBits = Crypto.Util.number.size(self._key.n)
k = ceil_div(modBits, 8) # Convert from bits to bytes
hLen = self._hashObj.digest_size
mLen = len(message)
# Step 1b
ps_len = k - mLen - 2 * hLen - 2
if ps_len < 0:
raise ValueError("Plaintext is too long.")
# Step 2a
lHash = self._hashObj.new(self._label).digest()
# Step 2b
ps = b'\x00' * ps_len
# Step 2c
db = lHash + ps + b'\x01' + _copy_bytes(None, None, message)
# Step 2d
ros = self._randfunc(hLen)
# Step 2e
dbMask = self._mgf(ros, k-hLen-1)
# Step 2f
maskedDB = strxor(db, dbMask)
# Step 2g
seedMask = self._mgf(maskedDB, hLen)
# Step 2h
maskedSeed = strxor(ros, seedMask)
# Step 2i
em = b'\x00' + maskedSeed + maskedDB
# Step 3a (OS2IP)
em_int = bytes_to_long(em)
# Step 3b (RSAEP)
m_int = self._key._encrypt(em_int)
# Step 3c (I2OSP)
c = long_to_bytes(m_int, k)
return c
def decrypt(self, ciphertext):
"""Decrypt a message with PKCS#1 OAEP.
:param ciphertext: The encrypted message.
:type ciphertext: bytes/bytearray/memoryview
:returns: The original message (plaintext).
:rtype: bytes
:raises ValueError:
if the ciphertext has the wrong length, or if decryption
fails the integrity check (in which case, the decryption
key is probably wrong).
:raises TypeError:
if the RSA key has no private half (i.e. you are trying
to decrypt using a public key).
"""
# See 7.1.2 in RFC3447
modBits = Crypto.Util.number.size(self._key.n)
k = ceil_div(modBits, 8) # Convert from bits to bytes
hLen = self._hashObj.digest_size
# Step 1b and 1c
if len(ciphertext) != k or k < hLen+2:
raise ValueError("Ciphertext with incorrect length.")
# Step 2a (O2SIP)
ct_int = bytes_to_long(ciphertext)
# Step 2b (RSADP) and step 2c (I2OSP)
em = self._key._decrypt_to_bytes(ct_int)
# Step 3a
lHash = self._hashObj.new(self._label).digest()
# y must be 0, but we MUST NOT check it here in order not to
# allow attacks like Manger's (http://dl.acm.org/citation.cfm?id=704143)
maskedSeed = em[1:hLen+1]
maskedDB = em[hLen+1:]
# Step 3c
seedMask = self._mgf(maskedDB, hLen)
# Step 3d
seed = strxor(maskedSeed, seedMask)
# Step 3e
dbMask = self._mgf(seed, k-hLen-1)
# Step 3f
db = strxor(maskedDB, dbMask)
# Step 3b + 3g
res = oaep_decode(em, lHash, db)
if res <= 0:
raise ValueError("Incorrect decryption.")
# Step 4
return db[res:]
def new(key, hashAlgo=None, mgfunc=None, label=b'', randfunc=None):
"""Return a cipher object :class:`PKCS1OAEP_Cipher`
that can be used to perform PKCS#1 OAEP encryption or decryption.
:param key:
The key object to use to encrypt or decrypt the message.
Decryption is only possible with a private RSA key.
:type key: RSA key object
:param hashAlgo:
The hash function to use. This can be a module under `Crypto.Hash`
or an existing hash object created from any of such modules.
If not specified, `Crypto.Hash.SHA1` is used.
:type hashAlgo: hash object
:param mgfunc:
A mask generation function that accepts two parameters: a string to
use as seed, and the lenth of the mask to generate, in bytes.
If not specified, the standard MGF1 consistent with ``hashAlgo`` is used (a safe choice).
:type mgfunc: callable
:param label:
A label to apply to this particular encryption. If not specified,
an empty string is used. Specifying a label does not improve
security.
:type label: bytes/bytearray/memoryview
:param randfunc:
A function that returns random bytes.
The default is `Random.get_random_bytes`.
:type randfunc: callable
"""
if randfunc is None:
randfunc = Random.get_random_bytes
return PKCS1OAEP_Cipher(key, hashAlgo, mgfunc, label, randfunc)
from typing import Optional, Union, Callable, Any, overload
from typing_extensions import Protocol
from Crypto.PublicKey.RSA import RsaKey
class HashLikeClass(Protocol):
digest_size : int
def new(self, data: Optional[bytes] = ...) -> Any: ...
class HashLikeModule(Protocol):
digest_size : int
@staticmethod
def new(data: Optional[bytes] = ...) -> Any: ...
HashLike = Union[HashLikeClass, HashLikeModule]
Buffer = Union[bytes, bytearray, memoryview]
class PKCS1OAEP_Cipher:
def __init__(self,
key: RsaKey,
hashAlgo: HashLike,
mgfunc: Callable[[bytes, int], bytes],
label: Buffer,
randfunc: Callable[[int], bytes]) -> None: ...
def can_encrypt(self) -> bool: ...
def can_decrypt(self) -> bool: ...
def encrypt(self, message: Buffer) -> bytes: ...
def decrypt(self, ciphertext: Buffer) -> bytes: ...
def new(key: RsaKey,
hashAlgo: Optional[HashLike] = ...,
mgfunc: Optional[Callable[[bytes, int], bytes]] = ...,
label: Optional[Buffer] = ...,
randfunc: Optional[Callable[[int], bytes]] = ...) -> PKCS1OAEP_Cipher: ...
# -*- coding: utf-8 -*-
#
# Cipher/PKCS1-v1_5.py : PKCS#1 v1.5
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
__all__ = ['new', 'PKCS115_Cipher']
from Crypto import Random
from Crypto.Util.number import bytes_to_long, long_to_bytes
from Crypto.Util.py3compat import bord, is_bytes, _copy_bytes
from ._pkcs1_oaep_decode import pkcs1_decode
class PKCS115_Cipher:
"""This cipher can perform PKCS#1 v1.5 RSA encryption or decryption.
Do not instantiate directly. Use :func:`Crypto.Cipher.PKCS1_v1_5.new` instead."""
def __init__(self, key, randfunc):
"""Initialize this PKCS#1 v1.5 cipher object.
:Parameters:
key : an RSA key object
If a private half is given, both encryption and decryption are possible.
If a public half is given, only encryption is possible.
randfunc : callable
Function that returns random bytes.
"""
self._key = key
self._randfunc = randfunc
def can_encrypt(self):
"""Return True if this cipher object can be used for encryption."""
return self._key.can_encrypt()
def can_decrypt(self):
"""Return True if this cipher object can be used for decryption."""
return self._key.can_decrypt()
def encrypt(self, message):
"""Produce the PKCS#1 v1.5 encryption of a message.
This function is named ``RSAES-PKCS1-V1_5-ENCRYPT``, and it is specified in
`section 7.2.1 of RFC8017
<https://tools.ietf.org/html/rfc8017#page-28>`_.
:param message:
The message to encrypt, also known as plaintext. It can be of
variable length, but not longer than the RSA modulus (in bytes) minus 11.
:type message: bytes/bytearray/memoryview
:Returns: A byte string, the ciphertext in which the message is encrypted.
It is as long as the RSA modulus (in bytes).
:Raises ValueError:
If the RSA key length is not sufficiently long to deal with the given
message.
"""
# See 7.2.1 in RFC8017
k = self._key.size_in_bytes()
mLen = len(message)
# Step 1
if mLen > k - 11:
raise ValueError("Plaintext is too long.")
# Step 2a
ps = []
while len(ps) != k - mLen - 3:
new_byte = self._randfunc(1)
if bord(new_byte[0]) == 0x00:
continue
ps.append(new_byte)
ps = b"".join(ps)
# Step 2b
em = b'\x00\x02' + ps + b'\x00' + _copy_bytes(None, None, message)
# Step 3a (OS2IP)
em_int = bytes_to_long(em)
# Step 3b (RSAEP)
m_int = self._key._encrypt(em_int)
# Step 3c (I2OSP)
c = long_to_bytes(m_int, k)
return c
def decrypt(self, ciphertext, sentinel, expected_pt_len=0):
r"""Decrypt a PKCS#1 v1.5 ciphertext.
This is the function ``RSAES-PKCS1-V1_5-DECRYPT`` specified in
`section 7.2.2 of RFC8017
<https://tools.ietf.org/html/rfc8017#page-29>`_.
Args:
ciphertext (bytes/bytearray/memoryview):
The ciphertext that contains the message to recover.
sentinel (any type):
The object to return whenever an error is detected.
expected_pt_len (integer):
The length the plaintext is known to have, or 0 if unknown.
Returns (byte string):
It is either the original message or the ``sentinel`` (in case of an error).
.. warning::
PKCS#1 v1.5 decryption is intrinsically vulnerable to timing
attacks (see `Bleichenbacher's`__ attack).
**Use PKCS#1 OAEP instead**.
This implementation attempts to mitigate the risk
with some constant-time constructs.
However, they are not sufficient by themselves: the type of protocol you
implement and the way you handle errors make a big difference.
Specifically, you should make it very hard for the (malicious)
party that submitted the ciphertext to quickly understand if decryption
succeeded or not.
To this end, it is recommended that your protocol only encrypts
plaintexts of fixed length (``expected_pt_len``),
that ``sentinel`` is a random byte string of the same length,
and that processing continues for as long
as possible even if ``sentinel`` is returned (i.e. in case of
incorrect decryption).
.. __: https://dx.doi.org/10.1007/BFb0055716
"""
# See 7.2.2 in RFC8017
k = self._key.size_in_bytes()
# Step 1
if len(ciphertext) != k:
raise ValueError("Ciphertext with incorrect length (not %d bytes)" % k)
# Step 2a (O2SIP)
ct_int = bytes_to_long(ciphertext)
# Step 2b (RSADP) and Step 2c (I2OSP)
em = self._key._decrypt_to_bytes(ct_int)
# Step 3 (not constant time when the sentinel is not a byte string)
output = bytes(bytearray(k))
if not is_bytes(sentinel) or len(sentinel) > k:
size = pkcs1_decode(em, b'', expected_pt_len, output)
if size < 0:
return sentinel
else:
return output[size:]
# Step 3 (somewhat constant time)
size = pkcs1_decode(em, sentinel, expected_pt_len, output)
return output[size:]
def new(key, randfunc=None):
"""Create a cipher for performing PKCS#1 v1.5 encryption or decryption.
:param key:
The key to use to encrypt or decrypt the message. This is a `Crypto.PublicKey.RSA` object.
Decryption is only possible if *key* is a private RSA key.
:type key: RSA key object
:param randfunc:
Function that return random bytes.
The default is :func:`Crypto.Random.get_random_bytes`.
:type randfunc: callable
:returns: A cipher object `PKCS115_Cipher`.
"""
if randfunc is None:
randfunc = Random.get_random_bytes
return PKCS115_Cipher(key, randfunc)
from typing import Callable, Union, Any, Optional, TypeVar
from Crypto.PublicKey.RSA import RsaKey
Buffer = Union[bytes, bytearray, memoryview]
T = TypeVar('T')
class PKCS115_Cipher:
def __init__(self,
key: RsaKey,
randfunc: Callable[[int], bytes]) -> None: ...
def can_encrypt(self) -> bool: ...
def can_decrypt(self) -> bool: ...
def encrypt(self, message: Buffer) -> bytes: ...
def decrypt(self, ciphertext: Buffer,
sentinel: T,
expected_pt_len: Optional[int] = ...) -> Union[bytes, T]: ...
def new(key: RsaKey,
randfunc: Optional[Callable[[int], bytes]] = ...) -> PKCS115_Cipher: ...
# -*- coding: utf-8 -*-
#
# Cipher/Salsa20.py : Salsa20 stream cipher (http://cr.yp.to/snuffle.html)
#
# Contributed by Fabrizio Tarizzo <fabrizio@fabriziotarizzo.org>.
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
from Crypto.Util.py3compat import _copy_bytes
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
create_string_buffer,
get_raw_buffer, VoidPointer,
SmartPointer, c_size_t,
c_uint8_ptr, is_writeable_buffer)
from Crypto.Random import get_random_bytes
_raw_salsa20_lib = load_pycryptodome_raw_lib("Crypto.Cipher._Salsa20",
"""
int Salsa20_stream_init(uint8_t *key, size_t keylen,
uint8_t *nonce, size_t nonce_len,
void **pSalsaState);
int Salsa20_stream_destroy(void *salsaState);
int Salsa20_stream_encrypt(void *salsaState,
const uint8_t in[],
uint8_t out[], size_t len);
""")
class Salsa20Cipher:
"""Salsa20 cipher object. Do not create it directly. Use :py:func:`new`
instead.
:var nonce: The nonce with length 8
:vartype nonce: byte string
"""
def __init__(self, key, nonce):
"""Initialize a Salsa20 cipher object
See also `new()` at the module level."""
if len(key) not in key_size:
raise ValueError("Incorrect key length for Salsa20 (%d bytes)" % len(key))
if len(nonce) != 8:
raise ValueError("Incorrect nonce length for Salsa20 (%d bytes)" %
len(nonce))
self.nonce = _copy_bytes(None, None, nonce)
self._state = VoidPointer()
result = _raw_salsa20_lib.Salsa20_stream_init(
c_uint8_ptr(key),
c_size_t(len(key)),
c_uint8_ptr(nonce),
c_size_t(len(nonce)),
self._state.address_of())
if result:
raise ValueError("Error %d instantiating a Salsa20 cipher")
self._state = SmartPointer(self._state.get(),
_raw_salsa20_lib.Salsa20_stream_destroy)
self.block_size = 1
self.key_size = len(key)
def encrypt(self, plaintext, output=None):
"""Encrypt a piece of data.
Args:
plaintext(bytes/bytearray/memoryview): The data to encrypt, of any size.
Keyword Args:
output(bytes/bytearray/memoryview): The location where the ciphertext
is written to. If ``None``, the ciphertext is returned.
Returns:
If ``output`` is ``None``, the ciphertext is returned as ``bytes``.
Otherwise, ``None``.
"""
if output is None:
ciphertext = create_string_buffer(len(plaintext))
else:
ciphertext = output
if not is_writeable_buffer(output):
raise TypeError("output must be a bytearray or a writeable memoryview")
if len(plaintext) != len(output):
raise ValueError("output must have the same length as the input"
" (%d bytes)" % len(plaintext))
result = _raw_salsa20_lib.Salsa20_stream_encrypt(
self._state.get(),
c_uint8_ptr(plaintext),
c_uint8_ptr(ciphertext),
c_size_t(len(plaintext)))
if result:
raise ValueError("Error %d while encrypting with Salsa20" % result)
if output is None:
return get_raw_buffer(ciphertext)
else:
return None
def decrypt(self, ciphertext, output=None):
"""Decrypt a piece of data.
Args:
ciphertext(bytes/bytearray/memoryview): The data to decrypt, of any size.
Keyword Args:
output(bytes/bytearray/memoryview): The location where the plaintext
is written to. If ``None``, the plaintext is returned.
Returns:
If ``output`` is ``None``, the plaintext is returned as ``bytes``.
Otherwise, ``None``.
"""
try:
return self.encrypt(ciphertext, output=output)
except ValueError as e:
raise ValueError(str(e).replace("enc", "dec"))
def new(key, nonce=None):
"""Create a new Salsa20 cipher
:keyword key: The secret key to use. It must be 16 or 32 bytes long.
:type key: bytes/bytearray/memoryview
:keyword nonce:
A value that must never be reused for any other encryption
done with this key. It must be 8 bytes long.
If not provided, a random byte string will be generated (you can read
it back via the ``nonce`` attribute of the returned object).
:type nonce: bytes/bytearray/memoryview
:Return: a :class:`Crypto.Cipher.Salsa20.Salsa20Cipher` object
"""
if nonce is None:
nonce = get_random_bytes(8)
return Salsa20Cipher(key, nonce)
# Size of a data block (in bytes)
block_size = 1
# Size of a key (in bytes)
key_size = (16, 32)
from typing import Union, Tuple, Optional, overload, Optional
Buffer = bytes|bytearray|memoryview
class Salsa20Cipher:
nonce: bytes
block_size: int
key_size: int
def __init__(self,
key: Buffer,
nonce: Buffer) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def new(key: Buffer, nonce: Optional[Buffer] = ...) -> Salsa20Cipher: ...
block_size: int
key_size: Tuple[int, int]
# ===================================================================
#
# Copyright (c) 2019, Legrandin <helderijs@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ===================================================================
import sys
from Crypto.Cipher import _create_cipher
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer, c_size_t,
c_uint8_ptr, c_uint)
_raw_blowfish_lib = load_pycryptodome_raw_lib(
"Crypto.Cipher._raw_eksblowfish",
"""
int EKSBlowfish_start_operation(const uint8_t key[],
size_t key_len,
const uint8_t salt[16],
size_t salt_len,
unsigned cost,
unsigned invert,
void **pResult);
int EKSBlowfish_encrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int EKSBlowfish_decrypt(const void *state,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int EKSBlowfish_stop_operation(void *state);
"""
)
def _create_base_cipher(dict_parameters):
"""This method instantiates and returns a smart pointer to
a low-level base cipher. It will absorb named parameters in
the process."""
try:
key = dict_parameters.pop("key")
salt = dict_parameters.pop("salt")
cost = dict_parameters.pop("cost")
except KeyError as e:
raise TypeError("Missing EKSBlowfish parameter: " + str(e))
invert = dict_parameters.pop("invert", True)
if len(key) not in key_size:
raise ValueError("Incorrect EKSBlowfish key length (%d bytes)" % len(key))
start_operation = _raw_blowfish_lib.EKSBlowfish_start_operation
stop_operation = _raw_blowfish_lib.EKSBlowfish_stop_operation
void_p = VoidPointer()
result = start_operation(c_uint8_ptr(key),
c_size_t(len(key)),
c_uint8_ptr(salt),
c_size_t(len(salt)),
c_uint(cost),
c_uint(int(invert)),
void_p.address_of())
if result:
raise ValueError("Error %X while instantiating the EKSBlowfish cipher"
% result)
return SmartPointer(void_p.get(), stop_operation)
def new(key, mode, salt, cost, invert):
"""Create a new EKSBlowfish cipher
Args:
key (bytes, bytearray, memoryview):
The secret key to use in the symmetric cipher.
Its length can vary from 0 to 72 bytes.
mode (one of the supported ``MODE_*`` constants):
The chaining mode to use for encryption or decryption.
salt (bytes, bytearray, memoryview):
The salt that bcrypt uses to thwart rainbow table attacks
cost (integer):
The complexity factor in bcrypt
invert (bool):
If ``False``, in the inner loop use ``ExpandKey`` first over the salt
and then over the key, as defined in
the `original bcrypt specification <https://www.usenix.org/legacy/events/usenix99/provos/provos_html/node4.html>`_.
If ``True``, reverse the order, as in the first implementation of
`bcrypt` in OpenBSD.
:Return: an EKSBlowfish object
"""
kwargs = { 'salt':salt, 'cost':cost, 'invert':invert }
return _create_cipher(sys.modules[__name__], key, mode, **kwargs)
MODE_ECB = 1
# Size of a data block (in bytes)
block_size = 8
# Size of a key (in bytes)
key_size = range(0, 72 + 1)
from typing import Union, Iterable
from Crypto.Cipher._mode_ecb import EcbMode
MODE_ECB: int
Buffer = Union[bytes, bytearray, memoryview]
def new(key: Buffer,
mode: int,
salt: Buffer,
cost: int) -> EcbMode: ...
block_size: int
key_size: Iterable[int]
#
# A block cipher is instantiated as a combination of:
# 1. A base cipher (such as AES)
# 2. A mode of operation (such as CBC)
#
# Both items are implemented as C modules.
#
# The API of #1 is (replace "AES" with the name of the actual cipher):
# - AES_start_operaion(key) --> base_cipher_state
# - AES_encrypt(base_cipher_state, in, out, length)
# - AES_decrypt(base_cipher_state, in, out, length)
# - AES_stop_operation(base_cipher_state)
#
# Where base_cipher_state is AES_State, a struct with BlockBase (set of
# pointers to encrypt/decrypt/stop) followed by cipher-specific data.
#
# The API of #2 is (replace "CBC" with the name of the actual mode):
# - CBC_start_operation(base_cipher_state) --> mode_state
# - CBC_encrypt(mode_state, in, out, length)
# - CBC_decrypt(mode_state, in, out, length)
# - CBC_stop_operation(mode_state)
#
# where mode_state is a a pointer to base_cipher_state plus mode-specific data.
import os
from Crypto.Cipher._mode_ecb import _create_ecb_cipher
from Crypto.Cipher._mode_cbc import _create_cbc_cipher
from Crypto.Cipher._mode_cfb import _create_cfb_cipher
from Crypto.Cipher._mode_ofb import _create_ofb_cipher
from Crypto.Cipher._mode_ctr import _create_ctr_cipher
from Crypto.Cipher._mode_openpgp import _create_openpgp_cipher
from Crypto.Cipher._mode_ccm import _create_ccm_cipher
from Crypto.Cipher._mode_eax import _create_eax_cipher
from Crypto.Cipher._mode_siv import _create_siv_cipher
from Crypto.Cipher._mode_gcm import _create_gcm_cipher
from Crypto.Cipher._mode_ocb import _create_ocb_cipher
_modes = { 1:_create_ecb_cipher,
2:_create_cbc_cipher,
3:_create_cfb_cipher,
5:_create_ofb_cipher,
6:_create_ctr_cipher,
7:_create_openpgp_cipher,
9:_create_eax_cipher
}
_extra_modes = { 8:_create_ccm_cipher,
10:_create_siv_cipher,
11:_create_gcm_cipher,
12:_create_ocb_cipher
}
def _create_cipher(factory, key, mode, *args, **kwargs):
kwargs["key"] = key
modes = dict(_modes)
if kwargs.pop("add_aes_modes", False):
modes.update(_extra_modes)
if not mode in modes:
raise ValueError("Mode not supported")
if args:
if mode in (8, 9, 10, 11, 12):
if len(args) > 1:
raise TypeError("Too many arguments for this mode")
kwargs["nonce"] = args[0]
elif mode in (2, 3, 5, 7):
if len(args) > 1:
raise TypeError("Too many arguments for this mode")
kwargs["IV"] = args[0]
elif mode == 6:
if len(args) > 0:
raise TypeError("Too many arguments for this mode")
elif mode == 1:
raise TypeError("IV is not meaningful for the ECB mode")
return modes[mode](factory, **kwargs)
from typing import Union, overload
from Crypto.Util._raw_api import SmartPointer
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['CbcMode']
class CbcMode(object):
block_size: int
iv: Buffer
IV: Buffer
def __init__(self,
block_cipher: SmartPointer,
iv: Buffer) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
from types import ModuleType
from typing import Union, overload, Dict, Tuple, Optional
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['CcmMode']
class CcmMode(object):
block_size: int
nonce: bytes
def __init__(self,
factory: ModuleType,
key: Buffer,
nonce: Buffer,
mac_len: int,
msg_len: int,
assoc_len: int,
cipher_params: Dict) -> None: ...
def update(self, assoc_data: Buffer) -> CcmMode: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, received_mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer) -> Tuple[bytes, bytes]: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer,
output: Buffer) -> Tuple[None, bytes]: ...
def decrypt_and_verify(self,
ciphertext: Buffer,
received_mac_tag: Buffer,
output: Optional[Union[bytearray, memoryview]] = ...) -> bytes: ...
from typing import Union, overload
from Crypto.Util._raw_api import SmartPointer
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['CfbMode']
class CfbMode(object):
block_size: int
iv: Buffer
IV: Buffer
def __init__(self,
block_cipher: SmartPointer,
iv: Buffer,
segment_size: int) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
from typing import Union, overload
from Crypto.Util._raw_api import SmartPointer
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['CtrMode']
class CtrMode(object):
block_size: int
nonce: bytes
def __init__(self,
block_cipher: SmartPointer,
initial_counter_block: Buffer,
prefix_len: int,
counter_len: int,
little_endian: bool) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
from types import ModuleType
from typing import Any, Union, Tuple, Dict, overload, Optional
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['EaxMode']
class EaxMode(object):
block_size: int
nonce: bytes
def __init__(self,
factory: ModuleType,
key: Buffer,
nonce: Buffer,
mac_len: int,
cipher_params: Dict) -> None: ...
def update(self, assoc_data: Buffer) -> EaxMode: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, received_mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer) -> Tuple[bytes, bytes]: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer,
output: Buffer) -> Tuple[None, bytes]: ...
def decrypt_and_verify(self,
ciphertext: Buffer,
received_mac_tag: Buffer,
output: Optional[Union[bytearray, memoryview]] = ...) -> bytes: ...
# -*- coding: utf-8 -*-
#
# Cipher/mode_ecb.py : ECB mode
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================
"""
Electronic Code Book (ECB) mode.
"""
__all__ = [ 'EcbMode' ]
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, create_string_buffer,
get_raw_buffer, SmartPointer,
c_size_t, c_uint8_ptr,
is_writeable_buffer)
raw_ecb_lib = load_pycryptodome_raw_lib("Crypto.Cipher._raw_ecb", """
int ECB_start_operation(void *cipher,
void **pResult);
int ECB_encrypt(void *ecbState,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int ECB_decrypt(void *ecbState,
const uint8_t *in,
uint8_t *out,
size_t data_len);
int ECB_stop_operation(void *state);
"""
)
class EcbMode(object):
"""*Electronic Code Book (ECB)*.
This is the simplest encryption mode. Each of the plaintext blocks
is directly encrypted into a ciphertext block, independently of
any other block.
This mode is dangerous because it exposes frequency of symbols
in your plaintext. Other modes (e.g. *CBC*) should be used instead.
See `NIST SP800-38A`_ , Section 6.1.
.. _`NIST SP800-38A` : http://csrc.nist.gov/publications/nistpubs/800-38a/sp800-38a.pdf
:undocumented: __init__
"""
def __init__(self, block_cipher):
"""Create a new block cipher, configured in ECB mode.
:Parameters:
block_cipher : C pointer
A smart pointer to the low-level block cipher instance.
"""
self.block_size = block_cipher.block_size
self._state = VoidPointer()
result = raw_ecb_lib.ECB_start_operation(block_cipher.get(),
self._state.address_of())
if result:
raise ValueError("Error %d while instantiating the ECB mode"
% result)
# Ensure that object disposal of this Python object will (eventually)
# free the memory allocated by the raw library for the cipher
# mode
self._state = SmartPointer(self._state.get(),
raw_ecb_lib.ECB_stop_operation)
# Memory allocated for the underlying block cipher is now owned
# by the cipher mode
block_cipher.release()
def encrypt(self, plaintext, output=None):
"""Encrypt data with the key set at initialization.
The data to encrypt can be broken up in two or
more pieces and `encrypt` can be called multiple times.
That is, the statement:
>>> c.encrypt(a) + c.encrypt(b)
is equivalent to:
>>> c.encrypt(a+b)
This function does not add any padding to the plaintext.
:Parameters:
plaintext : bytes/bytearray/memoryview
The piece of data to encrypt.
The length must be multiple of the cipher block length.
:Keywords:
output : bytearray/memoryview
The location where the ciphertext must be written to.
If ``None``, the ciphertext is returned.
:Return:
If ``output`` is ``None``, the ciphertext is returned as ``bytes``.
Otherwise, ``None``.
"""
if output is None:
ciphertext = create_string_buffer(len(plaintext))
else:
ciphertext = output
if not is_writeable_buffer(output):
raise TypeError("output must be a bytearray or a writeable memoryview")
if len(plaintext) != len(output):
raise ValueError("output must have the same length as the input"
" (%d bytes)" % len(plaintext))
result = raw_ecb_lib.ECB_encrypt(self._state.get(),
c_uint8_ptr(plaintext),
c_uint8_ptr(ciphertext),
c_size_t(len(plaintext)))
if result:
if result == 3:
raise ValueError("Data must be aligned to block boundary in ECB mode")
raise ValueError("Error %d while encrypting in ECB mode" % result)
if output is None:
return get_raw_buffer(ciphertext)
else:
return None
def decrypt(self, ciphertext, output=None):
"""Decrypt data with the key set at initialization.
The data to decrypt can be broken up in two or
more pieces and `decrypt` can be called multiple times.
That is, the statement:
>>> c.decrypt(a) + c.decrypt(b)
is equivalent to:
>>> c.decrypt(a+b)
This function does not remove any padding from the plaintext.
:Parameters:
ciphertext : bytes/bytearray/memoryview
The piece of data to decrypt.
The length must be multiple of the cipher block length.
:Keywords:
output : bytearray/memoryview
The location where the plaintext must be written to.
If ``None``, the plaintext is returned.
:Return:
If ``output`` is ``None``, the plaintext is returned as ``bytes``.
Otherwise, ``None``.
"""
if output is None:
plaintext = create_string_buffer(len(ciphertext))
else:
plaintext = output
if not is_writeable_buffer(output):
raise TypeError("output must be a bytearray or a writeable memoryview")
if len(ciphertext) != len(output):
raise ValueError("output must have the same length as the input"
" (%d bytes)" % len(plaintext))
result = raw_ecb_lib.ECB_decrypt(self._state.get(),
c_uint8_ptr(ciphertext),
c_uint8_ptr(plaintext),
c_size_t(len(ciphertext)))
if result:
if result == 3:
raise ValueError("Data must be aligned to block boundary in ECB mode")
raise ValueError("Error %d while decrypting in ECB mode" % result)
if output is None:
return get_raw_buffer(plaintext)
else:
return None
def _create_ecb_cipher(factory, **kwargs):
"""Instantiate a cipher object that performs ECB encryption/decryption.
:Parameters:
factory : module
The underlying block cipher, a module from ``Crypto.Cipher``.
All keywords are passed to the underlying block cipher.
See the relevant documentation for details (at least ``key`` will need
to be present"""
cipher_state = factory._create_base_cipher(kwargs)
cipher_state.block_size = factory.block_size
if kwargs:
raise TypeError("Unknown parameters for ECB: %s" % str(kwargs))
return EcbMode(cipher_state)
from typing import Union, overload
from Crypto.Util._raw_api import SmartPointer
Buffer = Union[bytes, bytearray, memoryview]
__all__ = [ 'EcbMode' ]
class EcbMode(object):
def __init__(self, block_cipher: SmartPointer) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
from types import ModuleType
from typing import Union, Tuple, Dict, overload, Optional
__all__ = ['GcmMode']
Buffer = Union[bytes, bytearray, memoryview]
class GcmMode(object):
block_size: int
nonce: Buffer
def __init__(self,
factory: ModuleType,
key: Buffer,
nonce: Buffer,
mac_len: int,
cipher_params: Dict) -> None: ...
def update(self, assoc_data: Buffer) -> GcmMode: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, received_mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer) -> Tuple[bytes, bytes]: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer,
output: Buffer) -> Tuple[None, bytes]: ...
def decrypt_and_verify(self,
ciphertext: Buffer,
received_mac_tag: Buffer,
output: Optional[Union[bytearray, memoryview]] = ...) -> bytes: ...
from types import ModuleType
from typing import Union, Any, Optional, Tuple, Dict, overload
Buffer = Union[bytes, bytearray, memoryview]
class OcbMode(object):
block_size: int
nonce: Buffer
def __init__(self,
factory: ModuleType,
nonce: Buffer,
mac_len: int,
cipher_params: Dict) -> None: ...
def update(self, assoc_data: Buffer) -> OcbMode: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, received_mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
def encrypt_and_digest(self,
plaintext: Buffer) -> Tuple[bytes, bytes]: ...
def decrypt_and_verify(self,
ciphertext: Buffer,
received_mac_tag: Buffer) -> bytes: ...
from typing import Union, overload
from Crypto.Util._raw_api import SmartPointer
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['OfbMode']
class OfbMode(object):
block_size: int
iv: Buffer
IV: Buffer
def __init__(self,
block_cipher: SmartPointer,
iv: Buffer) -> None: ...
@overload
def encrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def encrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
@overload
def decrypt(self, plaintext: Buffer) -> bytes: ...
@overload
def decrypt(self, plaintext: Buffer, output: Union[bytearray, memoryview]) -> None: ...
# ===================================================================
#
# Copyright (c) 2014, Legrandin <helderijs@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ===================================================================
"""
OpenPGP mode.
"""
__all__ = ['OpenPgpMode']
from Crypto.Util.py3compat import _copy_bytes
from Crypto.Random import get_random_bytes
class OpenPgpMode(object):
"""OpenPGP mode.
This mode is a variant of CFB, and it is only used in PGP and
OpenPGP_ applications. If in doubt, use another mode.
An Initialization Vector (*IV*) is required.
Unlike CFB, the *encrypted* IV (not the IV itself) is
transmitted to the receiver.
The IV is a random data block. For legacy reasons, two of its bytes are
duplicated to act as a checksum for the correctness of the key, which is now
known to be insecure and is ignored. The encrypted IV is therefore 2 bytes
longer than the clean IV.
.. _OpenPGP: http://tools.ietf.org/html/rfc4880
:undocumented: __init__
"""
def __init__(self, factory, key, iv, cipher_params):
#: The block size of the underlying cipher, in bytes.
self.block_size = factory.block_size
self._done_first_block = False # True after the first encryption
# Instantiate a temporary cipher to process the IV
IV_cipher = factory.new(
key,
factory.MODE_CFB,
IV=b'\x00' * self.block_size,
segment_size=self.block_size * 8,
**cipher_params)
iv = _copy_bytes(None, None, iv)
# The cipher will be used for...
if len(iv) == self.block_size:
# ... encryption
self._encrypted_IV = IV_cipher.encrypt(iv + iv[-2:])
elif len(iv) == self.block_size + 2:
# ... decryption
self._encrypted_IV = iv
# Last two bytes are for a deprecated "quick check" feature that
# should not be used. (https://eprint.iacr.org/2005/033)
iv = IV_cipher.decrypt(iv)[:-2]
else:
raise ValueError("Length of IV must be %d or %d bytes"
" for MODE_OPENPGP"
% (self.block_size, self.block_size + 2))
self.iv = self.IV = iv
# Instantiate the cipher for the real PGP data
self._cipher = factory.new(
key,
factory.MODE_CFB,
IV=self._encrypted_IV[-self.block_size:],
segment_size=self.block_size * 8,
**cipher_params)
def encrypt(self, plaintext):
"""Encrypt data with the key and the parameters set at initialization.
A cipher object is stateful: once you have encrypted a message
you cannot encrypt (or decrypt) another message using the same
object.
The data to encrypt can be broken up in two or
more pieces and `encrypt` can be called multiple times.
That is, the statement:
>>> c.encrypt(a) + c.encrypt(b)
is equivalent to:
>>> c.encrypt(a+b)
This function does not add any padding to the plaintext.
:Parameters:
plaintext : bytes/bytearray/memoryview
The piece of data to encrypt.
:Return:
the encrypted data, as a byte string.
It is as long as *plaintext* with one exception:
when encrypting the first message chunk,
the encypted IV is prepended to the returned ciphertext.
"""
res = self._cipher.encrypt(plaintext)
if not self._done_first_block:
res = self._encrypted_IV + res
self._done_first_block = True
return res
def decrypt(self, ciphertext):
"""Decrypt data with the key and the parameters set at initialization.
A cipher object is stateful: once you have decrypted a message
you cannot decrypt (or encrypt) another message with the same
object.
The data to decrypt can be broken up in two or
more pieces and `decrypt` can be called multiple times.
That is, the statement:
>>> c.decrypt(a) + c.decrypt(b)
is equivalent to:
>>> c.decrypt(a+b)
This function does not remove any padding from the plaintext.
:Parameters:
ciphertext : bytes/bytearray/memoryview
The piece of data to decrypt.
:Return: the decrypted data (byte string).
"""
return self._cipher.decrypt(ciphertext)
def _create_openpgp_cipher(factory, **kwargs):
"""Create a new block cipher, configured in OpenPGP mode.
:Parameters:
factory : module
The module.
:Keywords:
key : bytes/bytearray/memoryview
The secret key to use in the symmetric cipher.
IV : bytes/bytearray/memoryview
The initialization vector to use for encryption or decryption.
For encryption, the IV must be as long as the cipher block size.
For decryption, it must be 2 bytes longer (it is actually the
*encrypted* IV which was prefixed to the ciphertext).
"""
iv = kwargs.pop("IV", None)
IV = kwargs.pop("iv", None)
if (None, None) == (iv, IV):
iv = get_random_bytes(factory.block_size)
if iv is not None:
if IV is not None:
raise TypeError("You must either use 'iv' or 'IV', not both")
else:
iv = IV
try:
key = kwargs.pop("key")
except KeyError as e:
raise TypeError("Missing component: " + str(e))
return OpenPgpMode(factory, key, iv, kwargs)
from types import ModuleType
from typing import Union, Dict
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['OpenPgpMode']
class OpenPgpMode(object):
block_size: int
iv: Union[bytes, bytearray, memoryview]
IV: Union[bytes, bytearray, memoryview]
def __init__(self,
factory: ModuleType,
key: Buffer,
iv: Buffer,
cipher_params: Dict) -> None: ...
def encrypt(self, plaintext: Buffer) -> bytes: ...
def decrypt(self, plaintext: Buffer) -> bytes: ...
from types import ModuleType
from typing import Union, Tuple, Dict, Optional, overload
Buffer = Union[bytes, bytearray, memoryview]
__all__ = ['SivMode']
class SivMode(object):
block_size: int
nonce: bytes
def __init__(self,
factory: ModuleType,
key: Buffer,
nonce: Buffer,
kwargs: Dict) -> None: ...
def update(self, component: Buffer) -> SivMode: ...
def encrypt(self, plaintext: Buffer) -> bytes: ...
def decrypt(self, plaintext: Buffer) -> bytes: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, received_mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer) -> Tuple[bytes, bytes]: ...
@overload
def encrypt_and_digest(self,
plaintext: Buffer,
output: Buffer) -> Tuple[None, bytes]: ...
def decrypt_and_verify(self,
ciphertext: Buffer,
received_mac_tag: Buffer,
output: Optional[Union[bytearray, memoryview]] = ...) -> bytes: ...
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib, c_size_t,
c_uint8_ptr)
_raw_pkcs1_decode = load_pycryptodome_raw_lib("Crypto.Cipher._pkcs1_decode",
"""
int pkcs1_decode(const uint8_t *em, size_t len_em,
const uint8_t *sentinel, size_t len_sentinel,
size_t expected_pt_len,
uint8_t *output);
int oaep_decode(const uint8_t *em,
size_t em_len,
const uint8_t *lHash,
size_t hLen,
const uint8_t *db,
size_t db_len);
""")
def pkcs1_decode(em, sentinel, expected_pt_len, output):
if len(em) != len(output):
raise ValueError("Incorrect output length")
ret = _raw_pkcs1_decode.pkcs1_decode(c_uint8_ptr(em),
c_size_t(len(em)),
c_uint8_ptr(sentinel),
c_size_t(len(sentinel)),
c_size_t(expected_pt_len),
c_uint8_ptr(output))
return ret
def oaep_decode(em, lHash, db):
ret = _raw_pkcs1_decode.oaep_decode(c_uint8_ptr(em),
c_size_t(len(em)),
c_uint8_ptr(lHash),
c_size_t(len(lHash)),
c_uint8_ptr(db),
c_size_t(len(db)))
return ret
# ===================================================================
#
# Copyright (c) 2014, Legrandin <helderijs@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ===================================================================
from binascii import unhexlify
from Crypto.Util.py3compat import bord, tobytes
from Crypto.Random import get_random_bytes
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
create_string_buffer,
get_raw_buffer, c_size_t,
c_uint8_ptr)
_raw_blake2b_lib = load_pycryptodome_raw_lib("Crypto.Hash._BLAKE2b",
"""
int blake2b_init(void **state,
const uint8_t *key,
size_t key_size,
size_t digest_size);
int blake2b_destroy(void *state);
int blake2b_update(void *state,
const uint8_t *buf,
size_t len);
int blake2b_digest(const void *state,
uint8_t digest[64]);
int blake2b_copy(const void *src, void *dst);
""")
class BLAKE2b_Hash(object):
"""A BLAKE2b hash object.
Do not instantiate directly. Use the :func:`new` function.
:ivar oid: ASN.1 Object ID
:vartype oid: string
:ivar block_size: the size in bytes of the internal message block,
input to the compression function
:vartype block_size: integer
:ivar digest_size: the size in bytes of the resulting hash
:vartype digest_size: integer
"""
# The internal block size of the hash algorithm in bytes.
block_size = 64
def __init__(self, data, key, digest_bytes, update_after_digest):
# The size of the resulting hash in bytes.
self.digest_size = digest_bytes
self._update_after_digest = update_after_digest
self._digest_done = False
# See https://tools.ietf.org/html/rfc7693
if digest_bytes in (20, 32, 48, 64) and not key:
self.oid = "1.3.6.1.4.1.1722.12.2.1." + str(digest_bytes)
state = VoidPointer()
result = _raw_blake2b_lib.blake2b_init(state.address_of(),
c_uint8_ptr(key),
c_size_t(len(key)),
c_size_t(digest_bytes)
)
if result:
raise ValueError("Error %d while instantiating BLAKE2b" % result)
self._state = SmartPointer(state.get(),
_raw_blake2b_lib.blake2b_destroy)
if data:
self.update(data)
def update(self, data):
"""Continue hashing of a message by consuming the next chunk of data.
Args:
data (bytes/bytearray/memoryview): The next chunk of the message being hashed.
"""
if self._digest_done and not self._update_after_digest:
raise TypeError("You can only call 'digest' or 'hexdigest' on this object")
result = _raw_blake2b_lib.blake2b_update(self._state.get(),
c_uint8_ptr(data),
c_size_t(len(data)))
if result:
raise ValueError("Error %d while hashing BLAKE2b data" % result)
return self
def digest(self):
"""Return the **binary** (non-printable) digest of the message that has been hashed so far.
:return: The hash digest, computed over the data processed so far.
Binary form.
:rtype: byte string
"""
bfr = create_string_buffer(64)
result = _raw_blake2b_lib.blake2b_digest(self._state.get(),
bfr)
if result:
raise ValueError("Error %d while creating BLAKE2b digest" % result)
self._digest_done = True
return get_raw_buffer(bfr)[:self.digest_size]
def hexdigest(self):
"""Return the **printable** digest of the message that has been hashed so far.
:return: The hash digest, computed over the data processed so far.
Hexadecimal encoded.
:rtype: string
"""
return "".join(["%02x" % bord(x) for x in tuple(self.digest())])
def verify(self, mac_tag):
"""Verify that a given **binary** MAC (computed by another party)
is valid.
Args:
mac_tag (bytes/bytearray/memoryview): the expected MAC of the message.
Raises:
ValueError: if the MAC does not match. It means that the message
has been tampered with or that the MAC key is incorrect.
"""
secret = get_random_bytes(16)
mac1 = new(digest_bits=160, key=secret, data=mac_tag)
mac2 = new(digest_bits=160, key=secret, data=self.digest())
if mac1.digest() != mac2.digest():
raise ValueError("MAC check failed")
def hexverify(self, hex_mac_tag):
"""Verify that a given **printable** MAC (computed by another party)
is valid.
Args:
hex_mac_tag (string): the expected MAC of the message, as a hexadecimal string.
Raises:
ValueError: if the MAC does not match. It means that the message
has been tampered with or that the MAC key is incorrect.
"""
self.verify(unhexlify(tobytes(hex_mac_tag)))
def new(self, **kwargs):
"""Return a new instance of a BLAKE2b hash object.
See :func:`new`.
"""
if "digest_bytes" not in kwargs and "digest_bits" not in kwargs:
kwargs["digest_bytes"] = self.digest_size
return new(**kwargs)
def new(**kwargs):
"""Create a new hash object.
Args:
data (bytes/bytearray/memoryview):
Optional. The very first chunk of the message to hash.
It is equivalent to an early call to :meth:`BLAKE2b_Hash.update`.
digest_bytes (integer):
Optional. The size of the digest, in bytes (1 to 64). Default is 64.
digest_bits (integer):
Optional and alternative to ``digest_bytes``.
The size of the digest, in bits (8 to 512, in steps of 8).
Default is 512.
key (bytes/bytearray/memoryview):
Optional. The key to use to compute the MAC (1 to 64 bytes).
If not specified, no key will be used.
update_after_digest (boolean):
Optional. By default, a hash object cannot be updated anymore after
the digest is computed. When this flag is ``True``, such check
is no longer enforced.
Returns:
A :class:`BLAKE2b_Hash` hash object
"""
data = kwargs.pop("data", None)
update_after_digest = kwargs.pop("update_after_digest", False)
digest_bytes = kwargs.pop("digest_bytes", None)
digest_bits = kwargs.pop("digest_bits", None)
if None not in (digest_bytes, digest_bits):
raise TypeError("Only one digest parameter must be provided")
if (None, None) == (digest_bytes, digest_bits):
digest_bytes = 64
if digest_bytes is not None:
if not (1 <= digest_bytes <= 64):
raise ValueError("'digest_bytes' not in range 1..64")
else:
if not (8 <= digest_bits <= 512) or (digest_bits % 8):
raise ValueError("'digest_bits' not in range 8..512, "
"with steps of 8")
digest_bytes = digest_bits // 8
key = kwargs.pop("key", b"")
if len(key) > 64:
raise ValueError("BLAKE2b key cannot exceed 64 bytes")
if kwargs:
raise TypeError("Unknown parameters: " + str(kwargs))
return BLAKE2b_Hash(data, key, digest_bytes, update_after_digest)
from typing import Any, Union
from types import ModuleType
Buffer = Union[bytes, bytearray, memoryview]
class BLAKE2b_Hash(object):
block_size: int
digest_size: int
oid: str
def __init__(self,
data: Buffer,
key: Buffer,
digest_bytes: bytes,
update_after_digest: bool) -> None: ...
def update(self, data: Buffer) -> BLAKE2b_Hash: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
def new(self,
data: Buffer = ...,
digest_bytes: int = ...,
digest_bits: int = ...,
key: Buffer = ...,
update_after_digest: bool = ...) -> BLAKE2b_Hash: ...
def new(data: Buffer = ...,
digest_bytes: int = ...,
digest_bits: int = ...,
key: Buffer = ...,
update_after_digest: bool = ...) -> BLAKE2b_Hash: ...
# ===================================================================
#
# Copyright (c) 2014, Legrandin <helderijs@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ===================================================================
from binascii import unhexlify
from Crypto.Util.py3compat import bord, tobytes
from Crypto.Random import get_random_bytes
from Crypto.Util._raw_api import (load_pycryptodome_raw_lib,
VoidPointer, SmartPointer,
create_string_buffer,
get_raw_buffer, c_size_t,
c_uint8_ptr)
_raw_blake2s_lib = load_pycryptodome_raw_lib("Crypto.Hash._BLAKE2s",
"""
int blake2s_init(void **state,
const uint8_t *key,
size_t key_size,
size_t digest_size);
int blake2s_destroy(void *state);
int blake2s_update(void *state,
const uint8_t *buf,
size_t len);
int blake2s_digest(const void *state,
uint8_t digest[32]);
int blake2s_copy(const void *src, void *dst);
""")
class BLAKE2s_Hash(object):
"""A BLAKE2s hash object.
Do not instantiate directly. Use the :func:`new` function.
:ivar oid: ASN.1 Object ID
:vartype oid: string
:ivar block_size: the size in bytes of the internal message block,
input to the compression function
:vartype block_size: integer
:ivar digest_size: the size in bytes of the resulting hash
:vartype digest_size: integer
"""
# The internal block size of the hash algorithm in bytes.
block_size = 32
def __init__(self, data, key, digest_bytes, update_after_digest):
# The size of the resulting hash in bytes.
self.digest_size = digest_bytes
self._update_after_digest = update_after_digest
self._digest_done = False
# See https://tools.ietf.org/html/rfc7693
if digest_bytes in (16, 20, 28, 32) and not key:
self.oid = "1.3.6.1.4.1.1722.12.2.2." + str(digest_bytes)
state = VoidPointer()
result = _raw_blake2s_lib.blake2s_init(state.address_of(),
c_uint8_ptr(key),
c_size_t(len(key)),
c_size_t(digest_bytes)
)
if result:
raise ValueError("Error %d while instantiating BLAKE2s" % result)
self._state = SmartPointer(state.get(),
_raw_blake2s_lib.blake2s_destroy)
if data:
self.update(data)
def update(self, data):
"""Continue hashing of a message by consuming the next chunk of data.
Args:
data (byte string/byte array/memoryview): The next chunk of the message being hashed.
"""
if self._digest_done and not self._update_after_digest:
raise TypeError("You can only call 'digest' or 'hexdigest' on this object")
result = _raw_blake2s_lib.blake2s_update(self._state.get(),
c_uint8_ptr(data),
c_size_t(len(data)))
if result:
raise ValueError("Error %d while hashing BLAKE2s data" % result)
return self
def digest(self):
"""Return the **binary** (non-printable) digest of the message that has been hashed so far.
:return: The hash digest, computed over the data processed so far.
Binary form.
:rtype: byte string
"""
bfr = create_string_buffer(32)
result = _raw_blake2s_lib.blake2s_digest(self._state.get(),
bfr)
if result:
raise ValueError("Error %d while creating BLAKE2s digest" % result)
self._digest_done = True
return get_raw_buffer(bfr)[:self.digest_size]
def hexdigest(self):
"""Return the **printable** digest of the message that has been hashed so far.
:return: The hash digest, computed over the data processed so far.
Hexadecimal encoded.
:rtype: string
"""
return "".join(["%02x" % bord(x) for x in tuple(self.digest())])
def verify(self, mac_tag):
"""Verify that a given **binary** MAC (computed by another party)
is valid.
Args:
mac_tag (byte string/byte array/memoryview): the expected MAC of the message.
Raises:
ValueError: if the MAC does not match. It means that the message
has been tampered with or that the MAC key is incorrect.
"""
secret = get_random_bytes(16)
mac1 = new(digest_bits=160, key=secret, data=mac_tag)
mac2 = new(digest_bits=160, key=secret, data=self.digest())
if mac1.digest() != mac2.digest():
raise ValueError("MAC check failed")
def hexverify(self, hex_mac_tag):
"""Verify that a given **printable** MAC (computed by another party)
is valid.
Args:
hex_mac_tag (string): the expected MAC of the message, as a hexadecimal string.
Raises:
ValueError: if the MAC does not match. It means that the message
has been tampered with or that the MAC key is incorrect.
"""
self.verify(unhexlify(tobytes(hex_mac_tag)))
def new(self, **kwargs):
"""Return a new instance of a BLAKE2s hash object.
See :func:`new`.
"""
if "digest_bytes" not in kwargs and "digest_bits" not in kwargs:
kwargs["digest_bytes"] = self.digest_size
return new(**kwargs)
def new(**kwargs):
"""Create a new hash object.
Args:
data (byte string/byte array/memoryview):
Optional. The very first chunk of the message to hash.
It is equivalent to an early call to :meth:`BLAKE2s_Hash.update`.
digest_bytes (integer):
Optional. The size of the digest, in bytes (1 to 32). Default is 32.
digest_bits (integer):
Optional and alternative to ``digest_bytes``.
The size of the digest, in bits (8 to 256, in steps of 8).
Default is 256.
key (byte string):
Optional. The key to use to compute the MAC (1 to 64 bytes).
If not specified, no key will be used.
update_after_digest (boolean):
Optional. By default, a hash object cannot be updated anymore after
the digest is computed. When this flag is ``True``, such check
is no longer enforced.
Returns:
A :class:`BLAKE2s_Hash` hash object
"""
data = kwargs.pop("data", None)
update_after_digest = kwargs.pop("update_after_digest", False)
digest_bytes = kwargs.pop("digest_bytes", None)
digest_bits = kwargs.pop("digest_bits", None)
if None not in (digest_bytes, digest_bits):
raise TypeError("Only one digest parameter must be provided")
if (None, None) == (digest_bytes, digest_bits):
digest_bytes = 32
if digest_bytes is not None:
if not (1 <= digest_bytes <= 32):
raise ValueError("'digest_bytes' not in range 1..32")
else:
if not (8 <= digest_bits <= 256) or (digest_bits % 8):
raise ValueError("'digest_bits' not in range 8..256, "
"with steps of 8")
digest_bytes = digest_bits // 8
key = kwargs.pop("key", b"")
if len(key) > 32:
raise ValueError("BLAKE2s key cannot exceed 32 bytes")
if kwargs:
raise TypeError("Unknown parameters: " + str(kwargs))
return BLAKE2s_Hash(data, key, digest_bytes, update_after_digest)
from typing import Any, Union
Buffer = Union[bytes, bytearray, memoryview]
class BLAKE2s_Hash(object):
block_size: int
digest_size: int
oid: str
def __init__(self,
data: Buffer,
key: Buffer,
digest_bytes: bytes,
update_after_digest: bool) -> None: ...
def update(self, data: Buffer) -> BLAKE2s_Hash: ...
def digest(self) -> bytes: ...
def hexdigest(self) -> str: ...
def verify(self, mac_tag: Buffer) -> None: ...
def hexverify(self, hex_mac_tag: str) -> None: ...
def new(self, **kwargs: Any) -> BLAKE2s_Hash: ...
def new(data: Buffer = ...,
digest_bytes: int = ...,
digest_bits: int = ...,
key: Buffer = ...,
update_after_digest: bool = ...) -> BLAKE2s_Hash: ...
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment