# Copyright (c) 2013, Web Notes Technologies Pvt. Ltd. and Contributors # MIT License. See license.txt from __future__ import unicode_literals import webnotes import os, base64, re from webnotes.utils import cstr, cint, get_site_path from webnotes import _ from webnotes import conf class MaxFileSizeReachedError(webnotes.ValidationError): pass def get_file_url(file_data_name): data = webnotes.conn.get_value("File Data", file_data_name, ["file_name", "file_url"], as_dict=True) return data.file_name or data.file_url def upload(): # get record details dt = webnotes.form_dict.doctype dn = webnotes.form_dict.docname file_url = webnotes.form_dict.file_url filename = webnotes.form_dict.filename if not filename and not file_url: webnotes.msgprint(_("Please select a file or url"), raise_exception=True) # save if filename: filedata = save_uploaded(dt, dn) elif file_url: filedata = save_url(file_url, dt, dn) return {"fid": filedata.name, "filename": filedata.file_name or filedata.file_url } def save_uploaded(dt, dn): fname, content = get_uploaded_content() if content: return save_file(fname, content, dt, dn); else: raise Exception def save_url(file_url, dt, dn): if not (file_url.startswith("http://") or file_url.startswith("https://")): webnotes.msgprint("URL must start with 'http://' or 'https://'") return None, None f = webnotes.bean({ "doctype": "File Data", "file_url": file_url, "attached_to_doctype": dt, "attached_to_name": dn }) f.ignore_permissions = True try: f.insert(); except webnotes.DuplicateEntryError: return webnotes.doc("File Data", f.doc.duplicate_entry) return f.doc def get_uploaded_content(): # should not be unicode when reading a file, hence using webnotes.form if 'filedata' in webnotes.form_dict: webnotes.uploaded_content = base64.b64decode(webnotes.form_dict.filedata) webnotes.uploaded_filename = webnotes.form_dict.filename return webnotes.uploaded_filename, webnotes.uploaded_content else: webnotes.msgprint('No File') return None, None def 
# ---------------------------------------------------------------------------
# NOTE(review): DAMAGED SOURCE -- restore this region from version control.
#
# The text of this file between `return '` (inside the nested `_save_file`
# helper of `extract_images_from_html`) and ` 100:` is missing.  It looks as
# though everything from an HTML-like `<` up to a `>` was stripped -- TODO
# confirm.  Not visible any more:
#   * the end of `extract_images_from_html`;
#   * any definition of `save_file`, which `save_uploaded` and `_save_file`
#     call (presumably it lived in the lost text);
#   * the beginning of the function that ends with `return new_fname`.
#
# The surviving fragments are kept below, re-indented but otherwise
# untouched.  They are comments because they are not valid Python on their
# own, and nothing has been guessed to fill the gap.
#
# def extract_images_from_html(doc, fieldname):
#     content = doc.get(fieldname)
#     webnotes.flags.has_dataurl = False
#
#     def _save_file(match):
#         data = match.group(1)
#         headers, content = data.split(",")
#         filename = headers.split("filename=")[-1]
#         filename = save_file(filename, content, doc.doctype, doc.name, decode=True).get("file_name")
#         if not webnotes.flags.has_dataurl:
#             webnotes.flags.has_dataurl = True
#
#         return '
#
#     ... text lost here ...
#
#  100: webnotes.msgprint("Too many versions", raise_exception=True) return new_fname
# ---------------------------------------------------------------------------


def scrub_file_name(fname):
    """Return only the last path component of `fname`.

    Both backslash and forward slash count as separators, so any directory
    part in front of the file name is dropped.
    """
    if '\\' in fname:
        fname = fname.split('\\')[-1]
    if '/' in fname:
        fname = fname.split('/')[-1]
    return fname


def check_max_file_size(content):
    """Return the length of `content`, rejecting content that is too large.

    The limit is `conf.get('max_file_size')`, or 1000000 when that is unset
    or falsy.  Oversized content is reported through `webnotes.msgprint`
    with `raise_exception=MaxFileSizeReachedError`.
    """
    max_file_size = conf.get('max_file_size') or 1000000
    file_size = len(content)

    if file_size > max_file_size:
        webnotes.msgprint(_("File size exceeded the maximum allowed size"),
            raise_exception=MaxFileSizeReachedError)

    return file_size


def write_file(content, files_path):
    """write file to disk with a random name (to compare)

    Creates `files_path` if needed, writes `content` to a new file named by
    `webnotes.generate_hash()` inside it, and returns that file's path.
    """
    # create account folder (if not exists)
    webnotes.create_folder(files_path)
    fname = os.path.join(files_path, webnotes.generate_hash())

    # write the file -- in binary mode, so uploaded bytes reach the disk
    # unchanged (text mode may translate newlines on some platforms)
    with open(fname, 'wb') as f:
        f.write(content)

    return fname


def remove_all(dt, dn):
    """remove all files in a transaction

    Deletes every "File Data" record attached to document `dn` of doctype
    `dt`.  An error whose first argument is 1054 is ignored; anything else
    is re-raised.
    """
    try:
        for fid in webnotes.conn.sql_list("""select name from `tabFile Data` where attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)):
            remove_file(fid)
    except Exception as e:
        # 1054 is presumably the database's "unknown column" error, seen
        # before the schema is patched -- TODO confirm.  The `e.args` guard
        # keeps an argument-less exception from being replaced by an
        # IndexError.
        if not e.args or e.args[0] != 1054:
            raise  # (temp till for patched)


def remove_file(fid):
    """Remove file and File Data entry"""
    webnotes.delete_doc("File Data", fid)


def get_file(fname):
    """Return `[file_name, content]` for a stored file.

    `fname` may be the name of a "File Data" record or a file name; when no
    record matches, `fname` itself is used as the file name.  A file name
    without a "/" is looked up under "files/".  The content is read from
    the site's "public" folder.
    """
    rows = webnotes.conn.sql("""select file_name from `tabFile Data` where name=%s or file_name=%s""", (fname, fname))
    if rows:
        file_name = rows[0][0]
    else:
        file_name = fname

    if "/" not in file_name:
        file_name = "files/" + file_name

    # read the file -- binary mode, so the bytes come back exactly as stored
    with open(get_site_path("public", file_name), 'rb') as f:
        content = f.read()

    return [file_name, content]