fix: udpate

parent c04b4771
from fastapi import FastAPI
from controllers import translate_controler, users_controller
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from core import setup_logger
import io import io
import os import os
import subprocess import subprocess
import tkinter as tk import tkinter as tk
import urllib.parse import urllib.parse
from datetime import datetime
from tkinter import filedialog, messagebox, scrolledtext from tkinter import filedialog, messagebox, scrolledtext
from typing import List from typing import List
...@@ -17,11 +11,40 @@ import moviepy.editor as mp ...@@ -17,11 +11,40 @@ import moviepy.editor as mp
import requests import requests
import speech_recognition as sr import speech_recognition as sr
from bson import ObjectId from bson import ObjectId
from fastapi import FastAPI, File, UploadFile from controllers import translate_controler, users_controller
from fastapi.responses import JSONResponse from core import setup_logger
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel
from pymongo.mongo_client import MongoClient
app = FastAPI() app = FastAPI()
# Replace with your MongoDB Atlas credentials
username = "admin"
password = "JppbU6MZeHfOj7sp"
uri = f"mongodb+srv://{username}:{password}@researchmanagement-appl.vzhn4.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(uri)
db = client["test"]
items_collection = db["translated_items"]
@app.on_event("startup")
async def startup_db_client():
app.mongodb_client = MongoClient(uri)
try:
app.mongodb_client.admin.command("ping")
print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
print(e)
@app.on_event("shutdown")
async def shutdown_db_client():
app.mongodb_client.close()
logger = setup_logger() logger = setup_logger()
app.include_router(users_controller.router) app.include_router(users_controller.router)
...@@ -44,47 +67,8 @@ app.add_middleware( ...@@ -44,47 +67,8 @@ app.add_middleware(
@app.get("/") @app.get("/")
async def root(): async def read_root():
url = app.docs_url or "/docs" return {"message": "FastAPI with MongoDB integration"}
return RedirectResponse(url)
# Define a dictionary to map Sinhala Unicode to integers
unicode_to_int_mapping = {"මම": 1, "හෙට": 2, "යනවා": 3, "මං": 4}
def translate_text(text, target_language):
url = "https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
"sl": "auto",
"tl": target_language,
"dt": "t",
"q": text,
}
response = requests.get(url, params=params)
translation_data = response.json()
translated_text = translation_data[0][0][0]
return translated_text
# send to service.py
def send_to_service(translated_integer_si):
url = "http://127.0.0.1:8000/translated_items/"
data = {
"translated_integer_si": translated_integer_si,
}
response = requests.post(url, json=data)
if response.status_code == 200:
print("Data sent to server successfully")
else:
print("Failed to send data to server")
@app.get("/rest_pyton/welcome")
async def root():
return {"message": "Hello, World!"}
@app.post("/rest_pyton/uploaded_video") @app.post("/rest_pyton/uploaded_video")
async def uploaded_video(file: UploadFile = File(...)): async def uploaded_video(file: UploadFile = File(...)):
...@@ -114,7 +98,8 @@ async def uploaded_video(file: UploadFile = File(...)): ...@@ -114,7 +98,8 @@ async def uploaded_video(file: UploadFile = File(...)):
) )
print("Translated Integer (Si):", translated_integer_si) print("Translated Integer (Si):", translated_integer_si)
# send_to_service(translated_integer_si) # Send translated integer to MongoDB
send_to_mongodb(translated_integer_si)
return JSONResponse( return JSONResponse(
content={ content={
...@@ -125,3 +110,32 @@ async def uploaded_video(file: UploadFile = File(...)): ...@@ -125,3 +110,32 @@ async def uploaded_video(file: UploadFile = File(...)):
# return JSONResponse(content={"translated_text_si": "test"}) # return JSONResponse(content={"translated_text_si": "test"})
except Exception as e: except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500) return JSONResponse(content={"error": str(e)}, status_code=500)
class TranslatedItemCreate(BaseModel):
translated_integer_si: str
unicode_to_int_mapping = {"මම": 1, "හෙට": 2, "යනවා": 3, "මං": 4}
def translate_text(text, target_language):
url = "https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
"sl": "auto",
"tl": target_language,
"dt": "t",
"q": text,
}
response = requests.get(url, params=params)
translation_data = response.json()
translated_text = translation_data[0][0][0]
return translated_text
def send_to_mongodb(translated_integer_si):
translated_item_data = {
"translated_integer_si": translated_integer_si,
"timestamp": datetime.utcnow(),
}
result = items_collection.insert_one(translated_item_data)
if not result.inserted_id:
raise HTTPException(status_code=500, detail="Failed to create translated item")
import io
import os
import subprocess
import tkinter as tk
import urllib.parse
from tkinter import filedialog, messagebox, scrolledtext
from typing import List
import moviepy.editor as mp
import requests
import speech_recognition as sr
from bson import ObjectId
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
app = FastAPI()
# Define a dictionary to map Sinhala Unicode to integers
unicode_to_int_mapping = {"මම": 1, "හෙට": 2, "යනවා": 3, "මං": 4}
def translate_text(text, target_language):
url = "https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
"sl": "auto",
"tl": target_language,
"dt": "t",
"q": text,
}
response = requests.get(url, params=params)
translation_data = response.json()
translated_text = translation_data[0][0][0]
return translated_text
# send to service.py
def send_to_service(translated_integer_si):
url = "http://127.0.0.1:8000/translated_items/"
data = {
"translated_integer_si": translated_integer_si,
}
response = requests.post(url, json=data)
if response.status_code == 200:
print("Data sent to server successfully")
else:
print("Failed to send data to server")
# Define a route for the "Hello, World!" endpoint
@app.get("/")
def read_root():
return {"message": "Hello, World!"}
@app.post("/uploaded_video")
async def uploaded_video(file: UploadFile = File(...)):
try:
# Process the uploaded video
video_content = await file.read()
temp_video_path = "temp_video.mp4"
with open(temp_video_path, "wb") as temp_video_file:
temp_video_file.write(video_content)
audio_save_path = "audio.wav"
video = mp.VideoFileClip(temp_video_path)
audio = video.audio
audio.write_audiofile(audio_save_path)
r = sr.Recognizer()
with sr.AudioFile(audio_save_path) as source:
audio = r.record(source)
recognized_text = r.recognize_google(audio, language="si-LK")
translated_text_si = translate_text(recognized_text, "si")
translated_text_en = translate_text(recognized_text, "en")
translated_integer_si = " ".join(
str(unicode_to_int_mapping.get(word, 0))
for word in translated_text_si.split()
)
print("Translated Integer (Si):", translated_integer_si)
send_to_service(translated_integer_si)
return JSONResponse(
content={
"translated_text_si": translated_text_si,
"translated_text_en": translated_text_en,
}
)
# return JSONResponse(content={"translated_text_si": "test"})
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
import moviepy.editor as mp
import speech_recognition as sr
import tkinter as tk
from tkinter import filedialog
from tkinter import messagebox
from tkinter import scrolledtext
import requests
from bson import ObjectId
import urllib.parse
import subprocess
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from typing import List
import io
app = FastAPI()
# Define a dictionary to map Sinhala Unicode to integers
unicode_to_int_mapping = {"මම": 1, "හෙට": 2, "යනවා": 3, "මං": 4}
def translate_text(text, target_language):
url = "https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
"sl": "auto",
"tl": target_language,
"dt": "t",
"q": text,
}
response = requests.get(url, params=params)
translation_data = response.json()
translated_text = translation_data[0][0][0]
return translated_text
# send to service.py
def send_to_service(translated_integer_si):
url = "http://127.0.0.1:8000/translated_items/"
data = {
"translated_integer_si": translated_integer_si,
}
response = requests.post(url, json=data)
if response.status_code == 200:
print("Data sent to server successfully")
else:
print("Failed to send data to server")
# get request from frontend
@app.post("/uploaded_video/")
async def uploaded_video(file: UploadFile = File(...)):
try:
# Process the video here
# ... Extract audio, recognize speech, perform translation ...
# Example response with translated results
translated_text_si = "Translated Sinhala Text"
translated_text_en = "Translated English Text"
return JSONResponse(
content={
"translated_text_si": translated_text_si,
"translated_text_en": translated_text_en,
}
)
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
def convert_video():
video_path = filedialog.askopenfilename(filetypes=[("Video Files", "*.mp4")])
if not video_path:
messagebox.showwarning("Warning", "No video file selected.")
return
# video_file = request.files['video']
# if video_file:
# # Save the video file and perform translation
# video_path = "uploaded_video.mp4" # Save the video temporarily
# video_file.save(video_path)
try:
audio_save_path = "audio.wav"
video = mp.VideoFileClip(video_path)
audio = video.audio
audio.write_audiofile(audio_save_path)
r = sr.Recognizer()
with sr.AudioFile(audio_save_path) as source:
audio = r.record(source)
recognized_text = r.recognize_google(audio, language="si-LK")
translated_text_si = translate_text(recognized_text, "si")
translated_text_en = translate_text(recognized_text, "en")
# Get the integer values and join them into a single string
translated_integer_si = " ".join(
str(unicode_to_int_mapping.get(word, 0))
for word in translated_text_si.split()
)
print(
"Translated Integer (Si):", translated_integer_si
) # Print the translated integer
send_to_service(translated_integer_si) # Sending translated_integer_si
# textarea_si.delete('1.0', tk.END)
# textarea_en.delete('1.0', tk.END)
# textarea_si.insert(tk.END, f"Translated Integer: {translated_integer_si}\n{translated_text_si}")
# textarea_en.insert(tk.END, translated_text_en)
except Exception as e:
messagebox.showerror("Error", str(e))
print("Error during video conversion:", e)
# if __name__ == "__main__":
# # subprocess.Popen(["uvicorn", "main:app", "--host", "127.0.0.1", "--port", "8050"])
# window = tk.Tk()
# window.title("Video Translation")
# window.geometry("800x600")
# title_label = tk.Label(window, text="Video Translation", font=("Arial", 16, "bold"))
# title_label.pack(pady=10)
# convert_button = tk.Button(window, text="Convert Video", command=convert_video)
# convert_button.pack(pady=10)
# textarea_si_label = tk.Label(window, text="Sinhala Unicode Translation")
# textarea_si_label.pack()
# textarea_si = scrolledtext.ScrolledText(window, wrap=tk.WORD, width=80, height=10)
# textarea_si.pack()
# textarea_en_label = tk.Label(window, text="Singlish Translation")
# textarea_en_label.pack()
# textarea_en = scrolledtext.ScrolledText(window, wrap=tk.WORD, width=80, height=10)
# textarea_en.pack()
# window.mainloop()
@app.post("/uploaded_video/v1")
async def uploaded_video(file: UploadFile = File(...)):
try:
# Process the uploaded video
video_content = await file.read()
video_io = io.BytesIO(video_content)
audio_save_path = "audio.wav"
video = mp.VideoFileClip(video_io, codec="mpeg4")
audio = video.audio
audio.write_audiofile(audio_save_path)
r = sr.Recognizer()
with sr.AudioFile(audio_save_path) as source:
audio = r.record(source)
recognized_text = r.recognize_google(audio, language="si-LK")
translated_text_si = translate_text(recognized_text, "si")
translated_text_en = translate_text(recognized_text, "en")
translated_integer_si = " ".join(
str(unicode_to_int_mapping.get(word, 0))
for word in translated_text_si.split()
)
print("Translated Integer (Si):", translated_integer_si)
send_to_service(translated_integer_si)
return JSONResponse(
content={
"translated_text_si": translated_text_si,
"translated_text_en": translated_text_en,
}
)
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment