In this tutorial, we build a clean, advanced demonstration of modern MCP design around three core ideas: stateless communication, strict SDK-level validation, and asynchronous, long-running operations. We implement a minimal MCP-like protocol using structured envelopes, signed requests, and Pydantic-validated tools to show how agents and services can interact securely without relying on persistent sessions.
import asyncio, time, json, uuid, hmac, hashlib
from dataclasses import dataclass
from typing import Any, Dict, Optional, Literal, List
from pydantic import BaseModel, Field, ValidationError, ConfigDict

def _now_ms():
    # Current wall-clock time in milliseconds
    return int(time.time() * 1000)

def _uuid():
    return str(uuid.uuid4())

def _canonical_json(obj):
    # Sorted keys and fixed separators give byte-identical output for equal payloads
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode()

def _hmac_hex(secret, payload):
    # HMAC-SHA256 over the canonical JSON form of the payload
    return hmac.new(secret, _canonical_json(payload), hashlib.sha256).hexdigest()
We set up the core utilities used throughout the system: time helpers, UUID generation, canonical JSON serialization, and cryptographic signing. Canonical serialization ensures that every request and response can be signed and verified deterministically using HMAC.
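To make the role of canonical serialization concrete, here is a minimal sketch using only the standard library. The names `canonical_json`, `sign`, and `verify`, as well as the demo secret, are assumptions made for this illustration and are not part of the tutorial's code. It shows that key order does not affect the signature, while any change to the payload does.

```python
import hashlib
import hmac
import json


def canonical_json(obj) -> bytes:
    # Sorted keys and fixed separators make equal payloads serialize identically
    return json.dumps(obj, separators=(",", ":"), sort_keys=True).encode()


def sign(secret: bytes, payload: dict) -> str:
    return hmac.new(secret, canonical_json(payload), hashlib.sha256).hexdigest()


def verify(secret: bytes, payload: dict, signature: str) -> bool:
    # compare_digest compares in constant time, which helps against timing attacks
    return hmac.compare_digest(sign(secret, payload), signature)


secret = b"demo-secret"  # hypothetical key, for illustration only
a = {"tool": "batch_sum", "args": {"numbers": [1, 2, 3]}}
b = {"args": {"numbers": [1, 2, 3]}, "tool": "batch_sum"}  # same data, different key order

sig = sign(secret, a)
print(canonical_json(a) == canonical_json(b))           # True: key order does not matter
print(verify(secret, b, sig))                            # True: signature still matches
print(verify(secret, {**b, "tool": "poll_job"}, sig))   # False: payload was altered
```

Without `sort_keys=True`, two parties that build the same dictionary in a different order could produce different bytes and therefore different signatures.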
class MCPEnvelope(BaseModel):
    # extra="forbid" rejects any field that is not declared below
    model_config = ConfigDict(extra="forbid")
    v: Literal["mcp/0.1"] = "mcp/0.1"
    request_id: str = Field(default_factory=_uuid)
    ts_ms: int = Field(default_factory=_now_ms)
    client_id: str
    server_id: str
    tool: str
    args: Dict[str, Any] = Field(default_factory=dict)
    nonce: str = Field(default_factory=_uuid)
    signature: str

class MCPResponse(BaseModel):
    model_config = ConfigDict(extra="forbid")
    v: Literal["mcp/0.1"] = "mcp/0.1"
    request_id: str
    ts_ms: int = Field(default_factory=_now_ms)
    ok: bool
    server_id: str
    status: Literal["ok", "accepted", "running", "done", "error"]
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    signature: str
We define the structured MCP envelope and response formats that every interaction follows. We enforce a strict schema with Pydantic so that malformed or unexpected fields are rejected early. This gives the client and server a consistent contract, which is important for SDK standardization.
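The effect of `extra="forbid"` can be seen in a small sketch, assuming Pydantic v2 is installed. `StrictEnvelope` is a hypothetical, trimmed-down model used only here, and `try_parse` is an illustrative helper rather than part of the tutorial's code.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class StrictEnvelope(BaseModel):
    # Hypothetical, trimmed-down envelope used only for this illustration
    model_config = ConfigDict(extra="forbid")
    client_id: str
    tool: str


def try_parse(data: dict) -> bool:
    """Return True if data satisfies the schema, False otherwise."""
    try:
        StrictEnvelope(**data)
        return True
    except ValidationError as exc:
        # Each error records where it occurred (loc) and a readable message (msg)
        for err in exc.errors():
            print(err["loc"], err["msg"])
        return False


print(try_parse({"client_id": "c1", "tool": "batch_sum"}))                 # valid
print(try_parse({"client_id": "c1", "tool": "batch_sum", "debug": True}))  # undeclared field
print(try_parse({"client_id": "c1"}))                                      # missing required field
```

Rejecting undeclared fields means a typo in a field name fails loudly instead of being silently ignored.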
class ServerIdentityOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    server_id: str
    fingerprint: str
    capabilities: Dict[str, Any]

class BatchSumIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    numbers: List[float] = Field(min_length=1)  # at least one number is required

class BatchSumOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int
    total: float

class StartLongTaskIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    seconds: int = Field(ge=1, le=20)  # duration is bounded to 1..20 seconds
    payload: Dict[str, Any] = Field(default_factory=dict)

class PollJobIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    job_id: str
We declare validated input and output models for each tool exposed by the server. We use Pydantic constraints to state clearly what each tool accepts and returns. This makes tool behavior predictable and safe, even when invoked by LLM-powered agents.
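One way to put per-tool models to work is a registry that maps each tool name to its input model and handler, so validation always runs before the handler. The sketch below assumes Pydantic v2. The `TOOLS` registry, `call_tool`, and the `SumIn`/`SumOut` models are hypothetical names for this illustration, and the tutorial itself dispatches with an if/elif chain instead.

```python
from typing import Callable, Dict, List, Tuple, Type

from pydantic import BaseModel, ConfigDict, Field, ValidationError


class SumIn(BaseModel):
    model_config = ConfigDict(extra="forbid")
    numbers: List[float] = Field(min_length=1)


class SumOut(BaseModel):
    model_config = ConfigDict(extra="forbid")
    count: int
    total: float


def batch_sum(args: SumIn) -> SumOut:
    return SumOut(count=len(args.numbers), total=sum(args.numbers))


# Hypothetical registry: tool name -> (input model, handler)
TOOLS: Dict[str, Tuple[Type[BaseModel], Callable[..., BaseModel]]] = {
    "batch_sum": (SumIn, batch_sum),
}


def call_tool(name: str, raw_args: dict) -> dict:
    """Validate raw_args against the tool's input model, then run the handler."""
    if name not in TOOLS:
        return {"ok": False, "error": f"unknown tool: {name}"}
    model, handler = TOOLS[name]
    try:
        args = model(**raw_args)
    except ValidationError as exc:
        return {"ok": False, "error": f"{len(exc.errors())} validation error(s)"}
    return {"ok": True, "result": handler(args).model_dump()}


print(call_tool("batch_sum", {"numbers": [1, 2, 3]}))
print(call_tool("batch_sum", {"numbers": []}))     # violates min_length=1
print(call_tool("batch_sum", {"numbers": ["x"]}))  # not a number
```

With this layout, adding a tool is a matter of declaring its models and registering one entry, and the handler never sees unvalidated input.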
@dataclass
class JobState:
    job_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

class MCPServer:
    def __init__(self, server_id, secret):
        self.server_id = server_id
        self.secret = secret
        self.jobs = {}   # job_id -> JobState
        self.tasks = {}  # job_id -> asyncio.Task (the reference keeps the task alive)

    def _fingerprint(self):
        return hashlib.sha256(self.secret).hexdigest()[:16]

    def _error(self, request_id, message):
        # Unsigned error response; handle() signs it before returning
        return MCPResponse(
            request_id=request_id,
            ok=False,
            server_id=self.server_id,
            status="error",
            error=message,
            signature="",
        )

    async def handle(self, env_dict, client_secret):
        # Invalid envelopes or tool arguments raise pydantic's ValidationError to the caller
        env = MCPEnvelope(**env_dict)
        payload = env.model_dump()
        sig = payload.pop("signature")
        # Constant-time comparison of the expected and received signatures
        if not hmac.compare_digest(_hmac_hex(client_secret, payload), sig):
            resp = self._error(env.request_id, "bad signature")
        elif env.tool == "server_identity":
            out = ServerIdentityOut(
                server_id=self.server_id,
                fingerprint=self._fingerprint(),
                capabilities={"async": True, "stateless": True},
            )
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="ok",
                result=out.model_dump(),
                signature="",
            )
        elif env.tool == "batch_sum":
            args = BatchSumIn(**env.args)
            out = BatchSumOut(count=len(args.numbers), total=sum(args.numbers))
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="ok",
                result=out.model_dump(),
                signature="",
            )
        elif env.tool == "start_long_task":
            args = StartLongTaskIn(**env.args)
            jid = _uuid()
            self.jobs[jid] = JobState(jid, "running")

            async def run():
                # Simulated long-running work
                await asyncio.sleep(args.seconds)
                self.jobs[jid].status = "done"
                self.jobs[jid].result = args.payload

            self.tasks[jid] = asyncio.create_task(run())
            # Respond immediately with the job id; the work continues in the background
            resp = MCPResponse(
                request_id=env.request_id,
                ok=True,
                server_id=self.server_id,
                status="accepted",
                result={"job_id": jid},
                signature="",
            )
        elif env.tool == "poll_job":
            args = PollJobIn(**env.args)
            job = self.jobs.get(args.job_id)
            if job is None:
                resp = self._error(env.request_id, "unknown job")
            else:
                resp = MCPResponse(
                    request_id=env.request_id,
                    ok=True,
                    server_id=self.server_id,
                    status=job.status,
                    result=job.result,
                    signature="",
                )
        else:
            resp = self._error(env.request_id, f"unknown tool: {env.tool}")
        # Sign every field except the signature itself, mirroring how requests are signed
        payload = resp.model_dump()
        payload.pop("signature")
        resp.signature = _hmac_hex(self.secret, payload)
        return resp.model_dump()
We implement a stateless MCP server together with its async task management logic. We handle request validation, tool dispatch, and long-running task execution without relying on session state. By returning job identifiers and allowing clients to poll for results, we demonstrate non-blocking, scalable task execution.
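The start-then-poll pattern can be isolated from the protocol details. The following is a minimal sketch using only `asyncio`. `JobRegistry`, `Job`, and `main` are hypothetical names for this illustration, and the in-memory dictionary is an assumption that a real deployment would likely replace with shared storage.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class Job:
    status: str = "running"
    result: Optional[Dict[str, Any]] = None


class JobRegistry:
    # Hypothetical in-memory registry, for illustration only
    def __init__(self) -> None:
        self.jobs: Dict[str, Job] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, seconds: float, payload: Dict[str, Any]) -> str:
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = Job()

        async def run() -> None:
            await asyncio.sleep(seconds)  # stand-in for real work
            self.jobs[job_id].result = payload
            self.jobs[job_id].status = "done"

        # Keep a reference so the task is not garbage-collected mid-flight
        self._tasks[job_id] = asyncio.create_task(run())
        return job_id  # returned immediately; the caller is not blocked

    def poll(self, job_id: str) -> Dict[str, Any]:
        job = self.jobs.get(job_id)
        if job is None:
            return {"status": "error", "error": "unknown job"}
        return {"status": job.status, "result": job.result}


async def main() -> list:
    registry = JobRegistry()
    job_id = registry.start(0.2, {"task": "demo"})
    seen = [registry.poll(job_id)["status"]]  # polled right after start
    while registry.poll(job_id)["status"] != "done":
        await asyncio.sleep(0.05)
    seen.append(registry.poll(job_id)["status"])
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because `start` only schedules the task and returns, the first poll sees the job still running, and later polls observe the completed result.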
class MCPClient:
    def __init__(self, client_id, secret, server):
        self.client_id = client_id
        self.secret = secret
        self.server = server

    async def call(self, tool, args=None):
        # Build the envelope with a placeholder signature, then sign every other field
        env = MCPEnvelope(
            client_id=self.client_id,
            server_id=self.server.server_id,
            tool=tool,
            args=args or {},
            signature="",
        ).model_dump()
        env["signature"] = _hmac_hex(self.secret, {k: v for k, v in env.items() if k != "signature"})
        return await self.server.handle(env, self.secret)

async def demo():
    server_secret = b"server_secret"
    client_secret = b"client_secret"
    server = MCPServer("mcp-server-001", server_secret)
    client = MCPClient("client-001", client_secret, server)
    print(await client.call("server_identity"))
    print(await client.call("batch_sum", {"numbers": [1, 2, 3]}))
    start = await client.call("start_long_task", {"seconds": 2, "payload": {"task": "demo"}})
    jid = start["result"]["job_id"]
    # Poll until the background job reports completion
    while True:
        poll = await client.call("poll_job", {"job_id": jid})
        if poll["status"] == "done":
            print(poll)
            break
        await asyncio.sleep(0.5)

# Top-level await works in a notebook (e.g. Colab or Jupyter); in a plain script, use asyncio.run(demo())
await demo()
We create a lightweight stateless client that signs each request and interacts with the server via structured envelopes. We demonstrate synchronous calls and asynchronous task polling in a single flow. This shows how clients can reliably consume MCP-style services in real agent pipelines.
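The polling loop in the demo runs until the job finishes, which could wait forever if a job never completes. A bounded variant might look like the sketch below. `poll_until_done` is a hypothetical helper, and `fake_poll` is a stand-in for something like `client.call("poll_job", ...)`. Neither is part of the tutorial's code.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def poll_until_done(
    poll: Callable[[], Awaitable[Dict[str, Any]]],
    interval: float = 0.5,
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """Call poll() until it reports a terminal status or the timeout elapses."""

    async def loop() -> Dict[str, Any]:
        while True:
            resp = await poll()
            if resp["status"] in ("done", "error"):
                return resp
            await asyncio.sleep(interval)

    # wait_for cancels the loop and raises asyncio.TimeoutError if it runs too long
    return await asyncio.wait_for(loop(), timeout)


async def main():
    calls = {"n": 0}

    async def fake_poll() -> Dict[str, Any]:
        # Hypothetical stand-in for a poll_job call: reports "done" on the third call
        calls["n"] += 1
        return {"status": "done" if calls["n"] >= 3 else "running"}

    resp = await poll_until_done(fake_poll, interval=0.01, timeout=1.0)
    return resp, calls["n"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Treating `"error"` as terminal alongside `"done"` keeps the client from polling a failed job until the timeout expires.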
In conclusion, we showed how MCP can evolve from a simple tool-calling interface into a more robust protocol suited to real-world systems. We started tasks asynchronously and polled for results without blocking execution, enforced explicit contracts through schema validation, and relied on stateless, signed messages for security and resilience. Together, these patterns show how MCP-style systems can support reliable agent workflows while remaining simple, transparent, and easy to extend.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of Marktechpost, an Artificial Intelligence media platform known for in-depth coverage of Machine Learning and Deep Learning news that is technically sound and easily understood by a wide audience. The platform has over 2 million monthly views.