"""Record r/place realtime websocket traffic and the URLs it names into RethinkDB."""

import asyncio
import json
import os
import sys
import time

import aiohttp
import requests
import rethinkdb
import websockets
from rethinkdb import r

import raw_data

r.set_loop_type("asyncio")

# Shutdown handshake between the top-level loop and the scraper:
# STOP is set by the top-level loop after Ctrl-C; STOPPED is set by the
# scraper once it has finished the message it was working on.
STOP = False
STOPPED = False


class PlaceScraper:
    """Listen on Reddit's realtime GraphQL websocket and archive what arrives.

    Every received frame is stored in the ``ws`` table, every frame sent by us
    in ``sent_ws``, and every URL named by a subscription message is
    downloaded and stored in the ``img`` table.
    """

    db = "place"
    imgtable = "img"
    wstable = "ws"
    # Placeholder token; replaced by regen_token() once the server closes the
    # connection cleanly.
    token = "ff"

    # Text that immediately precedes the access token in the r/place page.
    _TOKEN_MARKER = '''"session":{"accessToken":"'''
    # Number of characters taken as the token after the marker (31, so that
    # marker + token spans 57 characters as in the original slice).
    _TOKEN_LENGTH = 31

    _WS_URL = "wss://gql-realtime-2.reddit.com/query"
    _WS_HEADERS = [["Origin", "https://hot-potato.reddit.com"],
                   ["User-Agent", "Mozilla"]]

    async def add_to_db(self, db, table, data):
        """Insert ``data`` into ``db.table``, retrying every 2 s until it works.

        Returns whatever the RethinkDB insert query returns.
        """
        while True:
            # Reset per attempt so cleanup never touches a connection that
            # belongs to an earlier, already-failed attempt.
            conn = None
            try:
                conn = await r.connect()
                result = await r.db(db).table(table).insert(data).run(conn)
                await conn.close(False)
                return result
            except Exception as ename:
                print(f"Retrying DB transaction... ({ename})")
                if conn is not None:
                    try:
                        await conn.close()
                    except Exception as close_err:
                        # Best-effort cleanup: a failing close must not break
                        # out of the retry loop.
                        print(f"Ignoring error while closing DB connection ({close_err})")
                await asyncio.sleep(2)

    async def download_and_add_to_db(self, url, sesh):
        """Fetch ``url`` with the aiohttp session ``sesh`` and store the bytes.

        The download is retried every 10 s until it succeeds. Returns the
        result of the database insert.
        """
        while True:
            try:
                async with sesh.get(url) as resp:
                    data = await resp.read()
            except Exception as ename:
                print(f"Retrying {url} ({ename})...")
                await asyncio.sleep(10)
            else:
                break
        return await self.add_to_db(
            self.db, self.imgtable, {"time": time.time(), "url": url, "data": data}
        )

    async def regen_token(self, sesh):
        """Scrape a fresh access token from the r/place page into ``self.token``.

        The full page text is also archived in the websocket table with
        ``type == "newToken"``.

        Raises:
            ValueError: if the token marker is not present in the page.
        """
        print("Regenerating token...")
        async with sesh.get("https://reddit.com/r/place", headers={"User-Agent": "Mozilla"}) as f:
            page = await f.text()
        start = page.index(self._TOKEN_MARKER)
        token_start = start + len(self._TOKEN_MARKER)
        token_end = token_start + self._TOKEN_LENGTH
        print(f"Regen-Token {page[start:token_end]}")
        await self.add_to_db(self.db, self.wstable, {"data": page, "type": "newToken"})
        # NOTE(review): assumes the token is always _TOKEN_LENGTH characters
        # long, as the original fixed-offset slice did — confirm.
        self.token = page[token_start:token_end]

    async def _start_connection(self, ws):
        """Send the connection-init frame and the subscription frames on ``ws``.

        Each frame sent is also logged to the ``sent_ws`` table.
        """
        # NOTE(review): raw_data.id5 was assigned but never sent by the
        # original code; that behaviour is preserved here — confirm intent.
        messages = (
            raw_data.connection_init % self.token,
            raw_data.id1,
            raw_data.id2,
            raw_data.id3,
            raw_data.id4,
        )
        for send in messages:
            await ws.send(send)
            await self.add_to_db(self.db, "sent_ws", {"sent": send, "time": time.time()})

    @staticmethod
    def _image_url(response):
        """Return the URL named by a subscription message, or None.

        Messages that lack the nested ``payload.data.subscribe.data.name``
        path (e.g. the first few responses on a connection) yield None.
        """
        try:
            return json.loads(response)["payload"]["data"]["subscribe"]["data"]["name"]
        except (KeyError, TypeError):
            # KeyError: a level of the path is missing; TypeError: a level is
            # present but not a mapping (e.g. null). Either way, skip it.
            return None

    async def _consume(self, websocket, sesh):
        """Archive frames from ``websocket`` until told to stop or it closes.

        Returns True when a requested stop was honoured, False when the
        server closed the connection cleanly and a reconnect is needed.
        """
        global STOPPED
        while True:
            try:
                response = await websocket.recv()
            except websockets.exceptions.ConnectionClosedOK:
                return False
            print(response)
            await self.add_to_db(self.db, self.wstable, {"time": time.time(), "data": response})
            url = self._image_url(response)
            if url is not None:
                await self.download_and_add_to_db(url, sesh=sesh)
            if STOP:
                STOPPED = True
                return True

    async def run_forever(self):
        """Connect, archive traffic, and reconnect with a new token as needed.

        Returns only after a stop was requested through the module-level
        ``STOP`` flag; ``STOPPED`` is set to True just before returning.
        """
        # Verification stays on unless DISABLE_CERT_VERIF is present in the
        # environment (with any value).
        verify_cert = os.environ.get("DISABLE_CERT_VERIF") is None
        connector = aiohttp.TCPConnector(limit=50, verify_ssl=verify_cert)
        async with aiohttp.ClientSession(connector=connector, trust_env=True) as sesh:
            # Iterative reconnect loop: each pass owns exactly one websocket,
            # which is fully closed before the next one is opened.
            while True:
                async with websockets.connect(self._WS_URL, extra_headers=self._WS_HEADERS) as websocket:
                    await self._start_connection(websocket)
                    stopped = await self._consume(websocket, sesh)
                if stopped:
                    print("\tSTOP: Exited gracefully after CtrlC.")
                    return
                # Clean close from the server: fetch a new token and reconnect.
                await self.regen_token(sesh)
                print(raw_data.connection_init % self.token)


client = PlaceScraper()
while True:
    try:
        asyncio.run(client.run_forever())
    except (requests.exceptions.SSLError, requests.exceptions.ConnectionError, ConnectionResetError) as ename:
        print(f"Caught {ename}")
    except KeyboardInterrupt:
        # Ask the scraper to finish its next message and then return.
        STOP = True
    if STOPPED:
        sys.exit(2)