package net.sansa_stack.examples.flink.inference

import java.io.{File, FileInputStream}
import java.net.URI
import java.util.Properties

import scala.io.Source

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.webmonitor.WebMonitorUtils

import net.sansa_stack.inference.flink.data.{RDFGraphLoader, RDFGraphWriter}
import net.sansa_stack.inference.flink.forwardchaining.{ForwardRuleReasonerOWLHorst, ForwardRuleReasonerRDFS}
import net.sansa_stack.inference.rules.ReasoningProfile._
import net.sansa_stack.inference.rules.{RDFSLevel, ReasoningProfile}

/**
 * Example job that loads an RDF graph from disk, applies forward-chaining
 * inference (RDFS, simplified RDFS, or OWL Horst) with Apache Flink, and
 * writes the materialized graph back to disk.
 */
object RDFGraphInference {

  def main(args: Array[String]): Unit = {
    parser.parse(args, Config()) match {
      case Some(config) =>
        run(
          args,
          config.in,
          config.out,
          config.profile,
          config.writeToSingleFile,
          config.sortedOutput,
          config.propertiesFile,
          config.jobName)
      case None =>
        println(parser.usage)
    }
  }

  /**
   * Runs the inference job.
   *
   * @param args              the raw CLI args, forwarded to the Flink web UI as job parameters
   * @param input             paths to the N-Triples input files or directories
   * @param output            the output directory
   * @param profile           the reasoning profile (rdfs | rdfs-simple | owl-horst)
   * @param writeToSingleFile whether to merge the output into a single file
   * @param sortedOutput      whether to sort the triples per output file
   * @param propertiesFile    optional properties file with advanced reasoner options;
   *                          falls back to the bundled "reasoner" config when null
   * @param jobName           the Flink job name; a default is derived from the profile if empty
   */
  def run(
      args: Array[String],
      input: Seq[URI],
      output: URI,
      profile: ReasoningProfile,
      writeToSingleFile: Boolean,
      sortedOutput: Boolean,
      propertiesFile: File,
      jobName: String): Unit = {

    // read reasoner optimization properties, either from the user-supplied
    // file or from the bundled "reasoner" configuration resource
    val reasonerConf =
      if (propertiesFile != null) ConfigFactory.parseFile(propertiesFile)
      else ConfigFactory.load("reasoner")

    // get params
    val params: ParameterTool = ParameterTool.fromArgs(args)

    println("======================================")
    println("|        RDF Graph Inference         |")
    println("======================================")

    val conf = new Configuration()
    // raise the network buffer count for larger shuffles
    conf.setInteger("taskmanager.network.numberOfBuffers", 3000)

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // load triples from disk
    val graph = RDFGraphLoader.loadFromDisk(input, env)
    println(s"|G| = ${graph.size()}")

    // create reasoner for the requested profile
    val reasoner = profile match {
      case RDFS | RDFS_SIMPLE =>
        val r = new ForwardRuleReasonerRDFS(env)
        r.useSchemaBroadCasting = reasonerConf.getBoolean("reasoner.rdfs.schema.broadcast")
        r.extractSchemaTriplesInAdvance =
          reasonerConf.getBoolean("reasoner.rdfs.schema.extractTriplesInAdvance")
        // RDFS_SIMPLE uses the same reasoner restricted to the SIMPLE rule level
        if (profile == RDFS_SIMPLE) r.level = RDFSLevel.SIMPLE
        r
      case OWL_HORST => new ForwardRuleReasonerOWLHorst(env)
    }

    // compute inferred graph
    val inferredGraph = reasoner.apply(graph)
    println(s"|G_inf| = ${inferredGraph.size()}")

    // write the materialized graph to disk; without a sink the Flink batch
    // job would have nothing to execute
    // NOTE(review): verify the writeToDisk signature against RDFGraphWriter
    RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)

    val jn = if (jobName.isEmpty) s"RDF Graph Inference ($profile)" else jobName

    // run the program
    env.execute(jn)
  }

  // the config object holding all parsed CLI options
  case class Config(
      in: Seq[URI] = Seq(),
      out: URI = new URI("."),
      profile: ReasoningProfile = ReasoningProfile.RDFS,
      writeToSingleFile: Boolean = false,
      sortedOutput: Boolean = false,
      propertiesFile: File = null,
      jobName: String = "")

  // read ReasoningProfile enum from its lowercase CLI spelling
  implicit val profilesRead: scopt.Read[ReasoningProfile.Value] =
    scopt.Read.reads(ReasoningProfile forName _.toLowerCase())

  // the CLI parser
  val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
    head("RDFGraphMaterializer", "0.1.0")

    opt[Seq[URI]]('i', "input")
      .required()
      .valueName("<path>")
      .action((x, c) => c.copy(in = x))
      .text("path to file or directory that contains the input files (in N-Triple format)")

    opt[URI]('o', "out")
      .required()
      .valueName("<directory>")
      .action((x, c) => c.copy(out = x))
      .text("the output directory")

    opt[Unit]("single-file")
      .optional()
      .action((_, c) => c.copy(writeToSingleFile = true))
      .text("write the output to a single file in the output directory")

    opt[Unit]("sorted")
      .optional()
      .action((_, c) => c.copy(sortedOutput = true))
      .text("sorted output of the triples (per file)")

    opt[ReasoningProfile]('p', "profile")
      .required()
      .valueName("{rdfs | rdfs-simple | owl-horst}")
      .action((x, c) => c.copy(profile = x))
      .text("the reasoning profile")

    // long-only: the short option 'p' is already taken by --profile, and
    // scopt rejects duplicate short options at parser construction time
    opt[File]("prop")
      .optional()
      .valueName("<path_to_properties_file>")
      .action((x, c) => c.copy(propertiesFile = x))
      .text("the (optional) properties file which allows some more advanced options")

    opt[String]('j', "jobName")
      .optional()
      .valueName("<name_of_the_Flink_job>")
      .action((x, c) => c.copy(jobName = x))
      .text("the name of the Flink job that occurs also in the Web-UI")

    help("help").text("prints this usage text")

  }
  parser.showUsageOnError
}