#!/usr/bin/env python3 """ Copyright © 2022-2023 Matthias Koeppe Kwankyu Lee Sebastian Oehms Dima Pasechnik Modified and extended for the migration of SageMath from Trac to GitHub. Copyright © 2018-2019 Stefan Vigerske This is a modified/extended version of trac-to-gitlab from https://github.com/moimael/trac-to-gitlab. It has been adapted to fit the needs of a specific Trac to GitLab conversion. Then it has been adapted to fit the needs to another Trac to GitHub conversion. Copyright © 2013 Eric van der Vlist Jens Neuhalfen This software is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This sotfware is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library. If not, see . """ import re import os import sys import configparser import contextlib import ast import codecs import logging import mimetypes import types import gzip import json from urllib import parse from collections import defaultdict from copy import copy from datetime import datetime from difflib import unified_diff from time import sleep from roman import toRoman from xmlrpc import client from github import Github, GithubObject, InputFileContent from github.Attachment import Attachment from github.NamedUser import NamedUser from github.Repository import Repository from github.GithubException import IncompletableObject from enum import Enum # from migration_archive_writer import MigrationArchiveWritingRequester import markdown from markdown.extensions.tables import TableExtension from rich.console import Console from rich.table import Table # import github as gh # gh.enable_console_debug_logging() log = logging.getLogger("trac_to_gh") default_config = { "migrate": "true", "keywords_to_labels": "false", "export": "true", # attachments "url": "https://api.github.com", } sleep_after_request = 2.0 sleep_after_attachment = 60.0 sleep_after_10tickets = 0.0 # TODO maybe this can be reduced due to the longer sleep after attaching something sleep_before_xmlrpc = 0.33 sleep_before_xmlrpc_retry = 30.0 config = configparser.ConfigParser(default_config) if len(sys.argv) > 1: config.read(sys.argv[1]) else: config.read("migrate.cfg") config["source"] = { "url": "url", "keep_trac_ticket_references": True, } config["target"] = { "url": "url", "project_name": "project", "usernames": [], "issues_repo_url": "url", "git_repo_url": "url", } config["issues"] = { "migrate": False, "migrate_milestones": False, } config["attachments"] = { "export": False, } config["wiki"] = { "migrate": True, "export_dir": "export", } trac_url = config.get("source", "url") cgit_url = None if config.has_option("source", "cgit_url"): cgit_url = config.get("source", "cgit_url") milestone_prefix_from = "" if config.has_option("source", "milestone_prefix"): milestone_prefix_from = config.get("source", "milestone_prefix") trac_path = None if config.has_option("source", "path"): trac_path = config.get("source", "path") keep_trac_ticket_references = config.getboolean("source", "keep_trac_ticket_references") class subdir(Enum): """ Enum for subdirectories of `trac_url_dir` """ def optional_path(self): """ Return the optional path for this sub directory according to `trac_path` """ if trac_path: return os.path.join(trac_path, self.value) ticket = "ticket" wiki = "wiki" query = "query" report = "report" attachment = "attachment" raw_attachment = "raw-attachment" attachment_ticket = "attachment/ticket" raw_attachment_ticket = "raw-attachment/ticket" root = "" class cgit_cmd(Enum): """ Enum for git commands used in the cgit web interface. """ commit = "commit" diff = "diff" tree = "tree" log = "log" tag = "tag" refs = "refs" plain = "plain" patch = "patch" default = "" trac_url_dir = os.path.dirname(trac_url) trac_url_ticket = os.path.join(trac_url_dir, subdir.ticket.value) trac_url_wiki = os.path.join(trac_url_dir, subdir.wiki.value) trac_url_query = os.path.join(trac_url_dir, subdir.query.value) trac_url_report = os.path.join(trac_url_dir, subdir.report.value) trac_url_attachment = os.path.join(trac_url_dir, subdir.attachment.value) if config.has_option("target", "issues_repo_url"): target_url_issues_repo = config.get("target", "issues_repo_url") target_url_git_repo = config.get("target", "git_repo_url") if config.has_option("wiki", "url"): target_url_wiki = config.get("wiki", "url") github_api_url = config.get("target", "url") github_token = None if config.has_option("target", "token"): github_token = config.get("target", "token") elif config.has_option("target", "username"): github_username = config.get("target", "username") github_password = config.get("target", "password") else: github_username = None github_project = config.get("target", "project_name") migration_archive = None if config.has_option("target", "migration_archive"): migration_archive = config.get("target", "migration_archive") users_map = {} user_full_names = {} username_modules = [] if config.has_option("target", "username_modules"): username_modules = ast.literal_eval(config.get("target", "username_modules")) for module in username_modules: module = __import__(module) users_map.update(module.trac_to_github()) user_full_names.update(module.trac_full_names()) users_map.update(ast.literal_eval(config.get("target", "usernames"))) unknown_users_prefix = "" if config.has_option("target", "unknown_users_prefix"): unknown_users_prefix = config.get("target", "unknown_users_prefix") milestone_prefix_to = "" if config.has_option("target", "milestone_prefix"): milestone_prefix_to = config.get("target", "milestone_prefix") must_convert_issues = config.getboolean("issues", "migrate") only_issues = None if config.has_option("issues", "only_issues"): only_issues = ast.literal_eval(config.get("issues", "only_issues")) blacklist_issues = None if config.has_option("issues", "blacklist_issues"): blacklist_issues = ast.literal_eval(config.get("issues", "blacklist_issues")) filter_issues = "max=0&order=id" if config.has_option("issues", "filter_issues"): filter_issues = config.get("issues", "filter_issues") try: keywords_to_labels = config.getboolean("issues", "keywords_to_labels") except ValueError: keywords_to_labels = ast.literal_eval(config.get("issues", "keywords_to_labels")) migrate_milestones = config.getboolean("issues", "migrate_milestones") milestones_to_labels = {} if config.has_option("issues", "milestones_to_labels"): milestones_to_labels = ast.literal_eval( config.get("issues", "milestones_to_labels") ) canceled_milestones = {} if config.has_option("issues", "canceled_milestones"): canceled_milestones = ast.literal_eval(config.get("issues", "canceled_milestones")) components_to_labels = {} if config.has_option("issues", "components_to_labels"): components_to_labels = ast.literal_eval( config.get("issues", "components_to_labels") ) add_label = None if config.has_option("issues", "add_label"): add_label = config.get("issues", "add_label") # 6-digit hex notation with leading '#' sign (e.g. #FFAABB) or one of the CSS color names # (https://developer.mozilla.org/en-US/docs/Web/CSS/color_value#Color_keywords) labelcolor = { "component": "08517b", "priority": "ff0000", "severity": "ee0000", "type": "008080", "keyword": "eeeeee", "milestone": "008080", "resolution": "008080", } if config.has_option("issues", "label_colors"): labelcolor.update(ast.literal_eval(config.get("issues", "label_colors"))) ignored_values = [] if config.has_option("issues", "ignored_values"): ignored_values = ast.literal_eval(config.get("issues", "ignored_values")) ignored_names = set([]) if config.has_option("issues", "ignored_names"): ignored_names = set(ast.literal_eval(config.get("issues", "ignored_names"))) ignored_mentions = set([]) if config.has_option("issues", "ignored_mentions"): ignored_mentions = set(ast.literal_eval(config.get("issues", "ignored_mentions"))) attachment_export = config.getboolean("attachments", "export") if attachment_export: attachment_export_dir = config.get("attachments", "export_dir") if config.has_option("attachments", "export_url"): attachment_export_url = config.get("attachments", "export_url") if not attachment_export_url.endswith("/"): attachment_export_url += "/" else: attachment_export_url = target_url_issues_repo if not attachment_export_url.endswith("/"): attachment_export_url += "/" attachment_export_url += "files/" must_convert_wiki = config.getboolean("wiki", "migrate") wiki_export_dir = None if must_convert_wiki or config.has_option("wiki", "export_dir"): wiki_export_dir = config.get("wiki", "export_dir") default_multilines = False if config.has_option("source", "default_multilines"): # set this boolean in the source section of the configuration file # to change the default of the multilines flag in the function # trac2markdown default_multilines = config.getboolean("source", "default_multilines") from diskcache import Cache cache = Cache("trac_cache", size_limit=int(20e9)) gh_labels = dict() gh_user = None closing_commits = {} # (src_ticket_id, commit) -> closing_commit def read_closing_commits(): # Generated using write-closing-commits.sh if os.path.exists("closing_commits.txt"): with open("closing_commits.txt", "r") as f: for line_number, line in enumerate(f.readlines(), start=1): if m := re.match( "^([0-9a-f]{40}) Merge: ([0-9a-f]{40}) ([0-9a-f]{40}) Trac #([0-9]+):", line, ): sha = m.group(1) parent2_sha = m.group(3) src_ticket_id = int(m.group(4)) try: other_sha = closing_commits[src_ticket_id, parent2_sha] except KeyError: pass else: log.warning( f"closing_commits.txt:{line_number}: multiple commits for ticket #{src_ticket_id} {parent2_sha}: {other_sha}, {sha}" ) closing_commits[src_ticket_id, parent2_sha] = sha elif line: log.warning(f"closing_commits.txt:{line_number}: malformed line") # The file wiki_path_conversion_table.txt is created if not exists. If it # exists, the table below is constructed from the data in the file. create_wiki_link_conversion_table = False wiki_path_conversion_table = {} if os.path.exists("wiki_path_conversion_table.txt"): with open("wiki_path_conversion_table.txt", "r") as f: for line in f.readlines(): trac_wiki_path, wiki_path = line[:-1].split(" ") wiki_path_conversion_table[trac_wiki_path] = wiki_path elif must_convert_wiki: create_wiki_link_conversion_table = True RE_CAMELCASE1 = re.compile(r"(?<=\s)((?:[A-Z][a-z0-9]+){2,})(?=[\s\.\,\:\;\?\!])") RE_CAMELCASE2 = re.compile(r"(?<=\s)((?:[A-Z][a-z0-9]+){2,})$") RE_HEADING1 = re.compile(r"^(=)\s(.+)\s=\s*([\#][^\s]*)?") RE_HEADING2 = re.compile(r"^(==)\s(.+)\s==\s*([\#][^\s]*)?") RE_HEADING3 = re.compile(r"^(===)\s(.+)\s===\s*([\#][^\s]*)?") RE_HEADING4 = re.compile(r"^(====)\s(.+)\s====\s*([\#][^\s]*)?") RE_HEADING5 = re.compile(r"^(=====)\s(.+)\s=====\s*([\#][^\s]*)?") RE_HEADING6 = re.compile(r"^(======)\s(.+)\s======\s*([\#][^\s]*)?") RE_HEADING1a = re.compile(r"^(=)\s([^#]+)([\#][^\s]*)?") RE_HEADING2a = re.compile(r"^(==)\s([^#]+)([\#][^\s]*)?") RE_HEADING3a = re.compile(r"^(===)\s([^#]+)([\#][^\s]*)?") RE_HEADING4a = re.compile(r"^(====)\s([^#]+)([\#][^\s]*)?") RE_HEADING5a = re.compile(r"^(=====)\s([^#]+)([\#][^\s]*)?") RE_HEADING6a = re.compile(r"^(======)\s([^#]+)([\#][^\s]*)?") RE_SUPERSCRIPT1 = re.compile(r"\^([^\s]+?)\^") RE_SUBSCRIPT1 = re.compile(r",,([^\s]+?),,") RE_IMAGE1 = re.compile(r"\[\[Image\(source:([^(]+)\)\]\]") RE_IMAGE2 = re.compile(r"\[\[Image\(([^),]+)\)\]\]") RE_IMAGE3 = re.compile(r"\[\[Image\(([^),]+),\slink=([^(]+)\)\]\]") RE_IMAGE4 = re.compile(r"\[\[Image\((http[^),]+),\s([^)]+)\)\]\]") RE_IMAGE5 = re.compile(r"\[\[Image\(([^),]+),\s([^)]+)\)\]\]") RE_IMAGE6 = re.compile(r"\[\[Image\(([^),]+),\s*([^)]+),\s*([^)]+)\)\]\]") RE_HTTPS1 = re.compile(r"\[\[(https?://[^\s\]\|]+)\s*\|\s*(.+?)\]\]") RE_HTTPS2 = re.compile(r"\[\[(https?://[^\]]+)\]\]") RE_HTTPS3 = re.compile(r"\[(https?://[^\s\[\]\|]+)\s*[\s\|]\s*([^\[\]]+)\]") RE_HTTPS4 = re.compile(r"\[(https?://[^\s\[\]\|]+)\]") RE_TICKET_COMMENT1 = re.compile( r"\[\[ticket:([1-9]\d*)#comment:([1-9]\d*)\s*\|\s*(.+?)\]\]" ) RE_TICKET_COMMENT2 = re.compile(r"\[\[ticket:([1-9]\d*)#comment:([1-9]\d*)\]\]") RE_TICKET_COMMENT3 = re.compile(r"\[ticket:([1-9]\d*)#comment:([1-9]\d*)\s+(.*?)\]") RE_TICKET_COMMENT4 = re.compile(r"\[ticket:([1-9]\d*)#comment:([0])\s+(.*?)\]") RE_TICKET_COMMENT5 = re.compile(r"\[comment:ticket:([1-9]\d*):([1-9]\d*)\s+(.*?)\]") RE_TICKET_COMMENT6 = re.compile(r"ticket:([1-9]\d*)#comment:([1-9]\d*)") RE_COMMENT1 = re.compile(r"\[\[comment:([1-9]\d*)\]\]") RE_COMMENT2 = re.compile(r"\[\[comment:([1-9]\d*)\s*\|\s*(.+?)\]\]") RE_COMMENT3 = re.compile(r"\[comment:([1-9]\d*)\s+(.*?)\]") RE_COMMENT4 = re.compile( r"(?<=\s)comment:([1-9]\d*)" ) # need to exclude the string as part of http url RE_ATTACHMENT1 = re.compile(r"\[\[attachment:([^\s\|\]]+)[\s\|](.+?)\]\]") RE_ATTACHMENT2 = re.compile(r"\[\[attachment:([^\s]+?)\]\]") RE_ATTACHMENT3 = re.compile(r"\[attachment:([^\s\|\]]+)[\s\|](.+?)\]") RE_ATTACHMENT4 = re.compile(r"\[attachment:([^\s]+?)\]") RE_ATTACHMENT5 = re.compile(r"(?<=\s)attachment:([^\s]+)\.\s") RE_ATTACHMENT6 = re.compile(r"^attachment:([^\s]+)\.\s") RE_ATTACHMENT7 = re.compile(r"(?<=\s)attachment:([^\s]+)") RE_ATTACHMENT8 = re.compile(r"^attachment:([^\s]+)") RE_LINEBREAK1 = re.compile(r"(\[\[br\]\])") RE_LINEBREAK2 = re.compile(r"(\[\[BR\]\])") RE_LINEBREAK3 = re.compile(r"(\\\\\s*)") RE_WIKI1 = re.compile(r'\[\["([^\]\|]+)["]\s*([^\[\]"]+)?["]?\]\]') RE_WIKI2 = re.compile(r"\[\[\s*([^\]|]+)[\|]([^\[\]\|]+)\]\]") RE_WIKI3 = re.compile(r"\[\[\s*([^\]]+)\]\]") RE_WIKI4 = re.compile(r'\[wiki:"([^\[\]\|]+)["]\s*([^\[\]"]+)?["]?\]') RE_WIKI5 = re.compile(r"\[wiki:([^\s\[\]\|]+)\s*[\s\|]\s*([^\[\]]+)\]") RE_WIKI6 = re.compile(r"\[wiki:([^\s\[\]]+)\]") RE_WIKI7 = re.compile(r"\[/wiki/([^\s\[\]]+)\s+([^\[\]]+)\]") RE_QUERY1 = re.compile(r"\[query:\?") RE_SOURCE1 = re.compile(r"\[source:([^\s\[\]]+)\s+([^\[\]]+)\]") RE_SOURCE2 = re.compile(r"source:([\S]+)") RE_BOLDTEXT1 = re.compile(r"\'\'\'(.*?)\'\'\'") RE_ITALIC1 = re.compile(r"\'\'(.*?)\'\'") RE_ITALIC2 = re.compile(r"(?<=\s)//(.*?)//") RE_TICKET1 = re.compile(r"[\s]%s/([1-9]\d{0,4})" % trac_url_ticket) RE_TICKET2 = re.compile(r"\#([1-9]\d{0,4})") RE_UNDERLINED_CODE1 = re.compile(r"(?<=\s)_([a-zA-Z_]+)_(?=[\s,)])") RE_UNDERLINED_CODE2 = re.compile(r"(?<=\s)_([a-zA-Z_]+)_$") RE_UNDERLINED_CODE3 = re.compile(r"^_([a-zA-Z_]+)_(?=\s)") RE_CODE_SNIPPET = re.compile(r"(?([a-zA-Z]+)') RE_TRAC_REPORT = re.compile(r"\[report:([0-9]+)\s*(.*?)\]") RE_COMMIT_LIST1 = re.compile(r"\|\[(.+?)\]\((.*)\)\|(.*?)\|") RE_COMMIT_LIST2 = re.compile(r"\|\[(.+?)\]\((.*)\)\|`(.*?)`\|") RE_COMMIT_LIST3 = re.compile(r"\|(.*?)\|(.*?)\|") RE_NEW_COMMITS = re.compile(r"(?sm)(New commits:)\n((?:\|[^\n]*\|(?:\n|$))+)") RE_LAST_NEW_COMMITS = re.compile( r"(?sm)(Last \d+ new commits:)\n((?:\|[^\n]*\|(?:\n|$))+)" ) class CodeTag: """ Handler for code protectors. """ def replace(self, text): """ Return the given string with protection tags replaced by their proper counterparts. """ text = text.replace(self.tag, self._code) return text def __init__(self, tag, code): self.tag = tag self._code = code at_sign = CodeTag("AT__SIGN__IN__CODE", "@") linebreak_sign1 = CodeTag("LINEBREAK__SIGN1__IN__CODE", r"\\") linebreak_sign2 = CodeTag("LINEBREAK__SIGN2__IN__CODE", r"[[br]]") linebreak_sign3 = CodeTag("LINEBREAK__SIGN3__IN__CODE", r"[[BR]]") class Brackets: """ Handler for bracket protectors. """ def replace(self, text): """ Return the given string with protection tags replaced by their proper counterparts. """ text = text.replace(self.open, self._open_bracket) text = text.replace(self.close, self._close_bracket) return text def __init__(self, open_tag, close_tag, open_bracket, close_bracket): self.open = open_tag self.close = close_tag self._open_bracket = open_bracket self._close_bracket = close_bracket link_displ = Brackets("OPENING__LEFT__BRACKET", "CLOSING__RIGHT__BRACKET", "[", "]") proc_code = Brackets( "OPENING__PROCESSOR__CODE", "CLOSING__PROCESSOR__CODE", "```", "```" ) proc_td = Brackets( "OPENING__PROCESSOR__TD", "CLOSING__PROCESSOR__TD", r'
', r"
" ) class SourceUrlConversionHelper: """ Conversion helper for pattern involving url-data from source configuration. """ class regex(Enum): pass def __init__(self, url): self._re = {} if not url: # path might be optional dependend on configuration return for reg in self.regex: expr, path, argument = reg.value if isinstance(path, Enum): path = path.value if path is None: # path might be optional dependend on configuration continue path = os.path.join(url, path) self._re[reg] = re.compile(r"%s%s" % (self._url_pattern(path), expr)) def _url_pattern(self, url): pattern = url.replace("https", "https?") pattern = pattern.replace(".", "\\.") return pattern def sub(self, text): if not len(self._re): # all expressions are optional and not activ return text for reg in self._re.keys(): expr, path, argument = reg.value text = self._re[reg].sub(argument, text) return text class TracUrlConversionHelper(SourceUrlConversionHelper): """ Conversion helper for pattern involving the Trac url. """ class regex(Enum): """ """ def convert_wiki_link(match): trac_path = match.group(1) if trac_path in wiki_path_conversion_table: wiki_path = wiki_path_conversion_table[trac_path] return os.path.join(target_url_wiki, wiki_path) return match.group(0) def convert_ticket_attachment(match): ticket_id = match.group(1) filename = match.group(2) if keep_trac_ticket_references: return os.path.join(trac_url_attachment, "ticket", ticket_id, filename) return gh_attachment_url(ticket_id, filename) TICKET1 = [r"/(\d+)#comment:(\d+)?", subdir.ticket, r"ticket:\1#comment:\2"] TICKET2 = [ r"/(\d+)#comment:(\d+)?", subdir.ticket.optional_path(), r"ticket:\1#comment:\2", ] TICKET3 = [r"/(\d+)", subdir.ticket, r"%s/issues/\1" % target_url_issues_repo] TICKET4 = [ r"/(\d+)", subdir.ticket.optional_path(), r"%s/issues/\1" % target_url_issues_repo, ] WIKI1 = [r"/([/\-\w0-9@:%._+~#=]+)", subdir.wiki, convert_wiki_link] ATTACHMENT1 = [ r"/(\d+)/([/\-\w0-9@:%._+~#=]+)", subdir.attachment_ticket, convert_ticket_attachment, ] ATTACHMENT2 = [ r"/(\d+)/([/\-\w0-9@:%._+~#=]+)", subdir.attachment_ticket.optional_path(), convert_ticket_attachment, ] ATTACHMENT3 = [ r"/(\d+)/([/\-\w0-9@:%._+~#=]+)", subdir.raw_attachment_ticket, convert_ticket_attachment, ] ATTACHMENT4 = [ r"/(\d+)/([/\-\w0-9@:%._+~#=]+)", subdir.raw_attachment_ticket.optional_path(), convert_ticket_attachment, ] class CgitConversionHelper(SourceUrlConversionHelper): """ Conversion helper for pattern involving the cgit web interface. """ class regex(Enum): """ """ def convert_git_link_diff1(match): path = match.group(1) hash1 = match.group(2) return os.path.join(target_url_git_repo, "blob", hash1, path) def convert_git_link_diff2(match): path = match.group(1) hash1 = match.group(2) return os.path.join(target_url_git_repo, "compare", hash1 + "..." + path) def convert_git_link_diff3(match): hash1 = match.group(1) hash2 = match.group(2) return os.path.join(target_url_git_repo, "compare", hash1 + "..." + hash2) def convert_git_link_diff4(match): hash1 = match.group(1) return os.path.join(target_url_git_repo, "commit", hash1) def convert_git_link_diff5(match): path1 = match.group(1) path2 = match.group(2) hash1 = match.group(3) return os.path.join(target_url_git_repo, "compare", hash1 + "..." + path2) def convert_git_link_diff6(match): branch = match.group(1) path = match.group(2) return os.path.join(target_url_git_repo, "compare", path + "..." + branch) def convert_git_link_diff7(match): branch = match.group(1) return os.path.join(target_url_git_repo, "commits", branch) def convert_git_link_diff8(match): branch = match.group(1) return os.path.join(target_url_git_repo, "commits", branch) def convert_git_link_commit1(match): path = match.group(1) hash1 = match.group(2) return os.path.join(target_url_git_repo, "commit", hash1) def convert_git_link_commit3(match): hash1 = match.group(1) return os.path.join(target_url_git_repo, "commit", hash1) def convert_git_link_commit4(match): path = match.group(1) return os.path.join(target_url_git_repo, "commits", path) def convert_git_link_commit5(match): path = match.group(1) return os.path.join(target_url_git_repo, "commit", path) def convert_git_link_commit6(match): path = match.group(1) branch = match.group(2) hash1 = match.group(3) return os.path.join(target_url_git_repo, "compare", hash1 + "..." + branch) def convert_git_link_commit7(match): path = match.group(1) hash1 = match.group(2) return os.path.join(target_url_git_repo, "commit", hash1, path) def convert_git_link_tree1(match): path = match.group(1) return os.path.join(target_url_git_repo, "blob/develop", path) def convert_git_link_tree2(match): branch = match.group(1) return os.path.join(target_url_git_repo, "tree", branch) def convert_git_link_log1(match): path = match.group(1) return os.path.join(target_url_git_repo, "commits", path) def convert_git_link_log3(match): hash1 = match.group(1) hash2 = match.group(2) hash3 = match.group(3) return os.path.join(target_url_git_repo, "compare", hash1 + "..." + hash2) def convert_git_link_log4(match): path = match.group(1) hash1 = match.group(2) return os.path.join( target_url_git_repo, "commits", "develop?after=" + hash1 + "+0" + "&branch=develop" + "&path%5B%5D=" + "&path%5B%5D=".join(path.split("/")) + "&qualified_name=refs%2Fheads%2Fdevelop", ) def convert_git_link_log5(match): path = match.group(1) return os.path.join(target_url_git_repo, "commits/develop", path) def convert_git_link_plain(match): path = match.group(1) branch = match.group(2) return os.path.join(target_url_git_repo, "blob", branch, path) def convert_git_link_patch(match): hash1 = match.group(1) return os.path.join(target_url_git_repo, "commit", hash1 + ".patch") def convert_git_link(match): # catch all missed git link import pdb pdb.set_trace() DIFF1 = [ r"/([/\-\w0-9@:%._+~#=]+)\?id=([0-9a-f]+)", cgit_cmd.diff, convert_git_link_diff1, ] DIFF2 = [ r"/?\?h=([/\-\w0-9@:%._+~#=]+)&id2=([0-9a-f]+)", cgit_cmd.diff, convert_git_link_diff2, ] DIFF3 = [ r"/?\?id2?=([0-9a-f]+)&id=([0-9a-f]+)", cgit_cmd.diff, convert_git_link_diff3, ] DIFF4 = [r"/?\?id=([0-9a-f]+)", cgit_cmd.diff, convert_git_link_diff4] DIFF5 = [ r"/?([/\-\w0-9@:%._+~#=]+)\?h=([/\-\w0-9@:%._+~#=]+)&id=([0-9a-f]+)", cgit_cmd.diff, convert_git_link_diff5, ] DIFF6 = [ r"/?\?id2=([/\-\w0-9@:%._+~#=]+)&id=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.diff, convert_git_link_diff6, ] DIFF7 = [r"/?\?h=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.diff, convert_git_link_diff7] DIFF8 = [ r"/?\?id=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.diff, convert_git_link_diff8, ] COMMIT1 = [ r"/?\?h=([/\-\w0-9@:%._+~#=]+)&id=([0-9a-f]+)", cgit_cmd.commit, convert_git_link_commit1, ] COMMIT2 = [ r"id=([0-9a-f]+)", cgit_cmd.commit, convert_git_link_commit3, ] # misspelled COMMIT3 = [r"/?\?id=([0-9a-f]+)", cgit_cmd.commit, convert_git_link_commit3] COMMIT4 = [ r"/?\?h=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.commit, convert_git_link_commit4, ] COMMIT5 = [ r"/?\?id=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.commit, convert_git_link_commit5, ] COMMIT6 = [ r"/([/\-\w0-9@:%._+~#=]+)\?h=([/\-\w0-9@:%._+~#=]+)&id=([0-9a-f]+)", cgit_cmd.commit, convert_git_link_commit6, ] COMMIT7 = [ r"/([/\-\w0-9@:%._+~#=]+)\?id=([0-9a-f]+)", cgit_cmd.commit, convert_git_link_commit7, ] COMMIT8 = [ r"/([/\-\w0-9@:%._+~#=]+)\?h=([0-9a-f]+)", cgit_cmd.commit, convert_git_link_commit7, ] TREE1 = [r"/([/\-\w0-9@:%._+~#=]+)", cgit_cmd.tree, convert_git_link_tree1] TREE2 = [r"/?\?h=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.tree, convert_git_link_tree2] TREE3 = [r"/src/?", cgit_cmd.tree, r"%s/blob/master/src" % target_url_git_repo] LOG1 = [r"/?\?h=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.log, convert_git_link_log1] LOG2 = [ r"/?\?q=([0-9a-f]+)..([0-9a-f]+)&h=([0-9a-f]+)&qt=range", cgit_cmd.log, convert_git_link_log3, ] LOG3 = [ r"/?([/\-\w0-9@:%._+~#=]+)\?h=([0-9a-f]+)", cgit_cmd.log, convert_git_link_log4, ] LOG4 = [r"/?([/\-\w0-9@:%._+~#=]+)", cgit_cmd.log, convert_git_link_log5] PLAIN1 = [ r"/([/\-\w0-9@:%._+~#=]+)\?h=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.plain, convert_git_link_plain, ] PATCH1 = [r"/?\?id=([0-9a-f]+)", cgit_cmd.patch, convert_git_link_patch] REFS1 = [r"/?", cgit_cmd.refs, r"%s/branches" % target_url_git_repo] TAG1 = [ r"/?\?id=([/\-\w0-9@:%._+~#=]+)", cgit_cmd.tag, r"%s/releases/tag/\1" % target_url_git_repo, ] DEF = [r"/(.*)", cgit_cmd.default, convert_git_link] # catch all missed trac_url_conv_help = TracUrlConversionHelper(trac_url_dir) cgit_conv_help = CgitConversionHelper(cgit_url) RE_WRONG_FORMAT1 = re.compile(r"comment:(\d+):ticket:(\d+)") RE_REPLYING_TO = re.compile(r"Replying to \[comment:(\d+)\s([\-\w0-9@._]+)\]") RE_REPLYING_TO_TICKET = re.compile(r"Replying to \[ticket:(\d+)\s([\-\w0-9@._]+)\]") def inline_code_snippet(match): code = match.group(1) code = code.replace(r"@", at_sign.tag) code = code.replace(r"\\", linebreak_sign1.tag) code = code.replace(r"[[br]]", linebreak_sign2.tag) code = code.replace(r"[[BR]]", linebreak_sign3.tag) if "`" in code: return "" + code.replace("`", r"\`") + "" else: return "`" + code + "`" def convert_replying_to(match): comment_id = match.group(1) username = match.group(2) name = convert_trac_username(username) if name: # github username name = "@" + name else: name = username return "Replying to [comment:{} {}]".format(comment_id, name) def convert_replying_to_ticket(match): ticket_id = match.group(1) username = match.group(2) name = convert_trac_username(username) if name: # github username name = "@" + name else: name = username return "Replying to [ticket:{}#comment:0 {}]".format(ticket_id, name) def commits_list(match): t = match.group(1) + "\n" t += "" for c in match.group(2).split("\n")[2:]: # the first two are blank header if not c: continue m = RE_COMMIT_LIST1.match(c) if m: commit_id = m.group(1) commit_url = m.group(2) commit_msg = m.group(3).replace("\`", "`") t += r''.format( commit_url, commit_id, commit_msg ) else: m = RE_COMMIT_LIST2.match(c) if m: commit_id = m.group(1) commit_url = m.group(2) commit_msg = m.group(3) t += r''.format( commit_url, commit_id, commit_msg ) else: # unusual format m = RE_COMMIT_LIST3.match(c) commit_id = m.group(1) commit_msg = m.group(2) t += ( r"".format( commit_id, commit_msg ) ) t += "
{}{}
{}{}
{}{}
\n" return t def github_mention(match): username = match.group(1) github_username = convert_trac_username(username, is_mention=True) if github_username: return "@" + github_username return "`@`" + username def trac2markdown(text, base_path, conv_help, multilines=default_multilines): # conversion of url text = trac_url_conv_help.sub(text) text = cgit_conv_help.sub(text) # some normalization text = RE_WRONG_FORMAT1.sub(r"ticket:\2#comment:\1", text) text = RE_REPLYING_TO.sub(convert_replying_to, text) text = RE_REPLYING_TO_TICKET.sub(convert_replying_to_ticket, text) text = re.sub("\r\n", "\n", text) text = re.sub(r"\swiki:([a-zA-Z]+)", r" [wiki:\1]", text) text = re.sub(r"\[\[TOC[^]]*\]\]", "", text) text = re.sub(r"(?m)\[\[PageOutline\]\]\s*\n", "", text) if multilines: text = re.sub(r"^\S[^\n]+([^=-_|])\n([^\s`*0-9#=->-_|])", r"\1 \2", text) def heading_replace(match): """ Return the replacement for the heading """ level = len(match.group(1)) heading = match.group(2).rstrip() if ( not isinstance(conv_help, IssuesConversionHelper) and create_wiki_link_conversion_table ): with open("wiki_path_conversion_table.txt", "a") as f: f.write( conv_help._trac_wiki_path + "#" + heading.replace(" ", "") + " " + conv_help._wiki_path + "#" + heading.replace(" ", "-") ) f.write("\n") # There might be a second item if an anchor is set. # We ignore this anchor since it is automatically # set it GitHub Markdown. return "#" * level + " " + heading a = [] level = 0 in_td = False in_code = False in_html = False in_list = False in_table = False quote_depth_decreased = False block = [] table = [] list_indents = [] previous_line = "" quote_prefix = "" text_lines = text.split("\n") + [""] text_lines.reverse() line = True while text_lines: non_blank_previous_line = bool(line) line = text_lines.pop() # cut quote prefix if line.startswith(quote_prefix): line = line[len(quote_prefix) :] else: if in_code or in_html: # to recover from interrupted codeblock text_lines.append(line) # put it back text_lines.append(quote_prefix + "}}}") line = non_blank_previous_line continue if line: # insert a blank line when quote depth decreased quote_depth_decreased = True quote_prefix = "" if not (in_code or in_html): # quote prefix = "" m = re.match("^((?:>\s)*>\s)", line) if m: prefix += m.group(1) m = re.match("^(>[>\s]*)", line[len(prefix) :]) if m: prefix += m.group(1) quote_prefix += prefix if quote_depth_decreased: a.append(quote_prefix) quote_depth_decreased = False line = line[len(prefix) :] if previous_line: line = previous_line + line previous_line = "" line_temporary = line.lstrip() if line_temporary.startswith("{{{") and in_code: level += 1 elif re.match(r"{{{\s*#!td", line_temporary): in_td = True in_td_level = level in_td_prefix = re.search("{{{", line).start() in_td_n = 0 in_td_defect = 0 line = re.sub(r"{{{\s*#!td", r"%s" % proc_td.open, line) level += 1 elif re.match(r"{{{\s*#!html", line_temporary) and not (in_code or in_html): in_html = True in_html_level = level in_html_prefix = re.search("{{{", line).start() in_html_n = 0 in_html_defect = 0 line = re.sub(r"{{{\s*#!html", r"", line) level += 1 elif re.match(r"{{{\s*#!", line_temporary) and not ( in_code or in_html ): # code: python, diff, ... in_code = True in_code_level = level in_code_prefix = re.search("{{{", line).start() in_code_n = 0 in_code_defect = 0 if non_blank_previous_line: line = "\n" + line line = re.sub(r"{{{\s*#!([^\s]+)", r"%s\1" % proc_code.open, line) level += 1 elif line_temporary.rstrip() == "{{{" and not (in_code or in_html): # check dangling #!... next_line = text_lines.pop() if next_line.startswith(quote_prefix): m = re.match("#!([a-zA-Z]+)", next_line[len(quote_prefix) :].strip()) if m: if m.group(1) == "html": text_lines.append( quote_prefix + line.replace("{{{", "{{{#!html") ) continue line = line.rstrip() + m.group(1) else: text_lines.append(next_line) else: text_lines.append(next_line) in_code = True in_code_level = level in_code_prefix = re.search("{{{", line).start() in_code_n = 0 in_code_defect = 0 if line_temporary.rstrip() == "{{{": if non_blank_previous_line: line = "\n" + line line = line.replace("{{{", proc_code.open, 1) else: if non_blank_previous_line: line = "\n" + line line = line.replace("{{{", proc_code.open + "\n", 1) level += 1 elif line_temporary.rstrip() == "}}}": level -= 1 if in_td and in_td_level == level: in_td = False in_td_prefix = 0 if in_td_defect > 0: for i in range(in_td_n): prev_line = a[-i - 1] a[-i - 1] = ( prev_line[: len(quote_prefix)] + in_td_defect * " " + prev_line[len(quote_prefix) :] ) line = re.sub(r"}}}", r"%s" % proc_td.close, line) elif in_html and in_html_level == level: in_html = False id_html_prefix = 0 if in_html_defect > 0: for i in range(in_html_n): prev_line = a[-i - 1] a[-i - 1] = ( prev_line[: len(quote_prefix)] + in_html_defect * " " + prev_line[len(quote_prefix) :] ) line = re.sub(r"}}}", r"", line) elif in_code and in_code_level == level: in_code = False in_code_prefix = 0 if in_code_defect > 0: for i in range(in_code_n): prev_line = a[-i - 1] a[-i - 1] = ( prev_line[: len(quote_prefix)] + in_code_defect * " " + prev_line[len(quote_prefix) :] ) line = re.sub(r"}}}", r"%s" % proc_code.close, line) else: # adjust badly indented codeblocks if in_td: if line.strip(): indent = re.search("[^\s]", line).start() if indent < in_td_prefix: in_td_defect = max(in_td_defect, in_td_prefix - indent) in_td_n += 1 if in_html: if line.strip(): indent = re.search("[^\s]", line).start() if indent < in_html_prefix: in_html_defect = max(in_html_defect, in_html_prefix - indent) in_html_n += 1 if in_code: if line.strip(): indent = re.search("[^\s]", line).start() if indent < in_code_prefix: in_code_defect = max(in_code_defect, in_code_prefix - indent) in_code_n += 1 # CamelCase wiki link if not (in_code or in_html or in_td): new_line = "" depth = 0 start = 0 end = 0 l = len(line) for i in range(l + 1): if i == l: end = i elif line[i] == "[": if depth == 0: end = i depth += 1 elif line[i] == "]": depth -= 1 if depth == 0: start = i + 1 new_line += line[end:start] if end > start: converted_part = RE_CAMELCASE1.sub( conv_help.camelcase_wiki_link, line[start:end] ) converted_part = RE_CAMELCASE2.sub( conv_help.camelcase_wiki_link, converted_part ) new_line += converted_part start = end line = new_line if not (in_code or in_html): # heading line = re.sub(r"^(\s*)# ", r"\1\# ", line) # first fix unintended heading line = RE_HEADING1.sub(heading_replace, line) line = RE_HEADING2.sub(heading_replace, line) line = RE_HEADING3.sub(heading_replace, line) line = RE_HEADING4.sub(heading_replace, line) line = RE_HEADING5.sub(heading_replace, line) line = RE_HEADING6.sub(heading_replace, line) line = RE_HEADING1a.sub(heading_replace, line) line = RE_HEADING2a.sub(heading_replace, line) line = RE_HEADING3a.sub(heading_replace, line) line = RE_HEADING4a.sub(heading_replace, line) line = RE_HEADING5a.sub(heading_replace, line) line = RE_HEADING6a.sub(heading_replace, line) # code surrounded by underline, mistaken as italics by github line = RE_UNDERLINED_CODE1.sub(r"`_\1_`", line) line = RE_UNDERLINED_CODE2.sub(r"`_\1_`", line) line = RE_UNDERLINED_CODE3.sub(r"`_\1_`", line) # code snippet line = RE_CODE_SNIPPET.sub(inline_code_snippet, line) line = RE_SUPERSCRIPT1.sub(r"\1", line) # superscript ^abc^ line = RE_SUBSCRIPT1.sub(r"\1", line) # subscript ,,abc,, line = RE_QUERY1.sub( r"[%s?" % trac_url_query, line ) # preconversion to URL format line = RE_HTTPS1.sub(conv_help.wiki_link, line) line = RE_HTTPS2.sub(conv_help.wiki_link, line) # link without display text line = RE_HTTPS3.sub(conv_help.wiki_link, line) line = RE_HTTPS4.sub(conv_help.wiki_link, line) line = RE_IMAGE1.sub(conv_help.image_link_under_tree, line) line = RE_IMAGE2.sub(conv_help.image_link, line) line = RE_IMAGE3.sub(conv_help.image_link, line) line = RE_IMAGE4.sub(r'', line) line = RE_IMAGE5.sub(conv_help.wiki_image, line) # \2 is image width line = RE_IMAGE6.sub( conv_help.image_link, line ) # \2 is image width, \3 is alignment line = RE_TICKET_COMMENT1.sub(conv_help.ticket_comment_link, line) line = RE_TICKET_COMMENT2.sub(conv_help.ticket_comment_link, line) line = RE_TICKET_COMMENT3.sub(conv_help.ticket_comment_link, line) line = RE_TICKET_COMMENT4.sub(conv_help.ticket_comment_link, line) line = RE_TICKET_COMMENT5.sub(conv_help.ticket_comment_link, line) line = RE_TICKET_COMMENT6.sub(conv_help.ticket_comment_link, line) line = RE_COMMENT1.sub(conv_help.comment_link, line) line = RE_COMMENT2.sub(conv_help.comment_link, line) line = RE_COMMENT3.sub(conv_help.comment_link, line) line = RE_COMMENT4.sub(conv_help.comment_link, line) line = RE_ATTACHMENT1.sub(conv_help.attachment, line) line = RE_ATTACHMENT2.sub(conv_help.attachment, line) line = RE_ATTACHMENT3.sub(conv_help.attachment, line) line = RE_ATTACHMENT4.sub(conv_help.attachment, line) line = RE_ATTACHMENT5.sub(conv_help.attachment, line) line = RE_ATTACHMENT6.sub(conv_help.attachment, line) line = RE_ATTACHMENT7.sub(conv_help.attachment, line) line = RE_ATTACHMENT8.sub(conv_help.attachment, line) if in_table: line = RE_LINEBREAK1.sub("
", line) line = RE_LINEBREAK2.sub("
", line) else: line = RE_LINEBREAK1.sub("\n", line) line = RE_LINEBREAK2.sub("\n", line) line = RE_WIKI4.sub( conv_help.wiki_link, line ) # for pagenames containing whitespaces line = RE_WIKI5.sub(conv_help.wiki_link, line) line = RE_WIKI6.sub(conv_help.wiki_link, line) # link without display text line = RE_WIKI7.sub(conv_help.wiki_link, line) line = RE_SOURCE1.sub( r"[\2](%s/\1)" % os.path.relpath("/tree/master/", base_path), line ) line = RE_SOURCE2.sub( r"[\1](%s/\1)" % os.path.relpath("/tree/master/", base_path), line ) line = RE_BOLDTEXT1.sub(r"**\1**", line) line = RE_ITALIC1.sub(r"*\1*", line) line = RE_ITALIC2.sub(r"*\1*", line) line = RE_TICKET1.sub(r" #\1", line) # replace global ticket references line = RE_TICKET2.sub(conv_help.ticket_link, line) # to avoid unintended github mention line = RE_GITHUB_MENTION1.sub(github_mention, line) line = RE_GITHUB_MENTION2.sub(github_mention, line) if RE_RULE.match(line): if not a or not a[-1].strip(): line = "---" else: line = "\n---" line = RE_NO_CAMELCASE.sub( r"\1", line ) # no CamelCase wiki link because of leading "!" # convert a trac table to a github table if line.startswith("||"): if not in_table: # header row if line.endswith("||\\"): previous_line = line[:-3] continue elif line.endswith("|| \\"): previous_line = line[:-4] continue # construct header separator parts = line.split("||") sep = [] for part in parts: if part.startswith("="): part = part[1:] start = ":" else: start = "" if part.endswith("="): part = part[:-1] end = ":" else: end = "" sep.append(start + "-" * len(part) + end) sep = "||".join(sep) if ":" in sep: line = line + "\n" + sep else: # perhaps a table without header; github table needs header header = re.sub(r"[^|]", " ", sep) line = header + "\n" + sep + "\n" + line in_table = True # The wiki markup allows the alignment directives to be specified on a cell-by-cell # basis. This is used in many examples. AFAIK this can't be properly translated into # the GitHub markdown as it only allows to align statements column by column. line = line.replace("||=", "||") # ignore cellwise align instructions line = line.replace("=||", "||") # ignore cellwise align instructions line = line.replace("||", "|") # lists if in_list: if line.strip(): indent = re.search("[^\s]", line).start() if indent > list_leading_spaces: line = line[list_leading_spaces:] # adjust slightly-malformed paragraph in list for right indent -- fingers crossed indent = re.search("[^\s]", line).start() if indent == 1 and list_indents[0][1] == "*": line = " " + line elif indent == 1 and list_indents[0][1] == "-": line = " " + line elif indent in [1, 2] and list_indents[0][1] not in ["*", "-"]: line = (3 - indent) * " " + line elif indent < list_leading_spaces: in_list = False list_indents = [] elif indent == list_leading_spaces: l = line[indent:] if not ( l.startswith("* ") or l.startswith("- ") or re.match("^[^\s]+\.\s", l) ): in_list = False list_indents = [] else: line = line[list_leading_spaces:] l = line.lstrip() if l.startswith("* ") or l.startswith("- ") or re.match("^[^\s]+\.\s", l): if not in_list: list_leading_spaces = re.search("[^\s]", line).start() line = line[list_leading_spaces:] in_list = True indent = re.search("[^\s]", line).start() for i in range(len(list_indents)): d, t, c = list_indents[i] if indent == d: if line[indent] == t: c += 1 else: t = line[indent] c = 1 list_indents = list_indents[:i] + [(d, t, c)] break else: d = indent t = line[indent] c = 1 list_indents.append((d, t, c)) if t in ["*", "-"]: # depth = 0 # for dd, tt, cc in list_indents: # if tt == t: # depth += 1 pass elif t == "a": line = line.replace("a", chr(ord("a") + c - 1), 1) elif t == "1": line = line.replace("1", str(c), 1) elif t == "i": line = line.replace("i", toRoman(c).lower(), 1) # take care of line break "\\", which often occurs in code snippets l = len(line) new_line = "" start = 0 inline_code = False for i in range(l + 1): if i == l or line[i] == "`": end = i part = line[start:end] if not inline_code: if in_table: part = RE_LINEBREAK3.sub("
", part) else: part = RE_LINEBREAK3.sub("\n", part) part = RE_WIKI1.sub(conv_help.wiki_link, part) part = RE_WIKI2.sub(conv_help.wiki_link, part) part = RE_WIKI3.sub(conv_help.wiki_link, part) new_line += part start = end if i < l and line[i] == "`": if not inline_code: inline_code = True else: inline_code = False line = new_line # only for table with td blocks: if in_table: if line == "|\\" or line == "| \\": # leads td block block = [] continue if line == "|": table.append("|" + "NEW__LINE".join(block) + "|") block = [] continue if line.startswith(proc_td.open): if len(block) > 1: block.append("|") block.append(line) continue if in_td: line = re.sub("\n", "NEW__LINE", line) block.append(line) continue if line.startswith(proc_td.close): block.append(line) continue if line.startswith("|"): if line.endswith("|\\"): previous_line = line[:-2].replace( "|", "||" ) # restore to trac table row elif line.endswith("| \\"): previous_line = line[:-3].replace( "|", "||" ) # restore to trac table row else: table.append(line) continue if block: # td block may not be terminated by "|" (or trac "||") table.append("|" + "NEW__LINE".join(block) + "|") block = [] if table: table_text = "\n".join(table) if proc_td.open in table_text: html = markdown.markdown( table_text, extensions=[TableExtension(use_align_attribute=True)], ) html = proc_td.replace(html) else: html = table_text line = html.replace("NEW__LINE", "\n") + "\n" + line table = [] in_table = False for l in line.split("\n"): a.append(quote_prefix + l) a = a[:-1] text = "\n".join(a) # close unclosed codeblock if in_code or in_html: text += "\n%s" % proc_code.close # remove artifacts text = proc_code.replace(text) text = link_displ.replace(text) text = at_sign.replace(text) text = linebreak_sign1.replace(text) text = linebreak_sign2.replace(text) text = linebreak_sign3.replace(text) # Some rewritings text = RE_COLOR.sub(r"$\\textcolor{\1}{\\text{\2}}$", text) text = RE_TRAC_REPORT.sub(r"[Trac report of id \1](%s/\1)" % trac_url_report, text) text = RE_NEW_COMMITS.sub(commits_list, text) text = RE_LAST_NEW_COMMITS.sub(commits_list, text) text = unescape(text) return text def escape(text): text = text.replace("comment:", "COMMENT__COLON") return text def unescape(text): text = text.replace("COMMENT__COLON", "comment:") return text class WikiConversionHelper: """ A class that provides conversion methods that depend on information collected at startup, such as Wiki page names and configuration flags. """ def __init__(self, source=None, pagenames=None, sep="/"): """ The Python constructor collects all the necessary information. """ if source: pagenames = source.wiki.getAllPages() pagenames_splitted = [] for p in pagenames: pagenames_splitted += parse.unquote(str(p)).split(sep) pagenames_not_splitted = [ parse.unquote(str(p)) for p in pagenames if p not in pagenames_splitted ] self._pagenames_splitted = pagenames_splitted self._pagenames_not_splitted = pagenames_not_splitted self._attachment_path = "" def set_wikipage_paths(self, pagename, sep="/"): """ Set paths from the wiki pagename """ pagename = parse.unquote(str(pagename)) gh_pagename = " ".join(pagename.split(sep)) self._attachment_path = ( gh_pagename # attachment_path for the wiki_image method ) self._trac_wiki_path = parse.quote(pagename) self._wiki_path = gh_pagename.replace(" ", "/") if create_wiki_link_conversion_table: with open("wiki_path_conversion_table.txt", "a") as f: f.write(self._trac_wiki_path + " " + self._wiki_path) f.write("\n") def attachment(self, match): filename = match.group(1) if len(match.groups()) >= 2: label = match.group(2) else: label = "attachment:" + filename if not re.fullmatch("[-A-Za-z0-9_.]*", filename): import pathlib from hashlib import md5 extension = pathlib.Path(filename).suffix filename = md5(filename.encode("utf-8")).hexdigest() + extension return r"[%s](%s)" % (label, os.path.join(self._attachment_path, filename)) def ticket_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a Trac ticket link. """ ticket = match.groups()[0] if keep_trac_ticket_references: return r"[#%s](%s/%s)" % (ticket, trac_url_ticket, ticket) issue = ticket return r"[#%s](%s/issues/%s)" % (issue, target_url_issues_repo, issue) def ticket_comment_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a Trac ticket comment link. """ ticket = match.group(1) comment = match.group(2) if len(match.groups()) < 3: label = "#{} comment:{}".format(ticket, comment) else: label = match.group(3) if keep_trac_ticket_references: return escape( r"[%s](%s/%s#comment:%s)" % (label, trac_url_ticket, ticket, comment) ) return escape( r"[%s](%s/issues/%s#comment:%s)" % (label, target_url_issues_repo, ticket, comment) ) def comment_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a comment link. """ comment = match.group(1) if len(match.groups()) < 2: label = "comment:{}".format(comment) else: label = match.group(2) return escape( r"%s%s%s(#comment%s%s)" % (link_displ.open, label, link_displ.close, "%3A", comment) ) def image_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a image link. """ filename = match.group(1) if len(match.groups()) < 2: descr = "" else: descr = match.group(2) return r"!%s%s%s(%s)" % (link_displ.open, descr, link_displ.close, filename) def image_link_under_tree(self, match): """ Return a formatted string that replaces the match object found by re in the case of a image link under the tree path. """ mg = match.groups() filename = mg[0] path = os.path.relpath("/tree/master/") return r"!%s%s(%s/\1)" % (link_displ.open, link_displ.close, filename, path) def wiki_image(self, match): """ Return a formatted string that replaces the match object found by re in the case of a wiki link to an attached image. """ mg = match.groups() filename = os.path.join(self._attachment_path, mg[0]) if len(mg) > 1: return r'' % (filename, mg[1]) else: return r'' % filename def protect_wiki_link(self, display, link): """ Return the given string encapsuled with protection tags. These will be replaced at the end of conversion of a line by the brackets (see method `link_displ.replace`). This is needed to avoid a mixture with Trac wiki syntax. """ return r"%s%s%s(%s)" % (link_displ.open, display, link_displ.close, link) def wiki_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a link to a wiki page. """ mg = match.groups() pagename = mg[0] if len(mg) > 1: display = mg[1] if not display: display = pagename else: display = pagename # take care of section references pagename_sect = pagename.split("#") pagename_ori = pagename if len(pagename_sect) > 1: pagename = pagename_sect[0] if not display: display = pagename_sect[1] if pagename.startswith("http"): link = pagename_ori.strip() return self.protect_wiki_link(display, link) elif pagename in self._pagenames_splitted: link = pagename_ori.replace(" ", "-") return self.protect_wiki_link(display, link) elif pagename in self._pagenames_not_splitted: link = pagename_ori.replace("/", " ").replace( " ", "-" ) # convert to github link return self.protect_wiki_link(display, link) else: # We assume that this is a m = re.fullmatch(r"[a-zA-Z]+[?]?", pagename) if m: macro = m.group(0) args = None else: m = re.fullmatch(r"([a-zA-Z]+[?]?)\((.+)\)", pagename) if m: macro = m.group(1) args = m.group(2) else: macro = None args = None if macro: display = " link = "%s/WikiMacros#%s-macro" % (trac_url_wiki, macro) else: return ( link_displ.open + link_displ.open + mg[0] + link_displ.close + link_displ.close ) if args: args = args.replace("|", r"\|") return self.protect_wiki_link("%s(%s)" % (display, args), link) return self.protect_wiki_link(display, link) def camelcase_wiki_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a link to a wiki page recognized from CamelCase. """ if match.group(1) in self._pagenames_splitted: return self.wiki_link(match) return match.group(0) class IssuesConversionHelper(WikiConversionHelper): """ A class that provides conversion methods that depend on information collected at startup, such as Wiki page names and configuration flags. """ def set_ticket_paths(self, ticket_id): """ Set paths from the ticket id. """ self._ticket_id = ticket_id def attachment(self, match): filename = match.group(1) if len(match.groups()) >= 2: label = match.group(2) else: label = "attachment: " + filename if keep_trac_ticket_references: url = "%s/ticket/%s/%s" % ( trac_url_attachment, str(self._ticket_id), filename, ) else: a, _, _ = gh_create_attachment(dest, None, filename, self._ticket_id, None) if a.url.endswith(".gz"): filename += ".gz" url = os.path.join( attachment_export_url, attachment_path(self._ticket_id, filename) ) return r"[%s](%s)" % (label, url) def image_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a image link. """ filename = match.group(1) if len(match.groups()) == 1: descr = "" elif len(match.groups()) == 2: descr = match.group(2) else: if match.group(2).startswith("width="): width = match.group(2)[6:] alignment = match.group(3) elif match.group(3).startswith("width="): width = match.group(3)[6:] alignment = match.group(2) else: width = "100%" alignment = "center" if keep_trac_ticket_references: url = "%s/ticket/%s/%s" % ( trac_url_attachment, str(self._ticket_id), filename, ) else: if filename.startswith("http"): url = filename elif filename.startswith("ticket:"): _, ticket_id, fname = filename.split(":") url = os.path.join( attachment_export_url, attachment_path(ticket_id, fname) ) else: url = os.path.join( attachment_export_url, attachment_path(self._ticket_id, filename) ) if len(match.groups()) == 3: return r'
' % ( alignment, url, width, ) return r"!%s%s%s(%s)" % (link_displ.open, descr, link_displ.close, url) def wiki_link(self, match): """ Return a formatted string that replaces the match object found by re in the case of a link to a wiki page. """ mg = match.groups() pagename = mg[0] if len(mg) > 1: display = mg[1] if not display: display = pagename else: display = pagename # take care of section references pagename_sect = pagename.split("#") pagename_ori = pagename if len(pagename_sect) > 1: pagename = pagename_sect[0] if not display: display = pagename_sect[1] if pagename.startswith("http"): link = pagename_ori.strip() return self.protect_wiki_link(display, link) elif pagename in self._pagenames_splitted: link = pagename_ori.replace(" ", "") if link in wiki_path_conversion_table: link = wiki_path_conversion_table[link] else: link = pagename_ori.replace(" ", "-") return self.protect_wiki_link(display, "../wiki/" + link) elif pagename in self._pagenames_not_splitted: link = pagename_ori.replace(" ", "") if link in wiki_path_conversion_table: link = wiki_path_conversion_table[link] else: link = pagename_ori.replace(" ", "-") return self.protect_wiki_link(display, "../wiki/" + link) else: # We assume that this is a m = re.fullmatch(r"[a-zA-Z]+[?]?", pagename) if m: macro = m.group(0) args = None else: m = re.fullmatch(r"([a-zA-Z]+[?]?)\((.+)\)", pagename) if m: macro = m.group(1) args = m.group(2) else: macro = None args = None if macro: display = " link = "%s/WikiMacros#%s-macro" % (trac_url_wiki, macro) else: return ( link_displ.open + link_displ.open + mg[0] + link_displ.close + link_displ.close ) if args: args = args.replace("|", r"\|") return self.protect_wiki_link("%s(%s)" % (display, args), link) return self.protect_wiki_link(display, link) def github_ref_url(ref): if re.fullmatch(r"[0-9a-f]{40}", ref): # commit sha return f"{target_url_git_repo}/commit/{ref}" else: # assume branch return f"{target_url_git_repo}/tree/{ref}" def github_ref_markdown(ref): url = github_ref_url(ref) if re.fullmatch(r"[0-9a-f]{40}", ref): # shorten displayed commit sha and use monospace ref = "`" + ref[:7] + "`" return f"[{ref}]({url})" def convert_xmlrpc_datetime(dt): # datetime.strptime(str(dt), "%Y%m%dT%X").isoformat() + "Z" return datetime.strptime(str(dt), "%Y%m%dT%H:%M:%S") def convert_trac_datetime(dt): return datetime.strptime(str(dt), "%Y-%m-%d %H:%M:%S") def map_tickettype(tickettype): "Return GitHub label corresponding to Trac ``tickettype``" if not tickettype: return None if tickettype == "defect": return "t: bug" if tickettype == "enhancement": return "t: enhancement" # if tickettype == 'clarification': # return 'question' # if tickettype == 'task': # return 'enhancement' if tickettype == "PLEASE CHANGE": return None # return tickettype.lower() return None def map_resolution(resolution): "Return GitHub label corresponding to Trac ``resolution``" if resolution == "fixed": return None if not resolution: return None return "r: " + resolution component_frequency = defaultdict(lambda: 0) def map_component(component): "Return GitHub label corresponding to Trac ``component``" if component == "PLEASE CHANGE": return None component = component.replace("_", " ").lower() try: label = components_to_labels[component] except KeyError: # Prefix it with "c: " so that they show up as one group in the GitHub dropdown list label = f"c: {component}" component_frequency[label] += 1 return label default_priority = None def map_priority(priority): "Return GitHub label corresponding to Trac ``priority``" if priority == default_priority: return None try: numerical_priority = 5 - [ "trivial", "minor", "major", "critical", "blocker", ].index(priority) except ValueError: return priority return f"p: {priority} / {numerical_priority}" default_severity = "normal" def map_severity(severity): "Return GitHub label corresponding to Trac ``severity``" if severity == default_severity: return None return severity def map_status(status): "Return a pair: (status, label)" status = status.lower() if status in ["needs_review", "needs_work", "needs_info", "positive_review"]: return "open", "s: " + status.replace("_", " ") elif status in [ "", "new", "assigned", "analyzed", "reopened", "open", "needs_info_new", ]: return "open", None elif status in ["closed"]: return "closed", None else: log.warning("unknown ticket status: " + status) return "open", status.replace("_", " ") keyword_frequency = defaultdict(lambda: 0) def map_keywords(keywords): "Return a pair: (list of keywords for ticket description, list of labels)" keep_as_keywords = [] labels = [] keywords = keywords.replace(";", ",") has_comma = "," in keywords for keyword in keywords.split(","): keyword = keyword.strip() if not keyword: continue if keywords_to_labels is True: labels.append(keyword) elif ( isinstance(keywords_to_labels, dict) and keyword.lower() in keywords_to_labels ): labels.append(keywords_to_labels[keyword.lower()]) else: keep_as_keywords.append(keyword) keyword_frequency[keyword.lower()] += 1 if not has_comma: # Maybe not a phrase but whitespace-separated keywords words = keywords.split() if len(words) > 1: for word in words: word = word.lower() if ( isinstance(keywords_to_labels, dict) and word in keywords_to_labels ): # Map to label but don't remove from keywords because it may be part of a phrase. labels.append(keywords_to_labels[word]) else: keyword_frequency[word] += 1 return keep_as_keywords, labels milestone_map = {} unmapped_milestones = defaultdict(lambda: 0) def map_milestone(title): "Return a pair: (milestone title, label)" if not title: return None, None title = title.lower() if title in milestones_to_labels.keys(): return None, milestones_to_labels[title] # some normalization if re.match("^[0-9]", title): title = milestone_prefix_to + title if re.fullmatch("%s[1-9]" % milestone_prefix_from, title): title = title + ".0" if title in canceled_milestones.keys(): title = canceled_milestones[title] return title, None def gh_create_milestone(dest, milestone_data): if dest is None: return None milestone = dest.create_milestone(user=gh_user_url(dest, "git"), **milestone_data) sleep(sleep_after_request) return milestone def gh_ensure_label(dest, labelname, label_color=None, label_category=None): if dest is None or labelname is None: return labelname = labelname.lower() if labelname in gh_labels: return if label_color is None: label_color = labelcolor.get(labelname) if label_color is None: label_color = labelcolor[label_category] log.info('Create label "%s" with color #%s' % (labelname, label_color)) gh_label = dest.create_label(labelname, label_color) gh_labels[labelname] = gh_label sleep(sleep_after_request) def gh_create_issue(dest, issue_data): if dest is None: return None if "labels" in issue_data: labels = [gh_labels[label.lower()] for label in issue_data.pop("labels")] else: labels = GithubObject.NotSet description = issue_data.pop("description") if github: description_pre = "" description_pre += "Original creator: " + issue_data.pop("user") + "\n\n" description_pre += ( "Original creation time: " + str(issue_data.pop("created_at")) + "\n\n" ) description = description_pre + description else: user_url = gh_user_url(dest, issue_data["user"]) if user_url: issue_data["user"] = user_url ## assignee = issue_data.pop('assignee', GithubObject.NotSet) ## if assignee is GithubObject.NotSet: ## assignees = [] ## else: ## assignees = [assignee] gh_issue = dest.create_issue( issue_data.pop("title"), description, # assignee=assignee, assignees=assignees, milestone=issue_data.pop("milestone", GithubObject.NotSet), labels=labels, **issue_data, ) log.debug(" created issue " + str(gh_issue)) sleep(sleep_after_request) return gh_issue def attachment_path(src_ticket_id, filename): if not re.fullmatch("[-A-Za-z0-9_.]*", filename): import pathlib from hashlib import md5 extension = pathlib.Path(filename).suffix filename = md5(filename.encode("utf-8")).hexdigest() + extension return "ticket" + str(src_ticket_id) + "/" + filename def gh_attachment_url(src_ticket_id, filename): # Example attached to https://github.com/sagemath/trac-to-github/issues/53: # - https://github.com/sagemath/trac-to-github/files/10328066/test_attachment.txt a, local_filename, note = gh_create_attachment( dest, None, filename, src_ticket_id, None ) return a.url mime_type_allowed_extensions = { "application/pdf": [".pdf"], "application/vnd.openxmlformats-officedocument.wordprocessingml.document": [ ".docx" ], "application/vnd.openxmlformats-officedocument.presentationml.presentation": [ ".pptx" ], "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": [".xlsx"], "application/vnd.oasis.opendocument.text": [".odt", ".fodt"], "application/vnd.oasis.opendocument.spreadsheet": [".ods", ".fods"], "application/vnd.oasis.opendocument.presentation": [".odp", ".fodp"], "application/vnd.oasis.opendocument.graphics": [".odg", ".fodg"], "application/vnd.oasis.opendocument.formula": [".odf"], "application/vnd.ms-excel": [".csv", ".xls"], "application/zip": [".zip"], "application/x-zip-compressed": [".zip"], "application/gzip": [".gz", ".tgz"], "application/x-gzip": [".gz", ".tgz"], "text/plain": [".csv", ".txt", ".patch"], "text/x-log": [".log"], "text/csv": [".csv"], "text/comma-separated-values": [".csv"], "application/csv": [".csv"], "application/excel": [".csv"], "application/vnd.msexcel": [".csv"], "text/markdown": [".md"], # as attachments "image/gif": [".gif"], "image/jpeg": [".jpeg", ".jpg"], "image/png": [".png"], } def gh_create_attachment( dest, issue, filename, src_ticket_id, attachment=None, comment=None ): note = None if attachment_export: if github: a_path = attachment_path(src_ticket_id, filename) local_filename = os.path.join(attachment_export_dir, "attachments", a_path) else: match mimetypes.guess_type(filename): case (None, encoding): mimetype = "application/octet-stream" case (mimetype, encoding): pass case mimetype: pass if filename.endswith(".log"): # Python thinks it's text/plain. mimetype = "text/x-log" elif filename.endswith(".gz"): # Python thinks that .tar.gz is application/x-tar mimetype = "application/gzip" logging.info(f"Attachment {filename=} {mimetype=}") allowed_extensions = mime_type_allowed_extensions.get(mimetype, []) if not any(filename.endswith(ext) for ext in allowed_extensions): mimetype = "application/octet-stream" # which is not an allowed mime type, so will be gzipped. # supported types from bbs-exporter-1.5.5/lib/bbs_exporter/attachment_exporter/content_type.rb: if mimetype in ["image/gif", "image/jpeg", "image/png"]: # on GHE attachment URLs are rewritten to "/storage/user" paths, links broken. # so we just did everything via repository_file, not attachment dirname = "files" if issue: create = issue.create_attachment else: # Cannot make it an "attachment"(?) if mimetype not in [ "text/plain", "text/x-log", "application/gzip", "application/zip", ]: # Here we are stricter than what mime_type_allowed_extensions allows. # Replace by a gzipped file if attachment: attachment["attachment"] = gzip.compress( attachment["attachment"] ) filename += ".gz" mimetype = "application/gzip" logging.info(f"Replaced by {filename=} {mimetype=}") dirname = "files" if dest: create = dest.create_repository_file a_path = attachment_path(src_ticket_id, filename) local_filename = os.path.join(migration_archive, dirname, a_path) if github or not attachment: def create(asset_name, asset_content_type, asset_url, **kwds): # Only create the record locally return Attachment( dest._requester, None, {"url": attachment_export_url + a_path}, completed=True, ) os.makedirs(os.path.dirname(local_filename), exist_ok=True) if github: user = None asset_url = None else: user = comment and gh_user_url(dest, comment["user"]) asset_url = "tarball://root/" + dirname + "/" + a_path a = create( filename, mimetype, asset_url, user=user, created_at=comment and comment.get("created_at"), ) logging.info("Attachment link %s" % a.url) if comment: if github: note = "Attachment [%s](%s) by %s created at %s" % ( filename, a.url, comment["user"], comment["created_at"], ) else: note = "Attachment: **[%s](%s)**" % (filename, a.url) elif gh_user is not None: if dest is None: return gistname = dest.name + " issue " + str(issue.number) + " attachment " + filename filecontent = InputFileContent(attachment) try: gist = gh_user.create_gist( False, {gistname: filecontent}, "Attachment %s to issue #%d created by %s at %s" % (filename, issue.number, comment["user"], comment["created_at"]), ) note = "Attachment [%s](%s) by %s created at %s" % ( filename, gist.files[gistname].raw_url, comment["user"], comment["created_at"], ) except UnicodeDecodeError: note = ( "Binary attachment %s by %s created at %s lost by Trac to GitHub conversion." % (filename, comment["user"], comment["created_at"]) ) logging.warning("losing attachment", filename, "in issue", issue.number) sleep(sleep_after_attachment) else: note = "Attachment" return a, local_filename, note minimized_issue_comments = [] local_filenames = dict() # local_filename -> comment_id def gh_comment_issue( dest, issue, comment, src_ticket_id, comment_id=None, minimize=True ): preamble = "" attachments = comment.pop("attachments", []) # upload attachments, if there are any for attachment in attachments: a, local_filename, note = gh_create_attachment( dest, issue, attachment["attachment_name"], src_ticket_id, attachment, comment=comment, ) # write attachment data to binary file if local_filename in local_filenames: logging.warning( f"Overwriting attachment {local_filename} with a new version" ) else: local_filenames[local_filename] = comment_id open(local_filename, "wb").write(attachment["attachment"]) if preamble: preamble += "\n\n" preamble += note if not preamble and github: preamble = "Comment by %s created at %s" % ( comment.pop("user"), comment.pop("created_at"), ) note = comment.pop("note", "") if preamble and note: preamble += "\n\n" note = preamble + note if comment_id: if ( note.startswith("Branch pushed to git repo;") or note.startswith("New commits:") or re.match(r"^Last \d+ new commits:", note) ): anchor = f'
\n\n' else: anchor = f'
comment:{comment_id}
\n\n' note = anchor + note if dest is None: return if not github: user_url = gh_user_url(dest, comment["user"]) if user_url: comment["user"] = user_url c = issue.create_comment(note, **comment) if minimize: minimized_issue_comments.append(c.url) sleep(sleep_after_request) priority_labels = set( map_priority(priority) for priority in ["trivial", "minor", "major", "critical", "blocker"] ) def normalize_labels(dest, labels): if "r: invalid" in labels: if any( x in labels for x in ["r: duplicate", "r: invalid", "r: wontfix", "r: worksforme"] ): # Remove in favor of the more specific label. labels.remove("r: invalid") if any( x in labels for x in ["r: duplicate", "r: invalid", "r: wontfix", "r: worksforme"] ): labels = sorted(set(labels).difference(priority_labels)) return labels def gh_update_issue_property(dest, issue, key, val, oldval=None, **kwds): if dest is None: return if key == "labels": labels = [gh_labels[label.lower()] for label in val if label] labels = normalize_labels(dest, labels) if github: issue.set_labels(*labels) else: oldlabels = [gh_labels[label.lower()] for label in oldval if label] oldlabels = normalize_labels(dest, oldlabels) for label in oldlabels: if label not in labels: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#unlabeled issue.create_event("unlabeled", label=label, **kwds) for label in labels: if label not in oldlabels: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#labeled issue.create_event("labeled", label=label, **kwds) elif key == "assignees": if not github: kwds = copy(kwds) kwds["subject"] = kwds.pop("actor") for assignee in oldval: if assignee not in val: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#unassigneeed issue.create_event("unassigned", actor=assignee, **kwds) for assignee in val: if assignee not in oldval: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#assigned issue.create_event("assigned", actor=assignee, **kwds) elif key == "assignee": if issue.assignee == val: return if issue.assignees: issue.remove_from_assignees(issue.assignee) if val is not None and val is not GithubObject.NotSet and val != "": issue.add_to_assignees(val) elif key == "state": if github: issue.edit(state=val) else: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#reopened # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#closed issue.create_event("reopened" if val == "open" else "closed", **kwds) elif key == "description": issue.edit(body=val) elif key == "title": if github: issue.edit(title=val) else: issue.create_event("renamed", title_was=oldval, title_is=val, **kwds) elif key == "milestone": if github: issue.edit(milestone=val) else: if oldval and oldval is not GithubObject.NotSet: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#demilestoned issue.create_event("demilestoned", milestone=oldval, **kwds) if val and val is not GithubObject.NotSet: # https://docs.github.com/en/developers/webhooks-and-events/events/issue-event-types#milestoned issue.create_event("milestoned", milestone=val, **kwds) else: raise ValueError("Unknown key " + key) sleep(sleep_after_request) unmapped_users = defaultdict(lambda: 0) def convert_trac_username(origname, is_mention=False): if origname in ignored_values: return None if is_mention and origname in ignored_mentions: return None if origname in ignored_names: return None origname = origname.strip("\u200b").rstrip(".") if origname.startswith("gh-"): return origname[3:] if origname.startswith("github/"): # example: https://trac.sagemath.org/ticket/17999 return origname[7:] if origname.startswith("gh:"): # example: https://trac.sagemath.org/ticket/24876 return origname[3:] try: gh_name = users_map[origname] except KeyError: # not a known Trac user assert not origname.startswith("@") if re.fullmatch("[-A-Za-z._0-9]+", origname): # heuristic pattern for valid Trac account name (not an email address or full name or junk) pass else: return None gh_name = False else: if gh_name: return gh_name # create mannequin user username = origname.replace(".", "-").replace("_", "-").strip("-") username = f"{unknown_users_prefix}{username}" if is_mention and not username in gh_users: return None key = (origname, gh_name is not False, is_mention, "@" + username) if not unmapped_users[key]: if is_mention: logging.info(f"Unmapped @ mention of {origname}") else: logging.info(f"Unmapped Trac user {origname}") unmapped_users[key] += 1 return username def gh_username(dest, origname): username = convert_trac_username(origname) if username: _gh_user(dest, username, origname) return "@" + username return origname gh_users = {} def _gh_user(dest, username, origname): try: return gh_users[username] except KeyError: headers, data = dest._requester.requestJsonAndCheck( "GET", f"/users/{username}", input={"name": user_full_names.get(origname)} ) gh_users[username] = NamedUser(dest._requester, headers, data, completed=True) return gh_users[username] def gh_user_url(dest, origname): if origname.startswith("@"): username = origname[1:] origname = None else: username = convert_trac_username(origname) if not username: return None return _gh_user(dest, username, origname).url def gh_user_url_list(dest, orignames, ignore=["somebody", "tbd", "tdb", "tba"]): if not orignames: return [] urls = [] for origname in orignames.split(","): origname = origname.strip() if origname and origname not in ignore: url = gh_user_url(dest, origname) if url: urls.append(url) return urls def gh_username_list(dest, orignames, ignore=["somebody", "tbd", "tdb", "tba"]): "Split and transform comma- separated lists of names" if not orignames: return "" names = [] for origname in orignames.split(","): origname = origname.strip() if origname and origname not in ignore: name = gh_username(dest, origname) names.append(name) return ", ".join(names) @cache.memoize(ignore=[0, "source"]) def get_all_milestones(source): return source.ticket.milestone.getAll() @cache.memoize(ignore=[0, "source"]) def get_milestone(source, milestone_name): return source.ticket.milestone.get(milestone_name) @cache.memoize(ignore=[0, "source"]) def get_changeLog(source, src_ticket_id): while True: try: if sleep_before_xmlrpc: sleep(sleep_before_xmlrpc) return source.ticket.changeLog(src_ticket_id) except Exception as e: print(e) print("Sleeping") sleep(sleep_before_xmlrpc_retry) print("Retrying") @cache.memoize(ignore=[0, "source"]) def get_ticket_attachment(source, src_ticket_id, attachment_name): while True: try: return source.ticket.getAttachment(src_ticket_id, attachment_name) except Exception as e: print(e) print("Sleeping") sleep(sleep_before_xmlrpc_retry) print("Retrying") @cache.memoize() def get_all_tickets(filter_issues): call = client.MultiCall(source) for ticket in source.ticket.query(filter_issues): call.ticket.get(ticket) return call() def convert_issues(source, dest, only_issues=None, blacklist_issues=None): conv_help = IssuesConversionHelper(source) if migrate_milestones: for milestone_name in get_all_milestones(source): milestone = get_milestone(source, milestone_name) log.debug(f"Milestone: {milestone}") title = milestone.pop("name") title, label = map_milestone(title) if title: log.info("Creating milestone " + title) completed = milestone.pop("completed") new_milestone = { "description": trac2markdown( milestone.pop("description"), "/milestones/", conv_help, False ), "title": title, "state": "open" if not completed else "closed", } due = milestone.pop("due") if due: new_milestone["due_on"] = convert_xmlrpc_datetime(due) if completed: new_milestone["updated_at"] = convert_xmlrpc_datetime(completed) new_milestone["closed_at"] = convert_xmlrpc_datetime(completed) if milestone: log.warning(f"Discarded milestone data: {milestone}") milestone_map[milestone_name] = gh_create_milestone(dest, new_milestone) log.debug(milestone_map[milestone_name]) nextticketid = 1 ticketcount = 0 for src_ticket in get_all_tickets(filter_issues): src_ticket_id, time_created, time_changed, src_ticket_data = src_ticket if only_issues and src_ticket_id not in only_issues: print("SKIP unwanted ticket #%s" % src_ticket_id) continue if blacklist_issues and src_ticket_id in blacklist_issues: print("SKIP blacklisted ticket #%s" % src_ticket_id) continue if ( github and not only_issues and not blacklist_issues and not config.has_option("issues", "filter_issues") ): while nextticketid < src_ticket_id: print( "Ticket %d missing in Trac. Generating empty one in GitHub." % nextticketid ) issue_data = { "title": "Deleted trac ticket %d" % nextticketid, "description": "Ticket %d had been deleted in the original Trac instance. This empty ticket serves as placeholder to ensure a proper 1:1 mapping of ticket ids to issue ids." % nextticketid, "labels": [], } issue = gh_create_issue(dest, issue_data) gh_update_issue_property(dest, issue, "state", "closed") nextticketid = nextticketid + 1 nextticketid = nextticketid + 1 # src_ticket_data.keys(): ['status', 'changetime', 'description', 'reporter', 'cc', 'type', 'milestone', '_ts', # 'component', 'owner', 'summary', 'platform', 'version', 'time', 'keywords', 'resolution'] changelog = get_changeLog(source, src_ticket_id) log.info( 'Migrating ticket #%s (%3d changes): "%s"' % ( src_ticket_id, len(changelog), src_ticket_data["summary"][:50].replace('"', "'"), ) ) conv_help.set_ticket_paths(src_ticket_id) def attr_value(s): "Markup for an attribute value. Boldface if nonempty." if s: return f"**{s}**" return "none" def issue_description(src_ticket_data): description_pre = '
\n\n' description_post = "" description_post_items = [] depends = "" dependencies = src_ticket_data.pop("dependencies", "") other_deps = [] for dep in dependencies.replace(";", " ").replace(",", " ").split(): dep = dep.strip() if m := re.fullmatch("#?([0-9]+)", dep): if depends: depends += "\n" # Use this phrase, used by various dependency managers: # https://www.dpulls.com/ # https://github.com/z0al/dependent-issues # https://github.com/gregsdennis/dependencies-action/pull/5 depends += f"Depends on #{m.group(1)}" elif dep: # some free form remark in Dependencies other_deps.append(dep) if other_deps: # put it back src_ticket_data["dependencies"] = dependencies if depends: description_post_items.append(depends) owner = gh_username_list(dest, src_ticket_data.pop("owner", None)) if owner: description_post_items.append(f"Assignee: {attr_value(owner)}") version = src_ticket_data.pop("version", None) if version is not None and version != "trunk": description_post_items.append(f"Version: {attr_value(version)}") # subscribe persons in cc cc = src_ticket_data.pop("cc", "") ccstr = "" for person in cc.replace(";", ",").split(","): person = person.strip() if person == "": continue person = gh_username(dest, person) ccstr += " " + person if ccstr != "": description_post_items.append("CC: " + ccstr) keywords, labels = map_keywords(src_ticket_data.pop("keywords", "")) if keywords: description_post_items.append( "Keywords: " + attr_value(", ".join(keywords)) ) branch = src_ticket_data.pop("branch", "") commit = src_ticket_data.pop("commit", "") # These two are the same in all closed-fixed tickets. Reduce noise. if branch and commit: if branch == commit: description_post_items.append( "Branch/Commit: " + attr_value(github_ref_markdown(branch)) ) else: description_post_items.append( "Branch/Commit: " + attr_value( github_ref_markdown(branch) + " @ " + github_ref_markdown(commit) ) ) else: if branch: description_post_items.append( f"Branch: " + attr_value(github_ref_markdown(branch)) ) if commit: description_post_items.append( f"Commit: " + attr_value(github_ref_markdown(commit)) ) description = src_ticket_data.pop("description", "") for field, value in src_ticket_data.items(): if ( not field.startswith("_") and field not in ["changetime", "time"] and value and value not in ignored_values ): field = field.title().replace("_", " ") description_post_items.append(f"{field}: {attr_value(value)}") # Sort description items order = [ "depends", "upstream", "cc: ", "component", "keywords", "****", "assignee", "author", "branch", "commit", "reviewer", "merged", ] sort_order = [ item[:4].lower() for item in order ] # weigh only initial 4 characters def item_key(x): initial = x[:4].lower() try: return sort_order.index(initial), initial except ValueError: return sort_order.index("****"), initial description_post_items = sorted(description_post_items, key=item_key) description_post += "\n\n" + "\n\n".join(description_post_items) description_post += f"\n\n_Issue created by migration from {trac_url_ticket}/{src_ticket_id}_\n\n" return ( description_pre + trac2markdown(description, "/issues/", conv_help, False) + description_post ) # get original component, owner # src_ticket_data['component'] is the component after all changes, but for creating the issue we want the component # that was set when the issue was created; we should get this from the first changelog entry that changed a component # ... and similar for other attributes first_old_values = {} for change in changelog: time, author, change_type, oldvalue, newvalue, permanent = change if change_type not in first_old_values: if change_type not in [ "cc", "comment", "attachment", ] and not change_type.startswith("_comment"): field = change_type if isinstance(oldvalue, str): oldvalue = oldvalue.strip() first_old_values[field] = oldvalue # If no change changed a certain attribute, then that attribute is given by ticket data # (When writing migration archives, this is true unconditionally.) if github: src_ticket_data.update(first_old_values) # Process src_ticket_data and remove (using pop) attributes that are processed already. # issue_description dumps everything that has not been processed in the description. issue_data = {} def milestone_labels(src_ticket_data, status): labels = [] if add_label: labels.append(add_label) component = src_ticket_data.get("component", None) # We do not pop the component; this is to ensure that one can search for # Trac components even after an outdated component label is deleted in GitHub. if component is not None and component.strip() != "": label = map_component(component) if label: labels.append(label) gh_ensure_label(dest, label, label_category="component") priority = src_ticket_data.pop("priority", default_priority) if priority != default_priority: label = map_priority(priority) labels.append(label) gh_ensure_label(dest, label, label_category="priority") severity = src_ticket_data.pop("severity", default_severity) if severity != default_severity: labels.append(severity) gh_ensure_label(dest, severity, label_category="severity") tickettype = map_tickettype(src_ticket_data.pop("type", None)) if tickettype is not None: labels.append(tickettype) gh_ensure_label(dest, tickettype, label_category="type") resolution = map_resolution(src_ticket_data.pop("resolution", None)) if resolution is not None: labels.append(resolution) gh_ensure_label(dest, resolution, label_category="resolution") keywords, keyword_labels = map_keywords(src_ticket_data.get("keywords", "")) for label in keyword_labels: labels.append(label) gh_ensure_label(dest, label, label_category="keyword") milestone, label = map_milestone(src_ticket_data.pop("milestone", None)) if milestone and milestone in milestone_map: milestone = milestone_map[milestone] elif milestone: # Unknown milestone, put it back logging.warning(f'Unknown milestone "{milestone}"') unmapped_milestones[milestone] += 1 src_ticket_data["milestone"] = milestone milestone = None elif label: labels.append(label) gh_ensure_label(dest, label, label_category="milestone") status = src_ticket_data.pop("status", status) issue_state, label = map_status(status) if label: labels.append(label) gh_ensure_label(dest, label, label_category="resolution") labels = normalize_labels(dest, labels) return milestone, labels def title_status(summary, status=None): r""" Decode title prefixes such as [with patch, positive review] used in early Sage tickets. Return (cleaned up title, status) """ if m := re.match(r"^\[([A-Za-z_ ,;?]*)\] *", summary): phrases = m.group(1).replace(";", ",").split(",") keep_phrases = [] for phrase in phrases: phrase = phrase.strip() if re.fullmatch( r"needs review|(with )?positive review|needs work", phrase ): status = phrase.replace("with ", "").replace(" ", "_") elif re.fullmatch( r"(with)? *(new|trivial)? *(patch|bundl)e?s?|(with)? *spkg", phrase, ): pass else: keep_phrases.append(phrase) if keep_phrases: summary = "[" + ", ".join(keep_phrases) + "] " + summary[m.end(0) :] else: summary = summary[m.end(0) :] if not summary: summary = "No title" return summary, status tmp_src_ticket_data = copy(src_ticket_data) title, status = title_status(tmp_src_ticket_data.pop("summary")) milestone, labels = milestone_labels(tmp_src_ticket_data, status) issue_data["title"] = title issue_data["labels"] = labels if milestone: issue_data["milestone"] = milestone if not github: issue_data["user"] = gh_username(dest, tmp_src_ticket_data.pop("reporter")) issue_data["created_at"] = convert_xmlrpc_datetime(time_created) issue_data["updated_at"] = convert_xmlrpc_datetime(time_changed) issue_data["number"] = int(src_ticket_id) issue_data["reactions"] = [] assignees = gh_user_url_list(dest, tmp_src_ticket_data.pop("owner")) issue_data["assignees"] = assignees # Find closed_at for time, author, change_type, oldvalue, newvalue, permanent in reversed( changelog ): if change_type == "status": state, label = map_status(newvalue) if state == "closed": issue_data["closed_at"] = convert_xmlrpc_datetime(time) break # on the last status change issue_data["description"] = issue_description(tmp_src_ticket_data) issue = gh_create_issue(dest, issue_data) def update_labels(labels, add_label, remove_label, label_category="type"): oldlabels = copy(labels) if remove_label: with contextlib.suppress(ValueError): labels.remove(remove_label) if add_label: labels.append(add_label) gh_ensure_label(dest, add_label, label_category=label_category) labels = normalize_labels(dest, labels) if set(labels) != set(oldlabels): gh_update_issue_property( dest, issue, "labels", labels, oldval=oldlabels, **event_data ) return labels if github: status = src_ticket_data.pop("status") if status in ["closed"]: # sometimes a ticket is already closed at creation, so close issue gh_update_issue_property(dest, issue, "state", "closed") else: src_ticket_data.update(first_old_values) title, status = title_status( src_ticket_data.get("summary"), src_ticket_data.get("status") ) tmp_src_ticket_data = copy(src_ticket_data) milestone, labels = milestone_labels(tmp_src_ticket_data, status) assignees = gh_user_url_list(dest, tmp_src_ticket_data.pop("owner")) # Create issue events for initial labels & milestone user_url = gh_user_url(dest, tmp_src_ticket_data.get("reporter")) event_data = { "created_at": convert_xmlrpc_datetime(time_created), "actor": user_url, } if milestone: gh_update_issue_property( dest, issue, "milestone", milestone, None, **event_data ) for label in labels: update_labels([], label, None) gh_update_issue_property( dest, issue, "assignees", assignees, oldval=[], **event_data ) issue_state, label = map_status(status) if label and label not in labels: update_labels([], label, None) last_sha = None def change_status(newvalue): oldvalue = src_ticket_data.get("status") src_ticket_data["status"] = newvalue oldstate, oldlabel = map_status(oldvalue) newstate, newlabel = map_status(newvalue) new_labels = update_labels(labels, newlabel, oldlabel) if issue_state != newstate: if newstate == "closed" and last_sha: if closing_sha := closing_commits.pop( (src_ticket_id, last_sha), None ): ## We pop the item so that the importer does not complain about RecordNotUnique. # commit_id (string) -- The SHA of the commit that referenced this issue. event_data["commit_id"] = closing_sha # commit_url (string) -- The GitHub REST API link to the commit that referenced this issue. # event_data['commit_repository'] = target_url_git_repo event_data["commit_repository"] = target_url_issues_repo gh_update_issue_property(dest, issue, "state", newstate, **event_data) return newstate, new_labels attachments = [] for change in changelog: time, author, change_type, oldvalue, newvalue, permanent = change change_time = str(convert_xmlrpc_datetime(time)) # print(change) log.debug( " %s by %s (%s -> %s)" % ( change_type, author, str(oldvalue)[:40].replace("\n", " "), str(newvalue)[:40].replace("\n", " "), ) ) # assert attachment is None or change_type == "comment", "an attachment must be followed by a comment" # if author in ['anonymous', 'Draftmen888'] : # print (" SKIPPING CHANGE BY", author) # continue user = gh_username(dest, author) user_url = gh_user_url(dest, author) comment_data = { "created_at": convert_trac_datetime(change_time), "user": user, "formatter": "markdown", } event_data = { "created_at": convert_trac_datetime(change_time), "actor": user_url, } if change_type == "attachment": # The attachment may be described in the next comment attachments.append( { "attachment": get_ticket_attachment( source, src_ticket_id, newvalue ).data, "attachment_name": newvalue, } ) elif change_type == "comment": # oldvalue is here either x or y.x, where x is the number of this comment and y is the number of the comment that is replied to m = re.fullmatch(r"([0-9]+[.])?([0-9]+)", oldvalue) x = m and m.group(2) desc = newvalue.strip() if not desc and not attachments: # empty description and not description of attachment continue comment_data["note"] = trac2markdown(desc, "/issues/", conv_help, False) comment_data["attachments"] = attachments attachments = [] gh_comment_issue( dest, issue, comment_data, src_ticket_id, comment_id=x, minimize=False, ) elif change_type.startswith("_comment"): # this is an old version of a comment, which has been edited later (given in previous change), # e.g., see http://localhost:8080/ticket/3431#comment:9 http://localhost:8080/ticket/3400#comment:14 # we will forget about these old versions and only keep the latest one pass elif change_type == "status": issue_state, labels = change_status(newvalue) elif change_type == "resolution": oldresolution = map_resolution(oldvalue) newresolution = map_resolution(newvalue) labels = update_labels( labels, newresolution, oldresolution, "resolution" ) elif change_type == "component": oldlabel = map_component(oldvalue) newlabel = map_component(newvalue) labels = update_labels(labels, newlabel, oldlabel, "component") elif change_type == "owner": oldvalue = gh_user_url_list(dest, oldvalue) newvalue = gh_user_url_list(dest, newvalue) gh_update_issue_property( dest, issue, "assignees", newvalue, oldval=oldvalue, **event_data ) # oldvalue = gh_username_list(dest, oldvalue) # newvalue = gh_username_list(dest, newvalue) # if oldvalue and newvalue: # comment_data['note'] = 'Changed assignee from ' + attr_value(oldvalue) + ' to ' + attr_value(newvalue) # elif newvalue: # comment_data['note'] = 'Assignee: ' + attr_value(newvalue) # else: # comment_data['note'] = 'Removed assignee ' + attr_value(oldvalue) # if newvalue != oldvalue: # gh_comment_issue(dest, issue, comment_data, src_ticket_id) elif change_type == "version": if oldvalue != "": desc = "Changed version from %s to %s." % ( attr_value(oldvalue), attr_value(newvalue), ) else: desc = "Version: " + attr_value(newvalue) comment_data["note"] = desc gh_comment_issue(dest, issue, comment_data, src_ticket_id) elif change_type == "milestone": oldmilestone, oldlabel = map_milestone(oldvalue) newmilestone, newlabel = map_milestone(newvalue) if oldmilestone and oldmilestone in milestone_map: oldmilestone = milestone_map[oldmilestone] else: if oldmilestone: logging.warning(f'Ignoring unknown milestone "{oldmilestone}"') unmapped_milestones[oldmilestone] += 1 oldmilestone = GithubObject.NotSet if newmilestone and newmilestone in milestone_map: newmilestone = milestone_map[newmilestone] else: if newmilestone: logging.warning(f'Ignoring unknown milestone "{newmilestone}"') unmapped_milestones[newmilestone] += 1 newmilestone = GithubObject.NotSet if oldmilestone != newmilestone: gh_update_issue_property( dest, issue, "milestone", newmilestone, oldval=oldmilestone, **event_data, ) labels = update_labels(labels, newlabel, oldlabel, "milestone") elif change_type == "cc": pass # we handle only the final list of CCs (above) elif change_type == "type": oldtype = map_tickettype(oldvalue) newtype = map_tickettype(newvalue) labels = update_labels(labels, newtype, oldtype, "type") elif change_type == "description": if github: issue_data["description"] = ( issue_description(src_ticket_data) + "\n\n(changed by " + user + " at " + change_time + ")" ) gh_update_issue_property( dest, issue, "description", issue_data["description"], **event_data, ) else: body = "Description changed:\n``````diff\n" old_description = trac2markdown( oldvalue, "/issues/", conv_help, False ) new_description = trac2markdown( newvalue, "/issues/", conv_help, False ) body += "\n".join( unified_diff( old_description.split("\n"), new_description.split("\n"), lineterm="", ) ) body += "\n``````\n" comment_data["note"] = body gh_comment_issue(dest, issue, comment_data, src_ticket_id) elif change_type == "summary": oldtitle, oldstatus = title_status(oldvalue) title, status = title_status(newvalue) if title != oldtitle: issue_data["title"] = title gh_update_issue_property( dest, issue, "title", title, oldval=oldtitle, **event_data ) if status is not None: issue_state, labels = change_status(status) elif change_type == "priority": oldlabel = map_priority(oldvalue) newlabel = map_priority(newvalue) labels = update_labels(labels, newlabel, oldlabel, "priority") elif change_type == "severity": oldlabel = map_severity(oldvalue) newlabel = map_severity(newvalue) labels = update_labels(labels, newlabel, oldlabel, "severity") elif change_type == "keywords": oldlabels = copy(labels) oldkeywords, oldkeywordlabels = map_keywords(oldvalue) newkeywords, newkeywordlabels = map_keywords(newvalue) for label in oldkeywordlabels: with contextlib.suppress(ValueError): labels.remove(label) for label in newkeywordlabels: labels.append(label) gh_ensure_label(dest, label, label_category="keyword") if oldkeywords != newkeywords: comment_data["note"] = ( "Changed keywords from " + attr_value(", ".join(oldkeywords)) + " to " + attr_value(", ".join(newkeywords)) ) gh_comment_issue(dest, issue, comment_data, src_ticket_id) if labels != oldlabels: gh_update_issue_property( dest, issue, "labels", labels, oldval=oldlabels, **event_data ) else: if oldvalue in ignored_values: oldvalue = "" if newvalue in ignored_values: newvalue = "" if oldvalue != newvalue: if change_type in ["branch", "commit"]: if oldvalue: oldvalue = github_ref_markdown(oldvalue) if newvalue: if re.fullmatch("[0-9a-f]{40}", newvalue): # Store for closing references last_sha = newvalue newvalue = github_ref_markdown(newvalue) change_type = change_type.replace("_", " ") if not oldvalue: comment_data[ "note" ] = f"{change_type.title()}: {attr_value(newvalue)}" else: comment_data[ "note" ] = f"Changed {change_type} from {attr_value(oldvalue)} to {attr_value(newvalue)}" gh_comment_issue(dest, issue, comment_data, src_ticket_id) if attachments: comment_data["attachments"] = attachments attachments = [] gh_comment_issue(dest, issue, comment_data, src_ticket_id, minimize=False) ticketcount += 1 if ticketcount % 10 == 0 and sleep_after_10tickets > 0: print( "%d tickets migrated. Waiting %d seconds to let GitHub/Trac cool down." % (ticketcount, sleep_after_10tickets) ) sleep(sleep_after_10tickets) def convert_wiki(source, dest): exclude_authors = ["trac"] if not os.path.isdir(wiki_export_dir): os.makedirs(wiki_export_dir) client.MultiCall(source) conv_help = WikiConversionHelper(source) if os.path.exists("links.txt"): os.remove("links.txt") for pagename in source.wiki.getAllPages(): info = source.wiki.getPageInfo(pagename) if info["author"] in exclude_authors: continue page = source.wiki.getPage(pagename) print("Migrate Wikipage", pagename) # Github wiki does not have folder structure gh_pagename = " ".join(pagename.split("/")) conv_help.set_wikipage_paths(pagename) converted = trac2markdown( page, os.path.dirname("/wiki/%s" % gh_pagename), conv_help ) attachments = [] for attachment in source.wiki.listAttachments(pagename): print(" Attachment", attachment) attachmentname = os.path.basename(attachment) attachmentdata = source.wiki.getAttachment(attachment).data dirname = os.path.join(wiki_export_dir, gh_pagename) if not os.path.isdir(dirname): os.makedirs(dirname) # write attachment data to binary file open(os.path.join(dirname, attachmentname), "wb").write(attachmentdata) attachmenturl = gh_pagename + "/" + attachmentname converted = re.sub( r"\[attachment:%s\s([^\[\]]+)\]" % re.escape(attachmentname), r"[\1](%s)" % attachmenturl, converted, ) attachments.append((attachmentname, attachmenturl)) # add a list of attachments if len(attachments) > 0: converted += "\n---\n\nAttachments:\n" for name, url in attachments: converted += " * [" + name + "](" + url + ")\n" # TODO we could use the GitHub API to write into the Wiki repository of the GitHub project outfile = os.path.join(wiki_export_dir, gh_pagename + ".md") # For wiki page names with slashes os.makedirs(os.path.dirname(outfile), exist_ok=True) try: open(outfile, "w").write(converted) except UnicodeEncodeError as e: print("EXCEPTION:", e) print(" Context:", e.object[e.start - 20 : e.end + 20]) print(" Retrying with UTF-8 encoding") codecs.open(outfile, "w", "utf-8").write(converted) def output_unmapped_users(data): table = Table(title="Unmapped users") table.add_column("Username", justify="right", style="cyan", no_wrap=True) table.add_column("Known on Trac", justify="right", style="cyan", no_wrap=True) table.add_column("Mention", justify="right", style="cyan", no_wrap=True) table.add_column("Mannequin", justify="right", style="cyan", no_wrap=True) table.add_column("Frequency", style="magenta") for key, frequency in data: origname, known_on_trac, is_mention, mannequin = key table.add_row( origname, str(known_on_trac), str(is_mention), mannequin, str(frequency) ) console = Console() console.print(table) # The file is created if not exists if not os.path.exists("unmapped_users.txt"): with open("unmapped_users.txt", "a") as f: for key, frequency in data: origname, known_on_trac, is_mention, mannequin = key f.write( " ".join( [ origname, str(known_on_trac), str(is_mention), mannequin, str(frequency), ] ) + "\n" ) def output_unmapped_milestones(data): table = Table(title="Unmapped milestones") table.add_column("Milestone", justify="right", style="cyan", no_wrap=True) table.add_column("Frequency", style="magenta") for key, frequency in data: table.add_row(key, str(frequency)) console = Console() console.print(table) # The file is created if not exists if not os.path.exists("unmapped_milestones.txt"): with open("unmapped_milestones.txt", "a") as f: for key, frequency in data: f.write(" ".join([key, str(frequency)]) + "\n") min_keyword_frequency_displayed = 20 def output_keyword_frequency(data): table = Table(title="Unmapped keyword frequency") table.add_column("Keyword", justify="right", style="cyan", no_wrap=True) table.add_column("Frequency", style="magenta") for key, frequency in data: if frequency >= min_keyword_frequency_displayed: table.add_row(key, str(frequency)) console = Console() console.print(table) # The file is created if not exists if not os.path.exists("keyword_frequency.txt"): with open("keyword_frequency.txt", "a") as f: for key, frequency in data: f.write(" ".join([key, str(frequency)]) + "\n") def output_component_frequency(data): table = Table(title="Component frequency") table.add_column("Component", justify="right", style="cyan", no_wrap=True) table.add_column("Frequency", style="magenta") for keyword, frequency in data: table.add_row(keyword, str(frequency)) console = Console() console.print(table) # The file is created if not exists if not os.path.exists("component_frequency.txt"): with open("component_frequency.txt", "a") as f: for key, frequency in data: f.write(" ".join([key, str(frequency)]) + "\n") if __name__ == "__main__": from rich.logging import RichHandler FORMAT = "%(message)s" logging.basicConfig( level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()] ) source = client.ServerProxy(trac_url) github = None dest = None gh_user = None if must_convert_issues: if github_token is not None: github = Github(github_token, base_url=github_api_url) elif github_username is not None: github = Github(github_username, github_password, base_url=github_api_url) if github: dest = github.get_repo(github_project) gh_user = github.get_user() for l in dest.get_labels(): gh_labels[l.name.lower()] = l # print 'Existing labels:', gh_labels.keys() else: requester = MigrationArchiveWritingRequester( migration_archive, wiki_export_dir ) dest = Repository( requester, None, dict(name=github_project, url=target_url_issues_repo), None, ) # print(dest.url) sleep_after_request = 0 try: if must_convert_issues: read_closing_commits() convert_issues( source, dest, only_issues=only_issues, blacklist_issues=blacklist_issues ) if must_convert_wiki: convert_wiki(source, dest) finally: if must_convert_issues and not github: # Patch in labels dest._requester.requestJsonAndCheck( "PATCH", f"{dest.url}", input={ "labels": [ { "url": label.url, "name": label.name, "color": label.color, "description": label.description if label.description is not GithubObject.NotSet else None, "created_at": None, } for label in gh_labels.values() ] }, ) dest._requester.flush() with open("minimized_issue_comments.json", "w") as f: json.dump(minimized_issue_comments, f, indent=4) output_unmapped_users( sorted(unmapped_users.items(), key=lambda x: (x[0][0].lower(), *x[0][1:])) ) output_unmapped_milestones( sorted(unmapped_milestones.items(), key=lambda x: -x[1]) ) output_keyword_frequency(sorted(keyword_frequency.items(), key=lambda x: -x[1])) output_component_frequency( sorted(component_frequency.items(), key=lambda x: -x[1]) )