Breaking Through Encryption: How AI-Assisted Analysis Cracked End-to-End Protected Android App
The Challenge: Encrypted Everything #
Sample request/response payload:
{"msg":"tf5qrtXfYAAxcy+nqNfh/ni5kkre/O4QwD8dMsmACkEzXM1EG7gDll1FTVsJfxWfWVjgmOoS6HupynHUoYXz2MCECdqgnK3d0LAkJ4VVpz4101pTZZsNNKmGPbkrCosvk7xxERZpLXt1WPtt2vGQhkJaqsOP2+JOIP9iqSLYj2XK2kTOTPSawDFv5g0uSqS1mw2RwWrV2sVhI2F2G0IYKyGkWm0sHrvgItu8C3vyikMUB2T8iWNj6lo8/t5ZjyY5"}
The target application encrypted all HTTP request and response data, making traditional web application testing approaches ineffective. While this might seem like good security practice, my initial analysis revealed some concerning patterns:
- Static Behavior: The app could decrypt messages created days earlier
- Cross-User Decryption: Messages from different users were successfully decrypted by the same app instance
These observations suggested the encryption implementation might have fundamental flaws, potentially using static keys rather than proper key exchange mechanisms.
The Reverse Engineering Journey #
Before we start, shown below is the diagram to illustrate the process that will be explained:
Step 1: Decompiling the Android Bundle #
Since this was a React Native application, the core logic resided in JavaScript bundled named index.android.bundle
within the APK. To get this file, you can simply use Zip explorer (such as 7Zip) or of course you can also use Jadx if you are interested in exploring the app more.
After obtaining the file, I used hermes-dec, an excellent tool for decompiling Android bundles back into readable (though obfuscated) JavaScript.
# Decompiling the React Native bundle
hbc-decompiler assets/index.android.bundle /tmp/my_output_file.js
The decompilation was straightforward, yielding a large JavaScript file containing the app’s business logic.
Step 2: Pattern Recognition #
After analyzing the decompiled code, I identified clear patterns indicating where encryption and decryption operations occurred. The code structure revealed:
- AES encryption usage - Multiple references to AES cryptographic functions
- Symmetric key operations - Consistent patterns suggesting the same key for encrypt/decrypt
- Encoded key material - The actual encryption key was not plainly visible but appeared to be encoded
Step 3: AI-Powered Analysis #
Rather than spending hours manually tracing through obfuscated JavaScript, I decided to leverage AI assistance. I provided Claude with:
- Sample encrypted requests and responses
- The suspected encryption/decryption functions from the decompiled code
- My hypothesis about the static key usage
Within approximately 5 minutes, Claude successfully identified:
- The encryption key
- The salt values
- The complete encryption parameters
Building the Testing Infrastructure #
Creating the Decryption Engine #
Once I had the cryptographic parameters, I asked Claude to create a Python script to validate the findings:
Scaling with a Web API #
To make the encryption/decryption functionality more accessible, I converted the Python script into a simple web API attached in the Appendix
Automating with Burp Suite #
The final piece was creating a Burp Suite extension to automate the entire process. The extension provides:
- Automatic Detection: Recognizes encrypted traffic patterns
- History Integration: Adds a new tab showing decrypted requests/responses
- Repeater Support: Allows payload modification with automatic re-encryption
- API Integration: Calls our web API for cryptographic operations
I chose the API approach because Jython (Burp’s Python environment) has limitations with certain cryptographic libraries I needed.
The Results: Hidden Vulnerabilities Revealed #
With the encryption barrier removed, I could finally conduct a proper penetration test. The decrypted traffic revealed critical issues:
Broken Access Control #
Multiple endpoints suffered from horizontal privilege escalation vulnerabilities, allowing users to access and modify data belonging to other users. These issues were completely hidden behind the encryption layer and would have been impossible to discover through traditional black-box testing.
Key Takeaways #
This engagement highlighted several important lessons:
- Don’t assume encryption equals security - Implementation flaws can render even strong encryption useless. Encryption should complement, not replace, proper access controls
- Static keys are a critical vulnerability - Proper key management is essential
- Modern tools make reverse engineering more accessible - Tools like hermes-dec significantly lower the barrier to mobile app analysis
- AI can accelerate analysis - When used appropriately, AI can quickly identify patterns in complex code
Conclusion #
This penetration test demonstrated that sophisticated-looking security measures can sometimes hide fundamental flaws. The combination of reverse engineering, AI-assisted analysis, and custom tooling development allowed me to uncover critical vulnerabilities that would have otherwise remained hidden.
The experience reinforced the importance of comprehensive security testing that goes beyond surface-level analysis, especially when dealing with applications that implement custom encryption schemes.
Have you encountered similar challenges in mobile application security testing? I’d love to hear about your experiences and approaches in the comments below.
Appendix #
Encrypt/Decrypt Test Script #
#!/usr/bin/env python3
import base64
import hashlib
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
import json
import sys
def aes_decrypt(ciphertext_b64, key, salt):
"""
Decrypt AES encrypted data using the same parameters as the React Native app
"""
# Convert base64 ciphertext to bytes
ciphertext = base64.b64decode(ciphertext_b64)
# Derive key using PBKDF2 (same parameters as app: 32 iterations, SHA256, keySize 8 = 256 bits)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, # 256 bits / 8 = 32 bytes
salt=salt.encode('utf-8'),
iterations=32,
)
derived_key = kdf.derive(key.encode('utf-8'))
# Use zero IV as in the app
iv = b'\x00' * 16
# Decrypt
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv))
decryptor = cipher.decryptor()
decrypted = decryptor.update(ciphertext) + decryptor.finalize()
# Remove PKCS7 padding
padding_length = decrypted[-1]
decrypted = decrypted[:-padding_length]
# Convert to UTF-8 string
return decrypted.decode('utf-8')
# Keys found in the decompiled code (hardcoded)
encryption_key = "n6UWQ8UR-xxxxxxxxxxxxxxxxxxxxxxxxxx"
salt = "guDJSxxxxxxxxxxxxxx"
def main():
# Check command line arguments
if len(sys.argv) != 2:
print("Usage: python3 decrypt_test2.py <encrypted_msg>")
print("Example: python3 decrypt_test2.py 'tf5qrtXfYAAxcy+nqNfh/ni5kkre/O4Q...'")
sys.exit(1)
encrypted_msg = sys.argv[1]
try:
# Decrypt the message
decrypted_text = aes_decrypt(encrypted_msg, encryption_key, salt)
print("Decrypted message:")
print(decrypted_text)
# Try to parse as JSON
try:
decrypted_json = json.loads(decrypted_text)
print("\nDecrypted JSON (formatted):")
print(json.dumps(decrypted_json, indent=2, ensure_ascii=False))
except json.JSONDecodeError:
print("\nDecrypted text is not valid JSON")
except Exception as e:
print(f"Decryption failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
Web API #
Shown below is the complete Web API created to handle encrypt/decrypt process.
#!/usr/bin/env python3
"""
Flask API for Message Encryption/Decryption
Provides endpoints to encrypt and decrypt messages using the extracted keys
"""
from flask import Flask, request, jsonify
import base64
import json
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes, padding
app = Flask(__name__)
# Hardcoded encryption keys from decompiled app
ENCRYPTION_KEY = "n6UWQ8UR-XXXXXXXXXXXXXXXXX"
SALT = "guDJSXXXXXXXXXXXXXXXXX"
def aes_encrypt(plaintext, key, salt):
"""
Encrypt data using the same parameters as the React Native app
"""
try:
# Derive key using PBKDF2 (same parameters as app: 32 iterations, SHA256, keySize 8 = 256 bits)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, # 256 bits / 8 = 32 bytes
salt=salt.encode('utf-8'),
iterations=32,
)
derived_key = kdf.derive(key.encode('utf-8'))
# Use zero IV as in the app
iv = b'\x00' * 16
# Add PKCS7 padding
padder = padding.PKCS7(128).padder() # AES block size is 128 bits
padded_data = padder.update(plaintext.encode('utf-8')) + padder.finalize()
# Encrypt
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv))
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
# Return base64 encoded
return base64.b64encode(ciphertext).decode('ascii')
except Exception as e:
raise Exception(f"Encryption failed: {str(e)}")
def aes_decrypt(ciphertext_b64, key, salt):
"""
Decrypt AES encrypted data using the same parameters as the React Native app
"""
try:
# Convert base64 ciphertext to bytes
ciphertext = base64.b64decode(ciphertext_b64)
# Derive key using PBKDF2 (same parameters as app: 32 iterations, SHA256, keySize 8 = 256 bits)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, # 256 bits / 8 = 32 bytes
salt=salt.encode('utf-8'),
iterations=32,
)
derived_key = kdf.derive(key.encode('utf-8'))
# Use zero IV as in the app
iv = b'\x00' * 16
# Decrypt
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv))
decryptor = cipher.decryptor()
decrypted = decryptor.update(ciphertext) + decryptor.finalize()
# Remove PKCS7 padding
padding_length = decrypted[-1]
decrypted = decrypted[:-padding_length]
# Convert to UTF-8 string
return decrypted.decode('utf-8')
except Exception as e:
raise Exception(f"Decryption failed: {str(e)}")
@app.route('/', methods=['GET'])
def home():
"""API information endpoint"""
return jsonify({
"service": "Crypto API",
"version": "1.0",
"endpoints": {
"/encrypt": "POST - Encrypt JSON data",
"/decrypt": "POST - Decrypt encrypted message",
"/health": "GET - Health check"
},
"usage": {
"encrypt": {
"method": "POST",
"body": {"data": {"your": "json", "data": "here"}},
"response": {"msg": "encrypted_base64_string"}
},
"decrypt": {
"method": "POST",
"body": {"msg": "encrypted_base64_string"},
"response": {"data": {"decrypted": "json", "data": "here"}}
}
}
})
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({
"status": "healthy",
"encryption_key_available": bool(ENCRYPTION_KEY),
"salt_available": bool(SALT)
})
@app.route('/encrypt', methods=['POST'])
def encrypt_data():
"""
Encrypt JSON data
Expected input: {"data": {"your": "json", "data": "here"}}
Response: {"msg": "encrypted_base64_string"}
"""
try:
# Get JSON data from request
request_data = request.get_json()
if not request_data:
return jsonify({"error": "No JSON data provided"}), 400
if 'data' not in request_data:
return jsonify({"error": "Missing 'data' field in request"}), 400
# Convert data to JSON string (compact format)
json_string = json.dumps(request_data['data'], separators=(',', ':'), ensure_ascii=False)
# Encrypt the data
encrypted = aes_encrypt(json_string, ENCRYPTION_KEY, SALT)
return jsonify({"msg": encrypted})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/decrypt', methods=['POST'])
def decrypt_data():
"""
Decrypt encrypted message
Expected input: {"msg": "encrypted_base64_string"}
Response: {"data": {"decrypted": "json", "data": "here"}}
"""
try:
# Get JSON data from request
request_data = request.get_json()
if not request_data:
return jsonify({"error": "No JSON data provided"}), 400
if 'msg' not in request_data:
return jsonify({"error": "Missing 'msg' field in request"}), 400
# Decrypt the message
decrypted_text = aes_decrypt(request_data['msg'], ENCRYPTION_KEY, SALT)
# Try to parse as JSON
try:
decrypted_json = json.loads(decrypted_text)
return jsonify({"data": decrypted_json})
except json.JSONDecodeError:
# If not valid JSON, return as plain text
return jsonify({"data": {"text": decrypted_text}})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/test', methods=['GET'])
def test_crypto():
"""
Test endpoint to verify encryption/decryption is working
"""
try:
# Test data
test_data = {
"product": "MB-XXXXXXXXX",
"transactionType": "API-TEST",
"transactionDetail": {
"timestamp": "2025-01-06T12:00:00Z",
"testMessage": "Hello from Flask API!"
}
}
# Convert to JSON string
json_string = json.dumps(test_data, separators=(',', ':'))
# Encrypt
encrypted = aes_encrypt(json_string, ENCRYPTION_KEY, SALT)
# Decrypt to verify
decrypted = aes_decrypt(encrypted, ENCRYPTION_KEY, SALT)
decrypted_json = json.loads(decrypted)
return jsonify({
"test": "successful",
"original": test_data,
"encrypted": encrypted,
"decrypted": decrypted_json,
"round_trip_success": test_data == decrypted_json
})
except Exception as e:
return jsonify({"test": "failed", "error": str(e)}), 500
@app.errorhandler(404)
def not_found(error):
return jsonify({"error": "Endpoint not found", "available_endpoints": ["/", "/encrypt", "/decrypt", "/health", "/test"]}), 404
@app.errorhandler(405)
def method_not_allowed(error):
return jsonify({"error": "Method not allowed"}), 405
@app.errorhandler(500)
def internal_error(error):
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
print("=" * 60)
print("Crypto API")
print("=" * 60)
print(f"Encryption Key: {ENCRYPTION_KEY}")
print(f"Salt: {SALT}")
print("=" * 60)
print("Available endpoints:")
print(" GET / - API information")
print(" GET /health - Health check")
print(" GET /test - Test encryption/decryption")
print(" POST /encrypt - Encrypt data")
print(" POST /decrypt - Decrypt data")
print("=" * 60)
print("Starting Flask server...")
print("Access the API at: http://localhost:5000")
print("=" * 60)
app.run(debug=True, host='0.0.0.0', port=5000)
Burp Extension #
Shown below is the complete burp extension created to automate the encrypt/decrypt process.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Burp Suite Extension: Crypto Proxy (Simple)
Features:
1. Custom History tab with decrypted request/response bodies
2. Repeater integration via custom message editor tabs
3. Automatic encryption/decryption using Flask API
"""
from burp import IBurpExtender, IHttpListener, ITab, IMessageEditorController
from burp import IMessageEditorTabFactory, IMessageEditorTab, IExtensionHelpers
from javax.swing import JPanel, JTextArea, JScrollPane, JLabel, JButton, JSplitPane, JTable
from javax.swing import BorderFactory, BoxLayout, Box, JTextField, JCheckBox, JTabbedPane
from javax.swing.table import DefaultTableModel
from javax.swing.event import ListSelectionListener
from java.awt import BorderLayout, Dimension, Font
from java.awt.event import ActionListener
import json
import urllib2
import threading
import time
class BurpExtender(IBurpExtender, IHttpListener, ITab, IMessageEditorController, IMessageEditorTabFactory):
def registerExtenderCallbacks(self, callbacks):
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
callbacks.setExtensionName("Crypto Proxy")
# Configuration
self.api_base_url = "http://localhost:5000"
self.target_host = "target.redacted.com"
# History storage
self._history_entries = []
# Create UI
self._create_ui()
# Register listeners
callbacks.registerHttpListener(self)
callbacks.addSuiteTab(self)
callbacks.registerMessageEditorTabFactory(self)
self._log_message("Crypto Proxy loaded")
# Test API connection
threading.Thread(target=self._test_api_connection).start()
def _create_ui(self):
"""Create the main UI"""
self._main_panel = JTabbedPane()
# History tab
self._create_history_tab()
# Configuration tab
self._create_config_tab()
self._main_panel.addTab("Decrypted History", self._history_panel)
self._main_panel.addTab("Configuration", self._config_panel)
def _create_history_tab(self):
"""Create the history tab"""
self._history_panel = JPanel(BorderLayout())
# Create table for history
column_names = ["#", "Method", "URL", "Status", "Request Decrypted", "Response Decrypted"]
self._history_table_model = DefaultTableModel(column_names, 0)
self._history_table = JTable(self._history_table_model)
# Set column widths
self._history_table.getColumnModel().getColumn(0).setPreferredWidth(50)
self._history_table.getColumnModel().getColumn(1).setPreferredWidth(80)
self._history_table.getColumnModel().getColumn(2).setPreferredWidth(300)
self._history_table.getColumnModel().getColumn(3).setPreferredWidth(80)
self._history_table.getColumnModel().getColumn(4).setPreferredWidth(100)
self._history_table.getColumnModel().getColumn(5).setPreferredWidth(100)
table_scroll = JScrollPane(self._history_table)
# Create text areas for detailed view
self._request_detail = JTextArea(10, 40)
self._request_detail.setEditable(False)
self._request_detail.setFont(Font("Monospaced", Font.PLAIN, 11))
self._response_detail = JTextArea(10, 40)
self._response_detail.setEditable(False)
self._response_detail.setFont(Font("Monospaced", Font.PLAIN, 11))
# Create tabbed pane for request/response details
detail_tabs = JTabbedPane()
detail_tabs.addTab("Request Details", JScrollPane(self._request_detail))
detail_tabs.addTab("Response Details", JScrollPane(self._response_detail))
# Split pane
split_pane = JSplitPane(JSplitPane.VERTICAL_SPLIT)
split_pane.setTopComponent(table_scroll)
split_pane.setBottomComponent(detail_tabs)
split_pane.setDividerLocation(200)
self._history_panel.add(split_pane, BorderLayout.CENTER)
# Add table selection listener
self._table_listener = TableSelectionListener(self)
self._history_table.getSelectionModel().addListSelectionListener(self._table_listener)
def _create_config_tab(self):
"""Create the configuration tab"""
self._config_panel = JPanel(BorderLayout())
# Config form
config_form = JPanel()
config_form.setLayout(BoxLayout(config_form, BoxLayout.Y_AXIS))
config_form.setBorder(BorderFactory.createTitledBorder("Configuration"))
# API URL
api_panel = JPanel(BorderLayout())
api_panel.add(JLabel("Flask API URL: "), BorderLayout.WEST)
self._api_url_field = JTextField(self.api_base_url, 30)
api_panel.add(self._api_url_field, BorderLayout.CENTER)
update_btn = JButton("Update")
update_btn.addActionListener(ActionListenerWrapper(self._update_config))
api_panel.add(update_btn, BorderLayout.EAST)
# Target host
host_panel = JPanel(BorderLayout())
host_panel.add(JLabel("Target Host: "), BorderLayout.WEST)
self._target_host_field = JTextField(self.target_host, 30)
host_panel.add(self._target_host_field, BorderLayout.CENTER)
# Buttons
button_panel = JPanel()
test_btn = JButton("Test API")
test_btn.addActionListener(ActionListenerWrapper(self._test_api_button))
clear_btn = JButton("Clear History")
clear_btn.addActionListener(ActionListenerWrapper(self._clear_history))
button_panel.add(test_btn)
button_panel.add(clear_btn)
config_form.add(api_panel)
config_form.add(Box.createVerticalStrut(10))
config_form.add(host_panel)
config_form.add(Box.createVerticalStrut(10))
config_form.add(button_panel)
# Log area
self._log_area = JTextArea(20, 60)
self._log_area.setEditable(False)
self._log_area.setFont(Font("Monospaced", Font.PLAIN, 11))
log_scroll = JScrollPane(self._log_area)
log_scroll.setBorder(BorderFactory.createTitledBorder("Log"))
self._config_panel.add(config_form, BorderLayout.NORTH)
self._config_panel.add(log_scroll, BorderLayout.CENTER)
def getTabCaption(self):
return "Crypto Proxy"
def getUiComponent(self):
return self._main_panel
def _log_message(self, message):
"""Add message to log"""
try:
timestamp = time.strftime("%H:%M:%S")
log_entry = "[{}] {}".format(timestamp, message)
self._log_area.append(log_entry + "\n")
self._log_area.setCaretPosition(self._log_area.getDocument().getLength())
except:
print(message)
def _update_config(self, event):
"""Update configuration"""
self.api_base_url = self._api_url_field.getText()
self.target_host = self._target_host_field.getText()
self._log_message("Configuration updated")
def _test_api_button(self, event):
"""Test API button action"""
threading.Thread(target=self._test_api_connection).start()
def _clear_history(self, event):
"""Clear history"""
self._history_entries = []
self._history_table_model.setRowCount(0)
self._request_detail.setText("")
self._response_detail.setText("")
self._log_message("History cleared")
def _test_api_connection(self):
"""Test Flask API connection"""
try:
self._log_message("Testing API connection...")
url = self.api_base_url + "/health"
request = urllib2.Request(url)
response = urllib2.urlopen(request, timeout=5)
if response.getcode() == 200:
self._log_message("API connection successful!")
else:
self._log_message("API returned status: " + str(response.getcode()))
except Exception as e:
self._log_message("API connection failed: " + str(e))
def _call_decrypt_api(self, encrypted_msg):
"""Call decrypt API"""
try:
url = self.api_base_url + "/decrypt"
payload = json.dumps({"msg": encrypted_msg})
request = urllib2.Request(url, payload)
request.add_header('Content-Type', 'application/json')
response = urllib2.urlopen(request, timeout=10)
result = json.loads(response.read())
if 'data' in result:
return result['data']
return None
except Exception as e:
self._log_message("Decrypt API failed: " + str(e))
return None
def _call_encrypt_api(self, data):
"""Call encrypt API"""
try:
url = self.api_base_url + "/encrypt"
payload = json.dumps({"data": data})
request = urllib2.Request(url, payload)
request.add_header('Content-Type', 'application/json')
response = urllib2.urlopen(request, timeout=10)
result = json.loads(response.read())
if 'msg' in result:
return result['msg']
return None
except Exception as e:
self._log_message("Encrypt API failed: " + str(e))
return None
def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
"""Process HTTP messages"""
try:
http_service = messageInfo.getHttpService()
host = http_service.getHost()
if self.target_host not in host:
return
if not messageIsRequest: # Process responses
self._process_message(messageInfo)
except Exception as e:
self._log_message("Error processing message: " + str(e))
def _process_message(self, messageInfo):
"""Process and decrypt message"""
try:
request = messageInfo.getRequest()
response = messageInfo.getResponse()
if not request or not response:
return
request_info = self._helpers.analyzeRequest(messageInfo)
response_info = self._helpers.analyzeResponse(response)
url = str(request_info.getUrl())
method = request_info.getMethod()
status = response_info.getStatusCode()
# Process request
request_decrypted = None
req_body_offset = request_info.getBodyOffset()
if req_body_offset < len(request):
req_body_bytes = request[req_body_offset:]
req_body = ''.join(chr(b & 0xFF) for b in req_body_bytes)
try:
req_json = json.loads(req_body)
if 'msg' in req_json:
request_decrypted = self._call_decrypt_api(req_json['msg'])
except:
pass
# Process response
response_decrypted = None
resp_body_offset = response_info.getBodyOffset()
if resp_body_offset < len(response):
resp_body_bytes = response[resp_body_offset:]
resp_body = ''.join(chr(b & 0xFF) for b in resp_body_bytes)
try:
resp_json = json.loads(resp_body)
msg_content = None
if 'data' in resp_json and isinstance(resp_json['data'], dict) and 'msg' in resp_json['data']:
msg_content = resp_json['data']['msg']
elif 'msg' in resp_json:
msg_content = resp_json['msg']
if msg_content:
response_decrypted = self._call_decrypt_api(msg_content)
except:
pass
# Add to history if we have decrypted content
if request_decrypted or response_decrypted:
self._add_to_history(url, method, status, request_decrypted, response_decrypted)
except Exception as e:
self._log_message("Error processing message: " + str(e))
def _add_to_history(self, url, method, status, request_decrypted, response_decrypted):
"""Add entry to history table"""
try:
entry_id = len(self._history_entries) + 1
entry = {
'id': entry_id,
'url': url,
'method': method,
'status': status,
'request_decrypted': request_decrypted,
'response_decrypted': response_decrypted
}
self._history_entries.append(entry)
# Add to table
row_data = [
str(entry_id),
method,
url,
str(status),
"Yes" if request_decrypted else "No",
"Yes" if response_decrypted else "No"
]
self._history_table_model.addRow(row_data)
self._log_message("Added to history: {} {}".format(method, url))
except Exception as e:
self._log_message("Error adding to history: " + str(e))
def _show_history_details(self, row_index):
"""Show details for selected history entry"""
try:
if row_index >= 0 and row_index < len(self._history_entries):
entry = self._history_entries[row_index]
# Show request details
if entry['request_decrypted']:
req_text = json.dumps(entry['request_decrypted'], indent=2)
self._request_detail.setText(req_text)
else:
self._request_detail.setText("No decrypted request data")
# Show response details
if entry['response_decrypted']:
resp_text = json.dumps(entry['response_decrypted'], indent=2)
self._response_detail.setText(resp_text)
else:
self._response_detail.setText("No decrypted response data")
except Exception as e:
self._log_message("Error showing details: " + str(e))
# Message Editor Tab Factory methods
def createNewInstance(self, controller, editable):
"""Create new message editor tab for Repeater integration"""
return CryptoMessageEditorTab(self, controller, editable)
def getTabName(self):
"""Tab name for message editor"""
return "Decrypted"
# Message Editor Controller methods (required interface)
def getHttpService(self):
return None
def getRequest(self):
return None
def getResponse(self):
return None
# Table selection listener
class TableSelectionListener(ListSelectionListener):
def __init__(self, extender):
self._extender = extender
def valueChanged(self, event):
if not event.getValueIsAdjusting():
selected_row = self._extender._history_table.getSelectedRow()
self._extender._show_history_details(selected_row)
# Custom message editor tab for Repeater integration
class CryptoMessageEditorTab(IMessageEditorTab):
def __init__(self, extender, controller, editable):
self._extender = extender
self._controller = controller
self._editable = editable
# Create text area
self._text_area = JTextArea()
self._text_area.setEditable(editable)
self._text_area.setFont(Font("Monospaced", Font.PLAIN, 12))
self._scroll_pane = JScrollPane(self._text_area)
self._current_message = None
self._original_text = None
self._is_request = None
def getTabCaption(self):
return "Decrypted"
def getUiComponent(self):
return self._scroll_pane
def isEnabled(self, content, isRequest):
"""Enable tab if content contains 'msg' field"""
if content and len(content) > 0:
try:
# Extract body from HTTP message
if isRequest:
request_info = self._extender._helpers.analyzeRequest(content)
body_offset = request_info.getBodyOffset()
if body_offset < len(content):
body_bytes = content[body_offset:]
body_str = ''.join(chr(b & 0xFF) for b in body_bytes)
return '"msg"' in body_str and body_str.strip()
else:
response_info = self._extender._helpers.analyzeResponse(content)
body_offset = response_info.getBodyOffset()
if body_offset < len(content):
body_bytes = content[body_offset:]
body_str = ''.join(chr(b & 0xFF) for b in body_bytes)
return '"msg"' in body_str and body_str.strip()
except:
pass
return False
def setMessage(self, content, isRequest):
"""Set message content and decrypt if possible"""
self._current_message = content
self._is_request = isRequest
self._original_text = None
if content is None:
self._text_area.setText("")
return
try:
# For HTTP requests/responses, we need to extract just the body
if isRequest:
# Parse request to get body
request_info = self._extender._helpers.analyzeRequest(content)
body_offset = request_info.getBodyOffset()
if body_offset < len(content):
body_bytes = content[body_offset:]
body_str = ''.join(chr(b & 0xFF) for b in body_bytes)
else:
self._text_area.setText("No request body found")
return
else:
# Parse response to get body
response_info = self._extender._helpers.analyzeResponse(content)
body_offset = response_info.getBodyOffset()
if body_offset < len(content):
body_bytes = content[body_offset:]
body_str = ''.join(chr(b & 0xFF) for b in body_bytes)
else:
self._text_area.setText("No response body found")
return
# Skip if body is empty
if not body_str.strip():
self._text_area.setText("Empty body")
return
# Try to parse and decrypt
try:
json_data = json.loads(body_str)
msg_content = None
if 'msg' in json_data:
msg_content = json_data['msg']
elif 'data' in json_data and isinstance(json_data['data'], dict) and 'msg' in json_data['data']:
msg_content = json_data['data']['msg']
if msg_content:
self._extender._log_message("Decrypting message in Repeater tab...")
decrypted_data = self._extender._call_decrypt_api(msg_content)
if decrypted_data:
formatted = json.dumps(decrypted_data, indent=2, ensure_ascii=False)
self._text_area.setText(formatted)
self._original_text = formatted # Store original for comparison
self._extender._log_message("Successfully decrypted message in Repeater")
return
else:
self._text_area.setText("Decryption API call failed")
return
self._text_area.setText("No 'msg' field found in JSON")
except ValueError as e:
self._text_area.setText("Invalid JSON in body: " + str(e))
except Exception as e:
self._text_area.setText("Error processing message: " + str(e))
self._extender._log_message("Error in setMessage: " + str(e))
def getMessage(self):
"""Get message content, encrypting if modified"""
# Always return original message if not editable (for response tabs)
if not self._editable:
return self._current_message
# Check if content has been modified
current_text = self._text_area.getText()
# If text hasn't changed from original, return original message
if self._original_text and current_text == self._original_text:
self._extender._log_message("Content unchanged, returning original message")
return self._current_message
# Content has been modified, encrypt it
try:
plaintext = current_text.strip()
if not plaintext:
self._extender._log_message("Empty content, returning original message")
return self._current_message
# Parse the modified JSON
try:
json_data = json.loads(plaintext)
self._extender._log_message("Content was modified! Encrypting new data...")
# Call encryption API
encrypted_msg = self._extender._call_encrypt_api(json_data)
if encrypted_msg:
# Create new HTTP body with encrypted message
new_body = json.dumps({"msg": encrypted_msg}, separators=(',', ':'))
# Reconstruct the HTTP message with new body
if self._current_message and self._is_request is not None:
try:
if self._is_request:
# Handle request
request_info = self._extender._helpers.analyzeRequest(self._current_message)
headers_end = request_info.getBodyOffset()
headers = self._current_message[:headers_end]
# Convert new_body to byte array
new_body_bytes = [ord(c) for c in new_body]
# Create new message with encrypted body
new_message = list(headers) + new_body_bytes
self._extender._log_message("Successfully encrypted and rebuilt request message!")
self._extender._log_message("New encrypted msg: " + encrypted_msg[:50] + "...")
return new_message
else:
# Handle response (though typically not editable)
response_info = self._extender._helpers.analyzeResponse(self._current_message)
headers_end = response_info.getBodyOffset()
headers = self._current_message[:headers_end]
# Convert new_body to byte array
new_body_bytes = [ord(c) for c in new_body]
# Create new message with encrypted body
new_message = list(headers) + new_body_bytes
self._extender._log_message("Successfully encrypted and rebuilt response message!")
return new_message
except Exception as rebuild_error:
self._extender._log_message("Error rebuilding message: " + str(rebuild_error))
return self._current_message
else:
self._extender._log_message("Encryption API call failed!")
return self._current_message
except ValueError as json_error:
self._extender._log_message("Invalid JSON in modified content: " + str(json_error))
return self._current_message
except Exception as e:
self._extender._log_message("Error in getMessage: " + str(e))
return self._current_message
def isModified(self):
"""Check if the content has been modified"""
if not self._editable or not self._original_text:
return False
current_text = self._text_area.getText()
return current_text != self._original_text
def getSelectedData(self):
return self._text_area.getSelectedText()
# Action listener wrapper
class ActionListenerWrapper(ActionListener):
def __init__(self, func):
self.func = func
def actionPerformed(self, event):
self.func(event)