import datetime
from shutil import copyfile
from dataclasses import dataclass
-import atexit
import py7zr
import glob
import shutil
import argparse
import enum
import logging
import multiprocessing
import os
import re
import unicodedata
import requests
# Windows cannot handle : in filenames
SAFE_DATETIME_FORMAT = '%Y-%m-%d %H.%M.%S'
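# e.g. datetime.datetime(2023, 1, 31, 14, 5, 9).strftime(SAFE_DATETIME_FORMAT) -> '2023-01-31 14.05.09'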
-API_BASE="https://api.thingiverse.com"
-ACCESS_QP="access_token={}"
-PAGE_QP="page={}"
+API_BASE = "https://api.thingiverse.com"
+ACCESS_QP = "access_token={}"
+PAGE_QP = "page={}"
API_USER_DESIGNS = API_BASE + "/users/{}/things/"
API_USER_COLLECTIONS = API_BASE + "/users/{}/collections/all?" + ACCESS_QP
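# e.g. (with a placeholder token) API_USER_COLLECTIONS.format("someuser", "TOKEN")
# -> "https://api.thingiverse.com/users/someuser/collections/all?access_token=TOKEN"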
SESSION = requests.Session()
+
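# Lightweight records describing what to download; populated from the API's JSON responses.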
@dataclass
class ThingLink:
thing_id: str
name: str
api_link: str
+
@dataclass
class FileLink:
name: str
last_update: datetime.datetime
link: str
+
@dataclass
class ImageLink:
name: str
link: str
+
class FileLinks:
- def __init__(self, initial_links=[]):
+ def __init__(self, initial_links=None):
+ if initial_links is None:
+ initial_links = []
self.links = []
self.last_update = None
- for link in initial_links:
+ for link in initial_links:
self.append(link)
def __iter__(self):
return iter(self.links)
FAILED = enum.auto()
ALREADY_DOWNLOADED = enum.auto()
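# These State values (OK / FAILED / ALREADY_DOWNLOADED) are what download() reports back to the grouping loop.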
+
def sanitise_url(url):
""" Remove API keys from a URL.
"""
return re.sub(r'access_token=\w*',
'access_token=***',
url)
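# e.g. sanitise_url("https://api.thingiverse.com/things/123?access_token=abc123")
# -> "https://api.thingiverse.com/things/123?access_token=***"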
+
def strip_time(date_obj):
""" Takes a datetime object and returns another with the time set to 00:00
"""
return datetime.datetime.combine(date_obj.date(), datetime.time())
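# e.g. strip_time(datetime.datetime(2023, 1, 31, 14, 5)) -> datetime.datetime(2023, 1, 31, 0, 0)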
+
def rename_unique(dir_name, target_dir_name):
""" Move a directory sideways to a new name, ensuring it is unique.
"""
target_dir = target_dir_name
inc = 0
while os.path.exists(target_dir):
- target_dir = "{}_{}".format(target_dir_name, inc)
- inc += 1
+ target_dir = "{}_{}".format(target_dir_name, inc)
+ inc += 1
os.rename(dir_name, target_dir)
return target_dir
def fail_dir(dir_name):
""" When a download has failed, move it sideways.
"""
- return rename_unique(dir_name,"{}_failed".format(dir_name))
+ return rename_unique(dir_name, "{}_failed".format(dir_name))
def truncate_name(file_name):
return new_path
-def strip_ws(value):
- """ Remove whitespace from a string """
- return str(NO_WHITESPACE_REGEX.sub('-', value))
-
-
def slugify(value):
"""
Normalise a string, removing characters that are invalid in filenames.
"""
logging.debug("Slugifying {}".format(value))
value = unicodedata.normalize('NFKC', value).lower().strip()
- value = re.sub(r'[\\/<>:\?\*\|"]', '', value)
+ value = re.sub(r'[\\/<>:?*|"]', '', value)
value = re.sub(r'\.*$', '', value)
return value
""" actual download loop.
"""
while True:
- thing_id = self.thing_queue.get()
+ thing_id = self.thing_queue.get()
if thing_id is None:
logging.info("Shutting download queue")
self.thing_queue.task_done()
return
-
-
-
class Grouping:
""" Holds details of a group of things for download
This is effectively (although not actually) an abstract class
self.last_page = 0
self.per_page = None
# Should we stop downloading when we hit a known datestamp?
- self.quick = quick
+ self.quick = quick
self.compress = compress
# These should be set by child classes.
self.url = None
self.download_dir = None
+ @property
def get(self):
""" retrieve the things of the grouping. """
if self.things:
return self.things
# Get the internal details of the grouping.
logging.debug("Querying {}".format(sanitise_url(self.url)))
page = 0
- # TODO:: Must be a way to refactor this cleanly
- if self.paginated:
- # Slightly nasty, but afaik python lacks a clean way to do partial string formatting.
- page_url = self.url + "?" + ACCESS_QP + "&" + PAGE_QP
- while True:
- page += 1
- current_url = page_url.format(API_KEY, page)
- logging.info("requesting:{}".format(sanitise_url(current_url)))
- current_req = SESSION.get(current_url)
- if current_req.status_code != 200:
- logging.error("Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(current_url), current_req.text))
- break
- current_json = current_req.json()
- if not current_json:
- # No more!
- break
- for thing in current_json:
- self.things.append(ThingLink(thing['id'], thing['name'], thing['url']))
+
+ # self.url should already have been formatted as we don't need pagination
+ logging.info("requesting:{}".format(sanitise_url(self.url)))
+ current_req = SESSION.get(self.url)
+ if current_req.status_code != 200:
+ logging.error(
+ "Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(self.url),
+ current_req.text))
else:
- # self.url should already have been formatted as we don't need pagination
- logging.info("requesting:{}".format(sanitise_url(self.url)))
- current_req = SESSION.get(self.url)
- if current_req.status_code != 200:
- logging.error("Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(current_url), current_req.text))
- else:
- current_json = current_req.json()
- for thing in current_json:
- logging.info(thing)
- self.things.append(ThingLink(thing['id'], thing['name'], thing['url']))
+ current_json = current_req.json()
+ for thing in current_json:
+ logging.info(thing)
+ self.things.append(ThingLink(thing['id'], thing['name'], thing['url']))
logging.info("Found {} things.".format(len(self.things)))
return self.things
for idx, thing in enumerate(self.things):
logging.info("Downloading thing {} - {}".format(idx, thing))
RC = Thing(thing).download(self.download_dir, self.compress)
- if self.quick and RC==State.ALREADY_DOWNLOADED:
+ if self.quick and RC == State.ALREADY_DOWNLOADED:
logging.info("Caught up, stopping.")
return
+
class Collection(Grouping):
""" Holds details of a collection. """
try:
current_req = SESSION.get(collection_url)
except requests.exceptions.ConnectionError as error:
- logging.error("Unable to connect for thing {}: {}".format(
- self.thing_id, error))
+ logging.error("Unable to connect for collections for user {}: {}".format(
+ self.user, error))
return
if current_req.status_code != 200:
- logging.error("Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(collection_url), current_req.text))
+ logging.error(
+ "Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(collection_url),
+ current_req.text))
return
collection_list = current_req.json()
try:
if self._parsed:
return
-
# First get the broad details
url = API_THING_DETAILS.format(self.thing_id, API_KEY)
try:
logging.error("Access to thing {} is forbidden".format(self.thing_id))
return
if current_req.status_code != 200:
- logging.error("Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(url), current_req.text))
+ logging.error("Got unexpected code {} from url {}: {}".format(current_req.status_code, sanitise_url(url),
+ current_req.text))
return
thing_json = current_req.json()
except KeyError:
logging.warning("No description found for thing {}?".format(self.thing_id))
-
-
# Now get the file details
file_url = API_THING_FILES.format(self.thing_id, API_KEY)
return
if current_req.status_code != 200:
- logging.error("Unexpected status code {} for {}: {}".format(current_req.status_code, sanitise_url(file_url), current_req.text))
+ logging.error("Unexpected status code {} for {}: {}".format(current_req.status_code, sanitise_url(file_url),
+ current_req.text))
return
link_list = current_req.json()
if not link_list:
- logging.error("No files found for thing {} - probably thingiverse being broken, try again later".format(self.thing_id))
+ logging.error("No files found for thing {} - probably thingiverse being broken, try again later".format(
+ self.thing_id))
for link in link_list:
logging.debug("Parsing link: {}".format(sanitise_url(link['url'])))
try:
datestamp = datetime.datetime.strptime(link['date'], DEFAULT_DATETIME_FORMAT)
- self._file_links.append(FileLink(link['name'], datestamp, link['url'] + API_THING_DOWNLOAD.format(API_KEY)))
+ self._file_links.append(
+ FileLink(link['name'], datestamp, link['url'] + API_THING_DOWNLOAD.format(API_KEY)))
except ValueError:
logging.error("Unparseable date {}".format(link['date']))
return
if current_req.status_code != 200:
- logging.error("Unexpected status code {} for {}: {}".format(current_req.status_code, sanitise_url(image_url), current_req.text))
+ logging.error(
+ "Unexpected status code {} for {}: {}".format(current_req.status_code, sanitise_url(image_url),
+ current_req.text))
return
image_list = current_req.json()
if not image_list:
- logging.warning("No images found for thing {} - probably thingiverse being iffy as this seems unlikely".format(self.thing_id))
+ logging.warning(
+ "No images found for thing {} - seems unlikely, so probably thingiverse being iffy".format(
+ self.thing_id))
for image in image_list:
logging.debug("parsing image: {}".format(image))
+ name = None
try:
name = slugify(image['name'])
# TODO: fallback to other types
- url = [x for x in image['sizes'] if x['type']=='display' and x['size']=='large'][0]['url']
+ url = [x for x in image['sizes'] if x['type'] == 'display' and x['size'] == 'large'][0]['url']
except KeyError:
logging.warning("Missing image for {}".format(name))
continue
self._image_links.append(ImageLink(name, url))
latest, self.last_time = self._find_last_download(base_dir)
if not latest:
- # Not yet downloaded
- self._parsed = True
- return
-
+ # Not yet downloaded
+ self._parsed = True
+ return
logging.info("last downloaded version: {}".format(self.last_time))
logging.info("Dropping time from comparison stamp as old-style download dir")
files_last_update = strip_time(files_last_update)
-
if files_last_update > self.last_time:
logging.info(
"Found new/updated files {}".format(self._file_links.last_update))
logging.warning("Found old style download_dir. Moving.")
rename_unique(old_dir, self.download_dir)
- def _handle_outdated_directory(self, base_dir):
+ def _handle_outdated_directory(self):
""" Move the current download directory sideways if the thing has changed.
"""
if not os.path.exists(self.download_dir):
# TODO: Maybe look for old download directories.
-
# Now look for 7z files
candidates = glob.glob(os.path.join(base_dir, "{}*.7z".format(self.thing_id)))
# +3 to allow for ' - '
- leading_length =len(self.slug)+3
+ leading_length = len(self.slug) + 3
for path in candidates:
candidate = os.path.basename(path)
try:
except TypeError:
latest_time = candidate_time
latest = candidate
- logging.info("Found last old thing: {} / {}".format(latest,latest_time))
+ logging.info("Found last old thing: {} / {}".format(latest, latest_time))
return (latest, latest_time)
-
-
def download(self, base_dir, compress):
""" Download all files for a given thing.
Returns a State value: OK or ALREADY_DOWNLOADED if the thing is now on disk, FAILED otherwise.
return State.ALREADY_DOWNLOADED
if not self._file_links:
- logging.error("{} - {} appears to have no files. Thingiverse acting up again?".format(self.thing_id, self.name))
+ logging.error(
+ "{} - {} appears to have no files. Thingiverse acting up again?".format(self.thing_id, self.name))
return State.FAILED
# Have we already downloaded some things?
- renamed_dir = self._handle_outdated_directory(base_dir)
+ renamed_dir = self._handle_outdated_directory()
# Get the list of files to download
logging.debug("No last time, downloading all files")
new_file_links = self._file_links
self.time_stamp = new_file_links[0].last_update
-
+
for file_link in new_file_links:
self.time_stamp = max(self.time_stamp, file_link.last_update)
logging.debug("New timestamp will be {}".format(self.time_stamp))
filelist_file = os.path.join(self.download_dir, "filelist.txt")
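# Each line records one file as "link,name,last_update", e.g. "<url>,base_plate.stl,2023-01-31 14:05:09"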
with open(filelist_file, 'w', encoding="utf-8") as fl_handle:
for fl in self._file_links:
- fl_handle.write("{},{},{}\n".format(fl.link, fl.name, fl.last_update))
-
+ fl_handle.write("{},{},{}\n".format(fl.link, fl.name, fl.last_update))
# First grab the cached files (if any)
logging.info("Copying {} unchanged files.".format(len(old_file_links)))
"Typeerror looking for {} in {}".format(file_link.name, renamed_dir))
new_file_links.append(file_link)
-
# Now download the new ones
logging.info("Downloading {} new files of {}".format(
len(new_file_links), len(self._file_links)))
file_link.name, file_link.link, file_name))
data_req = SESSION.get(file_link.link)
if data_req.status_code != 200:
- logging.error("Unexpected status code {} for {}: {}".format(data_req.status_code, sanitise_url(file_link.link), data_req.text))
+ logging.error("Unexpected status code {} for {}: {}".format(data_req.status_code,
+ sanitise_url(file_link.link),
+ data_req.text))
fail_dir(self.download_dir)
return State.FAILED
-
with open(file_name, 'wb') as handle:
handle.write(data_req.content)
fail_dir(self.download_dir)
return State.FAILED
-
# People like images.
image_dir = os.path.join(self.download_dir, 'images')
logging.info("Downloading {} images.".format(len(self._image_links)))
filename = os.path.join(image_dir, imagelink.name)
image_req = SESSION.get(imagelink.link)
if image_req.status_code != 200:
- logging.error("Unexpected status code {} for {}: {}".format(image_req.status_code, sanitise_url(file_link.link), image_req.text))
+ logging.error("Unexpected status code {} for {}: {}".format(image_req.status_code,
+ sanitise_url(imagelink.link),
+ image_req.text))
fail_dir(self.download_dir)
return State.FAILED
with open(truncate_name(filename), 'wb') as handle:
logging.info("writing license file")
try:
if self._license:
- with open(truncate_name(os.path.join(self.download_dir, 'license.txt')), 'w', encoding="utf-8") as license_handle:
+ with open(truncate_name(os.path.join(self.download_dir, 'license.txt')), 'w',
+ encoding="utf-8") as license_handle:
license_handle.write("{}\n".format(self._license))
except IOError as exception:
logging.warning("Failed to write license! {}".format(exception))
logging.info("writing readme")
try:
if self._details:
- with open(truncate_name(os.path.join(self.download_dir, 'readme.txt')), 'w', encoding="utf-8") as readme_handle:
+ with open(truncate_name(os.path.join(self.download_dir, 'readme.txt')), 'w',
+ encoding="utf-8") as readme_handle:
readme_handle.write("{}\n".format(self._details))
except IOError as exception:
logging.warning("Failed to write readme! {}".format(exception))
try:
# Now write the timestamp
- with open(os.path.join(self.download_dir,TIMESTAMP_FILE), 'w', encoding="utf-8") as timestamp_handle:
+ with open(os.path.join(self.download_dir, TIMESTAMP_FILE), 'w', encoding="utf-8") as timestamp_handle:
timestamp_handle.write(str(self.time_stamp))
except Exception as exception:
logging.error("Failed to write timestamp file - {}".format(exception))
if not compress:
return State.OK
-
thing_dir = "{} - {} - {}".format(self.thing_id,
- slugify(self.name),
- self.time_stamp.strftime(SAFE_DATETIME_FORMAT))
+ slugify(self.name),
+ self.time_stamp.strftime(SAFE_DATETIME_FORMAT))
file_name = os.path.join(base_dir,
- "{}.7z".format(thing_dir))
+ "{}.7z".format(thing_dir))
logging.debug("Compressing {} to {}".format(
self.name,
file_name))
return State.OK
-
-
def do_batch(batch_file, download_dir, quick, compress):
""" Read a file in line by line, parsing each as a set of calls to this script."""
with open(batch_file) as handle:
""" Entry point for script being run as a command. """
parser = argparse.ArgumentParser()
parser.add_argument("-l", "--log-level", choices=[
- 'debug', 'info', 'warning'], default='info', help="level of logging desired")
+ 'debug', 'info', 'warning'], default='info', help="level of logging desired")
parser.add_argument("-d", "--directory",
help="Target directory to download into")
parser.add_argument("-f", "--log-file",
help="Compress files")
parser.add_argument("-a", "--api-key",
help="API key for thingiverse")
-
subparsers = parser.add_subparsers(
help="Type of thing to download", dest="subcommand")
collection_parser.add_argument(
"owner", help="The owner of the collection(s) to get")
collection_parser.add_argument(
- "collections", nargs="+", help="Space seperated list of the name(s) of collection to get")
+ "collections", nargs="+", help="Space seperated list of the name(s) of collection to get")
thing_parser = subparsers.add_parser(
'thing', help="Download a single thing.")
thing_parser.add_argument(
"things", nargs="*", help="Space seperated list of thing ID(s) to download")
user_parser = subparsers.add_parser(
- "user", help="Download all things by one or more users")
+ "user", help="Download all things by one or more users")
user_parser.add_argument(
"users", nargs="+", help="A space seperated list of the user(s) to get the designs of")
batch_parser = subparsers.add_parser(
global API_KEY
if args.api_key:
- API_KEY=args.api_key
+ API_KEY = args.api_key
else:
try:
with open("api.key") as fh:
- API_KEY=fh.read().strip()
+ API_KEY = fh.read().strip()
except Exception as e:
logging.error("Either specify the api-key on the command line or in a file called 'api.key'")
logging.error("Exception: {}".format(e))
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
-
# Start downloader
thing_queue = multiprocessing.JoinableQueue()
logging.debug("starting {} downloader(s)".format(DOWNLOADER_COUNT))
for downloader in downloaders:
downloader.start()
-
if args.subcommand.startswith("collection"):
for collection in args.collections:
Collection(args.owner, collection, args.directory, args.quick, args.compress).download()
do_batch(args.batch_file, args.directory, args.quick, args.compress)
# Stop the downloader processes
- for downloader in downloaders:
+ for _ in downloaders:
thing_queue.put(None)
-if __name__ == "__main__":
+if __name__ == "__main__":
multiprocessing.freeze_support()
main()
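# Example invocations (script name and IDs are illustrative):
#   python thingy_grabber.py -a MY_API_KEY thing 12345
#   python thingy_grabber.py -a MY_API_KEY collection some_user "cool stuff"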