본문 바로가기
Hack/Cryptocurrency

Avalanche(AVAX) RPC 사용 방법

by Becoming a Hacker 2023. 2. 19.
반응형

※ 1초에 5회 이상 Request 전송 시 Rate Limit 있음

 

기본 구문

import requests
import json

class AvalancheAPI:
    """Minimal JSON-RPC 2.0 client that posts calls to one RPC endpoint."""

    def __init__(self, rpc_url, timeout=30):
        """Store the endpoint URL and the per-request timeout.

        Args:
            rpc_url: URL that every request is POSTed to.
            timeout: Seconds to wait on the server before ``requests``
                raises a timeout error. Without a timeout a stalled
                connection would block the caller indefinitely.
        """
        self.rpc_url = rpc_url
        self.timeout = timeout

    def get_response(self, method, params):
        """Send one JSON-RPC 2.0 call and return the decoded JSON reply.

        Args:
            method: Name of the RPC method to invoke.
            params: JSON-serializable parameters for that method.

        Returns:
            The parsed response body exactly as the server sent it. The
            reply is not inspected, so a JSON-RPC ``error`` member is
            returned to the caller rather than raised.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                when the timeout expires.
            ValueError: If the response body is not valid JSON.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        }
        reply = requests.post(
            self.rpc_url,
            data=json.dumps(payload),
            headers={'content-type': 'application/json'},
            timeout=self.timeout,
        )
        return reply.json()

if __name__ == "__main__":
    # RPC endpoint URL of the AVAX node.
    rpc_url = "https://api.avax.network/ext/bc/X"

    # Build the client; no request is sent until a method is called on it.
    avax_api = AvalancheAPI(rpc_url=rpc_url)

 

assetID 확인

import requests
import json

class AvalancheAPI:
    """Minimal JSON-RPC 2.0 client with an asset-description lookup."""

    def __init__(self, rpc_url, timeout=30):
        """Store the endpoint URL and the per-request timeout.

        Args:
            rpc_url: URL that every request is POSTed to.
            timeout: Seconds to wait on the server before ``requests``
                raises a timeout error. Without a timeout a stalled
                connection would block the caller indefinitely.
        """
        self.rpc_url = rpc_url
        self.timeout = timeout

    def get_response(self, method, params):
        """Send one JSON-RPC 2.0 call and return the decoded JSON reply.

        Args:
            method: Name of the RPC method to invoke.
            params: JSON-serializable parameters for that method.

        Returns:
            The parsed response body exactly as the server sent it. The
            reply is not inspected, so a JSON-RPC ``error`` member is
            returned to the caller rather than raised.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                when the timeout expires.
            ValueError: If the response body is not valid JSON.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        }
        reply = requests.post(
            self.rpc_url,
            data=json.dumps(payload),
            headers={'content-type': 'application/json'},
            timeout=self.timeout,
        )
        return reply.json()

    def get_AssetDescription(self, assetId):
        """Call ``avm.getAssetDescription`` for one asset.

        Args:
            assetId: Value sent as the ``assetID`` parameter (the example
                script in this snippet passes ``"AVAX"``).

        Returns:
            The decoded JSON-RPC reply from :meth:`get_response`.
        """
        return self.get_response(
            "avm.getAssetDescription",
            {"assetID": assetId},
        )

if __name__ == "__main__":
    # RPC endpoint URL of the AVAX node.
    rpc_url = "https://api.avax.network/ext/bc/X"

    avax_api = AvalancheAPI(rpc_url)

    # Look up the asset description for "AVAX" and dump the raw reply.
    print(avax_api.get_AssetDescription("AVAX"))

 

특정 Account에 포함된 txid 조회

import requests
import json
import time


class AvalancheAPI:
    """Minimal JSON-RPC 2.0 client with transaction and balance lookups."""

    def __init__(self, rpc_url, timeout=30):
        """Store the endpoint URL and the per-request timeout.

        Args:
            rpc_url: URL that every request is POSTed to.
            timeout: Seconds to wait on the server before ``requests``
                raises a timeout error. Without a timeout a stalled
                connection would block the caller indefinitely.
        """
        self.rpc_url = rpc_url
        self.timeout = timeout

    def get_response(self, method, params):
        """Send one JSON-RPC 2.0 call and return the decoded JSON reply.

        Args:
            method: Name of the RPC method to invoke.
            params: JSON-serializable parameters for that method.

        Returns:
            The parsed response body exactly as the server sent it. The
            reply is not inspected, so a JSON-RPC ``error`` member is
            returned to the caller rather than raised.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                when the timeout expires.
            ValueError: If the response body is not valid JSON.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        }
        reply = requests.post(
            self.rpc_url,
            data=json.dumps(payload),
            headers={'content-type': 'application/json'},
            timeout=self.timeout,
        )
        return reply.json()

    def get_tx(self, tx_id):
        """Call ``avm.getTx`` for one transaction, requesting JSON encoding.

        Args:
            tx_id: Value sent as the ``txID`` parameter.

        Returns:
            The decoded JSON-RPC reply from :meth:`get_response`.
        """
        return self.get_response(
            "avm.getTx",
            {
                "txID": tx_id,
                "encoding": "json",
            },
        )

    def get_balance(self, address, asset_id):
        """Call ``avm.getBalance`` for one address and asset.

        Args:
            address: Value sent as the ``address`` parameter.
            asset_id: Value sent as the ``assetID`` parameter.

        Returns:
            The decoded JSON-RPC reply from :meth:`get_response`.
        """
        return self.get_response(
            "avm.getBalance",
            {
                "address": address,
                "assetID": asset_id,
            },
        )


if __name__ == '__main__':
    # RPC endpoint URL of the AVAX node.
    rpc_url = "https://api.avax.network/ext/bc/X"

    avax_api = AvalancheAPI(rpc_url)

    # Query the address's balance of one asset. The reply's "utxoIDs" list
    # carries a "txID" per entry, which is what gets looked up below.
    rep = avax_api.get_balance("X-avax1pnpgcx4asflpjc0zphfn62hxdf34kc6w8csetz",
                               "FvwEAhmxKfeiG8SnEvq42hc6whRyY3EFYAvebMqDNDGCgxN5Z")
    if "result" not in rep:
        # A JSON-RPC failure carries "error" instead of "result"; stop with
        # a readable message rather than an opaque KeyError.
        raise SystemExit("avm.getBalance failed: {}".format(rep.get("error", rep)))
    utxo_ids = rep["result"].get("utxoIDs", [])

    # Several entries can share one transaction ID. Fetch each transaction
    # only once (first-seen order) to avoid wasting rate-limited requests.
    seen = set()
    tx_ids = []
    for utxo in utxo_ids:
        tx = utxo["txID"]
        if tx not in seen:
            seen.add(tx)
            tx_ids.append(tx)

    for tx in tx_ids:
        print(tx)
        rep = avax_api.get_tx(tx)

        if "result" not in rep:
            # Report the failed lookup and keep going with the remaining IDs.
            print("avm.getTx failed for {}: {}".format(tx, rep.get("error", rep)))
        else:
            outputs = rep["result"]["tx"]["unsignedTx"]["outputs"]

            # Print the transfer outputs whose threshold is above 1
            # (presumably multi-signature outputs; confirm against the
            # AVM output format). A missing threshold counts as 0.
            for output in outputs:
                if output["output"].get("threshold", 0) > 1:
                    print(output)

        # Pause between lookups to stay under the endpoint's rate limit.
        time.sleep(1)

 

댓글