from spyql.nulltype import Null
class OutputHandler:
    """Mediates data processing with data writing.

    Base class that keeps track of LIMIT/OFFSET bookkeeping and forwards rows
    to a writer object (anything exposing ``writerow(row)`` and ``flush()``).
    Child classes decide *when* rows are written by overriding
    `handle_result` and, if needed, `finish`.
    """

    @staticmethod
    def make_handler(prs):
        """
        Chooses the right handler depending on the kind of query
        and eventual optimization opportunities.

        `prs` is the parsed query, read here through the keys "group by",
        "partials", "order by", "distinct", "limit" and "offset".
        """
        if prs["group by"] and not prs["partials"]:
            return GroupByDelayedOutSortAtEnd(
                prs["order by"], prs["limit"], prs["offset"]
            )
        if prs["order by"]:
            # TODO optimization: use special handler that only keeps the top n
            # elements in memory when LIMIT is defined
            if prs["distinct"]:
                return DistinctDelayedOutSortAtEnd(
                    prs["order by"], prs["limit"], prs["offset"]
                )
            return DelayedOutSortAtEnd(prs["order by"], prs["limit"], prs["offset"])
        if prs["distinct"]:
            return LineInDistinctLineOut(prs["limit"], prs["offset"])
        return LineInLineOut(prs["limit"], prs["offset"])

    def __init__(self, limit, offset):
        """
        `limit` is the maximum number of rows to write (None means no limit).
        `offset` is the number of leading rows to skip; any falsy value
        (None, 0) means no rows are skipped.
        """
        self.limit = limit
        self.rows_written = 0
        self.offset = offset if offset else 0

    def set_writer(self, writer):
        """Sets the writer object that receives the output rows."""
        self.writer = writer

    def handle_result(self, result, group_key, sort_keys):
        """
        To be implemented by child classes to handle a new output row (aka result).
        All inputs should be tuples.

        Returns True when no more rows are needed (premature ending).

        NOTE(review): the concrete handlers in this module declare the extra
        positional arguments as `(result, sort_keys, group_key)`; the order in
        this base signature differs and is kept only for interface stability.
        """
        return self.is_done()

    def is_done(self):
        """Tells whether the row limit (if any) has been reached."""
        # premature ending
        return self.limit is not None and self.rows_written >= self.limit

    def write(self, row):
        """
        Writes `row` to the writer, honouring OFFSET and LIMIT.

        Rows are dropped once the limit is reached (this also covers a limit
        of 0, where nothing must ever be written). While there is still
        offset to consume, the row is skipped and the offset is decremented.
        """
        if self.is_done():
            # never exceed the limit, even if the caller did not check first
            return
        if self.offset > 0:
            self.offset -= 1
        else:
            self.writer.writerow(row)
            self.rows_written += 1

    def finish(self):
        """Flushes the writer; to be called after the last row was handled."""
        self.writer.flush()
class LineInLineOut(OutputHandler):
    """Simple handler that immediately writes every processed row"""

    def handle_result(self, result, *_):
        """
        Writes `result` right away and returns True once the limit is reached.

        Any extra positional arguments (sort keys, group key) are ignored.
        """
        if self.is_done():
            # limit already reached (e.g. a limit of 0): nothing else to write
            return True
        self.write(result)
        return self.is_done()

    def finish(self):
        """Flushes the writer (no pending rows are kept by this handler)."""
        super().finish()
class LineInDistinctLineOut(OutputHandler):
    """In-memory distinct handler that immediately writes every non-duplicated row"""

    def __init__(self, limit, offset):
        super().__init__(limit, offset)
        # rows seen so far; rows must be hashable (e.g. tuples)
        self.output_rows = set()

    def handle_result(self, result, *_):
        """
        Writes `result` unless an equal row was already handled.

        Returns True once the limit is reached, False otherwise (including
        when `result` is a duplicate). Extra positional arguments are ignored.
        """
        if self.is_done():
            # limit already reached (e.g. a limit of 0): nothing else to write
            return True
        # uses a set to remember the distinct rows that were already handled
        if result in self.output_rows:
            return False  # duplicate
        self.output_rows.add(result)
        self.write(result)
        return self.is_done()

    def finish(self):
        """Flushes the writer (rows were already written as they arrived)."""
        super().finish()
class DelayedOutSortAtEnd(OutputHandler):
    """
    Only writes after collecting and sorting all data.
    Temporary implementation that reads every processed row into memory.
    """

    def __init__(self, orderby, limit, offset):
        """
        `orderby` is a sequence with one entry per sorting criterion; each
        entry is read through the keys "rev" and "rev_nulls".
        """
        super().__init__(limit, offset)
        self.orderby = orderby
        self.output_rows = []

    def handle_result(self, result, sort_keys, *_):
        """
        Stores `result` together with its `sort_keys` for later sorting.

        `sort_keys` is indexed by the position of each criterion in `orderby`.
        Always returns False, since every row must be seen before sorting.
        """
        self.output_rows.append({"data": result, "sort_keys": sort_keys})
        # TODO use temporary files to write `output_rows` whenever it gets too large
        # TODO sort intermediate results before writing to a temporary file
        return False  # no premature endings here

    def finish(self):
        """Sorts the collected rows, writes them (honouring limit/offset) and flushes."""
        # TODO read and merge previously sorted temporary files (look into heapq.merge)
        # 1. sorts everything
        if self.orderby:
            # taking advantage of list.sort being stable to sort elements from minor
            # to major criteria (not the most efficient way but straightforward)
            for idx, spec in reversed(list(enumerate(self.orderby))):
                self.output_rows.sort(
                    key=lambda row, idx=idx, spec=spec: (
                        # handle of NULLs based on NULLS FIRST/LAST specification
                        (row["sort_keys"][idx] is Null) != spec["rev_nulls"],
                        row["sort_keys"][idx],
                    ),
                    reverse=spec["rev"],  # handles ASC/DESC order
                )
        # 2. writes sorted rows to output
        for row in self.output_rows:
            # it would be more efficient to slice `output_rows` based on limit/offset
            # however, this is more generic with less repeated logic and this is a
            # temporary implementation
            if self.is_done():
                break
            self.write(row["data"])
        super().finish()
class GroupByDelayedOutSortAtEnd(DelayedOutSortAtEnd):
    """
    Extends `DelayedOutSortAtEnd` to only store intermediate group by results instead of
    keeping all rows in memory
    """

    def __init__(self, orderby, limit, offset):
        super().__init__(orderby, limit, offset)
        # maps each group key to the latest result seen for that group
        self.output_rows = {}

    def handle_result(self, result, sort_keys, group_key):
        """
        Keeps `result` as the current output row of the group `group_key`,
        replacing any row previously stored for the same group.

        `group_key` must be hashable. Always returns False, since the final
        value of each group is only known after all rows were handled.
        """
        # uses a dict to store intermediate group by results instead of storing all rows
        self.output_rows[group_key] = {"data": result, "sort_keys": sort_keys}
        return False  # no premature endings here

    def finish(self):
        """Turns the per-group results into a list and lets the parent sort and write it."""
        # converts output_rows dict to list so that it can be sorted and written
        self.output_rows = list(self.output_rows.values())
        super().finish()
class DistinctDelayedOutSortAtEnd(DelayedOutSortAtEnd):
    """
    Alters `DelayedOutSortAtEnd` to only store distinct results instead of
    keeping all rows in memory
    """

    def __init__(self, orderby, limit, offset):
        super().__init__(orderby, limit, offset)
        # maps each distinct result row to the sort keys of its first occurrence
        self.output_rows = {}

    def handle_result(self, result, sort_keys, *_):
        """
        Stores `result` (which must be hashable) if no equal row was stored before.

        The sort keys of the first occurrence are kept. Always returns False,
        since every row must be seen before sorting.
        """
        # uses a dict to store distinct results instead of storing all rows
        self.output_rows.setdefault(result, sort_keys)
        return False  # no premature endings here

    def finish(self):
        """Turns the distinct results into a list and lets the parent sort and write it."""
        # converts output_rows dict to list so that it can be sorted and written
        self.output_rows = [
            {"data": data, "sort_keys": keys}
            for data, keys in self.output_rows.items()
        ]
        super().finish()