Documentation

Module contents

The Streamer class

class datastreams.streamer.Streamer(endpoint=None, subgraph=None, sub=None, data=None, schema=None, queryFields=None, queryStrs=None, queryDict=None)[source]

Bases: object

Streamer is a query utility class that makes it easier to define and build queries using Subgrounds functions. It simplifies the definition of query paths through helper functions that expose commonly used Subgrounds functions. Each Streamer relates to a single subgraph. These helper functions allow Streamer to queue up multiple queries against different parts of a schema at once.

Parameters:
  • sub (Subgrounds) – Subgrounds object. Default is None

  • endpoint (str) – graphql endpoint. Default is None

  • subgraph (Subgraph) – Subgraph object. Default is None

  • data (list) – list of dataframes. Default is None

  • schema (list) – list of schema objects. Default is None

  • queryFields (list) – list of queryable fields. Default is None

  • queryStrs (list) – list of query strings. Default is None

  • queryDict (dict) – dictionary of query strings. Default is None

endpoint: str = None
subgraph: Subgraph = None
sub: Subgrounds = None
data: list = None
schema: list = None
queryFields: list = None
queryStrs: list[str] = None
queryDict: dict = None

setupStreamer()[source]

setupStreamer() runs when Streamer is initialized. It initializes a Subgrounds object and loads a Subgraph object into Subgrounds. Because this setup is required before anything can be done with Subgrounds, it is performed automatically at startup so that users of Streamer do not have to do it themselves.

filterQueryFieldStrs()[source]

filterQueryFieldStrs() is an inner helper function run at startup that stores a str representation of the queryable fields.

Returns:

list[str] of queryable fields

Return type:

list[str]

filterQueryFields()[source]

filterQueryFields() is an inner helper function run at startup that stores a FieldPath representation of the queryable fields.

Returns:

list[FieldPath] of queryable fields

Return type:

list[subgrounds.subgraph.fieldpath.FieldPath]

getFieldPath(field, operation='Query')[source]

getFieldPath converts a string to a FieldPath object. In a Subgrounds query, the format follows subgrounds.schema.FieldPath.

Parameters:
  • field (str) – Enter the string that will be converted to a FieldPath

  • operation (str) – Enter one of the following - ‘Query’, ‘Mutation’, or ‘Subscription’. Default is ‘Query’ because that is most commonly used.

Returns:

FieldPath object

Return type:

FieldPath

getSubgraphSchema()[source]

getSubgraphSchema gets the schema list from a Subgraph.

Returns:

schema list from a Subgraph

Return type:

list[str]

getSchemaFields(schema_object)[source]

getSchemaFields gets a list of fields from a subgraph schema object.

Parameters:
  • schema_object (str) – Schema object name to get fields list from

  • operation (str) – Enter one of the following - ‘Query’, ‘Mutation’, or ‘Subscription’. Default is ‘Query’ because that is most commonly used.

Returns:

list of field name strings from a Subgraph schema

Return type:

list[str]

getQueryFields()[source]

Get all queryable fields from the subgraph schema.

Returns:

list[str] of queryable fields from the subgraph schema

Return type:

list[str]

getQueryDict()[source]

getQueryDict() is an inner helper function run at startup that stores a dictionary of queryable fields in {str: FieldPath} format. This function allows the subgraph schema to be searchable by dictionary keys.

Returns:

dict of queryable fields in {str: FieldPath} format
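
The {str: FieldPath} mapping described above can be illustrated with a minimal, self-contained sketch. The FakeFieldPath class, the build_query_dict helper, and the field names are hypothetical stand-ins and are not part of datastreams or Subgrounds. The sketch only shows how pairing string names with field path objects makes a schema searchable by key.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeFieldPath:
    """Hypothetical stand-in for a Subgrounds FieldPath (assumption)."""
    name: str


def build_query_dict(names, field_paths):
    """Pair each field name with its field path object.

    Illustrative helper only; not the library's implementation.
    """
    if len(names) != len(field_paths):
        raise ValueError("names and field_paths must have the same length")
    return dict(zip(names, field_paths))


# Field names below are made up for illustration.
paths = [FakeFieldPath("swaps"), FakeFieldPath("tokens")]
query_dict = build_query_dict([p.name for p in paths], paths)

# Look up a field path by its string key.
swaps_path = query_dict["swaps"]
```

In this sketch the string list and the field path list play the roles that queryStrs and queryFields appear to play in Streamer, though that correspondence is an assumption.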

getFieldPathQueryCols(fieldpath)[source]

getFieldPathQueryCols() returns a list of columns available to query.

Parameters:

fieldpath (FieldPath) – FieldPath object

Returns:

list[str] of queryable fields

Return type:

list[str]

getQueryCols(fieldpath, col_query_list)[source]

getQueryCols() converts a query column list to a dictionary object. The dictionary keys are the string names of the fieldpath query columns. The dictionary values are the fieldpath values that correspond to the key string names.

Parameters:
  • fieldpath (FieldPath) – FieldPath object

  • col_query_list (list[str]) – list of columns to query within a FieldPath schema object

Return type:

dict
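
As a rough illustration of the conversion described above, the following sketch builds a dictionary from a list of column names. It assumes that columns are reachable as attributes of the field path object. The query_cols_to_dict helper and the SimpleNamespace stand-in are hypothetical, and the actual getQueryCols() implementation may differ.

```python
from types import SimpleNamespace


def query_cols_to_dict(fieldpath, col_query_list):
    """Map each requested column name to the matching attribute of fieldpath.

    Hypothetical helper; assumes columns are exposed as attributes.
    """
    return {col: getattr(fieldpath, col) for col in col_query_list}


# Stand-in for a FieldPath; the column names and values are made up.
fake_fieldpath = SimpleNamespace(
    id="swaps.id",
    timestamp="swaps.timestamp",
    amountUSD="swaps.amountUSD",
)

# Only the requested columns end up in the dictionary.
cols = query_cols_to_dict(fake_fieldpath, ["id", "amountUSD"])
```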

runQuery(query_field)[source]

runQuery() executes the Subgrounds query_df() function to query the Subgraph and return a DataFrame.

Parameters:
  • query_field (FieldPath) – FieldPath object

  • where (dict) – dictionary of conditions used to filter the query. Default is None.

Returns:

DataFrame object

Return type:

DataFrame

runStreamerLoop(query_field_list, where=None)[source]

runStreamerLoop() runs through a list of queryable fields and returns a list of Subgraph query DataFrames.

Parameters:
  • query_field_list (list[FieldPath]) – list of FieldPath objects

  • where (dict) – dictionary of conditions used to filter each query. Default is None.

Returns:

list of dataframes

Return type:

list[pandas.core.frame.DataFrame]
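
The loop described above amounts to running one query per field and collecting the results in order. The sketch below shows that pattern with stand-ins: run_streamer_loop and fake_run_query are hypothetical names, plain lists of dicts replace DataFrames, and the where filter is omitted. No network request is made.

```python
def run_streamer_loop(run_query, query_field_list):
    """Run one query per field and collect the results in order.

    run_query is any callable that takes a field and returns a result;
    it stands in for a call such as Streamer.runQuery (assumption).
    """
    return [run_query(field) for field in query_field_list]


def fake_run_query(field):
    # Stand-in query: returns rows as a list of dicts instead of a DataFrame.
    return [{"field": field, "row": 0}]


# Field names are made up for illustration.
frames = run_streamer_loop(fake_run_query, ["swaps", "tokens"])
```

The result list has one entry per queried field, in the same order as the input list.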

The DataStream class

class datastreams.datastream.DataStream(endpointList=None, streamerDict=None)[source]

Bases: object

DataStream is used to manage multiple Streamer objects. In other words, you can manage multiple Subgraph queries with DataStream.

  1. Insert a list of endpoints, each of which represents a Subgraph endpoint.

Parameters:
  • endpointList (list[str]) – list of Subgraph endpoints. Default is None

  • streamerDict (dict) – dictionary of Streamer objects. Default is None

endpointList: list[str] = None
streamerDict: dict = None

makeStreamerDict()[source]

makeStreamerDict() generates streamerDict from endpointList.

getCommonQueryKeys(streamerList=None)[source]

getCommonQueryKeys returns a list of query keys that are common to all Streamer objects.

Parameters:

streamerList (list[Streamer]) – list of Streamer objects. Default is None, which will use all Streamer objects in streamerDict

Return type:

list[str]
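
Finding the keys common to several Streamer objects can be thought of as a set intersection over their query dictionaries. The sketch below illustrates that idea with plain dictionaries. The common_query_keys helper and the key names are hypothetical, and sorting the result is a choice made here for determinism, not documented behavior of getCommonQueryKeys.

```python
def common_query_keys(query_dicts):
    """Return the sorted keys present in every dictionary in query_dicts."""
    if not query_dicts:
        return []
    common = set(query_dicts[0])
    for query_dict in query_dicts[1:]:
        # Keep only the keys that also appear in this dictionary.
        common &= set(query_dict)
    return sorted(common)


# Made-up query dictionaries for two subgraphs; values are placeholders.
subgraph_a = {"swaps": None, "tokens": None, "pairs": None}
subgraph_b = {"swaps": None, "tokens": None, "pools": None}

shared = common_query_keys([subgraph_a, subgraph_b])
```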

querySubgraphs(streamer_dict, query_field)[source]

querySubgraphs() queries all Subgraphs in a streamer dictionary for a specific query field.

Parameters:
  • streamer_dict (dict) – dictionary of Streamer objects

  • query_field (str) – query field to run

Return type:

list[pandas.core.frame.DataFrame]
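
A minimal sketch of querying every Subgraph in a streamer dictionary for one field follows. FakeStreamer is a hypothetical stand-in that performs no network requests and returns lists of dicts instead of DataFrames. Skipping streamers that lack the requested field is an assumption made for this sketch, not documented behavior of querySubgraphs().

```python
class FakeStreamer:
    """Hypothetical stand-in for Streamer; performs no network requests."""

    def __init__(self, endpoint, fields):
        self.endpoint = endpoint
        # Maps field name -> placeholder field path (here just a string).
        self.queryDict = {name: f"{endpoint}/{name}" for name in fields}

    def runQuery(self, query_field):
        # Returns rows as a list of dicts instead of a DataFrame.
        return [{"source": query_field}]


def query_subgraphs(streamer_dict, query_field):
    """Query every streamer that exposes query_field and collect the results."""
    results = []
    for streamer in streamer_dict.values():
        field_path = streamer.queryDict.get(query_field)
        if field_path is None:
            continue  # Assumption: skip subgraphs that lack the field.
        results.append(streamer.runQuery(field_path))
    return results


# Endpoint labels and field names are made up for illustration.
streamers = {
    "a": FakeStreamer("a", ["swaps", "tokens"]),
    "b": FakeStreamer("b", ["tokens"]),
}
swap_results = query_subgraphs(streamers, "swaps")
token_results = query_subgraphs(streamers, "tokens")
```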