from flask import flash, render_template, session, request, redirect, url_for from admin.knowledgetopic import * from admin.knowledgenote import * from flask import current_app as app import os import zipfile import shutil import re from hashlib import md5 from pathlib import Path from datetime import datetime from werkzeug.utils import secure_filename from urllib.parse import urlparse ALLOWED_EXTENSIONS_IMG = set(['png', 'jpg', 'JPG', 'PNG', 'bmp']) ALLOWED_EXTENSIONS_VIDEO = set(['mp4', 'avi', 'flv', 'wmv', 'wov', 'mkv']) ALLOWED_EXTENSIONS_ZIP = set(['zip', 'ZIP']) # 新增ZIP文件格式支持 # 判断文件格式(图像) def allowed_file_img(filename): return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS_IMG # 判断文件格式(视频) def allowed_file_video(filename): return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS_VIDEO # 判断文件格式(ZIP) def allowed_file_zip(filename): return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS_ZIP # ------------- 笔记信息管理 ------------------- @app.route('/knowledgenotemanage') def knowledgenotemanage(): if 'username' in session: results = loadAllKnowledgeNote() return render_template('admin/a_knowledgenote_manage.html', results=results) else: flash('登录失败, 请重新登录') return redirect(url_for('adminlogin')) @app.route('/newknowledgenote', methods=['POST', 'GET']) def newknowledgenote(): if 'username' in session: knowledgetopic = loadAllKnowledgeTopic() return render_template('admin/a_knowledgenote_edit.html', knowledgetopic=knowledgetopic, knowledgenote=None, status='add') else: flash('登录失败, 请重新登录') return redirect(url_for('adminlogin')) @app.route('/editknowledgenote/', methods=['POST', 'GET']) def editnoteinfo(knid): if 'username' in session: knowledgetopic = loadAllKnowledgeTopic() knowledgenote = loadKnowledgeNoteByKNID(knid) return render_template('admin/a_knowledgenote_edit.html', knowledgetopic=knowledgetopic, knowledgenote=knowledgenote, status='edit') else: flash('登录失败, 请重新登录') return redirect(url_for('adminlogin')) @app.route('/editknowledgenote.do', methods=['POST']) def addnoteinfo(): if 'username' in session: if request.method == 'POST': status = request.form['lbl_status'] ktid = request.form.get('txt_KTID') kntitle = request.form['txt_KNTitle'] isoriginal = request.form['txt_IsOriginal'] source = request.form['txt_Source'] origlink = request.form['txt_OrigLink'] knimage = request.form['txt_KNImage'] knabstract = request.form['txt_KNAbstract'] kncontent = eval(request.form['txt_KNcontent']) kncontent = kncontent.replace('''"''', r'''\"''').replace("'", r"\'") filelink = request.form['txt_FileLink'] pptlink = request.form['txt_PPTLink'] videolink = request.form['txt_VideoLink'] codelink = request.form['txt_CodeLink'] datalink = request.form['txt_DataLink'] author = request.form['txt_CoAuthors'] knisshow = request.form.get('txt_KNIsShow') # 封面图片处理 f = request.files['file_proimg'] proimg = "" if f and f.filename != "": proimg = uploadimage(f) if proimg == "": proimg = knimage # 视频上传处理 fvideo = request.files['file_video'] video = "" if fvideo and fvideo.filename != "": video = uploadvideo(fvideo) if video == "": video = videolink if status == 'add': data = [ktid, kntitle, isoriginal, source, origlink, proimg, knabstract, kncontent, filelink, pptlink, video, codelink, datalink, author, knisshow] i, msg = add_KnowledgeNote(data) if i > 0: flash('笔记信息添加成功!') return redirect(url_for('knowledgenotemanage')) else: flash('笔记信息添加失败!%s' % str(msg)) return redirect(url_for('knowledgenotemanage')) if status == 'edit': knid = request.form['lbl_KNID'] data = [ktid, kntitle, isoriginal, source, origlink, proimg, knabstract, kncontent, filelink, pptlink, video, codelink, datalink, author, knisshow, knid] i, msg = edit_KnowledgeNote(data) if i > 0: flash('笔记信息修改成功!') return redirect(url_for('knowledgenotemanage')) else: flash('笔记信息修改失败!%s' % str(msg)) return redirect(url_for('knowledgenotemanage')) else: flash('登录失败, 请重新登录') return redirect(url_for('adminlogin')) # ============ ZIP文件上传和解压功能 ============ def upload_and_process_md_zip(): """ 简化的MD ZIP文件上传处理函数,整合了所有功能 """ try: if 'file' not in request.files: # 1. 检查文件上传 return {"success": False, "message": "没有上传zip文件"} if 'folderName' not in request.form: return {"success": False, "message": "没有上传文件夹名"} file = request.files['file'] folderName = request.form.get('folderName') if file.filename == '': return {"success": False, "message": "请选择要上传的ZIP文件"} if not ('.' in file.filename and file.filename.rsplit('.', 1)[1].lower() == 'zip'): #验证是否zip return {"success": False, "message": "只支持ZIP格式的压缩包"} original_filename = secure_filename(file.filename)#安全保存文件 file_digest = md5(original_filename.encode("utf-8")).hexdigest() zip_filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{file_digest}.zip" zip_path = os.path.join('temp_uploads', zip_filename) os.makedirs(os.path.dirname(zip_path), exist_ok=True) file.save(zip_path) # 4. 安全解压ZIP文件 extract_dir_name = Path(original_filename).stem + datetime.now().strftime('%Y%m%d%H%M%S') print(extract_dir_name) extract_path = os.path.join('static/uploads/md', extract_dir_name) os.makedirs(extract_path, exist_ok=True) if not zipfile.is_zipfile(zip_path): os.remove(zip_path) return {"success": False, "message": "无效的ZIP文件"} extracted_files = [] with zipfile.ZipFile(zip_path, 'r') as zip_ref: for file_info in zip_ref.infolist(): if file_info.filename.endswith('/'): continue # 安全路径检查 target_path = os.path.join(extract_path, file_info.filename) if not os.path.realpath(target_path).startswith(os.path.realpath(extract_path)): os.remove(zip_path) return {"success": False, "message": f"检测到不安全路径: {file_info.filename}"} # 解压文件 zip_ref.extract(file_info, extract_path) extracted_files.append(target_path) # 5. 查找并处理MD文件 md_files = [] for root, dirs, files in os.walk(extract_path): for filename in files: if filename.lower().endswith('.md'): full_path = os.path.join(root, filename) relative_path = os.path.relpath(full_path, extract_path) md_files.append({'full_path': full_path, 'relative_path': relative_path}) # 6. 检查MD文件数量 if len(md_files) == 0: return { "success": True, "message": "ZIP文件解压成功,但未找到MD文件", "file_count": 0 } # 7. 处理MD文件内容 for md_file in md_files: replace_image_paths( md_file['full_path'],folderName,extract_dir_name) #传入处理md的函数 # 8. 清理临时文件并返回结果 os.remove(zip_path) return { 'success': True, "md_file":md_file, 'extract_url': f"/static/uploads/md/{extract_dir_name}" } except Exception as e: return {"success": False, "message": f"处理过程中出错: {str(e)}"} def replace_image_paths(file_path, folder_name,extract_dir_name, new_base_path="/static/uploads/md/"): """ 原先设定正则提取url,文件夹名不变,提取文件夹名前面和文件夹名后面的字符串,发现有问题,重新进行修改 """ try: # 读取Markdown文件内容 with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # 正则表达式匹配Markdown图片语法和HTML img标签 patterns = [ r'!\[.*?\]\((.*?)\)', # Markdown图片 r']+src="([^">]+)"' # HTML img标签 ] all_matches = [] for pattern in patterns: matches = re.findall(pattern, content) all_matches.extend(matches) print(f"找到 {len(all_matches)} 个图片链接") if not all_matches: print("未找到图片链接") return 0, 0 # 替换每个图片URL new_content = content for url in all_matches: original_url = url # 跳过已经是新路径的URL if url.startswith(new_base_path): print(f"跳过(已经是新路径): {url}") continue # 查找folder_name在URL中的位置 folder_index = url.find(folder_name) if folder_index != -1: # 提取folderName之后的部分 path_after_folder = url[folder_index:] # 将反斜杠替换为正斜杠 path_after_folder = path_after_folder.replace('\\', '/') # 构建新的URL new_url = f"{new_base_path}{path_after_folder}" # 替换原URL new_content = new_content.replace(original_url, new_url).replace(f"/{folder_name}/",f"/{extract_dir_name}/") # 将修改后的内容写回文件 with open(file_path, 'w', encoding='utf-8') as f: f.write(new_content) except FileNotFoundError: print(f"错误: 文件 '{file_path}' 不存在") except Exception as e: print(f"处理文件时出错: {e}") @app.route('/upload_md_zip', methods=['POST']) def handle_md_zip_upload(): """ 处理MD ZIP文件上传的简化路由 """ result = upload_and_process_md_zip() return result # ============ 原有的文件上传函数 ============ def uploadimage(f): if not (f and allowed_file_img(f.filename)): return "False" digest = md5(f.filename.encode("utf-8")).hexdigest() suffix = Path(f.filename).suffix images_name = datetime.now().strftime('%Y%m%d%H%M%S') + f'{digest}{suffix}' file_path = os.path.join(app.config['UPLOAD_FOLDER_IMG_Note'], images_name) f.save(file_path) return file_path def uploadvideo(f): if not (f and allowed_file_video(f.filename)): return "False" digest = md5(f.filename.encode("utf-8")).hexdigest() suffix = Path(f.filename).suffix images_name = datetime.now().strftime('%Y%m%d%H%M%S') + f'{digest}{suffix}' file_path = os.path.join(app.config['UPLOAD_FOLDER_VIDEO_Note'], images_name) f.save(file_path) file_path = "/" + file_path return file_path @app.route('/viewknowledgenote/', methods=['POST', 'GET']) def viewKnowledgeNote(knid): if 'username' in session: knowledgenote = loadKnowledgeNotewithKTNameByKNID(knid) return render_template('admin/a_knowledgenote_view.html', knowledgenote=knowledgenote) else: flash('查看失败, 请重新登录') return redirect(url_for('adminlogin')) @app.route('/delknowledgenote/', methods=['POST']) def delKnowledgeNote(knid): i, msg = del_KnowledgeNote(knid) if i > 0: flash('笔记信息删除成功!') else: flash('笔记信息删除失败!%s' % str(msg)) return redirect(url_for('knowledgenotemanage'))