Here is an incomplete wip that tries to do that:
<details>
<summary>[patch] malleate an existent transaction by flipping s to ORDER - s</summary>
diff --git i/test/functional/test_framework/key.py w/test/functional/test_framework/key.py
index 682c2de35f..af62dc353a 100644
--- i/test/functional/test_framework/key.py
+++ w/test/functional/test_framework/key.py
@@ -26,12 +26,49 @@ def TaggedHash(tag, data):
ss = hashlib.sha256(tag.encode('utf-8')).digest()
ss += ss
ss += data
return hashlib.sha256(ss).digest()
+def extract_r_and_s_from_der_sig(sig):
+ """
+ Extract r and s from the DER formatted signature. Return False for any DER encoding errors.
+ """
+ if (sig[1] + 2 != len(sig)):
+ return False
+ if (len(sig) < 4):
+ return False
+ if (sig[0] != 0x30):
+ return False
+ if (sig[2] != 0x02):
+ return False
+ rlen = sig[3]
+ if (len(sig) < 6 + rlen):
+ return False
+ if rlen < 1 or rlen > 33:
+ return False
+ if sig[4] >= 0x80:
+ return False
+ if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
+ return False
+ r = int.from_bytes(sig[4:4+rlen], 'big')
+ if (sig[4+rlen] != 0x02):
+ return False
+ slen = sig[5+rlen]
+ if slen < 1 or slen > 33:
+ return False
+ if (len(sig) != 6 + rlen + slen):
+ return False
+ if sig[6+rlen] >= 0x80:
+ return False
+ if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
+ return False
+ s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
+ return (r, s)
+
+
class ECPubKey:
"""A secp256k1 public key"""
def __init__(self):
"""Construct an uninitialized public key"""
self.p = None
@@ -60,44 +97,17 @@ class ECPubKey:
"""Verify a strictly DER-encoded ECDSA signature against this pubkey.
See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the
ECDSA verifier algorithm"""
assert self.is_valid
- # Extract r and s from the DER formatted signature. Return false for
- # any DER encoding errors.
- if (sig[1] + 2 != len(sig)):
- return False
- if (len(sig) < 4):
- return False
- if (sig[0] != 0x30):
- return False
- if (sig[2] != 0x02):
- return False
- rlen = sig[3]
- if (len(sig) < 6 + rlen):
- return False
- if rlen < 1 or rlen > 33:
- return False
- if sig[4] >= 0x80:
- return False
- if (rlen > 1 and (sig[4] == 0) and not (sig[5] & 0x80)):
- return False
- r = int.from_bytes(sig[4:4+rlen], 'big')
- if (sig[4+rlen] != 0x02):
- return False
- slen = sig[5+rlen]
- if slen < 1 or slen > 33:
- return False
- if (len(sig) != 6 + rlen + slen):
- return False
- if sig[6+rlen] >= 0x80:
- return False
- if (slen > 1 and (sig[6+rlen] == 0) and not (sig[7+rlen] & 0x80)):
+ rs = extract_r_and_s_from_der_sig(sig)
+ if not rs:
return False
- s = int.from_bytes(sig[6+rlen:6+rlen+slen], 'big')
+ r = rs[0]
+ s = rs[1]
# Verify that r and s are within the group order
if r < 1 or s < 1 or r >= ORDER or s >= ORDER:
return False
if low_s and s >= secp256k1.GE.ORDER_HALF:
return False
diff --git i/test/functional/test_framework/messages.py w/test/functional/test_framework/messages.py
index fc36bd3c9e..643680fcd7 100755
--- i/test/functional/test_framework/messages.py
+++ w/test/functional/test_framework/messages.py
@@ -25,13 +25,15 @@ from io import BytesIO
import math
import random
import socket
import time
import unittest
+from test_framework.crypto import secp256k1
from test_framework.crypto.siphash import siphash256
+from test_framework.key import extract_r_and_s_from_der_sig
from test_framework.util import (
assert_equal,
assert_not_equal,
)
MAX_LOCATOR_SZ = 101
@@ -245,27 +247,48 @@ def from_hex(obj, hex_string):
def tx_from_hex(hex_string):
"""Deserialize from hex string to a transaction object"""
return from_hex(CTransaction(), hex_string)
-def malleate_tx(tx):
+def get_der_sig(tx):
+ return ""
+
+
+def malleate_tx(tx, valid=False):
"""
- Create a malleated version of the tx where the witness is replaced with garbage data.
+ Create a malleated version of the tx where the witness is replaced with
+ - same witness with flipped `s` to `ORDER - s` (valid=True)
+ - garbage data (valid=False)
Returns a CTransaction object.
"""
- tx_bad_wit = tx_from_hex(tx["hex"])
- tx_bad_wit.wit.vtxinwit = [CTxInWitness()]
- # Add garbage data to witness 0. We cannot simply strip the witness, as the node would
- # classify it as a transaction in which the witness was missing rather than wrong.
- tx_bad_wit.wit.vtxinwit[0].scriptWitness.stack = [b'garbage']
+ malleated_tx = tx_from_hex(tx["hex"])
+ if valid:
+ sig = get_der_sig(malleated_tx)
+ rs = extract_r_and_s_from_der_sig(sig)
+ r = rs[0]
+ s = rs[1]
+
+ s = secp256k1.GE.ORDER - s
+
+ # copied from test/functional/test_framework/key.py:194
+ rb = r.to_bytes((r.bit_length() + 8) // 8, 'big')
+ sb = s.to_bytes((s.bit_length() + 8) // 8, 'big')
+ malleated_sig = b'\x30' + bytes([4 + len(rb) + len(sb), 2, len(rb)]) + rb + bytes([2, len(sb)]) + sb
+
+ # put malleated_sig into malleated_tx
+ else:
+ malleated_tx.wit.vtxinwit = [CTxInWitness()]
+ # Add garbage data to witness 0. We cannot simply strip the witness, as the node would
+ # classify it as a transaction in which the witness was missing rather than wrong.
+ malleated_tx.wit.vtxinwit[0].scriptWitness.stack = [b'garbage']
- assert_equal(tx["txid"], tx_bad_wit.txid_hex)
- assert_not_equal(tx["wtxid"], tx_bad_wit.wtxid_hex)
+ assert_equal(tx["txid"], malleated_tx.txid_hex)
+ assert_not_equal(tx["wtxid"], malleated_tx.wtxid_hex)
- return tx_bad_wit
+ return malleated_tx
# like from_hex, but without the hex part
def from_binary(cls, stream):
"""deserialize a binary stream (or bytes object) into an object"""
# handle bytes object by turning it into a stream
</details>