PyWars
A public topic consumer for Chat Wars ... on steroids
Motivations
- Build a bot?
- Help the community?
- Personal training?
Quick start
from PyWars import *

app = Client()

@app.agent(Deal)
async def deals(stream: Stream[Deal]):
    # Print every deal as it arrives
    async for deal in stream:
        print(deal)

app.run()
Overview
Creating a default client
from PyWars import Client
app = Client()
This creates a client with an autogenerated ID for the Chat Wars v2 API.
Specifying the client version
from PyWars import Client
app = Client(version=Client.Version.CW3)
This creates a client for the Chat Wars v3 API.
Adding agents
from PyWars import *

app = Client()

@app.agent(Deal)
async def deals(stream: Stream[Deal]):
    async for deal in stream:
        print(deal)

app.run()
- Deal for cw*-deals topic
- Duel for cw*-duels topic
- Offer for cw*-offers topic
- SexDigest for cw*-sex_digest topic
- YellowPage for cw*-yellow_pages topic
- AuctionDigest for cw*-au_digest topic
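As a rough illustration of how these names line up, the sketch below expands the `cw*` wildcard for a given API version. It assumes the `*` stands for the version number (2 or 3), which the list above suggests but does not state outright. `TOPIC_SUFFIXES` and `topic_name` are hypothetical names used only for illustration; they are not part of PyWars.

```python
# Hypothetical helper, not part of PyWars: maps each model name from the
# list above to its topic suffix.
TOPIC_SUFFIXES = {
    "Deal": "deals",
    "Duel": "duels",
    "Offer": "offers",
    "SexDigest": "sex_digest",
    "YellowPage": "yellow_pages",
    "AuctionDigest": "au_digest",
}


def topic_name(model: str, version: int) -> str:
    """Build a topic name, assuming `cw*` expands to `cw<version>`."""
    return f"cw{version}-{TOPIC_SUFFIXES[model]}"


print(topic_name("Deal", 3))  # prints "cw3-deals"
```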
Adding timers
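from PyWars import *

app = Client()
processed_deals = 0

@app.agent(Deal)
async def deals(stream: Stream[Deal]):
    global processed_deals  # rebind the module-level counter
    async for deal in stream:
        processed_deals += 1

@app.timer(60)
async def print_processed():
    global processed_deals
    # Report the count, then reset it for the next interval
    print(processed_deals)
    processed_deals = 0

app.run()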
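Because the counter is rebound inside coroutines, Python requires a `global` declaration in each function that assigns to it. One way to avoid shared globals is to keep the count in a small object instead. The sketch below is plain Python with no PyWars dependency; `DealCounter` is a hypothetical name chosen for illustration.

```python
class DealCounter:
    """Hypothetical helper: counts events and resets when drained."""

    def __init__(self) -> None:
        self.value = 0

    def increment(self) -> None:
        # Called once per processed deal
        self.value += 1

    def drain(self) -> int:
        # Return the current count and reset it to zero
        count, self.value = self.value, 0
        return count


counter = DealCounter()
for _ in range(3):
    counter.increment()
print(counter.drain())  # prints 3
```

An agent would call `counter.increment()` per deal and a timer would print `counter.drain()`, with no `global` statements needed because the name `counter` is never rebound.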
Using execution loops
from PyWars import *
import asyncio

app = Client(loop=asyncio.get_event_loop())

@app.agent(Deal)
async def deals(stream: Stream[Deal]):
    async for deal in stream:
        print(deal)

try:
    app.start()
finally:
    # Always stop the client, even if start() raises
    app.stop()
The start and stop methods are designed to run and stop the client smoothly together with its execution loop.
Bulking
from PyWars import *

app = Client()

@app.agent(Deal)
async def deals(stream: Stream[Deal]):
    async for bulk in stream.take(100, 10):
        print(bulk)

app.run()
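The snippet above does not spell out what the two arguments to `take` mean. Assuming they mean "collect up to 100 items, or whatever arrived within 10 seconds" (an assumption, not confirmed here), the batching idea can be sketched with the standard library alone. `take_batch` is a hypothetical function built on `asyncio.Queue`; it is not the PyWars implementation.

```python
import asyncio


async def take_batch(queue: asyncio.Queue, max_items: int, within: float) -> list:
    """Hypothetical sketch: collect up to max_items, waiting at most `within` seconds."""
    batch = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + within
    while len(batch) < max_items:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # time window elapsed
        try:
            item = await asyncio.wait_for(queue.get(), timeout=remaining)
        except asyncio.TimeoutError:
            break  # nothing more arrived in time
        batch.append(item)
    return batch


async def demo() -> None:
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    print(await take_batch(queue, 3, 0.5))   # full batch: size limit reached
    print(await take_batch(queue, 3, 0.05))  # partial batch: time limit reached


asyncio.run(demo())
```

The first call returns as soon as three items are collected; the second returns the two remaining items once the short time window runs out.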
For extended documentation, see the docs page.