| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- from flask import flash, render_template, session, request, redirect, url_for
- from admin.knowledgetopic import *
- from admin.knowledgenote import *
- from flask import current_app as app
- import os
- import zipfile
- import shutil
- import re
- from hashlib import md5
- from pathlib import Path
- from datetime import datetime
- from werkzeug.utils import secure_filename
- from urllib.parse import urlparse
- ALLOWED_EXTENSIONS_IMG = set(['png', 'jpg', 'JPG', 'PNG', 'bmp'])
- ALLOWED_EXTENSIONS_VIDEO = set(['mp4', 'avi', 'flv', 'wmv', 'wov', 'mkv'])
- ALLOWED_EXTENSIONS_ZIP = set(['zip', 'ZIP']) # 新增ZIP文件格式支持
- # 判断文件格式(图像)
- def allowed_file_img(filename):
- return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS_IMG
- # 判断文件格式(视频)
- def allowed_file_video(filename):
- return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS_VIDEO
- # 判断文件格式(ZIP)
- def allowed_file_zip(filename):
- return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_EXTENSIONS_ZIP
- # ------------- 笔记信息管理 -------------------
- @app.route('/knowledgenotemanage')
- def knowledgenotemanage():
- if 'username' in session:
- results = loadAllKnowledgeNote()
- return render_template('admin/a_knowledgenote_manage.html', results=results)
- else:
- flash('登录失败, 请重新登录')
- return redirect(url_for('adminlogin'))
- @app.route('/newknowledgenote', methods=['POST', 'GET'])
- def newknowledgenote():
- if 'username' in session:
- knowledgetopic = loadAllKnowledgeTopic()
- return render_template('admin/a_knowledgenote_edit.html', knowledgetopic=knowledgetopic, knowledgenote=None,
- status='add')
- else:
- flash('登录失败, 请重新登录')
- return redirect(url_for('adminlogin'))
- @app.route('/editknowledgenote/<knid>', methods=['POST', 'GET'])
- def editnoteinfo(knid):
- if 'username' in session:
- knowledgetopic = loadAllKnowledgeTopic()
- knowledgenote = loadKnowledgeNoteByKNID(knid)
- return render_template('admin/a_knowledgenote_edit.html', knowledgetopic=knowledgetopic,
- knowledgenote=knowledgenote, status='edit')
- else:
- flash('登录失败, 请重新登录')
- return redirect(url_for('adminlogin'))
- @app.route('/editknowledgenote.do', methods=['POST'])
- def addnoteinfo():
- if 'username' in session:
- if request.method == 'POST':
- status = request.form['lbl_status']
- ktid = request.form.get('txt_KTID')
- kntitle = request.form['txt_KNTitle']
- isoriginal = request.form['txt_IsOriginal']
- source = request.form['txt_Source']
- origlink = request.form['txt_OrigLink']
- knimage = request.form['txt_KNImage']
- knabstract = request.form['txt_KNAbstract']
- kncontent = eval(request.form['txt_KNcontent'])
- kncontent = kncontent.replace('''"''', r'''\"''').replace("'", r"\'")
- filelink = request.form['txt_FileLink']
- pptlink = request.form['txt_PPTLink']
- videolink = request.form['txt_VideoLink']
- codelink = request.form['txt_CodeLink']
- datalink = request.form['txt_DataLink']
- author = request.form['txt_CoAuthors']
- knisshow = request.form.get('txt_KNIsShow')
- # 封面图片处理
- f = request.files['file_proimg']
- proimg = ""
- if f and f.filename != "":
- proimg = uploadimage(f)
- if proimg == "":
- proimg = knimage
- # 视频上传处理
- fvideo = request.files['file_video']
- video = ""
- if fvideo and fvideo.filename != "":
- video = uploadvideo(fvideo)
- if video == "":
- video = videolink
- if status == 'add':
- data = [ktid, kntitle, isoriginal, source, origlink, proimg, knabstract, kncontent, filelink, pptlink,
- video, codelink, datalink, author, knisshow]
- i, msg = add_KnowledgeNote(data)
- if i > 0:
- flash('笔记信息添加成功!')
- return redirect(url_for('knowledgenotemanage'))
- else:
- flash('笔记信息添加失败!%s' % str(msg))
- return redirect(url_for('knowledgenotemanage'))
- if status == 'edit':
- knid = request.form['lbl_KNID']
- data = [ktid, kntitle, isoriginal, source, origlink, proimg, knabstract, kncontent, filelink, pptlink,
- video, codelink, datalink, author, knisshow, knid]
- i, msg = edit_KnowledgeNote(data)
- if i > 0:
- flash('笔记信息修改成功!')
- return redirect(url_for('knowledgenotemanage'))
- else:
- flash('笔记信息修改失败!%s' % str(msg))
- return redirect(url_for('knowledgenotemanage'))
- else:
- flash('登录失败, 请重新登录')
- return redirect(url_for('adminlogin'))
- # ============ ZIP文件上传和解压功能 ============
- def upload_and_process_md_zip():
- """
- 简化的MD ZIP文件上传处理函数,整合了所有功能
- """
- try:
-
-
-
- if 'file' not in request.files: # 1. 检查文件上传
- return {"success": False, "message": "没有上传zip文件"}
-
- if 'folderName' not in request.form:
- return {"success": False, "message": "没有上传文件夹名"}
- file = request.files['file']
- folderName = request.form.get('folderName')
- if file.filename == '':
- return {"success": False, "message": "请选择要上传的ZIP文件"}
-
- if not ('.' in file.filename and file.filename.rsplit('.', 1)[1].lower() == 'zip'): #验证是否zip
- return {"success": False, "message": "只支持ZIP格式的压缩包"}
-
-
- original_filename = secure_filename(file.filename)#安全保存文件
- file_digest = md5(original_filename.encode("utf-8")).hexdigest()
- zip_filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}_{file_digest}.zip"
- zip_path = os.path.join('temp_uploads', zip_filename)
-
- os.makedirs(os.path.dirname(zip_path), exist_ok=True)
- file.save(zip_path)
-
- # 4. 安全解压ZIP文件
- extract_dir_name = Path(original_filename).stem + datetime.now().strftime('%Y%m%d%H%M%S')
- print(extract_dir_name)
- extract_path = os.path.join('static/uploads/md', extract_dir_name)
- os.makedirs(extract_path, exist_ok=True)
-
- if not zipfile.is_zipfile(zip_path):
- os.remove(zip_path)
- return {"success": False, "message": "无效的ZIP文件"}
-
- extracted_files = []
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
- for file_info in zip_ref.infolist():
- if file_info.filename.endswith('/'):
- continue
-
- # 安全路径检查
- target_path = os.path.join(extract_path, file_info.filename)
- if not os.path.realpath(target_path).startswith(os.path.realpath(extract_path)):
- os.remove(zip_path)
- return {"success": False, "message": f"检测到不安全路径: {file_info.filename}"}
-
- # 解压文件
- zip_ref.extract(file_info, extract_path)
- extracted_files.append(target_path)
-
- # 5. 查找并处理MD文件
- md_files = []
- for root, dirs, files in os.walk(extract_path):
- for filename in files:
- if filename.lower().endswith('.md'):
- full_path = os.path.join(root, filename)
- relative_path = os.path.relpath(full_path, extract_path)
- md_files.append({'full_path': full_path, 'relative_path': relative_path})
-
- # 6. 检查MD文件数量
- if len(md_files) == 0:
- return {
- "success": True,
- "message": "ZIP文件解压成功,但未找到MD文件",
- "file_count": 0
- }
-
- # 7. 处理MD文件内容
- for md_file in md_files:
-
- replace_image_paths( md_file['full_path'],folderName,extract_dir_name) #传入处理md的函数
-
-
- # 8. 清理临时文件并返回结果
- os.remove(zip_path)
-
- return {
- 'success': True,
- "md_file":md_file,
- 'extract_url': f"/static/uploads/md/{extract_dir_name}"
- }
-
- except Exception as e:
- return {"success": False, "message": f"处理过程中出错: {str(e)}"}
-
- def replace_image_paths(file_path, folder_name,extract_dir_name, new_base_path="/static/uploads/md/"):
- """
- 原先设定正则提取url,文件夹名不变,提取文件夹名前面和文件夹名后面的字符串,发现有问题,重新进行修改
- """
- try:
- # 读取Markdown文件内容
- with open(file_path, 'r', encoding='utf-8') as f:
- content = f.read()
-
- # 正则表达式匹配Markdown图片语法和HTML img标签
- patterns = [
- r'!\[.*?\]\((.*?)\)', # Markdown图片
- r'<img[^>]+src="([^">]+)"' # HTML img标签
- ]
-
- all_matches = []
- for pattern in patterns:
- matches = re.findall(pattern, content)
- all_matches.extend(matches)
-
- print(f"找到 {len(all_matches)} 个图片链接")
-
- if not all_matches:
- print("未找到图片链接")
- return 0, 0
-
- # 替换每个图片URL
- new_content = content
- for url in all_matches:
- original_url = url
-
- # 跳过已经是新路径的URL
- if url.startswith(new_base_path):
- print(f"跳过(已经是新路径): {url}")
- continue
-
- # 查找folder_name在URL中的位置
- folder_index = url.find(folder_name)
-
- if folder_index != -1:
- # 提取folderName之后的部分
- path_after_folder = url[folder_index:]
-
- # 将反斜杠替换为正斜杠
- path_after_folder = path_after_folder.replace('\\', '/')
-
- # 构建新的URL
- new_url = f"{new_base_path}{path_after_folder}"
-
- # 替换原URL
- new_content = new_content.replace(original_url, new_url).replace(f"/{folder_name}/",f"/{extract_dir_name}/")
- # 将修改后的内容写回文件
- with open(file_path, 'w', encoding='utf-8') as f:
- f.write(new_content)
-
- except FileNotFoundError:
- print(f"错误: 文件 '{file_path}' 不存在")
- except Exception as e:
- print(f"处理文件时出错: {e}")
-
- @app.route('/upload_md_zip', methods=['POST'])
- def handle_md_zip_upload():
- """
- 处理MD ZIP文件上传的简化路由
- """
- result = upload_and_process_md_zip()
- return result
- # ============ 原有的文件上传函数 ============
- def uploadimage(f):
- if not (f and allowed_file_img(f.filename)):
- return "False"
- digest = md5(f.filename.encode("utf-8")).hexdigest()
- suffix = Path(f.filename).suffix
- images_name = datetime.now().strftime('%Y%m%d%H%M%S') + f'{digest}{suffix}'
- file_path = os.path.join(app.config['UPLOAD_FOLDER_IMG_Note'], images_name)
- f.save(file_path)
- return file_path
- def uploadvideo(f):
- if not (f and allowed_file_video(f.filename)):
- return "False"
- digest = md5(f.filename.encode("utf-8")).hexdigest()
- suffix = Path(f.filename).suffix
- images_name = datetime.now().strftime('%Y%m%d%H%M%S') + f'{digest}{suffix}'
- file_path = os.path.join(app.config['UPLOAD_FOLDER_VIDEO_Note'], images_name)
- f.save(file_path)
- file_path = "/" + file_path
- return file_path
- @app.route('/viewknowledgenote/<knid>', methods=['POST', 'GET'])
- def viewKnowledgeNote(knid):
- if 'username' in session:
- knowledgenote = loadKnowledgeNotewithKTNameByKNID(knid)
- return render_template('admin/a_knowledgenote_view.html', knowledgenote=knowledgenote)
- else:
- flash('查看失败, 请重新登录')
- return redirect(url_for('adminlogin'))
- @app.route('/delknowledgenote/<knid>', methods=['POST'])
- def delKnowledgeNote(knid):
- i, msg = del_KnowledgeNote(knid)
- if i > 0:
- flash('笔记信息删除成功!')
- else:
- flash('笔记信息删除失败!%s' % str(msg))
- return redirect(url_for('knowledgenotemanage'))
|