101 lines
4.0 KiB
Python
101 lines
4.0 KiB
Python
import asyncio, websockets,time,json, requests, rethinkdb, aiohttp, sys, os
|
|
from rethinkdb import r
|
|
|
|
import raw_data
|
|
|
|
# The RethinkDB calls in this file are awaited (`await r.connect()`,
# `await ....run(conn)`), so switch the driver to its asyncio backend
# before any connection is opened.
r.set_loop_type("asyncio")

# Cooperative shutdown flags shared between the top-level Ctrl-C handler
# and PlaceScraper.run_forever: the handler raises STOP, and the scraper
# acknowledges by setting STOPPED once it has left its receive loop.
STOP, STOPPED = False, False
|
|
|
|
class PlaceScraper:
    """Archive r/place activity from Reddit's realtime GraphQL websocket.

    Every websocket message is inserted into RethinkDB (``db``/``wstable``).
    Messages that carry an image URL additionally have that URL downloaded
    and the bytes inserted into ``db``/``imgtable``. Frames sent during the
    handshake are logged to the ``sent_ws`` table.
    """

    db = "place"  # RethinkDB database that receives every insert
    imgtable = "img"  # table for downloaded images
    wstable = "ws"  # table for raw websocket messages and token pages
    token = "ff"  # placeholder; replaced by regen_token() after a disconnect

    async def add_to_db(self, db, table, data):
        """Insert ``data`` into ``db``.``table``, retrying every 2 s until it succeeds.

        Each attempt opens its own connection, which is always closed again.
        Returns the result of the RethinkDB ``insert`` query.
        """
        while True:
            conn = None
            try:
                conn = await r.connect()
                return await r.db(db).table(table).insert(data).run(conn)
            except Exception as ename:
                print(f"Retrying DB transaction... ({ename})")
            finally:
                # Close outside the retry path: a failing close must not make
                # us re-insert a row that was already written.
                if conn is not None:
                    try:
                        await conn.close(False)
                    except Exception:
                        pass  # best effort; this connection is discarded anyway
            await asyncio.sleep(2)

    async def download_and_add_to_db(self, url, sesh):
        """Download ``url`` with ``sesh`` and store the bytes in ``imgtable``.

        Any exception while downloading is retried every 10 s, indefinitely.
        Returns the result of :meth:`add_to_db`.
        """
        while True:
            try:
                async with sesh.get(url) as resp:
                    # NOTE(review): the HTTP status is not checked, so an
                    # error-page body would be stored as image data.
                    data = await resp.read()
            except Exception as ename:
                print(f"Retrying {url} ({ename})...")
                await asyncio.sleep(10)
            else:
                break
        return await self.add_to_db(
            self.db,
            self.imgtable,
            {"time": time.time(), "url": url, "data": data},
        )

    async def regen_token(self, sesh):
        """Scrape a fresh access token from the r/place page into ``self.token``.

        The full page text is archived in ``wstable`` with ``type: "newToken"``.
        Raises ``ValueError`` if the page has no access-token marker or the
        token value is not terminated by a quote.
        """
        print("Regenerating token...")
        async with sesh.get("https://reddit.com/r/place", headers={"User-Agent": "Mozilla"}) as f:
            a = await f.text()
        marker = '"session":{"accessToken":"'
        i = a.index(marker)
        start = i + len(marker)
        # Read up to the closing quote rather than assuming a fixed token length.
        end = a.index('"', start)
        print(f"Regen-Token {a[i:end]}")
        await self.add_to_db(self.db, self.wstable, {"data": a, "type": "newToken"})
        self.token = a[start:end]

    async def _start_connection(self, ws):
        """Send the init/subscribe frames over ``ws``, logging each one to ``sent_ws``."""
        messages = (
            raw_data.connection_init % self.token,
            raw_data.id1,
            raw_data.id2,
            raw_data.id3,
            raw_data.id4,
        )
        # NOTE(review): raw_data also provides id5, which is not sent here;
        # confirm whether that subscription is intentionally skipped.
        for send in messages:
            await ws.send(send)
            await self.add_to_db(self.db, "sent_ws", {"sent": send, "time": time.time()})

    async def _pump_messages(self, websocket, sesh):
        """Archive messages from ``websocket`` until STOP is seen or the socket closes.

        Returns True when stopping because of the module-level ``STOP`` flag
        (after setting ``STOPPED``), or False when the server closed the socket.
        """
        global STOPPED
        while True:
            try:
                response = await websocket.recv()
            except websockets.exceptions.ConnectionClosed:
                return False
            print(response)
            await self.add_to_db(self.db, self.wstable, {"time": time.time(), "data": response})
            try:
                url = json.loads(response)["payload"]["data"]["subscribe"]["data"]["name"]
            except (KeyError, TypeError):
                # probably one of the first few responses (or a null payload); skip
                pass
            else:
                await self.download_and_add_to_db(url, sesh=sesh)
            if STOP:
                STOPPED = True
                return True

    async def run_forever(self):
        """Stream r/place websocket messages into the database until ``STOP`` is set.

        Whenever the server closes the socket, a new token is scraped and a
        fresh connection is opened (a loop, not recursion, so old sockets are
        released first). TLS certificates for HTTP requests are verified
        unless the ``DISABLE_CERT_VERIF`` environment variable is set.
        """
        cert = os.environ.get("DISABLE_CERT_VERIF") is None
        connector = aiohttp.TCPConnector(limit=50, verify_ssl=cert)

        async with aiohttp.ClientSession(connector=connector, trust_env=True) as sesh:
            while True:
                async with websockets.connect(
                    "wss://gql-realtime-2.reddit.com/query",
                    extra_headers=[["Origin", "https://hot-potato.reddit.com"], ["User-Agent", "Mozilla"]],
                ) as websocket:
                    await self._start_connection(websocket)
                    stopped = await self._pump_messages(websocket, sesh)
                if stopped:
                    break
                # Socket was closed by the server: refresh the token and reconnect.
                await self.regen_token(sesh)
                print(raw_data.connection_init % self.token)

        print("\tSTOP: Exited gracefully after CtrlC.")
|
|
# Entry point: keep (re)running the scraper until a Ctrl-C-initiated
# graceful shutdown has completed.
client = PlaceScraper()

while True:
    try:
        asyncio.run(client.run_forever())
    except (aiohttp.ClientError, websockets.exceptions.WebSocketException, OSError) as ename:
        # Network-level failure (OSError also covers ConnectionResetError):
        # report it, back off briefly so a persistent outage doesn't spin,
        # and restart the scraper.
        print(f"Caught {ename}")
        time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl-C: ask the next run to stop after archiving one message;
        # run_forever checks STOP and then sets STOPPED before returning.
        STOP = True
    # Checked after every run, not only after Ctrl-C, so a run that returned
    # normally after a graceful stop ends the program instead of restarting.
    if STOPPED:
        sys.exit(2)
|