# NOTE(review): this copy of the file is damaged. Text that stood between "<" and ">"
# appears to have been stripped: the string literals that feed etree.fromstring() below
# are empty or contain only "%s" / "\n", and four statements are cut off inside a string
# literal and fused with later code (each spot is marked with NOTE(review) below).
# The line structure and indentation shown here are a reconstruction; the code tokens are
# unchanged. Restore the missing text from a pristine copy before relying on this file.

from lxml import etree
import base64
import random
import time

#@@CALIBRE_COMPAT_CODE@@

from setup.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from setup.libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from setup.libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_HOBBES_VERSIONS
from setup.libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER


def buildFulfillRequest(acsm):
    """Build the (unsigned) fulfillment request text for an ACSM document.

    Reads the user and device UUIDs from the activation XML, the fingerprint and
    device type from the activation XML (falling back to the device XML when either
    is missing or empty), and the "hobbes", "clientOS" and "clientLocale" version
    entries from the device XML.

    acsm is serialized with etree.tostring() and embedded in the request.

    Returns a tuple (request, flag). flag is False when the resolved client version
    is "ADE WIN 9,0,1131,27" and True otherwise; fulfill() uses it to decide whether
    the signature element is namespace-qualified.
    """

    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

    activationxml = etree.parse(get_activation_xml_path())
    devicexml = etree.parse(get_device_path())

    user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
    device_uuid = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text

    try:
        # Pre-set both to None so the fallback check below works even when a
        # lookup raises (find() returning None makes .text raise).
        fingerprint = None
        device_type = None
        fingerprint = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("fingerprint"))).text
        device_type = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("deviceType"))).text
    except:
        pass

    if (fingerprint is None or fingerprint == "" or device_type is None or device_type == ""):
        # This should usually never happen with a proper activation, but just in case it does,
        # I'll leave this code in - it loads the fingerprint from the device data instead.
        fingerprint = devicexml.find("./%s" % (adNS("fingerprint"))).text
        device_type = devicexml.find("./%s" % (adNS("deviceType"))).text

    version = None
    clientOS = None
    clientLocale = None

    ver = devicexml.findall("./%s" % (adNS("version")))

    for f in ver:
        if f.get("name") == "hobbes":
            version = f.get("value")
        elif f.get("name") == "clientOS":
            clientOS = f.get("value")
        elif f.get("name") == "clientLocale":
            clientLocale = f.get("value")

    # Find matching client version depending on the Hobbes version.
    # This way we don't need to store and re-load it for each fulfillment.
    try:
        v_idx = VAR_VER_HOBBES_VERSIONS.index(version)
        clientVersion = VAR_VER_SUPP_VERSIONS[v_idx]

    except:
        # Version not present, probably the "old" 10.0.4 entry.
        # As 10.X is in the 3.0 range, assume we're on ADE 3.0
        clientVersion = "3.0.1.91394"

    # NOTE(review): every literal in both branches below is empty or only "%s" / "\n".
    # fulfill() passes the result to etree.fromstring(), so these presumably carried XML
    # markup that was lost from this copy - confirm against a pristine copy.
    if clientVersion == "ADE WIN 9,0,1131,27":
        # Ancient ADE 1.7.2 does this request differently
        request = "\n"
        request += "%s\n" % (user_uuid)
        request += "%s\n" % (device_uuid)
        request += "%s\n" % (device_type)
        request += etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
        request += ""
        return request, False

    else:
        request = ""
        request += ""
        request += ""
        request += "%s" % (user_uuid)
        request += "%s" % (device_uuid)
        request += "%s" % (device_type)
        request += etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
        request += ""
        request += "%s" % (version)
        request += "%s" % (clientOS)
        request += "%s" % (clientLocale)
        request += "%s" % (clientVersion)
        request += "%s" % (device_type)
        request += "%s" % ("ADOBE Digitial Editions")
        # YES, this typo ("Digitial" instead of "Digital") IS present in ADE!!
        request += "%s" % (fingerprint)
        request += ""
        request += "%s" % (user_uuid)
        request += "%s" % (device_uuid)
        request += ""
        request += ""
        request += ""
        return request, True


def buildInitLicenseServiceRequest(authURL):
    # type: (str) -> str
    """Build and sign a request containing authURL, a nonce and the user UUID.

    The user UUID is read from the activation XML. The assembled text is parsed,
    signed with sign_node(), and the signature is appended as an element in the
    "http://ns.adobe.com/adept" namespace.

    Returns the serialized document prefixed by a string literal, or None when
    sign_node() returns None.
    """

    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    etree.register_namespace("adept", NSMAP["adept"])

    activationxml = etree.parse(get_activation_xml_path())
    user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text

    # NOTE(review): literals below are empty in this copy; presumably XML markup was lost.
    ret = ""
    ret += ""
    ret += ""
    ret += "%s" % (authURL)
    ret += addNonce()
    ret += "%s" % (user_uuid)
    ret += ""

    # Same mapping and registration as at the top of the function (repeated in the source).
    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    etree.register_namespace("adept", NSMAP["adept"])

    req_xml = etree.fromstring(ret)

    signature = sign_node(req_xml)
    if (signature is None):
        return None

    etree.SubElement(req_xml, etree.QName(NSMAP["adept"], "signature")).text = signature

    # NOTE(review): the "\n" prefix presumably lost an XML declaration - confirm.
    return "\n" + etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")


def getDecryptedCert(pkcs12_b64_string = None):
    """Return the result of get_cert_from_pkcs12() for the given pkcs12 data.

    pkcs12_b64_string: base64 text of the pkcs12 container. When None, it is read
    from credentials/pkcs12 in the activation XML.

    The device key bytes come from setup.libadobe.devkey_bytes when that is not
    None, otherwise from the file at get_devkey_path(). The key is passed
    base64-encoded to get_cert_from_pkcs12().

    Returns None when get_cert_from_pkcs12() raises.
    """

    if pkcs12_b64_string is None:
        activationxml = etree.parse(get_activation_xml_path())
        adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
        pkcs12_b64_string = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text

    pkcs12_data = base64.b64decode(pkcs12_b64_string)

    # NOTE(review): if this import fails, devkey_adobe is never bound and the
    # "is not None" check below raises NameError instead of falling back to the file.
    try:
        from setup.libadobe import devkey_bytes as devkey_adobe
    except:
        pass

    if devkey_adobe is not None:
        devkey_bytes = devkey_adobe
    else:
        # NOTE(review): the handle is not closed if read() raises.
        f = open(get_devkey_path(), "rb")
        devkey_bytes = f.read()
        f.close()

    try:
        return get_cert_from_pkcs12(pkcs12_data, base64.b64encode(devkey_bytes))
    except:
        return None


def buildAuthRequest():
    """Build the operator auth request text from the activation data.

    Embeds, in order: the user UUID, the base64-encoded result of
    getDecryptedCert(), and the licenseCertificate and authenticationCertificate
    values from the activation XML credentials.

    Returns None (after printing a message) when getDecryptedCert() returns None.
    """

    activationxml = etree.parse(get_activation_xml_path())
    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

    my_cert = getDecryptedCert()
    if my_cert is None:
        print("Can't decrypt pkcs12 with devkey!")
        return None

    # NOTE(review): literals below contain only "%s" / "\n" in this copy; presumably
    # XML markup was lost.
    ret = "\n"
    ret += "\n"
    ret += "%s\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text)
    ret += "%s\n" % (base64.b64encode(my_cert).decode("utf-8"))
    ret += "%s\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("licenseCertificate"))).text)
    ret += "%s\n" % (activationxml.find("./%s/%s" % (adNS("credentials"), adNS("authenticationCertificate"))).text)
    ret += ""

    return ret


def doOperatorAuth(operatorURL):
    # type: (str) -> str
    """Send the auth request from buildAuthRequest() to the operator's "/Auth" URL.

    A trailing "Fulfill" on operatorURL is handled by removing "/Fulfill" before
    "/Auth" is appended. Returns "Failed to create auth request" when
    buildAuthRequest() returns None. The handling of the reply is not visible in
    this copy (see note below).
    """

    auth_req = buildAuthRequest()

    if auth_req is None:
        return "Failed to create auth request"

    authURL = operatorURL
    if authURL.endswith("Fulfill"):
        authURL = authURL.replace("/Fulfill", "")

    replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8")

    # NOTE(review): the source is cut off inside the string literal on the next line.
    # The rest of doOperatorAuth and the "def" header of the code that follows are not
    # present in this copy. The following code is presumably the body of
    # operatorAuth(operatorURL), which fulfill() calls - confirm against a pristine copy.
    if not " str

    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    etree.register_namespace("adept", NSMAP["adept"])

    activationxml = etree.parse(get_activation_xml_path())
    try:
        operator_url_list = activationxml.findall("./%s/%s" % (adNS("operatorURLList"), adNS("operatorURL")))

        # Nothing to do when this operator URL is already stored in the activation XML.
        for member in operator_url_list:
            if member.text.strip() == operatorURL:
                #print("Already authenticated to operator")
                return None
    except:
        pass

    ret = doOperatorAuth(operatorURL)
    if (ret is not None):
        return "doOperatorAuth error: %s" % ret

    # Check if list exists:
    # NOTE(review): "list" shadows the builtin for the rest of this scope.
    list = activationxml.find("./%s" % (adNS("operatorURLList")))
    user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text

    if list is None:
        # Create the operatorURLList element (with its user child) and look it up again.
        x = etree.SubElement(activationxml.getroot(), etree.QName(NSMAP["adept"], "operatorURLList"), nsmap=NSMAP)
        etree.SubElement(x, etree.QName(NSMAP["adept"], "user")).text = user_uuid
        list = activationxml.find("./%s" % (adNS("operatorURLList")))
        if list is None:
            return "Err, this list should not be none right now ..."

    etree.SubElement(list, etree.QName(NSMAP["adept"], "operatorURL")).text = operatorURL

    # Write the updated activation XML back to disk.
    # NOTE(review): the "\n" literal presumably lost an XML declaration; the handle is
    # not closed if a write raises.
    f = open(get_activation_xml_path(), "w")
    f.write("\n")
    f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
    f.close()

    return None


def buildRights(license_token_node):
    """Build the rights text for a license token node.

    Embeds the serialized license_token_node, its licenseURL value, and the
    certificate of the licenseServiceInfo entry in the activation XML whose
    licenseURL equals it.

    Returns None (after printing a hint) when no matching entry is found.
    """
    # NOTE(review): literals below contain only "%s" / "\n" in this copy; presumably
    # XML markup was lost.
    ret = "\n"
    ret += "\n"

    # Add license token
    ret += etree.tostring(license_token_node, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

    ret += "\n"

    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
    lic_token_url = license_token_node.find("./%s" % (adNS("licenseURL"))).text

    ret += "%s\n" % lic_token_url

    # Get cert for this license URL:
    activationxml = etree.parse(get_activation_xml_path())

    try:
        licInfo = activationxml.findall("./%s/%s" % (adNS("licenseServices"), adNS("licenseServiceInfo")))

        # NOTE(review): "found" is first bound here, inside the try. If findall() above
        # raised, the "if not found" check below would hit an unbound name.
        found = False

        for member in licInfo:
            if member.find("./%s" % (adNS("licenseURL"))).text == lic_token_url:
                ret += "%s\n" % (member.find("./%s" % (adNS("certificate"))).text)
                found = True
                break
    except:
        pass

    if not found:
        print("Did not find the licenseService certificate in the activation data.")
        print("This usually means it failed to download from the distributor's servers.")
        print("Please try to download an ACSM book from the Adobe Sample Library, then if that was successful, ")
        print("try your ACSM book file again.")
        return None

    ret += "\n"
    ret += "\n"

    return ret


def fulfill(acsm_file, do_notify = False):
    """Fulfill an ACSM file: build, sign and send the fulfillment request.

    Visible steps: read the pkcs12 value from the activation XML, parse acsm_file,
    build the request with buildFulfillRequest(), sign it with sign_node(), run
    operatorAuth() against "<operatorURL>/Fulfill", then send the signed request.

    Visible return values are (False, message) tuples for the failure cases. The
    handling of the server reply is not present in this copy (see note at the end),
    and do_notify is not referenced in the visible part.
    """

    # Detailed logging is optional; any failure to load the prefs leaves it off.
    verbose_logging = False
    try:
        import prefs
        deacsmprefs = prefs.ACSMInput_Prefs()
        verbose_logging = deacsmprefs["detailed_logging"]
    except:
        pass

    # Get pkcs12:

    pkcs12 = None
    acsmxml = None
    try:
        activationxml = etree.parse(get_activation_xml_path())
        adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
        pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
    except:
        return False, "Activation not found or invalid"

    if pkcs12 is None or len(pkcs12) == 0:
        return False, "Activation missing"

    try:
        acsmxml = etree.parse(acsm_file)
    except:
        return False, "ACSM not found or invalid"

    #print(etree.tostring(acsmxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))

    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
    dcNS = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag)

    # The mimetype is only inspected for logging; it does not change the flow.
    try:
        mimetype = acsmxml.find("./%s/%s/%s" % (adNS("resourceItemInfo"), adNS("metadata"), dcNS("format"))).text

        if (mimetype == "application/pdf"):
            #print("You're trying to fulfill a PDF file.")
            pass
        elif (mimetype == "application/epub+zip"):
            #print("Trying to fulfill an EPUB file ...")
            pass
        else:
            print("Weird mimetype: %s" % (mimetype))
            print("Continuing anyways ...")
    except:
        # Some books, like from Google Play books, use a different format and don't have that metadata tag.
        pass

    fulfill_request, adept_ns = buildFulfillRequest(acsmxml)

    if verbose_logging:
        print("Fulfill request:")
        print(fulfill_request)

    fulfill_request_xml = etree.fromstring(fulfill_request)
    # Sign the request:
    signature = sign_node(fulfill_request_xml)
    if (signature is None):
        return False, "Signing failed!"

    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

    if adept_ns:
        # "new" ADE
        etree.SubElement(fulfill_request_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
    else:
        # ADE 1.7.2
        etree.SubElement(fulfill_request_xml, etree.QName("signature")).text = signature

    # Get operator URL:
    operatorURL = None
    try:
        operatorURL = acsmxml.find("./%s" % (adNS("operatorURL"))).text.strip()
    except:
        pass

    if (operatorURL is None or len(operatorURL) == 0):
        return False, "OperatorURL missing in ACSM"

    fulfillURL = operatorURL + "/Fulfill"

    ret = operatorAuth(fulfillURL)
    if (ret is not None):
        return False, "operatorAuth error: %s" % ret

    if adept_ns:
        # "new" ADE
        # NOTE(review): the "\n" prefix presumably lost an XML declaration - confirm.
        fulfill_req_signed = "\n" + etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
    else:
        # ADE 1.7.2
        fulfill_req_signed = etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

    #print("will send:\n %s" % fulfill_req_signed)
    #print("Sending fulfill request to %s" % fulfillURL)

    # For debugging only
    # fulfillURL = fulfillURL.replace("https:", "http:")

    replyData = sendRequestDocu(fulfill_req_signed, fulfillURL).decode("utf-8")

    # NOTE(review): the source is cut off inside the string literal on the next line.
    # The rest of fulfill(), the "def" header of the following code and the header of
    # the loop that its "continue" statements belong to are not present in this copy.
    # The code below uses deacsmprefs, new_loan_record, error_counter and last_token,
    # none of which are defined in the visible text; it presumably stores a loan record
    # in the plugin preferences. The indentation shown is a reconstruction.
        if "= 10:
            print("Took us 10 attempts to acquire loan token lock, still didn't work.")
            print("(still the same token %s)" % (deacsmprefs["loan_identifier_token"]))
            print("If you see this error message please open a bug report.")

        # "Mark" the current access with a random token, to prevent multiple instances
        # of the plugin overwriting eachother's data.
        deacsmprefs.refresh()
        if deacsmprefs["loan_identifier_token"] == 0:
            # Token is free: write our own random token, then re-read to check it stuck.
            random_identifier = random.getrandbits(64)
            deacsmprefs.set("loan_identifier_token", random_identifier)
            deacsmprefs.commit()
            deacsmprefs.refresh()
            if random_identifier != deacsmprefs["loan_identifier_token"]:
                #print("we broke another thread's token, try again")
                last_token = deacsmprefs["loan_identifier_token"]
                error_counter = error_counter + 1
                continue
        else:
            # NOTE(review): this "else" is paired here with the "== 0" check above; with
            # the original indentation lost, that pairing is an assumption - confirm.
            if last_token != deacsmprefs["loan_identifier_token"]:
                #print("Token changed in the meantime ...")
                # Give it another 5 tries
                error_counter = max(0, error_counter - 5)
                pass

            last_token = deacsmprefs["loan_identifier_token"]
            #print("waiting on another thread ...")
            sleeptime = random.randrange(2, 10) / 1000
            print(str(sleeptime))
            time.sleep(sleeptime)
            error_counter = error_counter + 1
            continue

        # Okay, now this thread can "use" the config list, and no other thread should overwrite it ...
        # Check if that exact loan is already in the list, and if so, delete it:
        done = False
        while not done:
            # Restart the scan after every removal so the list is never mutated
            # while the for loop keeps iterating over it.
            done = True
            for book in deacsmprefs["list_of_rented_books"]:
                if book["loanID"] == new_loan_record["loanID"]:
                    done = False
                    deacsmprefs["list_of_rented_books"].remove(book)
                    break

        # Add all necessary information for a book return to the JSON array.
        # The config widget can then read this and present a list of not-yet-returned
        # books, and can then return them.
        # Also, the config widget is responsible for cleaning up that list once a book's validity period is up.
        deacsmprefs["list_of_rented_books"].append(new_loan_record)

        # Okay, now we added our loan record.
        # Remove the identifier token so other threads can use the config again:
        deacsmprefs.commit()
        deacsmprefs.refresh()
        if deacsmprefs["loan_identifier_token"] != random_identifier:
            print("Another thread stole the loan token while we were working with it - that's not supposed to happen ...")
            print("If you see this message, please open a bug report.")
            return False

        deacsmprefs.set("loan_identifier_token", 0)
        deacsmprefs.commit()

        return True


def tryReturnBook(bookData):
    """Build, sign and send a loan return request for one book.

    bookData must provide the keys "user", "loanID", "device" and "operatorURL";
    the device value is only embedded when it is not None. The signed request is
    sent to operatorURL + "/LoanReturn".

    Visible return values: (False, "Invalid book data") when a key is missing and
    (False, "Sign error") when sign_node() returns None. The handling of the server
    reply is not present in this copy (see note at the end).
    """

    # Detailed logging is optional; any failure to load the prefs leaves it off.
    verbose_logging = False
    try:
        import calibre_plugins.deacsm.prefs as prefs
        deacsmprefs = prefs.ACSMInput_Prefs()
        verbose_logging = deacsmprefs["detailed_logging"]
    except:
        pass

    try:
        user = bookData["user"]
        loanID = bookData["loanID"]
        device = bookData["device"]
        operatorURL = bookData["operatorURL"]
    except:
        print("Invalid book data!")
        return False, "Invalid book data"

    # NOTE(review): literals below are empty or only "%s" in this copy; presumably
    # XML markup was lost.
    req_data = ""
    req_data += ""
    req_data += "%s" % (user)
    if device is not None:
        req_data += "%s" % (device)
    req_data += "%s" % (loanID)
    req_data += addNonce()
    req_data += ""

    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    etree.register_namespace("adept", NSMAP["adept"])

    full_text_xml = etree.fromstring(req_data)

    signature = sign_node(full_text_xml)
    if (signature is None):
        print("SIGN ERROR!")
        return False, "Sign error"

    etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature

    print("Notifying loan return server %s" % (operatorURL + "/LoanReturn"))
    doc_send = "\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
    if verbose_logging:
        print(doc_send)

    retval = sendRequestDocu(doc_send, operatorURL + "/LoanReturn").decode("utf-8")

    # NOTE(review): the source is cut off inside the string literal on the next line.
    # The rest of tryReturnBook() and the "def" header of the following code are not
    # present in this copy. The code below uses notifiers, fulfillmentResultToken, user,
    # device, adNS and verbose_logging, none of which are defined in the visible text;
    # it presumably sends fulfillment notifications. The indentation is a reconstruction.
    if " tag not found. Guess nobody wants to be notified.")
        #print(etree.tostring(fulfillmentResultToken, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
        return True, ""

    errmsg = ""
    errmsg_crit = ""

    for element in notifiers:

        url = element.find("./%s" % (adNS("notifyURL"))).text
        body = element.find("./%s" % (adNS("body")))

        # A notifier is treated as critical unless its "critical" attribute is "no".
        critical = True

        if element.get("critical", "yes") == "no":
            critical = False
            print("Notifying optional server %s" % (url))
        else:
            print("Notifying server %s" % (url))

        if (user is None):
            try:
                # "Normal" Adobe fulfillment
                user = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text
            except AttributeError:
                # B&N Adobe PassHash fulfillment. Doesn't use notifications usually ...
                #user = body.find("./%s" % (adNS("user"))).text
                print("Skipping notify due to passHash?")
                print("If this is not a passHash book pls open a bug report.")
                continue

        if (device is None):
            try:
                # "Normal" Adobe fulfillment
                device = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text
            except:
                print("Missing deviceID for loan metadata ... why?")
                print("Reading from device.xml instead.")
                # Lets try to read this from the activation ...
                activationxml = etree.parse(get_activation_xml_path())
                device = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text

        # NOTE(review): literals below are empty or only "%s" in this copy; presumably
        # XML markup was lost.
        full_text = ""
        full_text += "%s" % user
        full_text += "%s" % device

        # ADE 4.0 apparently changed the order of these two elements.
        # I still don't know exactly how this order is determined, but in most cases
        # ADE 4+ has the body first, then the nonce, while ADE 3 and lower usually has nonce first, then body.
        # It probably doesn't matter, but still, we want to behave exactly like ADE, so check the version number:

        devicexml = etree.parse(get_device_path())
        for f in devicexml.findall("./%s" % (adNS("version"))):
            if f.get("name") == "hobbes":
                version = f.get("value")

        # NOTE(review): if no "hobbes" entry exists and "version" was not bound in the
        # part of this function that is missing, the lookup below raises and the bare
        # except falls back to clientVersion = 0.
        try:
            v_idx = VAR_VER_HOBBES_VERSIONS.index(version)
            clientVersion = VAR_VER_BUILD_IDS[v_idx]
        except:
            clientVersion = 0

        if (clientVersion >= VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER):
            full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
            full_text += addNonce()
        else:
            full_text += addNonce()
            full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

        full_text += ""

        NSMAP = { "adept" : "http://ns.adobe.com/adept" }
        etree.register_namespace("adept", NSMAP["adept"])

        full_text_xml = etree.fromstring(full_text)

        signature = sign_node(full_text_xml)
        if (signature is None):
            print("SIGN ERROR!")
            continue

        etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature

        doc_send = "\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

        # Debug: Print notify request
        if (verbose_logging):
            print("Notify payload XML:")
            print(doc_send)

        try:
            code, msg = sendRequestDocuRC(doc_send, url)
        except:
            # A failing optional notification is logged and skipped; a failing
            # critical one is re-raised to the caller.
            if not critical:
                print("There was an error during an optional fulfillment notification:")
                import traceback
                traceback.print_exc()
                print("Continuing execution ...")
                continue
            else:
                print("Error during critical notification:")
                raise

        # msg may already be text; a failed decode is ignored on purpose.
        try:
            msg = msg.decode("utf-8")
        except:
            pass

        if verbose_logging:
            print("MSG:")
            print(msg)

        # NOTE(review): the source is cut off inside the string literal on the next line.
        # The rest of the notification loop and the start of the following code (which
        # writes activationxml through the file handle f) are not present in this copy.
        if "\n")
    f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
    f.close()

    return True, "Done"