HTML5技术

C#码农的大数据之路 - 使用C#编写MR作业 - hystar(5)

字号+ 作者:H5之家 来源:H5之家 2017-06-08 08:00 我要评论( )

几个注意的地方,SSH Host的地址不同于集群门户页面地址,其中多了个-ssh,默认ssh用户用户名为sshuser(在创建集群时可以更改)。第二图可以不设置,这样设置可以让我们的BitviseSSH功能保持单一,就是进行转发。

几个注意的地方,SSH Host的地址不同于集群门户页面地址,其中多了个-ssh,默认ssh用户用户名为sshuser(在创建集群时可以更改)。第二图可以不设置,这样设置可以让我们的BitviseSSH功能保持单一,就是进行转发。第三图最终要的就是端口号。链接成功后,我们可以在Firefox中使用FoxProxy等类似代理软件配置转发。
配置好浏览器转发后,我们打开之前记录的ip的8080端口。由于我们通过SSH建立了隧道,所以这个ip会被正确的路由。

注意:某些情况下,使用headnode0无法访问,可以尝试使用headnode1。

如果一切正常,会再次弹出Ambari的登陆框,登陆后,在这个新的Ambari登陆中的HDFS或YARN界面中的Quick Links菜单中就可以找到我们需要的那些管理网站入口。如图:

HDP2.7版本之后的HDInsight集群,ResourceMananger等管理页面貌似可以直接访问而不再需要上面的限制。

下面大致来介绍如果在基于Linux的集群上运行基于.NET Core的App。我们还是使用上一部分的Map和Reduce程序的代码。为了使用.NET Core的Framework,需要进行一些小小的改造。
新建两个.NET Core控制台应用程序,分别命名为NetCoreMapper与NetCoreReducer。其各自的Main方法如下:

namespace NetCoreMapper { public class Program { public static void Main(string[] args) { if (args.Length > 0) { Stream stream = new FileStream(args[0], FileMode.Open, FileAccess.Read, FileShare.Read); Console.SetIn(new StreamReader(stream)); } string line; while ((line = Console.ReadLine()) != null) { Console.WriteLine(line); } } } } namespace NetCoreReducer { public class Program { static void Main(string[] args) { ILoggerFactory loggerFactory = new LoggerFactory() .AddConsole() .AddDebug(); ILogger logger = loggerFactory.CreateLogger<Program>(); logger.LogInformation( "This is a test of the emergency broadcast system."); string line; var count = 0; if (args.Length > 0) { Console.SetIn(new StreamReader(new FileStream(args[0], FileMode.Open, FileAccess.Read))); } while ((line = Console.ReadLine()) != null) { count += line.Count(cr => (cr == ' ' || cr == '\n')); } Console.WriteLine(count); } } }

使用命令分别生成项目:

dotnet publish --framework netcoreapp1.0 --configuration release --output publish

把两个项目的输出放到一个文件夹中,按上面的配置,应该最终得到8个文件,其中两个pdb文件可以安全删除。

然后,我们将这些文件上传到Azure Blob Storage与HDInsight集群对应的容器中的/example/coreapp目录下(如果没有这个目录请先新建,如果不使用这个目录,在下文的提交任务的代码中要把程序路径换成相应的路径)

接下来需要修改下提交任务的代码:

private static void SubmitMRJob() { var paras = new MapReduceStreamingJobSubmissionParameters { Files = new List<string>() { "/example/coreapp", }, Mapper = "dotnet coreapp/NetCoreMapper.dll", Reducer = "dotnet coreapp/NetCoreReducer.dll", Input = "/example/data/gutenberg/davinci.txt", Output = "/example/data/StreamingOutput/wc.txt" }; Console.WriteLine("Submitting the MR job to the cluster..."); var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras); var jobId = jobResponse.JobSubmissionJsonResponse.Id; Console.WriteLine("Response status code is " + jobResponse.StatusCode); Console.WriteLine("JobId is " + jobId); Console.WriteLine("Waiting for the job completion ..."); // Wait for job completion var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; while (!jobDetail.Status.JobComplete) { Thread.Sleep(1000); jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail; } // Get job output var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey, DefaultStorageContainerName); var output = (jobDetail.ExitValue == 0) ? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success : _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure Console.WriteLine("Job output is: "); using (var reader = new StreamReader(output, Encoding.UTF8)) { string value = reader.ReadToEnd(); Console.WriteLine(value); } }

 

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 数据库表设计,没有最好只有最适合(邻接表、路径枚举、嵌套集、闭包表) - 小小情意

    数据库表设计,没有最好只有最适合(邻接表、路径枚举、嵌套集、闭包

    2017-06-08 08:00

  • HTML5笔记3——Web Storage和本地数据库 - 邹琼俊

    HTML5笔记3——Web Storage和本地数据库 - 邹琼俊

    2017-06-07 16:00

  • Redis中的数据结构与常用命令 - 雪飞鸿

    Redis中的数据结构与常用命令 - 雪飞鸿

    2017-06-04 11:03

  • 每天4亿行SQLite订单大数据测试(源码) - 大石头

    每天4亿行SQLite订单大数据测试(源码) - 大石头

    2017-06-02 13:01

网友点评
f