Skip to content

Examples

aiohttp

import asyncio
import logging

from aiohttp import ClientSession

from jsonrpcclient import Error, Ok, parse, request


async def main() -> None:
    """Handle async request"""
    async with ClientSession() as session:
        async with session.post(
            "http://localhost:5000", json=request("ping")
        ) as response:
            parsed = parse(await response.json())
            if isinstance(parsed, Ok):
                print(parsed.result)
            elif isinstance(parse, Error):
                logging.error(parsed.message)


asyncio.get_event_loop().run_until_complete(main())

See blog post.

Requests

import logging

import requests

from jsonrpcclient import Error, Ok, parse, request

response = requests.post("http://localhost:5000/", json=request("ping"), timeout=10)
parsed = parse(response.json())
if isinstance(parsed, Ok):
    print(parsed.result)
elif isinstance(parsed, Error):
    logging.error(parsed.message)

Websockets

import asyncio
import logging

from websockets.client import connect

from jsonrpcclient import Error, Ok, parse_json, request_json


async def main() -> None:
    """Handle request"""
    async with connect("ws://localhost:5000") as socket:
        await socket.send(request_json("ping"))
        response = parse_json(await socket.recv())

    if isinstance(response, Ok):
        print(response.result)
    elif isinstance(response, Error):
        logging.error(response.message)


asyncio.get_event_loop().run_until_complete(main())

See blog post.

ZeroMQ

import logging

import zmq

from jsonrpcclient import Ok, parse_json, request_json

socket = zmq.Context().socket(zmq.REQ)
socket.connect("tcp://localhost:5000")
socket.send_string(request_json("ping"))
response = parse_json(socket.recv().decode())
if isinstance(response, Ok):
    print(response.result)
else:
    logging.error(response.message)

See blog post.