import logging
import os
from json import JSONDecodeError

import streamlit as st
from annotated_text import annotation
from markdown import markdown

from utils.config import parser
from utils.haystack import (
    query,
    start_document_store,
    start_haystack_extractive,
    start_haystack_rag,
    start_preprocessor_node,
    start_reader,
    start_retriever,
)
from utils.ui import reset_results, set_initial_state

# Defaults for the retriever / answer-count sliders (configurable via environment variables)
DEFAULT_DOCS_FROM_RETRIEVER = int(os.getenv("DEFAULT_DOCS_FROM_RETRIEVER", "3"))
DEFAULT_NUMBER_OF_ANSWERS = int(os.getenv("DEFAULT_NUMBER_OF_ANSWERS", "3"))

# Whether the file upload should be enabled or not
DISABLE_FILE_UPLOAD = bool(os.getenv("DISABLE_FILE_UPLOAD"))


def upload_files():
    """Render the sidebar file uploader and return the uploaded files."""
    return st.sidebar.file_uploader(
        "upload",
        type=["pdf", "txt", "docx"],
        accept_multiple_files=True,
        label_visibility="hidden",
    )


def process_file(data_file, preprocessor, document_store, retriever):
    """Preprocess a single uploaded file, write it to the document store, and update embeddings."""
    # Read the file content; this assumes the upload is UTF-8 decodable plain text.
    file_contents = data_file.read().decode("utf-8")
    docs = [{"content": str(file_contents), "meta": {"name": str(data_file.name)}}]
    try:
        # Skip files that were already written to the document store
        names = [item.meta.get("name") for item in document_store.get_all_documents()]
        if data_file.name in names:
            print(f"{data_file.name} already processed")
        else:
            print(f"preprocessing uploaded doc {data_file.name}...")
            preprocessed_docs = preprocessor.process(docs)
            print("writing to document store...")
            document_store.write_documents(preprocessed_docs)
            print("updating embeddings...")
            document_store.update_embeddings(retriever)
    except Exception as e:
        print(e)


try:
    args = parser.parse_args()
    set_initial_state()

    st.write("# " + args.name)

    # Build the pipeline components
    preprocessor = start_preprocessor_node()
    document_store = start_document_store(args.store)
    retriever = start_retriever(document_store)
    reader = start_reader()

    if args.task == "extractive":
        pipeline = start_haystack_extractive(document_store, retriever, reader)
    else:
        pipeline = start_haystack_rag(document_store, retriever)

    # File upload block in the sidebar
    if not DISABLE_FILE_UPLOAD:
        st.sidebar.write("## File Upload:")
        data_files = upload_files()
        if data_files is not None:
            for data_file in data_files:
                if data_file:
                    try:
                        # Preprocess and index each uploaded file
                        if args.store == "inmemory":
                            process_file(data_file, preprocessor, document_store, retriever)
                        st.sidebar.write(str(data_file.name) + "    ✅ ")
                    except Exception:
                        st.sidebar.write(str(data_file.name) + "    ❌ ")
                        st.sidebar.write(
                            "_This file could not be parsed, see the logs for more information._"
                        )

    # Search bar
    question = st.text_input(
        "Ask a question",
        value=st.session_state.question,
        max_chars=100,
        on_change=reset_results,
    )
    run_pressed = st.button("Run")
    run_query = run_pressed or question != st.session_state.question

    # Get results for the query
    if run_query and question:
        reset_results()
        st.session_state.question = question
        with st.spinner("🔎    Running your pipeline"):
            try:
                st.session_state.results = query(pipeline, question)
            except JSONDecodeError:
                st.error(
                    "👓    An error occurred reading the results. Is the document store working?"
                )
            except Exception as e:
                logging.exception(e)
                st.error("🐞    An error occurred during the request.")

    # Display results
    if st.session_state.results:
        results = st.session_state.results
        if args.task == "extractive":
            answers = results["answers"]
            for answer in answers:
                if answer.answer:
                    # Highlight the answer span inside its surrounding context
                    text, context = answer.answer, answer.context
                    start_idx = context.find(text)
                    end_idx = start_idx + len(text)
                    st.write(
                        f" Answer: {markdown(context[:start_idx] + str(annotation(body=text, label='ANSWER', background='#964448', color='#ffffff')) + context[end_idx:])}",
                        unsafe_allow_html=True,
                    )
                else:
                    st.info(
                        "🤔    Haystack is unsure whether any of the documents contain an answer to your question. Try to reformulate it!"
                    )
        elif args.task == "rag":
            st.write(f" Answer: {results['results'][0]}")
            # Show the documents returned by the retriever alongside the generated answer
            retrieved_documents = results["documents"]
            st.subheader("Retriever Results:")
            for document in retrieved_documents:
                st.write(f"Document Name: {document.meta['name']}")
                st.write(f"Score: {document.score}")
                st.write(f"Text: {document.content}")

except SystemExit as e:
    # This exception is raised when --help or invalid command line arguments are
    # used. Streamlit currently prevents the program from exiting normally, so we
    # have to do a hard exit.
    os._exit(e.code)