Introduction
Recently I saw an Instagram reel about a guy who built quite a popular account by simply copying Reddit posts, turning them into speech with AI, adding subtitles, and pasting the result over Subway Surfers or GTA 5 mega-ramp gameplay. Basically something like this Instagram account.
He explained how he did it manually: going onto Reddit, copying the text of a post, adding it to CapCut to add TTS, transcribing it to add subtitles, and then manually posting it on Instagram (or TikTok). I saw this and thought: this can easily be automated. So I got to work. I used Python, for obvious reasons.
This will be a full walkthrough of the code, how it works, and what the results look like. I've divided everything into separate modules that get connected by the main Python file, so it's easy to swap a module out for something else (like a different TTS engine).
All the code files for the project are also available here in the library
Getting a Reddit post
Step 1 is to get the text of a post from Reddit so it can be processed into a video. We want the video to read the title and the body text of a post, so we can model a post as a class. A Reddit post contains these elements:
- Post ID, unique for each post
- Post title
- Post description or the text
- Comments
- Subreddit
class RedditPost:
    def __init__(self, id, title, description, comments, subreddit) -> None:
        self.id = id
        self.title = title
        self.description = description
        self.comments = comments
        self.subreddit = subreddit
        self.sanitize_post()

    def into_text(self) -> str:
        return self.title + ".\n" + self.description

    def __str__(self) -> str:
        return "Id: " + self.id + "\nTitle: " + self.title + "\nDescription: " + self.description + "\nComments: " + str(len(self.comments))
The into_text method will be used to get the text of the post and convert it into speech.
Sometimes Reddit posts contain text that is hard to turn into speech, and we want to censor "bad" words because otherwise IG won't push the video as much. We also want to remove any extra periods, because the TTS AI will speak them literally. I added some (crude) sanitization to the RedditPost class to prevent this:
    def sanitize_post(self):
        self.description = self.description.replace("LGBTQ", "L G B T Q")
        self.description = self.description.replace("+", "plus")
        self.description = self.description.replace("/", " slash ")
        self.description = self.description.replace("TLDR", "To summarize: ")
        self.description = self.description.replace("&", "and")
        self.description = self.description.replace("ä", "ae")
        self.description = self.description.replace("ö", "oe")
        self.description = self.description.replace("ü", "ue")
        self.description = self.description.replace("ß", "ss")
        self.description = self.description.replace("*", "")
        self.description = self.description.replace("_", "")
        self.description = self.description.replace('"', " ")
        # profanities
        self.description = self.description.replace("fuck", "frick")
        self.description = self.description.replace("Fuck", "Frick")
        self.description = self.description.replace("Shit", "Shot")
        self.description = self.description.replace("shit", "shot")
        self.description = self.description.replace(" ass", " butt")
        self.description = self.description.replace("asshole", "a-hole")
        self.description = self.description.replace(" Ass", " Butt")
        self.description = self.description.replace("Asshole", "A-hole")
        self.description = self.description.replace(" buttum", " assum")  # "Assume" also contains "Ass"
        self.description = self.description.replace(" Buttum", " Assum")
        self.description = self.description.replace("kill", "unalive")
        self.description = self.description.replace("Kill", "Unalive")
        self.description = self.description.replace("death", "unalive")
        self.description = self.description.replace("Death", "Unalive")
        self.description = self.description.replace("murder", "unalive")
        self.description = self.description.replace("Murder", "Unalive")
        self.description = self.description.replace("suicide", "self unalive")
        self.description = self.description.replace("Suicide", "Self unalive")
        self.description = self.description.replace("pedofile", "pdf ile")
        self.description = self.description.replace("Pedofile", "Pdf ile")
        self.description = self.description.replace("sex", "s*x")
        self.description = self.description.replace("Sex", "s*x")
        self.title = self.title.replace("fuck", "frick")
        self.title = self.title.replace("Fuck", "Frick")
        self.title = self.title.replace("Shit", "Shot")
        self.title = self.title.replace("shit", "shot")
        # AmITheAsshole
        self.description = self.description.replace("AITA", "Am I the a-hole")
        self.title = self.title.replace("AITA", "Am I the a-hole")
        # tifu
        self.description = self.description.replace("TIFU", "Today I fricked up")
        self.title = self.title.replace("TIFU", "Today I fricked up")
        # lifeProTips
        self.description = self.description.replace("LPT", "Life pro tip")
        self.title = self.title.replace("LPT", "Life pro tip")
        self.description = stringutils.remove_trailing_periods(self.description)
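The long chain of replace calls could also be collapsed into a table-driven helper. A minimal sketch (the table contents and function name here are my own, not the project's code):

```python
# Ordered replacement table: more specific patterns first, so "asshole"
# is rewritten before the bare " ass" rule could clip it.
REPLACEMENTS = [
    ("Asshole", "A-hole"), ("asshole", "a-hole"),
    (" Ass", " Butt"), (" ass", " butt"),
    ("Fuck", "Frick"), ("fuck", "frick"),
    ("TLDR", "To summarize:"),
    ("&", "and"),
]

def sanitize(text: str) -> str:
    # apply every pair in order; order matters because later rules
    # see the output of earlier ones
    for old, new in REPLACEMENTS:
        text = text.replace(old, new)
    return text
```

Adding a new rule then means adding one tuple instead of two more lines of replace calls.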
The stringutils module contains some functionality for processing text:
stringutils.py
import logging

logger = logging.getLogger(__name__)

alphabet = "qwertyuiopasdfghjklzxcvbnm"

def remove_trailing_periods(text: str) -> str:
    for i in range(len(text) - 1):
        if text[i].lower() not in alphabet and text[i + 1] == ".":
            # remove the period after this character
            text = text[:i + 1] + text[i + 2:]
            logger.info("removing extra period at index " + str(i + 1))
            return remove_trailing_periods(text)
    return text

def remove_period_after(character: str, text: str) -> str:
    for i in range(len(text) - 1):
        if text[i] == character and text[i + 1] == ".":
            # remove the period after this character
            text = text[:i + 1] + text[i + 2:]
            logger.info("removing extra period at index " + str(i + 1))
            return remove_period_after(character, text)
    return text

def remove_repeating_periods(text: str) -> str:
    return remove_period_after(".", text)
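To see what remove_trailing_periods is doing, here is an equivalent version built on a regex instead of recursion (the function name is mine): it repeatedly drops any period that directly follows a non-letter, so punctuation like "!." or runs of periods collapse.

```python
import re

def collapse_extra_periods(text: str) -> str:
    # drop any period that directly follows a non-letter character,
    # looping until the text stops changing
    prev = None
    while prev != text:
        prev = text
        text = re.sub(r"([^A-Za-z])\.", r"\1", text)
    return text
```

So "Well!. That's it..." becomes "Well! That's it.", which reads much better when fed to a TTS engine.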
Now that we have a class to represent a Reddit post, we need to actually retrieve them. For this, I used the PRAW Python package. I put the functionality for this into a RedditEngine class:
class RedditEngine:
    MAX_IG_SHORT_LENGTH = 1620  # max video length is 1:30, this is about that
    REDDIT_IDS_FILENAME = "reddit_ids"
    TTS_FOLDER_NAME = "tts"
    SUBREDDITS_STORIES_FILENAME = "subreddits_stories"
    DEFAULT_POST_AMOUNT = 30

    def __init__(self) -> None:
        clientid = "your reddit client id"
        secret = "your reddit secret"
        user_agent = "praw_scaper_1.0"
        self.reddit = praw.Reddit(username='your username', password='your password', client_id=clientid, client_secret=secret, user_agent=user_agent)
        self.posts: List[RedditPost] = []
        self.already_used_ids = []
        with open(RedditEngine.REDDIT_IDS_FILENAME, "r") as reddit_ids:
            for line in reddit_ids:
                self.already_used_ids.append(line.strip())
        logger.info("IDs already used:")
        logger.info(self.already_used_ids)

    def get_posts(self, subreddit_name, limit):
        subreddit = self.reddit.subreddit(subreddit_name)
        logger.info("getting hot " + str(limit) + " posts for subreddit: " + subreddit.display_name)
        for submission in subreddit.hot(limit=limit):
            if RedditEngine.check_post(subreddit_name, submission) and ((len(submission.title) + len(submission.selftext)) < RedditEngine.MAX_IG_SHORT_LENGTH):
                self.posts.append(RedditPost(str(submission), submission.title, submission.selftext, submission.comments, subreddit_name))

    @staticmethod
    def check_post(subreddit_name, submission):
        if "UPDATE" in submission.title or "(Part" in submission.title:
            return False
        if subreddit_name == "AmITheAsshole" and "Monthly Open" in submission.title:
            return False
        elif subreddit_name == "talesfromtechsupport" and ("POSTING RULES" in submission.title or "Mr_Cartographer" in submission.title or "(Part" in submission.title or str(submission) == "16u1gxn"):
            return False
        return True

    def choose_id(self, id: str) -> bool:
        """
        Checks whether the post with the given ID has already been used.

        Parameters:
            id: ID of the post to check

        Returns:
            True if the post has not yet been used, False otherwise
        """
        return id not in self.already_used_ids

    def exclude_id(self, id: str):
        """
        Adds the ID to the already used IDs file.
        """
        self.already_used_ids.append(id)
        with open(RedditEngine.REDDIT_IDS_FILENAME, "a") as f:
            f.write(id + "\n")
Let's break that down. I always want to get the 30 hot posts for a specific subreddit, but I don't want to use the same post twice. That's where REDDIT_IDS_FILENAME comes in: it's a file that the ID of every used post gets written to, one ID per line. For example:
1eig16p
1ei8uz8
1eiqnyi
1ehi5il
1ejghtc
1ejjj51
The TTS_FOLDER_NAME is where the main script saves the generated text-to-speech files. The SUBREDDITS_STORIES_FILENAME is a file containing the names of all subreddits that posts can be taken from; it's also used by the main script. It looks like this:
tifu
nosleep
relationships
LifeProTips
pettyrevenge
talesfromtechsupport
confessions
AmITheAsshole
TrueOffMyChest
To be able to scrape Reddit for posts, you need to give PRAW access to your account with a client ID and secret. You get those by creating a Reddit app and copying them from it.
In the constructor, PRAW gets initialized and the already used post IDs are read into a list. The get_posts method gets the 30 hot posts for a subreddit so that one can be chosen. The check_post method checks that a post is not an announcement or update post, because we only want stand-alone posts to make a video out of. The choose_id method checks that the given ID has not already been used, and the exclude_id method adds a post ID to the already used IDs list.
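Stripped of the Reddit parts, the used-ID bookkeeping is just a read-on-startup, append-on-use pattern. A standalone sketch (not the project's exact code):

```python
def load_used_ids(path: str) -> set:
    # one ID per line; a missing file means nothing has been used yet
    try:
        with open(path) as f:
            return {line.strip() for line in f if line.strip()}
    except FileNotFoundError:
        return set()

def mark_used(path: str, post_id: str, used: set) -> None:
    # remember in memory and persist by appending a line
    used.add(post_id)
    with open(path, "a") as f:
        f.write(post_id + "\n")
```

Appending rather than rewriting the whole file means a crash mid-run can never lose previously recorded IDs.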
In the script that generates one video, a random subreddit is chosen from the file and its 30 hot posts are gathered. From those, the first one that is not yet in the list of used posts gets chosen. This happens in the auto_post_video and generate_video_for_subreddit functions:
def generate_video_for_subreddit(subreddit: str, reddit_engine: get_reddit_posts.RedditEngine) -> bool:
    reddit_engine.get_posts(subreddit, get_reddit_posts.RedditEngine.DEFAULT_POST_AMOUNT)
    id_accepted = False
    i = 0
    post = None
    while not id_accepted:
        if i == len(reddit_engine.posts):
            return False
        post = reddit_engine.posts[i]
        can_use_post = reddit_engine.choose_id(post.id)
        if can_use_post:
            id_accepted = True
        else:
            i += 1
    generate_story_video_for_post(post, reddit_engine)
    return True

def auto_post_video():
    reddit_engine = get_reddit_posts.RedditEngine()
    subreddits = []
    with open(get_reddit_posts.RedditEngine.SUBREDDITS_STORIES_FILENAME, "r") as f:
        subreddits = f.readlines()
    subreddit = random.choice(subreddits).strip()  # strip the trailing newline from readlines()
    logger.info("getting post from subreddit " + subreddit)
    video_result = generate_video_for_subreddit(subreddit, reddit_engine)
    if not video_result:
        logger.warning("should use another subreddit")
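The while loop that picks the first unused post can also be expressed with next(). A sketch using plain ID strings in place of RedditPost objects:

```python
def first_unused(post_ids, used_ids):
    # return the first ID not seen before, or None if all are used
    return next((pid for pid in post_ids if pid not in used_ids), None)
```

The None result corresponds to the `return False` branch above, where the caller falls back to another subreddit.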
After having chosen a Reddit post, it is further processed into a video.
Converting the text to speech
After a Reddit post is chosen, the next step is to convert the text of the post into speech. I wanted to do this using AI because it's incredibly easy to use nowadays. The first thing I tried was ElevenLabs. The results it generates are great, but unfortunately there's a character limit, and I'm not gonna pay for any of this.
text_to_speech_elevenlabs.py
import requests
import random
import logging

logger = logging.getLogger(__name__)

class ElevenLabsVoice:
    def __init__(self, voice_id, name):
        self.voice_id = voice_id
        self.name = name

    def __str__(self) -> str:
        return "Voice ID: " + self.voice_id + ", Name: " + self.name

class ElevenLabsTTS:
    API_KEY = "your API key"
    CHUNK_SIZE = 1024

    def __init__(self, api_key):
        self.api_key = api_key
        self.all_voices = []
        self.current_voice = None

    def get_all_voices(self):
        logger.info("retrieving all voices...")
        url = "https://api.elevenlabs.io/v1/voices"
        headers = {
            "Accept": "application/json",
            "xi-api-key": self.api_key
        }
        response = requests.get(url, headers=headers)
        for voice in response.json()["voices"]:
            self.all_voices.append(ElevenLabsVoice(voice["voice_id"], voice["name"]))

    def select_random_voice(self):
        self.current_voice = random.choice(self.all_voices)
        logger.info("Selected random voice: " + self.current_voice.name)

    def write_to_file(self, filename, text) -> bool:
        if self.current_voice is None:
            raise Exception("No voice selected")
        logger.info("writing text to file " + filename + "...")
        logger.info(text)
        logger.info("using voice: " + self.current_voice.name)
        url = "https://api.elevenlabs.io/v1/text-to-speech/" + self.current_voice.voice_id
        headers = {
            "Accept": "audio/mpeg",
            "Content-Type": "application/json",
            "xi-api-key": self.api_key  # use the instance key, not the class constant
        }
        data = {
            "text": text,
            "voice_settings": {
                "stability": 0.3,
                "similarity_boost": 0.5
            }
        }
        response = requests.post(url, json=data, headers=headers)
        logger.info("GOT RESPONSE")
        logger.info(response)
        logger.info(response.headers)
        if response.status_code != 200:
            return False
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=ElevenLabsTTS.CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
        logger.info("Done writing to file!")
        return True
The next thing I tried was running a TTS model locally, on the VM that will upload these videos. I looked at Coqui TTS, a text-to-speech toolkit that's pretty easy to use. I got it working fairly quickly, but I wasn't satisfied with the results.
text_to_speech_coqui_tts.py
import torch
from TTS.api import TTS
from pydub import AudioSegment
import os
import stringutils
import time
import logging

logger = logging.getLogger(__name__)

class CoquiTTSEngine:
    def __init__(self):
        model_name = "tts_models/en/ljspeech/fast_pitch"
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tts = TTS(model_name=model_name, progress_bar=True).to(self.device)

    def synthesize_speech(self, text: str, file_path: str) -> bool:
        logger.info(" >>>>> Synthesizing text\n" + text + "\n >>>>> to file " + file_path)
        new_text = text.replace("\\", "").replace("*", "")
        new_text = stringutils.remove_trailing_periods(new_text)
        logger.info("text after processing a little: " + new_text)
        tmp_file = "tmp_audio.mp3"
        logger.info("Synthesizing speech...")
        try:
            self.tts.tts_to_file(text=new_text, file_path=tmp_file)
        except Exception as e:
            logger.error(e)
            return False
        time.sleep(1)  # wait a little before reading the file
        logger.info("speeding up audio file...")
        orig_file = AudioSegment.from_file(tmp_file)
        sped_up_file = orig_file.speedup(1.3)
        sped_up_file.export(file_path, format="mp3")
        os.remove(tmp_file)
        return True
After some more searching, I came across the TikTok TTS API. I didn't know it existed, and since it's used by almost all reels and TikToks that use AI TTS, it was the perfect choice. As far as I know, it also doesn't have a character limit. There are multiple voices to choose from, so I made the script choose a random English one every time a video gets made.
import sys
sys.path.append("TikTok-Voice-TTS")
from tiktokvoice import tts
import random
import logging

logger = logging.getLogger(__name__)

voices_en = [
    # ENGLISH VOICES
    'en_au_001',  # English AU - Female
    'en_au_002',  # English AU - Male
    'en_uk_001',  # English UK - Male 1
    'en_uk_003',  # English UK - Male 2
    'en_us_001',  # English US - Female (Int. 1)
    'en_us_002',  # English US - Female (Int. 2)
    'en_us_006',  # English US - Male 1
    'en_us_007',  # English US - Male 2
    'en_us_009',  # English US - Male 3
    'en_us_010',  # English US - Male 4
]

class TiktokTTSApi:
    @staticmethod
    def choose_random_voice() -> str:
        chosen_voice = random.choice(voices_en)
        logger.info("choosing random tiktok voice " + chosen_voice)
        return chosen_voice

    def tts(self, text: str, filename: str) -> str:
        logger.info("converting text to speech!")
        voice = TiktokTTSApi.choose_random_voice()
        tts(text, voice, filename)
        return voice
This is then used in the script to generate a single video:
def generate_story_video_for_post(post: get_reddit_posts.RedditPost, reddit_engine: get_reddit_posts.RedditEngine):
    mp3_filename = post.id + ".mp3"
    reddit_id_tts_file = os.path.join(os.getcwd(), get_reddit_posts.RedditEngine.TTS_FOLDER_NAME, mp3_filename)
    tiktok_tts_api = text_to_speech_tiktok_api.TiktokTTSApi()
    voice = tiktok_tts_api.tts(post.into_text(), reddit_id_tts_file)
    ...
Transcribing
After generating a TTS mp3 file for a Reddit post, the next step is to transcribe the spoken text so we know when each word is spoken. This tells us when to show which word on the screen. To do this, we can use another AI model called Whisper. It's made by OpenAI (of ChatGPT fame, duh) and it works very well. It's free to use and you can run it locally by downloading the model yourself. It can be used as a command line tool or as a Python package, perfect for this use case.
Using it in Python is very straightforward: load the model you want, pass in the filename of the mp3 file you want to transcribe, and Bob's your uncle🥳. I put the functionality into a class so it stays modular:
whisper_transcribe.py
import whisper
import logging

logger = logging.getLogger(__name__)

class WhisperTranscriber:
    def __init__(self) -> None:
        logger.info("loading whisper model base.en")
        self.model = whisper.load_model("base.en")  # english-only base model
        self.text_array = []
        self.fps = 0

    def transcribe(self, audio_filename: str) -> dict:
        logger.info("transcribing " + audio_filename)
        return self.model.transcribe(audio_filename, fp16=False, word_timestamps=True)  # using CPU, FP32 must be used
Note that, because I run this on a VM (and I don't have GPU passthrough set up for it), I need to pass fp16=False to force FP32. The word_timestamps=True parameter is also very useful, as it gives us the timestamp of each word rather than each sentence. This comes in later when we create the actual video.
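The dict Whisper returns contains a list of segments, and with word_timestamps=True each segment carries a words list with start and end times. A sketch of flattening that structure into (word, start, end) tuples, demonstrated on synthetic data rather than a real transcription:

```python
def flatten_words(result: dict):
    # collect (word, start, end) tuples across all segments;
    # Whisper pads words with leading spaces, so strip them
    return [(w["word"].strip(), w["start"], w["end"])
            for seg in result.get("segments", [])
            for w in seg.get("words", [])]
```

A flat list like this is much easier to walk when deciding which word to show in which frame.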
After having made the class, it can be added to the method to generate a video for a story:
def generate_story_video_for_post(post: get_reddit_posts.RedditPost, reddit_engine: get_reddit_posts.RedditEngine):
    ...
    transcriber = whisper_transcribe.WhisperTranscriber()
    result = transcriber.transcribe(reddit_id_tts_file)
    ...
Generating the video
After transcribing, it's time to do some video editing. The difficult part is figuring out when to display which part of a sentence. Luckily, we have the timestamp of each word thanks to that handy-dandy word_timestamps parameter from Whisper. I found a video that explains a bit about how to create subtitles with moviepy, but I didn't really like that implementation, so I modified it a bit.
We begin with (of course) a Video class📽️:
class Video:
    def __init__(self, filename, width, height, duration, fps=0, clip=None) -> None:
        self.filename = filename
        self.width = width
        self.height = height
        self.duration = duration
        self.fps = fps
        self.transcribed_text = []
        self.clip = clip
It contains the filename to save the video to, its size, the duration in seconds, the FPS, a list of the transcribed sentences, and a reference to a moviepy clip. The first part of creating the video is to crop it to the correct aspect ratio for Instagram, remove the original audio, and add the TTS audio. In the process of making a video, these things happen:
- Create a video and an audio clip
- Crop the video to a 9:16 aspect ratio
- Select a random start time in the video
- Clip it to the length of the TTS audio
This all happens in the add_audio method:
def add_audio(video_path, audio_path, output_path) -> Video:
    logger.info("adding audio file " + audio_path + " to video file " + video_path + " and saving to " + output_path)
    video = mpe.VideoFileClip(video_path)
    audio = mpe.AudioFileClip(audio_path)
    # calculate width to make video 9:16 aspect ratio
    W, H = video.size
    new_width = (float(H) / 16.0) * 9.0
    new_width_start = (float(W) / 2.0) - new_width / 2.0
    new_width_end = new_width_start + new_width
    logger.info("Width of original video is " + str(W) + ". Setting width to " + str(new_width))
    logger.info("cropping width from " + str(new_width_start) + " to " + str(new_width_end))
    # make video as long as the audio
    audio_duration = audio.duration  # duration in seconds
    video_duration = video.duration
    logger.info("audio is " + str(audio.duration) + " seconds, video is " + str(video.duration) + " seconds")
    start = random.randrange(0, int(video_duration - audio_duration))  # random start point in video
    logger.info("clipping video from " + str(start) + " seconds to " + str(start + audio_duration))
    clip = video.subclip(start, start + audio_duration).without_audio().set_audio(audio)
    cropped_clip = moviepy.video.fx.all.crop(clip, x1=new_width_start, width=new_width)
    if cropped_clip.fps > 60:
        cropped_clip = cropped_clip.set_fps(60)  # set_fps returns a new clip, so reassign
    logger.info("FPS IS " + str(cropped_clip.fps))
    return Video(output_path, new_width, H, audio_duration, cropped_clip.fps, cropped_clip)
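The crop arithmetic on its own: for a full-height, horizontally centered 9:16 window, the width and left edge come out as follows (the helper name is mine):

```python
def vertical_crop_window(width: float, height: float):
    # keep the full height and take a centered 9:16 slice of the width
    new_width = height * 9.0 / 16.0
    x1 = width / 2.0 - new_width / 2.0
    return x1, new_width
```

For a 1920x1080 source this gives a 607.5-pixel-wide slice starting at x = 656.25, matching the values add_audio computes.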
This returns a Video object that is then used to add the subtitles for the transcribed text. To represent a transcribed line of text, I made a TranscribedLineInfo class. It represents a line with a start, an end, and a duration (given either in seconds or in frames):
class TranscribedLineInfo:
    def __init__(self, line: str, fps: float, in_seconds: bool, start_frame: int = 0, end_frame: int = 0, start_second=0, end_second=0) -> None:
        self.text = line
        self.start_frame = start_frame
        self.end_frame = end_frame
        if in_seconds:
            self.start_second = start_second
            self.end_second = end_second
        else:
            self.start_second = start_frame / fps
            self.end_second = end_frame / fps
        self.duration = self.end_second - self.start_second
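For example, frame-based timings at 30 fps convert like this (using a trimmed stand-in for the class above that only handles the frame-based branch):

```python
class LineInfo:
    # trimmed stand-in for TranscribedLineInfo: frame-based input only
    def __init__(self, line: str, fps: float, start_frame: int, end_frame: int):
        self.text = line
        self.start_second = start_frame / fps
        self.end_second = end_frame / fps
        self.duration = self.end_second - self.start_second
```

A line running from frame 30 to frame 90 at 30 fps starts at second 1.0 and stays on screen for 2.0 seconds.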
After creating the video file, the next step is to add the transcribed text to it. This is done with the put_transcribed_text method of the Video class, called on the result of add_audio. The only parameter it takes is the transcribed text from the transcribe call earlier.
def put_transcribed_text(self, transcribed_text: dict) -> str:
    """
    Compiles the transcribed text into the video using the text, the start seconds and the duration.
    Returns the filename of the created video file if successful.
    """
    self.calculate_words_for_frames(transcribed_text)
    if len(self.transcribed_text) == 0:
        logger.error("ERROR no transcribed text found")
        return
    if self.clip is None:
        logger.error("ERROR no associated clip found. Did you generate this video with videoediting.add_audio?")
        return
    clips = [self.clip]
    logger.info("Adding text to video!!")
    for line_info in tqdm.tqdm(self.transcribed_text):
        txt_clip = mpe.TextClip(line_info.text, font="Caladea", fontsize=CHAR_WIDTH + FONT_OFFSET, color="white", bg_color="black").set_start(line_info.start_second).set_duration(line_info.duration).set_pos("center")
        clips.append(txt_clip)
    logger.info("amount of clips: " + str(len(clips)))
    resulting_clip = mpe.CompositeVideoClip(clips)
    resulting_clip.write_videofile(self.filename)
    return self.filename
The only thing left to do is clean up the TTS file after adding the text to the video. With that, this is our generate_story_video_for_post function so far:
def generate_story_video_for_post(post: get_reddit_posts.RedditPost, reddit_engine: get_reddit_posts.RedditEngine):
    mp3_filename = post.id + ".mp3"
    reddit_id_tts_file = os.path.join(os.getcwd(), get_reddit_posts.RedditEngine.TTS_FOLDER_NAME, mp3_filename)
    tiktok_tts_api = text_to_speech_tiktok_api.TiktokTTSApi()
    voice = tiktok_tts_api.tts(post.into_text(), reddit_id_tts_file)
    transcriber = whisper_transcribe.WhisperTranscriber()
    result = transcriber.transcribe(reddit_id_tts_file)
    random_video = fileutils.choose_random_video('video')
    logger.info("getting random video: " + random_video)
    cropped_video = videoediting.add_audio(random_video, reddit_id_tts_file, fileutils.create_result_video_filename(post.id, random_video))
    result_filename = cropped_video.put_transcribed_text(result)
    logger.info("removing TTS file")
    os.remove(reddit_id_tts_file)
    ...
And now you should be able to automatically generate a video!
Creating a video URL
With the video generation part done, the next thing to do is to upload them somewhere. IG Reels can only be uploaded through the API if a publicly accessible URL is provided where the video is hosted. It kind of sucks, but fortunately I already have a webserver with which I can make videos publicly accessible.
My idea was that I would upload the video to my webserver, use that to upload the video to IG Reels, and then delete it from my webserver.
The uploading part is not that difficult: just read the file and send it in an HTTP request:
def generate_story_video_for_post(post: get_reddit_posts.RedditPost, reddit_engine: get_reddit_posts.RedditEngine):
    ...
    logger.info("Uploading video to interesting corner!")
    with open(result_filename, 'rb') as video_f:
        r = requests.post(VIDEO_PLACEHOLDER_URL, files={'sampleFile': video_f})
    result_video_url = r.json()['file_url']
    logger.info("Video uploaded to " + result_video_url)
    ...
To handle this on my server side, I created a route handler for an /ig endpoint that saves the file in a specific location and returns the URL to that file:
/**
* Upload a video file to the server
* The request must contain a "sampleFile" field with the video file
* Returns: a JSON object with a "status" field that is either "success" or "error"
* If "status" is "success", the object will also contain a "file_url" field with the URL of the uploaded file.
* If "status" is "error", the object will contain an "error" field with the error message.
*/
app.post('/ig', (req, res) => {
    console.log('handling ig upload');
    file_utils.handle_file_upload(req, __dirname + '/../img/ig/',
        function (filename) {
            console.log("File uploaded successfully! " + filename);
            const result = {
                "status": "success",
                "file_url": "https://www.example.com/img/ig/" + filename
            };
            res.send(result);
        },
        function (err) {
            console.log("Error in handling file upload");
            const result = {
                "status": "error",
                "error": err
            };
            res.send(result);
        }
    );
});
Now that I can upload a video, I also created an endpoint to delete it from the server after it has been uploaded to IG:
/**
* Delete a file from the server
* the request must contain a "video_name" field with only the name of the file
*/
app.put('/ig/rm', (req, res) => {
    if (!req.body.video_name) {
        console.log("no video name provided to delete");
        res.status(400).send("No video name provided");
        return;
    }
    console.log("handling delete for " + req.body.video_name);
    if (req.body.video_name.includes("/") ||
        req.body.video_name.includes("\\") ||
        req.body.video_name.includes("..") ||
        req.body.video_name.includes("~") ||
        req.body.video_name.includes(" ") ||
        req.body.video_name.includes(";") ||
        !req.body.video_name.includes(".mp4")) {
        console.log("Invalid video name provided");
        res.status(400).send("Invalid video name provided");
        return;
    }
    let video_name = __dirname + "/../img/ig/" + req.body.video_name;
    file_utils.delete_file(video_name,
        function () {
            console.log("File deleted successfully");
            res.status(200).send("File deleted successfully");
        },
        function (err) {
            console.log("Error deleting file: " + err);
            res.status(500).send("Error deleting file: " + err);
        }
    );
});
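The same filename check can be mirrored on the Python side before calling the endpoint, so obviously bad names never leave the client. This helper is my own addition, not part of the server code:

```python
def is_safe_video_name(name: str) -> bool:
    # mirrors the server-side check: no path tricks, no spaces or
    # semicolons, and the name must contain ".mp4"
    banned = ["/", "\\", "..", "~", " ", ";"]
    return ".mp4" in name and not any(b in name for b in banned)
```

Validating on both ends costs nothing and makes failed deletes easier to debug.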
Uploading to IG
Phew! Almost there! The only thing left to do is to actually upload the video to IG Reels. To help with that, I found this article. Uploading has to be done through the Graph API.
First, make sure you have an Instagram business account that the reels will be posted to. It has to be a business account, because otherwise you won't be able to connect it to your IG Graph API app. After creating the account, you can create the Graph API app.
To do this, create a Meta Developers account and create a new app. When creating the app, select the Instagram Graph API option. Then, in the App settings menu in the pane on the left, click Advanced. From there, you need 4 things:
- Graph API App ID: you can find this at the top of the window or in the URL:
https://developers.facebook.com/apps/your-app-id/settings/advanced/
- Graph API App Secret: view this page for instructions.
- Access token: View this page for instructions.
- IG Business account ID: View this page for instructions.
After gathering those, it's time to write the last bit of code. I created a new InstagramEngine class for all the Instagram-related stuff:
class InstagramEngine:
    def __init__(self) -> None:
        self.params = self.get_creds()
        self.check_and_renew_access_token()
The get_creds() function gathers the credentials and parameters used for communication with the Graph API app. It looks like this:
def get_creds(self):
    """ Get creds required for use in the application

    Returns:
        dictionary: credentials needed globally
    """
    creds = dict()  # dictionary to hold everything
    creds['access_token'] = IG_GRAPH_ACCESS_TOKEN  # access token for use with all api calls
    creds['graph_domain'] = 'https://graph.facebook.com/'  # base domain for api calls
    creds['graph_version'] = 'v20.0'  # version of the api we are hitting
    creds['endpoint_base'] = creds['graph_domain'] + creds['graph_version'] + '/'  # base endpoint with domain and version
    creds['instagram_account_id'] = IG_BUSINESS_ACCOUNT_ID  # users instagram account id
    creds['oauth_base'] = "oauth/access_token?grant_type=fb_exchange_token"
    return creds
The check_and_renew_access_token() function checks the validity of the access token and, if it's about to expire, renews it. The access token and its expiration time are each written to their own file. The function looks like this:
def check_and_renew_access_token(self):
    with open(TOKEN_FILENAME, "r") as token_f:
        self.params['access_token'] = token_f.readline().strip()
    logger.info("current token is " + self.params['access_token'])
    now = time.time()  # time in utc seconds since epoch
    expiration = 0
    with open(TOKEN_EXPIRATION_FILENAME, "r") as f:
        expiration = float(f.readline())
    seconds_left = expiration - now
    days_left = math.floor(seconds_left / (60 * 60 * 24))
    logger.info("days left for this token: " + str(days_left))
    if days_left <= 7:
        self.renew_access_token()

def renew_access_token(self):
    logger.info("Renewing access token!")
    url = self.params['endpoint_base'] + self.params['oauth_base'] + "&client_id=" + IG_GRAPH_APP_ID + "&client_secret=" + IG_GRAPH_APP_SECRET + "&fb_exchange_token=" + self.params['access_token']
    token_resp = requests.get(url)
    logger.info("Response is " + str(token_resp))
    if not token_resp.json():
        logger.error("Could not convert response to json!")
        return
    logger.info("got token response " + str(token_resp.json()))
    expire_seconds = int(token_resp.json()['expires_in'])
    expire_time = time.time() + expire_seconds
    logger.info("new token expires on " + str(expire_time) + " or in " + str(expire_seconds) + " seconds")
    with open(TOKEN_EXPIRATION_FILENAME, "w") as expiration_f:
        expiration_f.write(str(expire_time))
    new_token = token_resp.json()['access_token']
    logger.info("new token is " + new_token)
    self.params['access_token'] = new_token
    with open(TOKEN_FILENAME, "w") as token_f:
        token_f.write(str(new_token))
    logger.info("wrote new token and expiration to disk")
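The expiry check boils down to simple epoch arithmetic. Isolated from the file handling (function names are mine):

```python
import math

def days_left(expiration_epoch: float, now_epoch: float) -> int:
    # whole days remaining until the token expires
    return math.floor((expiration_epoch - now_epoch) / (60 * 60 * 24))

def needs_renewal(expiration_epoch: float, now_epoch: float) -> bool:
    # renew when a week or less remains, matching the check above
    return days_left(expiration_epoch, now_epoch) <= 7
```

Renewing a week early leaves plenty of slack if the VM is offline for a few days.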
With all the authentication figured out, it's time to do the uploading. This process has 2 steps:
- Upload the content to IG
- Publish the reel with the content
First, we need a function to make a call to the Graph API. For that I wrote the make_api_call function:
def make_api_call(self, url, endpoint_params, method):
    """ Request data from an endpoint with params

    Args:
        url: string of the url endpoint to make the request to
        endpoint_params: dictionary keyed by the names of the url parameters
        method: 'POST' or 'GET'

    Returns:
        dict: data from the endpoint
    """
    logger.info("making call to API with parameters:")
    logger.info(endpoint_params)
    logger.info("\n")
    if method == 'POST':  # post request
        data = requests.post(url, endpoint_params)
    else:  # get request
        data = requests.get(url, endpoint_params)
    response = dict()  # hold response info
    response['url'] = url  # url we are hitting
    response['endpoint_params'] = endpoint_params  # parameters for the endpoint
    response['endpoint_params_pretty'] = json.dumps(endpoint_params, indent=4)  # pretty print for cli
    response['json_data'] = json.loads(data.content)  # response data from the api
    response['json_data_pretty'] = json.dumps(response['json_data'], indent=4)  # pretty print for cli
    return response  # get and return content
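To illustrate the response dict this builds, here is the same wrapping logic as a standalone sketch, with a made-up payload instead of a real network call (wrap_response is a hypothetical helper, not part of the project):

```python
import json

def wrap_response(url: str, endpoint_params: dict, raw_content: bytes) -> dict:
    """Build the same response-dict shape that make_api_call returns,
    without the network call (sketch only)."""
    response = {
        "url": url,
        "endpoint_params": endpoint_params,
        "endpoint_params_pretty": json.dumps(endpoint_params, indent=4),
        "json_data": json.loads(raw_content),
    }
    response["json_data_pretty"] = json.dumps(response["json_data"], indent=4)
    return response

# Hypothetical payload standing in for a Graph API reply
resp = wrap_response("https://graph.facebook.com/v18.0/me",
                     {"fields": "id"}, b'{"id": "1784..."}')
print(resp["json_data"]["id"])
```

Every later function consumes this shape: the parsed API payload lives under `json_data`, while `url` and `endpoint_params` are kept around for logging.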
Then, we can upload the reel using the upload_reel function. It takes the URL of the video on the webserver and the caption for the reel as parameters, and creates a media object containing the video to use for the reel.
def upload_reel(self, video_url: str, caption: str) -> bool:
    self.params['media_type'] = 'REELS'
    self.params['media_url'] = video_url
    self.params['caption'] = caption
    resp = self.create_media_object()
    logger.info(resp)
    if not resp['json_data'].get('id'):
        logger.error("Could not upload!")
        logger.error(resp['json_data'])
        return False
    ...
The create_media_object function looks like this:
def create_media_object(self):
    logger.info("Creating media object...")
    url = self.params['endpoint_base'] + self.params['instagram_account_id'] + "/media"
    end_point_params = dict()  # parameters to send to the endpoint
    end_point_params['caption'] = self.params['caption']  # caption for the post
    end_point_params['access_token'] = self.params['access_token']  # access token
    if self.params['media_type'] == 'IMAGE':  # posting an image
        end_point_params['image_url'] = self.params['media_url']  # url to the asset
    else:  # posting a video
        end_point_params['media_type'] = self.params['media_type']  # specify media type
        end_point_params['video_url'] = self.params['media_url']  # url to the asset
    return self.make_api_call(url, end_point_params, 'POST')  # make the api call
The API call to create the media object returns immediately, so its response doesn't tell us whether the upload has finished. We can get that information by also querying the API. For that I wrote the get_media_object_status function:
def get_media_object_status(self, object_id):
    """ Check the status of a media object

    Args:
        object_id: id of the media object

    API Endpoint:
        https://graph.facebook.com/v5.0/{ig-container-id}?fields=status_code

    Returns:
        dict: data from the endpoint
    """
    logger.info("getting object status for object with ID " + str(object_id))
    url = self.params['endpoint_base'] + '/' + object_id  # endpoint url
    endpoint_params = dict()  # parameters to send to the endpoint
    endpoint_params['fields'] = 'status_code'  # fields to get back
    endpoint_params['access_token'] = self.params['access_token']  # access token
    return self.make_api_call(url, endpoint_params, 'GET')  # make the api call
We have to keep querying that endpoint until its status code is FINISHED. So to monitor the upload, I simply hit that endpoint every 10 seconds while the status code was still IN_PROGRESS.
def upload_reel(self, video_url: str, caption: str) -> bool:
    self.params['media_type'] = 'REELS'
    self.params['media_url'] = video_url
    self.params['caption'] = caption
    resp = self.create_media_object()
    logger.info(resp)
    if not resp['json_data'].get('id'):
        logger.error("Could not upload!")
        logger.error(resp['json_data'])
        return False
    resp_id = resp['json_data']['id']
    logger.info("Created resource with id " + str(resp_id))
    status_code = 'IN_PROGRESS'
    while status_code != 'FINISHED':
        resp = self.get_media_object_status(resp_id)
        if not resp['json_data'].get('status_code'):
            logger.error("could not get object status!")
            return False
        status_code = resp['json_data']['status_code']
        logger.info("media object status: " + str(status_code))
        logger.info("==========================")
        if status_code != 'FINISHED':
            logger.info("waiting " + str(PUBLISH_STATUS_DELAY_S) + " seconds before checking again...")
            time.sleep(PUBLISH_STATUS_DELAY_S)
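One caveat with that loop: if the container ends up in an ERROR state, the loop never exits. A bounded variant can cap the attempts and also bail out on errors; this is just a sketch, with a fake status source standing in for get_media_object_status and poll_status being a hypothetical name:

```python
import time

def poll_status(check, delay_s: float = 10, max_attempts: int = 30) -> str:
    """Poll check() until it reports FINISHED or ERROR, giving up after
    max_attempts polls. `check` stands in for the status query."""
    status = "IN_PROGRESS"
    for _ in range(max_attempts):
        status = check()
        if status in ("FINISHED", "ERROR"):
            return status
        time.sleep(delay_s)
    return status  # still IN_PROGRESS after the last attempt

# Simulated upload that finishes on the third poll
states = iter(["IN_PROGRESS", "IN_PROGRESS", "FINISHED"])
print(poll_status(lambda: next(states), delay_s=0))  # FINISHED
```

The caller can then treat anything other than FINISHED as a failed upload instead of waiting forever.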
After the upload has completed, we can actually publish the reel to our account. For that we need a publish_media function:
def publish_media(self, object_id):
    """ Publish content

    Args:
        object_id: id of the media object

    API Endpoint:
        https://graph.facebook.com/v5.0/{ig-user-id}/media_publish?creation_id={creation-id}&access_token={access-token}

    Returns:
        dict: data from the endpoint
    """
    logger.info("publishing media for object with ID " + str(object_id))
    url = self.params['endpoint_base'] + self.params['instagram_account_id'] + '/media_publish'  # endpoint url
    endpoint_params = dict()  # parameters to send to the endpoint
    endpoint_params['creation_id'] = object_id  # id of the media object to publish
    endpoint_params['access_token'] = self.params['access_token']  # access token
    return self.make_api_call(url, endpoint_params, 'POST')  # make the api call
We only need to add that to the rest of the code and check the response. This is the whole upload_reel function:
def upload_reel(self, video_url: str, caption: str) -> bool:
    self.params['media_type'] = 'REELS'
    self.params['media_url'] = video_url
    self.params['caption'] = caption
    resp = self.create_media_object()
    logger.info(resp)
    if not resp['json_data'].get('id'):
        logger.error("Could not upload!")
        logger.error(resp['json_data'])
        return False
    resp_id = resp['json_data']['id']
    logger.info("Created resource with id " + str(resp_id))
    status_code = 'IN_PROGRESS'
    while status_code != 'FINISHED':
        resp = self.get_media_object_status(resp_id)
        if not resp['json_data'].get('status_code'):
            logger.error("could not get object status!")
            return False
        status_code = resp['json_data']['status_code']
        logger.info("media object status: " + str(status_code))
        logger.info("==========================")
        if status_code != 'FINISHED':
            logger.info("waiting " + str(PUBLISH_STATUS_DELAY_S) + " seconds before checking again...")
            time.sleep(PUBLISH_STATUS_DELAY_S)
    publish_response = self.publish_media(resp_id)
    logger.info("publish response:\n")
    logger.info(publish_response)
    if not publish_response['json_data'].get('id'):
        logger.error("Response had no ID!")
        return False
    logger.info("SUCCESSFULLY PUBLISHED REEL WITH ID " + str(publish_response['json_data']['id']))
    return True
And uploading to Instagram is done! The only thing left is to complete the generate_story_video_for_post function. This is the completed function:
def generate_story_video_for_post(post: get_reddit_posts.RedditPost, reddit_engine: get_reddit_posts.RedditEngine):
    mp3_filename = post.id + ".mp3"
    reddit_id_tts_file = os.path.join(os.getcwd(), get_reddit_posts.RedditEngine.TTS_FOLDER_NAME, mp3_filename)
    tiktok_tts_api = text_to_speech_tiktok_api.TiktokTTSApi()
    voice = tiktok_tts_api.tts(post.into_text(), reddit_id_tts_file)
    transcriber = whisper_transcribe.WhisperTranscriber()
    result = transcriber.transcribe(reddit_id_tts_file)
    random_video = fileutils.choose_random_video('video')
    logger.info("getting random video: " + random_video)
    cropped_video = videoediting.add_audio(random_video, reddit_id_tts_file, fileutils.create_result_video_filename(post.id, random_video))
    result_filename = cropped_video.put_transcribed_text(result)
    logger.info("removing TTS file")
    os.remove(reddit_id_tts_file)
    logger.info("Uploading video to interesting corner!")
    with open(result_filename, 'rb') as video_f:
        r = requests.post(VIDEO_PLACEHOLDER_URL, files={'sampleFile': video_f})
    result_video_url = r.json()['file_url']
    logger.info("Video uploaded to " + result_video_url)
    game_title = random_video.split("/")[1]
    description = post.title + "\nGame: " + game_title + "\nPost from the " + post.subreddit.strip() + " subreddit (post id: " + str(post.id) + ")\nFollow for more games and stories! Don't forget to share and save!\nSpoken by TikTok TTS AI voice " + str(voice) + "\n"
    description += DEFAULT_TAGS
    with open(os.path.join("tags", game_title), 'r') as f:
        description += f.readline()
    logger.info("Setting description to " + description)
    logger.info("Uploading reel to instagram")
    ig_engine = instagram_engine.InstagramEngine()
    if ig_engine.upload_reel(result_video_url, description):
        time.sleep(5)
        logger.info("Requesting file to be deleted from the server")
        params = {"video_name": result_filename.split("/")[-1]}
        r = requests.put(REMOVE_VIDEO_URL, params)
        logger.info(str(r) + " -> result from deleting: " + str(r.content))
        reddit_engine.exclude_id(post.id)
        logger.info("---- DONE ----")
    else:
        logger.error("Error uploading to instagram!")
Running it periodically
The only thing left to do is to run the auto_post_video script periodically. For that, I made the main.py file run the function every 6 hours, so it posts a new reel 4 times a day. I wanted posting to stay consistent across restarts, so the function writes the last time it posted to a file and sleeps for a while before checking again, until the 6 hours have passed. The code looks like this:
import auto_videos
import time
import random
import os
import logging

logger = logging.getLogger(__name__)

# 8 hours, it's best to post 3 times a day
# according to https://www.reddit.com/r/SocialMediaMarketing/comments/16rbvn8/how_frequent_should_you_post_reels/
# however, I wanna post 4 times a day, so 6 hours
TIMEOUT_SECONDS = 60 * 60 * 6
LAST_POST_TIME_FILENAME = "last_post_time"

def run_periodically():
    while True:
        logger.info("\n\n===================================================\n\nWaking up, let's check if I can post again!")
        last_post_time_seconds = 0.0
        if os.path.exists(LAST_POST_TIME_FILENAME):  # the file won't exist on the very first run
            with open(LAST_POST_TIME_FILENAME, "r") as f:
                last_post_time_seconds = float(f.readline())
        now = time.time()
        difference = now - last_post_time_seconds
        logger.info("posted " + str(difference) + " seconds or " + str(difference / 60) + " minutes or " + str(difference / 60 / 60) + " hours ago")
        if difference >= TIMEOUT_SECONDS:
            with open(LAST_POST_TIME_FILENAME, "w") as w_f:
                logger.info("setting new last post time to " + str(now))
                w_f.write(str(now))
            logger.info("Posting another video!!\n\n")
            auto_videos.auto_post_video()
        minutes_to_sleep = random.randint(30, 60)
        logger.info("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\ngoing to sleep for " + str(minutes_to_sleep) + " minutes")
        logger.info("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n\n")
        time.sleep(60 * minutes_to_sleep)
def main():
    logging.basicConfig(level=logging.INFO)
    logger.info("Running main loop!\n\n")
    try:
        run_periodically()
    except Exception as e:
        logger.error("Exception occurred! " + str(e))
        time.sleep(10)

if __name__ == "__main__":
    main()
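The interval check at the heart of run_periodically can also be factored out into a small, testable helper. This is just a sketch of that idea; should_post is a name I made up, not part of the project:

```python
import os
import time

TIMEOUT_SECONDS = 60 * 60 * 6  # 6 hours between posts

def should_post(now: float, state_file: str) -> bool:
    """True when at least TIMEOUT_SECONDS have passed since the recorded post,
    or when no post has been recorded yet (hypothetical helper)."""
    if not os.path.exists(state_file):
        return True  # never posted before
    with open(state_file) as f:
        last_post_time = float(f.readline())
    return now - last_post_time >= TIMEOUT_SECONDS

# Quick check against a throwaway state file
import tempfile
with tempfile.NamedTemporaryFile("w", delete=False, suffix="_last_post") as tmp:
    tmp.write(str(time.time() - 7 * 60 * 60))  # pretend we posted 7 hours ago
    state_path = tmp.name
print(should_post(time.time(), state_path))  # True
os.remove(state_path)
```

Keeping the check pure like this makes the sleep-and-retry loop trivial to unit test without touching the real state file.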