Notes:
Conclusions:
import threading
import re
import os
import twitter
import requests
import json
import time
import redis
from unicodedata import normalize
from datetime import datetime
import string
from pprint import pprint
# Twitter API credentials, read from the environment.
CONSUMER_KEY = os.getenv("CONSUMER_KEY")
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET")
ACCESS_TOKEN_KEY = os.getenv("ACCESS_TOKEN")
ACCESS_TOKEN_SECRET = os.getenv("ACCESS_SECRET")

# Track every lowercase letter -- presumably so the filter matches almost
# every tweet in the languages below.
a_track = list(string.ascii_lowercase)
a_langs = ['ar', 'tl', 'und', 'es', 'th', 'en', 'ja', 'in', 'ko', 'tr']

# main() streams through Api.GetStreamFilter; fail early if it is missing.
assert hasattr(twitter.Api, 'GetStreamFilter'), "python-twitter's Api has no GetStreamFilter"
api = twitter.Api(consumer_key=CONSUMER_KEY,
                  consumer_secret=CONSUMER_SECRET,
                  access_token_key=ACCESS_TOKEN_KEY,
                  access_token_secret=ACCESS_TOKEN_SECRET)

# Redis connection. Defaults target the DigitalOcean managed instance.
#   Docker:  REDIS_HOST=redis-server REDIS_PORT=6379 REDIS_SSL=0
#   Local:   REDIS_HOST=localhost    REDIS_PORT=6379 REDIS_SSL=0
# The password must be supplied via REDIS_PASSWORD. It was previously
# hard-coded in this file, so that old value should be treated as leaked
# and rotated.
rconn = redis.Redis(
    host=os.getenv("REDIS_HOST", "redis-cluster1-do-user-4185874-0.db.ondigitalocean.com"),
    port=int(os.getenv("REDIS_PORT", "25061")),
    password=os.getenv("REDIS_PASSWORD"),
    ssl=os.getenv("REDIS_SSL", "1") != "0",
    decode_responses=True,
    db=0)

# Flush the Redis cache before running main().
# NOTE: flushall wipes every database on the server, not only db 0
# (rconn.flushdb() would limit it to db 0).
print(rconn.flushall())
def _tweet_text(msg):
    """Return the full text of a streamed tweet, or None if *msg* carries none.

    Prefers ``extended_tweet['full_text']`` (used when the classic ``text``
    field is truncated) and falls back to ``text``. Messages with neither --
    presumably the notices enabled by ``stall_warnings=True`` -- give None.
    """
    extended = msg.get('extended_tweet') or {}
    return extended.get('full_text', msg.get('text'))


def _user_fields(user):
    """Flatten a tweet's ``user`` object into the ``usr_*`` fields stored in Redis.

    Missing or null values become 'NaN' because redis-py refuses to encode
    None (it raises DataError). An empty dict yields all-default fields.
    """
    def field(key):
        value = user.get(key)
        return 'NaN' if value is None else value

    return {
        'usr_id': field('id_str'),
        'usr_name': field('name'),
        'usr_screen_name': field('screen_name'),
        'usr_location': field('location'),
        'usr_url': field('url'),
        'usr_desc': field('description'),
        'usr_protected': 'protected' if user.get('protected') else 'unprotected',
        'usr_verified': 'verified' if user.get('verified') else 'unverified',
    }


def main():
    """Stream tweets from python-twitter and publish each one to Redis.

    Each tweet is flattened to a dict. In one transactional pipeline, the dict
    is stored in the ``twit1`` hash and published as JSON on the ``twit_1``
    channel. Redis data/connection errors are printed and streaming
    continues. If the HTTP stream breaks with ChunkedEncodingError, the
    function waits 10 s and reconnects. It returns when the stream ends
    normally.

    Raises:
        TypeError: if python-twitter yields something other than a dict.
    """
    while True:
        try:
            stream = api.GetStreamFilter(track=a_track, languages=a_langs, stall_warnings=True)
            for msg in stream:
                if not isinstance(msg, dict):
                    raise TypeError("python-twitter not dict: twitter_to_redis")
                text = _tweet_text(msg)
                if text is None:
                    # Not a tweet: skip it instead of re-publishing the
                    # previous tweet's text (or crashing on an unbound name).
                    continue
                id_str = msg.get('id_str', 'No id_str')
                tweet_dict = {
                    'created_at': msg.get('created_at', 'No created_at'),
                    'id_str': id_str,
                    'text': text,
                    # Rebuilt for every tweet, so no field carries over
                    # from the previous message.
                    **_user_fields(msg.get('user') or {}),
                    'url': "https://www.twitter.com/i/web/status/" + id_str,
                    'source': msg.get('source', "No msg_source"),
                }
                pprint(tweet_dict)
                try:
                    pipeline = rconn.pipeline(True)
                    pipeline.hmset('twit1', tweet_dict)
                    pipeline.publish('twit_1', json.dumps(tweet_dict))
                    pipeline.execute()
                    print('Published!!')
                except redis.exceptions.DataError as dataerror:
                    print(dataerror)
                except redis.exceptions.ConnectionError as exc:
                    print(exc)
            return  # stream closed without error
        except requests.exceptions.ChunkedEncodingError:
            # The stream dropped mid-chunk; back off, then reconnect.
            print('We timed out...Sleeping for 10 seconds.')
            time.sleep(10)


main()
import re
import os
import twitter
import requests
import json
import time
import redis
from unicodedata import normalize
from datetime import datetime
import string
from sqlite3 import connect
from contextlib import contextmanager
from textblob import TextBlob
# Stream language filters: the full set of interesting languages, and
# English only (the list main() actually filters on).
all_langs = 'ar tl und es th en ja in ko tr'.split()
en = ['en']

# python-twitter client; credentials come from the environment.
api = twitter.Api(
    consumer_key=os.getenv("CONSUMER_KEY"),
    consumer_secret=os.getenv("CONSUMER_SECRET"),
    access_token_key=os.getenv("ACCESS_TOKEN"),
    access_token_secret=os.getenv("ACCESS_SECRET"),
)

# SQLite sink settings. column_names doubles as the CREATE TABLE column list
# and the INSERT column list; values holds one placeholder per column.
table_name, db_name = 't7', 'sqlitestream4.db'
column_names = '(datetime_now, id_str, url, text, source)'
values = '(?, ?, ?, ?, ?)'
class TwitterSqlite:
    """Append rows to a single SQLite table, creating the table on first use."""

    def __init__(self, table_name, column_names, values, db_name):
        # table_name, column_names and values are interpolated into SQL text,
        # so they must be trusted constants, never tweet data. column_names
        # is a parenthesised list such as '(a, b)'; values is the matching
        # placeholder tuple such as '(?, ?)'.
        self.table_name = table_name
        self.column_names = column_names
        self.values = values
        self.db_name = db_name

    @contextmanager
    def _temptable(self, cur):
        """Apply the connection pragmas and make sure the table exists.

        The pragmas run before CREATE TABLE so that page_size can still take
        effect on a brand-new database file. Exceptions raised inside the
        ``with`` body propagate to the caller. (The previous ``return``
        inside ``finally`` silently suppressed them.)
        """
        cur.execute("pragma page_size=4096")
        cur.execute("pragma journal_mode=off")
        cur.execute("pragma temp_store=memory")
        cur.execute("pragma synchronous=off")
        cur.execute(
            f'''create table if not exists {self.table_name} {self.column_names}''')
        yield

    def sqlite_db(self, *args):
        """Insert one row built from *args*, then print every row in the table.

        *args* are bound, in order, to the ``values`` placeholders. The
        connection uses autocommit (isolation_level=None) and is always
        closed afterwards; sqlite3's ``with conn`` block only commits and
        does not close the connection.

        Raises:
            sqlite3.Error: if the insert (or any pragma/select) fails.
        """
        conn = connect(self.db_name, check_same_thread=False, isolation_level=None)
        try:
            cur = conn.cursor()
            with self._temptable(cur):
                cur.execute(
                    f'''insert into {self.table_name} {self.column_names} values{self.values}''', args)
                # NOTE(review): dumps the whole table after every insert, which
                # is O(rows) per call -- probably leftover debug output.
                for row in cur.execute(f'''select * from {self.table_name}'''):
                    print(row)
        finally:
            conn.close()
# Shared SQLite writer that main() uses for every streamed tweet.
sql_db = TwitterSqlite(table_name=table_name, column_names=column_names,
                       values=values, db_name=db_name)
# https://stackoverflow.com/questions/26638329/incompleteread-error-when-retrieving-twitter-data-using-python
def main(track=None):
    """Stream English tweets matching *track* and append each one to SQLite.

    Args:
        track: keywords to filter on. When omitted, the module-level name
            ``crypto`` is used, as the original code did. That name is not
            defined anywhere in this script, so without it a NameError is
            raised -- pass ``track`` explicitly.

    Each tweet is written through ``sql_db`` as (datetime_now, id_str, url,
    text, source). A failed write is printed and streaming continues. If the
    HTTP stream breaks with ChunkedEncodingError, the function waits 10 s and
    reconnects (see the link above). It returns when the stream ends normally.

    Raises:
        NameError: if no *track* is given and ``crypto`` is undefined.
        TypeError: if python-twitter yields something other than a dict.
    """
    assert hasattr(twitter.Api, 'GetStreamFilter'), 'Did python-twitter Update?'
    if track is None:
        track = globals().get('crypto')
        if track is None:
            raise NameError("name 'crypto' is not defined; call main(track=[...])")
    while True:
        try:
            for msg in api.GetStreamFilter(track=track, languages=en, stall_warnings=True):
                if not isinstance(msg, dict):
                    raise TypeError("python-twitter not dict: twitter_to_redis")
                extended = msg.get('extended_tweet') or {}
                text = extended.get('full_text', msg.get('text'))
                id_str = msg.get('id_str')
                if text is None or id_str is None:
                    # Not a tweet (no id/text): skip instead of storing the
                    # previous tweet's text under a new id.
                    continue
                datetime_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                url = "https://www.twitter.com/i/web/status/" + id_str
                source = msg.get('source', "No msg_source")
                try:
                    sql_db.sqlite_db(datetime_now, id_str, url, text, source)
                except Exception as exc:
                    # Best effort: one failed write must not stop the stream.
                    print(exc)
            return  # stream closed without error
        except requests.exceptions.ChunkedEncodingError:
            print('We timed out...Sleeping for 10 seconds.')
            time.sleep(10)


main()
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import json
import os
import pymongo
from pymongo import MongoClient
from datetime import datetime as dt
# MongoDB connection string. Set MONGO_URI in the environment instead of
# editing the "<connection_url>" placeholder (which MongoClient rejects).
uri = os.environ.get("MONGO_URI", "<connection_url>")
client = pymongo.MongoClient(uri)
db = client.test
collection = db['your_collection']
def stream_generator(text):
    """Yield successively longer lists of ``text``'s (key, value) pairs.

    Args:
        text: a mapping, e.g. a decoded tweet dict.

    Yields:
        A new list with the first 1, 2, ..., len(text) pairs. Each yield is
        a fresh copy, so a caller that keeps an earlier item does not see it
        grow as iteration continues.
    """
    pairs = []
    for pair in text.items():
        pairs.append(pair)
        yield list(pairs)
class TweetsListener(StreamListener):
    """tweepy listener that stores every raw tweet in the Mongo ``collection``."""

    def on_data(self, data):
        """Decode one raw stream payload and insert it as a single document.

        The document keeps the existing schema: ``_id`` is the insert time
        and ``tweets`` is the list of the tweet's (key, value) pairs. Errors
        are printed. The method always returns True, which asks tweepy to
        keep the stream open.
        """
        try:
            tweet = json.loads(data)
            # Insert once, with all pairs. Previously one document per
            # growing prefix was inserted for every tweet.
            # NOTE(review): _id is a timestamp; two tweets stored within the
            # same millisecond (BSON datetime precision) would collide.
            collection.insert_one({'_id': dt.now(), 'tweets': list(tweet.items())})
            print('Inserting...')
        except Exception as e:
            # Exception, not BaseException, so Ctrl-C / SystemExit still stop
            # the script.
            print(e)
        return True
if __name__ == "__main__":
    # Twitter credentials from the environment (None when unset).
    consumer_key = os.environ.get('CONSUMER_KEY')
    consumer_secret = os.environ.get('CONSUMER_SECRET')
    access_token = os.environ.get('ACCESS_TOKEN')
    access_secret = os.environ.get('ACCESS_SECRET')

    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    # Keywords/hashtags to follow; only English tweets are requested.
    track = ['trump', 'MAGA', 'trump2020', '#MAGA', 'thedonald', 'bitcoin', 'ripple']
    twitter_stream = Stream(auth, TweetsListener())
    twitter_stream.filter(track=track, languages=['en'])
# Authenticate the tweepy client used by the stream below.
# NOTE(review): consumer_key, consumer_secret, access_token and
# access_token_secret are not defined in the visible code -- presumably they
# are imported from a config module above this excerpt; confirm. (The
# tweepy/Mongo script above names the last one access_secret.)
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
# Tweet store that StreamListener.on_status pushes into; presumably the
# RedisDataModel class from the data-model module -- verify the import.
store = RedisDataModel()
class StreamListener(tweepy.StreamListener):
    """Score non-retweet statuses with TextBlob and push them to ``store``."""

    def on_status(self, status):
        """Push *status* with its sentiment to ``store``, skipping retweets."""
        body = status.text
        if 'RT @' in body:
            # Retweets (text containing 'RT @') are ignored.
            return
        sentiment = TextBlob(body).sentiment
        author = status.user
        tweet_item = {
            'id_str': status.id_str,
            'text': body,
            'polarity': sentiment.polarity,
            'subjectivity': sentiment.subjectivity,
            'username': author.screen_name,
            'name': author.name,
            'profile_image_url': author.profile_image_url,
            'received_at': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        store.push(tweet_item)
        print("Pushed to redis:", tweet_item)

    def on_error(self, status_code):
        """Return False (disconnect) on HTTP 420, Twitter's rate-limit status; else None."""
        return False if status_code == 420 else None
# Attach the listener to the authenticated client and stream tweets that
# mention any of the tracked keywords.
stream_listener = StreamListener()
stream = tweepy.Stream(listener=stream_listener, auth=api.auth)
stream.filter(track=["trump",'maga','donaldjtrump','thedonald'])
import json
import redis
# from funcs import TweetFuncs
from .funcs import TweetFuncs
class RedisDataModel:
    """Redis-backed store for the most recent tweets, kept as a JSON list."""

    # Redis configuration
    redis_host = "localhost"
    # redis_host = "redis-server"  # Docker
    redis_port = 6379
    redis_password = ""

    # Tweet configuration
    redis_key = 'tweets'    # Redis list of JSON-encoded tweets, newest first (lpush)
    num_tweets = 20         # how many tweets survive each trim
    trim_threshold = 100    # pushes between trims

    def __init__(self):
        self.db = redis.Redis(
            host=self.redis_host,
            port=self.redis_port,
            password=self.redis_password
        )
        # Number of pushes since the list was last trimmed.
        self.trim_count = 0

    def tweets(self, limit=15):
        """Return up to *limit* of the newest stored tweets, wrapped in TweetFuncs.

        A *limit* of 0 or less returns an empty list. Passing it straight
        through would give lrange(key, 0, -1), which returns the whole list.
        """
        if limit <= 0:
            return []
        items = self.db.lrange(self.redis_key, 0, limit - 1)
        # TweetFuncs is the class this module imports; the old code called
        # an undefined name `Tweet`.
        return [TweetFuncs(json.loads(item)) for item in items]

    def push(self, data):
        """Prepend *data* (JSON-encoded) and periodically trim the list."""
        self.db.lpush(self.redis_key, json.dumps(data))
        self.trim_count += 1
        # Trim periodically rather than on every push, so the list stays small cheaply.
        if self.trim_count >= self.trim_threshold:
            # LTRIM's stop index is inclusive: keep indices 0..num_tweets-1.
            self.db.ltrim(self.redis_key, 0, self.num_tweets - 1)
            self.trim_count = 0
import re
class TweetFuncs:
    """Template helpers that turn a stored tweet dict into display HTML."""

    # Brand handles highlighted by filter_brands() when no list is passed.
    BRANDS = ("@WarbyParker", "@Bonobos", "@Casper", "@Glossier", "@DollarShaveClub", "@Allbirds")

    # http(s) URL: host, optional path segments, optional simple query string
    # and optional #fragment. Raw string: the old non-raw literal relied on
    # invalid escape sequences such as "\/" and "\w".
    _URL_RE = re.compile(
        r"(https?:\/\/\w+(\.\w+)+(\/[\w\+\-\,\%]+)*(\?[\w\[\]]+(=\w*)?(&\w+(=\w*)?)*)?(#\w+)?)")

    def __init__(self, data):
        # Tweet dict; user_link() reads 'username' and filtered_text() reads 'text'.
        self.data = data

    def user_link(self):
        """Return the Twitter profile URL for the tweet's author."""
        return "http://twitter.com/{}".format(self.data['username'])

    def filtered_text(self):
        """Return the tweet text as HTML, with URLs linked and brands highlighted.

        The raw text comes from Twitter users and the result is HTML (it
        contains <a>/<mark> tags), so it is HTML-escaped first. Otherwise a
        tweet could inject markup or script into the page.
        """
        import html
        safe_text = html.escape(self.data['text'], quote=False)
        return self.filter_brands(self.filter_urls(safe_text))

    def filter_brands(self, text, brands=None):
        """Wrap every occurrence of each brand handle in <mark> tags.

        Args:
            text: text to mark up.
            brands: handles to highlight; defaults to ``BRANDS``.
        """
        for brand in (self.BRANDS if brands is None else brands):
            # str.replace is a no-op when the brand is absent.
            text = text.replace(brand, "<mark>{}</mark>".format(brand))
        return text

    def filter_urls(self, text):
        """Replace each URL in *text* with a link that opens in a new tab."""
        return self._URL_RE.sub(r'<a href="\1" target="_blank">\1</a>', text)