Source code for datastreams.datastream

from dataclasses import dataclass
from datastreams.streamer import Streamer
from functools import reduce
from subgrounds.subgrounds import Subgrounds
from pandas import DataFrame

[docs]@dataclass class DataStream: """ DataStream is used to manage multiple Streamer objects. In other words, you can manage multiple Subgraph queries with DataStream. #. Insert a list of endpoints, each one represents a Subgraph endpoint. """ endpointList: list[str] = None streamerDict: dict = None def __post_init__(self): self.makeStreamerDict()
[docs] def makeStreamerDict(self): """ generate streamerDict from endpointList """ self.streamerDict: dict = {} for endpoint in self.endpointList: # generate the name from the endpoint as the string seperated by the right most / name = endpoint.split('/')[-1] # create new Streamer object streamer = Streamer(endpoint) # add to streamerDict dict self.streamerDict[name] = streamer
[docs] def getCommonQueryKeys(self, streamerList: dict = None) -> list[str]: """ getCommonQueryKeys returns a list of query keys that are common to all Streamer objects. :param list[Streamer] streamerList: list of Streamer objects. Default is None, which will use all Streamer objects in streamerDict """ if streamerList: common_keys = list(reduce(set.intersection, [set(ds.queryDict.keys()) for ds in streamerList.values()])) else: common_keys = list(reduce(set.intersection, [set(ds.queryDict.keys()) for ds in self.streamerDict.values()])) return common_keys
[docs] def querySubgraphs(self, streamer_dict: dict, query_field: str) -> list[DataFrame]: """ query all Subgraphs in a streamer dictionary for a specific query field :param dict streamer_dict: dictionary of Streamer objects :param str query_field: query field to run """ data = [] #run query on streamer objects for ds in streamer_dict.values(): print(f'querying {ds.endpoint} for {query_field}...') # 2.15.23 TODO - construct new query fieldpath here? # run batch queries tokens_df = ds.runQuery(ds.queryDict[query_field]) data.append(tokens_df) return data
# # Subgrounds is unique for every DataStream? # def getDataStream(self, endpoint: str): # """ # :param str endpoint: graphql endpoint # getDataStream is a firehose that returns all data from a subgraph endpoint. # """ # # create new Subgrounds object # sub = Subgrounds() # # load subgraph data into Subgrounds # subgraph = sub.load_subgraph(endpoint) # # load Subgrounds data into Streamer # streamer = Streamer(sub, endpoint, subgraph)