import itertools
import json
from io import BytesIO, StringIO
import pandas as pd
import panel as pn
import param
import yaml
import typing
from typing import Union
from . import settings
from .eve_model import EveModelBase
from .field import SUPPORTED_SCHEMA_FIELDS, Validator
from .http_client import DEFAULT_HTTP_CLIENT, EveHttpClient
from .item import EveItem
from .page import EvePage, EvePageCache, PageZero
from .io import FILE_READERS, read_file
[docs]class EveResource(EveModelBase):
"""
Interface for an Eve resource.
Should be instantiated using an Eve resource definition:
EveResource.from_resource_def(definition, name)
Inheritance:
EveModelBase:
"""
_http_client = param.ClassSelector(EveHttpClient, precedence=-1)
_url = param.String(precedence=-1)
_page_view_format = param.Selector(objects=["Table", "Widgets", "JSON"],
default=settings.DEFAULT_VIEW_FORMAT,
label="Display Format",
precedence=1)
upload_errors = param.List(default=[])
_resource = param.Dict(default={}, constant=True, precedence=-1)
_schema = param.Dict(default={}, constant=True, precedence=-1)
_cache = param.ClassSelector(class_=EvePageCache, default=EvePageCache())
_item_class = param.ClassSelector(EveItem,
is_instance=False,
precedence=-1)
_upload_buffer = param.List(default=[], precedence=-1)
_file_buffer = param.ClassSelector(bytes)
_filename = param.String()
selection = param.ListSelector(default=[], objects=[], precedence=-1)
filters = param.Dict(default={}, doc="Filters")
columns = param.List(default=[], precedence=1)
items_per_page = param.Integer(default=10,
label="Items per page",
precedence=1)
_prev_page_button = param.Action(lambda self: self.decrement_page(),
label="\u23EA",
precedence=1)
page_number = param.Integer(default=0,
bounds=(0, None),
label="",
doc="Page number",
precedence=2)
_reload_page_button = param.Action(lambda self: self.reload_page(),
label="\u21BB",
precedence=3)
_next_page_button = param.Action(lambda self: self.increment_page(),
label="\u23E9",
precedence=4)
[docs] @classmethod
def from_resource_def(cls,
resource_def: dict,
resource_name: str,
http_client: EveHttpClient = None):
"""Generate a resource interface from a Eve resource definition.
Args:
resource_def (dict): Eve resource definition
resource_name (str): Name to use for this resource
http_client (EveHttpClient, optional): http client to use. Defaults to None.
Returns:
EveResource: Interface to the remote resource.
"""
resource = dict(resource_def)
schema = resource.pop("schema")
if http_client is None:
http_client = DEFAULT_HTTP_CLIENT()
item = EveItem.from_schema(resource["item_title"].replace(" ", "_"),
schema,
resource["url"],
http_client=http_client)
item_class = item.__class__
params = dict(name=resource["resource_title"].replace(" ", "_"),
_url=resource["url"],
_http_client=http_client,
_item_class=item_class,
_resource=resource,
_schema=schema,
columns=list(schema))
return cls(**params)
def __getitem__(self, key):
if key in self._cache:
return self._cache[key]
data = self._http_client.get("/".join([self._url, key]))
if data:
item = self.make_item(**data)
return item
raise KeyError
def __setitem__(self, key, value):
self._cache[key] = value
[docs] def make_item(self, **kwargs):
"""Generate EveItem from key value pairs
Returns:
EveItem: EveItem instance that enforces schema of current resource.
"""
return self._item_class(**kwargs, _http_client=self._http_client)
@property
def projection(self):
return {k: 1 for k in self.columns if k not in settings.META_COLUMNS}
[docs] def read_clipboard(self):
from pandas.io.clipboard import clipboard_get
try:
self._upload_buffer = self.filter_docs(json.loads(clipboard_get()))
except Exception as e:
print(e)
[docs] def read_file(self, f: typing.BinaryIO=None, ext: str="csv"):
"""Read file into the upload buffer.
Args:
f (File, optional): file like object. Defaults to None.
ext (str, optional): file extension. Defaults to "csv".
Returns:
list: documents read
"""
if f is None:
f = open(f, "rb")
data = read_file(f, ext)
self._upload_buffer = self.filter_docs(data)
return data
@param.depends("_file_buffer", watch=True)
def _read_file_buffer(self):
sio = BytesIO(self._file_buffer)
_,_, ext = self._filename.rpartition(".")
self.read_file(sio, ext)
[docs] def filter_docs(self, docs):
if isinstance(docs, dict):
docs = [docs]
return [{k: v
for k, v in doc.items() if k in self._schema} for doc in docs]
@property
def gui(self):
return self.panel()
@property
def df(self):
return self.to_dataframe()
[docs] def keys(self):
for page in self.pages():
yield from page.keys()
[docs] def values(self):
for page in self.pages():
yield from page.values()
[docs] def items(self):
for page in self.pages():
yield from page.items()
[docs] def records(self):
for page in self.pages():
yield from page.records()
[docs] def pages(self, start=1, end=None):
for idx in itertools.count(start):
if end is not None and idx > end:
break
page = self.get_page(idx)
if not len(page):
break
yield page
[docs] def new_item(self, data={}):
item = self.item_class(**data)
self[item._id] = item
[docs] def to_records(self):
return [item.to_dict() for item in self.values()]
[docs] def to_dataframe(self):
df = pd.concat([page.to_dataframe() for page in self.values()])
if "_id" in df.columns:
df = df.set_index("_id")
return df
[docs] def pull(self, start=1, end=None):
for idx in itertools.count(start):
if end is not None and idx > end:
break
if not self.pull_page(idx):
break
[docs] def push(self, idxs=None):
if idxs is None:
idxs = list(self._cache.keys())
for idx in idxs:
self._cache[idx].push()
[docs] def find(self, query={}, projection={}, max_results=25, page_number=1):
"""Find documents in the remote resource that match a mongodb query.
Args:
query (dict, optional): Mongo query. Defaults to {}.
projection (dict, optional): Mongo projection. Defaults to {}.
max_results (int, optional): Items per page. Defaults to 25.
page_number (int, optional): page to return if query returns more than max_results.\
Defaults to 1.
Returns:
list: requested page documents that match query
"""
resp = self._http_client.get(self._url,
where=json.dumps(query),
projection=json.dumps(projection),
max_results=max_results,
page=page_number)
docs = []
if resp and "_items" in resp:
docs = resp["_items"]
return docs
[docs] def find_page(self,
query={},
projection={},
max_results=25,
page_number=1):
"""Same as find() only returns an EvePage instance
Args:
query (dict, optional): [description]. Defaults to {}.
projection (dict, optional): [description]. Defaults to {}.
max_results (int, optional): [description]. Defaults to 25.
page_number (int, optional): [description]. Defaults to 1.
Returns:
EvePage: requested page from all items that match the query
"""
docs = self.find(query=query,
projection=projection,
max_results=max_results,
page_number=page_number)
items = [self.make_item(**doc) for doc in docs]
page = EvePage(
name=f'{self._url.replace("/", ".")} page {page_number}',
_items={item._id: item
for item in items},
_columns=self.columns)
return page
[docs] def find_df(self, query={}, projection={}, max_results=25, page_number=1):
"""Same as find() only returns a pandas dataframe
Args:
query (dict, optional): [description]. Defaults to {}.
projection (dict, optional): [description]. Defaults to {}.
max_results (int, optional): [description]. Defaults to 25.
page_number (int, optional): [description]. Defaults to 1.
Returns:
[type]: [description]
"""
page = self.find_page(query=query,
projection=projection,
max_results=max_results,
page_number=page_number)
df = page.to_dataframe()
if "_id" in df.columns:
df = df.set_index("_id")
return df
[docs] def find_one(self, query: dict={}, projection: dict={}):
docs = self.find(query=query, projection=projection, max_results=1)
if docs:
return docs[0]
[docs] def find_one_item(self, query={}, projection={}):
doc = self.find_one(query=query, projection=projection)
if doc:
return self.make_item(**doc)
[docs] def validate_documents(self, docs):
schema = {
name:
{k: v
for k, v in field.items() if k in SUPPORTED_SCHEMA_FIELDS}
for name, field in self._schema.items()
}
v = Validator(schema)
valid = []
rejected = []
errors = []
for doc in docs:
if v.validate(doc):
valid.append(doc)
else:
rejected.append(doc)
errors.append(v.errors)
return valid, rejected, errors
[docs] def post(self, docs):
data = json.dumps(docs)
return self._http_client.post(self._url, data=data)
[docs] def insert_documents(self, docs: Union[list, tuple, dict], validate=True) -> tuple:
"""Insert documents into the database
Args:
docs (list): Documents to insert.
validate (bool, optional): whether to validate schema of docs locally. Defaults to True.
Raises:
TypeError: raised if docs is not the correct type.
Returns:
tuple[list, list, list]: Successfuly inserted, rejected, rejection reasons.
"""
if isinstance(docs, dict):
docs = [docs]
elif isinstance(docs, (tuple,list)):
docs = list(docs)
else:
raise TypeError("Documents must be list/tuple or dict")
if validate:
docs, rejected, errors = self.validate_documents(docs)
else:
rejected, errors = [], []
if docs and self.post(docs):
success = docs
else:
success = []
return success, rejected, errors
[docs] def clear_upload_buffer(self):
self._upload_buffer = []
[docs] def flush_buffer(self):
success, rejected, errors = self.insert_documents(self._upload_buffer)
self._upload_buffer = rejected
self.upload_errors = [str(err) for err in errors]
return success
[docs] def pull_page(self, idx=0):
if not idx:
self._cache[idx] = PageZero()
return False
page = self.find_page(query=self.filters,
projection=self.projection,
max_results=self.items_per_page,
page_number=idx)
if page._items:
self._cache[idx] = page
return True
return False
[docs] def push_page(self, idx):
if not idx in self._cache or len(self._cache[idx]):
return
self._cache[idx].push()
[docs] def get_page(self, idx):
if idx not in self._cache or not len(self._cache[idx]):
self.pull_page(idx)
return self._cache.get(
idx, EvePage(name="Place holder", _columns=self.columns))
[docs] def get_page_records(self, idx):
return self.get_page(idx).to_records()
[docs] def get_page_df(self, idx):
df = self.get_page(idx).to_dataframe()
df = df[[col for col in self.columns if col in df.columns]]
if "_id" in df.columns:
df = df.set_index("_id")
return df
[docs] def increment_page(self):
self.page_number = self.page_number + 1
[docs] def next_page(self):
self.increment_page()
return self.current_page()
[docs] def current_page(self):
return self.get_page(self.page_number)
[docs] def decrement_page(self):
if self.page_number > 1:
try:
self.page_number = self.page_number - 1
except:
pass
[docs] def previous_page(self):
self.decrement_page()
return self.current_page()
[docs] @param.depends("items_per_page",
"filters",
"columns",
watch=True)
def clear_cache(self):
self._cache = EvePageCache()
[docs] def reload_page(self, page_number=None):
if page_number is None:
page_number = self.page_number
self.pull_page(page_number)
[docs] def remove_item(self, _id: str) -> bool:
return self[_id].delete()
[docs] def filter(self, **filters):
return self.clone(filters=filters)
[docs] def project(self, **projection):
value_sum = sum(list(projection.values()))
if value_sum==0:
columns = [c for c in self._schema if c not in projection]
elif value_sum==len(projection):
columns = list(projection)
else:
raise ValueError("Mongo projections can either be inclusive or exclusive but not both.")
return self.clone(columns=columns)
[docs] @param.depends("_upload_buffer")
def upload_view(self):
clear_button = pn.widgets.Button(name="Clear buffer",
button_type="warning",
width=int(settings.GUI_WIDTH / 4))
clear_button.on_click(lambda event: self.clear_upload_buffer())
upload_file = pn.widgets.FileInput(accept=",".join(
[f".{ext}" for ext in FILE_READERS]),
width=int(settings.GUI_WIDTH / 4))
upload_file.link(self, filename="_filename", value="_file_buffer")
upload_file_button = pn.widgets.Button(name="Read file",
button_type="primary",
width=int(settings.GUI_WIDTH /
4))
upload_file_button.on_click(lambda event: self._read_file_buffer())
upload_preview = pn.pane.JSON(self._upload_buffer,
name='Upload Buffer',
height=int(settings.GUI_HEIGHT / 2),
width=int(settings.GUI_WIDTH),
theme="light")
upload_button = pn.widgets.Button(name="Insert to DB",
button_type="success",
width=int(settings.GUI_WIDTH / 4))
upload_button.on_click(lambda event: self.flush_buffer())
read_clipboard_button = pn.widgets.Button(name="Read Clipboard",
button_type="primary",
width=int(
settings.GUI_WIDTH / 4))
read_clipboard_button.on_click(lambda event: self.read_clipboard())
first_row_buttons = pn.Row(upload_file, read_clipboard_button)
second_row_buttons = pn.Row(clear_button, upload_button)
input_buttons = pn.Column(first_row_buttons, second_row_buttons)
upload_view = pn.Column(input_buttons, self.upload_errors,
"### Upload buffer", upload_preview)
return upload_view
[docs] @param.depends("page_number", "_cache", "_page_view_format")
def current_page_view(self):
page = self.get_page(self.page_number)
if page is None:
return pn.panel(f"## No data for page {self.page_number}.")
return getattr(page,
self._page_view_format.lower() + "_view", page.panel)()
[docs] @param.depends("upload_errors")
def upload_errors_view(self):
alerts = [
pn.pane.Alert(err, alert_type="danger")
for err in self.upload_errors
]
return pn.Column(*alerts,
height=50,
width=int(settings.GUI_WIDTH - 30))
[docs] def make_panel(self, show_client=True, tabs_location='above'):
buttons = pn.Param(self.param,
parameters=[
"_prev_page_button", "page_number",
"_reload_page_button", "_next_page_button"
],
default_layout=pn.Row,
name="",
width=int(settings.GUI_WIDTH / 3))
column_select = pn.Param(self.param.columns,
width=int(settings.GUI_WIDTH - 30),
widgets={
"columns": {
"type": pn.widgets.MultiChoice,
"options": list(self._schema) +
settings.META_COLUMNS,
"width": int(settings.GUI_WIDTH - 30)
}
})
page_settings = pn.Column(pn.Row(self.param.items_per_page,
self.param.filters,
self.param._page_view_format,
width=int(settings.GUI_WIDTH - 50)),
column_select,
width=int(settings.GUI_WIDTH - 10))
if show_client:
page_settings.append("### HTTP client")
page_settings.append(pn.layout.Divider())
page_settings.append(self._http_client.panel)
header = pn.Row(
f"## {self.name} resource",
pn.Spacer(sizing_mode='stretch_both'),
buttons,
pn.Spacer(sizing_mode='stretch_both'),
self._http_client.busy_indicator,
)
view = pn.Column(
header, self._http_client.messages,
pn.Tabs(
("Data", self.current_page_view),
("Upload", self.upload_view),
("Config", page_settings),
dynamic=True,
tabs_location=tabs_location,
width=int(settings.GUI_WIDTH),
))
return view