I am attempting to decode a Jupiter swap event on the Solana blockchain to extract token amounts and mint addresses from the transaction data. The expected output should look like this:
{
"amm": "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc",
"inputMint": "So11111111111111111111111111111111111111112",
"inputAmount": "14829998",
"outputMint": "Hjw6bEcHtbHGpQr8onG3izfJY5DJiWdt7uk2BfdSpump",
"outputAmount": "127849833"
}
Here is an exampleTransaction for reference.
My Approach
I am using construct to define the expected data structure and parse the event:
from construct import Int64ul, Bytes
from construct import Struct as cStruct
import base58
PUBLIC_KEY_LAYOUT = Bytes(32)
JUP_SWAP = cStruct(
"amm" / PUBLIC_KEY_LAYOUT,
"inputMint" / PUBLIC_KEY_LAYOUT,
"inputAmount" / Int64ul,
"outputMint" / PUBLIC_KEY_LAYOUT,
"outputAmount" / Int64ul,
)
# Raw transaction data (copied from Solscan in "raw" mode)
hex_data = 'e445a52e51cb9a1d40c6cde8260871e20e03685f8e909053e458121c66f5a76aedc7706aa11c82f8aa952a8f2b7879a9069b8857feab8184fb687f634618c035dac439dc1aeb3b5598a0f00000000001ae49e20000000000f8ba8d41600d59ff5ede65103bfc142e5bc863e0e17ce50cc1155c425f02395f69d59e0700000000'
# Convert hex to bytes and remove the first 8 bytes
bytes_event = bytes.fromhex(hex_data)[8:]
# Decode the event
decoded_event = JUP_SWAP.parse(bytes_event)
amm = base58.b58encode(decoded_event.amm).decode()
input_mint = base58.b58encode(decoded_event.inputMint).decode()
input_amount = decoded_event.inputAmount
output_mint = base58.b58encode(decoded_event.outputMint).decode()
output_amount = decoded_event.outputAmount
print(f"amm: {amm}")
print(f"inputMint: {input_mint}")
print(f"inputAmount: {input_amount}")
print(f"outputMint: {output_mint}")
print(f"outputAmount: {output_amount}")
Issue
The output does not match the expected values:
amm: 5Mrv98ERNPgDKHDFz8AjcebvXyf8dq9kp1yY1RAHSCJB
inputMint: CUtETTsHosks4AUNtZuBZqb4Gn9k7JetykayaCyDcFq6
inputAmount: 72057594053697688
outputMint: CjMHLni4ZdtAi91Am1Vx63jCYL2bpEArkqzjzsBuaqMV
outputAmount: 6861518115434141121
I’ve tried multiple other decoding methods, including:
Attempt 1: Using PublicKey and struct
amm = PublicKey(buffer[0:32]).__str__()
input_mint = PublicKey(buffer[32:64]).__str__()
input_amount = struct.unpack("<Q", buffer[64:72])[0] # Little-endian u64
output_mint = PublicKey(buffer[72:104]).__str__()
output_amount = struct.unpack("<Q", buffer[104:112])[0] # Little-endian u64
Attempt 2: Using base58 encoding
amm = base58.b58encode(buffer[0:32]).decode()
inputMint = base58.b58encode(buffer[32:64]).decode()
inputAmount = int.from_bytes(buffer[64:72],'little')
outputMint = base58.b58encode(buffer[72:104]).decode()
outputAmount = int.from_bytes(buffer[104:112],'little')
Attempt 3: Using struct.unpack
unpacked = struct.unpack("=32s32sQ32sQ", buffer)
amm = base58.b58encode(unpacked[0]).decode()
inputMint = base58.b58encode(unpacked[2]).decode()
inputAmount = unpacked[2]
outputMint = base58.b58encode(unpacked[3]).decode()
outputAmount = unpacked[4]
Regardless of the method, the results remain incorrect.
Question
I referenced the expected event structure from this IDL, but my results do not align.
Any guidance or corrections would be greatly appreciated. Thanks in advance!
You are very close, and don't be discouraged because there is not enough information sharing in this particular space.
The Jupiter Aggregator v6 program uses 8 byte discriminators. What you are trying to do is to decode the inputs of one (of three total) CPI Log Instructions in the transaction linked above. The trick, (as I am learning) with CPI Logs is that they can have varied number of input schema/struct formats within the same program. This means there needs to be a second discriminator to identify which struct format to use to decode. Not all instructions have a second discrim; in my experience there is a second discriminator only if the instruction can take a varied number of inputs.
If you haven't already learned, the first place you might check to get the IDL/discriminators for a program is solscan. But for this particular program this IDL does not define the CPI Log instruction, let alone the second discriminator:
That second discriminator almost always follows the first discriminator, and almost always the same length as the discriminator. In Jupiter's Program this is also the case.
first_discriminator_length = 8
second_discriminator_length = first_discriminator_length
Depending on how you are formatting your discriminators they will usually look something like this (for 8byte discriminators), and be stored as list of int OR large int:
Jupiter Aggregator V6 CPI Log Discriminator
Second Discriminators:
You're free to name it what you want. This will be the byte data between 8:16 chars and remains the same between transactions for this particular input struct schema. See last point about different input schemas for same method
jup_v6_cpi_log_second_discrim_example_1_list: list[int] = [64, 198, 205, 232, 38, 8, 113, 226]
jup_v6_cpi_log_second_discrim_example_1_int: int = 16316831888147596864
If you check for the first discriminator (between [0,8]) and second discriminator (between [8:16]) and it equals the discriminators above you've identified the instruction, and in turn, the input schema.
def get_jup_v6_struct(ix_data, offset):
if ix_data[:offset] == 2133240923048723940:
# is for sure Jup Agg V6 CLI Log
if ix_data[offset:offset*2] == 16316831888147596864:
# is your struct
offset += 8
return cStruct(
"amm" / PUBLIC_KEY_LAYOUT,
"inputMint" / PUBLIC_KEY_LAYOUT,
"inputAmount" / Int64ul,
"outputMint" / PUBLIC_KEY_LAYOUT,
"outputAmount" / Int64ul,
), offset
return None, None
first_discriminator_length = 8
ix_data = bytes.fromhex(hex_data)
jup_struct, offset = get_jup_v6_struct(ix_data, first_discriminator_length)
if not jup_struct:
return
# You must remove the two discriminators from your ix data (16 bytes), then decode the rest leftover bytes using the struct you defined above:
bytes_event = ix_data[offset:]
decoded_event = jup_struct.parse(bytes_event)
# Here is a dumb one-liner way to decode all args regardless of key/value into a dict:
output_dict = { key: base58.b58encode(value).decode('utf-8') if isinstance(value, bytes) else value for key, value in decoded_event.items() if key != "_io"} if decoded_event else {}
print(output_dict)
####
{
'amm': '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8',
'mint': '3N2ETvNpPNAxhcaXgkhKoY1yDnQfs41Wnxsx5qNJpump',
'inputAmount': 214405806,
'outputMint': 'So11111111111111111111111111111111111111112',
'outputAmount': 14849820
}
You should be aware that the struct/input schema of CPI Logs can vary for the same program, even using the same outer instruction (In this example Jupiter Aggregator v6: route). If you look at other TX using the route instruction, you might see the example above AND another JUP Agg V6 CLI LOG struct/input schema that looks like the output below. This alternate second discriminator will help you identify the alternate struct/input schema, labeled JUP_CLI_LOG_2:
Example of a TX with two types of CPI Log input schema:
#5.2 - Jupiter Aggregator v6: anchor Self CPI Log
def get_jup_v6_struct(ix_data, offset):
if ix_data[:offset] == 2133240923048723940:
# is for sure Jup Agg V6 CLI Log
if ix_data[offset:offset*2] == 16316831888147596864:
# is your struct
offset += 8
return cStruct(
"amm" / PUBLIC_KEY_LAYOUT,
"inputMint" / PUBLIC_KEY_LAYOUT,
"inputAmount" / Int64ul,
"outputMint" / PUBLIC_KEY_LAYOUT,
"outputAmount" / Int64ul,
), offset
if ix_data[offset:offset*2] == 15856564851427921737:
# is JUP_CLI_LOG_2 input struct
offset += 8
return cStruct(
"account" / PUBLIC_KEY_LAYOUT,
"mint" / PUBLIC_KEY_LAYOUT,
"amount" / Int64ul,
), offset
return None, None
first_discriminator_length = 8
ix_data = bytes.fromhex(hex_data)
jup_struct, offset = get_jup_v6_struct(ix_data, first_discriminator_length)
if not jup_struct:
return
# You must remove the two discriminators from your ix data (16 bytes), then decode the rest leftover bytes using the struct you defined above:
bytes_event = ix_data[offset:]
decoded_event = jup_struct.parse(bytes_event)
# Here is a dumb one-liner way to decode all args regardless of key/value into a dict:
output_dict = { key: base58.b58encode(value).decode('utf-8') if isinstance(value, bytes) else value for key, value in decoded_event.items() if key != "_io"} if decoded_event else {}
print(output_dict)
####
{
'account': 'JYgphiFW7jPkZqpcvpSLr5YqZp7BZDfVD4Wrgtt7duq',
'mint': 'So11111111111111111111111111111111111111112',
'amount': 5678
}
Update: Corrected ix_data[offset:]
to ix_data[:offset]
inside the get_jup_v6_struct.