Module: fetch

The fetch module starts at the server where you're logged in and searches for a hashtag. Once it has all the toots your home server knows about, it looks at where they came from: for each remote server mentioned, it calls fetch_hashtag_remote(). Each time it connects to a new server, it fetches every toot that server knows about the hashtag, then scans that batch for servers it hasn't seen yet and adds them to the list of servers to contact.
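
In outline, the crawl is a simple worklist algorithm over a set of servers. The following is a minimal sketch of that loop, not the actual implementation; search_server() is a hypothetical stand-in for connecting to a server and searching the hashtag.

def base_of(uri: str) -> str:
    """Reduce https://example.net/blah/blah to https://example.net."""
    return '/'.join(uri.split('/')[0:3])

def crawl(home: str, hashtag: str) -> list:
    todo = {home}   # servers we still need to contact
    done = set()    # servers we have already contacted
    all_toots = []
    while todo:
        server = todo.pop()
        done.add(server)
        toots = search_server(server, hashtag)  # hypothetical fetch helper
        all_toots.extend(toots)
        # each status URI may reveal a server we have not seen yet
        for toot in toots:
            origin = base_of(toot['uri'])
            if origin not in done:
                todo.add(origin)
    return all_toots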

Public API

This code depends on Mastodon.py and uses it to connect to the servers it discovers. If you know anything about the fediverse, you know that there's more than just Mastodon servers out there: there's Pleroma, Akkoma, and various other ActivityPub-compatible servers. Some are derived from Mastodon and implement the same APIs; others don't. Likewise, some Mastodon servers offer public read APIs and others don't. Servers that allow public reads of a Mastodon-compatible timeline API will send you the details on their toots. Servers that don't allow public reads, or that don't implement such an API, will be quietly skipped.
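
As a sketch of what being quietly skipped means in practice, an anonymous public read with Mastodon.py looks roughly like this (the function name and the error handling are illustrative, not the module's actual code):

from mastodon import Mastodon, MastodonError

def probe_hashtag(api_base_url: str, hashtag: str):
    """Try an anonymous public read; return None if the server refuses or is incompatible."""
    try:
        client = Mastodon(api_base_url=api_base_url)
        # Public, unauthenticated hashtag timeline. This raises on servers that
        # require authentication or don't implement a Mastodon-compatible API.
        return client.timeline_hashtag(hashtag, limit=40)
    except MastodonError:
        return None  # quietly skip this server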

Module for fetching toots for a hashtag.

check_journaldir(dir_name)

Check if a directory exists and create it if it doesn't.

Parameters

  • dir_name (str): The name of the directory to check/create.

Returns

bool: True if the directory exists (or was created) and is writable; False otherwise.

Source code in mastoscore/fetch.py
def check_journaldir(dir_name: str) -> bool:
    """
    Check if a directory exists and create it if it doesn't.

    # Parameters
    - dir_name (str): The name of the directory to check/create.

    # Returns
    bool: True if the directory exists (or was created) and is writable; False otherwise.
    """
    logger = logging.getLogger(__name__)
    logging.basicConfig(format='%(levelname)s\t%(message)s')
    logger.setLevel(logging.WARN)

    journaldir = os.path.abspath(dir_name)
    if os.path.exists(journaldir):
        if os.path.isdir(journaldir):
            if os.access(journaldir, os.W_OK):
                return True
            else:
                logger.critical(f"'{journaldir}' directory exists but is not writeable")
                return False
        else:
            logger.critical(f"Something already exists at '{journaldir}' but it is not a directory")
            return False
    else:
        try:
            os.makedirs(journaldir)
            logger.warning(f"Created '{journaldir}' successfully.")
            return True
        except OSError as e:
            logger.critical(f"Error creating directory '{journaldir}': {e}")
            return False

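A typical call, with an illustrative relative path:

import sys

if not check_journaldir('journal'):
    # we cannot write journal files, so there is no point fetching anything
    sys.exit(1)
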
create_journal_directory(base_dir, year, month, day)

Create a hierarchical directory structure for journal files.

Parameters

  • base_dir: Base directory for journal files
  • year: Year as string (YYYY)
  • month: Month as string (MM)
  • day: Day as string (DD)

Returns

Full path to the created directory, or None if creation failed

Source code in mastoscore/fetch.py
def create_journal_directory(base_dir: str, year: str, month: str, day: str) -> str | None:
    """
    Create a hierarchical directory structure for journal files.

    # Parameters
    - **base_dir**: Base directory for journal files
    - **year**: Year as string (YYYY)
    - **month**: Month as string (MM)
    - **day**: Day as string (DD)

    # Returns

    Full path to the created directory, or None if creation failed
    """
    global logger

    # Create the full path
    dir_path = os.path.join(base_dir, year, month, day)
    dir_path = os.path.abspath(dir_path)

    # Check if directory exists
    if os.path.exists(dir_path):
        if os.path.isdir(dir_path):
            if os.access(dir_path, os.W_OK):
                return dir_path
            else:
                logger.critical(f"Directory '{dir_path}' exists but is not writeable")
                return None
        else:
            logger.critical(f"Path '{dir_path}' exists but is not a directory")
            return None

    # Create directory structure
    try:
        os.makedirs(dir_path, exist_ok=True)
        logger.debug(f"Created directory structure: '{dir_path}'")
        return dir_path
    except Exception as e:
        logger.critical(f"Error creating directory structure '{dir_path}': {e}")
        return None
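
For example (path and date are illustrative), a successful call returns the absolute path of the leaf directory:

dir_path = create_journal_directory('journal', '2024', '03', '01')
if dir_path is not None:
    print(dir_path)  # e.g. /home/user/journal/2024/03/01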

fetch(config)

This is the top-level function that downloads toots and stores them in a JSON cache. It creates a tooter and logs in to the server named in the cred_file.

Parameters

  • config: A ConfigParser object from the config module

Config Parameters Used

  • fetch:lookback: Number of days to look back in time. Toots older than that are ignored
  • fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
  • fetch:max: Max number of toots to pull from a server (default: 2000)
  • fetch:hashtag: Hashtag to search for
  • fetch:journaldir: Base directory to write journal files into
  • fetch:debug: Logging level for this module's logger
  • fetch:dry_run: If True, we contact our home server but make no remote connections. If False, do it for real.
  • fetch:api_base_url: Starting server for our first connection
  • fetch:cred_file: Implicitly used when we create our Tooter
  • mastoscore:event_year: Year of the event (YYYY)
  • mastoscore:event_month: Month of the event (MM)
  • mastoscore:event_day: Day of the event (DD)

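A minimal config covering these parameters might look like the following (the values are illustrative, not defaults):

from configparser import ConfigParser

config = ConfigParser()
config.read_string("""
[mastoscore]
event_year = 2024
event_month = 03
event_day = 01

[fetch]
lookback = 7
botusername = scorebot
max = 2000
hashtag = fediverse
journaldir = journal
journalfile = toots
debug = 20
dry_run = True
api_base_url = https://server.example.com
cred_file = fetch.secret
""")

# fetch(config)  # requires a valid cred_file for the Tooter login
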
Returns

None

Source code in mastoscore/fetch.py
def fetch(config: ConfigParser) -> None:
    """
    This is the top-level function that downloads toots and stores them in a JSON cache. It
    creates a [tooter](module-tooter.md) and logs in to the server named in the `cred_file`.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module

    # Config Parameters Used
    - fetch:lookback: Number of days to look back in time. Toots older than that are ignored
    - fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
    - fetch:max: Max number of toots to pull from a server (default: 2000)
    - fetch:hashtag: Hashtag to search for
    - fetch:journaldir: Base directory to write journal files into
    - fetch:debug: Logging level for this module's logger
    - fetch:dry_run: If True, we contact our home server but make no remote connections. If False, do it for real.
    - fetch:api_base_url: Starting server for our first connection
    - fetch:cred_file: Implicitly used when we create our [Tooter](module-tooter.md)
    - mastoscore:event_year: Year of the event (YYYY)
    - mastoscore:event_month: Month of the event (MM)
    - mastoscore:event_day: Day of the event (DD)

    # Returns

    None
    """
    global logger

    lookback = config.getint('fetch', 'lookback')
    journaldir = config.get('fetch', 'journaldir')
    maxtoots = config.getint('fetch', 'max')
    hashtag = config.get('fetch', 'hashtag')
    debug = config.getint('fetch', 'debug')
    dry_run = config.getboolean('fetch', 'dry_run')
    api_base_url = config.get('fetch', 'api_base_url')

    logger = logging.getLogger(__name__)
    logging.basicConfig(format='%(levelname)s\t%(message)s')
    logger.setLevel(debug)

    interval = datetime.timedelta(days=lookback)
    oldest_date = datetime.datetime.now(datetime.timezone.utc) - interval
    oldest_str = oldest_date.strftime("%Y-%m-%d")
    logger.debug(f"Lookback is {lookback} days, which is {oldest_str}")

    # Make sure we can write data before we try to fetch it
    if not check_journaldir(journaldir):
        logger.critical("bailing out")
        return

    try:
        t = Tooter(config, 'fetch')
    except Exception as e:
        logger.critical(f"Failed to create Tooter for {api_base_url}")
        logger.critical(e)
        exit(1)

    logger.debug(
        f"Looking for at most {maxtoots} toots visible from {t.api_base_url} with #{hashtag} since {oldest_str}")

    toots = t.search_hashtag(hashtag, interval, maxtoots)
    if len(toots) == 0:
        logger.error(
            f"We found 0 toots for hashtag #{hashtag} on {api_base_url}")
        return
    else:
        logger.info(f"Found {len(toots)} local toots")
        df = toots2df(toots, api_base_url)
        write_journal(config, df, api_base_url.split('/')[2])

    # Look for non-local statuses. Let's figure out how many remote servers we need
    # to contact. This splits a URI like https://example.net/blah/blah/blah on slashes
    # takes the first 0-3 elements, and rejoins it on slashes. Produces https://example.net
    uris = ['/'.join(s.split('/')[0:3]) for s in df['uri']]

    # servers_done holds the list of servers that we've already contacted
    # servers_todo holds the list we still need to contact
    servers_done = set()
    servers_todo = set(uris)
    servers_fail = set()
    total_toots = len(df)
    try:
        # don't need to contact our own server, because we already got the local toots.
        servers_todo.remove(api_base_url)
    except KeyError:
        logger.warning(
            f"api_base_url ({api_base_url}) wasn't in the set.")
    servers_done.add(api_base_url)

    if dry_run:
        # In a dry run, we don't reach out to remotes
        logger.info(
            f"We found {len(servers_todo)} remote servers, but dry_run is set, so we won't contact them")
        logger.info(f"Remotes: {servers_todo}")
        logger.info(
            f"Done! Found {len(toots)} total toots across {len(servers_done)} servers.")
        return
    else:
        del df
        del toots
    # Systematically reach out to each server and pull all the hashtag toots.
    # This will likely return some toots that did not appear in the original set,
    # because nobody on our local server follows the person on the other server.
    while len(servers_todo) > 0:
        uri = servers_todo.pop()
        newtoots = fetch_hashtag_remote(config, uri)
        servers_done.add(uri)
        if newtoots is None:
            logger.warning(f"Got no toots back from {uri}")
            servers_fail.add(uri)
            continue
        else:
            logger.info(
                f"Fetched {len(newtoots)} new toots from {uri}; {total_toots} collected so far")
            try:
                df = toots2df(newtoots, uri)
            except Exception as e:
                logger.error(
                    f"Failed to convert {len(newtoots)} toots from {uri}")
                logger.error(e)
                continue
            if not write_journal(config, df, uri.split('/')[2]):
                return
            total_toots = total_toots + len(df)
            del df
        # Did we find any new servers mentioned as a side-effect of fetching this
        # latest batch?
        newuris = ['/'.join(s['uri'].split('/')[0:3]) for s in newtoots]
        n = 0
        for server in set(newuris):
            if server not in servers_done and server not in servers_todo:
                servers_todo.add(server)
                n = n + 1
        logger.info(
            f"Added {n} new servers from {uri}. Todo: {len(servers_todo)}, Done: {len(servers_done)}, Fail: {len(servers_fail)}")

    logger.info(
        f"Done! Collected {total_toots} toots from {len(servers_done)} servers with {len(servers_fail)} failures.")

fetch_hashtag_remote(config, server)

Given the api_base_url of a server (typically extracted from a status URI, like one from Mastodon.status), create a Tooter for that server. Connect and fetch the matching statuses.

Parameters

  • config: A ConfigParser object from the config module
  • server: The api_base_url of a server to fetch from

Config Parameters Used

  • fetch:lookback: Number of days to look back in time. Toots older than that are ignored
  • fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
  • fetch:max: Max number of toots to pull from a server (default: 2000)
  • fetch:hashtag: Hashtag to search for

Returns

A list of statuses in the raw JSON format from the API. Fields are not normalised or converted in any way. Since not all ActivityPub servers are exactly the same, you can't even be sure which fields you'll get.

Source code in mastoscore/fetch.py
def fetch_hashtag_remote(config: ConfigParser, server: str) -> list | None:
    """
    Given the api_base_url of a server (typically extracted from a status URI, like one
    from Mastodon.status), create a Tooter for that server. Connect and fetch the matching statuses.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module
    - **server**: The api_base_url of a server to fetch from

    # Config Parameters Used
    - fetch:lookback: Number of days to look back in time. Toots older than that are ignored
    - fetch:botusername: Name of our bot. Toots from our bot are dropped from the data set
    - fetch:max: Max number of toots to pull from a server (default: 2000)
    - fetch:hashtag: Hashtag to search for

    # Returns

    A list of statuses in the raw JSON format from the API. Fields are not normalised or
    converted in any way. Since not all ActivityPub servers are exactly the same, you can't
    even be sure which fields you'll get.
    """
    global logger

    lookback = config.getint('fetch', 'lookback')
    maxtoots = config.getint('fetch', 'max')
    hashtag = config.get('fetch', 'hashtag')

    interval = datetime.timedelta(days=lookback)
    oldest_date = datetime.datetime.now(datetime.timezone.utc) - interval
    oldest_str = oldest_date.strftime("%Y-%m-%d")
    logger.debug(f"Lookback is {lookback} days, which is {oldest_str}")

    # Make the tooter that will do the searching.
    try:
        t = Tooter(config, 'fetch', server)
        logger.info(f"Tooter created for {server}")
    except Exception as e:
        logger.warning(f"Failed to create Tooter for {server}")
        logger.warning(e)
        return None

    try:
        newtoots = t.search_hashtag(hashtag, interval, maxtoots)
    except Exception as e:
        logger.error(
            f"fetch_hashtag_remote: failure fetching {hashtag} from {server}.")
        logger.error(e)
        return None
    return newtoots

toots2df(toots, api_base_url)

Take in a list of toots from a tooter object and turn it into a pandas DataFrame with the data normalised.

Parameters

  • toots: list. A list of toots in the same format as returned by the read_timeline() API
  • api_base_url: string. Expected to include protocol, like https://server.example.com.

Returns

A Pandas DataFrame that contains all the toots normalised. Normalisation includes:

  • Converting date fields like created_at to timezone-aware datetime objects
  • Converting integer fields like reblogs_count to integers
  • Adding some columns (see below)
  • Deleting about 40 different columns we don't use in the analysis

Synthetic columns added:

  • server: The server part of api_base_url: server.example.com if the api_base_url is https://server.example.com
  • userid: The user's name in person@server.example.com format. Note it does not have the leading @ because tagging people is optional.
  • local: Boolean that is True if the toot comes from the api_base_url server. False otherwise.
  • source: The hostname of the server that owns the toot. I might be talking to server.example.com, but they've sent me a copy of a toot from other.example.social.
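
A worked example with a single fabricated toot shows how the synthetic columns are derived (all field values here are invented for illustration):

toots = [{
    'uri': 'https://other.example.social/users/alice/statuses/1',
    'created_at': '2024-03-01T12:00:00Z',
    'reblogs_count': 2,
    'replies_count': 0,
    'favourites_count': 5,
    'account': {'username': 'alice'},
}]
df = toots2df(toots, 'https://server.example.com')
# df['source'][0] -> 'server.example.com'    (the server we asked)
# df['server'][0] -> 'other.example.social'  (the server that owns the toot)
# df['userid'][0] -> 'alice@other.example.social'
# df['local'][0]  -> False (the URI does not start with our api_base_url)
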
Source code in mastoscore/fetch.py
def toots2df(toots: list, api_base_url: str) -> pd.DataFrame:
    """
    Take in a list of toots from a tooter object and turn it into a
    pandas DataFrame with the data normalised.

    # Parameters
    - toots: list. A list of toots in the same format as returned by the read_timeline() API
    - api_base_url: string. Expected to include protocol, like `https://server.example.com`.

    # Returns
    A Pandas DataFrame that contains all the toots normalised. Normalisation includes:
    - Converting date fields like `created_at` to timezone-aware `datetime` objects
    - Converting integer fields like `reblogs_count` to integers
    - Adding some columns (see below)
    - Deleting about 40 different columns we don't use in the analysis

    # Synthetic columns added:
    - **server**: The server part of `api_base_url`: `server.example.com` if the `api_base_url` is `https://server.example.com`
    - **userid**: The user's name in `person@server.example.com` format. Note it does not have the leading `@` because tagging people is optional.
    - **local**: Boolean that is **True** if the toot comes from the `api_base_url` server. **False** otherwise.
    - **source**: The hostname of the server that owns the toot. I might be talking to `server.example.com`, but they've sent me a copy of a toot from `other.example.social`.
    """

    df = pd.json_normalize(toots)
    df['source'] = api_base_url.split('/')[2]
    df['local'] = [i.startswith(api_base_url) for i in df['uri']]
    # make a new "server" column off of uris
    df['server'] = [n.split('/')[2] for n in df['uri']]
    df['userid'] = df['account.username'] + '@' + df['server']
    df['reblogs_count'] = df['reblogs_count'].astype(int)
    df['replies_count'] = df['replies_count'].astype(int)
    df['favourites_count'] = df['favourites_count'].astype(int)
    df['created_at'] = pd.to_datetime(
        df['created_at'], utc=True, format='ISO8601')
    df.drop(columns=['in_reply_to_id', 'account.indexable',
                     'in_reply_to_account_id', 'filtered', 'muted', 'favourited',
                     'application.name', 'application.website', 'account.group',
                     'account.created_at', 'account.avatar', 'account.avatar_static',
                     'account.locked', 'account.bot', 'account.discoverable',
                     'account.note', 'account.fields', 'pinned', 'reblogged',
                     'account.header', 'account.header_static', 'account.last_status_at',
                     'sensitive', 'bookmarked', 'account.followers_count',
                     'spoiler_text', 'visibility', 'account.following_count',
                     'language', 'edited_at', 'account.statuses_count',
                     'local_only', 'reblog', 'application', 'account.hide_collections',
                     'media_attachments', 'mentions', 'tags', 'account.noindex',
                     'emojis', 'poll', 'account.emojis', 'account.roles',
                     'card.url', 'card.title', 'card.description', 'card.language',
                     'card.type', 'card.author_name', 'card.author_url',
                     'card.provider_name', 'card.provider_url', 'card.html', 'card.width',
                     'card.height', 'card.image', 'card.image_description', 'card.embed_url',
                     'card.blurhash', 'card.published_at', 'card.authors',],
            inplace=True, errors='ignore')
    return df

write_journal(config, df, server)

Takes a DataFrame and the server it represents, and calls pandas.DataFrame.to_json() to write it to a corresponding JSON journal file in a hierarchical directory structure: journaldir/year/month/day/journalfile-server.json.

Parameters

  • config: A ConfigParser object from the config module
  • df: A Pandas DataFrame full of toots to write out.
  • server: The hostname of the server the toots came from; used in the journal file name

Config Parameters Used

  • fetch:journaldir: Base directory to write journal files into
  • fetch:journalfile: Journal file template
  • mastoscore:event_year: Year of the event (YYYY)
  • mastoscore:event_month: Month of the event (MM)
  • mastoscore:event_day: Day of the event (DD)

Returns

True if successful, False otherwise
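
For instance, with journaldir = journal, journalfile = toots, and an event date of 2024-03-01 (all illustrative), toots fetched from server.example.com are written to:

journal/2024/03/01/toots-server.example.com.json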

Source code in mastoscore/fetch.py
def write_journal(config: ConfigParser, df: pd.DataFrame, server: str) -> bool:
    """
    Take a DataFrame and the server it represents, and call
    [pandas.DataFrame.to_json()](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html)
    to write it to a corresponding JSON journal file in a hierarchical directory
    structure: `journaldir/year/month/day/journalfile-server.json`.

    # Parameters
    - **config**: A ConfigParser object from the [config](module-config.md) module
    - **df**: A Pandas DataFrame full of toots to write out.
    - **server**: The hostname of the server the toots came from; used in the journal file name

    # Config Parameters Used
    - fetch:journaldir: Base directory to write journal files into
    - fetch:journalfile: Journal file template
    - mastoscore:event_year: Year of the event (YYYY)
    - mastoscore:event_month: Month of the event (MM)
    - mastoscore:event_day: Day of the event (DD)

    # Returns

    True if successful, False otherwise
    """
    journaldir = config.get('fetch', 'journaldir')
    journalfile = config.get('fetch', 'journalfile')

    # Get date components from config
    try:
        year = config.get('mastoscore', 'event_year')
        month = config.get('mastoscore', 'event_month')
        day = config.get('mastoscore', 'event_day')
    except Exception as e:
        logger.error(f"Failed to get date components from config: {e}")
        logger.error("Falling back to current date")
        now = datetime.datetime.now()
        year = now.strftime("%Y")
        month = now.strftime("%m")
        day = now.strftime("%d")

    # Create directory structure
    dir_path = create_journal_directory(journaldir, year, month, day)
    if dir_path is None:
        return False

    # Create full file path
    jfilename = os.path.join(dir_path, f"{journalfile}-{server}.json")

    try:
        df.to_json(jfilename, orient='records', date_format='iso',
                   date_unit='s')
        logger.info(f"Wrote {len(df)} toots to {jfilename}")
    except Exception as e:
        logger.critical(
            f"Failed to write {len(df)} toots to {jfilename}")
        logger.critical(e)
        return False
    return True
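
Because the journal is written with orient='records' and ISO dates, a journal file can be read back for analysis with pandas (the file name is illustrative):

import pandas as pd

df = pd.read_json('journal/2024/03/01/toots-server.example.com.json',
                  orient='records')
df['created_at'] = pd.to_datetime(df['created_at'], utc=True)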