# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'google/apis/core/multipart'
require 'google/apis/core/http_command'
require 'google/apis/core/api_command'
require 'google/apis/errors'
require 'addressable/uri'
require 'mime-types'
require 'stringio'
require 'tempfile'

module Google
  module Apis
    module Core
      # Extension of Hurley's UploadIO to add length accessor
      class UploadIO < Hurley::UploadIO
        OCTET_STREAM_CONTENT_TYPE = 'application/octet-stream'

        # Get the length of the stream.
        #
        # Prefers the wrapped stream's own +length+, then its +size+ (File/IO
        # objects expose +size+ rather than +length+), and only then falls back
        # to the size of the file at +local_path+.
        #
        # @return [Fixnum]
        def length
          return io.length if io.respond_to?(:length)
          return io.size if io.respond_to?(:size)
          File.size(local_path)
        end

        # Create a new instance given a file path
        # @param [String, File] file_name
        #   Path to file
        # @param [String] content_type
        #   Optional content type. If nil, will attempt to auto-detect
        # @return [Google::Apis::Core::UploadIO]
        def self.from_file(file_name, content_type: nil)
          if content_type.nil?
            type = MIME::Types.of(file_name)
            content_type = type.first.content_type unless type.nil? || type.empty?
          end
          new(file_name, content_type || OCTET_STREAM_CONTENT_TYPE)
        end

        # Wraps an IO stream in UploadIO
        # @param [#read] io
        #   IO to wrap
        # @param [String] content_type
        #   Optional content type. Defaults to application/octet-stream, also
        #   when nil is passed explicitly.
        # @return [Google::Apis::Core::UploadIO]
        def self.from_io(io, content_type: OCTET_STREAM_CONTENT_TYPE)
          # Callers such as BaseUploadCommand#prepare! forward their (possibly
          # nil) content type, which bypasses the keyword default above.
          new(io, content_type || OCTET_STREAM_CONTENT_TYPE)
        end
      end

      # Base upload command. Not intended to be used directly
      # @private
      class BaseUploadCommand < ApiCommand
        UPLOAD_PROTOCOL_HEADER = 'X-Goog-Upload-Protocol'
        UPLOAD_CONTENT_TYPE_HEADER = 'X-Goog-Upload-Header-Content-Type'
        UPLOAD_CONTENT_LENGTH = 'X-Goog-Upload-Header-Content-Length'

        # File name or IO containing the content to upload
        # @return [String, File, #read]
        attr_accessor :upload_source

        # Content type of the upload material
        # @return [String]
        attr_accessor :upload_content_type

        # Content, as UploadIO
        # @return [Google::Apis::Core::UploadIO]
        attr_accessor :upload_io

        # Ensure the content is readable and wrapped in an {Google::Apis::Core::UploadIO} instance.
        #
        # @return [void]
        # @raise [Google::Apis::ClientError] if upload source is invalid
        def prepare!
          super
          if streamable?(upload_source)
            # Caller-supplied stream: the caller keeps ownership, so it is not closed on release.
            self.upload_io = UploadIO.from_io(upload_source, content_type: upload_content_type)
            @close_io_on_finish = false
          elsif upload_source.is_a?(String)
            # A String is treated as a file path; the stream wrapped here is closed in release!.
            self.upload_io = UploadIO.from_file(upload_source, content_type: upload_content_type)
            @close_io_on_finish = true
          else
            fail Google::Apis::ClientError, 'Invalid upload source'
          end
        end

        # Close IO stream when command done. Only closes the stream if it was opened by the command.
        #
        # @return [void]
        def release!
          # Nothing to close if no stream was ever assigned.
          upload_io.close if @close_io_on_finish && upload_io
        end

        private

        # Whether the source can be wrapped directly as a stream.
        #
        # @param [Object] upload_source
        # @return [Boolean] true for IO, StringIO and Tempfile instances
        def streamable?(upload_source)
          upload_source.is_a?(IO) || upload_source.is_a?(StringIO) || upload_source.is_a?(Tempfile)
        end
      end

      # Implementation of the raw upload protocol
      class RawUploadCommand < BaseUploadCommand
        RAW_PROTOCOL = 'raw'

        # Use the wrapped upload content as the request body and set the raw
        # upload protocol and content type headers.
        #
        # @return [void]
        # @raise [Google::Apis::ClientError] if upload source is invalid
        def prepare!
          super
          self.body = upload_io
          header[UPLOAD_PROTOCOL_HEADER] = RAW_PROTOCOL
          header[UPLOAD_CONTENT_TYPE_HEADER] = upload_io.content_type
        end
      end

      # Implementation of the multipart upload protocol
      class MultipartUploadCommand < BaseUploadCommand
        UPLOAD_BOUNDARY = 'RubyApiClientUpload'
        MULTIPART_PROTOCOL = 'multipart'
        MULTIPART_RELATED = 'multipart/related'

        # Encode the multipart request
        #
        # @return [void]
        # @raise [Google::Apis::ClientError] if upload source is invalid
        def prepare!
          super
          @multipart = Multipart.new(boundary: UPLOAD_BOUNDARY, content_type: MULTIPART_RELATED)
          # JSON part (the current body) first, then the upload content.
          @multipart.add_json(body)
          @multipart.add_upload(upload_io)
          self.body = @multipart.assemble
          header[:content_type] = @multipart.content_type
          header[UPLOAD_PROTOCOL_HEADER] = MULTIPART_PROTOCOL
        end
      end

      # Implementation of the resumable upload protocol
      class ResumableUploadCommand < BaseUploadCommand
        UPLOAD_COMMAND_HEADER = 'X-Goog-Upload-Command'
        UPLOAD_OFFSET_HEADER = 'X-Goog-Upload-Offset'
        BYTES_RECEIVED_HEADER = 'X-Goog-Upload-Size-Received'
        UPLOAD_URL_HEADER = 'X-Goog-Upload-URL'
        UPLOAD_STATUS_HEADER = 'X-Goog-Upload-Status'
        STATUS_ACTIVE = 'active'
        STATUS_FINAL = 'final'
        STATUS_CANCELLED = 'cancelled'
        RESUMABLE = 'resumable'
        START_COMMAND = 'start'
        QUERY_COMMAND = 'query'
        UPLOAD_COMMAND = 'upload, finalize'

        # Reset upload to initial state.
        #
        # @return [void]
        # @raise [Google::Apis::ClientError] if upload source is invalid
        def prepare!
          @state = :start
          @upload_url = nil
          @offset = 0
          super
        end

        # Check to see if the upload is complete or needs to be resumed.
        #
        # Records the received byte count, upload URL and upload status from the
        # response headers before delegating to the superclass implementation.
        #
        # @param [Fixnum] status
        #   HTTP status code of response
        # @param [Hurley::Header] header
        #   Response headers
        # @param [String, #read] body
        #   Response body
        # @return [Object]
        #   Response object
        # @raise [Google::Apis::ServerError] An error occurred on the server and the request can be retried
        # @raise [Google::Apis::ClientError] The request is invalid and should not be retried without modification
        # @raise [Google::Apis::AuthorizationError] Authorization is required
        def process_response(status, header, body)
          @offset = Integer(header[BYTES_RECEIVED_HEADER]) if header.key?(BYTES_RECEIVED_HEADER)
          @upload_url = header[UPLOAD_URL_HEADER] if header.key?(UPLOAD_URL_HEADER)
          upload_status = header[UPLOAD_STATUS_HEADER]
          logger.debug { sprintf('Upload status %s', upload_status) }
          case upload_status
          when STATUS_ACTIVE
            @state = :active
          when STATUS_FINAL
            @state = :final
          when STATUS_CANCELLED
            # A cancelled upload cannot be resumed; surface the response body as a client error.
            @state = :cancelled
            fail Google::Apis::ClientError, body
          end
          super(status, header, body)
        end

        # Send the start command to initiate the upload
        #
        # @param [Hurley::Client] client
        #   HTTP client
        # @return [Hurley::Response]
        # @raise [Google::Apis::ServerError] Unable to send the request
        def send_start_command(client)
          logger.debug { sprintf('Sending upload start command to %s', url) }
          client.send(method, url, body) do |req|
            apply_request_options(req)
            req.header[UPLOAD_PROTOCOL_HEADER] = RESUMABLE
            req.header[UPLOAD_COMMAND_HEADER] = START_COMMAND
            req.header[UPLOAD_CONTENT_LENGTH] = upload_io.length.to_s
            req.header[UPLOAD_CONTENT_TYPE_HEADER] = upload_io.content_type
          end
        rescue => e
          # Any failure while starting is reported as a ServerError carrying the original message.
          raise Google::Apis::ServerError, e.message
        end

        # Query for the status of an incomplete upload
        #
        # Unlike {#send_start_command}, errors raised by the client are not
        # wrapped here and propagate to the caller unchanged.
        #
        # @param [Hurley::Client] client
        #   HTTP client
        # @return [Hurley::Response]
        def send_query_command(client)
          logger.debug { sprintf('Sending upload query command to %s', @upload_url) }
          client.post(@upload_url, nil) do |req|
            apply_request_options(req)
            req.header[UPLOAD_COMMAND_HEADER] = QUERY_COMMAND
          end
        end

        # Send the actual content
        #
        # Unlike {#send_start_command}, errors raised by the client are not
        # wrapped here and propagate to the caller unchanged.
        #
        # @param [Hurley::Client] client
        #   HTTP client
        # @return [Hurley::Response]
        def send_upload_command(client)
          logger.debug { sprintf('Sending upload command to %s', @upload_url) }
          content = upload_io
          # Resume from the last offset recorded in process_response.
          content.pos = @offset
          client.post(@upload_url, content) do |req|
            apply_request_options(req)
            req.header[UPLOAD_COMMAND_HEADER] = UPLOAD_COMMAND
            req.header[UPLOAD_OFFSET_HEADER] = @offset.to_s
          end
        end

        # Execute the upload request once. This will typically perform two HTTP requests -- one to initiate or query
        # for the status of the upload, the second to send the (remaining) content.
        #
        # @private
        # @param [Hurley::Client] client
        #   HTTP client
        # @yield [result, err] Result or error if block supplied
        # @return [Object]
        # @raise [Google::Apis::ServerError] An error occurred on the server and the request can be retried
        # @raise [Google::Apis::ClientError] The request is invalid and should not be retried without modification
        # @raise [Google::Apis::AuthorizationError] Authorization is required
        def execute_once(client, &block)
          case @state
          when :start
            response = send_start_command(client)
            result = process_response(response.status_code, response.header, response.body)
          when :active
            response = send_query_command(client)
            result = process_response(response.status_code, response.header, response.body)
          when :cancelled, :final
            # The upload already ended; report the error saved from the earlier attempt.
            error(@last_error, rethrow: true, &block)
          end

          # process_response above may have moved the state to :active, in which
          # case the (remaining) content is sent in the same pass.
          if @state == :active
            response = send_upload_command(client)
            result = process_response(response.status_code, response.header, response.body)
          end

          success(result, &block) if @state == :final
        rescue => e
          # Some APIs like Youtube generate non-retriable 401 errors and mark
          # the upload as finalized. Save the error just in case we get
          # retried.
          @last_error = e
          error(e, rethrow: true, &block)
        end
      end
    end
  end
end